Skip to content

Commit

Permalink
Merge pull request #96 from digidem/v8-missing
Browse files Browse the repository at this point in the history
  • Loading branch information
okdistribute authored Jul 2, 2020
2 parents 5e1b8ff + 2ea5d16 commit 99c9cc4
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 13 deletions.
31 changes: 22 additions & 9 deletions lib/db-sync-progress.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ module.exports = function (db, opts) {

var stream = multifeed.replicate(opts)

var feeds = []
var progress = {}
var feeds = new Map()
var listeners = []

multifeed.ready(function () {
multifeed.feeds().forEach(onFeed)
multifeed.on('feed', onFeed)
stream.on('remote-feeds', onRemoteFeeds)

function onRemoteFeeds () {
multifeed.feeds().forEach(onFeed)
}

eos(stream, function () {
multifeed.removeListener('feed', onFeed)
stream.removeListener('remote-feeds', onRemoteFeeds)
listeners.forEach(function (l) {
l.feed.removeListener('upload', l.listener)
l.feed.removeListener('download', l.listener)
})
})
Expand All @@ -24,19 +30,26 @@ module.exports = function (db, opts) {
return stream

function onFeed (feed) {
feeds.push(feed)
if (!feed.writable && feeds.has(feed.key.toString('hex'))) return
feeds.set(feed.key.toString('hex'), feed)
feed.ready(updateFeed.bind(null, feed))
feed.on('download', onDownload)
function onDownload () {
feed.on('download', listener)
feed.on('upload', listener)

function listener () {
updateFeed(feed)
}
listeners.push({ feed: feed, listener: onDownload })
listeners.push({ feed, listener })
}

function updateFeed (feed) {
progress[feed.key.toString('hex')] = feed.downloaded(0, feed.length)
var total = feeds.reduce(function (acc, feed) { return acc + feed.length }, 0)
var sofar = feeds.reduce(function (acc, feed) { return acc + feed.downloaded(0, feed.length) }, 0)
var all = Array.from(feeds.values())
var total = all.reduce(function (acc, feed) {
return acc + feed.length
}, 0)
var sofar = all.reduce(function (acc, feed) {
return acc + feed.downloaded(0, feed.length)
}, 0)
stream.emit('progress', sofar, total)
}
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"handshake-stream": "^3.0.0",
"hypercore-crypto": "^1.0.0",
"inherits": "^2.0.3",
"multifeed": "^4.3.0",
"multiplex": "^6.7.0",
"osm-p2p-geojson": "^4.0.1",
"osm-p2p-syncfile": "^4.1.0",
Expand Down
20 changes: 18 additions & 2 deletions sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const DEFAULT_INTERNET_DISCO = Object.assign(
}
)

const DEFAULT_HEARTBEAT_INTERVAL = 1000 * 20 // 20 seconds

const ReplicationState = {
WIFI_READY: 'replication-wifi-ready',
PROGRESS: 'replication-progress',
Expand Down Expand Up @@ -97,8 +99,10 @@ class SyncState {
}
}

activePeers () {
return this.peers().filter(this._isactive)
  // Returns true when `peer` has sat in the PROGRESS state without a fresh
  // progress message for longer than one heartbeat interval (20s, see
  // DEFAULT_HEARTBEAT_INTERVAL). Used by the sync heartbeat timer to destroy
  // connections that stall waiting on data the remote cannot supply.
  stale (peer) {
    var staleDate = Date.now() - DEFAULT_HEARTBEAT_INTERVAL
    // NOTE(review): relies on onprogress() stamping `timestamp` onto the
    // progress payload stored as peer.state.message — confirm that no other
    // code path writes a PROGRESS state without that field.
    return peer.state.topic === ReplicationState.PROGRESS &&
      (peer.state.message.timestamp < staleDate)
  }

_isactive (peer) {
Expand Down Expand Up @@ -146,6 +150,7 @@ class SyncState {

  // Record a replication progress update for `peer`, unless the peer has
  // already closed. Stamps the current time onto the payload so stale() can
  // later detect a transfer that has stopped advancing.
  onprogress (peer, progress) {
    if (this._isclosed(peer)) return
    // NOTE: deliberately mutates the caller-owned progress object — tests
    // `delete lastProgress.timestamp` before deep-equality checks, so the
    // timestamp must live on the same object the caller observes.
    progress.timestamp = Date.now()
    peer.state = PeerState(ReplicationState.PROGRESS, progress)
  }

Expand Down Expand Up @@ -408,6 +413,7 @@ class Sync extends events.EventEmitter {
let peer
let deviceType
let disconnected = false
let heartbeat

connection.on('close', onClose)
connection.on('error', onClose)
Expand All @@ -419,6 +425,7 @@ class Sync extends events.EventEmitter {
function onClose (err) {
disconnected = true
if (peer) peer.connected = false
if (heartbeat) clearInterval(heartbeat)
debug('onClose', info.host, info.port, err)
if (!open) return
open = false
Expand Down Expand Up @@ -448,11 +455,20 @@ class Sync extends events.EventEmitter {
debug('sync started', info.host, info.port)
if (peer) peer.sync.emit('sync-start')
self.osm.core.pause()
// XXX: This is a hack to ensure sync streams always end eventually
// Ideally, we'd open a sparse hypercore instead.
heartbeat = setInterval(() => {
if (self.state.stale(peer)) {
connection.destroy(new Error('timed out due to missing data'))
}
debug('heartbeat', self.state.stale(peer))
}, DEFAULT_HEARTBEAT_INTERVAL)
})
stream.on('progress', (progress) => {
debug('sync progress', info.host, info.port, progress)
if (peer) peer.sync.emit('progress', progress)
})

pump(stream, connection, stream, function (err) {
debug('pump ended', info.host, info.port)
if (peer && peer.started) {
Expand Down
78 changes: 76 additions & 2 deletions test/db-sync-progress.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ test('sync progress: 6 entries', function (t) {
var a = sync(db1, { live: false })
var b = sync(db2, { live: false })

var eventsLeftA = 5
var eventsLeftB = 5
var eventsLeftA = 12
var eventsLeftB = 12
a.on('progress', function (sofar, total) {
eventsLeftA--
})
Expand Down Expand Up @@ -123,3 +123,77 @@ test('sync progress: 200 entries', function (t) {
})
})
})

// Progress sanity check across three databases: no 'progress' event from any
// replication stream may ever report more downloaded ('sofar') than the
// expected total.
test('sync progress: 3 devices', function (t) {
  t.plan(29)

  setup(3, function (err, db1) {
    t.error(err)
    setup(3, function (err, db2) {
      t.error(err)
      setup(3, function (err, db3) {
        t.error(err)
        var a = sync(db1, { live: false })
        var b = sync(db2, { live: false })
        var c = sync(db3, { live: false })

        // Every progress event must satisfy sofar <= total.
        a.on('progress', function (sofar, total) {
          t.notOk(sofar > total)
        })
        b.on('progress', function (sofar, total) {
          t.notOk(sofar > total)
        })
        c.on('progress', function (sofar, total) {
          t.notOk(sofar > total)
        })

        // NOTE(review): only a <-> b are actually piped; stream c is created
        // but never pumped, so its 'progress' listener may never fire —
        // confirm the t.plan(29) count if c is expected to participate.
        pump(a, b, a, function (err) {
          t.error(err)
        })
      })
    })
  })
})

// End-to-end progress accounting: two databases with 100 entries each sync
// fully; afterwards both sides hold both 100-entry feeds and the final
// progress report on each stream reaches sofar == total == 200.
test('sync progress: 200 entries', function (t) {
  t.plan(11)

  setup(100, function (err, db1) {
    t.error(err)
    setup(100, function (err, db2) {
      t.error(err)
      var feed1 = db1.osm.core._logs
      var feed2 = db2.osm.core._logs

      var a = sync(db1, { live: false })
      var b = sync(db2, { live: false })

      // Capture the most recent progress report seen on each side.
      var sofarA, totalA
      var sofarB, totalB
      a.on('progress', function (sofar, total) {
        sofarA = sofar
        totalA = total
      })
      b.on('progress', function (sofar, total) {
        sofarB = sofar
        totalB = total
      })

      pump(a, b, a, function (err) {
        t.error(err)
        t.equals(feed1.feeds()[0].length, 100)
        t.equals(feed1.feeds()[1].length, 100)
        t.equals(feed2.feeds()[0].length, 100)
        t.equals(feed2.feeds()[1].length, 100)
        // Bug fix: the original asserted sofarA and totalB twice each,
        // leaving sofarB and totalA captured but never checked.
        t.equals(sofarA, 200)
        t.equals(sofarB, 200)
        t.equals(totalA, 200)
        t.equals(totalB, 200)
      })
    })
  })
})
11 changes: 11 additions & 0 deletions test/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var Mapeo = require('..')

// Public test-helper API shared by the test suite.
module.exports = {
  createApi,
  writeBigDataNoPhotos,
  writeBigData,
  generateObservations
}
Expand All @@ -37,6 +38,16 @@ function createApi (_dir, opts) {
return mapeo
}

// Write `n` generated observations to `mapeo` without attaching any photos.
// Calls `cb(err)` exactly once: on the first error encountered, or with no
// error after the final observation (generateObservations counts `i` down
// to 0) has been created. The original swallowed all errors and would crash
// on an undefined `obs` if generation failed.
function writeBigDataNoPhotos (mapeo, n, cb) {
  var finished = false
  function finish (err) {
    if (finished) return
    finished = true
    cb(err)
  }
  generateObservations(n, function (err, obs, i) {
    if (err) return finish(err)
    mapeo.observationCreate(obs, function (err, node) {
      if (err) return finish(err)
      if (i === 0) finish()
    })
  })
}

function writeBigData (mapeo, n, cb) {
var tasks = []
var left = n
Expand Down
93 changes: 93 additions & 0 deletions test/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ tape('sync: syncfile replication: osm-p2p-syncfile', function (t) {

function syncfileWritten (err) {
t.error(err, 'first syncfile written ok')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 1, total: 1 },
media: { sofar: 1, total: 1 }
Expand All @@ -355,6 +356,7 @@ tape('sync: syncfile replication: osm-p2p-syncfile', function (t) {

function secondSyncfileWritten (err) {
t.error(err, 'second syncfile written ok')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 1, total: 1 },
media: { sofar: 1, total: 1 }
Expand Down Expand Up @@ -444,6 +446,7 @@ tape('sync: syncfile /wo projectKey, api with projectKey set', function (t) {

function syncfileWritten (err) {
t.error(err, 'first syncfile written ok')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 1, total: 1 },
media: { sofar: 1, total: 1 }
Expand All @@ -459,6 +462,7 @@ tape('sync: syncfile /wo projectKey, api with projectKey set', function (t) {

function secondSyncfileWritten (err) {
t.error(err, 'second syncfile written ok')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 1, total: 1 },
media: { sofar: 1, total: 1 }
Expand Down Expand Up @@ -532,6 +536,7 @@ tape('sync: desktop <-> desktop photos', function (t) {

syncer.on('end', function () {
t.ok(true, 'replication complete')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 5, total: 5 },
media: { sofar: 18, total: 18 }
Expand Down Expand Up @@ -647,6 +652,7 @@ tape('sync: deletes are not synced back', function (t) {

syncer.on('end', function () {
t.ok(true, 'replication complete')
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: 5, total: 5 },
media: { sofar: 18, total: 18 }
Expand Down Expand Up @@ -1031,6 +1037,92 @@ tape('sync: peer.connected property on graceful exit', function (t) {
})
})

// Regression test for the sync heartbeat: a peer that advertises data it can
// no longer provide must end replication with a timeout error rather than
// hang forever.
// Flow: api1 writes 50 observations with photos, api2 writes 450 without;
// a first sync runs api2 -> api1, then api1 is torn down mid-transfer and
// recreated from the same directory, and a second sync against the restarted
// api1 must fail with the heartbeat's 'timed out due to missing data' error.
tape('sync: missing data still ends', function (t) {
  t.plan(18)
  var opts = {api1:{deviceType:'desktop'}, api2:{deviceType:'desktop'}}
  createApis(opts, function (api1, api2, close) {
    // Four async preconditions gate the first sync: one 'peer' event per api
    // plus the two data writers below, all funnelled through written().
    var pending = 4
    var restarted = false
    var _api1 = null // replacement api created by restart()

    api1.sync.once('peer', written.bind(null, null))
    api2.sync.once('peer', written.bind(null, null))
    api1.sync.listen(() => {
      api1.sync.join()
    })
    api2.sync.listen(() => {
      api2.sync.join()
    })
    helpers.writeBigData(api1, 50, written)
    helpers.writeBigDataNoPhotos(api2, 450, written)

    // Countdown gate: once all four preconditions complete, start the first
    // sync from api2's side.
    function written (err) {
      t.error(err, 'written no error')
      if (--pending === 0) {
        t.ok(api1.sync.peers().length > 0, 'api 1 has peers')
        t.ok(api2.sync.peers().length > 0, 'api 2 has peers')
        if (api2.sync.peers().length >= 1) {
          sync(api2.sync.peers()[0], true)
        }
      }
    }

    // Tear down api1 mid-sync, recreate it over the same directory, write
    // 200 more photo-less observations, and kick off the second sync when
    // api2 re-discovers the restarted peer.
    function restart () {
      api1.sync.leave()
      api1.close((err) => {
        api1.osm.close(() => {
          t.error(err, 'closed first api')
          _api1 = helpers.createApi(api1._dir)
          helpers.writeBigDataNoPhotos(_api1, 200, () => {
            _api1.sync.listen(() => {
              api2.sync.once('peer', (peer) => {
                sync(peer, false)
              })
              _api1.sync.join()
            })
          })
        })
      })
    }

    // Final teardown: close the replacement api, its osm store, then api2.
    function done () {
      _api1.close(() => {
        _api1.osm.close(() => {
          api2.close(() => {
            t.end()
          })
        })
      })
    }

    // Run one replication attempt from api2 to `peer`. `first` selects the
    // expected outcome: the first sync is interrupted by restart() (any
    // error is accepted), the second must fail with the heartbeat timeout.
    function sync (peer, first) {
      t.ok(peer, 'syncronizing ' + first)
      api2.sync.on('down', (peer) => {
        t.pass('emit down event on close')
        t.notOk(peer.connected, 'not connected anymore')
        if (!first) done()
      })
      var syncer = api2.sync.replicate(peer)
      syncer.on('error', function (err) {
        if (first) t.ok(err, 'error on first ok')
        else t.same(err.message, 'timed out due to missing data', 'error message for missing data')
      })

      // During the first sync, restart api1 once progress passes 450 entries
      // (i.e. mid-transfer) — exactly once, guarded by `restarted`.
      syncer.on('progress', function (progress) {
        if (first && progress.db.sofar > 450 && progress.db.sofar < 455 && !restarted) {
          t.ok(peer.started, 'started is true')
          restart()
          restarted = true
        }
      })

      syncer.on('end', function () {
        t.ok(true, 'replication complete')
      })
    }
  })
})

tape('sync: 200 photos & close/reopen real-world scenario', function (t) {
t.plan(18)
var opts = {api1:{deviceType:'desktop'}, api2:{deviceType:'desktop'}}
Expand Down Expand Up @@ -1111,6 +1203,7 @@ tape('sync: 200 photos & close/reopen real-world scenario', function (t) {
'preview/goodbye_world.png',
'thumbnail/goodbye_world.png'
])
delete lastProgress.timestamp
t.deepEquals(lastProgress, {
db: { sofar: total, total: total },
media: { sofar: expectedMedia.length, total: expectedMedia.length }
Expand Down

0 comments on commit 99c9cc4

Please sign in to comment.