Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gl 1358 migrate #2001

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions autosubmit/autosubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3144,15 +3144,14 @@ def migrate(experiment_id, offer, pickup, only_remote):
if offer:
Autosubmit._check_ownership(experiment_id, raise_error=True)
migrate.migrate_offer_remote()
if not only_remote: # Local migrate
if not only_remote: # Local migrate
try:
if not Autosubmit.archive(experiment_id, True, True):
raise AutosubmitCritical(f"Error archiving the experiment", 7014)
Log.result("The experiment has been successfully offered.")
except Exception as e:
# todo put the IO error code
raise AutosubmitCritical(f"[LOCAL] Error offering the experiment: {str(e)}\n"
f"Please, try again", 7000)
f"Please, try again", 7075)
migrate.migrate_offer_jobdata()
elif pickup:
Log.info(f'Pickup experiment {experiment_id}')
Expand Down
489 changes: 241 additions & 248 deletions autosubmit/migrate/migrate.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions autosubmit/platforms/ecplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def send_file(self, filename, check=True):
return True

def move_file(self, src, dest, must_exist = False):
src = str(src)
dest = str(dest)
command = "ecaccess-file-move {0}:{1} {0}:{2}".format(self.host,os.path.join(self.remote_log_dir,src) , os.path.join(self.remote_log_dir,dest))
try:
retries = 0
Expand Down
2 changes: 2 additions & 0 deletions autosubmit/platforms/locplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ def move_file(self, src, dest, must_exist=False):
:param must_exist: ignore if file exist or not
:type dest: str
"""
src = str(src)
dest = str(dest)
path_root = ""
try:
path_root = self.get_files_path()
Expand Down
60 changes: 59 additions & 1 deletion autosubmit/platforms/paramiko_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ def move_file(self, src, dest, must_exist=False):
:param must_exist: ignore if file exist or not
:type dest: str
"""
src = str(src)
dest = str(dest)
path_root=""
try:
path_root = self.get_files_path()
Expand Down Expand Up @@ -515,6 +517,61 @@ def move_file(self, src, dest, must_exist=False):
os.path.join(self.get_files_path(), src)), 5001)
return False

def move_folder_rsync(self, src: str, dest: str, retries_limit: int = 25) -> bool:
"""
Perform a remote rsync operation with retries.

:param src: Source directory to sync from.
:param dest: Destination directory to sync to.
:param retries_limit: Maximum number of retries to perform. If it keeps failing, user can prompt the command again.
:return: True if the rsync operation is successful, False otherwise.
"""
finished = False
rsync_retries = 0
if not Path(src).parent:
src = Path(f"{self.get_files_path()}/{src}")

while not finished and rsync_retries < retries_limit:
Log.info(
f"Rsync launched {rsync_retries + 1} times. Can take up to 150 retrials or until all data is transferred")
finished = self._attempt_rsync(src, dest)
if not finished:
rsync_retries += 1
self._handle_rsync_failure(src)

if not finished or rsync_retries >= retries_limit:
Log.error(f"Rsync operation failed after {rsync_retries} retries")
return finished

def _attempt_rsync(self, src: str, dest: str) -> bool:
"""
Attempt to perform rsync and check for errors.

:param src: Source directory to sync from.
:param dest: Destination directory to sync to.
:return: True if rsync is successful, False otherwise.
"""
try:
self.send_command(f"rsync --timeout=3600 --bwlimit=20000 -aqz --remove-source-files {src} {dest}")
self.send_command(f"find {src} -type d -empty -delete")
if self.get_ssh_output_err() == "" or "no such file or directory" not in self.get_ssh_output_err().lower():
return True
except BaseException as e:
Log.debug(f"{str(e)}")
return False

def _handle_rsync_failure(self, src: str):
"""
Handle rsync failure by checking for specific error messages and cleaning up.

:param src: Source directory to sync from.
"""
if self.get_ssh_output_err() == "" or any(keyword in self.get_ssh_output_err().lower() for keyword in
["warning: rsync", "closed", "broken pipe", "directory has vanished"]):
return
self.send_command(f"find {src} -depth -type d -empty -delete")
Log.result(f"Empty dirs on {src} have been successfully deleted")

def submit_job(self, job, script_name, hold=False, export="none"):
"""
Submit a job from a given job object.
Expand Down Expand Up @@ -1434,12 +1491,13 @@ def check_remote_log_dir(self):

def check_absolute_file_exists(self, src):
try:
if self._ftpChannel.stat(src):
if self._ftpChannel.stat(str(src)):
return True
else:
return False
except Exception:
return False

class ParamikoPlatformException(Exception):
"""
Exception raised from HPC queues
Expand Down
2 changes: 2 additions & 0 deletions docs/source/troubleshooting/error-codes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ team in Git.
+------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 7074 | Profiling process failed | You can find more detailed information in the logs, as well as hints to solve the problem |
+------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 7075 | I/O error | Check that your filesystem is not full or has any other issue. |
+------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

.. note::
Please submit an issue to the Autosubmit team if you have not found your error
Expand Down
6 changes: 6 additions & 0 deletions docs/source/userguide/manage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ Migrate file example: $expid/conf/migrate.yml
Example for a RES account to BSC account the tmp folder must have rwx|rwx|--- permissions.
The temporary directory must be in the same filesystem.

.. warning:: Autosubmit will use rsync to copy the data during the offer as a last resort.

.. warning:: Autosubmit will use rsync to transfer the files' ownership from the former remote platform to the newer one during the pickup.

When rsync is activated, it will try to move the data up to 25 tries afterward, if some data is still not moved, you can perform ``autosubmit migrate -o or -p $expid`` again.

User A, To offer the experiment:
::

Expand Down
41 changes: 29 additions & 12 deletions test/unit/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

from test.unit.utils.common import create_database, init_expid


@pytest.mark.skip('This test requires a running SSH server, with password-less authentication')
# TODO: Write the tests without the class and self ( to do after the transition to github)
# TODO: Isolate the tests ( to do after the transition to github) rsync one works if only it is launched
@pytest.mark.skip("Pipeline needs to allow to connect to itself through ssh")
class TestMigrate:

@pytest.fixture(scope='class')
def migrate_tmpdir(self, tmpdir_factory):
folder = tmpdir_factory.mktemp(f'migrate_tests')
folder = tmpdir_factory.mktemp('migrate_tests')
os.mkdir(folder.join('scratch'))
os.mkdir(folder.join('migrate_tmp_dir'))
file_stat = os.stat(f"{folder.strpath}")
Expand Down Expand Up @@ -54,7 +55,7 @@ def migrate_tmpdir(self, tmpdir_factory):
os.environ['AUTOSUBMIT_CONFIGURATION'] = str(folder.join('autosubmitrc'))
create_database(str(folder.join('autosubmitrc')))
assert "tests.db" in [Path(f).name for f in folder.listdir()]
init_expid(str(folder.join('autosubmitrc')), platform='pytest-local',create=False)
init_expid(str(folder.join('autosubmitrc')), platform='pytest-local', create=False, test_type='test')
assert "t000" in [Path(f).name for f in folder.listdir()]
return folder

Expand Down Expand Up @@ -124,56 +125,72 @@ def migrate_prepare_test_conf(self, prepare_migrate, migrate_remote_only):

def test_migrate_conf_good_config(self, migrate_prepare_test_conf):
# Test OK
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["TEMP_DIR"] = ""
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_platforms(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"] = {}
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_scratch_dir(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["SCRATCH_DIR"] = ""
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_project(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["PROJECT"] = ""
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_same_user(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["SAME_USER"] = False
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_user(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["USER"] = ""
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

def test_migrate_no_host(self, migrate_prepare_test_conf):
as_conf, original, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf, _, platforms, migrate_remote_only = migrate_prepare_test_conf
as_conf.misc_data["PLATFORMS"]["PYTEST-LOCAL"]["HOST"] = ""
with pytest.raises(AutosubmitCritical):
migrate_remote_only.check_migrate_config(as_conf, platforms, as_conf.misc_data["PLATFORMS"])

# TODO: parametrize the test with the one below, but right now it's not working due not being well isolated ( to do after the transition to github)
def test_migrate_remote(self, migrate_remote_only, migrate_tmpdir):
# Expected behavior: migrate everything from scratch/whatever to scratch/whatever_new
assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True)
assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=False)
assert "dummy data" == migrate_tmpdir.join(
f'scratch/whatever/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read()
migrate_remote_only.migrate_offer_remote()
assert migrate_tmpdir.join('migrate_tmp_dir/t000').check(dir=True)
migrate_remote_only.migrate_pickup()
assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=False)
assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=True)
assert "dummy data" == migrate_tmpdir.join(
f'scratch/whatever_new/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read()

def test_migrate_remote_rsync(self, migrate_remote_only, migrate_tmpdir, mocker):
# Expected behavior: migrate everything from scratch/whatever to scratch/whatever_new
assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True)
assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=False)
assert "dummy data" == migrate_tmpdir.join(
f'scratch/whatever/{migrate_tmpdir.owner}/t000/real_data/dummy_symlink').read()
mocker.patch('autosubmit.platforms.paramiko_platform.ParamikoPlatform.move_file', return_value=False)
migrate_remote_only.migrate_offer_remote()
assert migrate_tmpdir.join(f'migrate_tmp_dir/t000').check(dir=True)
assert migrate_tmpdir.join('migrate_tmp_dir/t000').check(dir=True)
assert not migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=True)
migrate_remote_only.migrate_pickup()
assert migrate_tmpdir.join(f'scratch/whatever/{migrate_tmpdir.owner}/t000').check(dir=False)
assert migrate_tmpdir.join(f'scratch/whatever_new/{migrate_tmpdir.owner}/t000').check(dir=True)
Expand Down