New Function
Vincent1218 committed Oct 15, 2021
1 parent 25e7a49 commit 0277bfa
Showing 9 changed files with 184 additions and 24 deletions.
35 changes: 28 additions & 7 deletions README.md
@@ -14,12 +14,12 @@ Developed by Chai Wen Xuan 2021
```python
import pandas as pd

from influxdbDataProcessor.processor import processcsvdata
from influxdbDataProcessor.processor import processCsvData

df = processcsvdata()
df = processCsvData()
```

#### Required input:
#### The user will be prompted to input:
1. Token
2. influxDb url
3. Organization
@@ -29,27 +29,48 @@ df = processcsvdata()
7. Data range (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')

#### CSV file format:
- Only two column ("Measurement", "Field")
- Contains two columns ("Measurement", "Field"); see the example sketch below
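
For illustration, here is a minimal sketch (written with pandas) of a CSV in this layout; the file name and the measurement/field values are placeholders only:

```python
import pandas as pd

# Two columns only: one row per measurement/field pair to query.
pd.DataFrame({
    "Measurement": ["Air Conditioner", "Air Conditioner"],
    "Field": ["Temperature", "Humidity"],
}).to_csv("measurements.csv", index=False)
```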


### Processing array data

```python
import pandas as pd

from influxdbDataProcessor.processor import processarraydata
from influxdbDataProcessor.processor import processArrayData

df = processarraydata(measurementArr,fieldArr)
measurementArr = ["Air Conditioner"]
fieldArr = ["Temperature"]

df = processArrayData(measurementArr, fieldArr)
```

#### Required input:
#### The user will be prompted to input:
1. Token
2. influxDb url
3. Organization
4. Bucket name
5. Sampling frequency for resampling the output (1 day: '1d', 1 hour: '1h', 1 minute: '1t', 1 second: '1s')
6. Data range, i.e. how far back to query (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')
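
To query several series at once, pass one field per measurement. Assuming `processArrayData` pairs the two lists by index in the same way `processData` below pairs `mea[i]` with `field[i]`, a sketch with placeholder names would look like this:

```python
from influxdbDataProcessor.processor import processArrayData

# Entries pair up by position: ("Air Conditioner", "Temperature"),
# ("Air Conditioner", "Humidity"), ("Fridge", "Power"); all names are placeholders.
measurementArr = ["Air Conditioner", "Air Conditioner", "Fridge"]
fieldArr = ["Temperature", "Humidity", "Power"]

df = processArrayData(measurementArr, fieldArr)
```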

### Processing data with inputs supplied as function arguments
```python
import pandas as pd

from influxdbDataProcessor.processor import processData

df = processData(token, url, org, bucket, samplerange, length, mea, field)
```

#### Required function arguments:
1. Token
2. influxDb url
3. Organization
4. Bucket name
5. Sampling frequency for resampling the output (1 day: '1d', 1 hour: '1h', 1 minute: '1t', 1 second: '1s')
6. Data range, i.e. how far back to query (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')
7. Measurement array (`mea`): a list of measurement names
8. Field array (`field`): a list of field names, one per measurement, paired with `mea` by index (see the sketch below)
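
A fuller sketch of the call with placeholder values for every argument; the token, URL, organization, and bucket names below are assumptions to be replaced with your own deployment details, while the measurement/field pair is taken from the earlier example:

```python
from influxdbDataProcessor.processor import processData

token = "my-influxdb-api-token"    # placeholder API token
url = "http://localhost:8086"      # placeholder InfluxDB URL
org = "my-org"                     # placeholder organization name
bucket = "sensor-data"             # placeholder bucket name
samplerange = "1t"                 # resample to 1-minute intervals (pandas offset alias)
length = "1d"                      # query the last 1 day of data
mea = ["Air Conditioner"]          # measurements to query
field = ["Temperature"]            # one field per measurement, paired by index

df = processData(token, url, org, bucket, samplerange, length, mea, field)
```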



5 changes: 3 additions & 2 deletions build/lib/influxdbDataProcessor/__init__.py
@@ -1,2 +1,3 @@
from influxdbDataProcessor.processor import processcsvdata
from influxdbDataProcessor.processor import processarraydata
from influxdbDataProcessor.processor import processCsvData
from influxdbDataProcessor.processor import processArrayData
from influxdbDataProcessor.processor import processData
62 changes: 60 additions & 2 deletions build/lib/influxdbDataProcessor/processor.py
@@ -3,7 +3,7 @@
import numpy as np
import scipy as sp

def processcsvdata():
def processCsvData():

    print("Token: ")
    token = input()
@@ -95,7 +95,7 @@ def processcsvdata():
    return df_final


def processarraydata(mea,field):
def processArrayData(mea,field):

    print("Token: ")
    token = input()
@@ -171,3 +171,61 @@ def processarraydata(mea,field):

    return df_final

def processData(token,url,org,bucket,samplerange,length,mea,field):

    # Connect to InfluxDB with the caller-supplied credentials.
    client = influxdb_client.InfluxDBClient(
        url=url,
        token=token,
        org=org
    )

    query_api = client.query_api()

    df_final = pd.DataFrame([])

    print(len(mea))
    for i in range(len(mea)):
        # Build a Flux query for the i-th measurement/field pair over the last `length` of data.
        query = ' from(bucket:"' + bucket + '")\
        |> range(start: -' + length + ')\
        |> filter(fn:(r) => r._measurement == "' + mea[i] + '")\
        |> filter(fn:(r) => r._field == "' + field[i] + '" )'

        print("Measurement : ", mea[i])
        print("field : ", field[i])

        result = query_api.query(org=org, query=query)
        if len(result):
            for table in result:
                dev_name = table.records[0]["dev_name"]
                df = pd.DataFrame([], columns=["Time", str(dev_name) + " Value"])
                print("dev_name: ", dev_name)
                for record in table.records:
                    # print(record["dev_name"])
                    # Keep whole-numbered readings; anything else is treated as missing.
                    value = record.get_value()
                    if value.is_integer():
                        pass
                    else:
                        value = np.nan

                    df = df.append(
                        pd.DataFrame([[record.get_time(), value]], columns=["Time", str(dev_name) + " Value"]),
                        ignore_index=True)

                # Index by time, resample to the requested sampling frequency,
                # and fill the gaps by quadratic interpolation.
                df['Time'] = pd.to_datetime(df['Time'])
                df.index = df['Time']
                del df['Time']
                # print(df[:11])
                df_interpol = df.resample(samplerange).mean()
                # print(df_interpol[:11])

                for x in df:
                    df_interpol[x] = df_interpol[x].interpolate(method="quadratic")

                if df_final.empty:
                    df_final = df_interpol
                else:
                    df_final = pd.concat([df_final, df_interpol], axis=1)
                # print(df_interpol)

    return df_final

Binary file added dist/influxdb-data-processor-0.0.4.tar.gz
Binary file not shown.
Binary file not shown.
5 changes: 3 additions & 2 deletions influxdbDataProcessor/__init__.py
@@ -1,2 +1,3 @@
from influxdbDataProcessor.processor import processcsvdata
from influxdbDataProcessor.processor import processarraydata
from influxdbDataProcessor.processor import processCsvData
from influxdbDataProcessor.processor import processArrayData
from influxdbDataProcessor.processor import processData
62 changes: 60 additions & 2 deletions influxdbDataProcessor/processor.py
@@ -3,7 +3,7 @@
import numpy as np
import scipy as sp

def processcsvdata():
def processCsvData():

    print("Token: ")
    token = input()
@@ -95,7 +95,7 @@ def processcsvdata():
    return df_final


def processarraydata(mea,field):
def processArrayData(mea,field):

    print("Token: ")
    token = input()
@@ -171,3 +171,61 @@ def processarraydata(mea,field):

    return df_final

def processData(token,url,org,bucket,samplerange,length,mea,field):

    # Connect to InfluxDB with the caller-supplied credentials.
    client = influxdb_client.InfluxDBClient(
        url=url,
        token=token,
        org=org
    )

    query_api = client.query_api()

    df_final = pd.DataFrame([])

    print(len(mea))
    for i in range(len(mea)):
        # Build a Flux query for the i-th measurement/field pair over the last `length` of data.
        query = ' from(bucket:"' + bucket + '")\
        |> range(start: -' + length + ')\
        |> filter(fn:(r) => r._measurement == "' + mea[i] + '")\
        |> filter(fn:(r) => r._field == "' + field[i] + '" )'

        print("Measurement : ", mea[i])
        print("field : ", field[i])

        result = query_api.query(org=org, query=query)
        if len(result):
            for table in result:
                dev_name = table.records[0]["dev_name"]
                df = pd.DataFrame([], columns=["Time", str(dev_name) + " Value"])
                print("dev_name: ", dev_name)
                for record in table.records:
                    # print(record["dev_name"])
                    # Keep whole-numbered readings; anything else is treated as missing.
                    value = record.get_value()
                    if value.is_integer():
                        pass
                    else:
                        value = np.nan

                    df = df.append(
                        pd.DataFrame([[record.get_time(), value]], columns=["Time", str(dev_name) + " Value"]),
                        ignore_index=True)

                # Index by time, resample to the requested sampling frequency,
                # and fill the gaps by quadratic interpolation.
                df['Time'] = pd.to_datetime(df['Time'])
                df.index = df['Time']
                del df['Time']
                # print(df[:11])
                df_interpol = df.resample(samplerange).mean()
                # print(df_interpol[:11])

                for x in df:
                    df_interpol[x] = df_interpol[x].interpolate(method="quadratic")

                if df_final.empty:
                    df_final = df_interpol
                else:
                    df_final = pd.concat([df_final, df_interpol], axis=1)
                # print(df_interpol)

    return df_final

37 changes: 29 additions & 8 deletions influxdb_data_processor.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: influxdb-data-processor
Version: 0.0.3
Version: 0.0.4
Summary: A data processor for data in influxDB
Home-page: UNKNOWN
Author: Pikacent (Chai Wen Xuan)
@@ -23,12 +23,12 @@ Description:
```python
import pandas as pd

from influxdbDataProcessor.processor import processcsvdata
from influxdbDataProcessor.processor import processCsvData

df = processcsvdata()
df = processCsvData()
```

#### Required input:
#### The user will be prompted to input:
1. Token
2. influxDb url
3. Organization
@@ -38,27 +38,48 @@ Description:
7. Data range (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')

#### CSV file format:
- Only two column ("Measurement", "Field")
- Contains two columns ("Measurement", "Field")


### Processing array data

```python
import pandas as pd

from influxdbDataProcessor.processor import processarraydata
from influxdbDataProcessor.processor import processArrayData

df = processarraydata(measurementArr,fieldArr)
measurementArr = ["Air Conditioner"]
fieldArr = ["Temperature"]

df = processArrayData(measurementArr, fieldArr)
```

#### Required input:
#### The user will be prompted to input:
1. Token
2. influxDb url
3. Organization
4. Bucket name
5. Sampling frequency for resampling the output (1 day: '1d', 1 hour: '1h', 1 minute: '1t', 1 second: '1s')
6. Data range, i.e. how far back to query (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')

### Processing data with inputs supplied as function arguments
```python
import pandas as pd

from influxdbDataProcessor.processor import processData

df = processData(token, url, org, bucket, samplerange, length, mea, field)
```

#### Required function arguments:
1. Token
2. influxDb url
3. Organization
4. Bucket name
5. Sampling frequency for resampling the output (1 day: '1d', 1 hour: '1h', 1 minute: '1t', 1 second: '1s')
6. Data range, i.e. how far back to query (1 day: '1d', 1 hour: '1h', 1 minute: '1m', 1 second: '1s')
7. Measurement array (`mea`): a list of measurement names
8. Field array (`field`): a list of field names, one per measurement, paired with `mea` by index



2 changes: 1 addition & 1 deletion setup.py
@@ -7,7 +7,7 @@
with codecs.open(os.path.join(here, "README.md"), encoding="utf-8") as fh:
long_description = "\n" + fh.read()

VERSION = '0.0.3'
VERSION = '0.0.4'
DESCRIPTION = 'A data processor for data in influxDB'

# Setting up
