
Better design of the Emitter #26

Draft · wants to merge 2 commits into main
Conversation

ScottSallinen (Owner) commented Jun 30, 2024

This is the updated design of the Emitter, where it emits to both source and destination asynchronously; the source and destination then remit to each other. In this design, performance with directed graphs is still quite good, and undirected graphs can see a significant boost in performance.

Further, this design offers far stronger guarantees for vertex creation: vertices are now always created in temporal order within a graph thread (i.e., they are sorted by first observation, regardless of whether that first observation came from some other graph thread creating a new edge to a vertex that didn't exist yet).

The resultant graph structure is now fully deterministic, and everything is sorted by total global ordering 🎉
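
For intuition, here is a minimal sketch of the emit-to-both flow described above; the type, queue, and field names are illustrative assumptions, not this PR's actual API:

package sketch

// Sketch only: hypothetical names, not the real Emitter/GraphThread types.
type edgeEvent struct {
	SrcRaw, DstRaw uint64
	EventIdx       uint64 // position in the global total order, assigned by the Emitter
}

type emitter struct {
	nextIdx  uint64
	toThread []chan edgeEvent // one emit queue per graph thread
}

func (e *emitter) owner(raw uint64) uint64 { return raw % uint64(len(e.toThread)) }

// Emit stamps the event with the next global index and sends it to BOTH the
// source's and destination's graph threads asynchronously; each endpoint's
// thread later remits to the other, as described above.
func (e *emitter) Emit(src, dst uint64) {
	ev := edgeEvent{SrcRaw: src, DstRaw: dst, EventIdx: e.nextIdx}
	e.nextIdx++
	e.toThread[e.owner(src)] <- ev
	if d := e.owner(dst); d != e.owner(src) {
		e.toThread[d] <- ev
	}
}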

  closed = gt.FromEmitQueue.IsClosed()
  }
- return closed, count
+ return closed, count, largest, false, false
  }

// Injects expired edges into the topology event buffer. To be done after the topology event buffer has been remapped with internal source ids.
func InjectExpired[EP EPP[E], V VPI[V], E EPI[E], M MVI[M], N any](g *Graph[V, E, M, N], gt *GraphThread[V, E, M, N], changeCount uint64, uniqueCount uint64, delOnExpire uint64) (newUniqueCount uint64) {
ScottSallinen (Owner, Author) commented Jun 30, 2024

Because of the way the TopologyEventBuff is now indexed (via the InputEvent ThreadOrder and topologyFlushedUpTo), this will need to be updated to inject deletes in a different way. (They can't go into the main TopologyEventBuff unless they actually came from the Emitter, or this would mess with the FlushedUpTo index.)

One way could be to have the Emitter track them and inject them, but this would be terrible for performance.

Possibly better to keep a separate buffer of the expired events, and perhaps a flag in the VertexPendingBuff that signifies which buffer to pull from for the next change...

A third possibility (and probably best for performance) is indeed to use the ThreadOrder with additional offset calculations (an offset incremented with each delete injected into the buffer), but this might be complicated...
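
To make the third option concrete, a rough sketch of the offset bookkeeping (hypothetical helper names; in reality this math would live alongside topologyFlushedUpTo in checkToRemit/InjectExpired):

package sketch

// Sketch only: hypothetical types illustrating the offset idea above.
type rawEvent struct{ /* payload elided */ }

type expiredInjector struct {
	injected uint64 // number of deletes injected into the buffer so far
}

// slot folds the running delete offset into the index math that currently
// uses only the ThreadOrder and topologyFlushedUpTo.
func (x *expiredInjector) slot(threadOrder, flushedUpTo uint64) uint64 {
	return threadOrder - flushedUpTo + x.injected
}

// inject places a delete at the next free slot and bumps the offset, so
// every later emitted event lands one position further along.
func (x *expiredInjector) inject(buf []rawEvent, nextFree uint64, del rawEvent) {
	buf[nextFree] = del
	x.injected++
}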

Collaborator

I think we can turn TopologyEventBuff into a FIFO queue with one head pointer (for inserting new adds) and two tail pointers (next event to add and next event to delete). With this queue, we don't need to re-insert the same event into TopologyEventBuff to delete the event.
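
A rough sketch of that shape (single-threaded for clarity; names hypothetical):

package sketch

// Sketch only: one head for newly emitted adds, plus two tails that walk the
// same events twice (first as adds, later as deletes), so an event never has
// to be re-inserted in order to be deleted.
type rawEvent struct{ /* payload elided */ }

type topoFIFO struct {
	buf        []rawEvent
	head       uint64 // next free slot for a newly emitted add
	nextAdd    uint64 // next event to apply as an add
	nextDelete uint64 // next event to apply as a delete (trails nextAdd)
}

func (q *topoFIFO) push(ev rawEvent) { // capacity check omitted for brevity
	q.buf[q.head%uint64(len(q.buf))] = ev
	q.head++
}

func (q *topoFIFO) popAdd() (ev rawEvent, ok bool) {
	if q.nextAdd == q.head {
		return ev, false
	}
	ev = q.buf[q.nextAdd%uint64(len(q.buf))]
	q.nextAdd++
	return ev, true
}

// popDelete re-reads an already-applied add as its own deletion; a slot is
// only safe to overwrite once nextDelete has passed it.
func (q *topoFIFO) popDelete() (ev rawEvent, ok bool) {
	if q.nextDelete == q.nextAdd {
		return ev, false
	}
	ev = q.buf[q.nextDelete%uint64(len(q.buf))]
	q.nextDelete++
	return ev, true
}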

Collaborator

If the Emitter doesn't track deletes, can deletes still have proper global event IDs or ThreadOrder? This might be a problem for enforcing ordering between messages and events.

Collaborator

Maybe we should consider inserting deletes directly in the input file, which should make things much more straightforward...

@LuuO (Collaborator) left a comment

Great! The new design is really cool, as it provides both higher performance and better ordering guarantees. It also gets rid of one extra thread!

I think we can further improve the performance on directed graphs by skipping the remit messages from src to dst? Those messages don't seem to do anything.

@@ -557,3 +561,150 @@ func (rb *GrowableRingBuff[T]) GetSlow(pos uint64) (item T, closed bool, fails i
}
}
}

// ---------------------------- MPSC Token Ring Buffer ----------------------------
Collaborator

These ring buffers are really cool!
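
For reference, a minimal sketch of one common MPSC ring-buffer design (per-slot sequence numbers in the style of Vyukov's bounded queue, with the producer's ticket playing the role of a token); the PR's actual MPSC Token Ring Buffer may well differ:

package sketch

import (
	"runtime"
	"sync/atomic"
)

type slot[T any] struct {
	seq  atomic.Uint64
	item T
}

type MPSC[T any] struct {
	buf  []slot[T]
	mask uint64
	head atomic.Uint64 // producers claim tickets (tokens) here
	tail uint64        // single consumer: a plain field is fine
}

func NewMPSC[T any](sizePow2 uint64) *MPSC[T] {
	q := &MPSC[T]{buf: make([]slot[T], sizePow2), mask: sizePow2 - 1}
	for i := range q.buf {
		q.buf[i].seq.Store(uint64(i)) // slot i is free for ticket i on lap 0
	}
	return q
}

// Push claims a ticket with one atomic add, waits for its slot to be freed,
// writes the item, and publishes by bumping the slot's sequence number.
func (q *MPSC[T]) Push(v T) {
	t := q.head.Add(1) - 1
	s := &q.buf[t&q.mask]
	for s.seq.Load() != t {
		runtime.Gosched() // slot still holds an item from the previous lap
	}
	s.item = v
	s.seq.Store(t + 1) // publish to the consumer
}

// Pop returns ok=false when empty; only the single consumer may call it.
func (q *MPSC[T]) Pop() (v T, ok bool) {
	s := &q.buf[q.tail&q.mask]
	if s.seq.Load() != q.tail+1 {
		return v, false // the producer hasn't published this slot yet
	}
	v = s.item
	s.seq.Store(q.tail + uint64(len(q.buf))) // free the slot for the next lap
	q.tail++
	return v, true
}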

gt.Response <- ACK
wantsAsyncQueryNow = false
blockTop = true // We will block further topology until we receive the resume command.
// TODO: Check for g.Options.AllowAsyncVertexProps; need to adjust for this.
Collaborator

Is g.Options.AllowAsyncVertexProps assumed to be true here?

pos = vs.InEventPos
if onInEdgeAddFunc != nil {
onInEdgeAddFunc(g, gt, gt.Vertex(didx), gt.VertexProperty(didx), didx, pos, &event)
if gt.InputEvent.EventType() == ADD {
Collaborator

Are deletes from Emitter supported?

gt.FailedRemit = true
return false, count, largest, true, false
}
gt.TopologyEventBuff[gt.InputEvent.ThreadOrderDst-topologyFlushedUpTo] = RawEdgeEvent[E]{TypeAndEventIdx: gt.InputEvent.TypeAndEventIdx, Sidx: vidx, Edge: gt.RemitEvent.Edge}
Collaborator

Why do we need to save the event here? (Same for Line 94)

eventsHeight := uint64(0)
eventsApplyCountNow := uint64(0)
eventsTotalCount := uint64(0)
eventsFlushedUpTo := uint64(0)
Collaborator

Suggested change:
- eventsFlushedUpTo := uint64(0)
+ eventsFlushedUpTo := uint64(0) // ThreadOrder of the last event flushed - 1 (or the number of events flushed)

Comment on lines +79 to +80
ThreadOrderSrc uint64 // For src thread, the emmiter tracked index.
ThreadOrderDst uint64 // For dst thread, the emmiter tracked index.
Collaborator

Suggested change:
- ThreadOrderSrc uint64 // For src thread, the emmiter tracked index.
- ThreadOrderDst uint64 // For dst thread, the emmiter tracked index.
+ ThreadOrderSrc uint64 // For src thread, the emitter tracked index.
+ ThreadOrderDst uint64 // For dst thread, the emitter tracked index.

  gt.Status = REMIT
- remitClosed, remitCount = checkToRemit(alg, g, gt, onInEdgeAddFunc)
+ remitClosed, remitCount, currLargest, _, wantsQueryNow = checkToRemit(alg, g, gt, onInEdgeAddFunc, eventsFlushedUpTo)
+ largest = utils.Max(largest, currLargest)
Collaborator

Suggested change:
- largest = utils.Max(largest, currLargest)
+ largest = utils.Max(largest, currLargest) // checkToRemit might return 0 for currLargest

remitCount := uint64(0)
remitTotalCount := uint64(0)

largest := uint64(0)
Collaborator

Suggested change:
- largest := uint64(0)
+ largest := uint64(0) // ThreadOrder index of the last event remitted (i.e., the last event buffered in TopologyEventBuff).

if sidx, ok = gt.VertexMap[gt.TopologyEventBuff[i].SrcRaw]; !ok {
sidx = NewVertex(alg, g, gt, gt.TopologyEventBuff[i].SrcRaw, gt.TopologyEventBuff[i].EventIdx())
if !undirected && gt.TopologyEventBuff[i].EventIdx()%2 == 0 {
continue // Skip even events for directed graphs.
Collaborator

On directed graphs, why do both src and dst still need to remit to each other? It seems like the remit events from src to dst are just ignored.
