forked from Apex-technologies/PyApex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCommon.py
93 lines (82 loc) · 3.17 KB
/
Common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def Send(Connexion, Command):
from PyApex.Constantes import APXXXX_ERROR_ARGUMENT_TYPE, APXXXX_ERROR_BADCOMMAND
from PyApex.Errors import ApexError
from sys import exit
from socket import timeout
if not isinstance(Command, str):
Connexion.close()
raise ApexError(APXXXX_ERROR_ARGUMENT_TYPE, "Command")
try:
Connexion.send(Command.encode('utf-8'))
except timeout:
Connexion.close()
raise ApexError(APXXXX_ERROR_BADCOMMAND, Command)
def Receive(Connexion, ByteNumber=1024):
from PyApex.Constantes import APXXXX_ERROR_ARGUMENT_TYPE, APXXXX_ERROR_COMMUNICATION
from PyApex.Errors import ApexError
from sys import exit
from socket import timeout
if not isinstance(ByteNumber, int):
Connexion.close()
raise ApexError(APXXXX_ERROR_ARGUMENT_TYPE, "ByteNumber")
try:
data = Connexion.recv(ByteNumber)
except timeout:
Connexion.close()
raise ApexError(APXXXX_ERROR_COMMUNICATION, Connexion.getsockname()[0])
else:
return data.decode('utf-8')
# Python Socket Receive Large Amount of Data
# The line data += packet can make receiving VERY slow for large messages.
# It's much better to use data = bytearray() and then data.extend(packet).
def recvall(Connexion, ByteNumber):
# # Helper function to recv n bytes or return None if EOF is hit
# data = bytearray()
# while len(data) < n:
# packet = Connexion.recv(n - len(data))
# if not packet:
# return None
# data.extend(packet)
# return data
from PyApex.Constantes import APXXXX_ERROR_ARGUMENT_TYPE, APXXXX_ERROR_COMMUNICATION
from PyApex.Errors import ApexError
# from sys import exit
from socket import timeout
if not isinstance(ByteNumber, int):
Connexion.close()
raise ApexError(APXXXX_ERROR_ARGUMENT_TYPE, "ByteNumber")
try:
# Helper function to recv n bytes or return None if EOF is hit
data = bytearray()
while len(data) < ByteNumber:
packet = Connexion.recv(ByteNumber - len(data))
if not packet:
return None
data.extend(packet)
except timeout:
Connexion.close()
raise ApexError(APXXXX_ERROR_COMMUNICATION, Connexion.getsockname()[0])
else:
return data
def ReceiveUntilChar(Connexion, EndCharacter = "\n"):
from PyApex.Constantes import APXXXX_ERROR_ARGUMENT_TYPE, APXXXX_ERROR_COMMUNICATION
from PyApex.Errors import ApexError
from sys import exit
from socket import timeout
if not isinstance(EndCharacter, str):
Connexion.close()
raise ApexError(APXXXX_ERROR_ARGUMENT_TYPE, "EndCharacter")
try:
data_total = ""
while True:
data = (Connexion.recv(1024)).decode('utf-8')
if data.find(EndCharacter) >= 0:
data_total += data[:data.find(EndCharacter)] + EndCharacter
break
else:
data_total += data
except timeout:
Connexion.close()
raise ApexError(APXXXX_ERROR_COMMUNICATION, Connexion.getsockname()[0])
else:
return data_total