Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use zmq for storage write #439

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ General settings
Remote port to connect to in order to get console messages (defaults to
``5557``).

* ``data``: integer [optional]
Remote port for data transfers between the client and agent (defaults to
``0`` for a dynamic port assignment).

* ``host``: string [optional]
Remote host name or ip to connect to as a client to interact with the
MTDA agent (defaults to ``localhost``).
Expand Down
38 changes: 18 additions & 20 deletions mtda/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
import tempfile
import time
import zmq
import zstandard as zstd

from mtda.main import MultiTenantDeviceAccess
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self, host=None, session=None, config_files=None,
else:
self._impl = agent
self._agent = agent
self._data = None

if session is None:
HOST = socket.gethostname()
Expand Down Expand Up @@ -115,7 +117,12 @@ def storage_open(self):
while tries > 0:
tries = tries - 1
try:
self._impl.storage_open(self._session)
host = self.remote()
port = self._impl.storage_open(self._session)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect(f'tcp://{host}:{port}')
self._data = socket
return
except Exception:
if tries > 0:
Expand Down Expand Up @@ -201,7 +208,7 @@ def storage_write_image(self, path, callback=None):

try:
# Prepare for download/copy
file.prepare(image_size)
file.prepare(self._data, image_size)

# Copy image to shared storage
file.copy()
Expand Down Expand Up @@ -298,6 +305,7 @@ def __init__(self, path, agent, session, blksz, callback=None):
self._path = path
self._session = session
self._totalread = 0
self._totalsent = 0

def bmap(self, path):
return None
Expand All @@ -324,21 +332,25 @@ def flush(self):
inputsize = self._inputsize
totalread = self._totalread
outputsize = self._outputsize

agent.storage_flush(self._totalsent)
while True:
status, writing, written = agent.storage_status(self._session)
status, writing, written = agent.storage_status()
if callback is not None:
callback(imgname, totalread, inputsize, written, outputsize)
if writing is False:
break
time.sleep(0.5)
self._socket.close()

def path(self):
return self._path

def prepare(self, output_size=None, compression=None):
def prepare(self, socket, output_size=None, compression=None):
compr = self.compression() if compression is None else compression
self._inputsize = self.size()
self._outputsize = output_size
self._socket = socket
# if image is uncompressed, we compress on the fly
if compr == CONSTS.IMAGE.RAW.value:
compr = CONSTS.IMAGE.ZST.value
Expand All @@ -362,22 +374,8 @@ def size(self):
return None

def _write_to_storage(self, data):
max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL)

for _ in range(max_tries):
result = self._agent.storage_write(data, self._session)
if result != 0:
break
time.sleep(CONSTS.STORAGE.RETRY_INTERVAL)

if result > 0:
return result
elif result < 0:
exc = 'write or decompression error from shared storage'
raise IOError(exc)
else:
exc = 'timeout from shared storage'
raise IOError(exc)
self._socket.send(data)
self._totalsent += len(data)


class ImageLocal(ImageFile):
Expand Down
3 changes: 1 addition & 2 deletions mtda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class STORAGE:


class WRITER:
QUEUE_SLOTS = 16
QUEUE_TIMEOUT = 5
RECV_TIMEOUT = 5
READ_SIZE = 1*1024*1024
WRITE_SIZE = 1*1024*1024
Loading
Loading