HDDS-11898. design doc leader side execution #7583
base: master
Thanks @sumitagrawl for the docs.
Please combine the files into a single markdown file with headers (title, author, status, etc.) and a license header (see other design docs for an example).
This will help readers know where to start, and it is also needed for display on the website: https://ozone.apache.org/docs/edge/design.html
@adoroszlai Please recheck. The remaining parts are still kept as separate files because they are independent features within leader side execution, and their designs will evolve independently.
- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests.
- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features.
- The number of transactions that can be pushed through Ratis currently caps out around 25k.
- The current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second.
Is my understanding here correct? Suggested change:
- The current performance envelope for OM is around 12k transactions per second. The early testing of this feature pushes this to 40k transactions per second.
Updated
3. Cache Optimization: Caches are maintained for write operations, and reads also use the same caches for consistency. This makes it complex for reads to return accurate results while operations run in parallel.
4. Double buffer code complexity: The double buffer provides batching for DB updates. This is done within the Ratis state machine and introduces issues in managing the Ratis state machine, cache and DB updates.
The current phrasing does not make it clear that these are things this feature aims to remove. The other items listed are things it is going to add or improve.
Updated
### Batching (Ratis request)
All requests executed in parallel are batched and sent as a single request to other nodes. Batching improves performance over the network.

### Apply Transaction (via Ratis at all nodes)
There's another step after this that needs to be specified: we don't return success to the client until the apply transaction of their request has completed on the leader.
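One way to enforce this step is sketched below, under assumptions: `PendingReplies`, `register` and `onBatchApplied` are hypothetical names, and keying pending replies by the OM-managed request index is an assumption, not something the doc specifies. Each executed request parks a future, and applying a batch on the leader completes the futures of every request in that batch, so a single Ratis log entry can release replies to many clients.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical leader-side tracker (not Ozone code): a client reply is held
// until the batch carrying that request has been applied on the leader.
final class PendingReplies<R> {
  private final Map<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

  // After leader-side execution, before the request is handed to the batcher.
  CompletableFuture<R> register(long requestIndex) {
    return pending.computeIfAbsent(requestIndex, i -> new CompletableFuture<>());
  }

  // Called from applyTransaction on the leader: completes the reply of every
  // request contained in the applied batch.
  void onBatchApplied(List<Long> requestIndexes, Map<Long, R> responses) {
    for (long index : requestIndexes) {
      CompletableFuture<R> reply = pending.remove(index);
      if (reply != null) {
        reply.complete(responses.get(index));
      }
    }
  }
}
```

The request handler would only write its response once the returned future completes, which keeps "success" tied to the leader's apply rather than to execution.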
Updated
The index is persisted in the TransactionInfo table under a new key, `#KEYINDEX`:
- Format: `<timestamp>#<index>`
- Timestamp: used to identify the last saved transaction executed
- Index: index identifier of the request
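A minimal sketch of encoding and parsing this value, assuming it is stored as a plain string; `KeyIndexValue` and its methods are hypothetical names used only for illustration.

```java
import java.util.Objects;

// Hypothetical helper (not an Ozone class) for the value stored under the
// "#KEYINDEX" key of the TransactionInfo table, in "<timestamp>#<index>" form.
final class KeyIndexValue {
  static final String KEY = "#KEYINDEX";

  private final long timestamp; // identifies the last saved transaction executed
  private final long index;     // index identifier of the request

  KeyIndexValue(long timestamp, long index) {
    this.timestamp = timestamp;
    this.index = index;
  }

  long getTimestamp() {
    return timestamp;
  }

  long getIndex() {
    return index;
  }

  // Serializes to "<timestamp>#<index>".
  String encode() {
    return timestamp + "#" + index;
  }

  // Parses "<timestamp>#<index>", rejecting values without exactly one separator.
  static KeyIndexValue decode(String value) {
    Objects.requireNonNull(value, "value");
    int sep = value.indexOf('#');
    if (sep <= 0 || sep == value.length() - 1 || sep != value.lastIndexOf('#')) {
      throw new IllegalArgumentException("Expected <timestamp>#<index> but got: " + value);
    }
    long timestamp = Long.parseLong(value.substring(0, sep));
    long index = Long.parseLong(value.substring(sep + 1));
    return new KeyIndexValue(timestamp, index);
  }
}
```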
Please check the rendered version of this section; I don't think it is being displayed as intended.
updated
- Upgrade: Last Ratis index + 1

#### Index Persistence:
Please add a lot more details to this section; it doesn't really explain how this will work. I assume there is going to be some sort of atomic long incremented in memory. The control request section also does not add much information to explain this.
done
1. For incremental changes, we need to remove the dependency on the Ratis index. To do this, an OM-managed index must be used in both the old and new flows.
2. objectId generation: follow the existing logic for mapping an index to an objectId.
These steps aren't clear to me. This section also needs to cover update ID handling.
There is no specific change for updateId as part of this; it's just a change in IndexProvider, and both objectId and updateId will use the new index for further processing.
Updated this info.
### No-Cache for write operation

In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes.
Suggested change:
In old flow, a key creation / update is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes.
done
- lock: granular-level locking
- unlock: unlock locked keys
What happens while we are holding the lock down? Shouldn't this be where processing is happening? This seems like a duplicate of the information in the "Leader Execution" section, but both sections are missing steps. For example, submitting to Ratis is not mentioned here anywhere.
- [Create key](request/obs-create-key.md)
- [Commit key](request/obs-commit-key.md)

### Execution persist and distribution
I think this whole section needs to be redesigned. In theory, Ratis + RocksDB should be able to exist in its own module as a replicated DB with no dependencies on anything Ozone specific. We will need this eventually to bring the same code flow to SCM (for rolling upgrade) and Recon (for non-voting follower) without rewriting these critical pieces that deal with replication and persistence. Actually moving the code to separate modules may be outside the scope of this feature, but we need to define the API surface such that it is possible to avoid having to rewrite/refactor what is soon to be already new code. For this example I will refer to the replicated DB as its own module, even if V1 of the code does not structure it this way for migration purposes. It is the API surface used by each request that is more important to lock down now.
Input to this module should be in the form of protos that define the DB updates to perform. The actual values written to the DB should already have been serialized to bytes by this point and they should not be deserialized at any point later in the flow (with the exception of merges). This means the module has no knowledge of client ID, quota info, etc.
We would have one proto message defining each operation supported by the DB. The module takes one `Batch`, which contains these operations and will be treated as one Ratis request:
```
message Put {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
  optional bytes value = 3;
}
message Delete {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
}
message Merge {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
  optional bytes value = 3;
}
message Checkpoint {
  // Path to place the checkpoint
  optional string destination = 1;
}
// Only one field should be present to define the operation to do.
// The module can validate this input.
message Operation {
  optional Put put = 1;
  optional Delete delete = 2;
  optional Merge merge = 3;
  optional Checkpoint checkpoint = 4;
}
// Each OM request would result in one list of ordered operations submitted to the module.
// The module can internally combine these lists into one Batch proto that gets submitted to Ratis.
// The update to the transaction ID table needs to be handled within the module for each batch applied.
message Batch {
  repeated Operation operations = 1;
}
```
Now to translate each proto to a DB update in Ratis' `applyTransaction`:
- `Put` and `Delete` simply map to existing RocksDB put and delete key ops. Note that RocksDB does not have a move operation.
- `Checkpoint` creates a RocksDB checkpoint and will be used by snapshots.
- `Merge` will be used to implement any increments required, like quota, using the RocksDB associative merge operator. Initializers of the module will pass in a mapping of column families to their corresponding merge operators if required.
  - For example, the OM would initialize the module with a `BucketInfoMergeOperator` on the `BucketTable`, a `VolumeInfoMergeOperator` on the `VolumeTable`, etc.

Then the API surface between OM or any other service and the replicated DB module is just a list of column families to open, with some optionally mapped to merge operator callbacks provided on construction, and calls to submit new `Operation` lists to the module.
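To make the proposed API surface concrete, here is a minimal in-memory sketch in Java. It is not RocksDB and not the proposed module: `ReplicatedDbSketch`, `Operation.put/delete/merge` and `LONG_ADD` are hypothetical names, column families and keys are `String`s for brevity (the proto uses `bytes`), `Checkpoint` is omitted, and the merge operator sums 8-byte counters rather than merging a serialized `BucketInfo`.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory stand-in for the replicated DB module; a real module
// would write each Batch to a RocksDB WriteBatch in applyTransaction.
final class ReplicatedDbSketch {
  enum Type { PUT, DELETE, MERGE }

  static final class Operation {
    final Type type;
    final String columnFamily;
    final String key;
    final byte[] value; // already-serialized bytes; never deserialized here

    private Operation(Type type, String columnFamily, String key, byte[] value) {
      this.type = type;
      this.columnFamily = columnFamily;
      this.key = key;
      this.value = value;
    }

    static Operation put(String cf, String key, byte[] value) {
      return new Operation(Type.PUT, cf, key, value);
    }

    static Operation delete(String cf, String key) {
      return new Operation(Type.DELETE, cf, key, null);
    }

    static Operation merge(String cf, String key, byte[] operand) {
      return new Operation(Type.MERGE, cf, key, operand);
    }
  }

  private final Map<String, Map<String, byte[]>> tables = new HashMap<>();
  private final Map<String, BinaryOperator<byte[]>> mergeOperators;

  // Construction: column families to open, some mapped to merge operators.
  ReplicatedDbSketch(List<String> columnFamilies,
                     Map<String, BinaryOperator<byte[]>> mergeOperators) {
    for (String cf : columnFamilies) {
      tables.put(cf, new HashMap<>());
    }
    this.mergeOperators = new HashMap<>(mergeOperators);
  }

  // Applies one Batch (operations of possibly many OM requests) in order.
  void applyBatch(List<Operation> batch) {
    for (Operation op : batch) {
      Map<String, byte[]> table = tables.get(op.columnFamily);
      if (table == null) {
        throw new IllegalArgumentException("Unknown column family: " + op.columnFamily);
      }
      switch (op.type) {
        case PUT:
          table.put(op.key, op.value);
          break;
        case DELETE:
          table.remove(op.key);
          break;
        case MERGE:
          BinaryOperator<byte[]> merge = mergeOperators.get(op.columnFamily);
          if (merge == null) {
            throw new IllegalStateException("No merge operator for " + op.columnFamily);
          }
          table.merge(op.key, op.value, merge); // stores operand if key is absent
          break;
        default:
          throw new AssertionError(op.type);
      }
    }
  }

  byte[] get(String cf, String key) {
    return tables.get(cf).get(key);
  }

  // Example associative merge operator: sums 8-byte big-endian counters.
  static final BinaryOperator<byte[]> LONG_ADD = (existing, operand) ->
      longBytes(ByteBuffer.wrap(existing).getLong() + ByteBuffer.wrap(operand).getLong());

  static byte[] longBytes(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }
}
```

The point of the design shows in the constructor and `applyBatch`: the caller supplies column families and merge callbacks once, then only submits opaque byte-level operations, so nothing OM-specific leaks into the module.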
Thanks for the patch. Left some initial comments.
Regarding the request flow, could you add a more detailed sequence diagram? Similar to https://issues.apache.org/jira/browse/HDDS-1595 so that it's easier to visualize the new flow.
Here is the summary of the challenges:

- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features.
Could you clarify what "effective batching" entails? Does it mean 1.2 OM requests per batch?
Yes, this should be clarified in the doc. I think this meant to say "the effect of batching on performance is a 1.2x speedup at best", as in, in the best case the double buffer is only adding a 20% speedup, while prototypes of the new design show far greater improvements.
Here, effective batching means:
- at time T, changes from 2 requests are flushed to the DB together
- at time T+1, 1 request is flushed to the DB
- ...
- at time T+n, 3 requests are flushed to the DB together

On average, this comes to 1.2 requests flushed to the DB per flush, so this kind of batching is not effective.
Updated the doc to say 1.2 requests (on average) at best.
So, as observed, RocksDB batching is not effective on an NVMe cluster.
- The number of transactions that can be pushed through Ratis currently caps out around 25k.
So this is the theoretical bottleneck of Ratis itself?
Yes, this was observed in an earlier Ratis test run by another community member.
When testing OM with increased load and capacity, OM request processing did not improve beyond 12K/second, with CPU usage at only about 16% and plenty of memory available.
When multiple requests are merged as part of this batching, it is able to reach 40K/second, with CPU utilization at 33% per the report. At this point, the number of requests batched per Ratis submit was high, reaching 20, as shown in the "Merge_ratis_status" graph of the prototype performance test report.
| # | Metric | Current OM | Prototype |
|---|---|---|---|
| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% |
| 4 | CPU Utilization Follower | 6% above | 4% below |

Refer to the [performance prototype result](performance-prototype-result.pdf).
Are these performance results referring to the prototype in #7406 ?
yes
- On restart (leader): last preserved index + 1
- On Switch over: last index + 1
- Request execution: index + 1
- Upgrade: Last Ratis index + 1
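The rules above amount to an in-memory counter that is seeded once and then incremented per request. A minimal sketch, assuming an `AtomicLong` (as the reviewer guesses below); `IndexProvider`, `resumeFrom` and `next` here are illustrative names and do not claim to match the actual Ozone `IndexProvider` API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical OM-managed index provider (illustrative, not Ozone's API).
// Per the discussion, both objectId and updateId are derived from this index.
final class IndexProvider {
  private final AtomicLong lastIndex = new AtomicLong();

  // Seeds the counter; the next request receives lastKnownIndex + 1.
  //  - restart (leader): last index preserved under #KEYINDEX
  //  - switch over:      last index known on this node
  //  - upgrade:          last Ratis index (no #KEYINDEX entry exists yet)
  void resumeFrom(long lastKnownIndex) {
    lastIndex.set(lastKnownIndex);
  }

  // Request execution: index + 1, safe when requests execute in parallel.
  long next() {
    return lastIndex.incrementAndGet();
  }

  // Last index handed out so far.
  long current() {
    return lastIndex.get();
  }
}
```

Seeding from the last Ratis index on upgrade is what keeps new indexes, and therefore new objectIds, from colliding with ones already issued under the old flow.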
So for existing cluster, the subsequent object IDs will be based on the Ratis last applied index?
Yes. This applies only to the upgrade case, where the last index must be used to avoid a duplicate index and, in turn, a duplicate objectId, which could affect processing logic.
Updated the document with the upgrade impact of this new entry.
### Batching (Ratis request)
All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching.
When does the OM decide whether a batch will be sent to Ratis? Is it decided based on time / size of the batch?
Also some suggestions:
### Batching (Ratis request)
All requests executed in parallel are batched and sent as a single request to other nodes. This helps improve performance over network with batching.
Updated the doc. The batcher waits on a queue; as soon as a request is available, it merges the queued requests and sends them to Ratis. During this time, more requests can be added to the queue.
- Idle case: the batcher waits on the queue.
- Busy case: it picks up the queued requests, sends them over Ratis, and processes them. After that, it checks the queue again.
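The batcher loop described above can be sketched with a `BlockingQueue`: `take()` blocks in the idle case, and `drainTo` picks up everything that queued up meanwhile in the busy case. `RequestBatcher` and `submitToRatis` are hypothetical names, and `maxBatchSize` is an assumed cap; the doc itself only says the batcher merges whatever is queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical batcher (not Ozone code). submitToRatis stands in for the call
// that sends one merged request through Ratis.
final class RequestBatcher<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
  private final Consumer<List<T>> submitToRatis;
  private final int maxBatchSize; // assumed cap, not from the doc

  RequestBatcher(Consumer<List<T>> submitToRatis, int maxBatchSize) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.submitToRatis = submitToRatis;
    this.maxBatchSize = maxBatchSize;
  }

  // Called by request handlers after leader-side execution.
  void enqueue(T executedRequest) {
    queue.add(executedRequest);
  }

  // One iteration of the batcher loop.
  void runOnce() throws InterruptedException {
    List<T> batch = new ArrayList<>();
    batch.add(queue.take());                // idle case: block until a request arrives
    queue.drainTo(batch, maxBatchSize - 1); // busy case: take what queued up meanwhile
    submitToRatis.accept(batch);            // one Ratis request for the whole batch
  }

  // Loops until interrupted; requests arriving during a submit are picked up
  // by the next iteration.
  void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        runOnce();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Batch size adapts to load without a timer: under light load each batch holds one request, and under heavy load requests pile up while the previous submit is in flight, which matches the batch sizes of up to 20 reported for the prototype.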
--> Else continue request handling

#### Client request replay at leader node
- When a request is received at the leader node, it is cached in replayCache immediately.
I think it's better to use `retryCache` instead of `replayCache` to standardize the terminology with Ratis.
Personally, the "replay" terminology seems more related to replaying unapplied Ratis transactions, as described in previous OM design documents.
updated
#### Replay cache distribution to other nodes
The request-response pair is cached on the other nodes via Ratis distribution:
- It is added to an in-memory cache with expiry handling.
- It is also added to the DB for persistence, to handle restarts.

The following information is synced to all nodes via Ratis:
```
message ClientRequestInfo {
  optional string uuidClientId = 1;
  optional uint64 callId = 2;
  optional uint64 timestamp = 5;
  optional OMResponse response = 3;
}
```
I'm not sure about this. Previously, each OM / Ratis request corresponded to a single OM / Ratis response because Ratis only replies to a pending request after the log entry is applied at the leader.
How would the client reply and retry cache mechanisms work now since each Ratis request contains multiple DB updates from multiple OM requests? So during log apply, the state machine needs to reply to multiple clients at the same time? Am I missing something?
In the happy path, Ratis replies to the pending request after the log entry is applied.
But Ratis also has a retryCache, which caches the response for "callId#ClientId" and replies with the same response if the same request is retried. This is an additional fallback mechanism in Ratis for when a client retries a request because a failure prevented it from receiving the response.
To handle this, each request and response is synchronized from the leader to followers along with the DB updates:
- callId and clientId (used to identify a retry of the request)
- timestamp
- response message (replied back if the same request is retried, matching Ratis behavior)

Any incoming request to a node can then be checked against the cache using "callId and clientId". If it exists, the node replies with the same cached "response".
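A minimal per-node sketch of that lookup, assuming the entry is recorded while applying the batch and expired using the timestamp replicated from the leader; `ClientRetryCache`, `record`, `lookup` and `evictExpired` are hypothetical names, and the DB persistence used for restarts is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-node retry cache (not Ozone or Ratis code), keyed by
// "clientId#callId". Each node fills it in applyTransaction and expires
// entries on its own.
final class ClientRetryCache<R> {
  private static final class Entry<V> {
    final V response;
    final long timestampMillis;

    Entry(V response, long timestampMillis) {
      this.response = response;
      this.timestampMillis = timestampMillis;
    }
  }

  private final Map<String, Entry<R>> entries = new ConcurrentHashMap<>();
  private final long expiryMillis;

  ClientRetryCache(long expiryMillis) {
    this.expiryMillis = expiryMillis;
  }

  private static String key(String clientId, long callId) {
    return clientId + "#" + callId;
  }

  // Called while applying a batch: remembers the response for clientId#callId.
  void record(String clientId, long callId, long timestampMillis, R response) {
    entries.put(key(clientId, callId), new Entry<>(response, timestampMillis));
  }

  // Returns the cached response for a retry of a known, unexpired request,
  // or null if the request should be handled normally.
  R lookup(String clientId, long callId, long nowMillis) {
    String k = key(clientId, callId);
    Entry<R> e = entries.get(k);
    if (e == null) {
      return null;
    }
    if (nowMillis - e.timestampMillis >= expiryMillis) {
      entries.remove(k, e);
      return null;
    }
    return e.response;
  }

  // Periodic cleanup of expired entries.
  void evictExpired(long nowMillis) {
    entries.values().removeIf(e -> nowMillis - e.timestampMillis >= expiryMillis);
  }
}
```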
#### Memory caching:
```
Memory Map: ClientId#CallId Vs Response
Expiry: 10 minutes (as current default for ratis)
```
This expiry is done independently for each OM node? It won't be replicated from leader?
The leader will sync the following information with the DB changes:
- callId and clientId (used to identify a retry of the request)
- timestamp
- response message (replied back if the same request is retried, matching Ratis behavior)

Each node uses this information once it is received in applyTransaction and handles it independently.
1. With leader side execution, metrics and the information they capture can change.
- Certain metrics may no longer be valid.
- New metrics need to be added.
- Metrics will now be updated on the leader side, e.g. for key create. On follower nodes it is just a DB update, so the value will not be updated.
FYI, also note that write audit logs will only be generated on the leader, instead of on both the leader and followers.
updated
- If we shard OM, then across OMs the object ID will not be unique.
- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs.

Longer term, we should move to UUID-based object ID generation. This will allow us to generate object IDs that are globally unique. In the meantime, we are moving to persistent counter-based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created.
How about update ID? In the future if we decide to shard the OMs, we probably need some kind of sequence generator to generate a monotonically increasing ID for each update.
updateId just represents a change to an existing object, similar to versioning, so it may simply be lastUpdateId + 1.
This is not included in this feature and can be done with an approach like the above, or with distributed indexing if required.
@ivandika3 @errose28 Updated the docs for all comments. Below is pending:
What changes were proposed in this pull request?
Design doc for leader side execution
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-11898
How was this patch tested?
NA