-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathKafka.py
136 lines (118 loc) · 4.81 KB
/
Kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from kafka import KafkaConsumer
import json
import pandas as pd
from src.Logger.AppLogger import Applogger
from configurations.index import read_section
class Connection:
    '''
    Apache-Kafka connector:
    - Connects to one topic in a broker.
    - default configs provided in configuration/config.ini.
    - Typical use: connect(broker, topic), then get_schema() and/or iterate
      import_data(...), then disconnect() and shutdown().
    ----------------------------------
    Developed by: Pranjal Ghildiyal
    '''

    def __init__(self):
        """Read the LOCATIONS and KAFKA config sections and set up logging.

        Rebinds the module-level globals ``logger`` (the Applogger wrapper)
        and ``lg`` (its ``.logger``); every other method logs through ``lg``.
        All instance state is initialised before any config lookup, so that
        if a section is missing the object is left in a usable, disconnected
        state rather than half-built.
        """
        global lg
        global logger
        # Initialise state up front: the early returns below must not leave
        # attributes that other methods read undefined.
        self.data = pd.DataFrame()
        self.consumer = None
        self.topic = None
        self.configs = None
        self.__connection_status = False
        self.__import_status = False

        status, logging_location = read_section('LOCATIONS')
        if not status:
            print('Could not find section. {}'.format(logging_location))
            if 'lg' not in globals():
                # No log location configured and no earlier instance set up
                # logging: fall back to a stdlib logger so methods that log
                # via ``lg`` do not fail with NameError.
                import logging
                lg = logging.getLogger(__name__)
            return None
        logger = Applogger(logging_location['kafka'])
        lg = logger.logger

        status, self.configs = read_section('KAFKA')
        if not status:
            print('Could not find section: KAFKA')
            lg.error('Could not find section: KAFKA')
            return None
        lg.info('Initiating Connection with Kafka.')

    def connect(self, broker, topic):
        """Create a KafkaConsumer subscribed to *topic* on *broker*.

        Message values are decoded with ``Connection.deserializer``.

        Returns:
            True on success. On failure the error is logged (not raised) and
            the existing connection status is returned unchanged; any
            consumer from an earlier successful connect() is kept.
        """
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=[broker],
                value_deserializer=Connection.deserializer,
            )
        except Exception as e:
            lg.error(f'Failed to connect to Kafka: {str(e)}')
            return self.__connection_status
        self.consumer = consumer
        lg.info(f'Connected to Kafka topic: {topic}')
        self.topic = topic
        self.__connection_status = True
        return self.__connection_status

    def get_schema(self):
        """Return the flattened keys of the first message read from the consumer.

        Returns:
            ``(True, keys)``, where keys are dotted paths as produced by
            ``flatten_nested_dictionary_keys``. Otherwise
            ``(False, error_message)``, when not connected, when reading or
            flattening fails, or when the consumer stops yielding without a
            message.
        """
        if not self.__connection_status:
            lg.error('No Connections found!')
            return (False, 'No Connections found!')
        try:
            for message in self.consumer:
                # Only the first message is needed to infer the schema.
                return (True, Connection.flatten_nested_dictionary_keys(message.value))
        except Exception as e:
            lg.error(f'Error while trying to get schema! {e}')
            return (False, 'Error while trying to get schema!')
        # Previously this fell through and returned None.
        lg.error('No messages received!')
        return (False, 'No messages received!')

    def import_data(self, datetime_column, value_column, update_callback):
        """Yield one single-row DataFrame per consumed message.

        Each message value is flattened (nested keys joined with '.'). Then
        *datetime_column* and *value_column* are selected and renamed to
        'DATETIME' and 'value'.

        Args:
            datetime_column: flattened key holding the timestamp.
            value_column: flattened key holding the value.
            update_callback: accepted for interface compatibility; currently
                unused.

        Any exception while consuming or reshaping a message is logged and
        ends the generator. If not connected, a warning is logged and nothing
        is yielded.
        """
        if not self.__connection_status:
            lg.warning('Not connected to Kafka. Call connect() first.')
            return
        try:
            for message in self.consumer:
                self.__import_status = True
                flat = Connection.flatten_nested_dictionary(message.value)
                frame = (
                    pd.DataFrame(flat, index=[0])[[datetime_column, value_column]]
                    .rename(columns={datetime_column: 'DATETIME', value_column: 'value'})
                )
                yield frame
        except Exception as e:
            lg.error(f'Error while consuming messages: {str(e)}')

    def shutdown(self):
        """Shut down the Applogger created by __init__, if one was created."""
        # ``logger`` only exists once an __init__ call found the LOCATIONS section.
        if globals().get('logger') is not None:
            logger.shutdown()

    def disconnect(self):
        """Close the consumer and reset connection/import status; warn if not connected."""
        if self.__connection_status:
            self.consumer.close()
            lg.info('Disconnected from Kafka.')
            self.__connection_status = False
            self.__import_status = False
        else:
            lg.warning('Not connected to Kafka.')

    @staticmethod
    def deserializer(value):
        """Decode a UTF-8 JSON message payload (bytes) into Python objects."""
        return json.loads(value.decode('utf-8'))

    @staticmethod
    def flatten_nested_dictionary(nested_dict, separator='.'):
        """Flatten nested dicts into one level, joining key paths with *separator*.

        Example: {'a': {'b': 1}, 'c': 2} -> {'a.b': 1, 'c': 2}
        Non-dict values (including lists) are kept as-is.
        """
        flat_dict = {}
        for key, value in nested_dict.items():
            if isinstance(value, dict):
                flat_subdict = Connection.flatten_nested_dictionary(value, separator=separator)
                for subkey, subvalue in flat_subdict.items():
                    flat_dict[f"{key}{separator}{subkey}"] = subvalue
            else:
                flat_dict[key] = value
        return flat_dict

    @staticmethod
    def flatten_nested_dictionary_keys(nested_dict, separator='.'):
        """Return the keys of ``flatten_nested_dictionary(nested_dict)`` as a list."""
        return list(Connection.flatten_nested_dictionary(nested_dict, separator=separator))

    def __str__(self):
        """Return a readable summary of the connector's configuration and state.

        ``__str__`` must return a str; the previous version returned a dict,
        which made ``str(conn)`` raise TypeError.
        """
        try:
            servers = self.configs['bootstrap_servers']
        except (KeyError, TypeError):
            # configs is None, or not a mapping (e.g. the read_section error
            # value), or lacks the key.
            servers = None
        return str({
            'Bootstrap Servers': servers,
            'Topic:': self.topic,
            'Connection Status': self.__connection_status,
            'Import Status': self.__import_status,
        })