doc: update ballista client front page
... making it look more like root `README.md`
milenkovicm committed Jan 26, 2025
1 parent e9e8f9a commit 35dcdd0
Showing 2 changed files with 100 additions and 55 deletions.
25 changes: 16 additions & 9 deletions README.md
@@ -60,18 +60,25 @@ use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create DataFusion SessionContext with ballista standalone cluster started
let ctx = SessionContext::standalone();
// create SessionContext with ballista support
// standalone context will start all required
// ballista infrastructure in the background as well
let ctx = SessionContext::standalone().await?;

// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// everything else remains the same

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
.await?;

// execute and print results
df.show().await?;
Ok(())
// create a plan to run a SQL query
let df = ctx
.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
.await?;

// execute and print results
df.show().await?;
Ok(())
}
```

130 changes: 84 additions & 46 deletions ballista/client/README.md
@@ -19,34 +19,69 @@

# Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and
DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and
Java) to be supported as first-class citizens without paying a penalty for serialization costs.
Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:
![logo](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/ballista-logo.png?raw=true)

- [Apache Arrow](https://arrow.apache.org/) memory model and compute kernels for efficient processing of data.
- [Apache Arrow Flight Protocol](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) for efficient
data transfer between processes.
- [Google Protocol Buffers](https://developers.google.com/protocol-buffers) for serializing query plans.
- [Docker](https://www.docker.com/) for packaging up executors along with user-defined code.
Ballista is a distributed query execution engine that enhances [Apache DataFusion](https://github.com/apache/datafusion) by enabling the parallelized execution of workloads across multiple nodes in a distributed environment.

Ballista can be deployed as a standalone cluster and also supports [Kubernetes](https://kubernetes.io/). In either
case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
redundancy in the case of a scheduler failing.
Existing DataFusion application:

## Rust Version Compatibility
```rust,no_run
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// datafusion context
let ctx = SessionContext::new();
This crate is tested with the latest stable version of Rust. We do not currently test against other, older versions of the Rust compiler.
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// execute and print results
df.show().await?;
Ok(())
}
```

can be distributed with only a few lines changed:

```rust,no_run
use ballista::prelude::*;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// create SessionContext with ballista support
// standalone context will start all required
// ballista infrastructure in the background as well
let ctx = SessionContext::standalone().await?;
// everything else remains the same
// register the table
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
.await?;
// create a plan to run a SQL query
let df = ctx
.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100")
.await?;
// execute and print results
df.show().await?;
Ok(())
}
```

## Starting a cluster

There are numerous ways to start a Ballista cluster, including support for Docker and
Kubernetes. For full documentation, refer to the deployment section of the
[Ballista User Guide](https://datafusion.apache.org/ballista/user-guide/deployment/)
![architecture](https://github.com/apache/datafusion-ballista/blob/main/docs/source/contributors-guide/ballista_architecture.excalidraw.svg?raw=true)

A simple way to start a local cluster for testing purposes is to use cargo to install
the scheduler and executor crates.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

```bash
cargo install --locked ballista-scheduler
@@ -61,35 +96,27 @@ RUST_LOG=info ballista-scheduler

The scheduler will bind to port `50050` by default.
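
Before starting executors or connecting a client, it can help to confirm that the scheduler is actually accepting connections. The following is a minimal sketch using only the Rust standard library; the helper name `is_listening` is hypothetical and not part of Ballista, and the address assumes the default port `50050` on the local machine.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if something accepts TCP connections at `addr` within `timeout`.
/// `addr` is a "host:port" string such as "localhost:50050".
/// (Hypothetical helper for illustration; not a Ballista API.)
fn is_listening(addr: &str, timeout: Duration) -> bool {
    match addr.to_socket_addrs() {
        // Try every resolved address (for example IPv4 and IPv6 for "localhost").
        Ok(mut addrs) => addrs.any(|a| TcpStream::connect_timeout(&a, timeout).is_ok()),
        // The address could not be parsed or resolved, so nothing is reachable.
        Err(_) => false,
    }
}

fn main() {
    // 50050 is the scheduler's default port; adjust if it was started differently.
    let addr = "localhost:50050";
    if is_listening(addr, Duration::from_secs(1)) {
        println!("scheduler is accepting connections on {}", addr);
    } else {
        println!("nothing is listening on {}; is ballista-scheduler running?", addr);
    }
}
```

This only checks that the TCP port is open. It does not verify that the process behind it is a Ballista scheduler.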

Next, start an executor process in a new terminal session with the specified concurrency
level.
Next, start an executor process in a new terminal session with the specified concurrency level.

```bash
RUST_LOG=info ballista-executor -c 4
```

The executor will bind to port `50051` by default. Additional executors can be started by
manually specifying a bind port. For example:
The executor will bind to port `50051` by default. Additional executors can be started by manually specifying a bind port.

```bash
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
```
For full documentation, refer to the deployment section of the
[Ballista User Guide](https://datafusion.apache.org/ballista/user-guide/deployment/).
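
When several executors run on one machine, each needs its own bind port. The sketch below shows one way to generate the command lines, assuming consecutive ports starting at the default `50051` and using only the `--bind-port` and `-c` flags shown above. The helper names `executor_ports` and `executor_args` are hypothetical, and the port range is illustrative.

```rust
/// Builds the argument list for one executor, using only the flags shown
/// above: `--bind-port` and `-c` (concurrency level).
/// (Hypothetical helper for illustration; not a Ballista API.)
fn executor_args(bind_port: u16, concurrency: usize) -> Vec<String> {
    vec![
        "--bind-port".to_string(),
        bind_port.to_string(),
        "-c".to_string(),
        concurrency.to_string(),
    ]
}

/// Assigns consecutive bind ports to `count` executors, starting at `first_port`.
fn executor_ports(first_port: u16, count: u16) -> Vec<u16> {
    (0..count).map(|i| first_port + i).collect()
}

fn main() {
    // Three executors starting at the default port, with 4 concurrent tasks each.
    for port in executor_ports(50051, 3) {
        let args = executor_args(port, 4);
        // Print the command instead of spawning it, so the sketch runs
        // even where ballista-executor is not installed.
        println!("RUST_LOG=info ballista-executor {}", args.join(" "));
    }
}
```

The printed lines can be pasted into separate terminal sessions. To launch the processes directly, the same arguments could be passed to `std::process::Command`.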

## Executing a query
## Executing a Query

Ballista provides a `BallistaContext` as a starting point for creating queries. DataFrames can be created
by invoking the `read_csv`, `read_parquet`, and `sql` methods.
Ballista provides a custom `SessionContext` as a starting point for creating queries. DataFrames can be created by invoking the `read_csv`, `read_parquet`, and `sql` methods.

To build a simple Ballista example, run the following command to add the dependencies to your `Cargo.toml` file:

```bash
cargo add ballista datafusion tokio
```

The following example runs a simple aggregate SQL query against a Parquet file (`yellow_tripdata_2022-01.parquet`) from the
[New York Taxi and Limousine Commission](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
data set. Download the file and add it to the `testdata` folder before running the example.

```rust,no_run
use ballista::prelude::*;
use datafusion::common::Result;
@@ -99,7 +126,6 @@ use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, aver
#[tokio::main]
async fn main() -> Result<()> {
// connect to Ballista scheduler
let ctx = SessionContext::remote("df://localhost:50050").await?;
@@ -121,13 +147,6 @@ async fn main() -> Result<()> {
)?
.sort(vec![col("passenger_count").sort(true, true)])?;
// this is equivalent to the following SQL
// SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
// FROM tripdata
// GROUP BY passenger_count
// ORDER BY passenger_count
// print the results
df.show().await?;
Ok(())
@@ -146,12 +165,31 @@ The output should look similar to the following table.
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
```

More [examples](../../examples/examples/) can be found in the datafusion-ballista repository.

## Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations.

These are benchmarks derived from TPC-H and not official TPC-H benchmarks. These results are from running individual queries at scale factor 100 (100 GB) on a single node with a single executor and 8 concurrent tasks.

### Overall Speedup

The overall speedup is 2.9x.

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_allqueries.png?raw=true)

### Per Query Comparison

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_compare.png?raw=true)

### Relative Speedup

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_speedup_rel.png?raw=true)

### Absolute Speedup

![benchmarks](https://github.com/apache/datafusion-ballista/blob/main/docs/source/_static/images/tpch_queries_speedup_abs.png?raw=true)
