Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Added tests for gRPC retries. #69

Merged
merged 10 commits into from
Nov 13, 2015
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.utils.grpc;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.auth.ClientAuthInterceptor;
Expand All @@ -26,10 +36,13 @@

import javax.net.ssl.SSLException;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.genomics.utils.GenomicsFactory;

/**
* A convenience class for creating gRPC channels to the Google Genomics API.
*/
public class GenomicsChannel extends Channel {
public class GenomicsChannel extends ManagedChannel {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't suggest you to implement ManagedChannel. You can keep the factory methods, but I would suggest they just return a ManagedChannel.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see any public non-static methods. Did you make this class thinking you might add some in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question Eric. I made this class when we needed to expose shutdown for the hangs 042a406 ManagedChannel did not exist at that time, only ChanneImpl.

I'll refactor now - thanks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, before you could add interceptors via the ChannelBuilder. That makes sense.

private static final String GENOMICS_ENDPOINT = "genomics.googleapis.com";
private static final String GENOMICS_SCOPE = "https://www.googleapis.com/auth/genomics";
private static final String API_KEY_HEADER = "X-Goog-Api-Key";
Expand All @@ -40,12 +53,11 @@ public class GenomicsChannel extends Channel {
// the shutdown method and the ClientInterceptors do not return the ManagedChannel instance.
private final ManagedChannel managedChannel;
private final Channel delegate;

private ManagedChannel getGenomicsManagedChannel() throws SSLException {
// Java 8's implementation of GCM ciphers is extremely slow. Therefore we disable
// them here.
List<String> defaultCiphers =
GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> defaultCiphers = GrpcSslContexts.forClient().ciphers(null).build().cipherSuites();
List<String> performantCiphers = new ArrayList<>();
for (String cipher : defaultCiphers) {
if (!cipher.contains("GCM")) {
Expand All @@ -55,75 +67,96 @@ private ManagedChannel getGenomicsManagedChannel() throws SSLException {

return NettyChannelBuilder.forAddress(GENOMICS_ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build())
.build();
.sslContext(GrpcSslContexts.forClient().ciphers(performantCiphers).build()).build();
}

private GenomicsChannel(String apiKey) throws SSLException {
managedChannel = getGenomicsManagedChannel();
Metadata headers = new Metadata();
Metadata.Key<String> apiKeyHeader =
Metadata.Key.of(API_KEY_HEADER, Metadata.ASCII_STRING_MARSHALLER);
headers.put(apiKeyHeader, apiKey);
delegate = ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(headers));
delegate =
ClientInterceptors.intercept(managedChannel,
MetadataUtils.newAttachHeadersInterceptor(headers));
}

private GenomicsChannel(GoogleCredentials creds) throws SSLException {
managedChannel = getGenomicsManagedChannel();
creds = creds.createScoped(
Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor = new ClientAuthInterceptor(creds,
Executors.newSingleThreadExecutor());
creds = creds.createScoped(Arrays.asList(GENOMICS_SCOPE));
ClientAuthInterceptor interceptor =
new ClientAuthInterceptor(creds, Executors.newSingleThreadExecutor());
delegate = ClientInterceptors.intercept(managedChannel, interceptor);
}

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> arg0, CallOptions arg1) {
return delegate.newCall(arg0, arg1);
}

/**
* @see io.grpc.ManagedChannel#shutdownNow()
*/
public void shutdownNow() {
managedChannel.shutdownNow();
}

/**
* @throws InterruptedException
* @throws InterruptedException
* @see io.grpc.ManagedChannel#shutdown()
* @see io.grpc.ManagedChannel#awaitTermination(long, TimeUnit)
*/
public void shutdown(long timeout, TimeUnit unit) throws InterruptedException {
managedChannel.shutdown().awaitTermination(timeout, unit);
}


@Override
public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
return managedChannel.awaitTermination(time, unit);
}

@Override
public boolean isShutdown() {
return managedChannel.isShutdown();
}

@Override
public boolean isTerminated() {
return managedChannel.isTerminated();
}

@Override
public ManagedChannel shutdown() {
return managedChannel.shutdown();
}

@Override
public ManagedChannel shutdownNow() {
return managedChannel.shutdownNow();
}

@Override
public String authority() {
return managedChannel.authority();
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the application
* default credentials for auth.
* Creates a new gRPC channel to the Google Genomics API, using the application default
* credentials for auth.
*/
public static GenomicsChannel fromDefaultCreds() throws IOException {
return fromCreds(GoogleCredentials.getApplicationDefault());
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* api key for auth.
* Creates a new gRPC channel to the Google Genomics API, using the provided api key for auth.
*/
public static GenomicsChannel fromApiKey(String apiKey) throws SSLException {
return new GenomicsChannel(apiKey);
}

/**
* Creates a new gRPC channel to the Google Genomics API, using the provided
* credentials for auth.
* Creates a new gRPC channel to the Google Genomics API, using the provided credentials for auth.
*/
public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOException {
return new GenomicsChannel(creds);
}

/**
* Initialize auth for a gRPC channel from OfflineAuth or the application default credentials.
*
Expand All @@ -132,23 +165,19 @@ public static GenomicsChannel fromCreds(GoogleCredentials creds) throws IOExcept
* https://developers.google.com/identity/protocols/application-default-credentials
*
* @param auth An OfflineAuth object.
* @return The gRPC channel authorized using either the information in the OfflineAuth or application default credentials.
* @return The gRPC channel authorized using either the information in the OfflineAuth or
* application default credentials.
* @throws IOException
* @throws GeneralSecurityException
*/
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth) throws IOException, GeneralSecurityException {
if(auth.hasUserCredentials()) {
public static GenomicsChannel fromOfflineAuth(GenomicsFactory.OfflineAuth auth)
throws IOException, GeneralSecurityException {
if (auth.hasUserCredentials()) {
return fromCreds(auth.getUserCredentials());
} else if(auth.hasApiKey()) {
} else if (auth.hasApiKey()) {
return fromApiKey(auth.apiKey);
}
// Fall back to Default Credentials if the user did not specify user credentials or an api key.
return fromDefaultCreds();
}

@Override
public String authority() {
return delegate.authority();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package com.google.cloud.genomics.utils.grpc;

import io.grpc.ManagedChannel;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand All @@ -33,7 +33,7 @@
* Includes complex retry logic to upon failure resume the stream at the last known good start
* position without returning duplicate data.
*
* TODO: refactor this further to simplify the generic signature.
* TODO: refactor this further to simplify the generic signature.
*
* @param <Request> Streaming request type.
* @param <Response> Streaming response type.
Expand All @@ -43,12 +43,14 @@
public abstract class GenomicsStreamIterator<Request, Response, Item, Stub extends io.grpc.stub.AbstractStub<Stub>>
implements Iterator<Response> {
private static final Logger LOG = Logger.getLogger(GenomicsStreamIterator.class.getName());
private final ExponentialBackOff backoff;
private final GenomicsChannel genomicsChannel;
private final Predicate<Item> shardPredicate;

protected final ManagedChannel genomicsChannel;
protected final Predicate<Item> shardPredicate;
protected final Stub stub;
protected final Request originalRequest;

protected ExponentialBackOff backoff;

// Stateful members used to facilitate complex retry behavior for gRPC streams.
private Iterator<Response> delegate;
private Item lastSuccessfulDataItem;
Expand All @@ -57,20 +59,19 @@ public abstract class GenomicsStreamIterator<Request, Response, Item, Stub exten
/**
* Create a stream iterator that will filter shard data using the predicate, if supplied.
*
* @param channel The channel.
* @param request The request for the shard of data.
* @param auth The OfflineAuth to use for the request.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce
* a shard boundary and/or limit to SNPs only) or null for no filtering.
* @throws IOException
* @throws GeneralSecurityException
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce a
* shard boundary and/or limit to SNPs only) or null for no filtering.
*/
public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth, String fields,
Predicate<Item> shardPredicate) throws IOException, GeneralSecurityException {

protected GenomicsStreamIterator(ManagedChannel channel, Request request, String fields,
Predicate<Item> shardPredicate) {
this.originalRequest = request;
this.shardPredicate = shardPredicate;
genomicsChannel = GenomicsChannel.fromOfflineAuth(auth);
this.genomicsChannel = channel;
stub = createStub(genomicsChannel);

// Using default backoff settings. For details, see
Expand All @@ -83,7 +84,7 @@ public GenomicsStreamIterator(Request request, GenomicsFactory.OfflineAuth auth,
idSentinel = null;
}

abstract Stub createStub(GenomicsChannel genomicsChannel);
abstract Stub createStub(ManagedChannel genomicsChannel);

abstract Iterator<Response> createIteratorFromStub(Request request);

Expand Down Expand Up @@ -168,6 +169,7 @@ private void setStreamStateForRetry() {
// We have never returned any data. No need to set up state needed to filter previously
// returned results.
delegate = createIterator(originalRequest);
return;
}

if (getRequestStart(originalRequest) < getDataItemStart(lastSuccessfulDataItem)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.google.cloud.genomics.utils.grpc;

import io.grpc.ManagedChannel;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Iterator;
Expand Down Expand Up @@ -43,43 +45,57 @@ public class ReadStreamIterator
/**
* Create a stream iterator that can enforce shard boundary semantics.
*
* @param request
* @param auth
* @param shardBoundary
* @param fields
* @param auth The OfflineAuth to use for the request.
* @param request The request for the shard of data.
* @param shardBoundary The shard boundary semantics to enforce.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
* @throws IOException
* @throws GeneralSecurityException
*/
public static ReadStreamIterator enforceShardBoundary(StreamReadsRequest request,
OfflineAuth auth, Requirement shardBoundary, String fields) throws IOException,
public static ReadStreamIterator enforceShardBoundary(OfflineAuth auth,
StreamReadsRequest request, Requirement shardBoundary, String fields) throws IOException,
GeneralSecurityException {
return ReadStreamIterator.enforceShardBoundary(GenomicsChannel.fromOfflineAuth(auth), request,
shardBoundary, fields);
}

/**
* Create a stream iterator that can enforce shard boundary semantics.
*
* @param channel The ManagedChannel.
* @param request The request for the shard of data.
* @param shardBoundary The shard boundary semantics to enforce.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
*/
public static ReadStreamIterator enforceShardBoundary(ManagedChannel channel,
StreamReadsRequest request, Requirement shardBoundary, String fields) {
Predicate<Read> shardPredicate =
(ShardBoundary.Requirement.STRICT == shardBoundary) ? ShardBoundary
.getStrictReadPredicate(request.getStart()) : null;
// TODO: Facilitate shard boundary predicate here by checking for minimum set of fields in
// partial request.
return new ReadStreamIterator(request, auth, fields, shardPredicate);
return new ReadStreamIterator(channel, request, fields, shardPredicate);
}

/**
* Create a stream iterator.
*
* @param channel The ManagedChannel.
* @param request The request for the shard of data.
* @param auth The OfflineAuth to use for the request.
* @param fields Which fields to include in a partial response or null for all. NOT YET
* IMPLEMENTED.
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce
* a shard boundary and/or limit to SNPs only) or null for no filtering.
* @throws IOException
* @throws GeneralSecurityException
* @param shardPredicate A predicate used to client-side filter results returned (e.g., enforce a
* shard boundary and/or limit to SNPs only) or null for no filtering.
*/
public ReadStreamIterator(StreamReadsRequest request, OfflineAuth auth, String fields,
Predicate<Read> shardPredicate) throws IOException, GeneralSecurityException {
super(request, auth, fields, shardPredicate);
public ReadStreamIterator(ManagedChannel channel, StreamReadsRequest request, String fields,
Predicate<Read> shardPredicate) {
super(channel, request, fields, shardPredicate);
}

@Override
StreamingReadServiceBlockingStub createStub(GenomicsChannel genomicsChannel) {
StreamingReadServiceBlockingStub createStub(ManagedChannel genomicsChannel) {
return StreamingReadServiceGrpc.newBlockingStub(genomicsChannel);
}

Expand Down Expand Up @@ -115,8 +131,7 @@ List<Read> getDataList(StreamReadsResponse response) {

@Override
StreamReadsResponse buildResponse(StreamReadsResponse response, Iterable<Read> dataList) {
return StreamReadsResponse.newBuilder(response).clearAlignments()
.addAllAlignments(dataList)
return StreamReadsResponse.newBuilder(response).clearAlignments().addAllAlignments(dataList)
.build();
}
}
Loading