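"""Shared helpers for resolving article URLs and parsing RSS/Atom feed entries.

common.py, forked from vas3k/infomate.club.
"""
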
import logging
import re
import socket
from datetime import datetime
from time import mktime
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from requests import RequestException
from urllib3.exceptions import InsecureRequestWarning

log = logging.getLogger(__name__)

DEFAULT_REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 "
                  "Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
}

DEFAULT_REQUEST_TIMEOUT = 10
MAX_PARSABLE_CONTENT_LENGTH = 15 * 1024 * 1024  # 15 MB

socket.setdefaulttimeout(DEFAULT_REQUEST_TIMEOUT)
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


def resolve_url(entry_link):
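    """Follow redirects for entry_link via HEAD requests and report the final URL.

    Returns a (url, content_type, content_length) tuple; url is None if the
    request fails.
    """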
    url = str(entry_link)
    content_type = None
    # default to "too big to parse" so entries with unreadable headers are skipped
    content_length = MAX_PARSABLE_CONTENT_LENGTH + 1
    depth = 10
    while depth > 0:
        depth -= 1

        try:
            response = requests.head(url, timeout=DEFAULT_REQUEST_TIMEOUT, verify=False, stream=True)
        except RequestException:
            log.warning(f"Failed to resolve URL: {url}")
            return None, content_type, content_length

        if 300 < response.status_code < 400 and "location" in response.headers:
            url = response.headers["location"]  # follow redirect
        else:
            content_type = response.headers.get("content-type")
            content_length = int(response.headers.get("content-length") or 0)
            break

    return url, content_type, content_length


def parse_domain(url):
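    """Extract the bare domain from a URL, dropping a leading "www." prefix."""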
    domain = urlparse(url).netloc
    if domain.startswith("www."):
        domain = domain[4:]
    return domain


def parse_datetime(entry):
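    """Return the entry's published/updated time, falling back to the current UTC time."""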
    published_time = entry.get("published_parsed") or entry.get("updated_parsed")
    if published_time:
        return datetime.fromtimestamp(mktime(published_time))
    return datetime.utcnow()


def parse_title(entry):
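    """Pick the entry title (or description/summary) and strip any HTML tags from it."""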
    title = entry.get("title") or entry.get("description") or entry.get("summary") or ""
    return re.sub("<[^<]+?>", "", title).strip()


def parse_link(entry):
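    """Return the entry's primary link, or the first of its "links", or None."""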
if entry.get("link"):
return entry["link"]
if entry.get("links"):
return entry["links"][0]["href"]
return None


def parse_rss_image(entry):
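    """Find an image URL in the entry's media_content or image fields, if any."""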
if entry.get("media_content"):
images = [m["url"] for m in entry["media_content"] if m.get("medium") == "image" and m.get("url")]
if images:
return images[0]
if entry.get("image"):
if isinstance(entry["image"], dict):
return entry["image"].get("href")
return entry["image"]
return None


def parse_rss_text_and_image(entry):
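    """Extract plain text and the first usable <img> src from the entry summary.

    Returns a (text, image_url) tuple; either element may be an empty string.
    """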
    if not entry.get("summary"):
        return "", ""

    bs = BeautifulSoup(entry["summary"], features="lxml")
    text = re.sub(r"\s\s+", " ", bs.text or "").strip()

    img_tags = bs.find_all("img")
    for img_tag in img_tags:
        src = img_tag.get("src", None)
        if src:
            return text, src

    return text, ""