# collect.py
import tweepy
import re
import json
import sqlite3 as lite
import datetime, time, os, sys
import argparse, configparser
# ---------------------------------------------------------------------------
# Module-level setup: read the config file, open the SQLite database, make
# sure the schema exists, and build the authenticated Twitter API client.
# ---------------------------------------------------------------------------
Config = configparser.ConfigParser()
Config.read('config.cnf')

litecon = lite.connect('data/pundits.db')

# Schema statements, executed in order. Every statement uses IF NOT EXISTS,
# so running them again against an existing database leaves it unchanged.
_SCHEMA_STATEMENTS = (
    # the sample, with two columns for either the Tweet itself, or the error
    # in trying to retrieve it
    "CREATE TABLE IF NOT EXISTS sample (doi TEXT, old_screen_name TEXT, tweet_id TEXT, tweet TEXT, error TEXT, modified TEXT)",
    "CREATE INDEX IF NOT EXISTS sample_old_screen_name ON sample (old_screen_name)",
    "CREATE INDEX IF NOT EXISTS sample_tweet_id ON sample (tweet_id)",
    "CREATE INDEX IF NOT EXISTS sample_modified ON sample (modified)",
    # the users that were found
    "CREATE TABLE IF NOT EXISTS users (user_id TEXT, screen_name TEXT, user_object TEXT, timeline TEXT, timeline_error TEXT, timeline_modified TEXT, user_modified TEXT)",
    "CREATE UNIQUE INDEX IF NOT EXISTS users_user_id ON users (user_id)",
    "CREATE INDEX IF NOT EXISTS users_screen_name ON users (screen_name)",
    # one row per (user, friend) pair
    "CREATE TABLE IF NOT EXISTS friends (user_id TEXT, friend_id TEXT, modified TEXT)",
    "CREATE INDEX IF NOT EXISTS friends_user_id ON friends (user_id)",
    "CREATE UNIQUE INDEX IF NOT EXISTS friends_user_friend_id ON friends (user_id, friend_id)",
    # one row per (user, follower) pair
    "CREATE TABLE IF NOT EXISTS followers (user_id TEXT, follower_id TEXT, modified TEXT)",
    "CREATE INDEX IF NOT EXISTS followers_user_id ON followers (user_id)",
    "CREATE UNIQUE INDEX IF NOT EXISTS followers_user_follower_id ON followers (user_id, follower_id)",
)

with litecon:
    # set up SQL tables
    litecur = litecon.cursor()
    for _statement in _SCHEMA_STATEMENTS:
        litecur.execute(_statement)

# Twitter credentials all live in one section of config.cnf.
_TWITTER_SECTION = 'twittersfupubresearch'
consumer_key = Config.get(_TWITTER_SECTION, 'consumer_key')
consumer_secret = Config.get(_TWITTER_SECTION, 'consumer_secret')
access_token = Config.get(_TWITTER_SECTION, 'access_token')
access_token_secret = Config.get(_TWITTER_SECTION, 'access_token_secret')

# set up access to the Twitter API
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

# The same credentials and rate-limit flags gathered into a single dict.
# Nothing in this part of the file reads it.
twitter_app_auth = dict(
    consumer_key=consumer_key,
    consumer_secret=consumer_secret,
    access_token=access_token,
    access_token_secret=access_token_secret,
    wait_on_rate_limit=True,
    wait_on_rate_limit_notify=True,
)
def load_file(filename):
    '''
    Load a tab-separated dump file into the sample table.

    The first line of the file is read and discarded (treated as a header).
    Every following line is split on tabs, surrounding double quotes are
    stripped from each field, and the first three fields are inserted as
    (doi, old_screen_name, tweet_id).

    Blank lines and lines with fewer than three fields are skipped rather
    than aborting the whole import with an IndexError.
    '''
    with open(filename, 'r') as f:
        f.readline()  # discard the header line
        with litecon:
            litecur = litecon.cursor()
            for line in f:
                fields = [x.strip('"') for x in line.strip().split('\t')]
                if len(fields) < 3:
                    # blank or malformed row: nothing usable to insert
                    continue
                doi, screenname, tweet_id = fields[:3]
                try:
                    litecur.execute('INSERT INTO sample (doi, old_screen_name, tweet_id) VALUES (?, ?, ?)', (doi, screenname, tweet_id))
                except lite.IntegrityError:
                    # don't worry about duplicates
                    # NOTE(review): this only fires if the sample table has a
                    # uniqueness constraint; the CREATE statements in this
                    # file do not define one -- confirm whether one is wanted.
                    pass
def __save_tweet(tweet_id, tweet, error = None):
    '''
    Do the actual SQLite update with the info collected

    tweet_id -- value matched against sample.tweet_id
    tweet    -- fetched status object (its _json and user attributes are
                read); ignored when error is given
    error    -- if truthy, its .reason (or str(error) when it has no such
                attribute) is stored in sample.error instead of the tweet

    On success the tweet JSON is stored on the sample row and the author is
    inserted into users; a user that is already present is left as is.
    '''
    now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    with litecon:
        litecur = litecon.cursor()
        if error:
            # Exceptions from the API expose .reason; plain strings and other
            # objects do not, so fall back to their string form.
            try:
                m = error.reason
            except AttributeError:
                m = str(error)
            litecur.execute('UPDATE sample SET error = ?, modified = ? WHERE tweet_id = ?', (m, now, tweet_id))
            return

        litecur.execute('UPDATE sample SET tweet = ?, modified = ? WHERE tweet_id = ?', (json.dumps(tweet._json), now, tweet_id))
        try:
            litecur.execute('INSERT INTO users (user_id, screen_name, user_object, user_modified) VALUES (?, ?, ?, ?)', (tweet.user.id, tweet.user.screen_name, json.dumps(tweet.user._json), now))
        except lite.IntegrityError:
            # don't worry about duplicates
            pass
def __save_timeline(user_id, timeline, error = None):
    '''
    Do the actual SQLite update with the info collected

    user_id  -- value matched against users.user_id
    timeline -- iterable of status objects (each one's _json is stored);
                ignored when error is given
    error    -- if truthy, its .reason (or str(error) when it has no such
                attribute) is stored in users.timeline_error instead
    '''
    now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    with litecon:
        litecur = litecon.cursor()
        if error:
            # Exceptions from the API expose .reason; anything else is
            # recorded by its string form.
            try:
                m = error.reason
            except AttributeError:
                m = str(error)
            litecur.execute('UPDATE users SET timeline_error = ?, timeline_modified = ? WHERE user_id = ?', (m, now, user_id))
        else:
            litecur.execute('UPDATE users SET timeline = ?, timeline_modified = ? WHERE user_id = ?', (json.dumps([s._json for s in timeline]), now, user_id))
def get_tweets_in_sample():
    '''
    Find all the tweets in the sample that have not been fetched yet
    and make individual calls to find the users associated with them

    Every sample row whose tweet column is NULL is tried, including rows
    that already have an error recorded, so earlier failures are retried on
    each run.

    If a tweet cannot be fetched, the user is looked up by the screen name
    stored with the sample row instead. When that lookup succeeds the
    original error (with '; Found by screen_name' appended) is saved on the
    sample row and the user is inserted into users; when it also fails, the
    original error is saved unchanged.
    '''
    # Read the work list first and release the connection context, so the
    # per-tweet saves below each run in their own short transaction.
    with litecon:
        litecur = litecon.cursor()
        litecur.execute("SELECT tweet_id, old_screen_name FROM sample WHERE tweet IS NULL")
        sampled = litecur.fetchall()

    for tweet_id, old_screen_name in sampled:
        try:
            tweet = api.get_status(tweet_id)
        except tweepy.TweepError as error:
            try:
                user = api.get_user(screen_name=old_screen_name)
            except tweepy.TweepError:
                __save_tweet(tweet_id, None, error) # leave the original error
                continue

            # bit hacky, we're passing an error instead of a tweet here
            error.reason += '; Found by screen_name'
            __save_tweet(tweet_id, None, error)
            now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
            try:
                with litecon:
                    litecon.cursor().execute('INSERT INTO users (user_id, screen_name, user_object, user_modified) VALUES (?, ?, ?, ?)', (user.id, user.screen_name, json.dumps(user._json), now))
            except lite.IntegrityError:
                # don't worry about duplicates
                pass
        else:
            __save_tweet(tweet_id, tweet)
def get_tweets_in_sample_batch():
    '''
    Find all the tweets in the sample that have not been fetched yet
    and make a batch call to find the users associated with them

    Only rows with neither a tweet nor an error recorded are considered.
    IDs are sent to the API 100 at a time. If a batch call raises a
    TweepError the error is printed and the process exits with status 1.
    '''
    # Materialise the full ID list before saving anything: __save_tweet
    # updates and commits the same table on the same connection, and doing
    # that while this SELECT cursor is still being paged makes it unclear
    # which rows the cursor will go on to return.
    with litecon:
        litecur = litecon.cursor()
        litecur.execute('SELECT tweet_id FROM sample WHERE tweet IS NULL AND error IS NULL')
        tweet_ids = [t[0] for t in litecur.fetchall()]

    batch_size = 100
    for start in range(0, len(tweet_ids), batch_size):
        batch = tweet_ids[start:start + batch_size]
        try:
            statuses = api.statuses_lookup(batch)
        except tweepy.TweepError as error:
            print( error)
            # sys.exit rather than the site-provided exit(), which is not
            # guaranteed to exist in every interpreter setup
            sys.exit(1)
        for tweet in statuses:
            __save_tweet(tweet.id, tweet)
def get_timelines_batch():
    '''
    Fetch and store the timeline of every saved user that has neither a
    timeline nor a timeline error recorded yet.

    Users are pulled from the database up to 100 at a time; the query is
    re-run after each group until it returns nothing.
    '''
    pending_query = 'SELECT u.user_id FROM users u WHERE timeline IS NULL AND timeline_error IS NULL'
    while True:
        with litecon:
            cursor = litecon.cursor()
            cursor.execute(pending_query)
            # go 100 at a time so we're not hitting the DB so much
            pending = [row[0] for row in cursor.fetchmany(100)]

        if not pending:
            return

        for user_id in pending:
            try:
                timeline = api.user_timeline(user_id)
                __save_timeline(user_id, timeline)
            except tweepy.TweepError as error:
                # record the failure so this user is not selected again
                __save_timeline(user_id, None, error)
def __save_network(endpoint, user_id, ids, error = None):
    '''
    Do the actual SQLite update with the info collected

    endpoint -- 'friends' or 'followers'; selects both the table and the
                id column (friend_id / follower_id)
    user_id  -- the user whose network is being saved
    ids      -- iterable of friend/follower ids; ignored when error is given
    error    -- if truthy, it is printed and a single marker row with id -1
                is stored so the user counts as already attempted

    Rows that already exist for the same (user_id, id) pair are skipped.
    Raises ValueError for any other endpoint, since the name is interpolated
    into the SQL text.
    '''
    id_columns = {'friends': 'friend_id', 'followers': 'follower_id'}
    if endpoint not in id_columns:
        raise ValueError('Unknown endpoint: %r' % (endpoint,))

    # OR IGNORE skips rows that would violate the unique (user_id, id) index,
    # both for the id list and for a repeated -1 error marker.
    insert_sql = 'INSERT OR IGNORE INTO %s (user_id, %s, modified) VALUES (?, ?, ?)' % (endpoint, id_columns[endpoint])

    now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    with litecon:
        litecur = litecon.cursor()
        if error:
            # Exceptions from the API expose .reason; plain strings (which
            # callers also pass here) do not.
            try:
                m = error.reason
            except AttributeError:
                m = str(error)
            print( "Error: ", user_id, m)
            litecur.execute(insert_sql, (user_id, -1, now))
        else:
            print( 'saving', len(ids))
            litecur.executemany(insert_sql, [(user_id, f, now) for f in ids])
def get_friends(user_id = None):
    '''
    Collect friends lists by delegating to get_network with the 'friends'
    endpoint; user_id is passed through unchanged.
    '''
    get_network(endpoint='friends', user_id=user_id)
def get_followers(user_id = None):
    '''
    Collect followers lists by delegating to get_network with the
    'followers' endpoint; user_id is passed through unchanged.
    '''
    get_network(endpoint='followers', user_id=user_id)
def get_network(endpoint, user_id = None):
    '''
    Get the friends/followers list for all users (or for a specific user)

    endpoint -- 'friends' or 'followers'
    user_id  -- when None, every saved user with no rows yet in the endpoint
                table is processed, up to 100 at a time; otherwise only
                that user is fetched

    Results are written to the database; nothing is returned. A user with an
    empty list, or whose fetch raises a TweepError, still gets a marker row
    so it is not selected again. Raises ValueError for an unknown endpoint.
    '''
    if endpoint not in ('friends', 'followers'):
        # the endpoint name is interpolated into SQL below, so reject
        # anything unexpected up front
        raise ValueError('Unknown endpoint: %r' % (endpoint,))

    if user_id is None:
        while True:
            with litecon:
                litecur = litecon.cursor()
                litecur.execute('SELECT u.user_id FROM users u LEFT JOIN %s f ON (u.user_id = f.user_id) WHERE f.user_id IS NULL' % endpoint)
                # go 100 at a time so we're not hitting the DB so much
                users = [u[0] for u in litecur.fetchmany(100)]
            if not users:
                break
            for pending_user_id in users:
                get_network(endpoint, pending_user_id)
        return

    # a user_id was passed in, fetch its list and save it
    fetch_ids = api.friends_ids if endpoint == 'friends' else api.followers_ids
    ids = []
    try:
        for page in tweepy.Cursor(fetch_ids, id=user_id).pages():
            ids.extend(page)
    except tweepy.TweepError as error:
        __save_network(endpoint, user_id, None, error)
        return

    # put in something so that we know we've gone after this user
    if not ids:
        __save_network(endpoint, user_id, None, 'No %s found' % endpoint)
    __save_network(endpoint, user_id, ids)
if __name__ == '__main__':
    # Command-line entry point. Each option switches on one collection step;
    # several may be combined, and they run in the order tested below.
    ap = argparse.ArgumentParser()
    ap.add_argument("--tweet-id", required=False, help="Specify a Tweet ID of the user you are interested in")
    # NOTE: --screen-name is accepted but nothing below acts on it.
    ap.add_argument("--screen-name", required=False, help="Screen name or user ID of twitter user")
    ap.add_argument("--input-file", required=False, help="Specify a dump file from ESBI")
    ap.add_argument("--batch", required=False, help="Try to fetch everything from DB in 100 tweet batches", action="store_true")
    ap.add_argument("--individual", required=False, help="Fetch tweets that have not been found, one at a time", action="store_true")
    ap.add_argument("--timelines", required=False, help="Fetch the timelines of users", action="store_true")
    ap.add_argument("--friends", required=False, help="Get all the friends of a user id", action="store_true")
    ap.add_argument("--followers", required=False, help="Get all the followers of a user id", action="store_true")

    args = vars(ap.parse_args())

    tweet_id = args['tweet_id']
    input_filename = args['input_file']

    if input_filename:
        load_file(input_filename)

    if tweet_id:
        tweet = api.get_status(tweet_id)
        __save_tweet(tweet_id, tweet)

    if args['batch']:
        get_tweets_in_sample_batch()

    if args['individual']:
        # this will go back and fill in individual errors. The batch simply doesn't return errors
        get_tweets_in_sample()

    if args['timelines']:
        get_timelines_batch()

    if args['friends']:
        get_friends()

    if args['followers']:
        get_followers()