Commit f069d41

Merge branch 'master' into master

rmarrowstone authored Jan 15, 2025
2 parents 03d3b07 + 318ce93 commit f069d41

Showing 31 changed files with 152 additions and 92 deletions.
@@ -14,7 +14,6 @@
package io.trino.execution.buffer;

import com.google.common.collect.AbstractIterator;
import com.google.common.io.ByteStreams;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
@@ -33,7 +32,7 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.ByteStreams.readFully;
import static com.google.common.base.Verify.verify;
import static io.trino.block.BlockSerdeUtil.readBlock;
import static io.trino.block.BlockSerdeUtil.writeBlock;
import static io.trino.execution.buffer.PageCodecMarker.COMPRESSED;
@@ -158,7 +157,7 @@ private static class PageReader
protected Page computeNext()
{
try {
int read = ByteStreams.read(inputStream, headerBuffer, 0, headerBuffer.length);
int read = inputStream.readNBytes(headerBuffer, 0, headerBuffer.length);
if (read <= 0) {
return endOfData();
}
@@ -195,7 +194,7 @@ private static class SerializedPageReader
protected Slice computeNext()
{
try {
int read = ByteStreams.read(inputStream, headerBuffer, 0, headerBuffer.length);
int read = inputStream.readNBytes(headerBuffer, 0, headerBuffer.length);
if (read <= 0) {
return endOfData();
}
@@ -219,7 +218,8 @@ public static Slice readSerializedPage(Slice headerSlice, InputStream inputStrea
int compressedSize = headerSlice.getIntUnchecked(SERIALIZED_PAGE_COMPRESSED_SIZE_OFFSET);
byte[] outputBuffer = new byte[SERIALIZED_PAGE_HEADER_SIZE + compressedSize];
headerSlice.getBytes(0, outputBuffer, 0, SERIALIZED_PAGE_HEADER_SIZE);
readFully(inputStream, outputBuffer, SERIALIZED_PAGE_HEADER_SIZE, compressedSize);
int bytes = inputStream.readNBytes(outputBuffer, SERIALIZED_PAGE_HEADER_SIZE, compressedSize);
verify(bytes == compressedSize, "expected to read %s bytes, but read %s", compressedSize, bytes);
return Slices.wrappedBuffer(outputBuffer);
}
}
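The recurring change in this commit is replacing Guava `ByteStreams` helpers with the `java.io.InputStream` methods that have been part of the JDK since Java 9–12. The standalone sketch below is illustrative only (class name and sample data are invented); it summarizes the mapping and the behavioral detail that motivates the new `verify` call: unlike Guava's `readFully`, `readNBytes` does not throw `EOFException` on a short read, so `readSerializedPage` now checks the returned count explicitly.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of the Guava -> JDK InputStream migration used in this commit.
public final class ReadNBytesSketch
{
    private ReadNBytesSketch() {}

    public static void main(String[] args)
            throws IOException
    {
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5, 6, 7});

        // Guava: int read = ByteStreams.read(in, header, 0, header.length);
        // JDK 9+: readNBytes blocks until the requested length is read or the stream
        // ends, and returns the number of bytes actually read.
        byte[] header = new byte[3];
        int read = in.readNBytes(header, 0, header.length);
        System.out.println("header bytes read: " + read);

        // Guava: ByteStreams.readFully(in, body, 0, body.length) threw EOFException on
        // a short read. readNBytes never throws for that case, so the caller must check
        // the returned count, which is what the verify(...) call above now does.
        byte[] body = new byte[4];
        int got = in.readNBytes(body, 0, body.length);
        if (got != body.length) {
            throw new EOFException("expected " + body.length + " bytes, but read " + got);
        }

        // Other mappings used elsewhere in this commit:
        // Guava ByteStreams.toByteArray(in)  -> JDK 9+  in.readAllBytes()
        // Guava ByteStreams.skipFully(in, n) -> JDK 12+ in.skipNBytes(n)
    }
}
```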
@@ -27,6 +27,7 @@

import java.net.URI;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
@@ -53,6 +54,7 @@ public class TrimmedBasicQueryInfo
private final Optional<ErrorCode> errorCode;
private final Optional<QueryType> queryType;
private final RetryPolicy retryPolicy;
private final Optional<Set<String>> clientTags;

public TrimmedBasicQueryInfo(BasicQueryInfo queryInfo)
{
@@ -79,6 +81,7 @@ public TrimmedBasicQueryInfo(BasicQueryInfo queryInfo)
this.queryStats = requireNonNull(queryInfo.getQueryStats(), "queryStats is null");
this.queryType = requireNonNull(queryInfo.getQueryType(), "queryType is null");
this.retryPolicy = requireNonNull(queryInfo.getRetryPolicy(), "retryPolicy is null");
this.clientTags = Optional.ofNullable(queryInfo.getSession().getClientTags());
}

@JsonProperty
@@ -183,6 +186,12 @@ public RetryPolicy getRetryPolicy()
return retryPolicy;
}

@JsonProperty
public Optional<Set<String>> getClientTags()
{
return clientTags;
}
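For context on how this new field is consumed: the Web UI query list (see the QueryList component change further down) filters on `query.clientTags`, so each entry returned to the UI is expected to carry the tags. A rough, illustrative fragment of such an entry follows — only fields visible in this diff are included, the values are invented, and the exact serialized shape of `Optional<Set<String>>` depends on the server's Jackson configuration:

```json
{
  "queryType": "SELECT",
  "retryPolicy": "NONE",
  "clientTags": ["etl", "nightly"]
}
```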

@Override
public String toString()
{
@@ -49,7 +49,6 @@
import java.util.Optional;
import java.util.Set;

import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.Request.Builder.prepareGet;
@@ -226,7 +225,7 @@ public byte[] handle(Request request, io.airlift.http.client.Response response)
if (!APPLICATION_JSON.equals(response.getHeader(CONTENT_TYPE))) {
throw new RuntimeException("Response received was not of type " + APPLICATION_JSON);
}
return toByteArray(response.getInputStream());
return response.getInputStream().readAllBytes();
}
catch (IOException e) {
throw new RuntimeException("Unable to read response from worker", e);
@@ -14,7 +14,6 @@
package io.trino.sql.gen;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import io.airlift.bytecode.DynamicClassLoader;

import java.io.IOException;
@@ -60,7 +59,7 @@ private static byte[] getBytecode(Class<?> clazz)
{
try (InputStream stream = clazz.getClassLoader().getResourceAsStream(clazz.getName().replace('.', '/') + ".class")) {
checkArgument(stream != null, "Could not obtain byte code for class %s", clazz.getName());
return ByteStreams.toByteArray(stream);
return stream.readAllBytes();
}
catch (IOException e) {
throw new RuntimeException(format("Could not obtain byte code for class %s", clazz.getName()), e);
@@ -71,7 +71,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.uniqueIndex;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
@@ -887,7 +886,7 @@ public synchronized Response handle(Request request)
checkState(response.getStatusCode() == HttpStatus.OK.code(), "Unexpected status code: %s", response.getStatusCode());
ListMultimap<String, String> headers = response.getHeaders().entries().stream()
.collect(toImmutableListMultimap(entry -> entry.getKey().toString(), Map.Entry::getValue));
byte[] bytes = toByteArray(response.getInputStream());
byte[] bytes = response.getInputStream().readAllBytes();
checkState(bytes.length > 42, "too short");
savedResponse = new TestingResponse(HttpStatus.OK, headers, bytes.clone());
// corrupt
@@ -470,6 +470,13 @@ export class QueryList extends React.Component {
) {
return true
}

if (
query.clientTags &&
query.clientTags.some((clientTag) => clientTag.toLowerCase().indexOf(term) !== -1)
) {
return true
}
}, this)
}
}
@@ -810,7 +817,7 @@
<input
type="text"
className="form-control form-control-small search-bar"
placeholder="User, source, query ID, query state, resource group, error name, or query text"
placeholder="User, source, query ID, query state, resource group, error name, query text or client tags"
onChange={this.handleSearchStringChange}
value={this.state.searchString}
/>
11 changes: 6 additions & 5 deletions docs/src/main/sphinx/connector/phoenix.md
@@ -45,11 +45,12 @@ to load custom Phoenix client connection properties.

The following Phoenix-specific configuration properties are available:

| Property name | Required | Description |
| ----------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `phoenix.connection-url` | Yes | `jdbc:phoenix[:zk_quorum][:zk_port][:zk_hbase_path]`. The `zk_quorum` is a comma separated list of ZooKeeper servers. The `zk_port` is the ZooKeeper port. The `zk_hbase_path` is the HBase root znode path, that is configurable using `hbase-site.xml`. By default the location is `/hbase` |
| `phoenix.config.resources` | No | Comma-separated list of configuration files (e.g. `hbase-site.xml`) to use for connection properties. These files must exist on the machines running Trino. |
| `phoenix.max-scans-per-split` | No | Maximum number of HBase scans that will be performed in a single split. Default is 20. Lower values will lead to more splits in Trino. Can also be set via session property `max_scans_per_split`. For details see: [https://phoenix.apache.org/update_statistics.html](https://phoenix.apache.org/update_statistics.html). (This setting has no effect when guideposts are disabled in Phoenix.) |
| Property name | Required | Description |
|------------------------------------| -------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `phoenix.connection-url` | Yes | `jdbc:phoenix[:zk_quorum][:zk_port][:zk_hbase_path]`. The `zk_quorum` is a comma separated list of ZooKeeper servers. The `zk_port` is the ZooKeeper port. The `zk_hbase_path` is the HBase root znode path, that is configurable using `hbase-site.xml`. By default the location is `/hbase` |
| `phoenix.config.resources` | No | Comma-separated list of configuration files (e.g. `hbase-site.xml`) to use for connection properties. These files must exist on the machines running Trino. |
| `phoenix.max-scans-per-split` | No | Maximum number of HBase scans that will be performed in a single split. Default is 20. Lower values will lead to more splits in Trino. Can also be set via session property `max_scans_per_split`. For details see: [https://phoenix.apache.org/update_statistics.html](https://phoenix.apache.org/update_statistics.html). (This setting has no effect when guideposts are disabled in Phoenix.) |
| `phoenix.server-scan-page-timeout` | No | The time limit on the amount of work a single RPC request can do before it times out. Type: [](prop-type-duration). |
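As an illustration only (not part of this change set), a catalog properties file using these options might look like the following; the connector name, hosts, paths, and timeout value are assumptions chosen for the example:

```properties
connector.name=phoenix5
phoenix.connection-url=jdbc:phoenix:zk1.example.net,zk2.example.net:2181:/hbase
phoenix.config.resources=/etc/hbase/conf/hbase-site.xml
phoenix.max-scans-per-split=20
phoenix.server-scan-page-timeout=30s
```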

```{include} jdbc-common-configurations.fragment
```
@@ -14,7 +14,6 @@
package io.trino.filesystem.s3;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import io.airlift.log.Logging;
import io.trino.filesystem.AbstractTestTrinoFileSystem;
@@ -172,7 +171,7 @@ void testFileWithTrailingWhitespaceAgainstNativeClient()
TrinoInputFile inputFile = getFileSystem().newInputFile(fileEntry.location());
assertThat(inputFile.exists()).as("exists").isTrue();
try (TrinoInputStream inputStream = inputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(contents);
}

@@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
@@ -1153,7 +1152,7 @@ public void testFileWithTrailingWhitespace()
TrinoInputFile inputFile = getFileSystem().newInputFile(location);
assertThat(inputFile.exists()).as("exists").isTrue();
try (TrinoInputStream inputStream = inputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(("test blob content for " + location).getBytes(UTF_8));
}

@@ -1165,7 +1164,7 @@ public void testFileWithTrailingWhitespace()
// This can break some file system read operations (e.g., TrinoInput.readTail for most filesystems, newStream for caching file systems).
TrinoInputFile newInputFile = getFileSystem().newInputFile(location);
try (TrinoInputStream inputStream = newInputFile.newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(newContents);
}

@@ -1182,7 +1181,7 @@ public void testFileWithTrailingWhitespace()
assertThat(getFileSystem().newInputFile(target).exists()).as("target exists after rename").isTrue();

try (TrinoInputStream inputStream = getFileSystem().newInputFile(target).newStream()) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
byte[] bytes = inputStream.readAllBytes();
assertThat(bytes).isEqualTo(("test blob content for " + source).getBytes(UTF_8));
}

@@ -74,7 +74,6 @@
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.ByteStreams.toByteArray;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.hdfs.s3.TrinoS3FileSystem.NO_SUCH_BUCKET_ERROR_CODE;
@@ -894,7 +893,7 @@ public void testStreamingUpload()
InputStream concatInputStream = parts.stream()
.map(UploadPartRequest::getInputStream)
.reduce(new ByteArrayInputStream(new byte[0]), SequenceInputStream::new);
String data = new String(toByteArray(concatInputStream), US_ASCII);
String data = new String(concatInputStream.readAllBytes(), US_ASCII);
assertThat(data).isEqualTo("a" + "foo".repeat(21) + "bar".repeat(44) + "orange".repeat(22));
}
}
@@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.filesystem.Location;
@@ -452,7 +451,7 @@ public String readActual(TrinoDataInputStream input)
throws IOException
{
byte[] bytes = new byte[valueSize() + 10];
ByteStreams.readFully(input, bytes, 5, valueSize());
input.readFully(bytes, 5, valueSize());
return new String(bytes, 5, valueSize(), UTF_8);
}
});
@@ -649,7 +648,7 @@ private static void testReadOffEnd(DataInputTester tester, byte[] bytes)
throws IOException
{
TrinoDataInputStream input = createTrinoDataInputStream(bytes);
ByteStreams.skipFully(input, bytes.length - tester.valueSize() + 1);
input.skipNBytes(bytes.length - tester.valueSize() + 1);
tester.verifyReadOffEnd(input);
}

@@ -23,6 +23,7 @@
import io.trino.hive.formats.line.LineSerializer;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DateType;
@@ -67,6 +68,7 @@
import static io.trino.hive.formats.FormatTestUtils.toSingleRowPage;
import static io.trino.hive.formats.FormatTestUtils.toSqlTimestamp;
import static io.trino.hive.formats.HiveFormatUtils.TIMESTAMP_FORMATS_KEY;
import static io.trino.hive.formats.HiveFormatsErrorCode.HIVE_UNSERIALIZABLE_JSON_VALUE;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.CharType.createCharType;
@@ -595,9 +597,12 @@ public void testReal()
assertValue(REAL, "1.5645e33", 1.5645e33f);

assertValueFails(REAL, "NaN", false);
assertUnserializableJsonValue(REAL, Float.NaN);
assertValueFails(REAL, "Infinity", false);
assertValueFails(REAL, "+Infinity", false);
assertUnserializableJsonValue(REAL, Float.POSITIVE_INFINITY);
assertValueFails(REAL, "-Infinity", false);
assertUnserializableJsonValue(REAL, Float.NEGATIVE_INFINITY);
assertValueFails(REAL, "+Inf");
assertValueFails(REAL, "-Inf");
// Map keys support NaN, infinity and negative infinity
@@ -632,9 +637,12 @@ public void testDouble()
assertValue(DOUBLE, "1.5645e33", 1.5645e33);

assertValueFails(DOUBLE, "NaN", false);
assertUnserializableJsonValue(DOUBLE, Double.NaN);
assertValueFails(DOUBLE, "Infinity", false);
assertValueFails(DOUBLE, "+Infinity", false);
assertUnserializableJsonValue(DOUBLE, Double.POSITIVE_INFINITY);
assertValueFails(DOUBLE, "-Infinity", false);
assertUnserializableJsonValue(DOUBLE, Double.NEGATIVE_INFINITY);
assertValueFails(DOUBLE, "+Inf");
assertValueFails(DOUBLE, "-Inf");
// Map keys support NaN, infinity and negative infinity
@@ -653,6 +661,19 @@
assertValueFails(DOUBLE, "{ \"x\" : 42 }", false);
}

private static void assertUnserializableJsonValue(Type type, Object value)
{
List<Column> columns = ImmutableList.of(new Column("test", type, 33));
Page page = toSingleRowPage(columns, singletonList(value));

// write the data to json
LineSerializer serializer = new JsonSerializerFactory().create(columns, ImmutableMap.of());
SliceOutput sliceOutput = new DynamicSliceOutput(64);
assertThatThrownBy(() -> serializer.write(page, 0, sliceOutput))
.isInstanceOf(TrinoException.class)
.matches(e -> ((TrinoException) e).getErrorCode() == HIVE_UNSERIALIZABLE_JSON_VALUE.toErrorCode());
}

@Test
public void testDate()
throws Exception
(Diff view truncated: the remaining changed files in this commit are not shown.)
