Skip to content

Commit

Permalink
Fix some leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
jduo committed Feb 5, 2024
1 parent a3771f5 commit 13b1af2
Showing 1 changed file with 82 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

final class ObjectMetadataBuilder {

private final FlightSqlClient client;
private final FlightSqlClientWithCallOptions client;
private final VectorSchemaRoot root;
private final VarCharVector adbcCatalogNames;
private final UnionListWriter adbcCatalogDbSchemasWriter;
Expand All @@ -67,7 +67,7 @@ final class ObjectMetadataBuilder {

ObjectMetadataBuilder(
BufferAllocator allocator,
FlightSqlClient client,
FlightSqlClientWithCallOptions client,
final AdbcConnection.GetObjectsDepth depth,
final String catalogPattern,
final String dbSchemaPattern,
Expand Down Expand Up @@ -105,6 +105,9 @@ private void writeVarChar(VarCharWriter writer, String value) {
}
}

/**
* The caller must close the returned root.
*/
VectorSchemaRoot build() throws AdbcException {
// TODO Catalogs and schemas that don't contain tables are being left out
FlightInfo info;
Expand All @@ -127,105 +130,108 @@ VectorSchemaRoot build() throws AdbcException {
int catalogIndex = 0;

for (FlightEndpoint endpoint : info.getEndpoints()) {
FlightStream stream = client.getStream(endpoint.getTicket());
while (stream.next()) {
try (VectorSchemaRoot res = stream.getRoot()) {
VarCharVector catalogVector = (VarCharVector) res.getVector(0);

for (int i = 0; i < res.getRowCount(); i++) {
byte[] catalog = catalogVector.get(i);

if (i == 0 || lastCatalogAdded != catalog) {
if (catalog == null) {
adbcCatalogNames.setNull(catalogIndex);
} else {
adbcCatalogNames.setSafe(catalogIndex, catalog);
}
if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
adbcCatalogDbSchemasWriter.writeNull();
} else {
if (catalogIndex != 0) {
adbcCatalogDbSchemasWriter.endList();
}
adbcCatalogDbSchemasWriter.startList();
lastDbSchemaAdded = null;
}
catalogIndex++;
lastCatalogAdded = catalog;
}
try (FlightStream stream = client.getStream(endpoint.getTicket())) {
while (stream.next()) {
try (VectorSchemaRoot res = stream.getRoot()) {
VarCharVector catalogVector = (VarCharVector) res.getVector(0);

if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
VarCharVector dbSchemaVector = (VarCharVector) res.getVector(1);
byte[] dbSchema = dbSchemaVector.get(i);
for (int i = 0; i < res.getRowCount(); i++) {
byte[] catalog = catalogVector.get(i);

if (!Arrays.equals(lastDbSchemaAdded, dbSchema)) {
if (i != 0) {
adbcCatalogDbSchemaTablesWriter.endList();
adbcCatalogDbSchemasStructWriter.end();
if (i == 0 || lastCatalogAdded != catalog) {
if (catalog == null) {
adbcCatalogNames.setNull(catalogIndex);
} else {
adbcCatalogNames.setSafe(catalogIndex, catalog);
}
adbcCatalogDbSchemasStructWriter.start();
writeVarChar(
adbcCatalogDbSchemaNameWriter, new String(dbSchema, StandardCharsets.UTF_8));
if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
adbcCatalogDbSchemaTablesWriter.writeNull();
if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
adbcCatalogDbSchemasWriter.writeNull();
} else {
adbcCatalogDbSchemaTablesWriter.startList();
if (catalogIndex != 0) {
adbcCatalogDbSchemasWriter.endList();
}
adbcCatalogDbSchemasWriter.startList();
lastDbSchemaAdded = null;
}
catalogIndex++;
lastCatalogAdded = catalog;
}

if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
VarCharVector dbSchemaVector = (VarCharVector) res.getVector(1);
byte[] dbSchema = dbSchemaVector.get(i);

if (!Arrays.equals(lastDbSchemaAdded, dbSchema)) {
if (i != 0) {
adbcCatalogDbSchemaTablesWriter.endList();
adbcCatalogDbSchemasStructWriter.end();
}
adbcCatalogDbSchemasStructWriter.start();
writeVarChar(
adbcCatalogDbSchemaNameWriter, new String(dbSchema, StandardCharsets.UTF_8));
if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
adbcCatalogDbSchemaTablesWriter.writeNull();
} else {
adbcCatalogDbSchemaTablesWriter.startList();
}

lastDbSchemaAdded = dbSchema;
lastDbSchemaAdded = dbSchema;
}
}
}

if (depth != AdbcConnection.GetObjectsDepth.CATALOGS
if (depth != AdbcConnection.GetObjectsDepth.CATALOGS
&& depth != AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
VarCharVector tableNameVector = (VarCharVector) res.getVector(2);
VarCharVector tableTypeVector = (VarCharVector) res.getVector(3);
VarCharVector tableNameVector = (VarCharVector) res.getVector(2);
VarCharVector tableTypeVector = (VarCharVector) res.getVector(3);

adbcTablesStructWriter.start();
writeVarChar(
adbcTablesStructWriter.start();
writeVarChar(
adbcTableNameWriter, new String(tableNameVector.get(i), StandardCharsets.UTF_8));
writeVarChar(
writeVarChar(
adbcTableTypeWriter, new String(tableTypeVector.get(i), StandardCharsets.UTF_8));

if (depth == AdbcConnection.GetObjectsDepth.ALL) {
VarBinaryVector tableSchemaVector = (VarBinaryVector) res.getVector(4);
Schema schema;
if (depth == AdbcConnection.GetObjectsDepth.ALL) {
VarBinaryVector tableSchemaVector = (VarBinaryVector) res.getVector(4);
Schema schema;

try {
schema =
try {
schema =
MessageSerializer.deserializeSchema(
new ReadChannel(
Channels.newChannel(
new ByteArrayInputStream(tableSchemaVector.get(i)))));
} catch (IOException e) {
throw new RuntimeException(e);
}
new ReadChannel(
Channels.newChannel(
new ByteArrayInputStream(tableSchemaVector.get(i)))));
} catch (IOException e) {
throw new RuntimeException(e);
}

adbcTableColumnsWriter.startList();
adbcTableColumnsWriter.startList();

for (int y = 0; y < schema.getFields().size(); y++) {
Field field = schema.getFields().get(y);
if (precompiledColumnNamePattern == null || precompiledColumnNamePattern.matcher(field.getName()).matches()) {
adbcTableColumnsWriter.struct().start();
writeVarChar(
for (int y = 0; y < schema.getFields().size(); y++) {
Field field = schema.getFields().get(y);
if (precompiledColumnNamePattern == null || precompiledColumnNamePattern.matcher(field.getName()).matches()) {
adbcTableColumnsWriter.struct().start();
writeVarChar(
adbcTableColumnsWriter.struct().varChar("column_name"), field.getName());
adbcTableColumnsWriter.struct().integer("ordinal_position").writeInt(y + 1);
adbcTableColumnsWriter.struct().end();
adbcTableColumnsWriter.struct().integer("ordinal_position").writeInt(y + 1);
adbcTableColumnsWriter.struct().end();
}
}
adbcTableColumnsWriter.endList();
}
adbcTableColumnsWriter.endList();
}

adbcTablesStructWriter.end();
adbcTablesStructWriter.end();
}
}
}

if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
adbcCatalogDbSchemaTablesWriter.endList();
adbcCatalogDbSchemasStructWriter.end();
adbcCatalogDbSchemasWriter.endList();
if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
adbcCatalogDbSchemaTablesWriter.endList();
adbcCatalogDbSchemasStructWriter.end();
adbcCatalogDbSchemasWriter.endList();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit 13b1af2

Please sign in to comment.