-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathTracked.hs
449 lines (396 loc) · 14.5 KB
/
Tracked.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
module Tracked where
import Prelude
import qualified Control.Exception as E
import Control.Concurrent
import Control.Monad
import Data.List (intercalate)
import Data.Maybe (fromMaybe)
import Data.Hashable (Hashable, hashWithSalt)
import Data.Data (Typeable)
import Data.Text (Text)
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map
import qualified Data.HashMap.Strict as HM
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Data.Aeson (Value)
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Data.HashSet (HashSet)
import qualified Data.HashSet as HashSet
import Numeric (showHex)
import Data.Bits (shiftL, (.|.))
import Data.Word (Word8)
import Utils (getNow)
import Model (InfoHash)
newtype PeerId = PeerId { unPeerId :: BC.ByteString }
deriving (Show, Typeable, Eq, Ord, Hashable)
data PeerAddress = Peer4 !BC.ByteString
| Peer6 !BC.ByteString
deriving (Read, Typeable, Eq, Ord)
instance Show PeerAddress where
show (Peer4 bs) =
intercalate "." $
map show $
B.unpack bs
show (Peer6 bs) =
let groupAt :: Int -> [a] -> [[a]]
groupAt n xs
| length xs <= n =
[xs]
| otherwise =
take n xs : groupAt n (drop n xs)
words = groupAt 2 $
map (fromIntegral :: Word8 -> Int) $
B.unpack bs
in if length words == 8
then intercalate ":" $
map (\[b1, b2] ->
showHex ((b1 `shiftL` 8) .|. b2) ""
) words
else "::" -- ^invalid
instance Hashable PeerAddress where
hashWithSalt salt (Peer4 bs) = hashWithSalt salt bs
hashWithSalt salt (Peer6 bs) = hashWithSalt salt bs
data ConnInfo = BittorrentInfo !PeerAddress !Int
| WebtorrentInfo !PeerAddress (TChan Value)
instance Show ConnInfo where
show (BittorrentInfo addr port) = show addr ++ ":" ++ show port
show (WebtorrentInfo addr _) = "<WS " ++ show addr ++ ">"
cKind :: ConnInfo -> TrackedKind
cKind (BittorrentInfo _ _) =
Bittorrent
cKind (WebtorrentInfo _ _) =
Webtorrent
cAddr :: ConnInfo -> PeerAddress
cAddr (BittorrentInfo addr _) = addr
cAddr (WebtorrentInfo addr _) = addr
data TrackedPeer = TrackedPeer { pConnInfo :: !ConnInfo
, pUploaded :: !Int
, pDownloaded :: !Int
, pUpspeed :: !Int
, pDownspeed :: !Int
, pLeft :: !Int
, pLastRequest :: !Int
} deriving (Show)
data TrackedKind = Bittorrent | Webtorrent
deriving (Eq, Show, Ord)
pKind :: TrackedPeer -> TrackedKind
pKind (TrackedPeer { pConnInfo = c }) =
cKind c
pAddr :: TrackedPeer -> PeerAddress
pAddr (TrackedPeer { pConnInfo = c }) =
cAddr c
updatePeerDeltas :: TrackedPeer -> TrackedPeer -> TrackedPeer
updatePeerDeltas oldPeer newPeer =
newPeer { pUpspeed = max 0 $
(pUploaded newPeer - pUploaded oldPeer) `div` dt
, pDownspeed = max 0 $
(pDownloaded newPeer - pDownloaded oldPeer) `div` dt
}
where dt = max 1 $
pLastRequest newPeer - pLastRequest oldPeer
data TrackedScrape = TrackedScrape {
scrapeLeechers :: !Int,
scrapeSeeders :: !Int,
scrapeDownspeed :: !Int,
scrapeUpspeed :: !Int
} deriving (Show, Typeable)
instance Monoid TrackedScrape where
mempty = TrackedScrape 0 0 0 0
(TrackedScrape l1 s1 d1 u1) `mappend` (TrackedScrape l2 s2 d2 u2) =
TrackedScrape (l1 + l2) (s1 + s2) (d1 + d2) (u1 + u2)
data TrackedData = TrackedData { dataPeers :: HM.HashMap PeerId TrackedPeer
, dataBittorrentScrape :: TrackedScrape
, dataWebtorrentScrape :: TrackedScrape
}
updateData :: (HM.HashMap PeerId TrackedPeer -> HM.HashMap PeerId TrackedPeer) -> TrackedData -> TrackedData
updateData f (TrackedData { dataPeers = peers }) =
let peers' = f peers
scrape kind =
HM.foldl'
(\scrape peer ->
let isSeeder =
pLeft peer == 0
in if pKind peer == kind
then scrape `mappend`
TrackedScrape
{ scrapeLeechers =
if isSeeder
then 0
else 1
, scrapeSeeders =
if isSeeder
then 1
else 0
, scrapeDownspeed =
pDownspeed peer
, scrapeUpspeed =
pUploaded peer
}
else scrape
) mempty peers'
in TrackedData peers' (scrape Bittorrent) (scrape Webtorrent)
newData :: TrackedData
newData =
TrackedData { dataPeers = HM.empty
, dataBittorrentScrape = mempty
, dataWebtorrentScrape = mempty
}
data Tracked = Tracked
{ trackedTTracked :: TVar (HM.HashMap InfoHash (TVar TrackedData))
, trackedTPopular :: TVar [InfoHash]
}
trackedPopular :: Tracked -> IO [InfoHash]
trackedPopular = atomically . readTVar . trackedTPopular
newTracked :: IO Tracked
newTracked = do
tTracked <- newTVarIO HM.empty
tPopular <- newTVarIO []
let tracked = Tracked tTracked tPopular
let cleanAndStats = do
t1 <- getCurrentTime
infoHashes <- atomically $
HM.keys <$>
readTVar tTracked
t2 <- getCurrentTime
popular <-
(map snd . Map.toDescList) <$>
foldM (\popular infoHash -> do
now <- getNow
trackedModifyData' tracked infoHash $ \d ->
let d' =
-- Drop outdated peers
updateData (HM.filter $
\peer ->
now <= pLastRequest peer + peerTimeout
) d
-- |Collect popular list
scrape =
dataBittorrentScrape d' `mappend`
dataWebtorrentScrape d'
popularity =
(scrapeSeeders scrape +
scrapeLeechers scrape,
scrapeSeeders scrape,
infoHash
)
popular' =
Map.insert popularity infoHash popular
popular''
| Map.size popular < popularTorrents =
-- Need more popularTorrents
popular'
| otherwise =
-- Drop one off
let lowestPopularity =
fst $ head $
Map.toAscList popular'
in Map.delete lowestPopularity popular'
in (popular'', d')
) Map.empty infoHashes
t3 <- getCurrentTime
atomically $
writeTVar tPopular popular
t4 <- getCurrentTime
putStrLn $ "Found " ++
show (length popular) ++ " popular torrents from " ++
show (length infoHashes) ++ " info_hashes in " ++
show (truncate $ (t2 `diffUTCTime` t1) * 1000 :: Int) ++ "+" ++
show (truncate $ (t3 `diffUTCTime` t2) * 1000 :: Int) ++ "+" ++
show (truncate $ (t4 `diffUTCTime` t3) * 1000 :: Int) ++ "ms"
handleException :: E.SomeException -> IO ()
handleException = print
cleanAndStatsLoop =
forever $ do
E.catch cleanAndStats handleException
-- Sleep 10s before next run
threadDelay 10000000
void $ forkIO cleanAndStatsLoop
-- Assume a single instance, clear all gauges of a previous crash
-- TODO
return tracked
where
peerTimeout = 3600
popularTorrents = 100
trackedGetData :: Tracked -> InfoHash -> IO TrackedData
trackedGetData (Tracked tTracked _) infoHash =
atomically $
HM.lookup infoHash <$>
readTVar tTracked >>=
maybe (return newData) readTVar
trackedModifyData' :: Tracked -> InfoHash -> (TrackedData -> (a, TrackedData)) -> IO a
trackedModifyData' (Tracked tTracked _) infoHash f =
atomically $ do
tracked <- readTVar tTracked
let mDataRef = HM.lookup infoHash tracked
d <- maybe (return newData) readTVar mDataRef
let (a, d') = f d
case HM.null $ dataPeers d' of
True ->
case mDataRef of
-- infoHash was there before but has no more peers now
Just _ ->
writeTVar tTracked $
HM.delete infoHash tracked
-- infoHash wasn't there before and has no peers now
Nothing ->
return ()
False ->
case mDataRef of
-- infoHash wasn't there before and has peers now
Nothing -> do
dataRef <- newTVar d'
writeTVar tTracked $
HM.insert infoHash dataRef tracked
-- infoHash was there before but has no more peers now
Just dataRef ->
writeTVar dataRef d'
return a
trackedModifyData :: Tracked -> InfoHash -> (TrackedData -> TrackedData) -> IO ()
trackedModifyData tracked infoHash f =
trackedModifyData' tracked infoHash $
((), ) . f
data TrackedAnnounce
= TrackedAnnounce { aInfoHash :: !InfoHash
, aPeerId :: !PeerId
, aConnInfo :: !ConnInfo
, aUploaded :: !Int
, aDownloaded :: !Int
, aLeft :: !Int
, aEvent :: Maybe Text
} deriving (Show)
data TrackedAnnounced
= TrackedAnnounced { adPeers :: [(PeerId, TrackedPeer)]
, adCompleted :: Bool
, adUploaded :: Int
, adDownloaded :: Int
} deriving (Show)
announce :: Tracked -> TrackedAnnounce -> IO TrackedAnnounced
announce tracked announce
| aEvent announce == Just "stopped" = do
let peerId = aPeerId announce
(uploaded, downloaded) <- trackedModifyData' tracked (aInfoHash announce) $ \d ->
case HM.lookup peerId (dataPeers d) of
Nothing ->
((0, 0), d)
Just peer ->
let uploaded = aUploaded announce - pUploaded peer
downloaded = aDownloaded announce - pDownloaded peer
d' = updateData (HM.delete peerId) d
in ((uploaded, downloaded), d')
return $ TrackedAnnounced [] False uploaded downloaded
announce tracked announce@(TrackedAnnounce {}) = do
now <- getNow
let isSeeder = aLeft announce == 0
newPeer =
TrackedPeer { pUploaded = aUploaded announce
, pDownloaded = aDownloaded announce
, pUpspeed = 0
, pDownspeed = 0
, pLeft = aLeft announce
, pLastRequest = now
, pConnInfo = aConnInfo announce
}
-- | For filtering result peers
isEqInfo :: TrackedPeer -> Bool
isEqInfo peer =
pKind peer == cKind (aConnInfo announce)
trackedModifyData' tracked (aInfoHash announce) $ \data' ->
let
oldPeer =
aPeerId announce `HM.lookup` dataPeers data'
data'' =
updateData
(HM.alter
(maybe (Just newPeer)
(\oldPeer ->
Just $! updatePeerDeltas oldPeer newPeer
))
(aPeerId announce)
) data'
peers =
HM.foldlWithKey'
(\peers peerId peer ->
let emit
| peerId == aPeerId announce =
-- Don't return the same peer that requested
False
| not (isEqInfo peer) =
-- Don't return TCP peers to WebRTC peers and vice versa
False
| isSeeder =
-- Return only leechers to seeders
pLeft peer > 0
| otherwise =
-- Return any valid peer to seeders
True
in if emit
then (peerId, peer) : peers
else peers
)
[] (dataPeers data'')
completed =
-- Count event=completed
aEvent announce == Just "completed" ||
-- Or when left=0 and it previously wasn't
(aLeft announce == 0 && fromMaybe 0 (pLeft <$> oldPeer) > 0)
uploaded =
max 0 $
fromMaybe 0 $ do
oldPeer' <- oldPeer
return $ pUploaded newPeer - pUploaded oldPeer'
downloaded =
max 0 $
fromMaybe 0 $ do
oldPeer' <- oldPeer
return $ pDownloaded newPeer - pDownloaded oldPeer'
ad =
TrackedAnnounced { adPeers = peers
, adCompleted = completed
, adUploaded = uploaded
, adDownloaded = downloaded
}
in (ad, data'')
scrapeWebtorrent :: Tracked -> InfoHash -> IO TrackedScrape
scrapeWebtorrent tracked infoHash =
dataWebtorrentScrape <$>
trackedGetData tracked infoHash
scrapeBittorrent :: Tracked -> InfoHash -> IO TrackedScrape
scrapeBittorrent tracked infoHash =
dataBittorrentScrape <$>
trackedGetData tracked infoHash
getPeer :: Tracked -> InfoHash -> PeerId -> IO (Maybe TrackedPeer)
getPeer tracked infoHash peerId =
HM.lookup peerId <$>
dataPeers <$>
trackedGetData tracked infoHash
clearPeer :: Tracked -> InfoHash -> PeerId -> IO ()
clearPeer tracked infoHash peerId =
trackedModifyData tracked infoHash $
updateData $
HM.delete $ peerId
getChangedAddrs :: Tracked -> HashSet PeerAddress -> IO (HashSet PeerAddress, HashSet PeerAddress, HashSet PeerAddress)
getChangedAddrs tracked known =
atomically $ do
tracked' <- readTVar (trackedTTracked tracked)
current <-
foldM
(\current tData ->
foldl
(\current peer ->
HashSet.insert (pAddr peer) current
) current <$>
dataPeers <$>
readTVar tData
) HashSet.empty (HM.elems tracked')
let
-- | removed means it was known but is not current
removed = known `HashSet.difference` current
-- | added means it is there currently but was not known before
added = current `HashSet.difference` known
case (HashSet.null removed, HashSet.null added) of
(True, True) ->
-- No changes
retry
_ ->
return (current, removed, added)