zzzInfluencergrabfromyoutubeandchunk.py
import os
import requests
from pymongo import MongoClient
from youtube_transcript_api import YouTubeTranscriptApi
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import (
SentenceSplitter,
SemanticSplitterNodeParser,
)
from llama_index.core import Document
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")  # Read the API key from the environment rather than hard-coding it
#CHANNEL_ID = 'UCL_f53ZEJxp8TtlOkHwMV9Q' # Jordan Peterson's Channel
#CHANNEL_ID = 'UCod8o80imMaMr5TwcOK7iqw' # Prof G Channel
CHANNEL_ID = 'UC_GvMVffgctSISDkeWlKhXQ'
CHANNEL_NAME = 'Dope211'  # Channel name stored alongside every transcript chunk
MAX_TOKENS = 1024 # Maximum number of tokens per chunk
MONGO_URI = os.getenv("MONGO_URI")
DATABASE_NAME = 'influencers'
COLLECTION_NAME = 'MakeUpHelper'
# Embedding model and splitters for semantic chunking. They are configured here,
# but the pipeline below currently uses its own token-based split_into_chunks instead.
embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
    buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
base_splitter = SentenceSplitter(chunk_size=512)
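# Minimal sketch of how the splitters above could be applied, assuming the
# llama_index node-parser API (Document, get_nodes_from_documents). This helper
# is illustrative only and is not called by the pipeline below, which uses
# split_into_chunks instead.
def semantic_chunks(text):
    """Split raw transcript text into semantically coherent chunks (illustrative only)."""
    nodes = splitter.get_nodes_from_documents([Document(text=text)])
    return [node.get_content() for node in nodes]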
# MongoDB setup
client = MongoClient(MONGO_URI)
db = client[DATABASE_NAME]
collection = db[COLLECTION_NAME]
def get_video_ids(api_key, channel_id):
    """Collect all video IDs for a channel from the YouTube Data API search endpoint."""
    videos = []
    base_url = (
        "https://www.googleapis.com/youtube/v3/search"
        f"?part=id&channelId={channel_id}&maxResults=50&type=video&key={api_key}"
    )
    url = base_url
    while True:
        response = requests.get(url).json()
        if 'items' not in response:
            break
        videos.extend(item['id']['videoId'] for item in response['items'])
        page_token = response.get('nextPageToken')
        if not page_token:
            break
        # Rebuild from the base URL so each pageToken replaces the previous one
        # instead of being appended to an ever-growing query string.
        url = f"{base_url}&pageToken={page_token}"
    return videos
def get_video_details(api_key, video_ids):
    """Fetch snippet and statistics for each video, batching IDs into one request per 20 videos."""
    details = {}

    def chunks(lst, n):
        # Yield successive n-sized batches of video IDs.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    for chunk in chunks(video_ids, 20):
        videos_string = ','.join(chunk)
        url = f"https://www.googleapis.com/youtube/v3/videos?part=snippet,statistics&id={videos_string}&key={api_key}"
        response = requests.get(url).json()
        if 'items' not in response:
            print("Error in API response:", response)
            continue
        for item in response['items']:
            video_id = item['id']
            details[video_id] = {
                'title': item['snippet']['title'],
                'description': item['snippet']['description'],
                'publishDate': item['snippet']['publishedAt'],
                'statistics': item['statistics'],
                'tags': item['snippet'].get('tags', []),
                'url': f"https://www.youtube.com/watch?v={video_id}"
            }
    return details
def format_transcript(transcript_data):
    """Render a chunk of transcript entries as 'start - text' lines."""
    return "\n".join(f"{item['start']} - {item['text']}" for item in transcript_data)
def split_into_chunks(transcript_data, max_tokens):
    """Group transcript entries into chunks of roughly max_tokens words each."""
    chunks = []
    current_chunk = []
    current_length = 0
    for item in transcript_data:
        text = item['text']
        start_time = item['start']
        sentence_length = len(text.split()) + 1  # +1 to roughly account for spacing/punctuation
        # Start a new chunk when adding this entry would exceed the budget,
        # but never append an empty chunk.
        if current_chunk and current_length + sentence_length > max_tokens:
            chunks.append(current_chunk)
            current_chunk = []
            current_length = 0
        current_chunk.append({'text': text, 'start': start_time})
        current_length += sentence_length
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
def store_transcript_to_mongodb(video_id, transcript_data, video_details):
    """Split a transcript into chunks and insert one MongoDB document per chunk."""
    text_chunks = split_into_chunks(transcript_data, MAX_TOKENS)
    for idx, chunk in enumerate(text_chunks):
        formatted_chunk = format_transcript(chunk)
        stats = video_details[video_id]['statistics']
        video_entry = {
            "Title": video_details[video_id]['title'],
            "VideoID": video_id,
            "Tags": video_details[video_id].get('tags', []),
            # The API reports these counts as strings, so cast them for consistent storage.
            "ViewCount": int(stats.get('viewCount', 0)),
            "LikeCount": int(stats.get('likeCount', 0)),
            "CommentCount": int(stats.get('commentCount', 0)),
            "URL": video_details[video_id]['url'],
            "Transcript": formatted_chunk,
            "ChunkIndex": idx + 1,
            "Channel_Name": CHANNEL_NAME  # Channel name stored with every chunk
        }
        collection.insert_one(video_entry)
        print(f"Stored transcript chunk {idx + 1} for video ID: {video_id}")
def fetch_and_store_transcripts(video_ids, video_details):
for video_id in video_ids:
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id)
transcript_data = [{'start': item['start'], 'text': item['text']} for item in transcript]
store_transcript_to_mongodb(video_id, transcript_data, video_details)
except Exception as e:
print(f"Could not fetch transcript for video ID {video_id}: {e}")
if __name__ == "__main__":
video_ids = get_video_ids(YOUTUBE_API_KEY, CHANNEL_ID)
    #video_ids = video_ids[100:102]  # Uncomment to test with a two-video slice
video_details = get_video_details(YOUTUBE_API_KEY, video_ids)
fetch_and_store_transcripts(video_ids, video_details)
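    # Example invocation (placeholder values; assumes the environment variables read above are set):
    #   MONGO_URI="mongodb://localhost:27017" YOUTUBE_API_KEY="..." python zzzInfluencergrabfromyoutubeandchunk.py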