diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServer.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServer.java index cedd92677..ad03cecd6 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServer.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServer.java @@ -46,7 +46,8 @@ public class CounterServer { private CounterStateMachine fsm; public CounterServer(final String dataPath, final String groupId, final PeerId serverId, - final NodeOptions nodeOptions) throws IOException { + final NodeOptions nodeOptions, final CounterServiceProvider counterServiceProvider) + throws IOException { // init raft data path, it contains log,meta,snapshot FileUtils.forceMkdir(new File(dataPath)); @@ -57,7 +58,7 @@ public CounterServer(final String dataPath, final String groupId, final PeerId s CounterGrpcHelper.setRpcServer(rpcServer); // register business processor - CounterService counterService = new CounterServiceImpl(this); + CounterService counterService = counterServiceProvider.getCounterService(); rpcServer.registerProcessor(new GetValueRequestProcessor(counterService)); rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService)); // init state machine @@ -116,6 +117,8 @@ public static void main(final String[] args) throws IOException { final String serverIdStr = args[2]; final String initConfStr = args[3]; + CounterServiceProvider counterServiceProvider = new CounterServiceImpl(null); + final NodeOptions nodeOptions = new NodeOptions(); // for test, modify some params // set election timeout to 1s @@ -137,7 +140,9 @@ public static void main(final String[] args) throws IOException { nodeOptions.setInitialConf(initConf); // start raft server - final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions); + final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions, + counterServiceProvider); + // ((CounterServiceImpl) counterServiceProvider).counterServer = counterServer; System.out.println("Started counter server at port:" + counterServer.getNode().getNodeId().getPeerId().getPort()); // GrpcServer need block to prevent process exit diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceImpl.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceImpl.java index 2dbe9be8d..734cebc74 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceImpl.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceImpl.java @@ -36,7 +36,7 @@ /** * @author likun (saimu.msm@antfin.com) */ -public class CounterServiceImpl implements CounterService { +public class CounterServiceImpl implements CounterService, CounterServiceProvider { private static final Logger LOG = LoggerFactory.getLogger(CounterServiceImpl.class); private final CounterServer counterServer; @@ -47,6 +47,11 @@ public CounterServiceImpl(CounterServer counterServer) { this.readIndexExecutor = createReadIndexExecutor(); } + @Override + public CounterService getCounterService() { + return this; + } + private Executor createReadIndexExecutor() { final StoreEngineOptions opts = new StoreEngineOptions(); return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceProvider.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceProvider.java new file mode 100644 index 000000000..99803345d --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterServiceProvider.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.counter; + +public interface CounterServiceProvider { + CounterService getCounterService(); +} \ No newline at end of file diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawDeleteOperations.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawDeleteOperations.java new file mode 100644 index 000000000..5db7bbdce --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawDeleteOperations.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +/** + * Seperation of concerns for batch delete operations from {@link BatchRawKVStore}. + * + * @author jiachun.fjc + JAYDIPSINH27 + */ +public class BatchRawDeleteOperations { + private final BatchRawKVStore store; + + public BatchRawDeleteOperations(BatchRawKVStore store) { + this.store = store; + } + + public void batchDelete(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + store.delete(kvState.getOp().getKey(), kvState.getDone()); + } + } + + public void batchDeleteRange(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + final KVOperation op = kvState.getOp(); + store.deleteRange(op.getStartKey(), op.getEndKey(), kvState.getDone()); + } + } + + public void batchDeleteList(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + store.delete(kvState.getOp().getKeys(), kvState.getDone()); + } + } +} diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java index 7830730c3..e154fb50d 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java @@ -27,49 +27,36 @@ */ public abstract class BatchRawKVStore extends BaseRawKVStore { + private final BatchRawPutOperations batchRawPutOperations; + private final BatchRawDeleteOperations batchRawDeleteOperations; + + public BatchRawKVStore() { + this.batchRawPutOperations = new BatchRawPutOperations<>(this); + this.batchRawDeleteOperations = new BatchRawDeleteOperations<>(this); + } + public void batchPut(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - final KVOperation op = kvState.getOp(); - put(op.getKey(), op.getValue(), kvState.getDone()); - } + batchRawPutOperations.batchPut(kvStates); } public void batchPutIfAbsent(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - final KVOperation op = kvState.getOp(); - putIfAbsent(op.getKey(), op.getValue(), kvState.getDone()); - } + batchRawPutOperations.batchPutIfAbsent(kvStates); } public void batchPutList(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - put(kvState.getOp().getEntries(), kvState.getDone()); - } + batchRawPutOperations.batchPutList(kvStates); } public void batchDelete(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - delete(kvState.getOp().getKey(), kvState.getDone()); - } + batchRawDeleteOperations.batchDelete(kvStates); } public void batchDeleteRange(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - final KVOperation op = kvState.getOp(); - deleteRange(op.getStartKey(), op.getEndKey(), kvState.getDone()); - } + batchRawDeleteOperations.batchDeleteRange(kvStates); } public void batchDeleteList(final KVStateOutputList kvStates) { - for (int i = 0, l = kvStates.size(); i < l; i++) { - final KVState kvState = kvStates.get(i); - delete(kvState.getOp().getKeys(), kvState.getDone()); - } + batchRawDeleteOperations.batchDeleteList(kvStates); } public void batchGetSequence(final KVStateOutputList kvStates) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawPutOperations.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawPutOperations.java new file mode 100644 index 000000000..9a5db920b --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawPutOperations.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +/** + * Seperation of concerns for batch put operations from {@link BatchRawKVStore}. + * + * @author jiachun.fjc + JAYDIPSINH27 + */ +public class BatchRawPutOperations { + private final BatchRawKVStore store; + + public BatchRawPutOperations(BatchRawKVStore store) { + this.store = store; + } + + public void batchPut(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + final KVOperation op = kvState.getOp(); + store.put(op.getKey(), op.getValue(), kvState.getDone()); + } + } + + public void batchPutIfAbsent(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + final KVOperation op = kvState.getOp(); + store.putIfAbsent(op.getKey(), op.getValue(), kvState.getDone()); + } + } + + public void batchPutList(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + store.put(kvState.getOp().getEntries(), kvState.getDone()); + } + } +} diff --git a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicServer.java b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicServer.java index f58e90ff7..344982bbb 100644 --- a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicServer.java +++ b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicServer.java @@ -42,12 +42,12 @@ */ public class AtomicServer { - private static final Logger LOG = LoggerFactory.getLogger(AtomicServer.class); + private static final Logger LOG = LoggerFactory.getLogger(AtomicServer.class); - private TreeMap nodes = new TreeMap<>(); - private TreeMap groups = new TreeMap<>(); - private int totalSlots; - private StartupConf conf; + public TreeMap nodes = new TreeMap<>(); + public TreeMap groups = new TreeMap<>(); + private int totalSlots; + private StartupConf conf; public AtomicRangeGroup getGroupBykey(String key) { return nodes.get(HashUtils.getHeadKey(this.nodes, key)); @@ -67,57 +67,10 @@ public TreeMap getGroups() { return this.groups; } - public void start() throws IOException { - PeerId serverId = new PeerId(); - if (!serverId.parse(conf.getServerAddress())) { - throw new IllegalArgumentException("Fail to parse serverId:" + conf.getServerAddress()); - } - - FileUtils.forceMkdir(new File(conf.getDataPath())); - // The same in-process raft group shares the same RPC Server. - RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); - // Register biz handler - rpcServer.registerProcessor(new GetSlotsCommandProcessor(this)); - rpcServer.registerProcessor(new GetCommandProcessor(this)); - rpcServer.registerProcessor(new IncrementAndGetCommandProcessor(this)); - rpcServer.registerProcessor(new CompareAndSetCommandProcessor(this)); - rpcServer.registerProcessor(new SetCommandProcessor(this)); - - long step = conf.getMaxSlot() / totalSlots; - if (conf.getMaxSlot() % totalSlots > 0) { - step = step + 1; - } - for (int i = 0; i < totalSlots; i++) { - long min = i * step; - long mayMax = (i + 1) * step; - long max = mayMax > conf.getMaxSlot() || mayMax <= 0 ? conf.getMaxSlot() : mayMax; - StartupConf nodeConf = new StartupConf(); - String nodeDataPath = conf.getDataPath() + File.separator + i; - nodeConf.setDataPath(nodeDataPath); - String nodeGroup = conf.getGroupId() + "_" + i; - nodeConf.setGroupId(nodeGroup); - nodeConf.setMaxSlot(max); - nodeConf.setMinSlot(min); - nodeConf.setConf(conf.getConf()); - nodeConf.setServerAddress(conf.getServerAddress()); - nodeConf.setTotalSlots(conf.getTotalSlots()); - LOG.info("Starting range node {}-{} with conf {}", min, max, nodeConf); - nodes.put(i * step, AtomicRangeGroup.start(nodeConf, rpcServer)); - groups.put(i * step, nodeGroup); - } - } - - public static void start(String confFilePath) throws IOException { - StartupConf conf = new StartupConf(); - if (!conf.loadFromFile(confFilePath)) { - throw new IllegalStateException("Load startup config from " + confFilePath + " failed"); - } - AtomicServer server = new AtomicServer(conf); - server.start(); - } - //for test + public static void main(String[] arsg) throws Exception { - start("config/server.properties"); + StartupConf conf = new StartupConf(); + conf.start("config/server.properties"); } } diff --git a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/StartupConf.java b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/StartupConf.java index c8b987848..5997ad911 100644 --- a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/StartupConf.java +++ b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/StartupConf.java @@ -21,12 +21,24 @@ import java.io.IOException; import java.util.Properties; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.test.atomic.server.processor.*; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; +import static com.alipay.sofa.jraft.test.atomic.server.AtomicRangeGroup.LOG; + public class StartupConf { + + public StartupConf() { + + } + private String groupId; private String dataPath; private String conf; @@ -139,4 +151,54 @@ public String toString() { + ", totalSlots=" + this.totalSlots + "]"; } + public void start() throws IOException { + // StartupConf conf = new StartupConf(); + AtomicServer server = new AtomicServer(this); + PeerId serverId = new PeerId(); + if (!serverId.parse(this.getServerAddress())) { + throw new IllegalArgumentException("Fail to parse serverId:" + this.getServerAddress()); + } + + FileUtils.forceMkdir(new File(this.getDataPath())); + // The same in-process raft group shares the same RPC Server. + RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + // Register biz handler + rpcServer.registerProcessor(new GetSlotsCommandProcessor(server)); + rpcServer.registerProcessor(new GetCommandProcessor(server)); + rpcServer.registerProcessor(new IncrementAndGetCommandProcessor(server)); + rpcServer.registerProcessor(new CompareAndSetCommandProcessor(server)); + rpcServer.registerProcessor(new SetCommandProcessor(server)); + + long step = this.getMaxSlot() / totalSlots; + if (this.getMaxSlot() % totalSlots > 0) { + step = step + 1; + } + for (int i = 0; i < totalSlots; i++) { + long min = i * step; + long mayMax = (i + 1) * step; + long max = mayMax > this.getMaxSlot() || mayMax <= 0 ? this.getMaxSlot() : mayMax; + StartupConf nodeConf = new StartupConf(); + String nodeDataPath = this.getDataPath() + File.separator + i; + nodeConf.setDataPath(nodeDataPath); + String nodeGroup = this.getGroupId() + "_" + i; + nodeConf.setGroupId(nodeGroup); + nodeConf.setMaxSlot(max); + nodeConf.setMinSlot(min); + nodeConf.setConf(this.getConf()); + nodeConf.setServerAddress(this.getServerAddress()); + nodeConf.setTotalSlots(this.getTotalSlots()); + LOG.info("Starting range node {}-{} with conf {}", min, max, nodeConf); + server.nodes.put(i * step, AtomicRangeGroup.start(nodeConf, rpcServer)); + server.groups.put(i * step, nodeGroup); + } + } + + public void start(String confFilePath) throws IOException { + StartupConf conf = new StartupConf(); + if (!conf.loadFromFile(confFilePath)) { + throw new IllegalStateException("Load startup config from " + confFilePath + " failed"); + } + this.start(); + } + }