-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrepository.py
161 lines (134 loc) · 5.67 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import api
import json
from util import parse_date
import cache
import logging
# NOTE(review): ApiRepository appears unused — get_repository() below returns CachedRepository; confirm before removing.
class ApiRepository:
    """Repository that always fetches activity data directly from the API,
    without any caching."""

    # Number of activities requested per API page.
    page_size = 100

    def __init__(self, token, sleep = None):
        self._client = api.Client(token, sleep)

    def get_activities(self):
        """Return every activity by paging through the API.

        Pages are fetched until a page shorter than ``page_size`` arrives,
        which marks the end of the listing.
        """
        all_activities = []
        page = 1
        while True:
            activities = self._client.get_activities_page(
                page, ApiRepository.page_size)
            all_activities.extend(activities)
            # A short page (including an empty one — 0 < page_size) is the
            # last page; the original's extra `or num == 0` test was redundant.
            if len(activities) < ApiRepository.page_size:
                break
            page += 1
        return all_activities

    def get_bikes(self):
        """Return the athlete's bikes as reported by the API."""
        athlete = self._client.get_athlete()
        return athlete["bikes"]

    def get_shoes(self):
        """Return the athlete's shoes as reported by the API."""
        athlete = self._client.get_athlete()
        return athlete["shoes"]

    def update_activity(self, id, data):
        """Push the fields in *data* to the activity identified by *id*."""
        self._client.update_activity(id, **data)
class CachedRepository:
    """Repository that serves activities from a local cache, incrementally
    refreshed from the remote API."""

    # Number of activities requested per API page.
    page_size = 100

    def __init__(self, token, cache, update_cache = True, sleep = None):
        self._client = api.Client(token, sleep)
        self._cache = cache
        # When False, cached activities are served without refreshing from
        # the API (activity details are still fetched on a cache miss).
        self._run_update_cache = update_cache

    def get_all_activities(self):
        """Fetch the complete activity list from the API, bypassing the cache."""
        all_activities = []
        page = 1
        num = CachedRepository.page_size
        # A page shorter than page_size is the last one.
        while num == CachedRepository.page_size:
            activities = self._client.get_activities_page(
                page, CachedRepository.page_size)
            all_activities.extend(activities)
            num = len(activities)
            page += 1
        return all_activities

    def _get_latest_timestamp(self, activities):
        """Return the unix timestamp of the newest activity, or 0 if empty."""
        if not activities:
            return 0
        max_date = max(parse_date(a['start_date']) for a in activities)
        return int(max_date.timestamp())

    def _init_cache(self):
        """Populate an empty cache with the full activity history."""
        logging.getLogger('CachedRepository').debug("Initializing cache")
        activities = self.get_all_activities()
        self._cache.update_activities(activities)

    def _merge_lists(self, activities, new_activities):
        """Merge *new_activities* into *activities*, replacing entries with the
        same id, and return the result sorted by start date (newest first)."""
        # Map each existing activity id to its position for O(1) replacement.
        activity_ids = {activity['id']: index for index, activity in enumerate(activities)}
        for new_activity in new_activities:
            id = new_activity['id']
            if id in activity_ids:
                activities[activity_ids[id]] = new_activity
            else:
                activities.append(new_activity)
        return sorted(activities, key=lambda activity: activity['start_date'], reverse=True)

    def _update_cache(self):
        """Bring the cache up to date.

        Initializes the cache with the full history if it is empty; otherwise
        (and only when updates are enabled) fetches activities newer than the
        newest cached one and merges them in.
        """
        if not self._cache.is_initialized():
            self._init_cache()
            return
        if not self._run_update_cache:
            return
        logger = logging.getLogger('CachedRepository')
        activities = self._cache.get_activities()
        timestamp = self._get_latest_timestamp(activities)
        logger.debug("Newest activity in cache {}".format(timestamp))
        new_activities = []
        page = 1
        while True:
            logger.info(
                "Loading page {} of {} elements".format(page, CachedRepository.page_size))
            curr_activities = self._client.get_activities_after(
                timestamp, page, CachedRepository.page_size)
            logger.debug(
                "{} activities loaded".format(len(curr_activities)))
            page += 1
            new_activities.extend(curr_activities)
            # A short page means there is nothing more to fetch.
            if len(curr_activities) < CachedRepository.page_size:
                logger.debug("No more activities to load")
                break
        activities = self._merge_lists(activities, new_activities)
        self._cache.update_activities(activities)

    def get_activities(self):
        """Return all activities from the cache, refreshing it first."""
        self._update_cache()
        return self._cache.get_activities()

    def get_activity(self, id):
        """Return the cached activity with the given *id*, refreshing first."""
        self._update_cache()
        return self._cache.get_activity(id)

    def get_activity_detail(self, id):
        """Return full detail for one activity.

        The cached detail is consulted only when cache updates are disabled;
        otherwise the detail is fetched from the API and re-cached.
        """
        activity_detail = None
        if not self._run_update_cache:
            activity_detail = self._cache.get_activity_detail(id)
        if activity_detail is None:
            activity_detail = self._client.get_activity_detail(id)
            self._cache.update_activity_detail(activity_detail)
        return activity_detail

    def get_gps(self, id):
        """Return (activity, points) where points are (abs_time, latlng, altitude)
        tuples built from the activity's streams.

        Stream times are relative to the activity start, so they are shifted
        by the start date's unix timestamp.
        """
        streams = self._client.get_streams(id)
        activity = self.get_activity(int(id))
        start_time = int(parse_date(activity['start_date']).timestamp())
        # zip truncates to the shortest stream; any missing stream yields no points.
        streams = zip(*(streams[key]['data'] if key in streams else [] for key in ('time', 'latlng', 'altitude')))
        return activity, [(time + start_time, point, altitude) for time, point, altitude in streams]

    def get_bikes(self):
        """Return the athlete's bikes as reported by the API."""
        athlete = self._client.get_athlete()
        return athlete["bikes"]

    def get_shoes(self):
        """Return the athlete's shoes as reported by the API."""
        athlete = self._client.get_athlete()
        return athlete["shoes"]

    def _merge_activity(self, activity, data):
        """Copy values from *data* into *activity*, but only for keys the
        activity already has (unknown keys are ignored)."""
        for k, v in data.items():
            if k in activity:
                activity[k] = v

    def update_activity(self, id, data):
        """Push *data* to the API for activity *id* and mirror the change in
        the cached copy, if one exists."""
        logging.getLogger('CachedRepository').info(
            "Updating activity {} with data {}".format(id, data))
        self._client.update_activity(id, **data)
        activity = self._cache.get_activity(id)
        if activity is not None:
            self._merge_activity(activity, data)
            self._cache.update_activity(activity)
def get_repository(token, update_cache = True, sleep = None):
    """Build the default repository: a CachedRepository backed by the
    default cache implementation."""
    backing_cache = cache.get_cache()
    return CachedRepository(token, backing_cache, update_cache, sleep)