Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 16, 2023
1 parent be27ca8 commit 1fa268d
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import io.grpc.CallOptions;
import io.grpc.Metadata;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A cookie that holds information for retry or routing */
class CookiesHolder {
Expand All @@ -30,17 +30,17 @@ class CookiesHolder {
static final String COOKIE_KEY_PREFIX = "x-goog-cbt-cookie";

/** A map that stores all the routing cookies. */
private final Map<Metadata.Key<String>, String> cookies = new ConcurrentHashMap<>();
private final Map<Metadata.Key<String>, String> cookies = new HashMap<>();

/** Returns CookiesHolder if presents in CallOptions. Otherwise returns null. */
static CookiesHolder fromCallOptions(CallOptions options) {
return options.getOption(COOKIES_HOLDER_KEY);
}

/** Add all the routing cookies to headers if any. */
Metadata addRoutingCookieToHeaders(Metadata headers) {
if (headers != null && !cookies.isEmpty()) {
for (Metadata.Key<String> key : cookies.keySet()) headers.put(key, cookies.get(key));
Metadata injectCookiesInRequestHeaders(Metadata headers) {
for (Metadata.Key<String> key : cookies.keySet()) {
headers.put(key, cookies.get(key));
}
return headers;
}
Expand All @@ -49,14 +49,15 @@ Metadata addRoutingCookieToHeaders(Metadata headers) {
* Iterate through all the keys in trailing metadata, and add all the keys that match
* COOKIE_KEY_PREFIX to cookies.
*/
void setRoutingCookieFromTrailers(Metadata trailers) {
if (trailers != null) {
for (String key : trailers.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
cookies.put(metadataKey, value);
}
void extractCookiesFromResponseTrailers(Metadata trailers) {
if (trailers == null) {
return;
}
for (String key : trailers.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
cookies.put(metadataKey, value);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import io.grpc.Status;

/**
* A cookie interceptor that checks the cookie value from returned ErrorInfo, updates the cookie
* A cookie interceptor that checks the cookie value from returned trailer, updates the cookie
* holder, and inject it in the header of the next request.
*/
class CookieInterceptor implements ClientInterceptor {
class CookiesInterceptor implements ClientInterceptor {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
Expand All @@ -38,32 +38,33 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Gets the CookiesHolder added from CookiesServerStreamingCallable and
// CookiesUnaryCallable.
// Add CookiesHolder content to request headers if there's any.
CookiesHolder cookie = CookiesHolder.fromCallOptions(callOptions);
if (cookie != null) {
cookie.addRoutingCookieToHeaders(headers);
responseListener = new UpdateCookieListener<>(responseListener, callOptions);
cookie.injectCookiesInRequestHeaders(headers);
responseListener = new UpdateCookieListener<>(responseListener, cookie);
}
super.start(responseListener, headers);
}
};
}

/** Add trailers to CookiesHolder if there's any. * */
static class UpdateCookieListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {

private final CallOptions callOptions;
private final CookiesHolder cookie;

UpdateCookieListener(ClientCall.Listener<RespT> delegate, CallOptions callOptions) {
UpdateCookieListener(ClientCall.Listener<RespT> delegate, CookiesHolder cookiesHolder) {
super(delegate);
this.callOptions = callOptions;
this.cookie = cookiesHolder;
}

@Override
public void onClose(Status status, Metadata trailers) {
CookiesHolder cookiesHolder = CookiesHolder.fromCallOptions(callOptions);
if (cookiesHolder != null) {
cookiesHolder.setRoutingCookieFromTrailers(trailers);
}
cookie.extractCookiesFromResponseTrailers(trailers);
super.onClose(status, trailers);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

/** Cookie callable injects a placeholder for bigtable retry cookie. */
/**
* The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's
* cookies will be merged into the value holder and will be sent out with the next retry attempt.
*/
class CookiesServerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;

/** Cookie callable injects a placeholder for bigtable retry cookie. */
/**
* The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's
* cookies will be merged into the value holder and will be sent out with the next retry attempt.
*/
class CookiesUnaryCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private UnaryCallable<RequestT, ResponseT> innerCallable;
private final UnaryCallable<RequestT, ResponseT> innerCallable;

CookiesUnaryCallable(UnaryCallable<RequestT, ResponseT> callable) {
this.innerCallable = callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

// Inject channel priming
if (settings.isRefreshingChannel()) {
// Fix the credentials so that they can be shared
Expand All @@ -194,20 +202,18 @@ public static EnhancedBigtableStubSettings finalizeSettings(
}
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

// Inject the primer
InstantiatingGrpcChannelProvider transportProvider =
(InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider();

builder.setTransportChannelProvider(
transportProvider
.toBuilder()
.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()))
.build());
if (transportProvider != null) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
}
}

if (transportProvider != null) {
builder.setTransportChannelProvider(transportProvider.build());
}

ImmutableMap<TagKey, TagValue> attributes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
Duration.ofSeconds(10)) // wait this long before considering the connection dead
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true)
.setInterceptorProvider(() -> ImmutableList.of(new CookieInterceptor()));
.setAttemptDirectPath(true);
}

@SuppressWarnings("WeakerAccess")
Expand Down
Loading

0 comments on commit 1fa268d

Please sign in to comment.