Skip to content

Commit

Permalink
refactor: renamed client implementation types
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Feb 26, 2025
1 parent b817242 commit ac107eb
Show file tree
Hide file tree
Showing 157 changed files with 383 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.orientechnologies.common.io.OIOException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.log.OLogger;
import com.orientechnologies.orient.client.remote.OStorageRemoteNodeSession;
import com.orientechnologies.orient.client.remote.OStorageRemoteSession;
import com.orientechnologies.orient.client.remote.ORemoteClientNodeSession;
import com.orientechnologies.orient.client.remote.ORemoteClientSession;
import com.orientechnologies.orient.client.remote.message.OError37Response;
import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.config.OContextConfiguration;
Expand Down Expand Up @@ -375,14 +375,13 @@ public void handleException(Throwable throwable) {
}
}

public void beginRequest(final byte iCommand, final OStorageRemoteSession session)
public void beginRequest(final byte iCommand, final ORemoteClientSession session)
throws IOException {
final OStorageRemoteNodeSession nodeSession = session.getServerSession(getServerURL());
final ORemoteClientNodeSession nodeSession = session.getServerSession(getServerURL());
beginRequest(iCommand, nodeSession);
}

public void beginRequest(byte iCommand, OStorageRemoteNodeSession nodeSession)
throws IOException {
public void beginRequest(byte iCommand, ORemoteClientNodeSession nodeSession) throws IOException {
if (nodeSession == null)
throw new OIOException("Invalid session for URL '" + getServerURL() + "'");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
/** Created by tglman on 07/06/16. */
public interface OBinaryRequest<T extends OBinaryResponse> {

void write(final OChannelDataOutput network, OStorageRemoteSession session) throws IOException;
void write(final OChannelDataOutput network, ORemoteClientSession session) throws IOException;

void read(OChannelDataInput channel, int protocolVersion, ORecordSerializer serializer)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ public interface OBinaryResponse {
void write(OChannelDataOutput channel, int protocolVersion, ORecordSerializer serializer)
throws IOException;

void read(final OChannelDataInput network, OStorageRemoteSession session) throws IOException;
void read(final OChannelDataInput network, ORemoteClientSession session) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ public enum CONNECTION_STRATEGY {
private OCluster[] clusters = OCommonConst.EMPTY_CLUSTER_ARRAY;
private int defaultClusterId;
public ORemoteConnectionManager connectionManager;
private final Set<OStorageRemoteSession> sessions =
Collections.newSetFromMap(new ConcurrentHashMap<OStorageRemoteSession, Boolean>());
private final Set<ORemoteClientSession> sessions =
Collections.newSetFromMap(new ConcurrentHashMap<ORemoteClientSession, Boolean>());

private final Map<Integer, OLiveQueryClientListener> liveQueryListener =
new ConcurrentHashMap<>();
private volatile OStorageRemotePushThread pushThread;
private volatile ORemoteClientPushThread pushThread;
protected final OrientDBRemote context;
protected OSharedContext sharedContext = null;
protected final String url;
Expand Down Expand Up @@ -409,8 +409,8 @@ public <T extends OBinaryResponse> T networkOperation(
}

public <T> T baseNetworkOperation(
final OStorageRemoteOperation<T> operation, final String errorMessage, int retry) {
OStorageRemoteSession session = getCurrentSession();
final ORemoteClientOperation<T> operation, final String errorMessage, int retry) {
ORemoteClientSession session = getCurrentSession();
if (session.commandExecuting)
throw new ODatabaseException(
"Cannot execute the request because an asynchronous operation is in progress. Please use"
Expand Down Expand Up @@ -442,7 +442,7 @@ public <T> T baseNetworkOperation(

// In case i do not have a token or i'm switching between server i've to execute a open
// operation.
OStorageRemoteNodeSession nodeSession = session.getServerSession(network.getServerURL());
ORemoteClientNodeSession nodeSession = session.getServerSession(network.getServerURL());
if (nodeSession == null || !nodeSession.isValid() && !session.isStickToSession()) {
if (nodeSession != null) {
session.removeServerSession(nodeSession.getServerURL());
Expand Down Expand Up @@ -480,7 +480,7 @@ public <T> T baseNetworkOperation(
connectionManager.release(network);
// Remove the current url because the node is offline
this.serverURLs.remove(serverUrl);
for (OStorageRemoteSession activeSession : sessions) {
for (ORemoteClientSession activeSession : sessions) {
// Not thread Safe ...
activeSession.removeServerSession(serverUrl);
}
Expand Down Expand Up @@ -518,12 +518,12 @@ public boolean isAssigningClusterIds() {
}

public int getSessionId() {
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
return session != null ? session.getSessionId() : -1;
}

public String getServerURL() {
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
return session != null ? session.getServerUrl() : null;
}

Expand All @@ -532,7 +532,7 @@ public void open(

addUser();
try {
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
if (status == STATUS.CLOSED
|| !iUserName.equals(session.connectionUserName)
|| !iUserPassword.equals(session.connectionUserPassword)
Expand Down Expand Up @@ -594,9 +594,9 @@ public void reload() {
public void close(final boolean iForce) {
if (status == STATUS.CLOSED) return;

final OStorageRemoteSession session = getCurrentSession();
final ORemoteClientSession session = getCurrentSession();
if (session != null) {
final Collection<OStorageRemoteNodeSession> nodes = session.getAllServerSessions();
final Collection<ORemoteClientNodeSession> nodes = session.getAllServerSessions();
if (!nodes.isEmpty()) {
OContextConfiguration config = null;
if (configuration != null) {
Expand Down Expand Up @@ -907,12 +907,12 @@ public long count(final int[] iClusterIds) {
}

public void stickToSession() {
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
session.stickToSession();
}

public void unstickToSession() {
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
session.unStickToSession();
}

Expand Down Expand Up @@ -1401,7 +1401,7 @@ public String getType() {
}

public String getUserName() {
final OStorageRemoteSession session = getCurrentSession();
final ORemoteClientSession session = getCurrentSession();
if (session == null) return null;
return session.connectionUserName;
}
Expand All @@ -1412,8 +1412,8 @@ protected String reopenRemoteDatabase() throws IOException {
do {
final OChannelBinaryAsynchClient network = getNetwork(currentURL);
try {
OStorageRemoteSession session = getCurrentSession();
OStorageRemoteNodeSession nodeSession =
ORemoteClientSession session = getCurrentSession();
ORemoteClientNodeSession nodeSession =
session.getOrCreateServerSession(network.getServerURL());
if (nodeSession == null || !nodeSession.isValid()) {
openRemoteDatabase(network);
Expand Down Expand Up @@ -1464,7 +1464,7 @@ protected String reopenRemoteDatabase() throws IOException {
logger.debug("Cannot open database with url %s", e, currentURL);
} catch (OSecurityException ex) {
logger.debug("Invalidate token for url=%s", ex, currentURL);
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
session.removeServerSession(currentURL);

if (network != null) {
Expand Down Expand Up @@ -1513,9 +1513,8 @@ protected void openRemoteDatabase() throws IOException {

public void openRemoteDatabase(OChannelBinaryAsynchClient network) throws IOException {

OStorageRemoteSession session = getCurrentSession();
OStorageRemoteNodeSession nodeSession =
session.getOrCreateServerSession(network.getServerURL());
ORemoteClientSession session = getCurrentSession();
ORemoteClientNodeSession nodeSession = session.getOrCreateServerSession(network.getServerURL());
OOpen37Request request =
new OOpen37Request(name, session.connectionUserName, session.connectionUserPassword);
try {
Expand Down Expand Up @@ -1558,13 +1557,13 @@ public void openRemoteDatabase(OChannelBinaryAsynchClient network) throws IOExce
}
}

private void initPush(OStorageRemoteSession session) {
private void initPush(ORemoteClientSession session) {
if (pushThread == null) {
stateLock.writeLock().lock();
try {
if (pushThread == null) {
pushThread =
new OStorageRemotePushThread(
new ORemoteClientPushThread(
this,
getCurrentServerURL(),
connectionRetryDelay,
Expand All @@ -1585,27 +1584,27 @@ private void initPush(OStorageRemoteSession session) {
}
}

private void subscribeDistributedConfiguration(OStorageRemoteSession nodeSession) {
private void subscribeDistributedConfiguration(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeDistributedConfigurationRequest(), nodeSession);
}

private void subscribeStorageConfiguration(OStorageRemoteSession nodeSession) {
private void subscribeStorageConfiguration(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeStorageConfigurationRequest(), nodeSession);
}

private void subscribeSchema(OStorageRemoteSession nodeSession) {
private void subscribeSchema(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeSchemaRequest(), nodeSession);
}

private void subscribeFunctions(OStorageRemoteSession nodeSession) {
private void subscribeFunctions(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeFunctionsRequest(), nodeSession);
}

private void subscribeSequences(OStorageRemoteSession nodeSession) {
private void subscribeSequences(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeSequencesRequest(), nodeSession);
}

private void subscribeIndexManager(OStorageRemoteSession nodeSession) {
private void subscribeIndexManager(ORemoteClientSession nodeSession) {
pushThread.subscribe(new OSubscribeIndexManagerRequest(), nodeSession);
}

Expand Down Expand Up @@ -1675,7 +1674,7 @@ protected String useNewServerURL(final String iUrl) {

final String url = pos > -1 ? iUrl.substring(0, pos) : iUrl;
String newUrl = serverURLs.removeAndGet(url);
OStorageRemoteSession session = getCurrentSession();
ORemoteClientSession session = getCurrentSession();
if (session != null) {
session.currentUrl = newUrl;
session.serverURLIndex = 0;
Expand All @@ -1696,14 +1695,14 @@ protected void parseServerURLs() {
* @return connection to server
*/
public OChannelBinaryAsynchClient beginRequest(
final OChannelBinaryAsynchClient network, final byte iCommand, OStorageRemoteSession session)
final OChannelBinaryAsynchClient network, final byte iCommand, ORemoteClientSession session)
throws IOException {
network.beginRequest(iCommand, session);
return network;
}

protected String getNextAvailableServerURL(
boolean iIsConnectOperation, OStorageRemoteSession session) {
boolean iIsConnectOperation, ORemoteClientSession session) {

OContextConfiguration config = null;
if (configuration != null) config = configuration.getContextConfiguration();
Expand Down Expand Up @@ -1748,8 +1747,8 @@ public static OChannelBinaryAsynchClient getNetwork(
}

public static void beginResponse(
OChannelBinaryAsynchClient iNetwork, OStorageRemoteSession session) throws IOException {
OStorageRemoteNodeSession nodeSession = session.getServerSession(iNetwork.getServerURL());
OChannelBinaryAsynchClient iNetwork, ORemoteClientSession session) throws IOException {
ORemoteClientNodeSession nodeSession = session.getServerSession(iNetwork.getServerURL());
byte[] newToken = iNetwork.beginResponse(nodeSession.getSessionId(), true);
if (newToken != null && newToken.length > 0) {
nodeSession.setSession(nodeSession.getSessionId(), newToken);
Expand Down Expand Up @@ -1810,16 +1809,16 @@ public void updateStorageConfiguration(OStorageConfiguration storageConfiguratio
}
}

protected OStorageRemoteSession getCurrentSession() {
protected ORemoteClientSession getCurrentSession() {
ODatabaseDocumentInternal db = null;
if (ODatabaseRecordThreadLocal.instance() != null)
db = ODatabaseRecordThreadLocal.instance().getIfDefined();
ODatabaseDocumentInternal internal = db;
if (internal == null || !(internal instanceof ODatabaseDocumentRemote)) return null;
ODatabaseDocumentRemote remote = (ODatabaseDocumentRemote) internal;
OStorageRemoteSession session = remote.getSessionMetadata();
ORemoteClientSession session = remote.getSessionMetadata();
if (session == null) {
session = new OStorageRemoteSession(sessionSerialId.decrementAndGet());
session = new ORemoteClientSession(sessionSerialId.decrementAndGet());
sessions.add(session);
remote.setSessionMetadata(session);
}
Expand All @@ -1828,7 +1827,7 @@ protected OStorageRemoteSession getCurrentSession() {

public boolean isClosed() {
if (status == STATUS.CLOSED) return true;
final OStorageRemoteSession session = getCurrentSession();
final ORemoteClientSession session = getCurrentSession();
if (session == null) return false;
return session.isClosed();
}
Expand All @@ -1839,11 +1838,11 @@ public ORemoteClient copy(
if (ODatabaseRecordThreadLocal.instance() != null)
origin = ODatabaseRecordThreadLocal.instance().getIfDefined();

final OStorageRemoteSession session = source.getSessionMetadata();
final ORemoteClientSession session = source.getSessionMetadata();
if (session != null) {
// TODO:may run a session reopen
final OStorageRemoteSession newSession =
new OStorageRemoteSession(sessionSerialId.decrementAndGet());
final ORemoteClientSession newSession =
new ORemoteClientSession(sessionSerialId.decrementAndGet());
newSession.connectionUserName = session.connectionUserName;
newSession.connectionUserPassword = session.connectionUserPassword;
dest.setSessionMetadata(newSession);
Expand Down Expand Up @@ -2063,8 +2062,8 @@ public void onPushReconnect(String host) {
// AVOID RECONNECT ON CLOSE
return;
}
OStorageRemoteSession aValidSession = null;
for (OStorageRemoteSession session : sessions) {
ORemoteClientSession aValidSession = null;
for (ORemoteClientSession session : sessions) {
if (session.getServerSession(host) != null) {
aValidSession = session;
break;
Expand All @@ -2078,7 +2077,7 @@ public void onPushReconnect(String host) {
"Cannot find a valid session for subscribe for event to host '%s' forward the"
+ " subscribe for the next session open ",
host);
OStorageRemotePushThread old;
ORemoteClientPushThread old;
stateLock.writeLock().lock();
try {
old = pushThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import java.io.IOException;

/** Created by tglman on 12/04/16. */
public class OStorageRemoteNodeSession {
public class ORemoteClientNodeSession {
private static final OLogger logger =
OLogManager.instance().logger(OStorageRemoteNodeSession.class);
OLogManager.instance().logger(ORemoteClientNodeSession.class);
private final String serverURL;
private Integer sessionId = -1;
private byte[] token = null;
private OToken tokenInstance = null;

public OStorageRemoteNodeSession(String serverURL, Integer uniqueClientSessionId) {
public ORemoteClientNodeSession(String serverURL, Integer uniqueClientSessionId) {
this.serverURL = serverURL;
this.sessionId = uniqueClientSessionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.io.IOException;

/** Created by tglman on 16/12/15. */
public interface OStorageRemoteOperation<T> {
public interface ORemoteClientOperation<T> {

T execute(final OChannelBinaryAsynchClient network, OStorageRemoteSession session)
T execute(final OChannelBinaryAsynchClient network, ORemoteClientSession session)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import java.util.concurrent.TimeUnit;

/** Created by tglman on 11/01/17. */
public class OStorageRemotePushThread extends Thread {
public class ORemoteClientPushThread extends Thread {
private static final OLogger logger =
OLogManager.instance().logger(OStorageRemotePushThread.class);
OLogManager.instance().logger(ORemoteClientPushThread.class);

private final ORemotePushHandler pushHandler;
private final String host;
Expand All @@ -30,7 +30,7 @@ public class OStorageRemotePushThread extends Thread {
private volatile OBinaryRequest currentRequest;
private volatile boolean shutDown;

public OStorageRemotePushThread(
public ORemoteClientPushThread(
ORemotePushHandler storage, String host, int retryDelay, long requestTimeout) {
setDaemon(true);
this.pushHandler = storage;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void run() {
}

public <T extends OBinaryResponse> T subscribe(
OBinaryRequest<T> request, OStorageRemoteSession session) {
OBinaryRequest<T> request, ORemoteClientSession session) {
try {
long timeout;
synchronized (this) {
Expand Down
Loading

0 comments on commit ac107eb

Please sign in to comment.