Skip to content

Commit

Permalink
Use Java APIs instead of ByteStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Jan 15, 2025
1 parent c767dad commit 318ce93
Show file tree
Hide file tree
Showing 19 changed files with 68 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.execution.buffer;

import com.google.common.collect.AbstractIterator;
import com.google.common.io.ByteStreams;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
Expand All @@ -33,7 +32,7 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.ByteStreams.readFully;
import static com.google.common.base.Verify.verify;
import static io.trino.block.BlockSerdeUtil.readBlock;
import static io.trino.block.BlockSerdeUtil.writeBlock;
import static io.trino.execution.buffer.PageCodecMarker.COMPRESSED;
Expand Down Expand Up @@ -158,7 +157,7 @@ private static class PageReader
protected Page computeNext()
{
try {
int read = ByteStreams.read(inputStream, headerBuffer, 0, headerBuffer.length);
int read = inputStream.readNBytes(headerBuffer, 0, headerBuffer.length);
if (read <= 0) {
return endOfData();
}
Expand Down Expand Up @@ -195,7 +194,7 @@ private static class SerializedPageReader
protected Slice computeNext()
{
try {
int read = ByteStreams.read(inputStream, headerBuffer, 0, headerBuffer.length);
int read = inputStream.readNBytes(headerBuffer, 0, headerBuffer.length);
if (read <= 0) {
return endOfData();
}
Expand All @@ -219,7 +218,8 @@ public static Slice readSerializedPage(Slice headerSlice, InputStream inputStrea
int compressedSize = headerSlice.getIntUnchecked(SERIALIZED_PAGE_COMPRESSED_SIZE_OFFSET);
byte[] outputBuffer = new byte[SERIALIZED_PAGE_HEADER_SIZE + compressedSize];
headerSlice.getBytes(0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE);
readFully(inputStream, outputBuffer, SERIALIZED_PAGE_HEADER_SIZE, compressedSize);
int bytes = inputStream.readNBytes(outputBuffer, SERIALIZED_PAGE_HEADER_SIZE, compressedSize);
verify(bytes == compressedSize, "expected to read %s bytes, but read %s", compressedSize, bytes);
return Slices.wrappedBuffer(outputBuffer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.Request.Builder.prepareGet;
Expand Down Expand Up @@ -226,7 +225,7 @@ public byte[] handle(Request request, io.airlift.http.client.Response response)
if (!APPLICATION_JSON.equals(response.getHeader(CONTENT_TYPE))) {
throw new RuntimeException("Response received was not of type " + APPLICATION_JSON);
}
return toByteArray(response.getInputStream());
return response.getInputStream().readAllBytes();
}
catch (IOException e) {
throw new RuntimeException("Unable to read response from worker", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.sql.gen;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import io.airlift.bytecode.DynamicClassLoader;

import java.io.IOException;
Expand Down Expand Up @@ -60,7 +59,7 @@ private static byte[] getBytecode(Class<?> clazz)
{
try (InputStream stream = clazz.getClassLoader().getResourceAsStream(clazz.getName().replace('.', '/') + ".class")) {
checkArgument(stream != null, "Could not obtain byte code for class %s", clazz.getName());
return ByteStreams.toByteArray(stream);
return stream.readAllBytes();
}
catch (IOException e) {
throw new RuntimeException(format("Could not obtain byte code for class %s", clazz.getName()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.uniqueIndex;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
Expand Down Expand Up @@ -887,7 +886,7 @@ public synchronized Response handle(Request request)
checkState(response.getStatusCode() == HttpStatus.OK.code(), "Unexpected status code: %s", response.getStatusCode());
ListMultimap<String, String> headers = response.getHeaders().entries().stream()
.collect(toImmutableListMultimap(entry -> entry.getKey().toString(), Map.Entry::getValue));
byte[] bytes = toByteArray(response.getInputStream());
byte[] bytes = response.getInputStream().readAllBytes();
checkState(bytes.length > 42, "too short");
savedResponse = new TestingResponse(HttpStatus.OK, headers, bytes.clone());
// corrupt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.filesystem.s3;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import io.airlift.log.Logging;
import io.trino.filesystem.AbstractTestTrinoFileSystem;
Expand Down Expand Up @@ -172,7 +171,7 @@ void testFileWithTrailingWhitespaceAgainstNativeClient()
TrinoInputFile inputFile = getFileSystem().newInputFile(fileEntry.location());
assertThat(inputFile.exists()).as("exists").isTrue();
try (TrinoInputStream inputStream = inputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(contents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -1153,7 +1152,7 @@ public void testFileWithTrailingWhitespace()
TrinoInputFile inputFile = getFileSystem().newInputFile(location);
assertThat(inputFile.exists()).as("exists").isTrue();
try (TrinoInputStream inputStream = inputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(("test blob content for " + location).getBytes(UTF_8));
}

Expand All @@ -1165,7 +1164,7 @@ public void testFileWithTrailingWhitespace()
// This can break some file system read operations (e.g., TrinoInput.readTail for most filesystems, newStream for caching file systems).
TrinoInputFile newInputFile = getFileSystem().newInputFile(location);
try (TrinoInputStream inputStream = newInputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(newContents);
}

Expand All @@ -1182,7 +1181,7 @@ public void testFileWithTrailingWhitespace()
assertThat(getFileSystem().newInputFile(target).exists()).as("target exists after rename").isTrue();

try (TrinoInputStream inputStream = getFileSystem().newInputFile(target).newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(("test blob content for " + source).getBytes(UTF_8));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.hdfs.s3.TrinoS3FileSystem.NO_SUCH_BUCKET_ERROR_CODE;
Expand Down Expand Up @@ -894,7 +893,7 @@ public void testStreamingUpload()
InputStream concatInputStream = parts.stream()
.map(UploadPartRequest::getInputStream)
.reduce(new ByteArrayInputStream(new byte[0]), SequenceInputStream::new);
String data = new String(toByteArray(concatInputStream), US_ASCII);
String data = new String(concatInputStream.readAllBytes(), US_ASCII);
assertThat(data).isEqualTo("a" + "foo".repeat(21) + "bar".repeat(44) + "orange".repeat(22));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.filesystem.Location;
Expand Down Expand Up @@ -452,7 +451,7 @@ public String readActual(TrinoDataInputStream input)
throws IOException
{
byte[] bytes = new byte[valueSize() + 10];
ByteStreams.readFully(input, bytes, 5, valueSize());
input.readFully(bytes, 5, valueSize());
return new String(bytes, 5, valueSize(), UTF_8);
}
});
Expand Down Expand Up @@ -649,7 +648,7 @@ private static void testReadOffEnd(DataInputTester tester, byte[] bytes)
throws IOException
{
TrinoDataInputStream input = createTrinoDataInputStream(bytes);
ByteStreams.skipFully(input, bytes.length - tester.valueSize() + 1);
input.skipNBytes(bytes.length - tester.valueSize() + 1);
tester.verifyReadOffEnd(input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.parquet;

import com.google.common.io.ByteStreams;
import io.airlift.compress.v3.Decompressor;
import io.airlift.compress.v3.lz4.Lz4Decompressor;
import io.airlift.compress.v3.lzo.LzoDecompressor;
Expand Down Expand Up @@ -88,7 +87,7 @@ private static Slice decompressGzip(Slice input, int uncompressedSize)

try (GZIPInputStream gzipInputStream = new GZIPInputStream(input.getInput(), min(GZIP_BUFFER_SIZE, input.length()))) {
byte[] buffer = new byte[uncompressedSize];
int bytesRead = ByteStreams.read(gzipInputStream, buffer, 0, buffer.length);
int bytesRead = gzipInputStream.readNBytes(buffer, 0, buffer.length);
if (bytesRead != uncompressedSize) {
throw new IllegalArgumentException(format("Invalid uncompressedSize for GZIP input. Expected %s, actual: %s", uncompressedSize, bytesRead));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.io.ByteStreams.readFully;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -69,7 +69,8 @@ public Slice getSlice(int length)
// requested length crosses the slice boundary
byte[] bytes = new byte[length];
try {
readFully(this, bytes, 0, bytes.length);
int read = this.readNBytes(bytes, 0, bytes.length);
verify(read == length, "expected to read %s bytes but got %s", length, read);
}
catch (IOException e) {
throw new RuntimeException("Failed to read " + length + " bytes", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.parquet.reader;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -67,7 +66,7 @@ public void testInput(List<byte[]> chunks)

// rad more than total length
ChunkedInputStream input = input(slices);
int bytesRead = ByteStreams.read(input, buffer, 0, buffer.length);
int bytesRead = input.readNBytes(buffer, 0, buffer.length);
assertThat(bytesRead).isEqualTo(expectedBytes.length > 0 ? expectedBytes.length : -1);

// read after input is done returns -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.decoder.json;

import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteStreams;
import io.airlift.json.ObjectMapperProvider;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.DecoderTestColumnHandle;
Expand Down Expand Up @@ -63,7 +62,7 @@ public class TestJsonDecoder
public void testSimple()
throws Exception
{
byte[] json = ByteStreams.toByteArray(TestJsonDecoder.class.getResourceAsStream("/decoder/json/message.json"));
byte[] json = TestJsonDecoder.class.getResourceAsStream("/decoder/json/message.json").readAllBytes();

DecoderTestColumnHandle column1 = new DecoderTestColumnHandle(0, "column1", createVarcharType(100), "source", null, null, false, false, false);
DecoderTestColumnHandle column2 = new DecoderTestColumnHandle(1, "column2", createVarcharType(10), "user/screen_name", null, null, false, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.json.JsonCodec;
Expand Down Expand Up @@ -1406,7 +1405,7 @@ private <T> Optional<T> readFile(String type, Location file, JsonCodec<T> codec)
{
try {
try (InputStream inputStream = fileSystem.newInputFile(file).newStream()) {
byte[] json = ByteStreams.toByteArray(inputStream);
byte[] json = inputStream.readAllBytes();
return Optional.of(codec.fromJson(json));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.ByteStreams.toByteArray;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.airlift.units.Duration.nanosSince;
Expand Down Expand Up @@ -204,7 +203,7 @@ private static KafkaTopicDescription createTable(SchemaTableName table, JsonCode
throws IOException
{
String fileName = format("/%s/%s.json", table.getSchemaName(), table.getTableName());
KafkaTopicDescription tableTemplate = topicDescriptionJsonCodec.fromJson(toByteArray(KafkaQueryRunner.class.getResourceAsStream(fileName)));
KafkaTopicDescription tableTemplate = topicDescriptionJsonCodec.fromJson(KafkaQueryRunner.class.getResourceAsStream(fileName).readAllBytes());

Optional<KafkaTopicFieldGroup> key = tableTemplate.key()
.map(keyTemplate -> new KafkaTopicFieldGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.kafka.util;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import io.airlift.json.JsonCodec;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
Expand All @@ -37,7 +36,7 @@ private TestUtils() {}
public static Map.Entry<SchemaTableName, KafkaTopicDescription> loadTpchTopicDescription(JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec, String topicName, SchemaTableName schemaTableName)
throws IOException
{
KafkaTopicDescription tpchTemplate = topicDescriptionJsonCodec.fromJson(ByteStreams.toByteArray(TestUtils.class.getResourceAsStream(format("/tpch/%s.json", schemaTableName.getTableName()))));
KafkaTopicDescription tpchTemplate = topicDescriptionJsonCodec.fromJson(TestUtils.class.getResourceAsStream(format("/tpch/%s.json", schemaTableName.getTableName())).readAllBytes());

return new AbstractMap.SimpleImmutableEntry<>(
schemaTableName,
Expand Down
Loading

0 comments on commit 318ce93

Please sign in to comment.