diff --git a/README.md b/README.md index 8b5dc1f..f70e799 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,12 @@ Developed by Chai Wen Xuan 2021 ```python import pandas as pd -from influxdbDataProcessor.processor import processcsvdata +from influxdbDataProcessor.processor import processCsvData -df = processcsvdata() +df = processCsvData() ``` -#### Required input: +#### User will be required to input: 1. Token 2. influxDb url 3. Organization @@ -29,7 +29,7 @@ df = processcsvdata() 7. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') #### CSV file format: -- Only two column ("Measurement", "Field") +- Contains two columns ("Measurement", "Field") ### Processing array data @@ -37,12 +37,15 @@ df = processcsvdata() ```python import pandas as pd -from influxdbDataProcessor.processor import processarraydata +from influxdbDataProcessor.processor import processArrayData -df = processarraydata(measurementArr,fieldArr) +measurementArr = ["Air Conditioner"] +fieldArr = ["Temperature"] + +df = processArrayData(measurementArr,fieldArr) ``` -#### Required input: +#### User will be required to input: 1. Token 2. influxDb url 3. Organization @@ -50,6 +53,24 @@ df = processarraydata(measurementArr,fieldArr) 5. Sampling frequency (1 day: '1d', 1hour: '1h', 1 minute: '1t', 1 second: '1s') 6. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') +### Processing data with all inputs prepared before calling the function +```python +import pandas as pd + +from influxdbDataProcessor.processor import processData + +df = processData(token,url,org,bucket,samplerange,length,mea,field) +``` + +#### User will be required to pass as arguments: +1. Token +2. influxDb url +3. Organization +4. Bucket name +5. Sampling frequency (1 day: '1d', 1hour: '1h', 1 minute: '1t', 1 second: '1s') +6. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') +7. MeasurementArr +8. 
FieldArr diff --git a/build/lib/influxdbDataProcessor/__init__.py b/build/lib/influxdbDataProcessor/__init__.py index 57617c6..58832e7 100644 --- a/build/lib/influxdbDataProcessor/__init__.py +++ b/build/lib/influxdbDataProcessor/__init__.py @@ -1,2 +1,3 @@ -from influxdbDataProcessor.processor import processcsvdata -from influxdbDataProcessor.processor import processarraydata \ No newline at end of file +from influxdbDataProcessor.processor import processCsvData +from influxdbDataProcessor.processor import processArrayData +from influxdbDataProcessor.processor import processData \ No newline at end of file diff --git a/build/lib/influxdbDataProcessor/processor.py b/build/lib/influxdbDataProcessor/processor.py index 22cda89..64a9efa 100644 --- a/build/lib/influxdbDataProcessor/processor.py +++ b/build/lib/influxdbDataProcessor/processor.py @@ -3,7 +3,7 @@ import numpy as np import scipy as sp -def processcsvdata(): +def processCsvData(): print("Token: ") token = input() @@ -95,7 +95,7 @@ def processcsvdata(): return df_final -def processarraydata(mea,field): +def processArrayData(mea,field): print("Token: ") token = input() @@ -171,3 +171,61 @@ def processarraydata(mea,field): return df_final +def processData(token,url,org,bucket,samplerange,length,mea,field): + + client = influxdb_client.InfluxDBClient( + url=url, + token=token, + org=org + ) + + query_api = client.query_api() + + df_final = pd.DataFrame([]) + + print(len(mea)) + for i in range(len(mea)): + query = ' from(bucket:"' + bucket + '")\ + |> range(start: -' + length + ')\ + |> filter(fn:(r) => r._measurement == "' + mea[i] + '")\ + |> filter(fn:(r) => r._field == "' + field[i] + '" )' + + print("Measurement : ", mea[i]) + print("field : ", field[i]) + + result = query_api.query(org=org, query=query) + if len(result): + for table in result: + dev_name = table.records[0]["dev_name"] + df = pd.DataFrame([], columns=["Time", str(dev_name) + " Value"]) + print("dev_name: ", dev_name) + for record in 
table.records: + # print(record["dev_name"]) + value = record.get_value() + if value.is_integer(): + pass + else: + value = np.nan + + df = df.append( + pd.DataFrame([[record.get_time(), value]], columns=["Time", str(dev_name) + " Value"]), + ignore_index=True) + + df['Time'] = pd.to_datetime(df['Time']) + df.index = df['Time'] + del df['Time'] + # print(df[:11]) + df_interpol = df.resample(samplerange).mean() + # print(df_interpol[:11]) + + for x in df: + df_interpol[x] = df_interpol[x].interpolate(method="quadratic") + + if df_final.empty: + df_final = df_interpol + else: + df_final = pd.concat([df_final, df_interpol], axis=1) + # print(df_interpol) + + return df_final + diff --git a/dist/influxdb-data-processor-0.0.4.tar.gz b/dist/influxdb-data-processor-0.0.4.tar.gz new file mode 100644 index 0000000..88cd7cf Binary files /dev/null and b/dist/influxdb-data-processor-0.0.4.tar.gz differ diff --git a/dist/influxdb_data_processor-0.0.4-py3-none-any.whl b/dist/influxdb_data_processor-0.0.4-py3-none-any.whl new file mode 100644 index 0000000..2bd5f6e Binary files /dev/null and b/dist/influxdb_data_processor-0.0.4-py3-none-any.whl differ diff --git a/influxdbDataProcessor/__init__.py b/influxdbDataProcessor/__init__.py index 57617c6..58832e7 100644 --- a/influxdbDataProcessor/__init__.py +++ b/influxdbDataProcessor/__init__.py @@ -1,2 +1,3 @@ -from influxdbDataProcessor.processor import processcsvdata -from influxdbDataProcessor.processor import processarraydata \ No newline at end of file +from influxdbDataProcessor.processor import processCsvData +from influxdbDataProcessor.processor import processArrayData +from influxdbDataProcessor.processor import processData \ No newline at end of file diff --git a/influxdbDataProcessor/processor.py b/influxdbDataProcessor/processor.py index 22cda89..64a9efa 100644 --- a/influxdbDataProcessor/processor.py +++ b/influxdbDataProcessor/processor.py @@ -3,7 +3,7 @@ import numpy as np import scipy as sp -def processcsvdata(): +def 
processCsvData(): print("Token: ") token = input() @@ -95,7 +95,7 @@ def processcsvdata(): return df_final -def processarraydata(mea,field): +def processArrayData(mea,field): print("Token: ") token = input() @@ -171,3 +171,61 @@ def processarraydata(mea,field): return df_final +def processData(token,url,org,bucket,samplerange,length,mea,field): + + client = influxdb_client.InfluxDBClient( + url=url, + token=token, + org=org + ) + + query_api = client.query_api() + + df_final = pd.DataFrame([]) + + print(len(mea)) + for i in range(len(mea)): + query = ' from(bucket:"' + bucket + '")\ + |> range(start: -' + length + ')\ + |> filter(fn:(r) => r._measurement == "' + mea[i] + '")\ + |> filter(fn:(r) => r._field == "' + field[i] + '" )' + + print("Measurement : ", mea[i]) + print("field : ", field[i]) + + result = query_api.query(org=org, query=query) + if len(result): + for table in result: + dev_name = table.records[0]["dev_name"] + df = pd.DataFrame([], columns=["Time", str(dev_name) + " Value"]) + print("dev_name: ", dev_name) + for record in table.records: + # print(record["dev_name"]) + value = record.get_value() + if value.is_integer(): + pass + else: + value = np.nan + + df = df.append( + pd.DataFrame([[record.get_time(), value]], columns=["Time", str(dev_name) + " Value"]), + ignore_index=True) + + df['Time'] = pd.to_datetime(df['Time']) + df.index = df['Time'] + del df['Time'] + # print(df[:11]) + df_interpol = df.resample(samplerange).mean() + # print(df_interpol[:11]) + + for x in df: + df_interpol[x] = df_interpol[x].interpolate(method="quadratic") + + if df_final.empty: + df_final = df_interpol + else: + df_final = pd.concat([df_final, df_interpol], axis=1) + # print(df_interpol) + + return df_final + diff --git a/influxdb_data_processor.egg-info/PKG-INFO b/influxdb_data_processor.egg-info/PKG-INFO index 1c2b10d..79e71ab 100644 --- a/influxdb_data_processor.egg-info/PKG-INFO +++ b/influxdb_data_processor.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 
2.1 Name: influxdb-data-processor -Version: 0.0.3 +Version: 0.0.4 Summary: A data processor for data in influxDB Home-page: UNKNOWN Author: Pikacent (Chai Wen Xuan) @@ -23,12 +23,12 @@ Description: ```python import pandas as pd - from influxdbDataProcessor.processor import processcsvdata + from influxdbDataProcessor.processor import processCsvData - df = processcsvdata() + df = processCsvData() ``` - #### Required input: + #### User will be required to input: 1. Token 2. influxDb url 3. Organization @@ -38,7 +38,7 @@ Description: 7. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') #### CSV file format: - - Only two column ("Measurement", "Field") + - Contains two columns ("Measurement", "Field") ### Processing array data @@ -46,12 +46,15 @@ Description: ```python import pandas as pd - from influxdbDataProcessor.processor import processarraydata + from influxdbDataProcessor.processor import processArrayData - df = processarraydata(measurementArr,fieldArr) + measurementArr = ["Air Conditioner"] + fieldArr = ["Temperature"] + + df = processArrayData(measurementArr,fieldArr) ``` - #### Required input: + #### User will be required to input: 1. Token 2. influxDb url 3. Organization @@ -59,6 +62,24 @@ Description: 5. Sampling frequency (1 day: '1d', 1hour: '1h', 1 minute: '1t', 1 second: '1s') 6. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') + ### Processing data with all inputs prepared before calling the function + ```python + import pandas as pd + + from influxdbDataProcessor.processor import processData + + df = processData(token,url,org,bucket,samplerange,length,mea,field) + ``` + + #### User will be required to pass as arguments: + 1. Token + 2. influxDb url + 3. Organization + 4. Bucket name + 5. Sampling frequency (1 day: '1d', 1hour: '1h', 1 minute: '1t', 1 second: '1s') + 6. Data range (1 day: '1d', 1hour: '1h', 1 minute: '1m', 1 second: '1s') + 7. MeasurementArr + 8. 
FieldArr diff --git a/setup.py b/setup.py index 8b964c7..9ec6b5f 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with codecs.open(os.path.join(here, "README.md"), encoding="utf-8") as fh: long_description = "\n" + fh.read() -VERSION = '0.0.3' +VERSION = '0.0.4' DESCRIPTION = 'A data processor for data in influxDB' # Setting up