HDDS-11898. design doc leader side execution #7583

Open: wants to merge 10 commits into master


sumitagrawl
Contributor

What changes were proposed in this pull request?

Design doc for leader-side execution.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-11898

How was this patch tested?

NA

@errose28 errose28 self-requested a review December 17, 2024 19:33
@sumitagrawl sumitagrawl marked this pull request as ready for review December 20, 2024 12:54
Contributor

@adoroszlai adoroszlai left a comment


Thanks @sumitagrawl for the docs.

Please combine the files into a single markdown file with headers (title, author, status, etc.) and a license header (please see other design docs for an example).

This will help readers know where to start, and it is also needed for display on the website: https://ozone.apache.org/docs/edge/design.html

@sumitagrawl
Contributor Author

@adoroszlai Please recheck. The docs are now split as follows:

  1. Leader-execution.md
  2. obs-lock.md
  3. requests:
  • obs-create-key.md
  • obs-commit-key.md

The above are kept separate because they are independent features within leader execution, and their designs will evolve independently.

@sumitagrawl sumitagrawl requested a review from adoroszlai January 6, 2025 10:23
- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests.
- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features.
- The number of transactions that can be pushed through Ratis currently caps out around 25k.
- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second.
Contributor


Is my understanding here correct?

Suggested change
- The Current performance envelope for OM is around 12k transactions per second. The early testing pushes this to 40k transactions per second.
- The Current performance envelope for OM is around 12k transactions per second. The early testing of this feature pushes this to 40k transactions per second.

Contributor Author


Updated

Comment on lines 35 to 36
3. Cache Optimization: Cache are maintained for write operation and read also make use of same for consistency. This creates complexity for read to provide accurate result with parallel operation.
4. Double buffer code complexity: Double buffer provides batching for db update. This is done with ratis state machine and induces issues managing ratis state machine, cache and db updates.
Contributor


The current phrasing does not make it clear that these are things this feature aims to remove. The other items listed are things it is going to add or improve.

Contributor Author


Updated

### Batching (Ratis request)
All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching.

### Apply Transaction (via ratis at all nodes)
Contributor


There's another step after this that needs to be specified: we don't return success to the client until the apply transaction of their request has completed on the leader.

Contributor Author


Updated
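A minimal sketch of that extra step, assuming the RPC handler waits on a per-request future that is completed only from applyTransaction on the leader. Because one Ratis entry now carries many OM requests, applying a single batch completes several futures at once. `PendingReplies`, `register` and `onBatchApplied` are hypothetical names, and responses are plain strings instead of `OMResponse`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker: a client reply is released only after apply on the leader. */
class PendingReplies {
  // Request index -> future the RPC handler waits on before replying to the client.
  private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  /** Called after leader-side execution, before the request is batched and submitted to Ratis. */
  CompletableFuture<String> register(long index) {
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(index, future);
    return future;
  }

  /** Called from applyTransaction on the leader once the whole batch has been applied. */
  void onBatchApplied(List<Long> indexes, Map<Long, String> responses) {
    for (long index : indexes) {
      CompletableFuture<String> future = pending.remove(index);
      if (future != null) {
        future.complete(responses.get(index));
      }
    }
  }

  public static void main(String[] args) {
    PendingReplies replies = new PendingReplies();
    CompletableFuture<String> first = replies.register(101L);
    CompletableFuture<String> second = replies.register(102L);
    System.out.println(first.isDone()); // false: batch not applied yet
    // One applied Ratis entry releases replies for two different clients.
    replies.onBatchApplied(List.of(101L, 102L), Map.of(101L, "OK-101", 102L, "OK-102"));
    System.out.println(first.join() + " " + second.join()); // OK-101 OK-102
  }
}
```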

Comment on lines 106 to 109
Index Preserved in TransactionInfo Table with new KEY: "#KEYINDEX"
Format: <timestamp>#<index>
Time stamp: This will be used to identify last saved transaction executed
Index: index identifier of the request
Contributor


Please check the rendered version of this section; I don't think it is being displayed as intended.

Contributor Author


updated
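For illustration only, a sketch of writing and reading the `<timestamp>#<index>` value stored under the `#KEYINDEX` key. It assumes both parts are plain decimal longs kept as a string; the real TransactionInfo table codec may differ, and `KeyIndexValue` is a hypothetical name.

```java
/** Hypothetical codec for the "<timestamp>#<index>" value stored under "#KEYINDEX". */
final class KeyIndexValue {
  static final String KEY = "#KEYINDEX";

  final long timestamp;
  final long index;

  KeyIndexValue(long timestamp, long index) {
    this.timestamp = timestamp;
    this.index = index;
  }

  /** Serializes to "<timestamp>#<index>". */
  String encode() {
    return timestamp + "#" + index;
  }

  /** Parses "<timestamp>#<index>"; rejects any other shape. */
  static KeyIndexValue decode(String value) {
    String[] parts = value.split("#");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Expected <timestamp>#<index>, got: " + value);
    }
    return new KeyIndexValue(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
  }

  public static void main(String[] args) {
    String stored = new KeyIndexValue(1734450000000L, 42L).encode();
    System.out.println(KEY + " -> " + stored); // #KEYINDEX -> 1734450000000#42
    KeyIndexValue restored = KeyIndexValue.decode(stored);
    // On leader restart, the next index is the last preserved index + 1.
    System.out.println(restored.index + 1); // 43
  }
}
```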

- Upgrade: Last Ratis index + 1


#### Index Persistence:
Contributor


Please add a lot more details to this section, it doesn't really explain how this will work. I assume there is going to be some sort of atomic long incremented in memory. The control request section also does not add much information to explain this.

Contributor Author


done

Comment on lines +117 to +118
1. for increment changes, need remove dependency with ratis index. For this, need to use om managed index in both old and new flow.
2. objectId generation: need follow old logic of index to objectId mapping.
Contributor


These steps aren't clear to me. This section also needs to cover update ID handling.

Contributor Author


There is no specific change for updateId; the only change is in IndexProvider, and both objectId and updateId will use the new index for further processing.
Updated this info.


### No-Cache for write operation

In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes.
Contributor


Suggested change
In old flow, a key creation / updation is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes.
In old flow, a key creation / update is added to PartialTableCache, and cleanup happens when DoubleBuffer flushes DB changes.

Contributor Author


done

Comment on lines +190 to +191
- lock: granular level locking
- unlock: unlock locked keys
Contributor


What happens while we are holding the lock down? Shouldn't this be where processing is happening? This seems like a duplicate of the information in the "Leader Execution" section but both sections are missing steps. For example submitting to Ratis is not mentioned here anywhere.

- [Create key](request/obs-create-key.md)
- [Commit key](request/obs-commit-key.md)

### Execution persist and distribution
Contributor

@errose28 errose28 Jan 11, 2025


I think this whole section needs to be redesigned. In theory, Ratis + RocksDB should be able to exist in its own module as a replicated DB with no dependencies on anything Ozone specific. We will need this eventually to bring the same code flow to SCM (for rolling upgrade) and Recon (for non-voting follower) without rewriting these critical pieces that deal with replication and persistence. Actually moving the code to separate modules may be outside the scope of this feature, but we need to define the API surface such that it is possible to avoid having to rewrite/refactor what is soon to be already new code. For this example I will refer to the replicated DB as its own module, even if V1 of the code does not structure it this way for migration purposes. It is the API surface used by each request that is more important to lock down now.

Input to this module should be of the form of protos that define the DB updates to perform. The actual values written to the DB should already have been serialized to bytes by this point and they should not be deserialized at any point later in the flow (with the exception of merges). This means the module has no knowledge of client ID, quota info, etc.

We would have one proto message defining each operation supported by the DB. The module takes one Batch, which contains these operations and will be treated as one Ratis request.

message Put {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
  optional bytes value = 3;
}

message Delete {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
}

message Merge {
  optional bytes columnFamily = 1;
  optional bytes key = 2;
  optional bytes value = 3;
}

message Checkpoint {
  // Path to place the checkpoint
  optional string destination = 1;
}

// Only one field should be present to define the operation to do.
// The module can validate this input.
message Operation {
  optional Put put = 1;
  optional Delete delete = 2;
  optional Merge merge = 3;
  optional Checkpoint checkpoint = 4;
}

// Each OM request would result in one list of ordered operations submitted to the module.
// The module can internally combine these lists into one Batch proto that gets submitted to Ratis.
// The update to the transaction ID table needs to be handled within the module for each batch applied.
message Batch {
  repeated Operation operations = 1;
}

Now to translate each proto to a DB update in Ratis' applyTransaction:

  • Put and Delete simply map to existing RocksDB put and delete key ops. Note that RocksDB does not have a move operation.
  • Checkpoint creates a RocksDB checkpoint and will be used by snapshots.
  • Merge will be used to implement any increments required, like quota, using the RocksDB associative merge operator. Initializers of the module will pass in a mapping of column families to their corresponding merge operators if required.
    • For example, the OM would initialize the module with a BucketInfoMergeOperator on the BucketTable, a VolumeInfoMergeOperator on the VolumeTable, etc.

Then the API surface between OM or any other service and the replicated DB module is just a list of column families to open, with some optionally mapped to merge operator callbacks provided on construction, and calls to submit new Operation lists to the module.
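A rough, self-contained sketch of the API surface described above: column families opened at construction with optional merge operators, one ordered operation list submitted per OM request, lists combined into a single batch, and each operation mapped to a DB update on apply. Assumptions: an in-memory `TreeMap` per column family stands in for RocksDB, column families are named by strings, `Checkpoint` and the transaction ID table update are omitted, and all class and method names are hypothetical (Java 17 for records and sealed interfaces).

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

/** Hypothetical replicated-DB module: knows column families and raw bytes, nothing Ozone-specific. */
final class ReplicatedDbSketch {
  sealed interface Operation permits Put, Delete, Merge {}
  record Put(String columnFamily, byte[] key, byte[] value) implements Operation {}
  record Delete(String columnFamily, byte[] key) implements Operation {}
  record Merge(String columnFamily, byte[] key, byte[] value) implements Operation {}

  // In-memory stand-in for RocksDB column families, ordered by unsigned bytes.
  private final Map<String, TreeMap<byte[], byte[]>> tables = new HashMap<>();
  private final Map<String, BinaryOperator<byte[]>> mergeOperators;
  // Ordered operation lists from individual OM requests, waiting to be batched.
  private final List<List<Operation>> pending = new ArrayList<>();

  ReplicatedDbSketch(List<String> columnFamilies, Map<String, BinaryOperator<byte[]>> mergeOperators) {
    for (String cf : columnFamilies) {
      tables.put(cf, new TreeMap<byte[], byte[]>(Arrays::compareUnsigned));
    }
    this.mergeOperators = Map.copyOf(mergeOperators);
  }

  /** One OM request submits one ordered list of operations. */
  synchronized void submit(List<Operation> operations) {
    pending.add(List.copyOf(operations));
  }

  /** Combines all pending lists, in order, into one batch (one Ratis request). */
  synchronized List<Operation> drainBatch() {
    List<Operation> batch = new ArrayList<>();
    pending.forEach(batch::addAll);
    pending.clear();
    return batch;
  }

  /** What applyTransaction would do with a batch on every node. */
  synchronized void apply(List<Operation> batch) {
    for (Operation op : batch) {
      if (op instanceof Put p) {
        table(p.columnFamily()).put(p.key(), p.value());
      } else if (op instanceof Delete d) {
        table(d.columnFamily()).remove(d.key());
      } else if (op instanceof Merge m) {
        BinaryOperator<byte[]> merger = mergeOperators.get(m.columnFamily());
        if (merger == null) {
          throw new IllegalStateException("No merge operator for " + m.columnFamily());
        }
        // Absent key: store the operand; present key: combine old value with the operand.
        table(m.columnFamily()).merge(m.key(), m.value(), merger);
      }
    }
  }

  synchronized byte[] get(String columnFamily, byte[] key) {
    return table(columnFamily).get(key);
  }

  private TreeMap<byte[], byte[]> table(String columnFamily) {
    TreeMap<byte[], byte[]> table = tables.get(columnFamily);
    if (table == null) {
      throw new IllegalArgumentException("Unknown column family: " + columnFamily);
    }
    return table;
  }

  static byte[] longBytes(long v) { return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); }

  static long toLong(byte[] b) { return ByteBuffer.wrap(b).getLong(); }

  public static void main(String[] args) {
    // Associative "add" operator, e.g. for a bucket's used-bytes counter.
    BinaryOperator<byte[]> addLongs = (a, b) -> longBytes(toLong(a) + toLong(b));
    ReplicatedDbSketch db = new ReplicatedDbSketch(
        List.of("keyTable", "bucketTable"), Map.of("bucketTable", addLongs));
    byte[] bucket = "/vol1/bucket1".getBytes(UTF_8);
    // Two independent OM requests, each with its own ordered operation list.
    db.submit(List.of(
        new Put("keyTable", "/vol1/bucket1/key1".getBytes(UTF_8), "v1".getBytes(UTF_8)),
        new Merge("bucketTable", bucket, longBytes(100))));
    db.submit(List.of(new Merge("bucketTable", bucket, longBytes(50))));
    List<Operation> batch = db.drainBatch();
    db.apply(batch);
    System.out.println(batch.size()); // 3
    System.out.println(toLong(db.get("bucketTable", bucket))); // 150
  }
}
```

The additive merge operator illustrates why merges suit counters such as bucket quota usage: neither request needs to read the current value first, and because addition is commutative, the order in which the two requests land in the batch does not change the result.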

@ivandika3 ivandika3 self-requested a review January 13, 2025 08:20
Contributor

@ivandika3 ivandika3 left a comment


Thanks for the patch. Left some initial comments.

Regarding the request flow, could you add a more detailed sequence diagram? Similar to https://issues.apache.org/jira/browse/HDDS-1595 so that it's easier to visualize the new flow.


Here is the summary of the challenges:

- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests.
- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features.
Contributor


Could you clarify what "effective batching" entails? Does it mean 1.2 OM requests per batch?

Contributor


Yes this should be clarified in the doc. I think this meant to say "the effect of batching on performance is a 1.2x speedup at best", as in best case the double buffer is only adding a 20% speedup, while prototypes of the new design show far greater improvements.

Contributor Author


Here, effective batching means:
- at time T, changes from 2 requests are flushed to the DB together
- at time T+1, 1 request is flushed
- ...
- at time T+n, 3 requests are flushed together

On average, about 1.2 requests get flushed to the DB per flush, so this kind of batching is not effective.

Updated the doc to say "1.2 requests (on average) at best".
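The arithmetic behind "1.2 requests on average" can be made concrete with a small worked example. The flush history below is hypothetical, chosen only to reproduce the 1.2 figure; it is not measured data.

```java
/** Worked example of "effective batching": average number of requests per DB flush. */
final class EffectiveBatching {
  static double averageBatchSize(int[] requestsPerFlush) {
    if (requestsPerFlush.length == 0) {
      return 0.0;
    }
    long total = 0;
    for (int n : requestsPerFlush) {
      total += n;
    }
    return (double) total / requestsPerFlush.length;
  }

  public static void main(String[] args) {
    // Hypothetical history: T -> 2 requests, T+1..T+4 -> 1 request each (6 requests, 5 flushes).
    int[] flushes = {2, 1, 1, 1, 1};
    System.out.println(averageBatchSize(flushes)); // 1.2
  }
}
```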

Contributor Author

@sumitagrawl sumitagrawl Jan 19, 2025


So, as observed, RocksDB batching is not effective on an NVMe cluster.


- The current implementation depends on consensus on the order of requests received and not on consensus on the processing of the requests.
- The double buffer implementation currently is meant to optimize the rate at which writes get flushed to RocksDB but the effective batching achieved is 1.2 at best. It is also a source of continuous bugs and added complexity for new features.
- The number of transactions that can be pushed through Ratis currently caps out around 25k.
Contributor


So this is the theoretical bottleneck on the Ratis itself?

Contributor Author


Yes, this was observed in an earlier Ratis test run by another member.

When testing OM with increasing load and capacity, OM request processing could not improve and reached only 12K/second, with CPU usage at only about 16% and plenty of memory available.

When multiple requests are merged by this batching, OM reaches 40K/second at 33% CPU utilization, as per the report. At that point the request batch size for Ratis submission was high, reaching 20, as shown in the "Merge_ratis_status" graph of the prototype performance test report.


| 3 | CPU Utilization Leader | 16% (unable to increase load) | 33% |
| 4 | CPU Utilization Follower | 6% above | 4% below |

Refer [performance prototype result](performance-prototype-result.pdf)
Contributor


Are these performance results referring to the prototype in #7406 ?

Contributor Author


yes

- On restart (leader): last preserved index + 1
- On Switch over: last index + 1
- Request execution: index + 1
- Upgrade: Last Ratis index + 1
Contributor


So for existing cluster, the subsequent object IDs will be based on the Ratis last applied index?

Contributor Author

@sumitagrawl sumitagrawl Jan 19, 2025


Yes. This applies only to the upgrade case, where the last index must be used to avoid a duplicate index and, in turn, a duplicate objectId, which could affect processing logic.

Updated the document with the upgrade impact of this new entry.
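A minimal sketch of an OM-managed index source following the listed rules (restart: last preserved index + 1; switch-over: last index + 1; request execution: index + 1; upgrade: last Ratis index + 1), using an `AtomicLong` as suggested earlier in the review. Each initializer stores the starting point so that the first `next()` returns it + 1. Reading "last index" on switch-over as the last index applied on the new leader is an assumption, all names are hypothetical, and the old index-to-objectId mapping is not reproduced.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical OM-managed index source used for objectId/updateId instead of the Ratis log index. */
final class IndexProviderSketch {
  private final AtomicLong lastIndex = new AtomicLong();

  /** Leader restart: resume from the last index persisted under "#KEYINDEX". */
  void initOnRestart(long lastPreservedIndex) { lastIndex.set(lastPreservedIndex); }

  /** Leader switch-over: resume from the last index applied on this node (assumption). */
  void initOnSwitchOver(long lastAppliedIndex) { lastIndex.set(lastAppliedIndex); }

  /** Upgrade: nothing persisted yet, so continue after the last Ratis index to avoid duplicate objectIds. */
  void initOnUpgrade(long lastRatisIndex) { lastIndex.set(lastRatisIndex); }

  /** Request execution: each request takes index + 1; safe for parallel execution. */
  long next() { return lastIndex.incrementAndGet(); }

  long current() { return lastIndex.get(); }

  public static void main(String[] args) {
    IndexProviderSketch provider = new IndexProviderSketch();
    provider.initOnUpgrade(5000L); // hypothetical last Ratis index at upgrade time
    System.out.println(provider.next()); // 5001
    System.out.println(provider.next()); // 5002
  }
}
```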

Comment on lines 80 to 81
### Batching (Ratis request)
All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching.
Contributor


When does the OM decide whether a batch will be sent to Ratis? Is it decided based on time / size of the batch?

Contributor


Also, some suggestions:

Suggested change
### Batching (Ratis request)
All request as executed parallel are batched and send as single request to other nodes. This helps improve performance over network with batching.
### Batching (Ratis request)
All requests executed in parallel are batched and send as single request to other nodes. This helps improve performance over network with batching.

Contributor Author


Updated the doc.

The batcher waits on a queue; as soon as a request is available, it merges the queued requests and sends them to Ratis. During this time, more requests can be added to the queue.

So in the idle case, the batcher waits on the queue. In the busy case, it picks up the queued requests, sends them over Ratis, and processes them, then checks the queue again.
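The batching loop described above could be sketched as follows: `take()` blocks in the idle case, and `drainTo()` picks up everything that queued while the previous Ratis submission was in flight. The `Consumer` stands in for a synchronous Ratis submit, `RequestBatcher` is a hypothetical name, and the `maxBatchSize` cap is an added assumption that the comment does not mention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical batcher: blocks while idle, then merges everything queued into one submission. */
class RequestBatcher<T> implements Runnable {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
  // Stand-in for "submit one merged Ratis request and wait for it".
  private final Consumer<List<T>> ratisSubmitter;
  // Assumed safety cap on batch size; not part of the described design.
  private final int maxBatchSize;

  RequestBatcher(Consumer<List<T>> ratisSubmitter, int maxBatchSize) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.ratisSubmitter = ratisSubmitter;
    this.maxBatchSize = maxBatchSize;
  }

  /** Called by request handlers after leader-side execution. */
  void enqueue(T request) {
    queue.add(request);
  }

  /** Waits for the first request, then takes whatever else is already queued. */
  List<T> nextBatch() throws InterruptedException {
    List<T> batch = new ArrayList<>();
    batch.add(queue.take());                // idle case: block until a request arrives
    queue.drainTo(batch, maxBatchSize - 1); // busy case: merge requests that piled up meanwhile
    return batch;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        // Submission is synchronous, so requests arriving during it form the next batch.
        ratisSubmitter.accept(nextBatch());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // shut down the batcher thread
    }
  }

  public static void main(String[] args) throws InterruptedException {
    RequestBatcher<String> batcher =
        new RequestBatcher<>(batch -> System.out.println("submit " + batch), 100);
    // Three requests queued while a previous submission was in flight.
    batcher.enqueue("createKey-1");
    batcher.enqueue("createKey-2");
    batcher.enqueue("commitKey-1");
    batcher.ratisSubmitter.accept(batcher.nextBatch()); // submit [createKey-1, createKey-2, commitKey-1]
  }
}
```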

--> Else continue request handling`

#### Client request replay at leader node
- When request is received at leader node, it will cache the request in replayCache immediately
Contributor


I think it's better to use retryCache instead of replayCache to standardize the terminology with Ratis.

Personally, the "replay" terminology seems more related to replaying unapplied Ratis transactions, as in the previous OM design documents.

Contributor Author


updated

Comment on lines 334 to 347
#### Replay cache distribution to other nodes
Request - response will be cached to other node via ratis distribution
- It will be added to memory cache with expiry handling
- Also will be added to DB for persistence for restart handing

Below information will be sync to all nodes via ratis:
```
message ClientRequestInfo {
  optional string uuidClientId = 1;
  optional uint64 callId = 2;
  optional uint64 timestamp = 5;
  optional OMResponse response = 3;
}
```
Contributor


I'm not sure about this. Previously, each OM / Ratis request corresponded to a single OM / Ratis response, because Ratis only replies to a pending request after the log is applied at the leader.

How would the client reply and retry cache mechanisms work now since each Ratis request contains multiple DB updates from multiple OM requests? So during log apply, the state machine needs to reply to multiple clients at the same time? Am I missing something?

Contributor Author


In the happy path, Ratis replies to the pending request after the log is applied.

But Ratis also has a retryCache, which caches the response for "callId#ClientId" and replies with the same response if the same request is retried. This is an additional fallback in Ratis for when a client retries a request because it did not receive the response due to some failure.

To handle this, each request and response is synchronized from the leader to followers along with the DB updates:

  • callId and clientId (used to identify a retry of the request)
  • timestamp
  • response message (to be replied back if the same request is retried, matching Ratis behavior)

Any incoming request to a node can then be checked against the cache by callId and clientId. If an entry exists, the node replies with the cached response.
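A hedged sketch of the per-node retry cache described here: entries are keyed by `clientId#callId`, filled in applyTransaction from the replicated fields, and expired using the leader-provided timestamp, so every node can evict on its own. The 10-minute expiry comes from the doc; using `String` for the response instead of `OMResponse`, and the class and method names, are assumptions.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-node retry cache keyed by "clientId#callId", filled from replicated entries. */
final class RetryCacheSketch {
  record Entry(long timestamp, String response) {}

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final long expiryMillis;

  RetryCacheSketch(long expiryMillis) {
    this.expiryMillis = expiryMillis;
  }

  static String key(String clientId, long callId) {
    return clientId + "#" + callId;
  }

  /** applyTransaction on every node: record the leader's response with the leader's timestamp. */
  void onApply(String clientId, long callId, long timestamp, String response) {
    cache.put(key(clientId, callId), new Entry(timestamp, response));
  }

  /** Incoming request: a non-expired hit means this is a retry, so the cached response is returned. */
  Optional<String> lookup(String clientId, long callId, long now) {
    Entry e = cache.get(key(clientId, callId));
    if (e == null || now - e.timestamp() > expiryMillis) {
      return Optional.empty();
    }
    return Optional.of(e.response());
  }

  /** Each node evicts independently, based on the replicated timestamp. */
  void evictExpired(long now) {
    cache.values().removeIf(e -> now - e.timestamp() > expiryMillis);
  }

  public static void main(String[] args) {
    long tenMinutes = 10 * 60 * 1000L; // expiry from the doc
    RetryCacheSketch cache = new RetryCacheSketch(tenMinutes);
    cache.onApply("client-1", 7, 1_000L, "CreateKey OK");
    System.out.println(cache.lookup("client-1", 7, 2_000L)); // Optional[CreateKey OK]
    System.out.println(cache.lookup("client-1", 8, 2_000L)); // Optional.empty
    cache.evictExpired(1_000L + tenMinutes + 1);
    System.out.println(cache.lookup("client-1", 7, 2_000L)); // Optional.empty
  }
}
```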

#### Memory caching:
```
Memory Map: ClientId#CallId Vs Response
Expiry: 10 minute (as current default for ratis)
```
Contributor


This expiry is done independently for each OM node? It won't be replicated from leader?

Contributor Author


The leader will sync the following information along with the DB changes:

  • callId and clientId (used to identify a retry of the request)
  • timestamp
  • response message (to be replied back if the same request is retried, matching Ratis behavior)

Each node uses this information once it is received in applyTransaction and handles it independently.

Comment on lines 419 to 422
1. With Leader side execution, metrics and its capturing information can change.
- Certain metrics may not be valid
- New metrics needs to be added
- Metrics will be updated at leader side now like for key create. At follower node, its just db update, so value will not be udpated.
Contributor


FYI, also note that write audit logs will only be generated on the leader, instead of on both the leader and followers.

Contributor Author


updated

- If we shard OM, then across OMs the object ID will not be unique.
- When batching multiple requests, we cannot utilize Ratis metadata to generate object IDs.

Longer term, we should move to a UUID based object ID generation. This will allow us to generate object IDs that are globally unique. In the mean time, we are moving to a persistent counter based object ID generation. The counter is persisted during apply transaction and is incremented for each new object created.
Contributor


How about update ID? In the future if we decide to shard the OMs, we probably need some kind of sequence generator to generate a monotonically increasing ID for each update.

Contributor Author


updateId just represents a change to an existing object, similar to versioning, so it could simply be lastUpdateId + 1.

This is not included in this feature and can be done with an approach like the above, or with distributed indexing if required.

@ivandika3
Contributor

cc: @xichen01 @symious

@sumitagrawl
Contributor Author

@ivandika3 @errose28 Updated the docs for all comments. The following are still pending:

  1. Detailed flow diagram including the lock / unlock flow (I tried an abstract flow showing Gateway responsibility, but it needs more detail and further visualization of the flow)

  2. Ratis + RocksDB separation from OM logic: to be checked further


4 participants