Skip to content

Commit

Permalink
feat: impl configurable OperatorOutputStream maxBytes (apache#5422)
Browse files Browse the repository at this point in the history
* feat: impl configurable OperatorOutputStream maxBytes

Signed-off-by: tison <[email protected]>

* test: testAzblobLargeFile

Signed-off-by: tison <[email protected]>

* do fix test

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Dec 18, 2024
1 parent e9df3eb commit a9d0ec6
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 6 deletions.
4 changes: 4 additions & 0 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public OperatorOutputStream createOutputStream(String path) {
return new OperatorOutputStream(this, path);
}

public OperatorOutputStream createOutputStream(String path, int maxBytes) {
return new OperatorOutputStream(this, path, maxBytes);
}

public byte[] read(String path) {
return read(nativeHandle, path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,38 @@ protected void disposeInternal(long handle) {
}
}

private static final int MAX_BYTES = 16384;
private static final int DEFAULT_MAX_BYTES = 16384;

private final Writer writer;
private final byte[] bytes = new byte[MAX_BYTES];
private final byte[] bytes;
private final int maxBytes;

private int offset = 0;

public OperatorOutputStream(Operator operator, String path) {
this(operator, path, DEFAULT_MAX_BYTES);
}

public OperatorOutputStream(Operator operator, String path, int maxBytes) {
final long op = operator.nativeHandle;
this.writer = new Writer(constructWriter(op, path));
this.maxBytes = maxBytes;
this.bytes = new byte[maxBytes];
}

@Override
public void write(int b) throws IOException {
bytes[offset++] = (byte) b;
if (offset >= MAX_BYTES) {
if (offset >= maxBytes) {
flush();
}
}

@Override
public void flush() throws IOException {
if (offset > MAX_BYTES) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
} else if (offset < MAX_BYTES) {
if (offset > maxBytes) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + maxBytes);
} else if (offset < maxBytes) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
writeBytes(writer.nativeHandle, bytes);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, TestWatcher {
private String testName;

public String scheme;
public AsyncOperator asyncOperator;
public Operator operator;

Expand Down Expand Up @@ -67,6 +68,7 @@ public void beforeAll(ExtensionContext context) {
this.asyncOperator = op.layer(RetryLayer.builder().build());
this.operator = this.asyncOperator.blocking();

this.scheme = scheme;
this.testName = String.format("%s(%s)", context.getDisplayName(), scheme);
log.info(
"\n================================================================================"
Expand Down Expand Up @@ -94,6 +96,7 @@ public void afterAll(ExtensionContext context) {
operator = null;
}

this.scheme = null;
this.testName = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ protected Operator op() {
return behaviorExtension.operator;
}

protected String scheme() {
return behaviorExtension.scheme;
}

/**
* Generates a byte array of random content.
*/
Expand All @@ -57,6 +61,16 @@ public static byte[] generateBytes() {
return content;
}

/**
* Generates a byte array of random content with a specific size.
*/
public static byte[] generateBytes(int size) {
final Random random = new Random();
final byte[] content = new byte[size];
random.nextBytes(content);
return content;
}

/**
* Calculate SHA256 digest of input bytes
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal.test.behavior;

import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.util.UUID;
import org.apache.opendal.OperatorOutputStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class RegressionTest extends BehaviorTestBase {
// @see https://github.com/apache/opendal/issues/5421
@Test
public void testAzblobLargeFile() throws Exception {
assumeTrue(scheme() != null && scheme().equalsIgnoreCase("azblob"));

final String path = UUID.randomUUID().toString();
final int size = 16384 * 10; // 10 x OperatorOutputStream.DEFAULT_MAX_BYTES (10 flushes per write)
final byte[] content = generateBytes(size);

try (OperatorOutputStream operatorOutputStream = op().createOutputStream(path, size)) {
for (int i = 0; i < 20000; i++) {
// More iterations in case BlockCountExceedsLimit doesn't pop up exactly after 100K blocks.
operatorOutputStream.write(content);
}
}
}
}

0 comments on commit a9d0ec6

Please sign in to comment.