Skip to content

Commit

Permalink
migrates to zipkin-reporter 3.2 BytesMessageSender (#3690)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Jan 15, 2024
1 parent 87ec1c1 commit 9dc354b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 94 deletions.
2 changes: 1 addition & 1 deletion zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

<!-- Sometimes we need to override Armeria's Brave version -->
<brave.version>6.0.0</brave.version>
<zipkin-reporter.version>3.1.1</zipkin-reporter.version>
<zipkin-reporter.version>3.2.1</zipkin-reporter.version>
<log4j.version>2.21.1</log4j.version>
<proto.generatedSourceDirectory>${project.build.directory}/generated-test-sources/wire</proto.generatedSourceDirectory>
</properties>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.CollectorMetrics;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.ReporterMetrics;
import zipkin2.reporter.Sender;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.server.internal.ConditionalOnSelfTracing;
import zipkin2.storage.StorageComponent;
Expand Down Expand Up @@ -135,44 +132,29 @@ public class ZipkinSelfTracingConfiguration {
}

/** Lazily looks up the storage component in order to avoid proxying. */
static final class LocalSender extends Sender {
static final class LocalSender extends BytesMessageSender.Base {
final BeanFactory factory;
volatile StorageComponent delegate; // volatile to prevent stale reads

LocalSender(BeanFactory factory) {
this.factory = factory;
}

@Override public Encoding encoding() {
// TODO: less memory efficient, but not a huge problem for self-tracing which is rarely on
// https://github.com/openzipkin/zipkin-reporter-java/issues/178
return Encoding.JSON;
super(Encoding.JSON);
this.factory = factory;
}

@Override public int messageMaxBytes() {
return 5 * 1024 * 1024; // arbitrary
}

@Override public int messageSizeInBytes(List<byte[]> list) {
return Encoding.JSON.listSizeInBytes(list);
}

@Override public Call<Void> sendSpans(List<byte[]> encodedSpans) {
@Override public void send(List<byte[]> encodedSpans) throws IOException {
List<Span> spans = new ArrayList<>(encodedSpans.size());
for (byte[] encodedSpan : encodedSpans) {
Span v2Span = SpanBytesDecoder.JSON_V2.decodeOne(encodedSpan);
spans.add(v2Span);
}

return new CallAdapter<>(delegate().spanConsumer().accept(spans));
}

@Override public CheckResult check() {
zipkin2.CheckResult result = delegate().check();
if (result.ok()) {
return CheckResult.OK;
}
return CheckResult.failed(result.error());
delegate().spanConsumer().accept(spans).execute();
}

@Override public String toString() {
Expand All @@ -197,36 +179,6 @@ StorageComponent delegate() {
}
}

static final class CallAdapter<V> extends Call<V> {
private final zipkin2.Call<V> delegate;

public CallAdapter(zipkin2.Call<V> delegate) {
this.delegate = delegate;
}

@Override public V execute() throws IOException {
return delegate.execute();
}

@Override public void enqueue(Callback<V> callback) {
delegate.enqueue(new CallbackAdapter<>(callback));
}

@Override public void cancel() {
delegate.cancel();
}

@Override public boolean isCanceled() {
return delegate.isCanceled();
}

@Override public Call<V> clone() {
return new CallAdapter<>(delegate.clone());
}
}



static final class ReporterMetricsAdapter implements ReporterMetrics {
final BeanFactory factory;
volatile CollectorMetrics delegate; // volatile to prevent stale reads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.reporter.AwaitableCallback;
import zipkin2.server.internal.brave.CallbackAdapter;

import static com.linecorp.armeria.common.util.Exceptions.clearTrace;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -182,18 +182,30 @@ class ThrottledCallTest {
.isEqualTo(STORAGE_THROTTLE_MAX_CONCURRENCY);
}

@Test void enqueue_throttlesBack_whenStorageRejects() {
@Test void enqueue_throttlesBack_whenStorageRejects() throws Exception {
Listener listener = mock(Listener.class);
FakeCall call = new FakeCall();
call.overCapacity = true;

ThrottledCall throttle =
new ThrottledCall(call, executor, mockLimiter(listener), limiterMetrics, isOverCapacity);

AwaitableCallback callback = new AwaitableCallback();
throttle.enqueue(new CallbackAdapter<>(callback));
final CountDownLatch countDown = new CountDownLatch(1);
final AtomicReference<Throwable> throwable = new AtomicReference<>();
throttle.enqueue(new Callback<>() {
@Override public void onSuccess(Void value) {
countDown.countDown();
}

@Override public void onError(Throwable t) {
throwable.set(t);
countDown.countDown();
}
});

countDown.await();

assertThatThrownBy(callback::await).isEqualTo(OVER_CAPACITY);
assertThat(throwable).hasValue(OVER_CAPACITY);

verify(listener).onDropped();
}
Expand Down

0 comments on commit 9dc354b

Please sign in to comment.