Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Query::fetch_bytes #182

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ required-features = ["time", "uuid", "chrono"]
name = "data_types_variant"
required-features = ["time"]

[[example]]
name = "stream_into_file"
required-features = ["futures03"]

[[example]]
name = "stream_arbitrary_format_rows"
required-features = ["futures03"]

[profile.release]
debug = true

Expand All @@ -70,6 +78,8 @@ uuid = ["dep:uuid"]
time = ["dep:time"]
lz4 = ["dep:lz4_flex", "dep:cityhash-rs"]
chrono = ["dep:chrono"]
futures03 = []

## TLS
native-tls = ["dep:hyper-tls"]
# ext: native-tls-alpn
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ If something is missing, or you found a mistake in one of these examples, please
- [custom_http_headers.rs](custom_http_headers.rs) - setting additional HTTP headers to the client, or overriding the generated ones
- [query_id.rs](query_id.rs) - setting a specific `query_id` on the query level
- [session_id.rs](session_id.rs) - using the client in the session context with temporary tables
- [stream_into_file.rs](stream_into_file.rs) - streaming the query result as raw bytes into a file in an arbitrary format. Required cargo features: `futures03`.
- [stream_arbitrary_format_rows.rs](stream_arbitrary_format_rows.rs) - streaming the query result in an arbitrary format, row by row. Required cargo features: `futures03`.

## How to run

Expand Down
38 changes: 38 additions & 0 deletions examples/stream_arbitrary_format_rows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use futures::{AsyncBufReadExt, StreamExt};

use clickhouse::Client;

/// An example of streaming raw data row-by-row in an arbitrary format
/// leveraging the [`AsyncBufReadExt`] and [`StreamExt`] helpers.
/// In this case, the format is JSONEachRow.
/// Similarly, it can be used with other formats such as CSV, TSV,
/// and others that produce each row on a new line;
/// the only difference will be in how the data is parsed.
/// See also: https://clickhouse.com/docs/en/interfaces/formats

#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
let client = Client::default().with_url("http://localhost:8123");
let mut lines_cursor = client
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 100
",
)
.fetch_bytes("JSONEachRow")?
.lines();

while let Some(data) = lines_cursor.next().await {
match data {
Ok(line_bytes) => {
let value = serde_json::json!(&line_bytes);
println!("JSONEachRow value: {}", value);
}
Err(err) => return Err(clickhouse::error::Error::Custom(err.to_string())),
}
}

Ok(())
}
37 changes: 37 additions & 0 deletions examples/stream_into_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use clickhouse::Client;
use futures::StreamExt;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

/// An example of streaming the result of a query in an arbitrary format into a file
/// leveraging the [`StreamExt`] helper. In this case, `CSVWithNamesAndTypes` format is used.
/// See also: https://clickhouse.com/docs/en/interfaces/formats

#[tokio::main]
async fn main() -> clickhouse::error::Result<()> {
let client = Client::default().with_url("http://localhost:8123");
let filename = "output.csv";
let mut bytes_cursor = client
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10000
",
)
.fetch_bytes("CSVWithNamesAndTypes")?;

let mut file = File::create(filename).await.unwrap();
while let Some(data) = bytes_cursor.next().await {
match data {
Ok(bytes) => {
file.write_all(&bytes).await.unwrap();
println!("Bytes written: {}", bytes.len());
}
Err(err) => return Err(err),
}
}

println!("Raw data is written to {filename}");
Ok(())
}
152 changes: 131 additions & 21 deletions src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use std::marker::PhantomData;

use bytes::Bytes;
use futures::TryStreamExt;
use serde::Deserialize;

use crate::{
bytes_ext::BytesExt,
error::{Error, Result},
response::{Chunks, Response, ResponseFuture},
rowbinary,
};
use bytes::Bytes;
use futures::TryStreamExt;
use serde::Deserialize;
use std::marker::PhantomData;

/// Initial capacity, in bytes, of the internal buffer that `BytesCursor` allocates on creation.
#[cfg(feature = "futures03")]
const INITIAL_BYTES_CURSOR_BUFFER_SIZE: usize = 1024;

// === RawCursor ===

struct RawCursor(RawCursorInner);
struct RawCursor(RawCursorState);

enum RawCursorInner {
enum RawCursorState {
Waiting(ResponseFuture),
Loading(RawCursorLoading),
}
Expand All @@ -27,18 +28,18 @@ struct RawCursorLoading {
}

impl RawCursor {
fn new(response: Response) -> Self {
Self(RawCursorInner::Waiting(response.into_future()))
pub(crate) fn new(response: Response) -> Self {
Self(RawCursorState::Waiting(response.into_future()))
}

async fn next(&mut self) -> Result<Option<Bytes>> {
if matches!(self.0, RawCursorInner::Waiting(_)) {
if matches!(self.0, RawCursorState::Waiting(_)) {
self.resolve().await?;
}

let state = match &mut self.0 {
RawCursorInner::Loading(state) => state,
RawCursorInner::Waiting(_) => unreachable!(),
RawCursorState::Loading(state) => state,
RawCursorState::Waiting(_) => unreachable!(),
};

match state.chunks.try_next().await {
Expand All @@ -53,9 +54,9 @@ impl RawCursor {
}

async fn resolve(&mut self) -> Result<()> {
if let RawCursorInner::Waiting(future) = &mut self.0 {
if let RawCursorState::Waiting(future) = &mut self.0 {
let chunks = future.await;
self.0 = RawCursorInner::Loading(RawCursorLoading {
self.0 = RawCursorState::Loading(RawCursorLoading {
chunks: chunks?,
net_size: 0,
data_size: 0,
Expand All @@ -66,15 +67,124 @@ impl RawCursor {

fn received_bytes(&self) -> u64 {
match &self.0 {
RawCursorInner::Waiting(_) => 0,
RawCursorInner::Loading(state) => state.net_size,
RawCursorState::Waiting(_) => 0,
RawCursorState::Loading(state) => state.net_size,
}
}

fn decoded_bytes(&self) -> u64 {
match &self.0 {
RawCursorInner::Waiting(_) => 0,
RawCursorInner::Loading(state) => state.data_size,
RawCursorState::Waiting(_) => 0,
RawCursorState::Loading(state) => state.data_size,
}
}
}

// === BytesCursor ===

/// A cursor that emits raw bytes.
///
/// Unlike [`RowCursor`], which emits rows deserialized as structures from RowBinary,
/// this cursor emits raw bytes without deserialization.
/// It can be iterated over using [`futures::StreamExt::next`].
/// Additionally, if the requested format emits each row on a new line
/// (e.g. `JSONEachRow`, `CSV`, `TSV`, etc.), the cursor can be converted into a stream of lines
/// using [`futures::AsyncBufReadExt::lines`], which allows for more convenient processing.
#[cfg(feature = "futures03")]
pub struct BytesCursor {
// Source of the raw response chunks.
raw: RawCursor,
// Bytes of the chunk currently exposed through `poll_fill_buf`.
buffer: Vec<u8>,
// Length of the chunk most recently copied into `buffer`.
filled: usize,
// Number of bytes of `buffer` already handed out via `consume`.
consumed: usize,
}

#[cfg(feature = "futures03")]
impl BytesCursor {
    /// Builds a cursor over `response` with an empty, pre-allocated buffer
    /// and both read offsets at zero.
    pub(crate) fn new(response: Response) -> Self {
        let raw = RawCursor::new(response);
        let buffer = Vec::with_capacity(INITIAL_BYTES_CURSOR_BUFFER_SIZE);

        Self {
            raw,
            buffer,
            filled: 0,
            consumed: 0,
        }
    }
}

#[cfg(feature = "futures03")]
impl futures::stream::Stream for BytesCursor {
    type Item = Result<Bytes>;

    /// Yields the next raw chunk of the response.
    ///
    /// A chunk becomes `Some(Ok(_))`, an error becomes `Some(Err(_))`,
    /// and the end of the response becomes `None`.
    /// This path reads straight from the underlying raw cursor and does not
    /// touch the internal buffer.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use futures::FutureExt;

        let mut next_chunk = self.raw.next().boxed();
        // `Result<Option<Bytes>>` -> `Option<Result<Bytes>>`, preserving `Pending`.
        next_chunk.poll_unpin(cx).map(|chunk| chunk.transpose())
    }
}

#[cfg(feature = "futures03")]
impl futures::AsyncRead for BytesCursor {
    /// Copies up to `buf.len()` bytes of the response into `buf`.
    ///
    /// The read goes through the same internal buffer as the `AsyncBufRead`
    /// implementation: whatever part of a chunk does not fit into `buf` stays
    /// buffered and is returned by the following reads instead of being dropped,
    /// and bytes left over from an earlier `poll_fill_buf` are served first.
    ///
    /// Returns `Ok(0)` when `buf` is empty or the response is exhausted.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        use futures::AsyncBufRead;
        use std::task::Poll;

        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let available = match self.as_mut().poll_fill_buf(cx) {
            Poll::Ready(Ok(available)) => available,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        };

        let len = std::cmp::min(available.len(), buf.len());
        buf[..len].copy_from_slice(&available[..len]);
        // Only mark as consumed what was actually copied out.
        self.consume(len);
        Poll::Ready(Ok(len))
    }
}

#[cfg(feature = "futures03")]
impl futures::AsyncBufRead for BytesCursor {
    /// Returns the unconsumed part of the current chunk, fetching the next
    /// chunk from the raw cursor when everything buffered has been consumed.
    ///
    /// An empty slice is returned only when the response is exhausted:
    /// empty chunks are skipped, because an empty slice signals EOF to readers.
    fn poll_fill_buf(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<&[u8]>> {
        use futures::FutureExt;
        use std::task::Poll;

        let this = self.get_mut();
        // `consume` resets both offsets and clears the buffer once everything
        // is consumed, so equality here means the buffer is empty.
        while this.filled == this.consumed {
            let polled = this.raw.next().boxed().poll_unpin(cx);
            match polled {
                Poll::Ready(Ok(Some(bytes))) => {
                    // Bulk copy instead of extending byte by byte.
                    this.buffer.extend_from_slice(&bytes);
                    this.filled = this.buffer.len();
                }
                Poll::Ready(Ok(None)) => return Poll::Ready(Ok(&[])),
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into_io())),
                Poll::Pending => return Poll::Pending,
            }
        }
        Poll::Ready(Ok(&this.buffer[this.consumed..]))
    }

    /// Marks `amt` bytes of the buffer as consumed.
    ///
    /// `amt` is clamped to the number of buffered bytes so that an oversized
    /// value cannot push the read offset past the end of the buffer.
    fn consume(self: std::pin::Pin<&mut Self>, amt: usize) {
        let this = self.get_mut();
        this.consumed = std::cmp::min(this.consumed.saturating_add(amt), this.filled);
        if this.consumed == this.filled {
            // Everything was handed out: reset so the next fill starts from scratch.
            this.filled = 0;
            this.consumed = 0;
            this.buffer.clear();
        }
    }
}
Expand All @@ -88,7 +198,7 @@ fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T {

// === RowCursor ===

/// A cursor that emits rows.
/// A cursor that emits rows deserialized as structures from RowBinary.
#[must_use]
pub struct RowCursor<T> {
raw: RawCursor,
Expand All @@ -107,7 +217,7 @@ impl<T> RowCursor<T> {

/// Emits the next row.
///
/// An result is unspecified if it's called after `Err` is returned.
/// The result is unspecified if it's called after `Err` is returned.
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
where
T: Deserialize<'b>,
Expand Down
14 changes: 13 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::{

const MAX_QUERY_LEN_TO_USE_GET: usize = 8192;

#[cfg(feature = "futures03")]
pub use crate::cursor::BytesCursor;
pub use crate::cursor::RowCursor;

#[must_use]
Expand Down Expand Up @@ -84,12 +86,22 @@ impl Query {
/// ```
pub fn fetch<T: Row>(mut self) -> Result<RowCursor<T>> {
self.sql.bind_fields::<T>();
self.sql.append(" FORMAT RowBinary");
self.sql.set_output_format("RowBinary");

let response = self.do_execute(true)?;
Ok(RowCursor::new(response))
}

/// Executes the query, returning a [`crate::cursor::BytesCursor`]
/// to obtain results as raw bytes.
///
/// `format` is passed to the query as its output format.
/// For available formats, see <https://clickhouse.com/docs/en/interfaces/formats>
#[cfg(feature = "futures03")]
pub fn fetch_bytes(mut self, format: impl Into<String>) -> Result<crate::cursor::BytesCursor> {
    self.sql.set_output_format(format);

    let cursor = crate::cursor::BytesCursor::new(self.do_execute(true)?);
    Ok(cursor)
}

/// Executes the query and returns just a single row.
///
/// Note that `T` must be owned.
Expand Down
Loading
Loading