Skip to content

Commit

Permalink
Refactor Limit struct to use usize for offset and add new constructor…
Browse files Browse the repository at this point in the history
… method
  • Loading branch information
holicc committed May 28, 2024
1 parent 34ba102 commit 83e34d1
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 18 deletions.
12 changes: 9 additions & 3 deletions src/logical/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use super::{
expr::{AggregateExpr, LogicalExpr, SortExpr},
plan::{Aggregate, CrossJoin, EmptyRelation, Join, LogicalPlan, Projection, Sort, TableScan},
plan::{Aggregate, CrossJoin, EmptyRelation, Join, Limit, LogicalPlan, Projection, Sort, TableScan},
};
use crate::{common::JoinType, error::Result};
use crate::{common::OwnedTableRelation, datasource::DataSource};
Expand Down Expand Up @@ -98,7 +98,13 @@ impl LogicalPlanBuilder {
})
}

pub(crate) fn limit(&self) -> Self {
todo!()
pub fn limit(self, limit: i64, offset: i64) -> Self {
LogicalPlanBuilder {
plan: LogicalPlan::Limit(Limit {
input: Box::new(self.plan),
fetch: limit as usize,
offset: offset as usize,
}),
}
}
}
7 changes: 7 additions & 0 deletions src/logical/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,10 @@ impl Display for LogicalExpr {
}
}
}

pub(crate) fn get_expr_value(expr: LogicalExpr) -> Result<i64> {
match expr {
LogicalExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v),
_ => Err(Error::InternalError(format!("Unexpected expression in"))),
}
}
36 changes: 36 additions & 0 deletions src/logical/plan/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::fmt::Display;

use arrow::datatypes::SchemaRef;

use crate::logical::plan::LogicalPlan;

/// Logical plan node carrying a row limit (`fetch`) and a row `offset`
/// on top of an input plan.
#[derive(Debug, Clone)]
pub struct Limit {
/// The plan this node wraps.
pub input: Box<LogicalPlan>,
/// Value printed as `fetch=` by the `Display` impl; presumably the maximum
/// number of rows to return — confirm against the executor.
pub fetch: usize,
/// Value printed as `offset=` by the `Display` impl; presumably the number
/// of leading rows to skip — confirm against the executor.
pub offset: usize,
}

impl Display for Limit {
    /// Renders the node as `Limit: fetch=<fetch>, offset=<offset>`; the
    /// input plan is not printed here.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { fetch, offset, .. } = self;
        write!(f, "Limit: fetch={}, offset={}", fetch, offset)
    }
}

impl Limit {
    /// Creates a limit node over `input` with the given `fetch` and `offset`.
    pub fn new(input: LogicalPlan, fetch: usize, offset: usize) -> Self {
        Self {
            input: Box::new(input),
            fetch,
            offset,
        }
    }

    /// Returns the schema of the input plan; this node does not alter columns.
    pub fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    /// Returns the direct child of this node, i.e. its input plan.
    ///
    /// This previously forwarded to `self.input.children()`, which returned
    /// the input's children and so dropped the input itself from any
    /// traversal that walks the tree through `children()`.
    pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
        Some(vec![&*self.input])
    }
}
7 changes: 7 additions & 0 deletions src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod aggregate;
mod filter;
mod join;
mod limit;
mod projection;
mod scan;
mod sort;
Expand All @@ -9,6 +10,7 @@ mod sub_query;
pub use aggregate::Aggregate;
pub use filter::Filter;
pub use join::*;
pub use limit::Limit;
pub use projection::Projection;
pub use scan::TableScan;
pub use sort::*;
Expand Down Expand Up @@ -49,6 +51,8 @@ pub enum LogicalPlan {
SubqueryAlias(SubqueryAlias),
/// Sort the result set by the specified expressions.
Sort(Sort),
/// Limit the number of rows in the result set, and optionally an offset.
Limit(Limit),
}

impl LogicalPlan {
Expand All @@ -63,6 +67,7 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(s) => s.schema(),
LogicalPlan::Join(j) => j.schema(),
LogicalPlan::Sort(s) => s.schema(),
LogicalPlan::Limit(l) => l.schema(),
}
}

Expand All @@ -77,6 +82,7 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(s) => s.children(),
LogicalPlan::Join(j) => j.children(),
LogicalPlan::Sort(s) => s.children(),
LogicalPlan::Limit(l) => l.children(),
}
}
}
Expand All @@ -93,6 +99,7 @@ impl std::fmt::Display for LogicalPlan {
LogicalPlan::SubqueryAlias(s) => write!(f, "{}", s),
LogicalPlan::Join(j) => write!(f, "{}", j),
LogicalPlan::Sort(s) => write!(f, "{}", s),
LogicalPlan::Limit(l) => write!(f, "{}", l),
}
}
}
75 changes: 75 additions & 0 deletions src/physical/plan/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;

use crate::error::Result;
use crate::physical::plan::PhysicalPlan;

use std::sync::Arc;

/// Physical operator that slices the record batches produced by `input`.
pub struct Limit {
/// The operator whose output is sliced.
pub input: Arc<dyn PhysicalPlan>,
/// Length passed to `RecordBatch::slice` in `execute`.
pub fetch: usize,
/// Start offset passed to `RecordBatch::slice` in `execute`.
pub offset: usize,
}

impl Limit {
    /// Builds a limit operator over `input`.
    ///
    /// A missing `offset` is treated as `0`.
    pub fn new(input: Arc<dyn PhysicalPlan>, fetch: usize, offset: Option<usize>) -> Self {
        let offset = offset.unwrap_or(0);
        Self { input, fetch, offset }
    }
}

impl PhysicalPlan for Limit {
    /// The output schema is the input's schema; limiting never changes columns.
    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    /// Executes the input and returns at most `fetch` rows after skipping the
    /// first `offset` rows, counted across the whole sequence of batches.
    ///
    /// Batches that fall entirely inside the skipped prefix, or that come
    /// after the fetch budget is used up, are omitted from the output. Each
    /// slice is clamped to the rows actually present in the batch, so
    /// `RecordBatch::slice` is never called out of bounds.
    fn execute(&self) -> Result<Vec<RecordBatch>> {
        let mut to_skip = self.offset;
        let mut to_fetch = self.fetch;
        let mut output = Vec::new();

        for batch in self.input.execute()? {
            if to_fetch == 0 {
                break;
            }

            let rows = batch.num_rows();
            if to_skip >= rows {
                // The whole batch lies inside the skipped prefix.
                to_skip -= rows;
                continue;
            }

            let take = (rows - to_skip).min(to_fetch);
            output.push(batch.slice(to_skip, take));

            // The offset is consumed once the first row has been emitted.
            to_skip = 0;
            to_fetch -= take;
        }

        Ok(output)
    }

    /// Returns the direct child of this operator, i.e. its input.
    ///
    /// This previously forwarded to `self.input.children()`, which skipped
    /// the input itself.
    fn children(&self) -> Option<Vec<Arc<dyn PhysicalPlan>>> {
        Some(vec![Arc::clone(&self.input)])
    }
}

#[cfg(test)]
mod test {
    use crate::{build_table_scan, physical::plan::PhysicalPlan, test_utils::assert_batch_eq};

    use super::Limit;

    /// Skipping one row and fetching three from a four-row table must yield
    /// rows 2 through 4.
    #[test]
    fn test_limit() {
        let scan = build_table_scan!(
            ("a", Int32Type, DataType::Int32, vec![1, 2, 3, 4]),
            ("b", Float64Type, DataType::Float64, vec![1.0, 2.0, 3.0, 4.0]),
            ("c", UInt64Type, DataType::UInt64, vec![1, 2, 3, 4]),
        );

        let results = Limit::new(scan, 3, Some(1)).execute().unwrap();

        let expected = vec![
            "+---+-----+---+",
            "| a | b | c |",
            "+---+-----+---+",
            "| 2 | 2.0 | 2 |",
            "| 3 | 3.0 | 3 |",
            "| 4 | 4.0 | 4 |",
            "+---+-----+---+",
        ];
        assert_batch_eq(&results, expected);
    }
}
2 changes: 2 additions & 0 deletions src/physical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ mod join;
mod projection;
mod scan;
mod sort;
mod limit;

pub use aggregate::HashAggregate;
pub use filter::Filter;
pub use join::{join_schema, ColumnIndex, CrossJoin, Join, JoinFilter, JoinSide};
pub use projection::Projection;
pub use scan::Scan;
pub use limit::Limit;

use std::sync::Arc;

Expand Down
1 change: 1 addition & 0 deletions src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl QueryPlanner for DefaultQueryPlanner {
LogicalPlan::SubqueryAlias(_) => todo!(),
LogicalPlan::Join(join) => self.physical_plan_join(join),
LogicalPlan::Sort(_) => todo!(),
LogicalPlan::Limit(_) => todo!(),
}
}

Expand Down
18 changes: 5 additions & 13 deletions src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ impl<'a> SqlQueryPlanner<'a> {
}
}

fn create_logical_expr(&self) -> Result<LogicalPlan> {
todo!()
}

fn select_to_plan(&mut self, select: Select, mut context: &mut PlannerContext) -> Result<LogicalPlan> {
// process `with` clause
if let Some(with) = select.with {
Expand Down Expand Up @@ -95,10 +91,10 @@ impl<'a> SqlQueryPlanner<'a> {

// process the LIMIT clause
if let (Some(limit), Some(offset)) = (select.limit, select.offset) {
let limit = self.sql_to_expr(context, limit)?;
let offset = self.sql_to_expr(context, offset)?;
// Ok(LogicalPlanBuilder::from(plan).limit(limit).offset(offset)?)
Ok(plan)
let limit = self.sql_to_expr(context, limit).and_then(get_expr_value)?;
let offset = self.sql_to_expr(context, offset).and_then(get_expr_value)?;

Ok(LogicalPlanBuilder::from(plan).limit(limit, offset).build())
} else {
Ok(plan)
}
Expand Down Expand Up @@ -819,13 +815,9 @@ mod tests {
// Checks the logical plan produced for a query with both LIMIT and OFFSET,
// in either clause order.
#[test]
fn test_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
let expected = "Limit: skip=0, fetch=5\
\n Projection: person.id\
\n Filter: person.id > Int64(100)\
\n TableScan: person";
// NOTE(review): this expectation has no Projection line between Limit and
// Filter even though the query selects `id` — confirm that is intended and
// not a side effect of how the Limit node reports its children.
let expected = "Limit: fetch=5, offset=0\n Filter: person.id > Int64(100)\n TableScan: person\n";
quick_test(sql, expected);

// Flip the order of LIMIT and OFFSET in the query. Plan should remain the same.
let sql = "SELECT id FROM person WHERE person.id > 100 OFFSET 0 LIMIT 5;";
quick_test(sql, expected);
}
Expand Down
5 changes: 3 additions & 2 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ macro_rules! build_schema {
#[macro_export]
macro_rules! build_table_scan {
( $(($column: expr, $data_type: ty, $f_dy: expr, $data: expr)),+$(,)? ) => {
{
{
use crate::datasource::memory::MemoryDataSource;
use crate::physical::plan::Scan;
use arrow::array::{Array, PrimitiveArray};
use arrow::array::{Array,RecordBatch, PrimitiveArray};
use arrow::datatypes::*;
use std::sync::Arc;

let schema = Schema::new(vec![
$(
Expand Down

0 comments on commit 83e34d1

Please sign in to comment.