Skip to content

Commit

Permalink
Merge pull request #517 from splitgraph/improve-logging
Browse files Browse the repository at this point in the history
Improve Arrow Flight SQL interface logging
  • Loading branch information
gruuya authored Apr 22, 2024
2 parents b718403 + 0ed634c commit 84f80be
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ services:

minio:
image: minio/minio:latest
depends_on:
- dex
ports:
- 9000:9000
- 9001:9001
Expand Down
23 changes: 14 additions & 9 deletions src/frontend/flight/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::metadata::MetadataMap;
use tonic::Status;
use tracing::{error, info};

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
Expand Down Expand Up @@ -61,8 +62,14 @@ impl SeafowlFlightHandler {
self.context.clone()
};

let plan = ctx.plan_query(query).await?;
let batch_stream = ctx.execute_stream(plan).await?;
let plan = ctx
.plan_query(query)
.await
.inspect_err(|err| info!("Error planning query id {query_id}: {err}"))?;
let batch_stream = ctx
.execute_stream(plan)
.await
.inspect_err(|err| info!("Error executing query id {query_id}: {err}"))?;
let schema = batch_stream.schema();

self.results.insert(query_id, Mutex::new(batch_stream));
Expand All @@ -73,14 +80,12 @@ impl SeafowlFlightHandler {
// Get a specific stream from the map
pub async fn fetch_stream(
&self,
query_id: String,
query_id: &str,
) -> core::result::Result<SendableRecordBatchStream, Status> {
let (_, batch_stream_mutex) =
self.results
.remove(&query_id)
.ok_or(Status::not_found(format!(
"No results found for query id {query_id}"
)))?;
let (_, batch_stream_mutex) = self.results.remove(query_id).ok_or_else(|| {
error!("No results found for query id {query_id}");
Status::not_found(format!("No results found for query id {query_id}"))
})?;

Ok(batch_stream_mutex.into_inner())
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/flight/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
Ok(ref r) => {
// Tonic converts any `Err`s into veritable `http::Response`s,
// by simply dumping the `Status` info into some headers inside
// of `Grpc::map_response`.
// `Grpc::map_response`.
if let Some(header_value) = r.headers().get("grpc-status") {
Code::from_bytes(header_value.as_bytes())
} else {
Expand Down
25 changes: 21 additions & 4 deletions src/frontend/flight/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use prost::Message;
use std::pin::Pin;
use tonic::metadata::MetadataValue;
use tonic::{Request, Response, Status, Streaming};
use tracing::debug;
use uuid::Uuid;

#[async_trait]
Expand All @@ -28,11 +29,12 @@ impl FlightSqlService for SeafowlFlightHandler {
// Perform authentication; for now just pass-through everything
async fn do_handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<
Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
Status,
> {
debug!("Handshake request: {:?}", request.metadata());
let result = HandshakeResponse {
protocol_version: 0,
payload: vec![].into(),
Expand All @@ -56,6 +58,10 @@ impl FlightSqlService for SeafowlFlightHandler {
query: CommandGetSqlInfo,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
debug!(
"Flight SQL server metadata request: {:?}",
request.metadata()
);
let flight_descriptor = request.into_inner();
let ticket = Ticket::new(query.encode_to_vec());
let endpoint = FlightEndpoint::new().with_ticket(ticket);
Expand All @@ -80,13 +86,18 @@ impl FlightSqlService for SeafowlFlightHandler {
// TODO: Should we use something else here (and keep that in the results map)?
let query_id = Uuid::new_v4().to_string();

debug!(
"Executing query with id {query_id} for request {:?}:\n {}",
request.metadata(),
query.query,
);
let schema = self
.query_to_stream(&query.query, query_id.clone(), request.metadata())
.await
.map_err(|e| Status::internal(e.to_string()))?;

let ticket = TicketStatementQuery {
statement_handle: query_id.into(),
statement_handle: query_id.clone().into(),
};

let endpoint = FlightEndpoint::new()
Expand All @@ -99,18 +110,23 @@ impl FlightSqlService for SeafowlFlightHandler {
.with_descriptor(request.into_inner());

let resp = Response::new(flight_info);
debug!("Results for query id {query_id} ready for streaming");
Ok(resp)
}

// Fetch the result batch stream, convert to flight stream and return.
async fn do_get_statement(
&self,
ticket: TicketStatementQuery,
_request: Request<Ticket>,
request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
let query_id =
String::from_utf8_lossy(ticket.statement_handle.as_ref()).to_string();
let batch_stream = self.fetch_stream(query_id).await?;
debug!(
"Fetching stream for query id {query_id}, request: {:?}",
request.metadata()
);
let batch_stream = self.fetch_stream(&query_id).await?;
let schema = batch_stream.schema();

// The Flight encoder below expects a stream where the error type on the item is a
Expand All @@ -124,6 +140,7 @@ impl FlightSqlService for SeafowlFlightHandler {
.build(mapped_stream)
.map_err(Status::from);

debug!("Returning stream for query id {query_id}");
Ok(Response::new(Box::pin(stream)))
}

Expand Down

0 comments on commit 84f80be

Please sign in to comment.