Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplified service discovery and refactoring of registry events #342

Merged
merged 8 commits into from
Sep 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public void beforeAll() {
"Seed address: "
+ seed.discovery().address()
+ ", services address: "
+ node.serviceAddress()
+ ", seed serviceRegistry: "
+ seed.serviceRegistry().listServiceReferences());
+ node.serviceAddress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ServiceEndpoint {

Expand Down Expand Up @@ -64,10 +65,27 @@ public Map<String, String> tags() {
return tags;
}

/**
* Return collection of service registratrions.
*
* @return collection of {@link ServiceRegistration}
*/
public Collection<ServiceRegistration> serviceRegistrations() {
return serviceRegistrations;
}

/**
* Creates collection of service references from this service endpoint.
*
* @return collection of {@link ServiceReference}
*/
public Collection<ServiceReference> serviceReferences() {
return serviceRegistrations
.stream()
.flatMap(sr -> sr.methods().stream().map(sm -> new ServiceReference(sm, sr, this)))
.collect(Collectors.toList());
}

@Override
public String toString() {
return "ServiceEndpoint{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ static ServiceDiscovery getDiscovery() {
.orElseThrow(() -> new IllegalStateException("ServiceDiscovery not configured"));
}

Mono<ServiceDiscovery> start(DiscoveryConfig discoveryConfig);
Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config);

Mono<Void> shutdown();

Flux<DiscoveryEvent> listen();
Flux<ServiceDiscoveryEvent> listen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.transport.Address;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class DiscoveryConfig {
public class ServiceDiscoveryConfig {

private Integer port;
private Address[] seeds;
Expand All @@ -18,7 +19,7 @@ public class DiscoveryConfig {
private String memberHost;
private Integer memberPort;

private DiscoveryConfig(Builder builder) {
private ServiceDiscoveryConfig(Builder builder) {
this.seeds = builder.seeds;
this.serviceRegistry = builder.serviceRegistry;
this.port = builder.port;
Expand Down Expand Up @@ -99,8 +100,8 @@ public Builder serviceRegistry(ServiceRegistry serviceRegistry) {
return this;
}

public DiscoveryConfig build() {
return new DiscoveryConfig(this);
public ServiceDiscoveryConfig build() {
return new ServiceDiscoveryConfig(this);
}

public Builder tags(Map<String, String> tags) {
Expand All @@ -123,4 +124,18 @@ public Builder memberPort(Integer memberPort) {
return this;
}
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ServiceDiscoveryConfig{");
sb.append("port=").append(port);
sb.append(", seeds=").append(Arrays.toString(seeds));
sb.append(", serviceRegistry=").append(serviceRegistry);
sb.append(", tags=").append(tags);
sb.append(", endpoint=").append(endpoint);
sb.append(", memberHost='").append(memberHost).append('\'');
sb.append(", memberPort=").append(memberPort);
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Service registration event. This event is being fired when {@link ServiceEndpoint} is being added
* (or removed from) to (from) {@link ServiceRegistry}.
*/
public class DiscoveryEvent {
public class ServiceDiscoveryEvent {

public enum Type {
REGISTERED, // service endpoint added
Expand All @@ -17,17 +17,17 @@ public enum Type {
private final ServiceEndpoint serviceEndpoint;
private final Type type;

private DiscoveryEvent(Type type, ServiceEndpoint serviceEndpoint) {
private ServiceDiscoveryEvent(ServiceEndpoint serviceEndpoint, Type type) {
this.serviceEndpoint = serviceEndpoint;
this.type = type;
}

public static DiscoveryEvent registered(ServiceEndpoint serviceEndpoint) {
return new DiscoveryEvent(Type.REGISTERED, serviceEndpoint);
public static ServiceDiscoveryEvent registered(ServiceEndpoint serviceEndpoint) {
return new ServiceDiscoveryEvent(serviceEndpoint, Type.REGISTERED);
}

public static DiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) {
return new DiscoveryEvent(Type.UNREGISTERED, serviceEndpoint);
public static ServiceDiscoveryEvent unregistered(ServiceEndpoint serviceEndpoint) {
return new ServiceDiscoveryEvent(serviceEndpoint, Type.UNREGISTERED);
}

public ServiceEndpoint serviceEndpoint() {
Expand All @@ -38,16 +38,20 @@ public Type type() {
return this.type;
}

@Override
public String toString() {
return "RegistrationEvent [serviceEndpoint=" + serviceEndpoint + ", type=" + type + "]";
}

public boolean isRegistered() {
return Type.REGISTERED.equals(this.type);
}

public boolean isUnregistered() {
return Type.UNREGISTERED.equals(this.type);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ServiceDiscoveryEvent{");
sb.append("serviceEndpoint=").append(serviceEndpoint);
sb.append(", type=").append(type);
sb.append('}');
return sb.toString();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* Service registry interface provides API to register/unregister services in the system and make
Expand All @@ -22,8 +20,4 @@ public interface ServiceRegistry {
boolean registerService(ServiceEndpoint serviceEndpoint);

ServiceEndpoint unregisterService(String endpointId);

Flux<RegistryEvent> listen();

Mono<Void> close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import io.scalecube.cluster.ClusterConfig.Builder;
import io.scalecube.cluster.Member;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.api.DiscoveryConfig;
import io.scalecube.services.discovery.api.DiscoveryEvent;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryConfig;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.transport.Address;
import java.util.Optional;
Expand All @@ -24,18 +24,16 @@

public class ScalecubeServiceDiscovery implements ServiceDiscovery {

public static final String SERVICE_METADATA = "service";

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

private ServiceRegistry serviceRegistry;
public static final String SERVICE_METADATA = "service";

private ServiceRegistry serviceRegistry;
private Cluster cluster;

private ServiceEndpoint endpoint;

private final DirectProcessor<DiscoveryEvent> subject = DirectProcessor.create();
private final FluxSink<DiscoveryEvent> sink = subject.serialize().sink();
private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
private final FluxSink<ServiceDiscoveryEvent> sink = subject.serialize().sink();

private enum DiscoveryType {
ADDED,
Expand All @@ -54,12 +52,12 @@ public ServiceEndpoint endpoint() {
}

@Override
public Mono<ServiceDiscovery> start(DiscoveryConfig discoveryConfig) {
this.serviceRegistry = discoveryConfig.serviceRegistry();
this.endpoint = discoveryConfig.endpoint();
public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig config) {
this.serviceRegistry = config.serviceRegistry();
this.endpoint = config.endpoint();

ClusterConfig clusterConfig =
clusterConfigBuilder(discoveryConfig)
clusterConfigBuilder(config)
.addMetadata(
this.serviceRegistry
.listServiceEndpoints()
Expand All @@ -85,9 +83,9 @@ public Mono<ServiceDiscovery> start(DiscoveryConfig discoveryConfig) {
}

@Override
public Flux<DiscoveryEvent> listen() {
public Flux<ServiceDiscoveryEvent> listen() {
return Flux.fromIterable(serviceRegistry.listServiceEndpoints())
.map(DiscoveryEvent::registered)
.map(ServiceDiscoveryEvent::registered)
.concatWith(subject);
}

Expand All @@ -102,7 +100,7 @@ public Mono<Void> shutdown() {
});
}

private ClusterConfig.Builder clusterConfigBuilder(DiscoveryConfig config) {
private ClusterConfig.Builder clusterConfigBuilder(ServiceDiscoveryConfig config) {
Builder builder = ClusterConfig.builder();
if (config.seeds() != null) {
builder.seedMembers(config.seeds());
Expand Down Expand Up @@ -141,12 +139,7 @@ private void listenCluster(Cluster cluster) {
}

private void loadClusterServices(Cluster cluster) {
cluster
.otherMembers()
.forEach(
member -> {
loadMemberServices(DiscoveryType.DISCOVERED, member);
});
cluster.otherMembers().forEach(member -> loadMemberServices(DiscoveryType.DISCOVERED, member));
}

private void loadMemberServices(DiscoveryType type, Member member) {
Expand All @@ -165,27 +158,25 @@ private void loadMemberServices(DiscoveryType type, Member member) {
LOGGER.debug("Member: {} is {} : {}", member, type, serviceEndpoint);
if ((type.equals(DiscoveryType.ADDED) || type.equals(DiscoveryType.DISCOVERED))
&& (this.serviceRegistry.registerService(serviceEndpoint))) {

LOGGER.info(
"Service Reference was ADDED since new Member has joined the cluster {} : {}",
member,
serviceEndpoint);

DiscoveryEvent registrationEvent = DiscoveryEvent.registered(serviceEndpoint);
LOGGER.debug("Publish registered: " + registrationEvent);
sink.next(registrationEvent);
ServiceDiscoveryEvent event = ServiceDiscoveryEvent.registered(serviceEndpoint);
LOGGER.debug("Publish registered: " + event);
sink.next(event);

} else if (type.equals(DiscoveryType.REMOVED)
&& (this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null)) {

LOGGER.info(
"Service Reference was REMOVED since Member have left the cluster {} : {}",
member,
serviceEndpoint);

DiscoveryEvent registrationEvent = DiscoveryEvent.unregistered(serviceEndpoint);
LOGGER.debug("Publish unregistered: " + registrationEvent);
sink.next(registrationEvent);
ServiceDiscoveryEvent event = ServiceDiscoveryEvent.unregistered(serviceEndpoint);
LOGGER.debug("Publish unregistered: " + event);
sink.next(event);
}
});
}
Expand Down
Loading