Use a scheduler to allow items to have different search intervals
BoPeng committed Jan 30, 2025
1 parent 9b9c0f2 commit 1217d42
Showing 5 changed files with 128 additions and 83 deletions.
16 changes: 15 additions & 1 deletion poetry.lock

Generated lock file; diff not rendered.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -37,6 +37,7 @@ diskcache = "^5.6.3"
watchdog = "^6.0.0"
openai = "^1.60.1"
parsedatetime = "^2.6"
schedule = "^1.2.2"
tomli = { version = "2.2.1", markers = "python_version < '3.11'" }

[tool.poetry.group.dev.dependencies]
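
The new schedule dependency provides the in-process job scheduler that monitor.py now builds on. A minimal sketch of its API, with a purely illustrative job function and interval:

import time

import schedule


def job() -> None:
    print("searching ...")


# register a recurring job; the 10-minute interval is illustrative
schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()  # run whichever registered jobs are due
    time.sleep(1)
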
183 changes: 105 additions & 78 deletions src/ai_marketplace_monitor/monitor.py
@@ -4,13 +4,15 @@
from logging import Logger
from typing import Any, ClassVar, Dict, List

import schedule
from playwright.sync_api import Browser, sync_playwright
from rich.pretty import pretty_repr

from .ai import AIBackend, DeepSeekBackend, OpenAIBackend
from .config import Config
from .facebook import FacebookMarketplace
from .items import SearchedItem
from .marketplace import Marketplace
from .users import User
from .utils import cache, calculate_file_hash, sleep_with_watchdog

@@ -88,90 +90,115 @@ def load_ai_agents(self: "MarketplaceMonitor") -> None:
self.logger.error(f"Error connecting to {ai_name}: {e}")
continue

def start_monitor(self: "MarketplaceMonitor") -> None:
"""Main function to monitor the marketplace."""
def search_item(
self: "MarketplaceMonitor",
marketplace_name: str,
marketplace_config: Dict[str, Any],
marketplace: Marketplace,
item_name: str,
item_config: Dict[str, Any],
) -> None:
"""Search for an item on the marketplace."""
self.logger.info(f"Searching {marketplace_name} for [magenta]{item_name}[/magenta]")
new_items = []
# users to notify are determined from the item, then the marketplace, then all users
assert self.config is not None
users_to_notify = item_config.get(
"notify",
marketplace_config.get("notify", list(self.config["user"].keys())),
)
for item in marketplace.search(item_config):
# if everyone has been notified
if ("notify_user", item["id"]) in cache and all(
user in cache.get(("notify_user", item["id"]), ()) for user in users_to_notify
):
self.logger.info(
f"Already sent notification for item [magenta]{item['title']}[/magenta], skipping."
)
continue
# for x in self.find_new_items(found_items)
if not self.confirmed_by_ai(item, item_name=item_name, item_config=item_config):
continue
new_items.append(item)

self.logger.info(
f"""[magenta]{len(new_items)}[/magenta] new listing{"" if len(new_items) == 1 else "s"} for {item_name} {"is" if len(new_items) == 1 else "are"} found."""
)
if new_items:
self.notify_users(users_to_notify, new_items)
time.sleep(5)

def schedule_jobs(self: "MarketplaceMonitor") -> None:
"""Schedule jobs to run periodically."""
# start a browser with playwright
with sync_playwright() as p:
# Open a new browser page.
browser: Browser = p.chromium.launch(headless=self.headless)
while True:
# we reload the config file each time a scan action is completed
# this allows users to add/remove products dynamically.
self.load_config_file()
self.load_ai_agents()

assert self.config is not None
for marketplace_name, marketplace_config in self.config["marketplace"].items():
marketplace_class = supported_marketplaces[marketplace_name]
if marketplace_name in self.active_marketplaces:
marketplace = self.active_marketplaces[marketplace_name]
else:
marketplace = marketplace_class(marketplace_name, browser, self.logger)
self.active_marketplaces[marketplace_name] = marketplace
# we reload the config file each time a scan action is completed
# this allows users to add/remove products dynamically.
self.load_config_file()
self.load_ai_agents()

# Configuration might have changed
marketplace.configure(marketplace_config)

for item_name, item_config in self.config["item"].items():
if (
"marketplace" not in item_config
or item_config["marketplace"] == marketplace_name
):
if not item_config.get("enabled", True):
continue

self.logger.info(
f"Searching {marketplace_name} for [magenta]{item_name}[/magenta]"
)
new_items = []
# users to notify are determined from the item, then the marketplace, then all users
users_to_notify = item_config.get(
"notify",
marketplace_config.get("notify", list(self.config["user"].keys())),
)
for item in marketplace.search(item_config):
# if everyone has been notified
if ("notify_user", item["id"]) in cache and all(
user in cache.get(("notify_user", item["id"]), ())
for user in users_to_notify
):
self.logger.info(
f"Already sent notification for item [magenta]{item['title']}[/magenta], skipping."
)
continue
# for x in self.find_new_items(found_items)
if not self.confirmed_by_ai(
item, item_name=item_name, item_config=item_config
):
continue
new_items.append(item)
assert self.config is not None
for marketplace_name, marketplace_config in self.config["marketplace"].items():
marketplace_class = supported_marketplaces[marketplace_name]
if marketplace_name in self.active_marketplaces:
marketplace = self.active_marketplaces[marketplace_name]
else:
marketplace = marketplace_class(marketplace_name, browser, self.logger)
self.active_marketplaces[marketplace_name] = marketplace

# Configuration might have changed
marketplace.configure(marketplace_config)

for item_name, item_config in self.config["item"].items():
if (
"marketplace" not in item_config
or item_config["marketplace"] == marketplace_name
):
if not item_config.get("enabled", True):
continue
# schedule a periodic search for this item
# the interval (in minutes) can be defined for both the marketplace and the item
search_interval = max(
item_config.get(
"search_interval", marketplace_config.get("search_interval", 30)
),
1,
)
max_search_interval = max(
item_config.get("max_search_interval", marketplace_config.get("max_search_interval", 1)),
search_interval,
)
schedule.every(
random.randint(search_interval, max_search_interval)
).minutes.do(
self.search_item,
marketplace_name,
marketplace_config,
marketplace,
item_name,
item_config,
)

self.logger.info(
f"""[magenta]{len(new_items)}[/magenta] new listing{"" if len(new_items) == 1 else "s"} for {item_name} {"is" if len(new_items) == 1 else "are"} found."""
)
if new_items:
self.notify_users(users_to_notify, new_items)
time.sleep(5)

# if the configuration file has changed, do not sleep
new_file_hash = calculate_file_hash(self.config_files)
assert self.config_hash is not None
if new_file_hash != self.config_hash:
self.logger.info("Config file changed, restarting monitor.")
continue

# wait for some time before the next search
# the interval (in minutes) is defined at the marketplace level
# if the configuration file changes, stop sleeping and search again
search_interval = max(marketplace_config.get("search_interval", 30), 1)
max_search_interval = max(
marketplace_config.get("max_search_interval", 1),
search_interval,
)
sleep_with_watchdog(
random.randint(search_interval * 60, max_search_interval * 60),
self.config_files,
)
def start_monitor(self: "MarketplaceMonitor") -> None:
"""Main function to monitor the marketplace."""
while True:
self.schedule_jobs()
while True:
schedule.run_pending()
sleep_with_watchdog(
60,
self.config_files,
)
# if the configuration file has changed, clear all scheduled jobs and restart
new_file_hash = calculate_file_hash(self.config_files)
assert self.config_hash is not None
if new_file_hash != self.config_hash:
self.logger.info("Config file changed, restarting monitor.")
schedule.clear()
break

def stop_monitor(self: "MarketplaceMonitor") -> None:
"""Stop the monitor."""
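
A condensed sketch of the scheduling pattern the new schedule_jobs code follows: each item resolves its own search interval (item setting first, then marketplace, then a default) and registers an independent job with a randomized period. The schedule_one and search_one helpers and the plain dict configs below are illustrative, not the actual method signatures from the diff.

import random

import schedule


def search_one(item_name: str, item_config: dict) -> None:
    # placeholder for the real per-item search
    print(f"searching for {item_name}")


def schedule_one(item_name: str, item_config: dict, marketplace_config: dict) -> None:
    # interval resolution: the item config overrides the marketplace config, with a 30-minute default
    search_interval = max(
        item_config.get("search_interval", marketplace_config.get("search_interval", 30)), 1
    )
    max_search_interval = max(
        item_config.get("max_search_interval", marketplace_config.get("max_search_interval", 1)),
        search_interval,
    )
    # each item gets its own job, so different items can be searched at different frequencies
    schedule.every(random.randint(search_interval, max_search_interval)).minutes.do(
        search_one, item_name, item_config
    )

With jobs registered this way, the main loop only needs to call schedule.run_pending() periodically and schedule.clear() when the configuration file changes, as the new start_monitor does.
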
4 changes: 2 additions & 2 deletions src/ai_marketplace_monitor/utils.py
Expand Up @@ -4,7 +4,7 @@
import time
from typing import Any, Dict, List

import parsedatetime
import parsedatetime # type: ignore
from diskcache import Cache # type: ignore
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
@@ -117,6 +117,6 @@ def extract_price(price: str) -> str:


def convert_to_minutes(time_str: str) -> int:
cal = parsedatetime.Calendar()
cal = parsedatetime.Calendar(version=parsedatetime.VERSION_CONTEXT_STYLE)
time_struct, _ = cal.parse(time_str)
return int(time.mktime(time_struct) - time.mktime(time.localtime())) // 60
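
The version=parsedatetime.VERSION_CONTEXT_STYLE argument appears to be what lets duration-style strings such as '12h' and '1d' (the values added to the test config below) be parsed relative to now; that link is inferred from this commit, not stated in it. A rough, illustrative check that mirrors convert_to_minutes:

import time

import parsedatetime  # type: ignore

cal = parsedatetime.Calendar(version=parsedatetime.VERSION_CONTEXT_STYLE)
time_struct, _ = cal.parse("12h")
minutes = int(time.mktime(time_struct) - time.mktime(time.localtime())) // 60
print(minutes)  # expected to be roughly 720 if "12h" is parsed as a 12-hour offset
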
7 changes: 5 additions & 2 deletions tests/test_cli.py
@@ -50,10 +50,8 @@ def generate_config_file(content: str) -> str:
full_marketplace_cfg = """
[marketplace.facebook]
login_wait_time = 50
max_search_interval = 40
password = "password"
search_city = ['houston']
search_interval = 10
username = "username"
# the following are common options
acceptable_locations = "city"
@@ -63,10 +61,13 @@ def generate_config_file(content: str) -> str:
exclude_sellers = "seller"
max_price = 300
min_price = 200
max_search_interval = 40
notify = 'user1'
radius = 100
search_interval = 10
search_region = 'usa'
"""

base_item_cfg = """
[item.name]
keywords = 'search word one'
@@ -88,6 +89,8 @@ def generate_config_file(content: str) -> str:
delivery_method = 'local_pick_up'
exclude_sellers = "seller"
max_price = 300
max_search_interval = '1d'
search_interval = '12h'
min_price = 200
notify = 'user1'
radius = 100
