diff --git a/poetry.lock b/poetry.lock index 12ab25b..a667640 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1795,6 +1795,20 @@ pydantic = ">=2.6.0,<2.10.0" ruamel-yaml = ">=0.17.21" typing-extensions = ">=4.7.1" +[[package]] +name = "schedule" +version = "1.2.2" +description = "Job scheduling for humans." +optional = false +python-versions = ">=3.7" +files = [ + {file = "schedule-1.2.2-py3-none-any.whl", hash = "sha256:5bef4a2a0183abf44046ae0d164cadcac21b1db011bdd8102e4a0c1e91e06a7d"}, + {file = "schedule-1.2.2.tar.gz", hash = "sha256:15fe9c75fe5fd9b9627f3f19cc0ef1420508f9f9a46f45cd0769ef75ede5f0b7"}, +] + +[package.extras] +timezone = ["pytz"] + [[package]] name = "setuptools" version = "75.8.0" @@ -2206,4 +2220,4 @@ tests-strict = ["pytest (==4.6.0)", "pytest (==6.2.5)", "pytest-cov (==3.0.0)"] [metadata] lock-version = "2.0" python-versions = "<3.13,>=3.10" -content-hash = "3dca5a126792f08b637d2c6a6bc9548fb41b6b36b3996da9b8de4f24c6be0a0f" +content-hash = "435926310f802e1ead120b52d0ef2e607ae1d46975098aea69751324f99abb13" diff --git a/pyproject.toml b/pyproject.toml index f4fd176..703e498 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ diskcache = "^5.6.3" watchdog = "^6.0.0" openai = "^1.60.1" parsedatetime = "^2.6" +schedule = "^1.2.2" tomli = { version = "2.2.1", markers = "python_version < '3.11'" } [tool.poetry.group.dev.dependencies] diff --git a/src/ai_marketplace_monitor/monitor.py b/src/ai_marketplace_monitor/monitor.py index 0e8f24d..ed15c1e 100644 --- a/src/ai_marketplace_monitor/monitor.py +++ b/src/ai_marketplace_monitor/monitor.py @@ -4,6 +4,7 @@ from logging import Logger from typing import Any, ClassVar, Dict, List +import schedule from playwright.sync_api import Browser, sync_playwright from rich.pretty import pretty_repr @@ -11,6 +12,7 @@ from .config import Config from .facebook import FacebookMarketplace from .items import SearchedItem +from .marketplace import Marketplace from .users import User from .utils import cache, calculate_file_hash, sleep_with_watchdog @@ -88,90 +90,115 @@ def load_ai_agents(self: "MarketplaceMonitor") -> None: self.logger.error(f"Error connecting to {ai_name}: {e}") continue - def start_monitor(self: "MarketplaceMonitor") -> None: - """Main function to monitor the marketplace.""" + def search_item( + self: "MarketplaceMonitor", + marketplace_name: str, + marketplace_config: Dict[str, Any], + marketplace: Marketplace, + item_name: str, + item_config: Dict[str, Any], + ) -> None: + """Search for an item on the marketplace.""" + self.logger.info(f"Searching {marketplace_name} for [magenta]{item_name}[/magenta]") + new_items = [] + # users to notify is determined from item, then marketplace, then all users + assert self.config is not None + users_to_notify = item_config.get( + "notify", + marketplace_config.get("notify", list(self.config["user"].keys())), + ) + for item in marketplace.search(item_config): + # if everyone has been notified + if ("notify_user", item["id"]) in cache and all( + user in cache.get(("notify_user", item["id"]), ()) for user in users_to_notify + ): + self.logger.info( + f"Already sent notification for item [magenta]{item['title']}[/magenta], skipping." + ) + continue + # for x in self.find_new_items(found_items) + if not self.confirmed_by_ai(item, item_name=item_name, item_config=item_config): + continue + new_items.append(item) + + self.logger.info( + f"""[magenta]{len(new_items)}[/magenta] new listing{"" if len(new_items) == 1 else "s"} for {item_name} {"is" if len(new_items) == 1 else "are"} found.""" + ) + if new_items: + self.notify_users(users_to_notify, new_items) + time.sleep(5) + + def schedule_jobs(self: "MarketplaceMonitor") -> None: + """Schedule jobs to run periodically.""" # start a browser with playwright with sync_playwright() as p: # Open a new browser page. browser: Browser = p.chromium.launch(headless=self.headless) - while True: - # we reload the config file each time when a scan action is completed - # this allows users to add/remove products dynamically. - self.load_config_file() - self.load_ai_agents() - - assert self.config is not None - for marketplace_name, marketplace_config in self.config["marketplace"].items(): - marketplace_class = supported_marketplaces[marketplace_name] - if marketplace_name in self.active_marketplaces: - marketplace = self.active_marketplaces[marketplace_name] - else: - marketplace = marketplace_class(marketplace_name, browser, self.logger) - self.active_marketplaces[marketplace_name] = marketplace + # we reload the config file each time when a scan action is completed + # this allows users to add/remove products dynamically. + self.load_config_file() + self.load_ai_agents() - # Configure might have been changed - marketplace.configure(marketplace_config) - - for item_name, item_config in self.config["item"].items(): - if ( - "marketplace" not in item_config - or item_config["marketplace"] == marketplace_name - ): - if not item_config.get("enabled", True): - continue - - self.logger.info( - f"Searching {marketplace_name} for [magenta]{item_name}[/magenta]" - ) - new_items = [] - # users to notify is determined from item, then marketplace, then all users - users_to_notify = item_config.get( - "notify", - marketplace_config.get("notify", list(self.config["user"].keys())), - ) - for item in marketplace.search(item_config): - # if everyone has been notified - if ("notify_user", item["id"]) in cache and all( - user in cache.get(("notify_user", item["id"]), ()) - for user in users_to_notify - ): - self.logger.info( - f"Already sent notification for item [magenta]{item['title']}[/magenta], skipping." - ) - continue - # for x in self.find_new_items(found_items) - if not self.confirmed_by_ai( - item, item_name=item_name, item_config=item_config - ): - continue - new_items.append(item) + assert self.config is not None + for marketplace_name, marketplace_config in self.config["marketplace"].items(): + marketplace_class = supported_marketplaces[marketplace_name] + if marketplace_name in self.active_marketplaces: + marketplace = self.active_marketplaces[marketplace_name] + else: + marketplace = marketplace_class(marketplace_name, browser, self.logger) + self.active_marketplaces[marketplace_name] = marketplace + + # Configure might have been changed + marketplace.configure(marketplace_config) + + for item_name, item_config in self.config["item"].items(): + if ( + "marketplace" not in item_config + or item_config["marketplace"] == marketplace_name + ): + if not item_config.get("enabled", True): + continue + # wait for some time before next search + # interval (in minutes) can be defined both for the marketplace + # if there is any configuration file change, stop sleeping and search again + search_interval = max( + item_config.get( + "search_interval", marketplace_config.get("search_interval", 30) + ), + 1, + ) + max_search_interval = max( + item_config.get(marketplace_config.get("max_search_interval", 1)), + search_interval, + ) + schedule.every( + random.randint(search_interval, max_search_interval) + ).minutes.do( + self.search_item, + marketplace_name, + marketplace_config, + marketplace, + item_name, + item_config, + ) - self.logger.info( - f"""[magenta]{len(new_items)}[/magenta] new listing{"" if len(new_items) == 1 else "s"} for {item_name} {"is" if len(new_items) == 1 else "are"} found.""" - ) - if new_items: - self.notify_users(users_to_notify, new_items) - time.sleep(5) - - # if configuration file has been changed, do not sleep - new_file_hash = calculate_file_hash(self.config_files) - assert self.config_hash is not None - if new_file_hash != self.config_hash: - self.logger.info("Config file changed, restarting monitor.") - continue - - # wait for some time before next search - # interval (in minutes) can be defined both for the marketplace - # if there is any configuration file change, stop sleeping and search again - search_interval = max(marketplace_config.get("search_interval", 30), 1) - max_search_interval = max( - marketplace_config.get("max_search_interval", 1), - search_interval, - ) - sleep_with_watchdog( - random.randint(search_interval * 60, max_search_interval * 60), - self.config_files, - ) + def start_monitor(self: "MarketplaceMonitor") -> None: + """Main function to monitor the marketplace.""" + while True: + self.schedule_jobs() + while True: + schedule.run_pending() + sleep_with_watchdog( + 60, + self.config_files, + ) + # if configuration file has been changed, clear all scheduled jobs and restart + new_file_hash = calculate_file_hash(self.config_files) + assert self.config_hash is not None + if new_file_hash != self.config_hash: + self.logger.info("Config file changed, restarting monitor.") + schedule.clear() + break def stop_monitor(self: "MarketplaceMonitor") -> None: """Stop the monitor.""" diff --git a/src/ai_marketplace_monitor/utils.py b/src/ai_marketplace_monitor/utils.py index d6856b9..985a18b 100644 --- a/src/ai_marketplace_monitor/utils.py +++ b/src/ai_marketplace_monitor/utils.py @@ -4,7 +4,7 @@ import time from typing import Any, Dict, List -import parsedatetime +import parsedatetime # type: ignore from diskcache import Cache # type: ignore from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer @@ -117,6 +117,6 @@ def extract_price(price: str) -> str: def convert_to_minutes(time_str: str) -> int: - cal = parsedatetime.Calendar() + cal = parsedatetime.Calendar(version=parsedatetime.VERSION_CONTEXT_STYLE) time_struct, _ = cal.parse(time_str) return int(time.mktime(time_struct) - time.mktime(time.localtime())) // 60 diff --git a/tests/test_cli.py b/tests/test_cli.py index b61be7a..6ea0950 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -50,10 +50,8 @@ def generate_config_file(content: str) -> str: full_marketplace_cfg = """ [marketplace.facebook] login_wait_time = 50 -max_search_interval = 40 password = "password" search_city = ['houston'] -search_interval = 10 username = "username" # the following are common options acceptable_locations = "city" @@ -63,10 +61,13 @@ def generate_config_file(content: str) -> str: exclude_sellers = "seller" max_price = 300 min_price = 200 +max_search_interval = 40 notify = 'user1' radius = 100 +search_interval = 10 search_region = 'usa' """ + base_item_cfg = """ [item.name] keywords = 'search word one' @@ -88,6 +89,8 @@ def generate_config_file(content: str) -> str: delivery_method = 'local_pick_up' exclude_sellers = "seller" max_price = 300 +max_search_interval = '1d' +search_interval = '12h' min_price = 200 notify = 'user1' radius = 100