Skip to content

main.py

File: selenium/main.py

# monitor_lims.py
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json
from datetime import datetime

# Setup Chrome with options
options = Options()
# options.add_argument('--headless')  # Start with visible for SSO login
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--user-data-dir=./chrome-profile')  # Save cookies/session
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
options.set_capability('goog:loggingPrefs', {'performance': 'ALL', 'browser': 'ALL'})

driver = webdriver.Chrome(options=options)

# Enable network interception
driver.execute_cdp_cmd('Network.enable', {})
driver.execute_cdp_cmd('Network.setCacheDisabled', {'cacheDisabled': True})

# Storage for logs
network_logs = []
dom_changes = []

def setup_network_logging():
    """Intercept all network requests/responses"""
    def process_browser_logs():
        logs = driver.get_log('performance')
        for log in logs:
            message = json.loads(log['message'])['message']

            if message['method'] == 'Network.responseReceived':
                response = message['params']['response']
                network_logs.append({
                    'timestamp': datetime.now().isoformat(),
                    'type': 'response',
                    'url': response['url'],
                    'status': response['status'],
                    'method': response.get('requestMethod', 'GET'),
                    'headers': response.get('headers', {})
                })

            elif message['method'] == 'Network.requestWillBeSent':
                request = message['params']['request']
                network_logs.append({
                    'timestamp': datetime.now().isoformat(),
                    'type': 'request',
                    'url': request['url'],
                    'method': request['method'],
                    'headers': request.get('headers', {}),
                    'postData': request.get('postData', None)
                })

        return logs

    return process_browser_logs

def save_logs_to_file():
    """Save accumulated logs to JSON file"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    with open(f'lims_logs_{timestamp}.json', 'w') as f:
        json.dump({
            'network': network_logs[-1000:],  # Last 1000 entries
            'dom_changes': dom_changes[-1000:]
        }, f, indent=2)

    print(f"Logs saved to lims_logs_{timestamp}.json")

def login_via_sso(lims_url, sso_username=None, sso_password=None):
    """
    Handle SSO login

    If credentials provided: attempt auto-login
    If not: wait for manual login, then save session
    """
    driver.get(lims_url)

    # Check if already logged in (from saved session)
    try:
        # Wait to see if we land on LIMS dashboard (adjust selector)
        WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.ID, "dashboard"))  # Change to actual element
        )
        print("Already logged in via saved session")
        return True
    except:
        print("Not logged in, proceeding with SSO...")

    # If SSO credentials provided, attempt auto-login
    if sso_username and sso_password:
        try:
            # Wait for SSO login page (adjust selectors to your SSO provider)
            username_field = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, "username"))  # Change to actual
            )
            username_field.send_keys(sso_username)

            password_field = driver.find_element(By.ID, "password")  # Change to actual
            password_field.send_keys(sso_password)

            login_button = driver.find_element(By.ID, "submit")  # Change to actual
            login_button.click()

            # Wait for redirect back to LIMS
            WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, "dashboard"))  # Change to actual
            )

            print("SSO login successful")
            return True

        except Exception as e:
            print(f"Auto-login failed: {e}")
            print("Falling back to manual login...")

    # Manual login fallback
    print("=" * 50)
    print("MANUAL LOGIN REQUIRED")
    print("Please complete SSO login in the browser window...")
    print("Waiting for login completion...")
    print("=" * 50)

    # Wait for successful login (check for dashboard or specific element)
    try:
        WebDriverWait(driver, 300).until(  # 5 minute timeout
            EC.presence_of_element_located((By.ID, "dashboard"))  # Change to actual element
        )
        print("Login detected! Session saved for future runs.")
        return True
    except:
        print("Login timeout. Exiting.")
        return False

def skip_login(url):
    """Just load URL without any login"""
    driver.get(url)
    print(f"Loaded {url} - no login required")
    return True

def inject_dom_monitor():
    """Inject MutationObserver to track DOM changes"""
    script = """
    if (!window.__domMonitorInstalled) {
        window.__domMonitorInstalled = true;
        window.__domChanges = [];

        const observer = new MutationObserver((mutations) => {
            mutations.forEach(mutation => {
                // Only log significant changes (filter noise)
                if (mutation.target.id?.includes('sample') ||
                    mutation.target.className?.includes('result') ||
                    mutation.target.className?.includes('status')) {

                    window.__domChanges.push({
                        timestamp: Date.now(),
                        type: mutation.type,
                        target: {
                            tag: mutation.target.tagName,
                            id: mutation.target.id,
                            class: mutation.target.className,
                            text: mutation.target.textContent?.substring(0, 100)
                        },
                        addedNodes: mutation.addedNodes.length,
                        removedNodes: mutation.removedNodes.length
                    });

                    // Keep last 1000 changes
                    if (window.__domChanges.length > 1000) {
                        window.__domChanges.shift();
                    }
                }
            });
        });

        observer.observe(document.body, {
            childList: true,
            subtree: true,
            attributes: true,
            attributeFilter: ['class', 'data-status', 'value']
        });

        console.log('DOM monitor installed');
    }
    """
    driver.execute_script(script)

def collect_dom_changes():
    """Retrieve DOM changes from injected script"""
    try:
        changes = driver.execute_script("return window.__domChanges || [];")
        if changes:
            dom_changes.extend(changes)
            # Clear the buffer in browser
            driver.execute_script("window.__domChanges = [];")
            print(f"Collected {len(changes)} DOM changes")
    except Exception as e:
        print(f"Error collecting DOM changes: {e}")

def keep_session_alive():
    """Prevent session timeout by simulating activity"""
    try:
        driver.execute_script("""
            // Dispatch mouse movement
            document.dispatchEvent(new MouseEvent('mousemove', {
                bubbles: true,
                cancelable: true,
                view: window
            }));
        """)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Session keepalive triggered")
    except Exception as e:
        print(f"Keepalive error: {e}")

def search_logs(sample_id):
    """Search collected logs for specific sample ID"""
    results = {
        'network': [],
        'dom': []
    }

    # Search network logs
    for log in network_logs:
        log_str = json.dumps(log).lower()
        if sample_id.lower() in log_str:
            results['network'].append(log)

    # Search DOM changes
    for change in dom_changes:
        change_str = json.dumps(change).lower()
        if sample_id.lower() in change_str:
            results['dom'].append(change)

    return results

# ============================================
# MAIN MONITORING LOOP
# ============================================

def main():
    LIMS_URL = 'https://google.com'

    # SSO credentials (optional - leave None for manual login)
    SSO_USERNAME = None  # or 'your.email@company.com'
    SSO_PASSWORD = None  # or 'your-password'

    print("Starting LIMS Monitor...")

    # Setup network logging
    process_logs = setup_network_logging()


    # Ask user if login needed
    needs_login = input("Does this app require login? (y/n): ").strip().lower()

    if needs_login == 'y':
        if not login_via_sso(LIMS_URL, SSO_USERNAME, SSO_PASSWORD):
            print("Login failed. Exiting.")
            driver.quit()
            return
    else:
        skip_login(LIMS_URL)

    # Inject DOM monitoring
    inject_dom_monitor()

    print("\n" + "="*50)
    print("MONITORING ACTIVE")
    print("="*50)
    print("Press Ctrl+C to stop and save logs\n")

    save_interval = 300  # Save logs every 5 minutes
    keepalive_interval = 60  # Keep session alive every 1 minute

    last_save = time.time()
    last_keepalive = time.time()

    try:
        while True:
            current_time = time.time()

            # Process network logs
            process_logs()

            # Collect DOM changes
            collect_dom_changes()

            # Keep session alive
            if current_time - last_keepalive > keepalive_interval:
                keep_session_alive()
                last_keepalive = current_time

            # Periodic save
            if current_time - last_save > save_interval:
                save_logs_to_file()
                print(f"Total logs: {len(network_logs)} network, {len(dom_changes)} DOM")
                last_save = current_time

            time.sleep(1)  # Check every second

    except KeyboardInterrupt:
        print("\n\nStopping monitor...")
        save_logs_to_file()

        # Offer search before exit
        while True:
            sample_id = input("\nSearch for sample ID (or 'quit'): ").strip()
            if sample_id.lower() == 'quit':
                break

            results = search_logs(sample_id)
            print(f"\nFound {len(results['network'])} network logs")
            print(f"Found {len(results['dom'])} DOM changes")

            if results['network'] or results['dom']:
                filename = f"search_{sample_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                with open(filename, 'w') as f:
                    json.dump(results, f, indent=2)
                print(f"Results saved to {filename}")

    finally:
        driver.quit()
        print("Monitor stopped.")

if __name__ == '__main__':
    main()