From c5bf7f5af73d0297c8460f07939774ac9c778c1f Mon Sep 17 00:00:00 2001 From: jasonwcfan Date: Sun, 13 Oct 2024 12:19:00 -0400 Subject: [PATCH] UI for finic copilot --- python_library/finic_py/cli.py | 67 ++-- python_library/finic_py/copilot.py | 475 +++++++++++++---------------- python_library/finic_py/utils.py | 31 -- 3 files changed, 250 insertions(+), 323 deletions(-) delete mode 100644 python_library/finic_py/utils.py diff --git a/python_library/finic_py/cli.py b/python_library/finic_py/cli.py index 1b2ad98a..c9a55700 100644 --- a/python_library/finic_py/cli.py +++ b/python_library/finic_py/cli.py @@ -7,11 +7,11 @@ from .finic import Finic import subprocess import argparse -from .capture import capture +from .copilot import copilot load_dotenv() -def check_api_key(): +def get_api_key() -> str: # Check if API key exists env_path = os.path.join(os.getcwd(), '.env') @@ -61,22 +61,31 @@ def zip_files_cli(zip_file): print(f"Error occurred during zipping: {e}") -def create_finic_app(argv=sys.argv): - if len(argv) < 2: - print("Please specify the project directory:\n create-finic-app ") +def finic_init(): + current_directory = os.getcwd() + new_directory_path = os.path.join(current_directory, "finic_tasks") + + # Check if the directory already exists + if os.path.exists(new_directory_path): + print("Error: The directory 'finic_tasks' already exists.") return - directory_name = argv[1] + + # Create the new directory + os.makedirs(new_directory_path) + + # Change to the new directory + os.chdir(new_directory_path) os.system( - f"git clone https://github.com/finic-ai/create-finic-app {directory_name}" + f"git clone --depth 1 https://github.com/finic-ai/finic-tasks-boilerplate . && rm -rf .git" ) print( - f"Finic app created successfully. cd into /{directory_name} and run `poetry install` to install dependencies" + f"The /finic_tasks directory has been created.\ncd into it and run `poetry install` to install dependencies, then `finic copilot` to create your first task." ) def deploy(): - api_key = check_api_key() + api_key = get_api_key() # Check if finic_config.json exists if not os.path.exists("finic_config.json"): @@ -126,9 +135,6 @@ def main(): parser = argparse.ArgumentParser(description="CLI for Finic's python library.") subparsers = parser.add_subparsers(dest='command', required=True) - anthropic_api_key = os.getenv('ANTHROPIC_API_KEY') - openai_api_key = os.getenv('OPENAI_API_KEY') - # Deploy command deploy_parser = subparsers.add_parser( 'deploy', @@ -151,46 +157,35 @@ def main(): help='A Finic API key', required=True ) + + # Initialize Finic directory command + init_parser = subparsers.add_parser( + 'init', + help='Initialize a Finic directory' + ) # Generate selectors command generate_parser = subparsers.add_parser( - 'capture', - help='Opens a browser to capture a workflow' - ) - generate_parser.add_argument( - '--api-key', - help='An API key for OpenAI or Anthropic must be provided to generate selectors', - required=(openai_api_key is None and anthropic_api_key is None) + 'copilot', + help='Start the Finic copilot' ) generate_parser.add_argument( '--url', help='The URL of the starting page of the workflow', required=True ) - generate_parser.add_argument( - '--llm-provider', - help='The LLM provider to use for generating selectors', - choices=['openai', 'anthropic'], default='anthropic' - ) args = parser.parse_args() - if args.api_key: - llm_provider_api_key = args.api_key - elif anthropic_api_key: - llm_provider_api_key = anthropic_api_key - elif openai_api_key: - llm_provider_api_key = openai_api_key + finic_api_key = get_api_key() if args.command == 'deploy': deploy() - elif args.command == 'connect': - pass - # finic = Finic(api_key=args.api_key) - # finic.launch_browser_sync(cdp_url=args.cdp_url) - elif args.command == 'capture': - asyncio.run(capture(args.llm_provider.lower(), llm_provider_api_key, args.url)) + elif args.command == 'init': + finic_init() + elif args.command == 'copilot': + asyncio.run(copilot(args.url, finic_api_key)) if __name__ == "__main__": main() \ No newline at end of file diff --git a/python_library/finic_py/copilot.py b/python_library/finic_py/copilot.py index 3894806d..1bf3a132 100644 --- a/python_library/finic_py/copilot.py +++ b/python_library/finic_py/copilot.py @@ -4,26 +4,45 @@ from typing import List, Dict, Any, Optional from bs4 import BeautifulSoup, Tag import shutil -from pydantic import BaseModel -from baml_client import b as baml -from baml_client.types import Element, Step, Plan, PlanWithSelectors -from baml_py import ClientRegistry, Image +from pydantic import BaseModel, field_validator import uuid import json import base64 from typing import Literal -from .utils import generate_selector import requests +import platform +import os +import sys +class NodeDetails(BaseModel): + selector: Optional[str] = None + nodeId: int + backendNodeId: int + nodeType: int + nodeName: str + localName: str + nodeValue: str + childNodeCount: Optional[int] = 0 + attributes: List[Dict[str, str]] = [] + textContent: Optional[str] = None + outerHTML: Optional[str] = None -client_registry = ClientRegistry() + @field_validator('attributes', mode='before') + def parse_attributes(cls, v): + if isinstance(v, list): + if all(isinstance(item, str) for item in v): + return [{'name': v[i], 'value': v[i+1]} for i in range(0, len(v), 2) if i+1 < len(v)] + elif all(isinstance(item, dict) for item in v): + return v + raise ValueError('attributes must be a list of strings or dictionaries') -class NodeInfo(BaseModel): - tagName: str - backend_id: int - user_assigned_id: Optional[str] = str(uuid.uuid4()) - className: str - textContent: str - outerHTML: str +def generate_selector(node: NodeDetails): + pass + +async def enable_inspection(cdp_session: CDPSession): + await cdp_session.send('Overlay.setInspectMode', {'mode': 'searchForNode', 'highlightConfig': {'showInfo': True, 'showExtensionLines': True, 'contentColor': {'r': 255, 'g': 81, 'b': 6, 'a': 0.2}}}) + +async def disable_inspection(cdp_session: CDPSession): + await cdp_session.send('Overlay.setInspectMode', {'mode': 'none', 'highlightConfig': {}}) def print_welcome_message(): # Clear the terminal @@ -39,298 +58,242 @@ def print_welcome_message(): print("\033[1m\033[38;2;255;165;0mGenerate code to automate any website using AI.\033[0m\n") print("Type 'help' for a list of commands. View full instructions here: https://docs.finic.ai/capture-mode") -def print_command_prompt(current_step: Optional[Step] = None): - print(f"\033[1m\033[38;2;255;165;0mCurrent step:\033[0m {current_step.description}") - print("Please select the element for this step in the browser") - -async def handle_add_node(instructions: str, step_index: int, current_node: List[NodeInfo], cdp_session: CDPSession, page: Page) -> Step: - target_node = await cdp_session.send("DOM.resolveNode", {"backendNodeId": current_node[0].backend_id}) - target_object_id = target_node["object"]["objectId"] - - target_element = await cdp_session.send("Runtime.callFunctionOn", { - "functionDeclaration": """ - function() { - return { - tagName: this.tagName, - id: this.id, - className: this.className, - attributes: Array.from(this.attributes).map(attr => ({name: attr.name, value: attr.value})), - textContent: this.textContent - }; - } - """, - "objectId": target_object_id, - "returnByValue": True - }) +async def handle_process_node(finic_api_key: str, task_name: str, intent: str, selected_node: List[NodeDetails], cdp_session: CDPSession, page: Page) -> None: + # Generate a selector for the selected node + selector = generate_selector(selected_node[0]) + selected_node[0].selector = selector + generated_code = None - target = Element( - tagName=target_element["result"]["value"]["tagName"], - textContent=target_element["result"]["value"]["textContent"], - attributes=target_element["result"]["value"]["attributes"] - ) - - next_sibling_element = await cdp_session.send("Runtime.callFunctionOn", { - "functionDeclaration": """ - function() { - if (this.nextElementSibling) { - return { - tagName: this.nextElementSibling.tagName, - id: this.nextElementSibling.id, - className: this.nextElementSibling.className, - attributes: Array.from(this.nextElementSibling.attributes).map(attr => ({name: attr.name, value: attr.value})), - textContent: this.nextElementSibling.textContent - }; - } else { - return null; - } - } - """, - "objectId": target_object_id, - "returnByValue": True - }) + task_file_path = f"finic_tasks/{task_name}.py" + with open(task_file_path, 'r') as f: + existing_code = f.read() - previous_sibling_element = await cdp_session.send("Runtime.callFunctionOn", { - "functionDeclaration": """ - function() { - if (this.previousElementSibling) { - return { - tagName: this.previousElementSibling.tagName, - id: this.previousElementSibling.id, - className: this.previousElementSibling.className, - attributes: Array.from(this.previousElementSibling.attributes).map(attr => ({name: attr.name, value: attr.value})), - textContent: this.previousElementSibling.textContent - }; - } else { - return null; - } + # Make a GET request to the Finic server to get the code + try: + headers = { + "Authorization": f"Bearer {finic_api_key}" # Assuming api_key is available in this scope } - """, - "objectId": target_object_id, - "returnByValue": True - }) - - ancestors = await cdp_session.send("Runtime.callFunctionOn", { - "functionDeclaration": """ - function() { - let ancestors = []; - let currentNode = this; - while (currentNode && currentNode.tagName !== 'BODY') { - currentNode = currentNode.parentNode; - if (currentNode) { - ancestors.push({ - tagName: currentNode.tagName, - id: currentNode.id, - className: currentNode.className, - innerHTML: currentNode.innerHTML, - attributes: Array.from(currentNode.attributes).map(attr => ({name: attr.name, value: attr.value})) - }); - } - } - return ancestors; + body = { + "intent": intent, + "element": selected_node[0], + "existing_code": existing_code } - """, - "objectId": target_object_id, - "returnByValue": True - }) - next_sibling = None - if next_sibling_element["result"]["value"] is not None: - next_sibling = Element( - tagName=next_sibling_element["result"]["value"]["tagName"], - textContent=next_sibling_element["result"]["value"]["textContent"], - attributes=next_sibling_element["result"]["value"]["attributes"] - ) - previous_sibling = None - if previous_sibling_element["result"]["value"] is not None: - previous_sibling = Element( - tagName=previous_sibling_element["result"]["value"]["tagName"], - textContent=previous_sibling_element["result"]["value"]["textContent"], - attributes=previous_sibling_element["result"]["value"]["attributes"] - ) - - trimmed_ancestors = "" - for i, ancestor in enumerate(ancestors["result"]["value"][:10]): - if ancestor["tagName"] == "BODY": - continue - indent = " " * i - attributes = " ".join([f'{attr["name"]}="{attr["value"]}"' for attr in ancestor["attributes"]]) - trimmed_ancestors += f"{indent}<{ancestor['tagName'].lower()} {attributes}>\n" - trimmed_ancestors = trimmed_ancestors.rstrip() - import pdb; pdb.set_trace() - selector_styles = baml.DetermineSelectorStyle(instructions=instructions, target=target, next_sibling=next_sibling, previous_sibling=previous_sibling, ancestors=trimmed_ancestors, baml_options={"client_registry": client_registry}) - if "ID_BASED" in selector_styles and "id" not in target.attributes: - selector_styles.remove("ID_BASED") - if "CLASS_BASED" in selector_styles and "class" not in target.attributes: - selector_styles.remove("CLASS_BASED") - if "INNER_TEXT_BASED" in selector_styles and (target.textContent == "" or target.textContent is None): - selector_styles.remove("INNER_TEXT_BASED") - if "SIBLING_BASED" in selector_styles and next_sibling is None and previous_sibling is None: - selector_styles.remove("SIBLING_BASED") - selectors = baml.GenerateXPathSelector( - instructions=instructions, - target=target, - next_sibling=next_sibling, - previous_sibling=previous_sibling, - ancestors=trimmed_ancestors if "PATH_BASED" in selector_styles else None, - selector_styles=selector_styles, - baml_options={"client_registry": client_registry} - ) - - chosen_selector = None - # Test the selectors - for selector in selectors: - element = await page.query_selector(selector) - if element is not None: - chosen_selector = selector - break + response = requests.post("https://api.finic.ai/copilot", headers=headers, json=body) + response.raise_for_status() # Raise an exception for bad status codes + generated_code = response.json()["code"] + # You can process the copilot_data here as needed + except requests.RequestException as e: + print(f"Error making request to Finic AI Copilot API: {e}") + + # Write the generated code to the appropriate file + with open(task_file_path, 'w') as f: + f.write(existing_code + generated_code) - if chosen_selector is None: - print("No generated selectors were valid. Please enter a selector manually:") - chosen_selector = input() - print(f"Element processed. Choose another element or type 'g|generate' to generate selectors for queued elements:") - return Step(step_index=step_index, selector=chosen_selector, instructions=instructions) -def print_element_to_terminal(result: NodeInfo): +def print_element_to_terminal(element: NodeDetails): terminal_width, _ = shutil.get_terminal_size() box_width = min(80, terminal_width - 2) # Max 80, or 2 less than terminal width + text_content = element.textContent[:box_width-4] if element.textContent else "" # Truncate to fit + attributes = ' '.join([f"{attribute['name']}=\"{attribute['value']}\"" + for attribute in element.attributes]) # Print the box print("\033[1m\033[94m┌─" + "─" * (box_width - 2) + "┐\033[0m") print(f"\033[1m\033[94m│\033[0m {'Currently selected element':^{box_width-2}}\033[1m\033[94m│\033[0m") print("\033[1m\033[94m├─" + "─" * (box_width - 2) + "┤\033[0m") - print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mTag:\033[0m {result.tagName:<{box_width-7}}\033[1m\033[94m│\033[0m") - print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mID:\033[0m {result.backend_id:<{box_width-6}}\033[1m\033[94m│\033[0m") - print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mClass:\033[0m {result.className:<{box_width-9}}\033[1m\033[94m│\033[0m") + print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mTag:\033[0m {element.localName:<{box_width-7}}\033[1m\033[94m│\033[0m") + print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mID:\033[0m {element.backendNodeId:<{box_width-6}}\033[1m\033[94m│\033[0m") + print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mAttributes:\033[0m{' ':{box_width-14}}\033[1m\033[94m│\033[0m") + + # Print attributes, wrapping if necessary + attr_lines = [attributes[i:i+box_width-4] for i in range(0, len(attributes), box_width-4)] + for line in attr_lines: + print(f"\033[1m\033[94m│\033[0m {line:<{box_width-2}}\033[1m\033[94m│\033[0m") + print(f"\033[1m\033[94m│\033[0m \033[1m\033[92mText content:\033[0m{' ':{box_width-15}}\033[1m\033[94m│\033[0m") - text_content = result.textContent[:box_width-4] # Truncate to fit print(f"\033[1m\033[94m│\033[0m {text_content:<{box_width-2}}\033[1m\033[94m│\033[0m") print("\033[1m\033[94m└─" + "─" * (box_width - 2) + "┘\033[0m") -async def generate_procedure( - procedure_name: str, - procedure_steps: List[Step], - llm_provider: Literal["openai", "anthropic"], - provider_api_key: str, - cdp_session: CDPSession, - node_queue: List[NodeInfo]): - - procedure_code = await baml.GeneratePlaywrightCode(procedure_name, procedure_steps) - procedure_filename = f"procedures/{procedure_name}.py" - with open(procedure_filename, "w") as f: - f.write(procedure_code) - - print(f"\nProcedure code has been written to {procedure_filename}") +async def cycle_elements_up(cdp_session: CDPSession, selected_node: List[NodeDetails], dom_snapshot: List[Dict[str, Any]]) -> None: + node_tree = dom_snapshot["documents"][0]["nodes"] + selected_node_index = node_tree["backendNodeId"].index(selected_node[0].backendNodeId) + parent_index = node_tree["parentIndex"][selected_node_index] + + if parent_index: + parent_backend_id = node_tree["backendNodeId"][parent_index] + await handle_inspect_node(cdp_session, selected_node, backend_node_id=parent_backend_id) -async def handle_inspect_node(cdp_session: CDPSession, page: Page, event: Dict[str, Any], selected_node: List[NodeInfo], current_step: Optional[Step] = None): - backend_node_id = event["backendNodeId"] +async def cycle_elements_down(cdp_session: CDPSession, selected_node: List[NodeDetails], dom_snapshot: List[Dict[str, Any]]) -> None: + node_tree = dom_snapshot["documents"][0]["nodes"] + selected_node_index = node_tree["backendNodeId"].index(selected_node[0].backendNodeId) + try: + child_index = node_tree["parentIndex"].index(selected_node_index) + child_backend_id = node_tree["backendNodeId"][child_index] + await handle_inspect_node(cdp_session, selected_node, backend_node_id=child_backend_id) + except ValueError: + return +async def handle_inspect_node(cdp_session: CDPSession, selected_node: List[NodeDetails], node_id: Optional[int] = None, backend_node_id: Optional[int] = None): + if not node_id and not backend_node_id: + raise ValueError("Either node_id or backend_node_id must be provided") # Request the document so DOM.pushNodesByBackendIdsToFrontend works document = await cdp_session.send("DOM.getDocument") - - # Find the node in the DOM - await cdp_session.send("DOM.pushNodesByBackendIdsToFrontend", {"backendNodeIds": [backend_node_id]}) - node_details = await cdp_session.send("DOM.describeNode", {"backendNodeId": backend_node_id}) - node_id = node_details["node"]["nodeId"] - await cdp_session.send("DOM.setInspectedNode", {"nodeId": node_id}) - # Navigate to the node in the devtools elements inspector - await cdp_session.send('DOM.setInspectedNode', { - 'nodeId': node_id - }) + if backend_node_id and not node_id: + # Find the node in the DOM + await cdp_session.send("DOM.pushNodesByBackendIdsToFrontend", {"backendNodeIds": [backend_node_id]}) + node_details = await cdp_session.send("DOM.describeNode", {"backendNodeId": backend_node_id}) + node_id = node_details["node"]["nodeId"] outer_html = await cdp_session.send("DOM.getOuterHTML", {"nodeId": node_id}) - result = NodeInfo( - tagName=node_details["node"]["localName"], - backend_id=backend_node_id, - className=" ".join([value for attr, value in zip(node_details["node"].get("attributes", [])[::2], node_details["node"].get("attributes", [])[1::2]) if attr == "class"]), - textContent=BeautifulSoup(outer_html["outerHTML"], 'html.parser').get_text(strip=True), - outerHTML=outer_html["outerHTML"] - ) + + await cdp_session.send("DOM.setInspectedNode", {"nodeId": node_id}) + await cdp_session.send("DOM.highlightNode", { + "highlightConfig": { + "showInfo": True, + "showExtensionLines": True, + "containerColor": {"r": 0, "g": 255, "b": 0, "a": 0.3}, + "contentColor": {"r": 0, "g": 255, "b": 0, "a": 0.3} + }, + "nodeId": node_id + }) + result = NodeDetails(**node_details["node"]) + result.outerHTML = outer_html["outerHTML"] + result.textContent = BeautifulSoup(outer_html["outerHTML"], 'html.parser').get_text(strip=True) selected_node[0] = result print_welcome_message() print_element_to_terminal(result) - print_command_prompt(current_step) + print("Describe the action to be taken on this element: ", end="", flush=True) -def execute_plan(plan: List[Step], cdp_session: CDPSession, page: Page): - pass +def create_task_file(task_name: str): + # Create the finic_tasks directory if it doesn't exist + + # Check if the finic_tasks directory exists in the current directory + current_directory = os.getcwd() + finic_tasks_path = os.path.join(current_directory, 'finic_tasks') + finic_config_path = os.path.join(current_directory, 'finic_config.yaml') + + if os.path.exists(finic_tasks_path) and os.path.exists(finic_config_path): + task_file_path = os.path.join(finic_tasks_path, f"{task_name}.py") + if not os.path.exists(task_file_path): + with open(task_file_path, 'w') as f: + f.write(f"""from finic import Finic +from playwright.sync_api import Page + +def main(page: Page, finic: Finic): +""") + else: + print("This is not a Finic project. Run `finic init` to initialize Finic.") + sys.exit(0) + +async def copilot(url: str, finic_api_key: str): + print_welcome_message() + task_name = input("\n\nGive your task a unique name (e.g. 'automate_tax_website'): ") + # Replace spaces and dashes with underscores in task_name + task_name = task_name.replace(' ', '_').replace('-', '_') + task_file_path = create_task_file(task_name) + print(f"Created task file: {task_file_path}") + + inspection_mode = True + selected_node: List[NodeDetails] = [None] + dom_snapshot = None -def request_elements(plan: List[Step], cdp_session: CDPSession, page: Page, selected_node: List[NodeInfo]): - import pdb; pdb.set_trace() - selected_node: List[NodeInfo] = [] - for step in plan: - cdp_session.on("Overlay.inspectNodeRequested", lambda event: handle_inspect_node(cdp_session, page, event, selected_node, step)) - input() - step.selector = generate_selector(selected_node[0]) - # Unregister the Overlay.inspectNodeRequested listener - cdp_session.off("Overlay.inspectNodeRequested") - -async def copilot(llm_provider: Literal["openai", "anthropic"], provider_api_key: str, url: str): playwright = await async_playwright().start() - browser = await playwright.chromium.launch(headless=False) + browser = await playwright.chromium.launch(headless=False, devtools=True) page = await browser.new_page() - # Enable CDP Session + ### SET UP CDP AND EVENT LISTENERS ### cdp_session = await page.context.new_cdp_session(page) await cdp_session.send('DOM.enable') await cdp_session.send('Overlay.enable') + await cdp_session.send('Runtime.enable') + await cdp_session.send('DOMSnapshot.enable') - # Enable inspect mode - interaction_mode = True await cdp_session.send('Overlay.setInspectMode', { 'mode': 'searchForNode', 'highlightConfig': {'showInfo': True, 'showExtensionLines': True, 'contentColor': {'r': 255, 'g': 81, 'b': 6, 'a': 0.2}} }) - + + async def handle_key_event(event): + nonlocal inspection_mode + modifier = 5 # Represents Ctrl+Shift + # Toggle inspect mode + if event.get('type') == 'keyDown' and event.get('code') == 'KeyF' and event.get('modifiers') == modifier: + if inspection_mode: + await disable_inspection(cdp_session) + inspection_mode = False + print("Switched to interaction mode. Use CMD/CTRL+SHIFT+F to re-enable selection mode.") + else: + await enable_inspection(cdp_session) + inspection_mode = True + print("Switched to selection mode. Use CMD/CTRL+SHIFT+F to re-enable interaction mode.") + + # Cycle through element layers + elif len(selected_node) > 0 and inspection_mode: + if event.get('type') == 'keyDown' and event.get('code') == 'ArrowUp': + await cycle_elements_up(cdp_session, selected_node, dom_snapshot) + elif event.get('type') == 'keyDown' and event.get('code') == 'ArrowDown': + await cycle_elements_down(cdp_session, selected_node, dom_snapshot) + + def handle_console_event(event: Dict[str, Any]): + if event['type'] == 'log': + data = event.get('args', []) + if data and data[0].get('value', '').startswith('KeyEvent:'): + key_event = json.loads(data[1]['value']) + asyncio.create_task(handle_key_event(key_event)) + + cdp_session.on("Overlay.inspectNodeRequested", lambda event: handle_inspect_node(cdp_session, selected_node, backend_node_id=event["backendNodeId"])) + cdp_session.on("Runtime.consoleAPICalled", lambda params: handle_console_event(params)) + # Navigate to a website await page.goto(url) await page.wait_for_load_state("load") - print_welcome_message() - print("\n\nPress enter to begin...") - - # Set up BAML client registry - client_registry.add_llm_client(name="Sonnet", - provider=llm_provider, - options={ - "api_key": provider_api_key, - "model": "claude-3-5-sonnet-20240620" - }) - client_registry.set_primary("Sonnet") - - print("I need to take a screenshot of the page. Please wait...") - await page.screenshot(path='screenshots/start.png') - print("Screenshot taken") - - print("Describe the task you want to automate:") - task_instructions = input() - print("Creating plan...") - - try: - # Convert the screenshot to base64 - with open('screenshots/start.png', 'rb') as image_file: - encoded_screenshot = base64.b64encode(image_file.read()).decode('utf-8') - except Exception as e: - print(f"Error encoding screenshot: {e}") - return + + await page.evaluate(""" + window.addEventListener('keydown', (event) => { + const key = event.key; + const code = event.code; + const modifiers = (event.metaKey ? 8 : 0) | (event.ctrlKey ? 4 : 0) | (event.altKey ? 2 : 0) | (event.shiftKey ? 1 : 0); + console.log('KeyEvent:', JSON.stringify({ type: 'keyDown', key, code, modifiers })); + + if (code === 'ArrowUp' || code === 'ArrowDown') { + event.preventDefault(); + } + }); + """) + print("\nSelect an element in the browser. Enter 'mode' to toggle between selection and interaction mode, or 'quit' to exit: ", end="", flush=True) - plan = baml.CreatePlan(screenshot=Image.from_base64(encoded_screenshot), task_instructions=task_instructions) + dom_snapshot = await cdp_session.send("DOMSnapshot.captureSnapshot", {"computedStyles": []}) + while True: - print("Here's my plan:") - for step in plan: - print(f"{step.step_number}. {step.description}") - print("Would you like to make any changes to the plan? (yes|no)") - user_input = input() - if user_input.lower() in ['n', 'no']: + user_input = await asyncio.get_event_loop().run_in_executor(None, input) + if user_input.lower() in ['quit', 'q']: + browser.close() break - elif user_input.lower() in ['y', 'yes']: - print("What changes would you like to make?") - changes = input() - plan = baml.ChangePlan(screenshot=page.screenshot(), changes=changes, old_plan=plan) + elif user_input.lower() in ['mode', 'm']: + if inspection_mode: + await cdp_session.send('Overlay.setInspectMode', {'mode': 'none', 'highlightConfig': {}}) + inspection_mode = False + print("Switched to interaction mode.") + else: + await cdp_session.send('Overlay.setInspectMode', {'mode': 'searchForNode', 'highlightConfig': {'showInfo': True, 'showExtensionLines': True, 'contentColor': {'r': 255, 'g': 81, 'b': 6, 'a': 0.2}}}) + inspection_mode = True + print("Switched to selection mode.") + elif user_input.lower() in ['help', 'h']: + print("Click on eleements in the browser. Confirm your selection in the box above. Use 'a|add' to queue an element for generation. Use 'g|generate' to generate selectors for queued elements. Use 'l|list' to view queued elements.") + print("\n\033[1mCommands:\033[0m") + print(" • \033[1m'm'|'mode'\033[0m - Change between interaction and selection mode") + print(" • \033[1m'quit'|'q'\033[0m - Quit the program") + print("\n\033[1m\033[38;2;255;165;0mEnter command:\033[0m ", end="", flush=True) else: - print("Invalid input. Please answer 'yes' or 'no'.") + if len(selected_node) > 0: + pass + # result = await handle_process_node(finic_api_key, task_name, user_input, selected_node, cdp_session, page) + else: + print("Invalid input. Please select an element in the browser first or enter a command.") - request_elements(plan, cdp_session, page) - execute_plan(plan, cdp_session, page) - browser.close() \ No newline at end of file + browser.close() diff --git a/python_library/finic_py/utils.py b/python_library/finic_py/utils.py deleted file mode 100644 index 67af3eb2..00000000 --- a/python_library/finic_py/utils.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import Optional -from playwright.sync_api import Response - - -def get_cookies_from_string(cookies_string: str, url: Optional[str] = None, domain: Optional[str] = None, path: Optional[str] = None): - # Split the content by ";" - pairs = cookies_string.split("; ") - - cookies = [] - - # Populate the dictionary - for pair in pairs: - if "=" in pair: - key, value = pair.split("=", 1) - cookie = {"name": key, "value": value} - if url: - cookie["domain"] = url - elif domain and path: - cookie["domain"] = domain - cookie["path"] = path - else: - raise ValueError("Either url, or domain and path must be provided") - cookies.append(cookie) - - return cookies - -def handle_response(response: Response): - pass - -def generate_selector(element: NodeInfo): - pass \ No newline at end of file