-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CrateDB] Add support for data acquisition and data export #148
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main crate/crate-python#148 +/- ##
==========================================
+ Coverage 78.59% 78.86% +0.26%
==========================================
Files 55 58 +3
Lines 3014 3180 +166
==========================================
+ Hits 2369 2508 +139
- Misses 645 672 +27
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
f395199
to
f774a90
Compare
Grafana instant dashboardsAbout8abe55d added baseline support for producing Grafana instant dashboards, and 9c663b2 now improves it by using proper time bucketing within the standard SQL statement template, based on emulating Reference documentation
ThanksThank you for the guidance, @seut and @hammerhead. |
a609bbb
to
9c663b2
Compare
fcd4379
to
2a2ec79
Compare
def record_from_dict(item): | ||
record = OrderedDict() | ||
record.update({"time": item["time"]}) | ||
record.update(item["tags"]) | ||
record.update(item["fields"]) | ||
return record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking for a function to merge those two objects, tags
and fields
, into a single record, but have not been able to discover it. Only now, I luckily discovered the concat(object, object)
function in @proddata's tutorial 1, and that it can also operate on objects, effectively merging those.
I think the two reasons why I have not been able to discover this function were:
a) That both sets of functions operating on container data types 23 have been on the "Scalar functions" page, and I did not expect to find them there.
b) That the search term "merge" did not occur in the corresponding documentation section of the concat(object, object)
function, contrary to the documentation of the array_unique()
function.
The
concat(object, object)
function combines two objects into a new object.
Thearray_unique(array, array, ...)
function merges two arrays into one array with unique elements.
Please let me know if you think this could be improved on the CrateDB documentation.
Footnotes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. concat()
does not completely do what I am aiming at here.
cr> select time, concat(tags, fields) from mqttkit_2_itest.foo_bar_sensors;
+---------------+------------------------------------------+
| time | concat(tags, fields) |
+---------------+------------------------------------------+
| 1687469154383 | {"humidity": 83.1, "temperature": 42.84} |
+---------------+------------------------------------------+
It is nice that it will merge two objects, but now, I would like to destructure the top-level attributes of that single object into individual fields again.
Is there any chance to do this, or, if not, would submitting a corresponding feature request make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think we can break down the object into fields at the moment, but maybe using object_keys
and then referencing the fields one by one could be sufficient for what you are trying to do?
@@ -26,14 +26,16 @@ Infrastructure components | |||
|
|||
- Kotori_, a data acquisition, graphing and telemetry toolkit | |||
- Grafana_, a graph and dashboard builder for visualizing time series metrics | |||
- InfluxDB_, a time-series database | |||
- CrateDB_, a time-series database ¹ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does anyone have a better suggestion, describing CrateDB within a single, short and concise sentence?
- CrateDB_, a time-series database ¹ | |
- CrateDB_, a time-series database with document features and more ¹ |
def write(self, meta, data): | ||
""" | ||
Format ingress data chunk and store it into database table. | ||
|
||
TODO: This dearly needs efficiency improvements. Currently, there is no | ||
batching, just single records/inserts. That yields bad performance. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not be forgotten. When wrapping up all review comments, put this TODO item into the backlog for a subsequent iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most probably, we should look at using the improvement from crate/crate-python#553 here, if that would be applicable.
class TimezoneAwareCrateJsonEncoder(json.JSONEncoder): | ||
epoch_aware = datetime(1970, 1, 1, tzinfo=pytz.UTC) | ||
epoch_naive = datetime(1970, 1, 1) | ||
|
||
def default(self, o): | ||
if isinstance(o, Decimal): | ||
return str(o) | ||
if isinstance(o, datetime): | ||
if o.tzinfo: | ||
delta = o - self.epoch_aware | ||
else: | ||
delta = o - self.epoch_naive | ||
return int(delta.microseconds / 1000.0 + | ||
(delta.seconds + delta.days * 24 * 3600) * 1000.0) | ||
if isinstance(o, date): | ||
return calendar.timegm(o.timetuple()) * 1000 | ||
return json.JSONEncoder.default(self, o) | ||
|
||
|
||
# Monkey patch. | ||
# TODO: Submit upstream. | ||
crate.client.http.CrateJsonEncoder = TimezoneAwareCrateJsonEncoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This monkeypatch should be submitted upstream to the Python driver on behalf of the crate
Python package. It may resolve https://github.com/crate/crate-python/issues/361.
Effectively, it is only this change:
if o.tzinfo:
delta = o - self.epoch_aware
else:
delta = o - self.epoch_naive
# TODO: Add querying by tags. | ||
tags = {} | ||
# tags = CrateDBAdapter.get_tags(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not forget about implementing this, querying by tags. It hasn't been implemented for InfluxDB, but that does not mean it should stay like this.
{ | ||
"alias": "{{ alias }}", | ||
"format": "table", | ||
"resultFormat": "time_series", | ||
"tags": {{ tags }}, | ||
"groupByTags": [], | ||
"measurement": "{{ measurement }}", | ||
"rawQuery": true, | ||
"rawSql": "SELECT $__timeGroupAlias(time, $__interval), MEAN(fields['{{ name }}']) AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time) GROUP BY time ORDER BY time" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hammerhead helped me to discover the right solution for the SQL query here, and he also told me that the DATE_BIN()
function, which @seut recommended to use, does not yet understand Grafana's interval values.
This issue is already being tracked at crate/crate#14211. After it has been resolved, adjust the SQL statement template here, to use DATE_BIN()
instead of $__timeGroupAlias(time, $__interval)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"groupByTags": [], | ||
"measurement": "{{ measurement }}", | ||
"rawQuery": true, | ||
"rawSql": "SELECT $__timeGroupAlias(time, $__interval), MEAN(fields['{{ name }}']) AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time) GROUP BY time ORDER BY time" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On behalf of a subsequent iteration, we may also want to demonstrate advanced downsampling on a secondary panel, using the largest triangle three buckets (LTTB) algorithm, as presented by @hlcianfagna 1, when possible.
Footnotes
- Extremely fast distributed query execution. | ||
- Auto-partitioning, auto-sharding, and auto-replication. | ||
- Self-healing and auto-rebalancing. | ||
- User-defined functions (UDFs) can be used to extend the functionality of CrateDB. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
User-defined functions are not mentioned on the canonical CrateDB README at all. It should be added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. code-block:: sql | ||
|
||
-- An SQL DDL statement defining a custom schema for holding sensor data. | ||
CREATE TABLE iot_data ( | ||
timestamp TIMESTAMP WITH TIME ZONE, | ||
sensor_data OBJECT (DYNAMIC) AS ( | ||
temperature FLOAT, | ||
humidity FLOAT, | ||
location OBJECT (DYNAMIC) AS ( | ||
latitude DOUBLE PRECISION, longitude DOUBLE PRECISION | ||
) | ||
) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's the only SQL DDL statement within the "query examples" section. Adding a few more, including use of other CrateDB special data types, may be sensible. Do you have any suggestions in your toolboxes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance of using GEO_POINT
maybe?
kotori/daq/storage/cratedb.py
Outdated
CREATE TABLE IF NOT EXISTS {tablename} ( | ||
time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, | ||
tags OBJECT(DYNAMIC), | ||
fields OBJECT(DYNAMIC) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to use such a DDL from the very beginning, as partitioning by year seems to be a general useful approach to be used as a reasonable default, @hlcianfagna?
CREATE TABLE IF NOT EXISTS {tablename} ( | |
time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, | |
tags OBJECT(DYNAMIC), | |
fields OBJECT(DYNAMIC) | |
); | |
CREATE TABLE IF NOT EXISTS {tablename} ( | |
time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, | |
tags OBJECT(DYNAMIC), | |
fields OBJECT(DYNAMIC), | |
year TIMESTAMP GENERATED ALWAYS AS DATE_TRUNC('year', time) | |
) PARTITIONED BY (year); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you went for this, I think it is a good idea, yes
self.db_client = client.connect( | ||
self.host_uri, username=self.username, password=self.password, pool_size=20, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Verify and demonstrate connecting also to CrateDB Cloud, maybe on behalf of a subsequent iteration.
into tables. Tables are grouped into schemas, which is equivalent to the concept of hosting | ||
multiple databases on the same server instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@proddata mentioned that, at least from a PostgreSQL perspective, databases are more like catalogs. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few suggestions by @hammerhead. Thanks.
@@ -85,8 +85,8 @@ We are standing on the shoulders of giants: | |||
- Leverage the open infrastructure based on Twisted_ - an event-driven networking engine - | |||
to implement custom software components. | |||
- Listen and talk M2M_ using the *MQ Telemetry Transport* connectivity protocol and software bus (MQTT_). | |||
- Store data points into InfluxDB_, a leading open source time series database suitable | |||
for realtime analytics and sensor data storage. | |||
- Store data points into CrateDB_, InfluxDB_, or other open source time series databases |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Store data points into CrateDB_, InfluxDB_, or other open source time series databases | |
- Store data points into CrateDB_, InfluxDB_, or other open source time-series databases |
@@ -26,14 +26,16 @@ Infrastructure components | |||
|
|||
- Kotori_, a data acquisition, graphing and telemetry toolkit | |||
- Grafana_, a graph and dashboard builder for visualizing time series metrics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Grafana_, a graph and dashboard builder for visualizing time series metrics | |
- Grafana_, a graph and dashboard builder for visualizing time-series metrics |
|
||
| ¹ MongoDB is only required when doing CSV data acquisition, so it is completely | ||
| ¹ Kotori can either use CrateDB or InfluxDB as timeseries database. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ¹ Kotori can either use CrateDB or InfluxDB as timeseries database. | |
| ¹ Kotori can either use CrateDB or InfluxDB as time-series database. |
Purpose | ||
======= | ||
|
||
Kotori uses CrateDB to store **timeseries-data** of data acquisition channels. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kotori uses CrateDB to store **timeseries-data** of data acquisition channels. | |
Kotori uses CrateDB to store **time-series data** of data acquisition channels. |
and based on Lucene. | ||
|
||
<small> | ||
<strong>Categories:</strong> timeseries-database, multi-modal database |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<strong>Categories:</strong> timeseries-database, multi-modal database | |
<strong>Categories:</strong> time-series database, multi-modal database |
CrateDB | ||
======= | ||
|
||
This example uses CrateDB as timeseries-database. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This example uses CrateDB as timeseries-database. | |
This example uses CrateDB as time-series database. |
elif "influxdb" in self.config: | ||
self.dbtype = TimeseriesDatabaseType.INFLUXDB1 | ||
else: | ||
raise ValueError("Timeseries database type not defined") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raise ValueError("Timeseries database type not defined") | |
raise ValueError("Time-series database type not defined") |
Submit single reading in JSON format to HTTP API and proof | ||
it can be retrieved back from the HTTP API in different formats. | ||
|
||
This uses CrateDB as timeseries database. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This uses CrateDB as timeseries database. | |
This uses CrateDB as time-series database. |
Apply time bucketing over interval obtained from Grafana's date range picker. Emulate `GROUP BY DATE_BIN()` by using Grafana's `$__timeGroupAlias` macro for casting `$__interval` values, until CrateDB's `DATE_BIN()` function understands Grafana's native interval values.
About
Migrating to InfluxDB version 2 would mean to leave SQL behind 1. While using the Flux query language is intriguing, and I will not reject bringing in support for InfluxDB2 and its successor IOx, supporting an SQL-based timeseries database makes sense for me, and this time maybe even a more capable one than InfluxDB in terms of broader support for data types and SQL operations.
So, I think viable alternatives are both CrateDB and TimescaleDB 2, which may even share parts of their corresponding adapter implementations, because both are building upon PostgreSQL standards. This patch makes a start by adding support for CrateDB, let's have a look at TimescaleDB later.
Documentation
https://kotori--148.org.readthedocs.build/en/148/database/cratedb.html
Backlog
max_by
andmin_by
could also be utilized in a sensible way.Footnotes
https://docs.influxdata.com/influxdb/v2.7/query-data/flux/ ↩
With the drawback that TimescaleDB also changed the license for parts of their code to non-FOSS, see https://github.com/timescale/timescaledb/blob/main/tsl/LICENSE-TIMESCALE. ↩