Skip to content

Commit

Permalink
Merge pull request #30 from BoPeng/issue28
Browse files Browse the repository at this point in the history
Use playwright instead of beautifulsoup to scrape information.
  • Loading branch information
BoPeng authored Feb 3, 2025
2 parents 246a6cb + 7776078 commit 0b1cdb2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 183 deletions.
42 changes: 5 additions & 37 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ ai-marketplace-monitor = 'ai_marketplace_monitor.cli:app'
[tool.poetry.dependencies]
python = "<3.13,>=3.10"
typer = { extras = ["all"], version = "^0.15.1" }
beautifulsoup4 = "^4.12.3"
playwright = "^1.49.1"
rich = "^13.9.4"
"pushbullet.py" = "^0.12.0"
Expand Down
234 changes: 89 additions & 145 deletions src/ai_marketplace_monitor/facebook.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import re
import time
from dataclasses import dataclass
from enum import Enum
from itertools import repeat
from logging import Logger
from typing import Any, Generator, List, Type, Union, cast
from typing import Any, Generator, List, Type, cast
from urllib.parse import quote

import humanize
import rich
from bs4 import BeautifulSoup, element # type: ignore
from playwright.sync_api import Browser, Page # type: ignore
from rich.pretty import pretty_repr

Expand Down Expand Up @@ -352,9 +350,7 @@ def search(
for keyword in item_config.keywords or []:
self.goto_url(marketplace_url + "&".join([f"query={quote(keyword)}", *options]))

found_items = FacebookSearchResultPage(
self.page.content(), self.logger
).get_listings()
found_items = FacebookSearchResultPage(self.page, self.logger).get_listings()
time.sleep(5)
# go to each item and get the description
# if we have not done that before
Expand Down Expand Up @@ -394,7 +390,7 @@ def get_item_details(self: "FacebookMarketplace", post_url: str) -> SearchedItem

assert self.page is not None
self.goto_url(post_url)
details = FacebookItemPage(self.page.content(), self.logger).parse(post_url)
details = FacebookItemPage(self.page, self.logger).parse(post_url)
cache.set(
(CacheType.ITEM_DETAILS.value, post_url.split("?")[0]), details, tag="item_details"
)
Expand Down Expand Up @@ -458,175 +454,123 @@ def filter_item(

class WebPage:

def __init__(self: "WebPage", html: str, logger: Logger) -> None:
self.html = html
self.soup = BeautifulSoup(self.html, "html.parser")
def __init__(self: "WebPage", page: Page, logger: Logger) -> None:
self.page = page
self.logger = logger


class FacebookSearchResultPage(WebPage):

    def get_listings_from_structure(
        self: "FacebookSearchResultPage",
    ) -> List[Union[element.Tag, element.NavigableString]]:
        """Return raw listing elements by walking the page's fixed DOM layout.

        Descends from the "Collection of Marketplace items" container through a
        hard-coded chain of child positions; brittle by design and expected to
        raise when Facebook changes its markup (the caller catches exceptions).
        """
        heading = self.soup.find(attrs={"aria-label": "Collection of Marketplace items"})
        # NOTE(review): if the aria-label is absent, heading is None and the
        # next() call raises AttributeError — relied upon by the caller's
        # try/except fallback.
        child1 = next(heading.children)
        child2 = next(child1.children)
        grid_parent = list(child2.children)[2] # groups of listings
        # Only the first group is ever inspected: the loop body returns on
        # its first iteration; the trailing return handles an empty grid.
        for group in grid_parent.children:
            grid_child2 = list(group.children)[1] # the actual grid container
            return list(grid_child2.children)
        return []

def get_listing_from_css(
self: "FacebookSearchResultPage",
) -> List[Union[element.Tag, element.NavigableString]]:
return self.soup.find_all(
"div",
class_="x9f619 x78zum5 x1r8uery xdt5ytf x1iyjqo2 xs83m0k x1e558r4 x150jy0e x1iorvi4 xjkvuk6 xnpuxes x291uyu x1uepa24",
)

    def parse_listing(
        self: "FacebookSearchResultPage", listing: Union[element.Tag, element.NavigableString]
    ) -> SearchedItem | None:
        """Extract one SearchedItem from a single listing element.

        Walks a hard-coded chain of child positions inside the listing markup.
        Returns None for purely decorative (text-free) elements.
        """
        # if the element has no text (only image etc)
        if not listing.get_text().strip():
            return None

        # Positional descent through Facebook's generated wrappers; each hop is
        # tied to the markup observed at the time of writing and raises if the
        # structure changes (callers are expected to handle that).
        child1 = next(listing.children)
        child2 = next(child1.children)
        child3 = next(child2.children) # span class class="x1lliihq x1iyjqo2"
        child4 = next(child3.children) # div
        child5 = next(child4.children) # div class="x78zum5 xdt5ytf"
        child5 = next(child5.children) # div class="x9f619 x1n2onr6 x1ja2u2z"
        child6 = next(child5.children) # div class="x3ct3a4" (real data here)
        atag = next(child6.children) # a tag
        post_url = atag["href"]
        atag_child1 = next(atag.children)
        atag_child2 = list(atag_child1.children) # 2 divs here
        # Get the item image.
        image = listing.find("img")["src"]

        details = list(
            atag_child2[1].children
        ) # x9f619 x78zum5 xdt5ytf x1qughib x1rdy4ex xz9dl7a xsag5q8 xh8yej3 xp0eagm x1nrcals
        # There are 4 divs in 'details', in this order: price, title, location, distance
        price = extract_price(details[0].contents[-1].text)

        title = details[1].contents[-1].text
        location = details[2].contents[-1].text

        # Return the parsed listing.
        return SearchedItem(
            marketplace="facebook",
            name="",
            id=post_url.split("?")[0].rstrip("/").split("/")[-1],
            title=title,
            image=image,
            price=price,
            # all the ?referral_code&referral_story_type etc
            # could be helpful for live navigation, but will be stripped
            # for caching item details.
            post_url=post_url,
            location=location,
            seller="",
            description="",
        )

def get_listings(self: "FacebookSearchResultPage") -> List[SearchedItem]:
try:
listings = self.get_listings_from_structure()
except Exception as e1:
try:
listings = self.get_listing_from_css()
except Exception as e2:
self.logger.debug(
f"""{hilight("[Retrieve]", "fail")} No listings found from structure and css: {e1}, {e2}"""
listings = []
heading = self.page.locator('[aria-label="Collection of Marketplace items"]')
# find the grid box
grid = heading.locator(
":scope > :first-child > :first-child > :nth-child(3) > :first-child > :nth-child(2)"
)
# find each listing
for listing in grid.locator("> div").all():
if not listing.text_content():
continue
atag = listing.locator(
":scope > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child"
)
post_url = atag.get_attribute("href") or ""
details = atag.locator(":scope > :first-child > div").nth(1)
raw_price = details.locator(":scope > div").nth(0).text_content() or ""
title = details.locator(":scope > div").nth(1).text_content() or ""
location = details.locator(":scope > div").nth(2).text_content() or ""
image = listing.locator("img").get_attribute("src") or ""
price = extract_price(raw_price)

listings.append(
SearchedItem(
marketplace="facebook",
name="",
id=post_url.split("?")[0].rstrip("/").split("/")[-1],
title=title,
image=image,
price=price,
                    # all the ?referral_code&referral_story_type etc
# could be helpful for live navigation, but will be stripped
# for caching item details.
post_url=post_url,
location=location,
seller="",
description="",
)
return []

result = [self.parse_listing(listing) for listing in listings]
        # cast from SearchedItem | None to SearchedItem
return [cast(SearchedItem, x) for x in result if x is not None]
)
# Append the parsed data to the list.
return listings


class FacebookItemPage(WebPage):

def get_image_url(self: "FacebookItemPage") -> str:
def get_title(self: "FacebookItemPage") -> str:
try:
return self.soup.find("img")["src"]
h1_element = self.page.query_selector_all("h1")[-1]
return h1_element.text_content() or ""
except Exception as e:
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return ""

def get_title_and_price(self: "FacebookItemPage") -> List[str]:
title = ""
price = ""
def get_price(self: "FacebookItemPage") -> str:
try:
title_element = self.soup.find("h1")
title = title_element.get_text(strip=True)
price = extract_price(title_element.next_sibling.get_text())
price_element = self.page.locator("h1 + *")
return price_element.text_content() or ""
except Exception as e:
self.logger.debug(f'{hilight("[Skip]", "fail")} {e}')
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return ""

return [title, price]
def get_image_url(self: "FacebookItemPage") -> str:
try:
image_url = self.page.locator("img").first.get_attribute("src") or ""
return image_url
except Exception as e:
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return ""

def get_description_and_location(self: "FacebookItemPage") -> List[str]:
description = ""
location = ""
def get_seller(self: "FacebookItemPage") -> str:
try:
cond = self.soup.find("span", string="Condition")
if cond is None:
raise ValueError("No span for condition is fond")
ul = cond.find_parent("ul")
if ul is None:
raise ValueError("No ul as parent for condition is fond")
description_div = ul.find_next_sibling()
description = description_div.get_text(strip=True)
#
location_element = description_div.find_next_siblings()[-1]
location = location_element.find("span").get_text()
seller_link = self.page.locator('a[href^="/marketplace/profile"]').last
return seller_link.text_content() or ""
except Exception as e:
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return ""

return [description, location]
def get_description(self: "FacebookItemPage") -> str:
try:
# description
description_element = self.page.locator(
'span:text("condition") >> xpath=ancestor::ul[1] >> xpath=following-sibling::*[1]'
)
return description_element.text_content() or ""
except Exception as e:
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return ""

def get_seller(self: "FacebookItemPage") -> str:
seller = ""
def get_location(self: "FacebookItemPage") -> str:
try:
profiles = self.soup.find_all("a", href=re.compile(r"/marketplace/profile"))
seller = profiles[-1].get_text()
# Find the span with text "condition", then parent, then next...
description_element = self.page.locator(
'span:text("condition") >> xpath=ancestor::ul[1] >> xpath=following-sibling::*[1]'
)
description_parent = description_element.locator("xpath=following-sibling::*[last()]")
location_element = description_parent.locator("span:not(:has(*))").first
return location_element.text_content() or ""
except Exception as e:
self.logger.debug(f'{hilight("[Retrieve]", "fail")} {e}')
return seller
return ""

def parse(self: "FacebookItemPage", post_url: str) -> SearchedItem:
# title
item_id = post_url.split("?")[0].rstrip("/").split("/")[-1]
title, price = self.get_title_and_price()
description, location = self.get_description_and_location()

# if not title or not price:
# with open(f"{item_id}.html", "w") as f:
# f.write(self.html)

if not title:
raise ValueError(
f"""No title was found for item {post_url}, which is most likely caused by a network issue. Please report the issue to the developer if the problem persists."""
)
if not price:
# with open(f"{item_id}.html", "w") as f:
# f.write(self.html)
raise ValueError(
f"""No price was found for item {post_url}, which is most likely caused by a network issue. Consider running with option --disable-javascript"""
)
title = self.get_title()
price = self.get_price()
description = self.get_description()

if not description:
# with open(f"{item_id}.html", "w") as f:
# f.write(self.html)
raise ValueError(
f"""No description was found for item {post_url}, which is most likely caused by a network issue. Consider running with option --disable-javascript"""
)
if not title or not price or not description:
raise ValueError(f"Failed to parse {post_url}")

self.logger.info(f'{hilight("[Retrieve]", "succ")} Parsing {hilight(title)}')
res = SearchedItem(
Expand All @@ -635,9 +579,9 @@ def parse(self: "FacebookItemPage", post_url: str) -> SearchedItem:
id=item_id,
title=title,
image=self.get_image_url(),
price=price,
price=extract_price(price),
post_url=post_url,
location=location,
location=self.get_location(),
description=description,
seller=self.get_seller(),
)
Expand Down

0 comments on commit 0b1cdb2

Please sign in to comment.