# main.py
# File: selenium/main.py
# monitor_lims.py
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json
from datetime import datetime
# Setup Chrome with options
# NOTE: everything in this section runs at module level, so a Chrome window is
# launched as soon as this file is executed or imported.
options = Options()
# options.add_argument('--headless') # Start with visible for SSO login
# The next flag and the two experimental options below switch off Chrome's
# automation switches/extension — presumably so the SSO page does not treat
# the browser as a bot (TODO confirm the reason with the author).
options.add_argument('--disable-blink-features=AutomationControlled')
# Keep the browser profile in ./chrome-profile so cookies/session persist.
options.add_argument('--user-data-dir=./chrome-profile') # Save cookies/session
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
# Turn on the 'performance' and 'browser' logs; the 'performance' log is what
# setup_network_logging() reads through driver.get_log('performance').
options.set_capability('goog:loggingPrefs', {'performance': 'ALL', 'browser': 'ALL'})
# Shared WebDriver instance used by every function in this file.
driver = webdriver.Chrome(options=options)
# Enable the DevTools Network domain and disable the browser cache
driver.execute_cdp_cmd('Network.enable', {})
driver.execute_cdp_cmd('Network.setCacheDisabled', {'cacheDisabled': True})
# Storage for logs
# In-memory buffers filled while monitoring; save_logs_to_file() writes the
# last 1000 entries of each, search_logs() scans both in full.
network_logs = []
dom_changes = []
def setup_network_logging():
    """Build the poller that converts Chrome performance-log entries to records.

    Returns:
        A zero-argument callable. Each call reads
        ``driver.get_log('performance')``, appends one dict per
        ``Network.requestWillBeSent`` / ``Network.responseReceived`` event to
        the module-level ``network_logs`` list, and returns the raw log
        entries it just read.
    """
    # The CDP response object carries no HTTP method, so the method announced
    # by each request is remembered here (keyed by requestId) and looked up
    # when the matching response arrives.
    pending_methods = {}

    def build_record(event, params):
        """Return the record for one CDP event, or None if it is not logged."""
        request_id = params.get('requestId')
        if event == 'Network.requestWillBeSent':
            request = params['request']
            pending_methods[request_id] = request['method']
            return {
                'timestamp': datetime.now().isoformat(),
                'type': 'request',
                'url': request['url'],
                'method': request['method'],
                'headers': request.get('headers', {}),
                'postData': request.get('postData', None),
            }
        if event == 'Network.responseReceived':
            response = params['response']
            # Fall back to the previous behaviour ('GET') only when the
            # request was never seen, e.g. it started before logging did.
            method = (
                pending_methods.pop(request_id, None)
                or response.get('requestMethod', 'GET')
            )
            return {
                'timestamp': datetime.now().isoformat(),
                'type': 'response',
                'url': response['url'],
                'status': response['status'],
                'method': method,
                'headers': response.get('headers', {}),
            }
        if event == 'Network.loadingFailed':
            # No response will follow; drop the entry so the map cannot grow
            # without bound.
            pending_methods.pop(request_id, None)
        return None

    def process_browser_logs():
        logs = driver.get_log('performance')
        for log in logs:
            try:
                message = json.loads(log['message'])['message']
                record = build_record(
                    message['method'], message.get('params', {})
                )
            except (AttributeError, KeyError, TypeError, ValueError):
                # A malformed entry must not abort the rest of the batch.
                continue
            if record is not None:
                network_logs.append(record)
        return logs

    return process_browser_logs
def save_logs_to_file():
    """Dump the newest captured entries into a timestamped JSON file.

    Writes ``lims_logs_<YYYYmmdd_HHMMSS>.json`` in the current working
    directory. Only the last 1000 items of ``network_logs`` and of
    ``dom_changes`` are included. The file name is printed afterwards.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    out_name = f'lims_logs_{stamp}.json'
    with open(out_name, 'w') as handle:
        snapshot = {
            'network': network_logs[-1000:],  # newest 1000 network records
            'dom_changes': dom_changes[-1000:],  # newest 1000 DOM records
        }
        json.dump(snapshot, handle, indent=2)
    print(f"Logs saved to {out_name}")
def login_via_sso(lims_url, sso_username=None, sso_password=None):
    """Open ``lims_url`` and wait until the browser shows a logged-in page.

    Three stages are tried in order:

    1. A saved browser session: the dashboard element appears within 5 s.
    2. Automatic form login, only when both credentials are given.
    3. Manual login by the operator, with a 5 minute wait.

    The element locators used here (``dashboard``, ``username``,
    ``password``, ``submit``) are placeholders and must be changed to match
    the real application and SSO provider.

    Args:
        lims_url: Address to load in the shared ``driver``.
        sso_username: Optional user name for automatic login.
        sso_password: Optional password for automatic login.

    Returns:
        True once the dashboard element is present. False when the manual
        wait fails or the operator cancels with Ctrl+C.
    """
    dashboard_locator = (By.ID, "dashboard")  # Change to actual element

    def wait_for_dashboard(timeout):
        """Block until the dashboard element exists; raises on timeout."""
        WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located(dashboard_locator)
        )

    try:
        driver.get(lims_url)

        # Stage 1: a saved profile may already hold a valid session.
        try:
            wait_for_dashboard(5)
        except Exception:
            # Not a bare ``except``: Ctrl+C must not be mistaken for
            # "not logged in".
            print("Not logged in, proceeding with SSO...")
        else:
            print("Already logged in via saved session")
            return True

        # Stage 2: automatic login when credentials were supplied.
        if sso_username and sso_password:
            try:
                username_field = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, "username"))  # Change to actual
                )
                username_field.send_keys(sso_username)
                password_field = driver.find_element(By.ID, "password")  # Change to actual
                password_field.send_keys(sso_password)
                login_button = driver.find_element(By.ID, "submit")  # Change to actual
                login_button.click()
                # Wait for the redirect back to the application.
                wait_for_dashboard(30)
            except Exception as e:
                print(f"Auto-login failed: {e}")
                print("Falling back to manual login...")
            else:
                print("SSO login successful")
                return True

        # Stage 3: manual login fallback.
        print("=" * 50)
        print("MANUAL LOGIN REQUIRED")
        print("Please complete SSO login in the browser window...")
        print("Waiting for login completion...")
        print("=" * 50)
        try:
            wait_for_dashboard(300)  # 5 minute timeout
        except Exception:
            print("Login timeout. Exiting.")
            return False
        print("Login detected! Session saved for future runs.")
        return True
    except KeyboardInterrupt:
        # Report a cancellation as what it is, and let the caller shut down
        # through its normal "login failed" path.
        print("\nLogin cancelled by user.")
        return False
def skip_login(url):
    """Navigate the shared driver to ``url`` without any login handling.

    Prints a confirmation line and always returns True.
    """
    driver.get(url)
    notice = f"Loaded {url} - no login required"
    print(notice)
    return True
def inject_dom_monitor():
    """Install a MutationObserver in the current page to record DOM changes.

    The injected script is idempotent per document: it does nothing when
    ``window.__domMonitorInstalled`` is already set. Matching mutations are
    pushed onto ``window.__domChanges`` (capped at 1000 entries), which
    ``collect_dom_changes()`` reads. A mutation is recorded only when the
    target's id contains ``sample`` or its class contains ``result`` or
    ``status``.
    """
    script = """
    if (!window.__domMonitorInstalled) {
        window.__domChanges = [];

        // On SVG elements `className` is an SVGAnimatedString with no
        // .includes(), so `className?.includes(...)` throws and aborts the
        // whole mutation batch. Read the attribute instead: always a string.
        const classOf = (el) =>
            (typeof el.getAttribute === 'function' && el.getAttribute('class')) || '';
        const idOf = (el) => (typeof el.id === 'string' ? el.id : '');

        const observer = new MutationObserver((mutations) => {
            mutations.forEach((mutation) => {
                const target = mutation.target;
                const targetId = idOf(target);
                const targetClass = classOf(target);
                // Only log significant changes (filter noise)
                if (targetId.includes('sample') ||
                    targetClass.includes('result') ||
                    targetClass.includes('status')) {
                    window.__domChanges.push({
                        timestamp: Date.now(),
                        type: mutation.type,
                        target: {
                            tag: target.tagName,
                            id: targetId,
                            class: targetClass,
                            text: target.textContent?.substring(0, 100)
                        },
                        addedNodes: mutation.addedNodes.length,
                        removedNodes: mutation.removedNodes.length
                    });
                    // Keep last 1000 changes
                    if (window.__domChanges.length > 1000) {
                        window.__domChanges.shift();
                    }
                }
            });
        });

        // document.body can still be null on a document that is not fully
        // built; fall back to the root element so observe() cannot throw.
        const root = document.body || document.documentElement;
        observer.observe(root, {
            childList: true,
            subtree: true,
            attributes: true,
            attributeFilter: ['class', 'data-status', 'value']
        });

        // Set the flag only after observe() succeeded, so a failed attempt
        // can be retried by a later call.
        window.__domMonitorInstalled = true;
        console.log('DOM monitor installed');
    }
    """
    driver.execute_script(script)
def collect_dom_changes():
    """Move buffered DOM mutation records from the page into ``dom_changes``.

    Reads ``window.__domChanges`` in the current page, empties it, and
    appends the records to the module-level ``dom_changes`` list. Errors are
    printed, not raised, so one failed poll does not stop the monitor loop.
    """
    # Read and reset in ONE script call. With two separate calls, any
    # mutation recorded between the read and the reset was silently dropped.
    drain_script = """
        const pending = window.__domChanges || [];
        if (pending.length) {
            window.__domChanges = [];
        }
        return pending;
    """
    try:
        changes = driver.execute_script(drain_script)
    except Exception as e:
        print(f"Error collecting DOM changes: {e}")
        return
    if changes:
        dom_changes.extend(changes)
        print(f"Collected {len(changes)} DOM changes")
def keep_session_alive():
    """Dispatch a synthetic ``mousemove`` event on the page's document.

    Intended to look like user activity so the web session does not time
    out. Prints a timestamped confirmation on success. Any error is printed
    rather than raised.
    """
    # Same script text as before, assembled line by line.
    mousemove_js = (
        "\n"
        "// Dispatch mouse movement\n"
        "document.dispatchEvent(new MouseEvent('mousemove', {\n"
        "bubbles: true,\n"
        "cancelable: true,\n"
        "view: window\n"
        "}));\n"
    )
    try:
        driver.execute_script(mousemove_js)
        clock = datetime.now().strftime('%H:%M:%S')
        print(f"[{clock}] Session keepalive triggered")
    except Exception as e:
        print(f"Keepalive error: {e}")
def search_logs(sample_id, network_entries=None, dom_entries=None):
    """Find every collected record whose JSON text contains ``sample_id``.

    The match is a case-insensitive substring test against each record
    serialised as JSON, so it covers keys and nested values.

    Args:
        sample_id: Text to look for.
        network_entries: Records to search instead of the module-level
            ``network_logs`` (used when None).
        dom_entries: Records to search instead of the module-level
            ``dom_changes`` (used when None).

    Returns:
        ``{'network': [...], 'dom': [...]}`` holding the matching records in
        their original order.
    """
    if network_entries is None:
        network_entries = network_logs
    if dom_entries is None:
        dom_entries = dom_changes

    needle = sample_id.lower()  # lowercase once, not once per record

    def matches(entry):
        # ensure_ascii=False keeps non-ASCII characters literal. The default
        # escapes them to \\uXXXX, which made such sample IDs unmatchable.
        return needle in json.dumps(entry, ensure_ascii=False).lower()

    return {
        'network': [entry for entry in network_entries if matches(entry)],
        'dom': [entry for entry in dom_entries if matches(entry)],
    }
# ============================================
# MAIN MONITORING LOOP
# ============================================
def main():
    """Run the interactive monitor until the user presses Ctrl+C.

    Flow: optionally log in, install the DOM monitor, then poll once per
    second. Each pass drains the network log and the DOM-change buffer. A
    keepalive event is sent every 60 s and the logs are written to disk
    every 300 s. On Ctrl+C the logs are saved and an interactive search
    prompt is offered. The browser is closed on every exit path from the
    polling loop.
    """
    LIMS_URL = 'https://google.com'
    # SSO credentials (optional - leave None for manual login)
    SSO_USERNAME = None  # or 'your.email@company.com'
    SSO_PASSWORD = None  # or 'your-password'

    print("Starting LIMS Monitor...")
    # Setup network logging
    process_logs = setup_network_logging()

    # Ask user if login needed
    needs_login = input("Does this app require login? (y/n): ").strip().lower()
    if needs_login == 'y':
        if not login_via_sso(LIMS_URL, SSO_USERNAME, SSO_PASSWORD):
            print("Login failed. Exiting.")
            driver.quit()
            return
    else:
        skip_login(LIMS_URL)

    # Inject DOM monitoring
    inject_dom_monitor()

    print("\n" + "=" * 50)
    print("MONITORING ACTIVE")
    print("=" * 50)
    print("Press Ctrl+C to stop and save logs\n")

    save_interval = 300  # Save logs every 5 minutes
    keepalive_interval = 60  # Keep session alive every 1 minute
    last_save = time.time()
    last_keepalive = time.time()

    try:
        while True:
            current_time = time.time()

            # Process network logs
            process_logs()

            # A navigation replaces the page and discards the injected
            # observer. The injection is a no-op when already installed, so
            # repeating it each pass restores monitoring on new pages.
            try:
                inject_dom_monitor()
            except Exception as e:
                print(f"DOM monitor injection error: {e}")

            # Collect DOM changes
            collect_dom_changes()

            # Keep session alive
            if current_time - last_keepalive > keepalive_interval:
                keep_session_alive()
                last_keepalive = current_time

            # Periodic save
            if current_time - last_save > save_interval:
                save_logs_to_file()
                print(f"Total logs: {len(network_logs)} network, {len(dom_changes)} DOM")
                last_save = current_time

            time.sleep(1)  # Check every second
    except KeyboardInterrupt:
        print("\n\nStopping monitor...")
        save_logs_to_file()

        # Offer search before exit
        while True:
            sample_id = input("\nSearch for sample ID (or 'quit'): ").strip()
            if sample_id.lower() == 'quit':
                break
            results = search_logs(sample_id)
            print(f"\nFound {len(results['network'])} network logs")
            print(f"Found {len(results['dom'])} DOM changes")
            if results['network'] or results['dom']:
                # The ID is free-form user input. Replace anything that is
                # not safe in a file name (path separators in particular) so
                # open() cannot fail or write outside the working directory.
                safe_id = "".join(
                    ch if ch.isalnum() or ch in "-_." else "_"
                    for ch in sample_id
                )
                filename = f"search_{safe_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                with open(filename, 'w') as f:
                    json.dump(results, f, indent=2)
                print(f"Results saved to {filename}")
    except Exception as e:
        # E.g. the browser window was closed: keep what was captured so far,
        # then let the error surface.
        print(f"\nMonitor stopped unexpectedly: {e}")
        save_logs_to_file()
        raise
    finally:
        driver.quit()
        print("Monitor stopped.")
# Start the monitor only when this file is run as a script.
if __name__ == '__main__':
    main()