-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtelegram_client.py
500 lines (387 loc) · 16.2 KB
/
telegram_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
"""Telegram Client"""
import logging
import os
import shutil
import json
import hashlib
from telethon import TelegramClient, functions
from telethon.errors import (
PhoneNumberUnoccupiedError,
PhoneCodeInvalidError,
PhoneCodeExpiredError,
SessionPasswordNeededError,
FloodWaitError,
PasswordHashInvalidError,
RPCError,
)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class Errors:
"""
Custom exceptions.
"""
PhoneNumberUnoccupiedError = PhoneNumberUnoccupiedError
PhoneCodeInvalidError = PhoneCodeInvalidError
PhoneCodeExpiredError = PhoneCodeExpiredError
SessionPasswordNeededError = SessionPasswordNeededError
FloodWaitError = FloodWaitError
PasswordHashInvalidError = PasswordHashInvalidError
RPCError = RPCError
class SessionExistError(Exception):
"""
Exception raised when a duplicate session is detected.
Args:
message (str): An optional message to include in the exception.
Defaults to "Duplicate sessions".
Attributes:
message (str): The message included in the exception.
Methods:
__init__(self, message="Duplicate sessions"): Initializes the S
essionExistError object with the given message.
"""
def __init__(self, message="Duplicate sessions"):
self.message = message
super().__init__(self.message)
def md5hash(data: str) -> str:
"""
Hashes the given string using the MD5 algorithm.
Args:
data (str): The string to be hashed.
Returns:
str: The resulting MD5 hash in hexadecimal format.
"""
try:
return hashlib.md5(data.encode("utf-8")).hexdigest()
except Exception as error:
raise error
class Methods:
"""
A collection of methods for interacting with the Telegram API.
Args:
identifier (str): The identifier associated with the Telegram account.
Attributes:
api_id (str): The API ID for the Telegram account.
api_hash (str): The API hash for the Telegram account.
phone_number (str): The phone number associated with the Telegram account.
record_filepath (str): The file path for the record of the Telegram account.
record_db_filepath (str): The file path for the database record of the Telegram account.
Raises:
KeyError: If the required environment variables are not set.
"""
def __init__(self, identifier: str) -> None:
"""
Initializes a new instance of the Methods class.
Args:
identifier (str): The identifier associated with the Telegram account.
"""
credentials_path = os.environ.get("TELEGRAM_CREDENTIALS")
records_path = os.environ.get("TELEGRAM_RECORDS")
if not credentials_path:
raise KeyError("TELEGRAM_CREDENTIALS environment variable not set.")
if not records_path:
raise KeyError("TELEGRAM_RECORDS environment variable not set.")
if not os.path.exists(credentials_path):
logger.warning(
"Telegram credentials file not found at %s", credentials_path
)
with open(credentials_path, "r", encoding="utf-8") as file_:
creds = json.load(file_)
self.api_id = creds["api_id"]
self.api_hash = creds["api_hash"]
self.phone_number = identifier
phone_number_hash = md5hash(data=identifier)
self.record_filepath = os.path.join(records_path, phone_number_hash)
self.record_db_filepath = os.path.join(self.record_filepath, phone_number_hash)
def __write_registry__(self, phone_code_hash: str, code: str = None) -> bool:
"""
Write phone code hash and code to registry file in JSON format.
Args:
phone_code_hash (str): Phone code hash to write to registry.
code (str, optional): Code to write to registry. Defaults to None.
Raises:
Exception: If an error occurs while writing to registry file.
Returns:
bool: True if data was written successfully, False otherwise.
"""
try:
# Create dictionary with data to write to registry
data = {"code": code, "phone_code_hash": phone_code_hash}
# Convert dictionary to JSON format
json_data = json.dumps(data)
# Write JSON data to registry file
registry_filepath = os.path.join(self.record_filepath, "registry.json")
with open(registry_filepath, "w", encoding="utf-8") as outfile:
outfile.write(json_data)
return True
except Exception as error:
logger.error("An error occurred while writting registry file.")
raise error
def __read_registry__(self) -> dict:
"""Read the user registry file and return the contents as a dictionary."""
try:
registry_filepath = os.path.join(self.record_filepath, "registry.json")
with open(registry_filepath, "r", encoding="utf-8") as file_:
json_content = json.load(file_)
os.remove(registry_filepath)
logger.debug("- removed user registry file: %s", registry_filepath)
return json_content
except Exception as error:
logger.error("An error occurred while reading registry file.")
raise error
async def authorize(self) -> None:
"""Connects to the Telegram API, creates a user file, and sends an
authorization code request to the specified phone number.
Args:
self: An instance of the Methods class.
Returns:
None.
"""
# Check if user file already exists and create it if not
if not os.path.exists(self.record_filepath):
logger.debug("- creating user file: %s", self.record_filepath)
os.makedirs(self.record_filepath)
else:
logger.debug(
"deleting draft record '%s' and deps ...", self.record_filepath
)
shutil.rmtree(self.record_filepath)
logger.debug("- creating user file: %s", self.record_filepath)
os.makedirs(self.record_filepath)
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
# Check if session already exists
if await client.is_user_authorized():
logger.error("Session already exists")
raise Errors.SessionExistError()
# Send authorization code request and write phone_code_hash to registry
result = await client.send_code_request(phone=self.phone_number)
self.__write_registry__(phone_code_hash=result.phone_code_hash)
logger.info("- authentication code sent successfully.")
except FloodWaitError as error:
raise error
except Exception as error:
logger.error("An error occurred while authorizing.")
raise error
finally:
# close telethon connection
await client.disconnect()
async def validate(self, code: str) -> dict:
"""Validate the given phone number confirmation code.
Args:
code (str): The phone number confirmation code to validate.
Returns:
dict: A dictionary containing the user's token and profile information.
"""
# Check if user file already exists and create it if not
if not os.path.exists(self.record_filepath):
logger.debug("- creating user file: %s", self.record_filepath)
os.makedirs(self.record_filepath)
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
registry_data = self.__read_registry__()
# validate code
await client.sign_in(
self.phone_number,
code=code,
phone_code_hash=registry_data["phone_code_hash"],
)
logger.info("- Code validation successful")
# get user profile info
logger.debug("Fetching user's info ...")
user_data = await client.get_me()
return {
"token": self.phone_number,
"profile": {
"name": user_data.first_name,
"unique_id": self.phone_number,
},
}
except PhoneNumberUnoccupiedError as error:
logger.error("%s has no account", self.phone_number)
raise error
except PhoneCodeInvalidError as error:
logger.error("The phone code entered was invalid")
self.__write_registry__(phone_code_hash=registry_data["phone_code_hash"])
raise error
except PhoneCodeExpiredError as error:
logger.error("The confirmation code has expired")
raise error
except SessionPasswordNeededError as error:
logger.error("two-steps verification is enabled and a password is required")
self.__write_registry__(
code=code, phone_code_hash=registry_data["phone_code_hash"]
)
raise error
except FloodWaitError as error:
wait_time = error.seconds
logger.error(
"Flood wait error occurred. Please try again in %s seconds.", wait_time
)
raise error
except Exception as error:
logger.error("An error occurred while validating.")
raise error
finally:
# close telethon connection
await client.disconnect()
async def message(self, recipient: str, text: str) -> bool:
"""
Sends a message to a recipient using the Telegram API.
Args:
recipient (str): The username or phone number of the recipient.
text (str): The text of the message to send.
Returns:
bool: True if the message was sent successfully, False otherwise.
"""
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
# sent message
logger.debug("sending message to: %s...", recipient)
await client.send_message(recipient, text)
except Exception as error:
logger.error("An error occurred while sending a telegram message.")
raise error
finally:
# close telethon connection
await client.disconnect()
async def invalidate(self, token: str) -> bool:
"""
Revokes access for a Telegram user by logging them out and deleting their local data.
Args:
token (str): The user's access token.
Returns:
bool: True if access was revoked successfully, False otherwise.
"""
phone_number_hash = md5hash(data=token)
self.record_filepath = os.path.join(
os.environ["TELEGRAM_RECORDS"], phone_number_hash
)
self.record_db_filepath = os.path.join(self.record_filepath, phone_number_hash)
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
# revoke access
logger.debug("revoking %s access ...", self.phone_number)
await client.log_out()
# delete files
logger.debug("deleting files ...")
shutil.rmtree(self.record_filepath)
logger.info("- Successfully revoked access")
return True
except Exception as error:
logger.error("An error occurred while invalidating.\n\n%s", str(error))
return False
finally:
# close telethon connection
await client.disconnect()
async def validate_with_password(self, password: str) -> dict:
"""Validate the given phone number confirmation code with password.
Args:
password (str): The user's password.
Returns:
dict: A dictionary containing the user's token and profile information.
"""
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
registry_data = self.__read_registry__()
# validate code with password
await client.sign_in(
password=password,
phone_code_hash=registry_data["phone_code_hash"],
)
logger.info("- Code validation with password successful")
# get user profile info
logger.debug("Fetching user's info ...")
user_data = await client.get_me()
# Return user profile info and token
return {
"token": self.phone_number,
"profile": {
"name": user_data.first_name,
"unique_id": self.phone_number,
},
}
except PhoneNumberUnoccupiedError as error:
logger.error("%s has no account", self.phone_number)
raise error
except PasswordHashInvalidError as error:
logger.error("The password (and thus its hash value) entered is invalid")
self.__write_registry__(phone_code_hash=registry_data["phone_code_hash"])
raise error
except FloodWaitError as error:
wait_time = error.seconds
logger.error(
"Flood wait error occurred. Please try again in %s seconds.", wait_time
)
raise error
except Exception as error:
logger.error("An error occurred while validating with password")
raise error
finally:
# close telethon connection
await client.disconnect()
async def contacts(self) -> list:
"""Fetches all telegram contacts.
Returns:
A list of dictionaries containing the following keys:
- id (int): The unique identifier of the contact
- phone (str): The phone number of the contact
- username (str): The username of the contact (if available)
- first_name (str): The first name of the contact
- last_name (str): The last name of the contact (if available)
"""
# Initialize Telethon client and connect to API
client = TelegramClient(
self.record_db_filepath, api_id=self.api_id, api_hash=self.api_hash
)
try:
# open telethon connection
await client.connect()
# fetch telegram contacts
contacts = []
logger.debug("Fetching telegram contacts for %s ...", self.phone_number)
result = await client(functions.contacts.GetContactsRequest(hash=0))
for user in result.users:
contacts.append(
{
"id": user.id,
"phone": user.phone,
"username": user.username,
"first_name": user.first_name,
"last_name": user.last_name,
}
)
logger.info("- Successfully fetched all telegram contacts")
return contacts
except Exception as error:
logger.error("An error occurred while fetching contacts.")
raise error
finally:
# close telethon connection
await client.disconnect()