Better design of the Emitter #26
base: main
Conversation
closed = gt.FromEmitQueue.IsClosed()
}
-	return closed, count
+	return closed, count, largest, false, false
}

// Injects expired edges into the topology event buffer. To be done after the topology event buffer has been remapped with internal source ids.
func InjectExpired[EP EPP[E], V VPI[V], E EPI[E], M MVI[M], N any](g *Graph[V, E, M, N], gt *GraphThread[V, E, M, N], changeCount uint64, uniqueCount uint64, delOnExpire uint64) (newUniqueCount uint64) {
Because of the way the TopologyEventBuff is now indexed (via InputEvent ThreadOrder and topologyFlushedUpTo), this will need to be updated to inject deletes in a different way. (They can't go into the main TopologyEventBuff unless they actually came from the Emitter, or this would mess with the FlushedUpTo index.)
One option would be to have the Emitter track them and inject them, but this would be terrible for performance.
A possibly better option is to keep a separate buffer of the expired events, with perhaps a flag in the VertexPendingBuff that signifies which buffer to pull the next change from.
A third option (and probably the best for performance) is to indeed use the ThreadOrder plus additional offset calculations (incremented with each delete injected into the buffer), but this might get complicated.
I think we can turn TopologyEventBuff into a FIFO queue with one head pointer (for inserting new adds) and two tail pointers (next event to add and next event to delete). With this queue, we don't need to re-insert the same event into TopologyEventBuff to delete the event.
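For illustration, a minimal sketch of the shape this single-head / two-tail FIFO could take. All names here (TopoFIFO, Push, NextAdd, NextDelete) are hypothetical stand-ins, not types from this PR, and when each entry counts as expired is left to the caller.

package topofifo // hypothetical package, just to keep the sketch self-contained

// TopoFIFO is a ring-backed FIFO with one head and two tails over payload type T.
type TopoFIFO[T any] struct {
	buf     []T
	head    uint64 // next slot to write a newly emitted add
	addTail uint64 // next event to apply as an add
	delTail uint64 // next event to apply as a delete (advanced once the entry expires)
}

// Push stores a new add event; it fails if the oldest entry has not yet been
// consumed by delTail, since that entry must survive until its delete is applied.
func (q *TopoFIFO[T]) Push(ev T) bool {
	if q.head-q.delTail == uint64(len(q.buf)) {
		return false // full: delTail still needs the oldest slot
	}
	q.buf[q.head%uint64(len(q.buf))] = ev
	q.head++
	return true
}

// NextAdd returns the next event to apply as an add, if any.
func (q *TopoFIFO[T]) NextAdd() (ev T, ok bool) {
	if q.addTail == q.head {
		return ev, false
	}
	ev = q.buf[q.addTail%uint64(len(q.buf))]
	q.addTail++
	return ev, true
}

// NextDelete re-reads (rather than re-inserts) the oldest entry once the caller
// decides it has expired, so the same stored event serves as its own delete.
func (q *TopoFIFO[T]) NextDelete() (ev T, ok bool) {
	if q.delTail == q.addTail {
		return ev, false
	}
	ev = q.buf[q.delTail%uint64(len(q.buf))]
	q.delTail++
	return ev, true
}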
If the Emitter doesn't track deletes, can deletes still have proper global event IDs or ThreadOrder? This might be a problem for enforcing ordering between messages and events.
Maybe we should consider inserting deletes directly in the input file, which should make things much more straightforward...
Great! The new design is really cool, as it provides both higher performance and better ordering guarantees. It also gets rid of one extra thread!
Could we further improve performance on directed graphs by skipping the remit messages from src to dst? Those messages don't seem to do anything.
@@ -557,3 +561,150 @@ func (rb *GrowableRingBuff[T]) GetSlow(pos uint64) (item T, closed bool, fails i
}
}
}

// ---------------------------- MPSC Token Ring Buffer ----------------------------
These ring buffers are really cool!
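For context, a generic sketch of one common MPSC scheme, assuming producers claim a slot ("token") with an atomic fetch-add and the single consumer drains slots in token order. This illustrates the general technique only; it is not the MPSC Token Ring Buffer implemented in this PR, and the names are made up.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const ringSize = 1024 // power of two

type slot[T any] struct {
	ready atomic.Bool
	val   T
}

type mpscRing[T any] struct {
	slots [ringSize]slot[T]
	head  atomic.Uint64 // next token for producers to claim
	tail  uint64        // next slot the single consumer will read
}

// Put claims a token and publishes the value. (A full implementation must also
// prevent producers from lapping the consumer.)
func (r *mpscRing[T]) Put(v T) {
	tok := r.head.Add(1) - 1
	s := &r.slots[tok%ringSize]
	s.val = v
	s.ready.Store(true)
}

// Poll returns the next value in token order, or false if it is not ready yet.
func (r *mpscRing[T]) Poll() (v T, ok bool) {
	s := &r.slots[r.tail%ringSize]
	if !s.ready.Load() {
		return v, false
	}
	v = s.val
	s.ready.Store(false) // release the slot for reuse
	r.tail++
	return v, true
}

func main() {
	r := &mpscRing[int]{}
	var wg sync.WaitGroup
	for p := 0; p < 4; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < 8; i++ {
				r.Put(p*100 + i)
			}
		}(p)
	}
	wg.Wait()
	for v, ok := r.Poll(); ok; v, ok = r.Poll() {
		fmt.Println(v)
	}
}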
gt.Response <- ACK
wantsAsyncQueryNow = false
blockTop = true // We will block further topology until we receive the resume command.
// TODO: Check for g.Options.AllowAsyncVertexProps, need to adjust for this.
Is g.Options.AllowAsyncVertexProps assumed to be true here?
pos = vs.InEventPos
if onInEdgeAddFunc != nil {
onInEdgeAddFunc(g, gt, gt.Vertex(didx), gt.VertexProperty(didx), didx, pos, &event)
if gt.InputEvent.EventType() == ADD {
Are deletes from the Emitter supported?
gt.FailedRemit = true
return false, count, largest, true, false
}
gt.TopologyEventBuff[gt.InputEvent.ThreadOrderDst-topologyFlushedUpTo] = RawEdgeEvent[E]{TypeAndEventIdx: gt.InputEvent.TypeAndEventIdx, Sidx: vidx, Edge: gt.RemitEvent.Edge}
Why do we need to save the event here? (Same for Line 94)
eventsHeight := uint64(0)
eventsApplyCountNow := uint64(0)
eventsTotalCount := uint64(0)
eventsFlushedUpTo := uint64(0)
Suggested change:
eventsFlushedUpTo := uint64(0) // ThreadOrder of the last event flushed - 1 (or the number of events flushed)
ThreadOrderSrc uint64 // For src thread, the emmiter tracked index.
ThreadOrderDst uint64 // For dst thread, the emmiter tracked index.
Suggested change:
ThreadOrderSrc uint64 // For src thread, the emitter tracked index.
ThreadOrderDst uint64 // For dst thread, the emitter tracked index.
gt.Status = REMIT
-	remitClosed, remitCount = checkToRemit(alg, g, gt, onInEdgeAddFunc)
+	remitClosed, remitCount, currLargest, _, wantsQueryNow = checkToRemit(alg, g, gt, onInEdgeAddFunc, eventsFlushedUpTo)
largest = utils.Max(largest, currLargest)
Suggested change:
largest = utils.Max(largest, currLargest) // checkToRemit might return 0 for currLargest
remitCount := uint64(0)
remitTotalCount := uint64(0)

largest := uint64(0)
Suggested change:
largest := uint64(0) // ThreadOrder index of the last event remitted (i.e., the last event buffered in TopologyEventBuff).
if sidx, ok = gt.VertexMap[gt.TopologyEventBuff[i].SrcRaw]; !ok {
sidx = NewVertex(alg, g, gt, gt.TopologyEventBuff[i].SrcRaw, gt.TopologyEventBuff[i].EventIdx())
if !undirected && gt.TopologyEventBuff[i].EventIdx()%2 == 0 {
continue // Skip even events for directed graphs.
On directed graphs, why do both src and dst still need to remit to each other? It seems like the remit events from src to dst are just ignored.
This is the updated design of the Emitter: it emits to both source and destination asynchronously, and the source and destination then remit to the other. In this design, performance with directed graphs is still quite good, and undirected graphs can see a significant boost in performance.
Further, this design offers far stronger guarantees for vertex creation: vertices are now always created in temporal order within a graph thread (i.e., they are sorted by first observation, regardless of whether the first observation came from some other graph thread creating a new edge to a vertex that didn't exist yet).
The resultant graph structure is now fully deterministic, and everything is sorted by total global ordering 🎉
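For illustration, a conceptual sketch of the emit/remit flow described above. All names (edgeEvent, graphThread, emit, handleEmitted) are hypothetical stand-ins rather than the types in this PR; it only shows the message flow, not the real vertex-creation logic.

package emitsketch // hypothetical package, just to keep the sketch self-contained

type edgeEvent struct {
	srcRaw, dstRaw uint64 // raw (external) endpoint ids
	threadOrderSrc uint64 // emitter-assigned order for the src thread
	threadOrderDst uint64 // emitter-assigned order for the dst thread
}

type graphThread struct {
	fromEmit  chan edgeEvent // events emitted directly to this thread
	fromRemit chan edgeEvent // events remitted by the peer endpoint's thread
}

// emit stamps per-thread orders and sends the event to BOTH endpoint threads
// asynchronously; nextOrder tracks the running ThreadOrder for each thread.
func emit(threads []*graphThread, nextOrder []uint64, ev edgeEvent, srcTid, dstTid int) {
	ev.threadOrderSrc = nextOrder[srcTid]
	nextOrder[srcTid]++
	ev.threadOrderDst = nextOrder[dstTid]
	nextOrder[dstTid]++
	threads[srcTid].fromEmit <- ev
	threads[dstTid].fromEmit <- ev
}

// handleEmitted is what a thread does with an emitted event: it resolves (or
// creates) the vertex for its own endpoint strictly in ThreadOrder, then remits
// the event to the peer thread so the peer can pair it with its own copy.
func (t *graphThread) handleEmitted(ev edgeEvent, peer *graphThread) {
	// ... create or look up the local vertex here, in ThreadOrder ...
	peer.fromRemit <- ev
}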