This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Merge pull request #9 from cmu-db/sort-operator
Sort operator
SarveshOO7 authored Mar 29, 2024
2 parents 373e7b0 + b73f041 commit 58624ea
Showing 7 changed files with 140 additions and 253 deletions.
23 changes: 13 additions & 10 deletions eggstrain/src/execution/operators/sort.rs
```diff
@@ -30,17 +30,21 @@ impl Sort {
         }
     }
 
-    fn sort_in_mem(&self, rb: RecordBatch) -> Result<RecordBatch> {
-        assert_eq!(rb.schema(), self.input_schema);
+    fn sort_in_mem(
+        rb: RecordBatch,
+        limit_size: Option<usize>,
+        sort_expr: Vec<PhysicalSortExpr>,
+    ) -> Result<RecordBatch> {
+        // assert_eq!(rb.schema(), self.input_schema);
 
-        let expressions = self.sort_expr.clone();
+        let expressions = sort_expr.clone();
 
         let sort_columns = expressions
             .iter()
             .map(|expr| expr.evaluate_to_sort_column(&rb))
             .collect::<Result<Vec<_>>>()?;
 
-        let indices = lexsort_to_indices(&sort_columns, self.limit_size)?;
+        let indices = lexsort_to_indices(&sort_columns, limit_size)?;
 
         let columns = rb
             .columns()
@@ -90,9 +94,11 @@ impl UnaryOperator for Sort {
         }
 
         let merged_batch = concat_batches(&self.input_schema, &batches);
-        match merged_batch {
+        let limit_size = self.limit_size;
+        let sort_expr = self.sort_expr.clone();
+        rayon::spawn(move || match merged_batch {
             Ok(merged_batch) => {
-                let sorted_batch = self.sort_in_mem(merged_batch).unwrap();
+                let sorted_batch = Sort::sort_in_mem(merged_batch, limit_size, sort_expr).unwrap();
                 let mut current = 0;
                 let total_rows = sorted_batch.num_rows();
                 while current + BATCH_SIZE < total_rows {
@@ -104,11 +110,8 @@ impl UnaryOperator for Sort {
                 let batch_to_send = sorted_batch.slice(current, total_rows - current);
                 tx.send(batch_to_send)
                     .expect("Unable to send the last sorted batch");
-
-                // TODO: do I have to call drop here manually or will rust take care of it?
-                // drop(sorted_batch);
             }
             Err(_) => todo!("Could not concat the batches for sorting"),
-        }
+        });
     }
 }
```
1 change: 1 addition & 0 deletions eggstrain/src/main.rs
```diff
@@ -14,6 +14,7 @@ async fn main() -> Result<()> {
 
     // Run our execution engine on the physical plan
     let df_physical_plan = sql.clone().create_physical_plan().await?;
+    let df_physical_plan = df_physical_plan.children()[0].clone();
     let results = run(df_physical_plan).await;
 
     results.into_iter().for_each(|batch| {
```
4 changes: 4 additions & 0 deletions proposal/images/hashjoin.svg
173 changes: 63 additions & 110 deletions proposal/presentation.html

Large diffs are not rendered by default.

187 changes: 54 additions & 133 deletions proposal/presentation.md
````diff
@@ -5,218 +5,139 @@ class: invert # Remove this line for light mode
 paginate: true
 ---
 
-# Execution Engine: Eggstrain
+# Eggstrain
 
 <br>
 
 ## **Authors: Connor, Kyle, Sarvesh**
 
 Vectorized Push-Based inspired Execution Engine
 
-
 ---
 
-
 # Overview
 
 We will be taking heavy inspiration from:
-* [DataFusion](https://arrow.apache.org/datafusion/)
-* [Velox](https://velox-lib.io/)
-* [InfluxDB](https://github.com/influxdata/influxdb)
-  * which is built on top of DataFusion
 
+- [DataFusion](https://arrow.apache.org/datafusion/)
+- [Velox](https://velox-lib.io/)
+- [InfluxDB](https://github.com/influxdata/influxdb)
+  - which is built on top of DataFusion
 
 ---
 
-
 # Our Design Goals
 
 ![bg right:50% 120%](./images/robustness.png)
 
-* Robustness
-* Modularity
-* Extensibility
-* Forward Compatibility
-
+- Robustness
+- Modularity
+- Extensibility
+- Forward Compatibility
 
 ---
 
-
 # Features
 
-* Encode behavior in the type system
-* Provide bare minimum statistics the optimizer needs
-  * Timing
-  * Cardinality
-
+- Encode behavior in the type system
+- Provide bare minimum statistics the optimizer needs
+  - Timing
+  - Cardinality
 
 ---
 
-
 # List of rust crates we plan to use
 
-* `arrow`: for handling the Apache Arrow format
-* `tokio`: high performance `async` runtime
-* `rayon`: data parallelism crate
-* `anyhow`: ergonomic `Error` handling
-
+- `arrow`: for handling the Apache Arrow format
+- `tokio`: high performance `async` runtime
+- `rayon`: data parallelism crate
+- `datafusion`: for the input of physical plans
 
 ---
 
-
 # Design Rationale
 
 Push vs Pull Based
 
-| Push | Pull |
-| --- | --- |
-| Improves cache efficiency by removing control flow logic | Easier to implement |
-| Forking is efficient: You push a thing only once | Operators like LIMIT make their producers aware of when to stop running (Headache for the optimizer) |
-| Parallelization is easier | Parallelization is harder |
-
+| Push                                                     | Pull                                                                                                 |
+| -------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- |
+| Improves cache efficiency by removing control flow logic | Easier to implement                                                                                  |
+| Forking is efficient: You push a thing only once         | Operators like LIMIT make their producers aware of when to stop running (Headache for the optimizer) |
+| Parallelization is easier                                | Parallelization is harder                                                                            |
 
 ---
 
-
 # Step 1: Finalize Interfaces
 
 Finalize API with other teams:
 
-* I/O Service
-* Catalog
-* Scheduler
-
+- I/O Service
+- Catalog
+- Scheduler
 
 ---
 
-# Step 2: Implement operators in memory
+# Step 2: Buffer Pool Manager
 
-* TableScan
-* Filter (Completed)
-* Projection (Completed)
-* HashAggregation (In-Progress)
-* HashProbe + HashBuild (In-Progress)
-* MergeJoin
-* NestedLoopJoin
-* OrderBy
-* TopN
-* Limit
-* Values
-* More may be added as a stretch goal.
+![bg right:50% 80%](./images/bufferpool.png)
+
+Need to spill the data to local disk. This will be done after the first operators are implemented.
+
+- Can potentially rip out the [`memory_pool`](https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/index.html)
 
 ---
 
-# Step 3: Buffer Pool Manager
-![bg right:50% 80%](./images/bufferpool.png)
+# Step 3: Implement operators
 
-Need to spill the data to local disk. This will be done after the first operators are implemented.
-
-* Can potentially rip out the [`memory_pool`](https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/index.html)
+- TableScan
+- Filter (Completed)
+- Projection (Completed)
+- HashAggregation (In-Progress)
+- HashProbe + HashBuild (In-Progress)
+- OrderBy (Completed)
+- TopN (Completed)
+- Exchange
+- More may be added as a stretch goal.
 
 ---
 
-
 # Final Design
 
 ![bg right:70% 100%](./images/architecture.drawio.svg)
 
-
 ---
 
-
 # Testing
-* Unit tests for each operator
-* Timing each operator's performance to benchmark our code
 
+- Unit tests for each operator
+- Timing each operator's performance to benchmark our code
 
 ---
 
-
 # For the sake of code quality...
 
-* Pair programming (all combinations: KC, KS, CS)
-* Unit testing for each operator
-* Integrated tests across mutliple operators
-
-
----
-
-
-# Goals
-
-* 75%: First 7 operators working + integration with other components
-* 100%: All operators listed above working
-* 125%: TPC-H benchmark working
-
+- Pair programming (all combinations: KC, KS, CS)
+- Unit testing for each operator
+- Integrated tests across mutliple operators
 
 ---
 
-
-# Stretch Goal
+# Example Operator Workflow
 
-* Integrating with a DBMS
-* Testing against TPC-H or TPC-H like workload
-* Add a lot of statistics and timers to each operator (for optimizer's sake)
+![bg right:70% 80%](./images/hashjoin.svg)
 
 ---
 
-# Potential `StorageClient` API
-
-```rust
-/// Will probably end up re-exporting this type:
-pub type SendableRecordBatchStream =
-    Pin<Box<
-        dyn RecordBatchStream<Item =
-            Result<RecordBatch, DataFusionError>
-        > + Send
-    >>;
-
-impl StorageClient {
-    /// Have some sort of way to create a `StorageClient` on our local node.
-    pub fn new(_id: usize) -> Self {
-        Self
-    }
-
-    pub async fn request_data(
-        &self,
-        _request: BlobData,
-    ) -> SendableRecordBatchStream {
-        todo!()
-    }
-}
-```
-
+# Goals
 
----
+- 75%: First 7 operators working + integration with other components
+- 100%: All operators listed above working
+- 125%: TPC-H benchmark working
 
-# Example usage of the storage client
+---
 
-```rust
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    // Initialize a storage client
-    let sc = storage_client::StorageClient::new(42);
-
-    // Formualte a request we want to make to the storage client
-    let request = create_column_request();
-
-    // Request data from the storage client
-    // Note that this request could fail
-    let stream = sc.request_data(request).await?;
-
-    // Executor node returns a future containing
-    // another stream that can be sent to another operator
-    let table_scan_node = operators::TableScan::new();
-    let result = table_scan_node.execute_with_stream(stream);
-
-    Ok(())
-}
-```
+# Stretch Goal
+
+- Integrating with a DBMS
+- Testing against TPC-H or TPC-H like workload
+- Add a lot of statistics and timers to each operator (for optimizer's sake)
````
Binary file modified proposal/presentation.pdf
Binary file not shown.
5 changes: 5 additions & 0 deletions queries/filter_project_sort.sql
```diff
@@ -0,0 +1,5 @@
+SELECT orders.o_totalprice, orders.o_orderdate
+FROM orders
+WHERE
+    orders.o_totalprice < 900.00
+ORDER BY orders.o_totalprice;
```
