Skip to content

Commit

Permalink
remove code
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jul 16, 2024
1 parent 2c872d0 commit d7f23eb
Showing 1 changed file with 3 additions and 79 deletions.
82 changes: 3 additions & 79 deletions arrow-flight/tests/flight_sql_client_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@ mod common {
pub mod trailers_layer;
}

use std::collections::HashMap;
use std::{pin::Pin, sync::Arc};

use crate::common::fixture::TestFixture;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::sql::EndTransaction;
use arrow_flight::{
decode::FlightRecordBatchStream,
flight_service_server::{FlightService, FlightServiceServer},
sql::{
server::{FlightSqlService, PeekableFlightDataStream},
ActionBeginTransactionRequest, ActionBeginTransactionResult,
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult,
ActionEndTransactionRequest, Any, CommandPreparedStatementQuery, CommandStatementQuery,
DoPutPreparedStatementResult, ProstMessageExt, SqlInfo,
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandPreparedStatementQuery, CommandStatementQuery, DoPutPreparedStatementResult,
ProstMessageExt, SqlInfo,
},
utils::batches_to_flight_data,
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
Expand All @@ -48,9 +44,7 @@ use assert_cmd::Command;
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use prost::Message;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status, Streaming};
use uuid::Uuid;

const QUERY: &str = "SELECT * FROM table;";

Expand Down Expand Up @@ -144,7 +138,6 @@ async fn test_do_put_prepared_statement(test_server: FlightSqlServiceImpl) {
pub async fn test_do_put_prepared_statement_stateless() {
test_do_put_prepared_statement(FlightSqlServiceImpl {
stateless_prepared_statements: true,
transactions: Arc::new(Mutex::new(HashMap::new())),
})
.await
}
Expand All @@ -153,57 +146,22 @@ pub async fn test_do_put_prepared_statement_stateless() {
pub async fn test_do_put_prepared_statement_stateful() {
test_do_put_prepared_statement(FlightSqlServiceImpl {
stateless_prepared_statements: false,
transactions: Arc::new(Mutex::new(HashMap::new())),
})
.await
}

#[tokio::test]
pub async fn test_begin_end_transaction() {
let test_server = FlightSqlServiceImpl {
stateless_prepared_statements: true,
transactions: Arc::new(Mutex::new(HashMap::new())),
};
let fixture = TestFixture::new(test_server.service()).await;
let channel = fixture.channel().await;
let mut flight_sql_client = FlightSqlServiceClient::new(channel);

// begin commit
let transaction_id = flight_sql_client.begin_transaction().await.unwrap();
flight_sql_client
.end_transaction(transaction_id, EndTransaction::Commit)
.await
.unwrap();

// begin rollback
let transaction_id = flight_sql_client.begin_transaction().await.unwrap();
flight_sql_client
.end_transaction(transaction_id, EndTransaction::Rollback)
.await
.unwrap();

// unknown transaction id
let transaction_id = "UnknownTransactionId".to_string().into();
assert!(flight_sql_client
.end_transaction(transaction_id, EndTransaction::Commit)
.await
.is_err());
}

#[derive(Clone)]
pub struct FlightSqlServiceImpl {
/// Whether to emulate stateless (true) or stateful (false) behavior for
/// prepared statements. stateful servers will not return an updated
/// handle after executing `DoPut(CommandPreparedStatementQuery)`
stateless_prepared_statements: bool,
transactions: Arc<Mutex<HashMap<String, ()>>>,
}

impl Default for FlightSqlServiceImpl {
fn default() -> Self {
Self {
stateless_prepared_statements: true,
transactions: Arc::new(Mutex::new(HashMap::new())),
}
}
}
Expand Down Expand Up @@ -398,40 +356,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
.map_err(|e| Status::internal(format!("Unable to serialize schema: {e}")))
}

async fn do_action_begin_transaction(
&self,
_query: ActionBeginTransactionRequest,
_request: Request<Action>,
) -> Result<ActionBeginTransactionResult, Status> {
let transaction_id = Uuid::new_v4().to_string();
self.transactions
.lock()
.await
.insert(transaction_id.clone(), ());
Ok(ActionBeginTransactionResult {
transaction_id: transaction_id.as_bytes().to_vec().into(),
})
}

async fn do_action_end_transaction(
&self,
query: ActionEndTransactionRequest,
_request: Request<Action>,
) -> Result<(), Status> {
let transaction_id = String::from_utf8(query.transaction_id.to_vec())
.map_err(|_| Status::invalid_argument("Invalid transaction id"))?;
if self
.transactions
.lock()
.await
.remove(&transaction_id)
.is_none()
{
return Err(Status::invalid_argument("Transaction id not found"));
}
Ok(())
}

async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
}

Expand Down

0 comments on commit d7f23eb

Please sign in to comment.