Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a StreamInterceptor interface to allow users to plug in custom interceptors for formats like Zstd. #930

Merged
merged 9 commits into from
Jan 22, 2025

Conversation

tgregg
Copy link
Contributor

@tgregg tgregg commented Aug 30, 2024

Description of changes:

This library has always had auto-detection of GZIP streams built in, meaning that when users attempt to construct an IonReader from an InputStream or byte[], the given input is checked for the GZIP format header and wrapped in a GZIPInputStream if the header is present.

Now that many users are replacing GZIP with other compression formats like Zstd, we have to decide how to make this library as user-friendly as possible for users making that transition while limiting the amount of special-case code and dependencies that we add to the library.

This PR sketches out one possibility: to define an interface (provisionally named StreamInterceptor) that can be implemented either by users directly, or by external libraries that we vend, to plug in support for any desired format. The PR demonstrates how this mechanism is used by replacing the existing GZIP detection support with support that is delivered via a new GZIPStreamInterceptor implementation.

The ZstdStreamInterceptorTest demonstrates how a StreamInterceptor that recognizes Zstd streams can be plugged into the IonReaderBuilder and IonSystem. In summary, users that wish to support Zstd would change existing code that looks like

IonReaderBuilder readerBuilder = IonReaderBuilder.standard();

to

IonReaderBuilder readerBuilder = IonReaderBuilder.standard().addStreamInterceptor(ZstdStreamInterceptor.INSTANCE);

and code that looks like

IonSystem ION_SYSTEM = IonSystemBuilder.standard().build();

to

IonSystem ION_SYSTEM = IonSystemBuilder.standard()
    .withReaderBuilder(IonReaderBuilder.standard().addStreamInterceptor(ZstdStreamInterceptor.INSTANCE))
    .build();

Critically, this does not require code changes in every location that an IonReader is constructed, and works with all methods of constructing readers (e.g. IonSystem.newReader variants, IonSystem.singleValue, IonSystem.iterate, IonLoader.load, IonReaderBuilder.build variants, etc.).

Comments on the approach are welcomed.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@artemkach
Copy link

I think a bit of working backwards would help inform your design. Thinking out loud, for a customer migrating to new formats, the desired experience would be, in order of preference:

  1. No-op.
  2. Configuration change.
  3. Trivial code change.
  4. Non-trivial code change.

#1 is out due to the desire to limit this library's dependencies and scope.

An example of #2 would be someone else building a new library that wraps ion-java and injects new functionality in a completely transparent way. This could be done with intercepting proxies using an AOP library or java.lang.reflect.Proxy. Customer would add the new library as a dependency without having to make any code changes. While that's cool in theory, ion-java has multiple places where IonReader is constructed, as you mention, so this might be challenging to implement.

#3 is potentially a good tradeoff: if customers are willing to make a small configuration change, they might as well agree to a trivial code change. I think your proposal works well to enable this approach: they add the new library that provides custom ion-java interceptors and they make a low-risk code change where their reader is constructed.

The other aspect of migration is being able to handle both the old gzip format and the new zstd/whatever format. Your proposed design addresses that by supporting multiple interceptors and using the first one that claims a header match. One can think of use cases where chaining more than one interceptor would be beneficial, but I don't know if that's stepping into overengineering territory. Just make sure to avoid a one-way door and leave the opening for future extension.

Another possibly-overengineering point is that not all format detection logic fits into the "fixed header" mold. Sometimes headers are not fixed length, and sometimes they are not headers at all. This works 99% of the time though, so the same comment about avoiding one-way doors.

@toddjonker
Copy link
Contributor

toddjonker commented Sep 11, 2024

Please consider automatically discovering interceptors via the services API, to make integration as easy as dropping them on the classpath. It's going to be annoying if one needs to configure these all over, when I expect most of the time a customer will want to enable something for the entire application or application suite.

[Update] Per @artemkach comment, this is effectively a 1.5 classpath-only change, and much simpler for everyone than AOP or proxy injection.

Copy link
Contributor

@toddjonker toddjonker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new classes should be in com.amazon.ion.util since they aren't coupled to Ion really.

src/main/java/com/amazon/ion/StreamInterceptor.java Outdated Show resolved Hide resolved
src/main/java/com/amazon/ion/StreamInterceptor.java Outdated Show resolved Hide resolved
src/main/java/com/amazon/ion/StreamInterceptor.java Outdated Show resolved Hide resolved
* @return a new InputStream.
* @throws IOException if thrown when constructing the new InputStream.
*/
InputStream newInputStream(InputStream interceptedStream) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether this should have a sibling method for character streams, so one can accomplish transformations of text inputs.

For example, suppose I want to teach the Ion reader to ignore shebang lines atop my Fusion scripts...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand the use case, though I don't think it's as simple as adding one sibling method; headerLength and matchesHeader would also need different flavors to operate on text instead of bytes. At that point I think a different interface would be best. If we do this, then we could use the same pattern for registration/discovery as we establish in this PR, but I'll leave that out of scope for now.

@tgregg
Copy link
Contributor Author

tgregg commented Sep 26, 2024

Please consider automatically discovering interceptors via the services API, to make integration as easy as dropping them on the classpath. It's going to be annoying if one needs to configure these all over, when I expect most of the time a customer will want to enable something for the entire application or application suite.

That sounds like a good experience. I will look into it.

@tgregg tgregg force-pushed the compressed-stream-interceptor branch from 8d3aede to 4c24d36 Compare December 5, 2024 21:10
@tgregg
Copy link
Contributor Author

tgregg commented Dec 5, 2024

Revision 2:

  • Incorporates feedback from @toddjonker about naming, documentation, and class locations.
  • Allows users to add InputStreamInterceptor implementations by registering service providers on the classpath (as suggested by @toddjonker and @artemkach's suggestion to consider making this available via configuration change), as an alternative to adding them manually using IonReaderBuilder.addInputStreamInterceptor. In addition to the added unit tests, I tried this out in a separate project and was able to register a stream interceptor by simply adding a file named com.amazon.ion.util.InputStreamInterceptor, containing the fully qualified class name of my custom implementation, to the project's META-INF/services/. More information about service providers can be found at https://docs.oracle.com/javase/9/docs/api/java/util/ServiceLoader.html

@tgregg tgregg marked this pull request as ready for review December 5, 2024 21:19
@tgregg tgregg force-pushed the compressed-stream-interceptor branch from 4c24d36 to b50164a Compare January 6, 2025 21:21
Comment on lines 41 to 44
// Detected stream interceptors. Each thread may have its own list because each thread may have a different
// context class loader. This list could be an instance variable, but since there may be use cases that require
// creating many IonReaderBuilder instances per thread, we prefer to inspect the classpath once per thread instead
// of once per instance.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't feel right, but I'm not educated on ServiceLoader best practices. My main worry is that I don't think there's a tight/stable relationship between threads and context class loaders. Indeed, the relation is mutable.

I wonder if instead this should cache by ClassLoader --> ServiceLoader, not by thread. Since ServiceLoader does its own caching, that may end up being simpler (since that removes the need to cache the interceptors themselves).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to a synchronized map of ClassLoader -> ServiceLoader.

// Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
// from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
// GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency,
// alternatives should be evaluated.
byte[] possibleIVM = new byte[_Private_IonConstants.BINARY_VERSION_MARKER_SIZE];
byte[] possibleIVM = new byte[maxHeaderLength];
InputStream ionData = source;
int bytesRead;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dosen't this need to read in a loop, to ensure sufficient bytes are present?

read can return any number of bytes, for any reason, and almost always needs to loop. I think this should loop until it either reads maxHeaderLength or hits EOF.

Looking at the comment below, this feels already-defective in a way that'll probably fail very rarely, but given support for additional encoding formats, it's now more worrisome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good call. It's an easy enough fix, so I included it in the next revision. This doesn't necessarily solve the problem of an InputStream that is actually growing, but it does solve the problem of an InputStream that provides fewer bytes than are actually available when read is called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it "solves" the growing-stream situation by blocking until we can apply all potentially-relevant interceptors. That seems like the right behavior.

Comment on lines 318 to 311
} catch (IOException e) {
// Some InputStream implementations throw IOExceptions (e.g. EOFException) in certain cases to convey
// that the end of the stream has been reached.
break;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this catch should be removed.

Throwing an exception at EOF violates the contract of read, which specifically states (emphasis added):

IOException - If the first byte cannot be read for any reason other than end of file, or if the input stream has been closed, or if some other I/O error occurs.

If the exception is not expressing EOF, then this code swallows a significant exception indicating something is broken with the stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed, the general IOException case should throw. We will catch and handle EOFException instead, as this is known to be used by GZIPInputStream. See this test for an illustration: https://github.com/amazon-ion/ion-java/blob/master/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java#L205

This fails if we don't handle EOFException here.

Comment on lines 323 to 325
// detected. This happens on every call to `build()`. If this becomes a performance bottleneck in an
// application, the application owner should be encouraged to manually specify stream interceptors using
// `addInputStreamInterceptor()`, causing this method to be bypassed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could preload all interceptors in computeIfAbsent (or similar) and cache the interceptor list instead of the ServiceLoader.

Caching the ServiceLoader is buying laziness (looks like it won't instantiate any instances until an iterator pulls them) but that's not really needed here, because the list of interceptors is not going to be large.

(TBH I'm more worried about potential contention on the static map, which could happen if the application churns through lots of classloaders and creates lots of readers.)

(Oh crud, maybe that needs to be a WeakHashMap so classloaders can be GCd?)

I don't think any of this is blocking, but it might be worth commenting and/or cutting an issue to follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Josh pointed me at Hadoop's technique for achieving this: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java#L42-L43

Initially I was surprised to see they were relying on simple static initialization -- no additional attempts to cache. After reading up a little more on static initialization, I think we can simplify our solution by using the same technique. That's because static initialization is guaranteed thread-safe and is done once per ClassLoader that loads the class. If the class is loaded more than once, it will have different statically initialized state -- meaning we get the per-ClassLoader caching for free.

This is slightly more annoying to test, as it requires using a lot of reflection, but it's simple in the source.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh cool, I'm gonna bookmark that!

public void expectFailureWhenHeaderLengthIsInvalid(ZstdStream stream) {
IonReaderBuilder builder = IonReaderBuilder.standard()
// This header length is invalid because an array of that size cannot be allocated.
.addInputStreamInterceptor(new LengthTooLongInterceptor(Integer.MAX_VALUE))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice job; I'd never have considered this case.

src/main/java/com/amazon/ion/system/IonReaderBuilder.java Outdated Show resolved Hide resolved
src/main/java/com/amazon/ion/system/IonReaderBuilder.java Outdated Show resolved Hide resolved
public void headerRequiresMultipleInputStreamReads() throws IOException {
IonReaderBuilder builder = IonReaderBuilder.standard()
.addInputStreamInterceptor(new ZstdStreamInterceptor());
try (IonReader reader = builder.build(new OneBytePerReadInputStream(new ByteArrayInputStream(ZstdStream.BINARY_BYTES)))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try (IonReader reader = builder.build(new OneBytePerReadInputStream(new ByteArrayInputStream(ZstdStream.BINARY_BYTES)))) {
try (IonReader reader = builder.build(new OneBytePerReadInputStream(stream(binaryBytes())))) {

Assuming you take the ZstdStream suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed to ignore the SpotBugs warning that I point out in my other comment, which required re-baselining. Annoyingly, this changes the entire file.

*/
public List<InputStreamInterceptor> getInputStreamInterceptors() {
if (streamInterceptors == null) {
return DETECTED_STREAM_INTERCEPTORS;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spotbugs warns here:

EI_EXPOSE_REP: May expose internal representation by returning reference to mutable object

However, this list is unmodifiable. Please double-check this in your review. Rather than do something redundant to satisfy Spotbugs, I decided to re-baseline (effectively ignoring this violation).

@tgregg tgregg force-pushed the compressed-stream-interceptor branch from 54c4b7a to 514ca6c Compare January 14, 2025 00:31
Comment on lines +42 to +46
// Stream interceptors detected by the ClassLoader.
// Note: each ClassLoader may have access to a different list of detected interceptors, but static initialization
// is performed once per load of the class, and is guaranteed thread-safe. Therefore, there is no need to manually
// maintain a thread-safe cache of detected interceptors.
private static final List<InputStreamInterceptor> DETECTED_STREAM_INTERCEPTORS = Collections.unmodifiableList(detectStreamInterceptorsOnClasspath());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate the comment :D

Comment on lines 280 to 281
* As an alternative, or in addition, to adding stream interceptors manually using this method,
* users may register implementations as service providers on the classpath.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* As an alternative, or in addition, to adding stream interceptors manually using this method,
* users may register implementations as service providers on the classpath.
* Users may also or instead register implementations as service providers on the classpath.


@Override
public String formatName() {
return "Zstd";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "Zstd";
return "zstd";

Comment on lines 313 to 314
AtomicReference<Throwable> withDefaultClassLoaderError = registerExceptionHandler(withDefaultClassLoader);
Thread withCustomClassLoader = new Thread(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AtomicReference<Throwable> withDefaultClassLoaderError = registerExceptionHandler(withDefaultClassLoader);
Thread withCustomClassLoader = new Thread(() -> {
AtomicReference<Throwable> withDefaultClassLoaderError = registerExceptionHandler(withDefaultClassLoader);
Thread withCustomClassLoader = new Thread(() -> {

Comment on lines +347 to +348
failOnAnyError(withDefaultClassLoaderError);
failOnAnyError(withCustomClassLoaderError);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this instead be phrased as a retrieval operation, fetching the list of interceptors from each thread? Then you could do the assertions in the main thread and each thread would be doing the same work (retrieving a list) absent the classloader configuration part.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could probably be done that way, though the second thread would still have some special work to assert the sameness of the interceptors returned from a repetitive call. I'm not really inclined to overhaul it.

Copy link
Contributor

@toddjonker toddjonker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!

@tgregg tgregg merged commit 4ccee4d into master Jan 22, 2025
13 checks passed
@tgregg tgregg deleted the compressed-stream-interceptor branch January 22, 2025 00:21
tgregg added a commit that referenced this pull request Jan 23, 2025
…ustom interceptors for formats like Zstd. (#930)

* Adds a StreamInterceptor interface to allow users to plug in custom interceptors for formats like Zstd.

* Adds support for detection of interceptors on the classpath; renames StreamInterceptor to InputStreamInterceptor.

* Addresses PR feedback on the addition of InputStreamInterceptor.

* Adds support for detecting format headers from InputStreams that require multiple read() calls to produce enough bytes to fill a header.

* Addresses more PR feedback.

* Uses static initialization of stream interceptors instead of caching.

* Makes manually added stream interceptors follow detected stream interceptors, rather than replacing them.

* Renames InputStreamInterceptor methods to avoid referring to 'headers'.

* Minor cleanups.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants