forked from angadsingh/wiz_light
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaio.py
268 lines (216 loc) · 8.42 KB
/
aio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""Wrapper for asyncio available at https://github.com/jsbronder/asyncio-dgram """
"""Module couldn't be installed on my environment due to older Python version hence copied the module directly"""
''' Original author Justin Bronder @jsbronder '''
import asyncio
import socket
import warnings
__all__ = ("bind", "connect", "from_socket")
class DatagramStream:
"""
Representation of a Datagram socket attached via either bind() or
connect() returned to consumers of this module. Provides simple
wrappers around sending and receiving bytes.
Due to the stateless nature of datagram protocols, errors are not
immediately available to this class at the point an action was performed
that will generate it. Rather, successive calls will raise exceptions if
there are any. Checking for exceptions can be done explicitly by using the
exception property.
For instance, failure to connect to a remote endpoint will not be noticed
until some point in time later, at which point ConnectionRefused will be
raised.
"""
def __init__(self, transport, recvq, excq, drained):
"""
@param transport - asyncio transport
@param recvq - asyncio queue that gets populated by the
DatagramProtocol with received datagrams.
@param excq - asyncio queue that gets populated with any errors
detected by the DatagramProtocol.
@param drained - asyncio event that is unset when writing is
paused and set otherwise.
"""
self._transport = transport
self._recvq = recvq
self._excq = excq
self._drained = drained
def __del__(self):
self._transport.close()
@property
def exception(self):
"""
If the underlying protocol detected an error, raise the first
unconsumed exception it noticed, otherwise returns None.
"""
try:
exc = self._excq.get_nowait()
raise exc
except asyncio.queues.QueueEmpty:
pass
@property
def sockname(self):
"""
The associated socket's own address
"""
return self._transport.get_extra_info("sockname")
@property
def peername(self):
"""
The address the associated socket is connected to
"""
return self._transport.get_extra_info("peername")
@property
def socket(self):
"""
The socket instance used by the stream. In python <3.8 this is a
socket.socket instance, after it is an asyncio.TransportSocket
instance.
"""
return self._transport.get_extra_info("socket")
def close(self):
"""
Close the underlying transport.
"""
self._transport.close()
async def send(self, data, addr=None):
"""
@param data - bytes to send
@param addr - remote address to send data to, if unspecified then the
underlying socket has to have been been connected to a
remote address previously.
"""
_ = self.exception
self._transport.sendto(data, addr)
await self._drained.wait()
async def recv(self):
"""
Receive data on the local socket.
@return - tuple of the bytes received and the address (ip, port) that
the data was received from.
"""
_ = self.exception
data, addr = await self._recvq.get()
return data, addr
class DatagramServer(DatagramStream):
"""
Datagram socket bound to an address on the local machine.
"""
async def send(self, data, addr):
"""
@param data - bytes to send
@param addr - remote address to send data to.
"""
await super().send(data, addr)
class DatagramClient(DatagramStream):
"""
Datagram socket connected to a remote address.
"""
async def send(self, data):
"""
@param data - bytes to send
"""
await super().send(data)
class Protocol(asyncio.DatagramProtocol):
"""
asyncio.DatagramProtocol for feeding received packets into the
Datagram{Client,Server} which handles converting the lower level callback
based asyncio into higher level coroutines.
"""
def __init__(self, recvq, excq, drained):
"""
@param recvq - asyncio.Queue for new datagrams
@param excq - asyncio.Queue for exceptions
@param drained - asyncio.Event set when the write buffer is below the
high watermark.
"""
self._recvq = recvq
self._excq = excq
self._drained = drained
self._drained.set()
# Transports are connected at the time a connection is made.
self._transport = None
def connection_made(self, transport):
if self._transport is not None:
old_peer = self._transport.get_extra_info("peername")
new_peer = transport.get_extra_info("peername")
warnings.warn(
"Reinitializing transport connection from %s to %s", old_peer, new_peer
)
self._transport = transport
def connection_lost(self, exc):
if exc is not None:
self._excq.put_nowait(exc)
if self._transport is not None:
self._transport.close()
self._transport = None
def datagram_received(self, data, addr):
self._recvq.put_nowait((data, addr))
def error_received(self, exc):
self._excq.put_nowait(exc)
def pause_writing(self):
self._drained.clear()
super().pause_writing()
def resume_writing(self):
self._drained.set()
super().resume_writing()
async def bind(addr):
"""
Bind a socket to a local address for datagrams. The socket will be either
AF_INET or AF_INET6 depending upon the type of address specified.
@param addr - For AF_INET or AF_INET6, a tuple with the the host and port to
to bind; port may be set to 0 to get any free port.
@return - A DatagramServer instance
"""
loop = asyncio.get_event_loop()
recvq = asyncio.Queue()
excq = asyncio.Queue()
drained = asyncio.Event()
transport, protocol = await loop.create_datagram_endpoint(
lambda: Protocol(recvq, excq, drained), local_addr=addr
)
return DatagramServer(transport, recvq, excq, drained)
async def connect(addr):
"""
Connect a socket to a remote address for datagrams. The socket will be
either AF_INET or AF_INET6 depending upon the type of host specified.
@param addr - For AF_INET or AF_INET6, a tuple with the the host and port to
to connect to.
@return - A DatagramClient instance
"""
loop = asyncio.get_event_loop()
recvq = asyncio.Queue()
excq = asyncio.Queue()
drained = asyncio.Event()
transport, protocol = await loop.create_datagram_endpoint(
lambda: Protocol(recvq, excq, drained), remote_addr=addr
)
return DatagramClient(transport, recvq, excq, drained)
async def from_socket(sock):
"""
Create a DatagramStream from a socket. This is meant to be used in cases
where the defaults set by `bind()` and `connect()` are not desired and/or
sufficient. If `socket.connect()` was previously called on the socket,
then an instance of DatagramClient will be returned, otherwise an instance
of DatagramServer.
@param sock - socket to use in the DatagramStream.
@return - A DatagramClient for connected sockets, otherwise a
DatagramServer.
"""
loop = asyncio.get_event_loop()
recvq = asyncio.Queue()
excq = asyncio.Queue()
drained = asyncio.Event()
if sock.family not in (socket.AF_INET, socket.AF_INET6):
raise TypeError(
"socket must be either %s or %s" % (socket.AF_INET, socket.AF_INET6)
)
if sock.type != socket.SOCK_DGRAM:
raise TypeError("socket must be %s" % (socket.SOCK_DGRAM,))
transport, protocol = await loop.create_datagram_endpoint(
lambda: Protocol(recvq, excq, drained), sock=sock
)
if transport.get_extra_info("peername") is not None:
# Workaround transport ignoring the peer address of the socket.
transport._address = transport.get_extra_info("peername")
return DatagramClient(transport, recvq, excq, drained)
else:
return DatagramServer(transport, recvq, excq, drained)