Skip to content

Commit

Permalink
some more fixing and addition of partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
skarakuzu committed Jan 15, 2025
1 parent add42df commit fa53707
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions tiled/adapters/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,43 @@ def write(
self.cur.adbc_ingest(self.table_name, reader)
self.conn.commit()

def write_partition(
self,
partition: int,
data: Union[List[pyarrow.record_batch], pyarrow.record_batch, pandas.DataFrame],
) -> None:
"""
"Function to write the data as arrow format."
Parameters
----------
data : data to write into arrow file. Can be a list of record batch, or pandas dataframe.
table_name: string indicating the name of the table to ingest data in the database.
Returns
-------
"""
if partition != 0:
raise NotImplementedError

if isinstance(data, pandas.DataFrame):
table = pyarrow.Table.from_pandas(data)
batches = table.to_batches()
else:
if not isinstance(data, list):
batches = [data]
else:
batches = data

schema = batches[
0
].schema # list of column names can be obtained from schema.names

reader = pyarrow.ipc.RecordBatchReader.from_batches(schema, batches)

query = "DROP TABLE IF EXISTS {}".format(self.table_name)
self.cur.execute(query)
self.cur.adbc_ingest(self.table_name, reader)
self.conn.commit()

def append_partition(
self,
data: Union[List[pyarrow.record_batch], pyarrow.record_batch, pandas.DataFrame],
Expand Down Expand Up @@ -285,3 +322,28 @@ def read(self, fields: Optional[Union[str, List[str]]] = None) -> pandas.DataFra
if fields is not None:
return table[fields]
return table

def read_partition(self, partition: int, fields: Optional[Union[str, List[str]]] = None) -> pandas.DataFrame:
"""
The concatenated data from given set of partitions as pyarrow table.
Parameters
----------
table_schema: hashed string or list of strings as column names to be hashed.
for example table_schema = ['f0', 'f1', 'f2'] or '3d51c6b180b64bea848f23e5crd91ea3'
fields: optional string to return the data in the specified field.
Returns
-------
Returns the concatenated pyarrow table as pandas dataframe.
"""
if partition != 0:
raise NotImplementedError

query = "SELECT * FROM {}".format(self.table_name)
self.cur.execute(query)
data = self.cur.fetch_arrow_table()
self.conn.commit()

table = data.to_pandas()
if fields is not None:
return table[fields]
return table

0 comments on commit fa53707

Please sign in to comment.