-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfakerProduct.py
236 lines (192 loc) · 8.71 KB
/
fakerProduct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import os
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import re
import sys
import json
def setup_driver():
options = webdriver.ChromeOptions()
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
return driver
def extract_detail_images_selenium(driver, detail_url):
"""Selenium을 사용하여 상세 페이지에서 이미지 URL 추출"""
print(f"[INFO] 상세 페이지로 이동 중: {detail_url}")
driver.get(detail_url)
try:
# 상세 페이지 로드 대기
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "product-detail-content-inside"))
)
print("[INFO] 상세 페이지 로드 완료.")
# "더보기" 버튼 클릭
try:
see_more_button = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable((By.CLASS_NAME, "product-detail-seemore-btn"))
)
see_more_button.click()
print("[INFO] '더보기' 버튼 클릭 성공.")
except Exception as e:
print(f"[WARN] '더보기' 버튼 클릭 실패 또는 버튼 없음: {str(e)}")
# product-detail-content-inside 내 모든 이미지 추출
content_inside = driver.find_element(By.CLASS_NAME, "product-detail-content-inside")
# 모든 <img> 태그 선택
image_elements = content_inside.find_elements(By.TAG_NAME, "img")
image_urls = []
for img in image_elements:
src = img.get_attribute("src")
if src:
# 상대 경로 처리
if src.startswith("https://"):
image_urls.append(src)
elif src.startswith("//"):
image_urls.append("https:" + src)
if not image_urls:
print("[WARN] 상세 이미지가 없습니다. 이 제품은 제외됩니다.")
return None # 이미지가 없으면 None 반환
print(f"[INFO] 총 {len(image_urls)}개의 이미지 URL을 수집했습니다.")
return image_urls
except Exception as e:
print(f"[ERROR] 상세 페이지 크롤링 중 오류 발생: {str(e)}")
return None # 오류 발생 시 None 반환
def extract_product_id(link):
"""상세 페이지 링크에서 ID 추출"""
match = re.search(r"/products/(\d+)", link)
if match:
return int(match.group(1))
print(f"[WARN] 비정상적인 링크: {link}")
return None
def search_coupang(driver, keyword):
"""검색 결과 목록 크롤링"""
driver.get("https://www.coupang.com/")
try:
# 팝업 닫기
try:
close_button = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable((By.CLASS_NAME, "close"))
)
close_button.click()
except:
pass
# 검색어 입력
search_box = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.NAME, "q"))
)
search_box.send_keys(keyword)
search_box.send_keys("\n")
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "search-product"))
)
print("[INFO] 페이지 스크롤 중...")
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
new_height = driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
print("[INFO] 스크롤 완료. 모든 제품 로드.")
products = driver.find_elements(By.CLASS_NAME, "search-product")
print(f"[INFO] 총 {len(products)}개의 제품 발견.")
results = []
for idx, product in enumerate(products, 1):
try:
item = {}
name_element = product.find_elements(By.CLASS_NAME, "name")
if not name_element or not name_element[0].text.strip():
continue
item['name'] = name_element[0].text.strip()
price_element = product.find_elements(By.CLASS_NAME, "price-value")
if not price_element or not price_element[0].text.strip():
continue
item['price'] = price_element[0].text.strip()
thumbnail_element = product.find_elements(By.CLASS_NAME, "search-product-wrap-img")
if not thumbnail_element or not thumbnail_element[0].get_attribute("src"):
continue
src = thumbnail_element[0].get_attribute("src")
item['thumbnail'] = "https:" + src if src.startswith("//") else src
ahref = product.find_elements(By.TAG_NAME, 'a')
if not ahref:
continue
item['link'] = ahref[0].get_attribute('href')
# ID 추출 및 검증
product_id = extract_product_id(item['link'])
if not product_id:
print(f"[WARN] 유효하지 않은 제품 건너뜀: {item['name']}")
continue
item['id'] = product_id
results.append(item)
print(f"[INFO] 제품 {idx}:\n ID: {item['id']}\n 이름: {item['name']}\n 가격: {item['price']}")
except Exception as e:
print(f"[ERROR] 제품 {idx} 처리 중 오류 발생: {str(e)}")
continue
return results
except Exception as e:
print(f"[ERROR] 검색 결과 크롤링 중 오류 발생: {str(e)}")
return []
def save_to_sql_file(product_data, keyword, filename="products.sql"):
"""SQL 파일 생성 - 제품 데이터를 JSON으로 저장, category 필드 추가"""
added_products = set()
with open(filename, "w", encoding="utf-8") as file:
file.write("-- SQL 명령어 파일\n")
file.write("-- products 테이블에 데이터 삽입\n\n")
for product in product_data:
# 제품 중복 확인
if product['id'] in added_products:
print(f"[INFO] 이미 추가된 제품 ID: {product['id']} - {product['name']} 건너뜀.")
continue
# JSON 배열로 상세 이미지 데이터를 변환
detail_images_json = json.dumps(product['detail_images'], ensure_ascii=False)
product_insert = f"""
INSERT INTO product (id, name, price, main_img, detail_imgs, category)
VALUES (
{product['id']},
'{product['name'].replace("'", "''")}',
{float(product['price'].replace(",", "").replace("원", "").strip())},
'{product['thumbnail']}',
'{detail_images_json.replace("'", "''")}',
'{keyword.replace("'", "''")}'
);
"""
file.write(product_insert)
added_products.add(product['id'])
print(f"[INFO] SQL 파일 생성 완료: {filename}")
def main():
driver = setup_driver()
keyword = input("검색할 키워드를 입력하세요: ")
results = search_coupang(driver, keyword)
if not results:
print("[ERROR] 검색 결과가 없습니다.")
driver.quit()
return
# 상세 페이지에서 이미지 가져오기
valid_results = []
for product in results:
print(f"[INFO] '{product['name']}' 상세 페이지에서 이미지 크롤링 시작...")
detail_images = extract_detail_images_selenium(driver, product['link'])
if detail_images is None:
print(f"[INFO] '{product['name']}'는 이미지가 없어 제외됩니다.")
continue
product['detail_images'] = detail_images
valid_results.append(product)
# SQL 파일 생성 (이미지가 있는 제품만 포함)
if valid_results:
save_to_sql_file(valid_results, keyword)
else:
print("[INFO] 유효한 제품이 없어 SQL 파일을 생성하지 않습니다.")
input("[INFO] 모든 작업이 완료되었습니다. 브라우저를 닫으려면 Enter 키를 누르세요.")
driver.quit()
if __name__ == "__main__":
main()