diff --git a/js/src/components/env/EnvInfo.tsx b/js/src/components/env/EnvInfo.tsx index ba139fa6..c9f6cf8b 100644 --- a/js/src/components/env/EnvInfo.tsx +++ b/js/src/components/env/EnvInfo.tsx @@ -136,37 +136,62 @@ export function EnvInfo() { )} - - DBT - - - - - - - - - - - - - - - - - - - - - - - - - - -
currentbase
schema{JSON.stringify(Array.from(currentSchemas))}{JSON.stringify(Array.from(baseSchemas))}
version{dbtCurrent?.dbt_version}{dbtBase?.dbt_version}
timestamp{currentTime}{baseTime}
-
-
+ {envInfo?.adapterType === "dbt" && ( + + DBT + + + + + + + + + + + + + + + + + + + + + + + + + + +
basecurrent
schema{JSON.stringify(Array.from(baseSchemas))}{JSON.stringify(Array.from(currentSchemas))}
version{dbtBase?.dbt_version}{dbtCurrent?.dbt_version}
timestamp{baseTime}{currentTime}
+
+
+ )} + {envInfo?.adapterType === "sqlmesh" && ( + + SQLMesh + + + + + + + + + + + + + + + + +
basecurrent
Environment{envInfo?.sqlmesh?.base_env}{envInfo?.sqlmesh?.current_env}
+
+
+ )} diff --git a/js/src/components/query/QueryPage.tsx b/js/src/components/query/QueryPage.tsx index d3558939..e95f7234 100644 --- a/js/src/components/query/QueryPage.tsx +++ b/js/src/components/query/QueryPage.tsx @@ -1,7 +1,10 @@ import React, { useState, useCallback } from "react"; import { Box, Button, Flex } from "@chakra-ui/react"; import SqlEditor from "./SqlEditor"; -import { useRecceQueryContext } from "@/lib/hooks/RecceQueryContext"; +import { + defaultSqlQuery, + useRecceQueryContext, +} from "@/lib/hooks/RecceQueryContext"; import { createCheckByRun, updateCheck } from "@/lib/api/checks"; import { QueryDiffResultView } from "./QueryDiffResultView"; @@ -18,9 +21,16 @@ import { QueryResultView } from "./QueryResultView"; import { cancelRun, waitRun } from "@/lib/api/runs"; import { RunView } from "../run/RunView"; import { Run } from "@/lib/api/types"; +import { useLineageGraphContext } from "@/lib/hooks/LineageGraphContext"; export const QueryPage = () => { - const { sqlQuery, setSqlQuery } = useRecceQueryContext(); + const { sqlQuery: _sqlQuery, setSqlQuery } = useRecceQueryContext(); + const { envInfo } = useLineageGraphContext(); + + let sqlQuery = _sqlQuery; + if (envInfo?.adapterType === "sqlmesh" && _sqlQuery === defaultSqlQuery) { + sqlQuery = `select * from db.mymodel`; + } const [runType, setRunType] = useState(); const [runId, setRunId] = useState(); diff --git a/js/src/lib/api/info.ts b/js/src/lib/api/info.ts index 4aedfe36..68412285 100644 --- a/js/src/lib/api/info.ts +++ b/js/src/lib/api/info.ts @@ -37,6 +37,10 @@ export interface ManifestMetadata extends ArtifactMetadata { project_name?: string; user_id?: string; } +export interface SQLMeshInfo { + base_env: string; + current_env: string; +} export interface CatalogMetadata extends ArtifactMetadata {} @@ -54,7 +58,6 @@ export interface LineageData { manifest_metadata?: ManifestMetadata | null; catalog_metadata?: CatalogMetadata | null; } - interface LineageOutput { error?: string; data?: LineageData; @@ -123,6 +126,7 @@ export interface ServerInfoResult { review_mode: boolean; git?: gitInfo; pull_request?: pullRequestInfo; + sqlmesh?: SQLMeshInfo; lineage: { base: LineageData; current: LineageData; diff --git a/js/src/lib/hooks/LineageGraphContext.tsx b/js/src/lib/hooks/LineageGraphContext.tsx index afefc9ac..bd995c0d 100644 --- a/js/src/lib/hooks/LineageGraphContext.tsx +++ b/js/src/lib/hooks/LineageGraphContext.tsx @@ -10,6 +10,7 @@ import React, { import { cacheKeys } from "../api/cacheKeys"; import { ManifestMetadata, + SQLMeshInfo, getServerInfo, gitInfo, pullRequestInfo, @@ -27,6 +28,7 @@ interface EnvInfo { base: ManifestMetadata | undefined | null; current: ManifestMetadata | undefined | null; }; + sqlmesh?: SQLMeshInfo | null; } export interface LineageGraphContextType { @@ -137,6 +139,7 @@ export function LineageGraphContextProvider({ children }: LineageGraphProps) { base: dbtBase, current: dbtCurrent, }, + sqlmesh: data?.sqlmesh, }; return ( diff --git a/js/src/lib/hooks/RecceQueryContext.tsx b/js/src/lib/hooks/RecceQueryContext.tsx index 2f835698..837abdf4 100644 --- a/js/src/lib/hooks/RecceQueryContext.tsx +++ b/js/src/lib/hooks/RecceQueryContext.tsx @@ -1,35 +1,34 @@ -import React, { createContext, useContext } from 'react'; +import React, { createContext, useContext } from "react"; export interface QueryContext { - sqlQuery: string; - setSqlQuery: (sqlQuery: string) => void; + sqlQuery: string; + setSqlQuery: (sqlQuery: string) => void; } -const defaultSqlQuery = 'select * from {{ ref("mymodel") }}'; +export const defaultSqlQuery = 'select * from {{ ref("mymodel") }}'; const defaultQueryContext: QueryContext = { - sqlQuery: defaultSqlQuery, - setSqlQuery: () => {}, + sqlQuery: defaultSqlQuery, + setSqlQuery: () => {}, }; const RecceQueryContext = createContext(defaultQueryContext); interface QueryContextProps { - children: React.ReactNode; + children: React.ReactNode; } export function RecceQueryContextProvider({ children }: QueryContextProps) { - const [sqlQuery, setSqlQuery] = React.useState(defaultSqlQuery); - return ( - - {children} - - ); + const [sqlQuery, setSqlQuery] = React.useState(defaultSqlQuery); + return ( + + {children} + + ); } export const useRecceQueryContext = () => useContext(RecceQueryContext); - export interface RowCountStateContext { isNodesFetching: string[]; setIsNodesFetching: (nodes: string[]) => void; @@ -46,10 +45,14 @@ interface RowCountStateContextProps { children: React.ReactNode; } -export function RowCountStateContextProvider({ children }: RowCountStateContextProps) { +export function RowCountStateContextProvider({ + children, +}: RowCountStateContextProps) { const [isNodesFetching, setIsNodesFetching] = React.useState([]); return ( - + {children} ); diff --git a/recce/adapter/sqlmesh_adapter.py b/recce/adapter/sqlmesh_adapter.py index db1be3c7..fa3649ce 100644 --- a/recce/adapter/sqlmesh_adapter.py +++ b/recce/adapter/sqlmesh_adapter.py @@ -3,7 +3,7 @@ from typing import Optional import pandas as pd -from sqlglot import parse_one, Expression +from sqlglot import parse_one, Expression, select from sqlglot.optimizer import traverse_scope from sqlmesh.core.context import Context as SqlmeshContext from sqlmesh.core.environment import Environment @@ -90,18 +90,26 @@ def fetchdf_with_limit( limit: Optional[int] = None ) -> (pd.DataFrame, bool): if isinstance(sql, str): - expression = parse_one(sql) + expression = parse_one(sql, dialect=self.context.default_dialect) else: expression = sql - expression = expression.limit(limit + 1 if limit else None) - env = self.base_env if base else self.curr_env + model_names = [model.name for model in self.context.models.values()] if env.name != 'prod': for scope in traverse_scope(expression): for table in scope.tables: - table.args['db'] = f"{table.args['db']}__{env.name}" - + if f'{table.db}.{table.name}' in model_names: + table.args['db'] = f"{table.args['db']}__{env.name}" + + if limit: + expression = select( + '*' + ).from_( + '__QUERY' + ).with_( + '__QUERY', as_=expression + ).limit(limit + 1 if limit else None) df = self.context.fetchdf(expression) if limit and len(df) > limit: df = df.head(limit) diff --git a/recce/server.py b/recce/server.py index 640e8e03..0853b242 100644 --- a/recce/server.py +++ b/recce/server.py @@ -145,7 +145,7 @@ async def get_info(): state = context.export_state() try: - return { + info = { 'adapter_type': context.adapter_type, 'review_mode': context.review_mode, 'git': state.git.to_dict() if state.git else None, @@ -156,6 +156,16 @@ async def get_info(): }, 'demo': bool(demo) } + + if context.adapter_type == 'sqlmesh': + from recce.adapter.sqlmesh_adapter import SqlmeshAdapter + sqlmesh_adapter: SqlmeshAdapter = context.adapter + info['sqlmesh'] = { + 'base_env': sqlmesh_adapter.base_env.name, + 'current_env': sqlmesh_adapter.curr_env.name, + } + + return info except Exception as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/recce/tasks/dataframe.py b/recce/tasks/dataframe.py index 07bbcfec..ee4ce893 100644 --- a/recce/tasks/dataframe.py +++ b/recce/tasks/dataframe.py @@ -32,10 +32,12 @@ class DataFrame(BaseModel): @staticmethod def from_agate(table: 'agate.Table', limit: t.Optional[int] = None, more: t.Optional[bool] = None): + import dbt.clients.agate_helper + import agate columns = [] for col_name, col_type in zip(table.column_names, table.column_types): - import dbt.clients.agate_helper + has_integer = hasattr(dbt.clients.agate_helper, 'Integer') if isinstance(col_type, agate.Number): diff --git a/recce/tasks/rowcount.py b/recce/tasks/rowcount.py index a8cf72d3..75bec2f1 100644 --- a/recce/tasks/rowcount.py +++ b/recce/tasks/rowcount.py @@ -83,14 +83,14 @@ def execute_sqlmesh(self): curr_row_count = None try: - df = sqlmesh_adapter.fetchdf(f'select count(*) from {name}') + df, _ = sqlmesh_adapter.fetchdf_with_limit(f'select count(*) from {name}', base=True) base_row_count = int(df.iloc[0, 0]) except Exception: pass self.check_cancel() try: - df = sqlmesh_adapter.fetchdf(f'select count(*) from {name}', env='dev') + df, _ = sqlmesh_adapter.fetchdf_with_limit(f'select count(*) from {name}', base=False) curr_row_count = int(df.iloc[0, 0]) except Exception: pass