Skip to content

Commit

Permalink
refactoring to improve subclass testability for base scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
popduke committed Jun 26, 2024
1 parent 9a25889 commit 3e045e5
Show file tree
Hide file tree
Showing 38 changed files with 908 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.baidu.bifromq.basekv.store.proto.RWCoProcInput;
import com.baidu.bifromq.basekv.store.proto.RWCoProcOutput;
import com.baidu.bifromq.basekv.store.proto.ReplyCode;
import com.baidu.bifromq.basescheduler.CallTask;
import com.baidu.bifromq.basescheduler.IBatchCall;
import com.baidu.bifromq.basescheduler.ICallTask;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
Expand All @@ -35,10 +35,10 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class BatchMutationCall<Req, Resp> implements IBatchCall<Req, Resp, MutationCallBatcherKey> {
public abstract class BatchMutationCall<ReqT, RespT> implements IBatchCall<ReqT, RespT, MutationCallBatcherKey> {
private final KVRangeId rangeId;
private final LoadingCache<String, IMutationPipeline> storePipelines;
private final Deque<BatchCallTask<Req, Resp>> batchCallTasks = new ArrayDeque<>();
private final Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks = new ArrayDeque<>();

protected BatchMutationCall(KVRangeId rangeId, IBaseKVStoreClient storeClient, Duration pipelineExpiryTime) {
this.rangeId = rangeId;
Expand All @@ -53,10 +53,10 @@ protected BatchMutationCall(KVRangeId rangeId, IBaseKVStoreClient storeClient, D
}

@Override
public final void add(CallTask<Req, Resp, MutationCallBatcherKey> callTask) {
BatchCallTask<Req, Resp> lastBatchCallTask;
MutationCallBatcherKey batcherKey = callTask.batcherKey;
assert callTask.batcherKey.id.equals(rangeId);
public final void add(ICallTask<ReqT, RespT, MutationCallBatcherKey> callTask) {
MutationCallTaskBatch<ReqT, RespT> lastBatchCallTask;
MutationCallBatcherKey batcherKey = callTask.batcherKey();
assert callTask.batcherKey().id.equals(rangeId);
if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
if (lastBatchCallTask.storeId.equals(batcherKey.leaderStoreId) && lastBatchCallTask.ver == batcherKey.ver) {
if (!lastBatchCallTask.isBatchable(callTask)) {
Expand All @@ -76,16 +76,16 @@ public final void add(CallTask<Req, Resp, MutationCallBatcherKey> callTask) {
}
}

protected BatchCallTask<Req, Resp> newBatch(String storeId, long ver) {
return new BatchCallTask<>(storeId, ver);
protected MutationCallTaskBatch<ReqT, RespT> newBatch(String storeId, long ver) {
return new MutationCallTaskBatch<>(storeId, ver);
}

protected abstract RWCoProcInput makeBatch(Iterator<Req> reqIterator);
protected abstract RWCoProcInput makeBatch(Iterator<ReqT> reqIterator);

protected abstract void handleOutput(Queue<CallTask<Req, Resp, MutationCallBatcherKey>> batchedTasks,
protected abstract void handleOutput(Queue<ICallTask<ReqT, RespT, MutationCallBatcherKey>> batchedTasks,
RWCoProcOutput output);

protected abstract void handleException(CallTask<Req, Resp, MutationCallBatcherKey> callTask, Throwable e);
protected abstract void handleException(ICallTask<ReqT, RespT, MutationCallBatcherKey> callTask, Throwable e);

@Override
public void reset() {
Expand All @@ -94,7 +94,7 @@ public void reset() {

@Override
public CompletableFuture<Void> execute() {
BatchCallTask<Req, Resp> batchCallTask = batchCallTasks.poll();
MutationCallTaskBatch<ReqT, RespT> batchCallTask = batchCallTasks.poll();
if (batchCallTask == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -108,16 +108,15 @@ public void destroy() {
}

private CompletableFuture<Void> fireBatchCall() {
BatchCallTask<Req, Resp> batchCallTask = batchCallTasks.poll();
MutationCallTaskBatch<ReqT, RespT> batchCallTask = batchCallTasks.poll();
if (batchCallTask == null) {
return CompletableFuture.completedFuture(null);
}
return fireBatchCall(batchCallTask);
}

private CompletableFuture<Void> fireBatchCall(BatchCallTask<Req, Resp> batchCallTask) {
RWCoProcInput input = makeBatch(batchCallTask.batchedTasks.stream()
.map(call -> call.call).iterator());
private CompletableFuture<Void> fireBatchCall(MutationCallTaskBatch<ReqT, RespT> batchCallTask) {
RWCoProcInput input = makeBatch(batchCallTask.batchedTasks.stream().map(ICallTask::call).iterator());
long reqId = System.nanoTime();
return storePipelines.get(batchCallTask.storeId)
.execute(KVRangeRWRequest.newBuilder()
Expand All @@ -135,7 +134,7 @@ private CompletableFuture<Void> fireBatchCall(BatchCallTask<Req, Resp> batchCall
})
.handle((v, e) -> {
if (e != null) {
CallTask<Req, Resp, MutationCallBatcherKey> callTask;
ICallTask<ReqT, RespT, MutationCallBatcherKey> callTask;
while ((callTask = batchCallTask.batchedTasks.poll()) != null) {
handleException(callTask, e);
}
Expand All @@ -147,21 +146,22 @@ private CompletableFuture<Void> fireBatchCall(BatchCallTask<Req, Resp> batchCall
.thenCompose(v -> fireBatchCall());
}

protected static class BatchCallTask<Req, Resp> {
protected static class MutationCallTaskBatch<CallT, CallResultT> {
private final String storeId;
private final long ver;
private final LinkedList<CallTask<Req, Resp, MutationCallBatcherKey>> batchedTasks = new LinkedList<>();
private final LinkedList<ICallTask<CallT, CallResultT, MutationCallBatcherKey>> batchedTasks =
new LinkedList<>();

protected BatchCallTask(String storeId, long ver) {
protected MutationCallTaskBatch(String storeId, long ver) {
this.storeId = storeId;
this.ver = ver;
}

protected void add(CallTask<Req, Resp, MutationCallBatcherKey> callTask) {
protected void add(ICallTask<CallT, CallResultT, MutationCallBatcherKey> callTask) {
this.batchedTasks.add(callTask);
}

protected boolean isBatchable(CallTask<Req, Resp, MutationCallBatcherKey> callTask) {
protected boolean isBatchable(ICallTask<CallT, CallResultT, MutationCallBatcherKey> callTask) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.baidu.bifromq.basekv.store.proto.ROCoProcInput;
import com.baidu.bifromq.basekv.store.proto.ROCoProcOutput;
import com.baidu.bifromq.basekv.store.proto.ReplyCode;
import com.baidu.bifromq.basescheduler.CallTask;
import com.baidu.bifromq.basescheduler.IBatchCall;
import com.baidu.bifromq.basescheduler.ICallTask;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
Expand Down Expand Up @@ -62,9 +62,9 @@ protected BatchQueryCall(KVRangeId rangeId,
}

@Override
public void add(CallTask<Req, Resp, QueryCallBatcherKey> callTask) {
public void add(ICallTask<Req, Resp, QueryCallBatcherKey> callTask) {
BatchQueryCall.BatchCallTask<Req, Resp> lastBatchCallTask;
QueryCallBatcherKey batcherKey = callTask.batcherKey;
QueryCallBatcherKey batcherKey = callTask.batcherKey();
if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
if (lastBatchCallTask.storeId.equals(batcherKey.storeId) && lastBatchCallTask.ver == batcherKey.ver) {
lastBatchCallTask.batchedTasks.add(callTask);
Expand All @@ -82,10 +82,10 @@ public void add(CallTask<Req, Resp, QueryCallBatcherKey> callTask) {

protected abstract ROCoProcInput makeBatch(Iterator<Req> reqIterator);

protected abstract void handleOutput(Queue<CallTask<Req, Resp, QueryCallBatcherKey>> batchedTasks,
protected abstract void handleOutput(Queue<ICallTask<Req, Resp, QueryCallBatcherKey>> batchedTasks,
ROCoProcOutput output);

protected abstract void handleException(CallTask<Req, Resp, QueryCallBatcherKey> callTask, Throwable e);
protected abstract void handleException(ICallTask<Req, Resp, QueryCallBatcherKey> callTask, Throwable e);

@Override
public void reset() {
Expand Down Expand Up @@ -117,7 +117,7 @@ private CompletableFuture<Void> fireBatchCall() {

private CompletableFuture<Void> fireBatchCall(BatchQueryCall.BatchCallTask<Req, Resp> batchCallTask) {
ROCoProcInput input = makeBatch(batchCallTask.batchedTasks.stream()
.map(call -> call.call).iterator());
.map(ICallTask::call).iterator());
long reqId = System.nanoTime();
return storePipelines.get(batchCallTask.storeId)
.query(KVRangeRORequest.newBuilder()
Expand All @@ -135,7 +135,7 @@ private CompletableFuture<Void> fireBatchCall(BatchQueryCall.BatchCallTask<Req,
})
.handle((v, e) -> {
if (e != null) {
CallTask<Req, Resp, QueryCallBatcherKey> callTask;
ICallTask<Req, Resp, QueryCallBatcherKey> callTask;
while ((callTask = batchCallTask.batchedTasks.poll()) != null) {
handleException(callTask, e);
}
Expand All @@ -151,7 +151,7 @@ private CompletableFuture<Void> fireBatchCall(BatchQueryCall.BatchCallTask<Req,
private static class BatchCallTask<Req, Resp> {
final String storeId;
final long ver;
final LinkedList<CallTask<Req, Resp, QueryCallBatcherKey>> batchedTasks = new LinkedList<>();
final LinkedList<ICallTask<Req, Resp, QueryCallBatcherKey>> batchedTasks = new LinkedList<>();

private BatchCallTask(String storeId, long ver) {
this.storeId = storeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class MutationCallScheduler<Req, Resp> extends BatchCallScheduler<Req, Resp, MutationCallBatcherKey> {
public abstract class MutationCallScheduler<ReqT, RespT>
extends BatchCallScheduler<ReqT, RespT, MutationCallBatcherKey> {
protected final IBaseKVStoreClient storeClient;

public MutationCallScheduler(String name,
Expand All @@ -43,10 +44,10 @@ public MutationCallScheduler(String name,
}

@Override
protected final Optional<MutationCallBatcherKey> find(Req subCall) {
protected final Optional<MutationCallBatcherKey> find(ReqT subCall) {
Optional<KVRangeSetting> rangeSetting = storeClient.findByKey(rangeKey(subCall));
return rangeSetting.map(setting -> new MutationCallBatcherKey(setting.id, setting.leader, setting.ver));
}

protected abstract ByteString rangeKey(Req call);
protected abstract ByteString rangeKey(ReqT call);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.time.Duration;
import java.util.Optional;

public abstract class QueryCallScheduler<Req, Resp> extends BatchCallScheduler<Req, Resp, QueryCallBatcherKey> {
public abstract class QueryCallScheduler<ReqT, RespT> extends BatchCallScheduler<ReqT, RespT, QueryCallBatcherKey> {
protected final IBaseKVStoreClient storeClient;

public QueryCallScheduler(String name,
Expand All @@ -41,16 +41,16 @@ public QueryCallScheduler(String name,
}


protected String selectStore(KVRangeSetting setting, Req request) {
protected String selectStore(KVRangeSetting setting, ReqT request) {
return setting.leader;
}

protected abstract int selectQueue(Req request);
protected abstract int selectQueue(ReqT request);

protected abstract ByteString rangeKey(Req request);
protected abstract ByteString rangeKey(ReqT request);

@Override
protected final Optional<QueryCallBatcherKey> find(Req req) {
protected final Optional<QueryCallBatcherKey> find(ReqT req) {
return storeClient.findByKey(rangeKey(req)).map(
range -> new QueryCallBatcherKey(range.id, selectStore(range, req), selectQueue(req), range.ver));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.baidu.bifromq.basekv.proto.KVRangeId;
import com.baidu.bifromq.basekv.store.proto.RWCoProcInput;
import com.baidu.bifromq.basekv.store.proto.RWCoProcOutput;
import com.baidu.bifromq.basescheduler.CallTask;
import com.baidu.bifromq.basescheduler.ICallTask;
import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.HashSet;
Expand All @@ -33,7 +33,7 @@ protected TestBatchMutationCall(KVRangeId rangeId,
}

@Override
protected BatchCallTask<ByteString, ByteString> newBatch(String storeId, long ver) {
protected MutationCallTaskBatch<ByteString, ByteString> newBatch(String storeId, long ver) {
return new TestBatchCallTask(storeId, ver);
}

Expand All @@ -49,36 +49,36 @@ protected RWCoProcInput makeBatch(Iterator<ByteString> byteStringIterator) {
}

@Override
protected void handleOutput(Queue<CallTask<ByteString, ByteString, MutationCallBatcherKey>> batchedTasks,
protected void handleOutput(Queue<ICallTask<ByteString, ByteString, MutationCallBatcherKey>> batchedTasks,
RWCoProcOutput output) {
CallTask<ByteString, ByteString, MutationCallBatcherKey> task;
ICallTask<ByteString, ByteString, MutationCallBatcherKey> task;
while ((task = batchedTasks.poll()) != null) {
// just echo the request
task.callResult.complete(task.call);
task.resultPromise().complete(task.call());
}
}

@Override
protected void handleException(CallTask<ByteString, ByteString, MutationCallBatcherKey> callTask, Throwable e) {
callTask.callResult.completeExceptionally(e);
protected void handleException(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask, Throwable e) {
callTask.resultPromise().completeExceptionally(e);
}

private static class TestBatchCallTask extends BatchCallTask<ByteString, ByteString> {
private static class TestBatchCallTask extends MutationCallTaskBatch<ByteString, ByteString> {
private final Set<ByteString> keys = new HashSet<>();

protected TestBatchCallTask(String storeId, long ver) {
super(storeId, ver);
}

@Override
protected void add(CallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
protected void add(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
super.add(callTask);
keys.add(callTask.call);
keys.add(callTask.call());
}

@Override
protected boolean isBatchable(CallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
return !keys.contains(callTask.call);
protected boolean isBatchable(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
return !keys.contains(callTask.call());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.baidu.bifromq.basekv.proto.KVRangeId;
import com.baidu.bifromq.basekv.store.proto.ROCoProcInput;
import com.baidu.bifromq.basekv.store.proto.ROCoProcOutput;
import com.baidu.bifromq.basescheduler.CallTask;
import com.baidu.bifromq.basescheduler.ICallTask;
import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.Iterator;
Expand All @@ -43,18 +43,18 @@ protected ROCoProcInput makeBatch(Iterator<ByteString> byteStringIterator) {
}

@Override
protected void handleOutput(Queue<CallTask<ByteString, ByteString, QueryCallBatcherKey>> batchedTasks,
protected void handleOutput(Queue<ICallTask<ByteString, ByteString, QueryCallBatcherKey>> batchedTasks,
ROCoProcOutput output) {
CallTask<ByteString, ByteString, QueryCallBatcherKey> task;
ICallTask<ByteString, ByteString, QueryCallBatcherKey> task;
while ((task = batchedTasks.poll()) != null) {
// just echo the request
task.callResult.complete(task.call);
task.resultPromise().complete(task.call());
}

}

@Override
protected void handleException(CallTask<ByteString, ByteString, QueryCallBatcherKey> callTask, Throwable e) {
callTask.callResult.completeExceptionally(e);
protected void handleException(ICallTask<ByteString, ByteString, QueryCallBatcherKey> callTask, Throwable e) {
callTask.resultPromise().completeExceptionally(e);
}
}
Loading

0 comments on commit 3e045e5

Please sign in to comment.