Skip to content

Commit

Permalink
Adds javadoc for common senders, fixes small bug in libthrift (#150)
Browse files Browse the repository at this point in the history
This adds javadoc for common senders, notably showing how to use basic
auth. This also fixes some unnecessary exception throwing in spring
beans and fixes a bug where scribe was never closed.
  • Loading branch information
adriancole authored Jun 25, 2019
1 parent 776fc8d commit 89e911c
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,43 @@
import zipkin2.Callback;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.Sender;

/** This sends (usually json v2) encoded spans to an ActiveMQ queue. */
/**
* This sends (usually json v2) encoded spans to an ActiveMQ queue.
*
* <h3>Usage</h3>
*
* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}.
*
* <p>Here's a simple configuration, configured for json:
*
* <pre>{@code
* sender = ActiveMQSender.create("failover:tcp://localhost:61616");
* }</pre>
*
* <p>Here's an example with an explicit connection factory and protocol buffers encoding:
*
* <pre>{@code
* connectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
* connectionFactory.setClientIDPrefix("zipkin");
* connectionFactory.setConnectionIDPrefix("zipkin");
* sender = ActiveMQSender.newBuilder()
* .connectionFactory(connectionFactory)
* .encoding(Encoding.PROTO3)
* .build();
* }</pre>
*
* <h3>Compatibility with Zipkin Server</h3>
*
* <a href="https://github.com/openzipkin/zipkin">Zipkin server</a> should be v2.15 or higher.
*
* <h3>Implementation Notes</h3>
*
* <p>This sender is thread-safe.
*/
public final class ActiveMQSender extends Sender {

public static ActiveMQSender create(String brokerUrl) {
Expand Down Expand Up @@ -133,7 +166,7 @@ public final ActiveMQSender build() {
return lazyInit.result.checkResult;
}

@Override public void close() throws IOException {
@Override public void close() {
closeCalled = true;
lazyInit.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ActiveMQConn get() throws IOException {
return result;
}

void close() throws IOException {
void close() {
ActiveMQConn maybe = result;
if (maybe != null) result.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,41 @@
/**
* This sends (usually json v2) encoded spans to a RabbitMQ queue.
*
* <h3>Usage</h3>
*
* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}.
*
* <p>Here's a simple configuration, configured for json:
*
* <pre>{@code
* sender = RabbitMQSender.create("localhost:5672");
* }</pre>
*
* <p>Here's an example with an explicit SSL connection factory and protocol buffers encoding:
*
* <pre>{@code
* connectionFactory = new ConnectionFactory();
* connectionFactory.setHost("localhost");
* connectionFactory.setPort(5671);
* connectionFactory.useSslProtocol();
* sender = RabbitMQSender.newBuilder()
* .connectionFactory(connectionFactory)
* .encoding(Encoding.PROTO3)
* .build();
* }</pre>
*
* <h3>Compatibility with Zipkin Server</h3>
*
* <a href="https://github.com/openzipkin/zipkin">Zipkin server</a> should be v2.1 or higher.
*
* <h3>Implementation Notes</h3>
*
* <p>The sender does not use <a href="https://www.rabbitmq.com/confirms.html">RabbitMQ Publisher
* Confirms</a>, so messages considered sent may not necessarily be received by consumers in case of
* RabbitMQ failure.
*
* <p>For thread safety, a channel is created for each thread that calls {@link #sendSpans(List)}.
* <p>This sender is thread-safe: a channel is created for each thread that calls
* {@link #sendSpans(List)}.
*/
public final class RabbitMQSender extends Sender {
/** Creates a sender that sends {@link Encoding#JSON} messages. */
Expand Down
32 changes: 30 additions & 2 deletions kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,44 @@
import zipkin2.Callback;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.AwaitableCallback;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.Sender;

/**
* This sends (usually json v2) encoded spans to a Kafka topic.
*
* <p>This sender is thread-safe.
* <h3>Usage</h3>
*
* <p>This sender is linked against Kafka 0.10.2+, which allows it to work with Kafka 0.10+ brokers
* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}.
*
* <p>Here's a simple configuration, configured for json:
*
* <pre>{@code
* sender = KafkaSender.create("localhost:9092");
* }</pre>
*
* <p>Here's an example that overrides properties and protocol buffers encoding:
*
* <pre>{@code
* Properties overrides = new Properties();
* overrides.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
* sender = KafkaSender.newBuilder()
* .bootstrapServers("host1:9092,host2:9092")
* .overrides(overrides)
* .encoding(Encoding.PROTO3)
* .build();
* }</pre>
*
* <h3>Compatibility with Zipkin Server</h3>
*
* <a href="https://github.com/openzipkin/zipkin">Zipkin server</a> should be v1.26 or higher.
*
* <h3>Implementation Notes</h3>
*
* <p>This sender is thread-safe. This sender is linked against Kafka 0.10.2+, which allows it to
* work with Kafka 0.10+ brokers
*/
public final class KafkaSender extends Sender {
/** Creates a sender that sends {@link Encoding#JSON} messages. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,14 @@ public CheckResult check() {
}
}

@Override
public void close() throws IOException {
@Override public void close() {
if (closeCalled) return;
closeCalled = true;
super.close();
ScribeClient client = this.client;
if (client != null) client.close();
}

@Override
public final String toString() {
@Override public final String toString() {
return "LibthriftSender(" + host + ":" + port + ")";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ boolean log(List<byte[]> encodedSpans) throws TException {
}
}

@Override
public void close() {
@Override public void close() {
socket.close();
}
}
37 changes: 35 additions & 2 deletions okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,54 @@
import okio.Okio;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Reports spans to Zipkin, using its <a href="https://zipkin.io/zipkin-api/#/">POST</a> endpoint.
*
* <h3>Usage</h3>
*
* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}.
*
* <p>Here's a simple configuration, configured for json:
*
* <pre>{@code
* sender = OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
* }</pre>
*
* <p>Here's an example that adds <a href="https://github.com/square/okhttp/blob/master/samples/guide/src/main/java/okhttp3/recipes/Authenticate.java">basic
* auth</a> (assuming you have an authenticating proxy):
*
* <pre>{@code
* credential = Credentials.basic("me", "secure");
* sender = OkHttpSender.newBuilder()
* .endpoint("https://authenticated-proxy/api/v2/spans")
* .clientBuilder().authenticator(new Authenticator() {
* @Override
* public Request authenticate(Route route, Response response) throws IOException {
* if (response.request().header("Authorization") != null) {
* return null; // Give up, we've already attempted to authenticate.
* }
* return response.request().newBuilder()
* .header("Authorization", credential)
* .build();
* }
* })
* .build();
* }</pre>
*
* <h3>Implementation Notes</h3>
*
* <p>This sender is thread-safe.
*/
public final class OkHttpSender extends Sender {

/** Creates a sender that posts {@link Encoding#JSON} messages. */
public static OkHttpSender create(String endpoint) {
return newBuilder().encoding(Encoding.JSON).endpoint(endpoint).build();
return newBuilder().endpoint(endpoint).build();
}

public static Builder newBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ActiveMQSenderFactoryBean extends AbstractFactoryBean {
Encoding encoding;
Integer messageMaxBytes;

@Override protected ActiveMQSender createInstance() throws Exception {
@Override protected ActiveMQSender createInstance() {
ActiveMQSender.Builder builder = ActiveMQSender.newBuilder();
if (url == null) throw new IllegalArgumentException("url is required");
if (queue != null) builder.queue(queue);
Expand All @@ -53,7 +53,7 @@ public class ActiveMQSenderFactoryBean extends AbstractFactoryBean {
return true;
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) {
((ActiveMQSender) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AsyncReporterFactoryBean extends AbstractFactoryBean {
return AsyncReporter.class;
}

@Override protected AsyncReporter createInstance() throws Exception {
@Override protected AsyncReporter createInstance() {
AsyncReporter.Builder builder = AsyncReporter.builder(sender);
if (metrics != null) builder.metrics(metrics);
if (messageMaxBytes != null) builder.messageMaxBytes(messageMaxBytes);
Expand All @@ -46,7 +46,7 @@ public class AsyncReporterFactoryBean extends AbstractFactoryBean {
return encoder != null ? builder.build(encoder) : builder.build();
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) {
((AsyncReporter) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class KafkaSenderFactoryBean extends AbstractFactoryBean {
Encoding encoding;
Integer messageMaxBytes;

@Override protected KafkaSender createInstance() throws Exception {
@Override protected KafkaSender createInstance() {
KafkaSender.Builder builder = KafkaSender.newBuilder();
if (bootstrapServers != null) builder.bootstrapServers(bootstrapServers);
if (encoding != null) builder.encoding(encoding);
Expand All @@ -41,7 +41,7 @@ public class KafkaSenderFactoryBean extends AbstractFactoryBean {
return true;
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) {
((KafkaSender) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class LibthriftSenderFactoryBean extends AbstractFactoryBean {
Integer messageMaxBytes;

@Override
protected LibthriftSender createInstance() throws Exception {
protected LibthriftSender createInstance() {
LibthriftSender.Builder builder = LibthriftSender.newBuilder();
if (host != null) builder.host(host);
if (port != null) builder.port(port);
Expand All @@ -46,7 +46,7 @@ public boolean isSingleton() {
}

@Override
protected void destroyInstance(Object instance) throws Exception {
protected void destroyInstance(Object instance) {
((LibthriftSender) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class OkHttpSenderFactoryBean extends AbstractFactoryBean {
Boolean compressionEnabled;
Integer messageMaxBytes;

@Override protected OkHttpSender createInstance() throws Exception {
@Override protected OkHttpSender createInstance() {
OkHttpSender.Builder builder = OkHttpSender.newBuilder();
if (endpoint != null) builder.endpoint(endpoint);
if (encoding != null) builder.encoding(encoding);
Expand All @@ -48,7 +48,7 @@ public class OkHttpSenderFactoryBean extends AbstractFactoryBean {
return true;
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) {
((OkHttpSender) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.reporter.beans;

import java.io.IOException;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import zipkin2.codec.Encoding;
import zipkin2.reporter.amqp.RabbitMQSender;
Expand All @@ -27,7 +28,7 @@ public class RabbitMQSenderFactoryBean extends AbstractFactoryBean {
String username, password;
Integer messageMaxBytes;

@Override protected RabbitMQSender createInstance() throws Exception {
@Override protected RabbitMQSender createInstance() {
RabbitMQSender.Builder builder = RabbitMQSender.newBuilder();
if (addresses != null) builder.addresses(addresses);
if (encoding != null) builder.encoding(encoding);
Expand All @@ -48,7 +49,7 @@ public class RabbitMQSenderFactoryBean extends AbstractFactoryBean {
return true;
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) throws IOException {
((RabbitMQSender) instance).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class URLConnectionSenderFactoryBean extends AbstractFactoryBean {
Boolean compressionEnabled;
Integer messageMaxBytes;

@Override protected URLConnectionSender createInstance() throws Exception {
@Override protected URLConnectionSender createInstance() {
URLConnectionSender.Builder builder = URLConnectionSender.newBuilder();
if (endpoint != null) builder.endpoint(endpoint);
if (encoding != null) builder.encoding(encoding);
Expand All @@ -45,7 +45,7 @@ public class URLConnectionSenderFactoryBean extends AbstractFactoryBean {
return true;
}

@Override protected void destroyInstance(Object instance) throws Exception {
@Override protected void destroyInstance(Object instance) {
((URLConnectionSender) instance).close();
}

Expand Down

0 comments on commit 89e911c

Please sign in to comment.