
Commit

Merge pull request #35 from lyft/custom-hive-xec
Spark Thrift Server Fixes
rohit-menon authored Apr 28, 2021
2 parents 6efc63b + 8b32ffc commit bf60b67
Showing 5 changed files with 23 additions and 11 deletions.
pom.xml (2 changes: 1 addition, 1 deletion)
@@ -1715,7 +1715,7 @@
         <groupId>${hive.group}</groupId>
         <artifactId>hive-exec</artifactId>
         <classifier>${hive.classifier}</classifier>
-        <version>${hive.version}</version>
+        <version>2.3.6.39</version>
         <scope>${hive.deps.scope}</scope>
         <exclusions>

@@ -48,10 +48,15 @@ public class ColumnBasedSet implements RowSet {
   public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class);
 
   public ColumnBasedSet(TableSchema schema) {
-    descriptors = schema.toTypeDescriptors();
-    columns = new ArrayList<ColumnBuffer>();
-    for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
-      columns.add(new ColumnBuffer(colDesc.getType()));
+    if (schema == null) {
+      descriptors = new TypeDescriptor[0];
+      columns = new ArrayList<ColumnBuffer>();
+    } else {
+      descriptors = schema.toTypeDescriptors();
+      columns = new ArrayList<ColumnBuffer>();
+      for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) {
+        columns.add(new ColumnBuffer(colDesc.getType()));
+      }
     }
   }

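The hunk above makes ColumnBasedSet tolerate a null TableSchema, yielding an empty zero-column row set instead of a NullPointerException; the likely caller is RowSetFactory.create(resultSchema, ...) in HiveCommandOperation below, whose resultSchema can still be null when a command produces no result schema. A minimal sketch of the same null-guard pattern, using an illustrative stand-in class rather than the Hive types:

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a column-based row set; ColumnarBuffer and its fields
// are hypothetical names, not the Hive classes.
class ColumnarBuffer {
  private final List<String> columnTypes;

  // A null schema falls back to an empty layout instead of throwing.
  ColumnarBuffer(List<String> schemaTypes) {
    if (schemaTypes == null) {
      this.columnTypes = new ArrayList<>();
    } else {
      this.columnTypes = new ArrayList<>(schemaTypes);
    }
  }

  int numColumns() {
    return columnTypes.size();
  }

  public static void main(String[] args) {
    System.out.println(new ColumnarBuffer(null).numColumns());                      // 0
    System.out.println(new ColumnarBuffer(List.of("int", "string")).numColumns());  // 2
  }
}
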
@@ -51,6 +51,7 @@
 public class HiveCommandOperation extends ExecuteStatementOperation {
   private CommandProcessor commandProcessor;
   private TableSchema resultSchema = null;
+  private int readRows = 0;
 
   /**
    * For processors other than Hive queries (Driver), they output to session.out (a temp file)
@@ -157,10 +158,11 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     }
     List<String> rows = readResults((int) maxRows);
     RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);
-
+    rowSet.setStartOffset(readRows);
     for (String row : rows) {
       rowSet.addRow(new String[] {row});
     }
+    readRows += rows.size();
     return rowSet;
   }

@@ -211,5 +213,6 @@ private void resetResultReader() {
       ServiceUtils.cleanup(LOG, resultReader);
       resultReader = null;
     }
+    readRows = 0;
   }
 }
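
Taken together, the three hunks above give HiveCommandOperation basic offset bookkeeping: the new readRows counter stamps each returned RowSet with the absolute position of its first row via rowSet.setStartOffset(readRows), advances by the number of rows handed out, and drops back to zero whenever the result reader is reset. A minimal sketch of that bookkeeping, with hypothetical Page and PagedReader classes standing in for RowSet and the operation:

import java.util.ArrayList;
import java.util.List;

// Hypothetical Page/PagedReader names standing in for RowSet and the operation; not the Hive API.
class Page {
  final long startOffset;      // absolute offset of the first row in this page
  final List<String> rows;

  Page(long startOffset, List<String> rows) {
    this.startOffset = startOffset;
    this.rows = rows;
  }
}

class PagedReader {
  private final List<String> allRows;
  private int readRows = 0;    // mirrors the new HiveCommandOperation.readRows counter

  PagedReader(List<String> allRows) { this.allRows = allRows; }

  Page next(int maxRows) {
    int end = Math.min(readRows + maxRows, allRows.size());
    List<String> chunk = new ArrayList<>(allRows.subList(readRows, end));
    Page page = new Page(readRows, chunk);  // like rowSet.setStartOffset(readRows)
    readRows = end;                         // like readRows += rows.size()
    return page;
  }

  void reset() { readRows = 0; }            // resetResultReader() also clears the counter

  public static void main(String[] args) {
    PagedReader reader = new PagedReader(List.of("a", "b", "c", "d", "e"));
    Page p1 = reader.next(2);
    Page p2 = reader.next(2);
    System.out.println(p1.startOffset + " " + p1.rows);  // 0 [a, b]
    System.out.println(p2.startOffset + " " + p2.rows);  // 2 [c, d]
  }
}
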
@@ -249,6 +249,10 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle,
   public RowSet getOperationLogRowSet(OperationHandle opHandle,
       FetchOrientation orientation, long maxRows)
           throws HiveSQLException {
+    TableSchema tableSchema = new TableSchema(getLogSchema());
+    RowSet rowSet = RowSetFactory.create(tableSchema,
+        getOperation(opHandle).getProtocolVersion(), false);
+
     // get the OperationLog object from the operation
     OperationLog operationLog = getOperation(opHandle).getOperationLog();
     if (operationLog == null) {
@@ -257,17 +261,14 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle,
 
     // read logs
     List<String> logs;
+    rowSet.setStartOffset(operationLog.getStartPosition(isFetchFirst(orientation)));
     try {
       logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows);
     } catch (SQLException e) {
       throw new HiveSQLException(e.getMessage(), e.getCause());
     }
 
-
     // convert logs to RowSet
-    TableSchema tableSchema = new TableSchema(getLogSchema());
-    RowSet rowSet = RowSetFactory.create(tableSchema,
-        getOperation(opHandle).getProtocolVersion(), false);
     for (String log : logs) {
       rowSet.addRow(new String[] {log});
     }
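
Here the log RowSet is created before the logs are read, so its start offset can be taken from the log's own read position via operationLog.getStartPosition(isFetchFirst(orientation)); presumably a FETCH_FIRST request reports offset 0 and rereads from the top, while FETCH_NEXT continues where the previous fetch stopped. A rough sketch of that orientation handling, with hypothetical LogBuffer and Orientation names (the real OperationLog API may differ):

import java.util.Arrays;

// Hypothetical LogBuffer/Orientation names; not the Hive OperationLog API.
enum Orientation { FETCH_FIRST, FETCH_NEXT }

class LogBuffer {
  private final String[] lines;
  private int position = 0;               // next log line to hand out

  LogBuffer(String... lines) { this.lines = lines; }

  // Mirrors getStartPosition(isFetchFirst): FETCH_FIRST rewinds and reports offset 0.
  int startPosition(Orientation o) {
    if (o == Orientation.FETCH_FIRST) {
      position = 0;
    }
    return position;
  }

  String[] read(Orientation o, int maxRows) {
    int start = startPosition(o);         // the page's start offset, fixed before reading
    int end = Math.min(start + maxRows, lines.length);
    String[] out = Arrays.copyOfRange(lines, start, end);
    position = end;                       // advance the read position
    return out;
  }

  public static void main(String[] args) {
    LogBuffer log = new LogBuffer("l1", "l2", "l3");
    System.out.println(Arrays.toString(log.read(Orientation.FETCH_NEXT, 2)));   // [l1, l2]
    System.out.println(Arrays.toString(log.read(Orientation.FETCH_NEXT, 2)));   // [l3]
    System.out.println(Arrays.toString(log.read(Orientation.FETCH_FIRST, 2)));  // [l1, l2]
  }
}
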
@@ -333,8 +333,11 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
       fetchStarted = true;
       driver.setMaxRows((int) maxRows);
       if (driver.getResults(convey)) {
-        return decode(convey, rowSet);
+        decode(convey, rowSet);
       }
+      long startRowOffset = driver.getStartRowOffset();
+      rowSet.setStartOffset(startRowOffset);
+      driver.setStartRowOffset(startRowOffset + rowSet.numRows());
       return rowSet;
     } catch (IOException e) {
       throw new HiveSQLException(e);
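
The last hunk applies the same idea to query results: decode(convey, rowSet) now fills the RowSet without returning early, so the offset bookkeeping after the if block runs on every fetch, including the final one that returns no rows; the RowSet is stamped with driver.getStartRowOffset() and the driver's stored offset advances by rowSet.numRows(). A minimal sketch of that hand-off, with hypothetical DriverLike and nextPage names rather than the Hive classes:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the offset hand-off between a driver-like producer and the
// page it returns; DriverLike and nextPage are illustrative names, not the Hive API.
class DriverLike {
  private final List<String> results;
  private long startRowOffset = 0;   // plays the role of driver.get/setStartRowOffset
  private int cursor = 0;

  DriverLike(List<String> results) { this.results = results; }

  // Copies up to maxRows rows into convey; returns false once the results are drained.
  boolean getResults(List<String> convey, int maxRows) {
    convey.clear();
    int end = Math.min(cursor + maxRows, results.size());
    convey.addAll(results.subList(cursor, end));
    cursor = end;
    return !convey.isEmpty();
  }

  long getStartRowOffset() { return startRowOffset; }
  void setStartRowOffset(long offset) { startRowOffset = offset; }
}

class FetchSketch {
  static String nextPage(DriverLike driver, int maxRows) {
    List<String> convey = new ArrayList<>();
    List<String> rowSet = new ArrayList<>();
    if (driver.getResults(convey, maxRows)) {
      rowSet.addAll(convey);                           // like decode(convey, rowSet), minus the early return
    }
    long start = driver.getStartRowOffset();           // always stamp the page's start offset...
    driver.setStartRowOffset(start + rowSet.size());   // ...and advance it by the rows just returned
    return rowSet + " @ offset " + start;
  }

  public static void main(String[] args) {
    DriverLike driver = new DriverLike(List.of("r1", "r2", "r3"));
    System.out.println(nextPage(driver, 2));  // [r1, r2] @ offset 0
    System.out.println(nextPage(driver, 2));  // [r3] @ offset 2
    System.out.println(nextPage(driver, 2));  // [] @ offset 3
  }
}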
