Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Query.fetch_raw #182

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft

WIP Query.fetch_raw #182

wants to merge 6 commits into from

Conversation

slvrtrn
Copy link
Contributor

@slvrtrn slvrtrn commented Nov 28, 2024

Summary

Adds a method to fetch raw bytes from a query, exposes RawCursor, and adds an additional cursor type (RawCursorNewline) to work with arbitrary formats where a newline character separates rows. See the related examples regarding streaming into a file, as well as streaming separate *EachRow records.

Checklist

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided so that we can include it in CHANGELOG later
  • For significant changes, documentation in README and https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@slvrtrn slvrtrn mentioned this pull request Nov 28, 2024
3 tasks
Comment on lines 16 to 28
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
FORMAT JSONEachRow
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
.with_option("output_format_json_quote_64bit_integers", "0")
.fetch_raw()?
.newline();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe like this?

Suggested change
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
FORMAT JSONEachRow
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
.with_option("output_format_json_quote_64bit_integers", "0")
.fetch_raw()?
.newline();
.query(
"
SELECT number, hex(randomPrintableASCII(20)) AS hex_str
FROM system.numbers
LIMIT 10
",
)
// By default, ClickHouse quotes (U)Int64 in JSON* family formats;
// disable it to simplify this example.
.with_option("output_format_json_quote_64bit_integers", "0")
.with_format("JSONEachRow")
.fetch_raw()?
.newline();

Copy link
Contributor

@pravic pravic Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because specifying FORMAT xxx as part of the SQL feels not convenient and a bit more error prone.

Ideally, I would prefer with_format(clickhouse::Format::JsonEachRow) if the maintenance burden of mapping all formats to an enum isn't too much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is possible after a few tweaks to the SQLBuilder. See this commit.

Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave the raw API as-is. The user will quickly find an invalid FORMAT with the first query.

Same thread #182 (comment)

@slvrtrn
Copy link
Contributor Author

slvrtrn commented Dec 4, 2024

Open questions so far:

  1. Error handling. This test returns an error locally, but the contents are off:
Err(BadResponse("{\"s\":0}\n{\"exception\": \"Code: 159. DB::Exception: Timeout exceeded: elapsed 0.100752804 seconds, maximum: 0.08. (TIMEOUT_EXCEEDED) (version 24.10.1.2812 (official build))\"}"))

Fails on the CI for some reason.

  1. This API probably has the same issue as The fetch API is unsound if used with borrowed types #24
  2. Output formats. Currently, it is parsed from the docs, and every format that is suitable for the output is included. Shall we keep all of them, or only a select subset?

examples/raw_stream_json_each_row.rs Show resolved Hide resolved

// NB: there is no `Row` derive here
#[derive(Debug, Deserialize)]
struct MyRowInJSONEachRowFormat {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest avoiding any use cases where using non-raw API is preferred.

Here we can use https://docs.rs/serde_json/latest/serde_json/enum.Value.html

@@ -13,7 +14,7 @@ use crate::{

// === RawCursor ===

struct RawCursor(RawCursorInner);
pub struct RawCursor(RawCursorInner);
Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs (preparing for warn(missing_docs) lint).

src/format.rs Show resolved Hide resolved
use clickhouse::format::OutputFormat;

#[tokio::test]
async fn fetch_raw_with_error() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about testing a successful path?

@@ -11,7 +11,9 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};
pub use self::{compression::Compression, row::Row};
pub use clickhouse_derive::Row;

pub mod cursor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be implementation details; let's reexport from mod query where this is actually used.

@@ -27,11 +28,18 @@ struct RawCursorLoading {
}

impl RawCursor {
fn new(response: Response) -> Self {
pub(crate) fn new(response: Response) -> Self {
Self(RawCursorInner::Waiting(response.into_future()))
}

Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, we should also add into_futures03(self) -> RawStream03 (implementing https://docs.rs/futures/latest/futures/prelude/trait.Stream.html) under a new futures03 feature (to support future v1 and AsyncIterator from std when they will released).

This is de facto the standard for streams (with built-in combinators, e.g. concat) and should be used for owned types (like Bytes).

Alternatively, we can directly implement Stream for RawCursor (also under the feature).

@@ -0,0 +1,54 @@
use std::str::from_utf8;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another real use case is to collect everything into one Bytes (or Vec<u8>). It's nice to have such an example, too!

Self(RawCursorInner::Waiting(response.into_future()))
}

async fn next(&mut self) -> Result<Option<Bytes>> {
pub fn newline(self) -> RawCursorNewline {
Copy link
Collaborator

@loyd loyd Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The futures crate already provides the AsyncBufReadExt::lines method.

It seems we can implement the AsyncBufRead (under the futures03 feature) trait for RawCursor covering more use cases, for instance the use of io::copy. This function can be also used in the example of streaming to a file.


let mut file = File::create(filename).await.unwrap();

loop {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #182 (comment) for how to make this example more canonical.

@pravic
Copy link
Contributor

pravic commented Jan 9, 2025

Hey guys, any update on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants