Skip to content

Commit

Permalink
Better caching for item details and user notification
Browse files Browse the repository at this point in the history
  • Loading branch information
BoPeng committed Jan 30, 2025
1 parent 539bcbb commit 5c1ec72
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 76 deletions.
30 changes: 15 additions & 15 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ beautifulsoup4 = "^4.12.3"
playwright = "^1.49.1"
rich = "^13.9.4"
"pushbullet.py" = "^0.12.0"
joblib = "^1.4.2"
diskcache = "^5.6.3"
watchdog = "^6.0.0"
openai = "^1.60.1"
tomli = { version = "2.2.1", markers = "python_version < '3.11'" }
Expand Down
17 changes: 9 additions & 8 deletions src/ai_marketplace_monitor/facebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .items import SearchedItem
from .marketplace import Marketplace
from .utils import extract_price, is_substring, memory
from .utils import cache, extract_price, is_substring


class FacebookMarketplace(Marketplace):
Expand Down Expand Up @@ -45,10 +45,6 @@ def __init__(
) -> None:
assert name == self.name
super().__init__(name, browser, logger)
# cache the output of website, but ignore the change of "self" and browser
# see https://joblib.readthedocs.io/en/latest/memory.html#gotchas for details
self.get_item_details = memory.cache(self._get_item_details, ignore=["self"])
#
self.page: Page | None = None

@classmethod
Expand Down Expand Up @@ -285,14 +281,19 @@ def search(
if self.filter_item(item, item_config):
yield item

def get_item_details(self: "FacebookMarketplace", post_url: str) -> SearchedItem:
    """Return the parsed listing details for ``post_url``, consulting the cache first.

    A previously stored result under the key ``("get_item_details", post_url)``
    is returned as-is. Otherwise the listing page is opened (logging in first
    when no page exists yet), parsed, stored in the cache under the
    ``item_details`` tag, and returned.
    """
    cache_key = ("get_item_details", post_url)
    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    # No page yet means we have not logged in during this session.
    if not self.page:
        self.login()
    assert self.page is not None

    self.goto_url(f"https://www.facebook.com{post_url}")
    item_page = FacebookItemPage(self.page.content(), self.logger)
    parsed = item_page.parse(post_url)
    cache.set(cache_key, parsed, tag="item_details")
    return parsed

def filter_item(
self: "FacebookMarketplace", item: SearchedItem, item_config: Dict[str, Any]
Expand Down
96 changes: 46 additions & 50 deletions src/ai_marketplace_monitor/monitor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import random
import time
Expand All @@ -13,15 +12,13 @@
from .facebook import FacebookMarketplace
from .items import SearchedItem
from .users import User
from .utils import amm_home, calculate_file_hash, memory, sleep_with_watchdog
from .utils import cache, calculate_file_hash, sleep_with_watchdog

supported_marketplaces = {"facebook": FacebookMarketplace}
supported_ai_backends = {"deepseek": DeepSeekBackend, "openai": OpenAIBackend}


class MarketplaceMonitor:
search_history_cache = os.path.join(amm_home, "searched_items.json")

active_marketplaces: ClassVar = {}

def __init__(
Expand All @@ -46,12 +43,8 @@ def __init__(
self.headless = headless
self.ai_agents: List[AIBackend] = []
self.logger = logger
self.notified_items: List[SearchedItem] = self.load_notified_items()
if clear_cache:
if os.path.exists(self.search_history_cache):
os.remove(self.search_history_cache)
#
memory.clear()
cache.clear()

def load_config_file(self: "MarketplaceMonitor") -> Dict[str, Any]:
"""Load the configuration file."""
Expand Down Expand Up @@ -131,8 +124,17 @@ def start_monitor(self: "MarketplaceMonitor") -> None:
f"Searching {marketplace_name} for [magenta]{item_name}[/magenta]"
)
new_items = []
# users to notify is determined from item, then marketplace, then all users
users_to_notify = item_config.get(
"notify",
marketplace_config.get("notify", list(self.config["user"].keys())),
)
for item in marketplace.search(item_config):
if self.already_notified(item):
# if everyone has been notified
if ("notify_user", item["id"]) in cache and all(
user in cache.get(("notify_user", item["id"]), ())
for user in users_to_notify
):
self.logger.info(
f"Already sent notification for item [magenta]{item['title']}[/magenta], skipping."
)
Expand All @@ -148,11 +150,7 @@ def start_monitor(self: "MarketplaceMonitor") -> None:
f"""[magenta]{len(new_items)}[/magenta] new listing{"" if len(new_items) == 1 else "s"} for {item_name} {"is" if len(new_items) == 1 else "are"} found."""
)
if new_items:
self.notify_users(
marketplace_config.get("notify", [])
+ item_config.get("notify", []),
new_items,
)
self.notify_users(users_to_notify, new_items)
time.sleep(5)

# if configuration file has been changed, do not sleep
Expand All @@ -179,7 +177,7 @@ def stop_monitor(self: "MarketplaceMonitor") -> None:
"""Stop the monitor."""
for marketplace in self.active_marketplaces.values():
marketplace.stop()
self.save_notified_items()
cache.close()

def check_items(
self: "MarketplaceMonitor", items: List[str] | None = None, for_item: str | None = None
Expand Down Expand Up @@ -261,23 +259,9 @@ def check_items(
)
marketplace.filter_item(listing, item_config)
self.confirmed_by_ai(listing, item_name=item_name, item_config=item_config)
if self.already_notified(listing):
if ("notify_user", listing["id"]) in cache:
self.logger.info(f"Already sent notification for item {item_name}.")

def load_notified_items(self: "MarketplaceMonitor") -> List[SearchedItem]:

if os.path.isfile(self.search_history_cache):
with open(self.search_history_cache, "r") as f:
return json.load(f)
return []

def save_notified_items(self: "MarketplaceMonitor") -> None:
with open(self.search_history_cache, "w") as f:
json.dump(self.notified_items, f)

def already_notified(self: "MarketplaceMonitor", item: SearchedItem) -> bool:
return any(x["id"] == item["id"] for x in self.notified_items)

def confirmed_by_ai(
self: "MarketplaceMonitor", item: SearchedItem, item_name: str, item_config: Dict[str, Any]
) -> bool:
Expand All @@ -293,30 +277,42 @@ def confirmed_by_ai(
def notify_users(
    self: "MarketplaceMonitor", users: List[str], items: List[SearchedItem]
) -> None:
    """Send each user one message listing the items they have not been told about.

    Notification state is kept in the shared cache in the form

        ("notify_user", item_id) = (user1, user2, user3)

    Args:
        users: names of the users to notify; each must be a key of
            ``self.config["user"]``.
        items: candidate listings to report.

    A user's name is recorded against the items only after the notification
    attempt for that user did not raise; failures are logged and the next
    user is processed.
    """
    for user in users:
        msgs = []
        unnotified_items = []
        for item in items:
            # Skip items this user has ALREADY been notified about. (The
            # previous check was inverted and dropped every new item.)
            if user in cache.get(("notify_user", item["id"]), ()):
                continue
            self.logger.info(
                f"""New item found: {item["title"]} with URL https://www.facebook.com{item["post_url"]} for user {user}"""
            )
            msgs.append(
                f"""{item['title']}\n{item['price']}, {item['location']}\nhttps://www.facebook.com{item['post_url']}"""
            )
            unnotified_items.append(item)

        # Nothing new for this user: do not send an empty message.
        if not unnotified_items:
            continue

        # Take the marketplace from an item that is actually reported rather
        # than from a loop variable leaked out of the inner loop.
        title = f"Found {len(msgs)} new item from {unnotified_items[0]['marketplace']}: "
        message = "\n\n".join(msgs)
        self.logger.info(
            f"Sending {user} a message with title [magenta]{title}[/magenta] and message [magenta]{message}[/magenta]"
        )
        assert self.config is not None
        assert self.config["user"] is not None
        try:
            User(user, self.config["user"][user], logger=self.logger).notify(title, message)
            for item in unnotified_items:
                cache.set(
                    ("notify_user", item["id"]),
                    (user, *cache.get(("notify_user", item["id"]), ())),
                    tag="notify_user",
                )
        except Exception as e:
            self.logger.error(f"Failed to notify {user}: {e}")
            continue
4 changes: 2 additions & 2 deletions src/ai_marketplace_monitor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import time
from typing import Any, Dict, List

from joblib import Memory # type: ignore
from diskcache import Cache # type: ignore
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

# home directory for all settings and caches
amm_home = os.path.join(os.path.expanduser("~"), ".ai-marketplace-monitor")
os.makedirs(amm_home, exist_ok=True)

memory = Memory(location=amm_home, verbose=0)
cache = Cache(amm_home)


def calculate_file_hash(file_paths: List[str]) -> str:
Expand Down

0 comments on commit 5c1ec72

Please sign in to comment.