From 7e8e3e9580d8dca53cbc6d726fc461a581444a97 Mon Sep 17 00:00:00 2001 From: serfon Date: Tue, 9 Jul 2024 14:22:16 +0200 Subject: [PATCH] Fix SUBSCRIPTIONS_HISTORY integrity error --- lib/rucio/alembicrevision.py | 2 +- lib/rucio/core/subscription.py | 8 ++-- ...ix_primary_key_for_subscription_history.py | 41 +++++++++++++++++++ lib/rucio/db/sqla/models.py | 2 +- tests/test_subscription.py | 41 +++++++++++++++++++ 5 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 lib/rucio/db/sqla/migrate_repo/versions/b5493606bbf5_fix_primary_key_for_subscription_history.py diff --git a/lib/rucio/alembicrevision.py b/lib/rucio/alembicrevision.py index d29a3e23698..d1ba1f17e53 100644 --- a/lib/rucio/alembicrevision.py +++ b/lib/rucio/alembicrevision.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision +ALEMBIC_REVISION = 'b5493606bbf5' # the current alembic head revision diff --git a/lib/rucio/core/subscription.py b/lib/rucio/core/subscription.py index a756be68db8..ce8b16a4827 100644 --- a/lib/rucio/core/subscription.py +++ b/lib/rucio/core/subscription.py @@ -23,7 +23,7 @@ from sqlalchemy.exc import IntegrityError, NoResultFound, StatementError from sqlalchemy.orm import aliased -from rucio.common.config import config_get +from rucio.common.config import config_get_bool from rucio.common.exception import RucioException, SubscriptionDuplicate, SubscriptionNotFound from rucio.db.sqla import models from rucio.db.sqla.constants import SubscriptionState @@ -78,7 +78,7 @@ def add_subscription(name: str, :returns: The subscriptionid """ try: - keep_history = config_get('subscriptions', 'keep_history') + keep_history = config_get_bool('subscriptions', 'keep_history') except (NoOptionError, NoSectionError, RuntimeError): keep_history = False @@ -146,7 +146,7 @@ def update_subscription(name: str, :raises: SubscriptionNotFound if subscription is not found """ try: - keep_history = config_get('subscriptions', 'keep_history') + keep_history = config_get_bool('subscriptions', 'keep_history') except (NoOptionError, NoSectionError, RuntimeError): keep_history = False values = {'state': SubscriptionState.UPDATED} @@ -196,7 +196,7 @@ def update_subscription(name: str, comments=subscription.comments, last_processed=subscription.last_processed, expired_at=subscription.expired_at, - updated_at=subscription.updated_at, + updated_at=datetime.datetime.utcnow(), created_at=subscription.created_at) subscription_history.save(session=session) except NoResultFound: diff --git a/lib/rucio/db/sqla/migrate_repo/versions/b5493606bbf5_fix_primary_key_for_subscription_history.py b/lib/rucio/db/sqla/migrate_repo/versions/b5493606bbf5_fix_primary_key_for_subscription_history.py new file mode 100644 index 00000000000..e8878c7c020 --- /dev/null +++ b/lib/rucio/db/sqla/migrate_repo/versions/b5493606bbf5_fix_primary_key_for_subscription_history.py @@ -0,0 +1,41 @@ +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +''' Fix primary key for subscription_history ''' + + +from alembic import context +from alembic.op import create_primary_key, drop_constraint + +# Alembic revision identifiers +revision = 'b5493606bbf5' +down_revision = 'a08fa8de1545' + + +def upgrade(): + ''' + Upgrade the database to this revision + ''' + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + drop_constraint(constraint_name='SUBSCRIPTIONS_PK', table_name='subscriptions_history', type_='primary') + create_primary_key('SUBSCRIPTIONS_HISTORY_PK', 'subscriptions_history', ['id', 'updated_at']) + + +def downgrade(): + ''' + Downgrade the database to the previous revision + ''' + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + drop_constraint(constraint_name='SUBSCRIPTIONS_HISTORY_PK', table_name='subscriptions_history', type_='primary') + create_primary_key('SUBSCRIPTIONS_PK', 'subscriptions_history', ['id', 'updated_at']) diff --git a/lib/rucio/db/sqla/models.py b/lib/rucio/db/sqla/models.py index b4f5362e468..a6a83b824fb 100644 --- a/lib/rucio/db/sqla/models.py +++ b/lib/rucio/db/sqla/models.py @@ -1509,7 +1509,7 @@ class SubscriptionHistory(BASE, ModelBase): retroactive: Mapped[bool] = mapped_column(Boolean(name='SUBS_HISTORY_RETROACTIVE_CHK', create_constraint=True), default=False) expired_at: Mapped[Optional[datetime]] = mapped_column(DateTime) - _table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_PK'),) + _table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_HISTORY_PK'),) class Token(BASE, ModelBase): diff --git a/tests/test_subscription.py b/tests/test_subscription.py index 48f0d5025bb..5237b373f49 100644 --- a/tests/test_subscription.py +++ b/tests/test_subscription.py @@ -17,6 +17,7 @@ from json.decoder import JSONDecodeError import pytest +from sqlalchemy import select from rucio.common.constants import RseAttr from rucio.common.exception import InvalidObject, SubscriptionDuplicate, SubscriptionNotFound @@ -30,6 +31,7 @@ from rucio.core.rule import add_rule from rucio.core.scope import add_scope from rucio.daemons.transmogrifier.transmogrifier import get_subscriptions, run +from rucio.db.sqla import models from rucio.db.sqla.constants import AccountType, DIDType, RuleState from rucio.gateway.subscription import add_subscription, get_subscription_by_id, list_subscription_rule_states, list_subscriptions, update_subscription from rucio.tests.common import auth, did_name_generator, headers, rse_name_generator @@ -86,6 +88,45 @@ def test_create_and_update_and_list_subscription(self, vo, rse_factory): with pytest.raises(SubscriptionNotFound): get_subscription_by_id(sub_id, vo=vo) + @pytest.mark.parametrize("file_config_mock", [ + {"overrides": [('subscriptions', 'keep_history', 'True')]}, + ], indirect=True) + def test_keep_history_subscription(self, vo,rse_factory, db_session, file_config_mock): + """ SUBSCRIPTION (CLIENT): Test the keep_history oprtion """ + subscription_name = uuid() + rse1, _ = rse_factory.make_mock_rse() + rse2, _ = rse_factory.make_mock_rse() + rse_expression = '%s|%s' % (rse1, rse2) + sub_id = add_subscription(name=subscription_name, + account='root', + filter_={'project': self.projects, 'datatype': ['AOD', ], 'excluded_pattern': self.pattern1, 'account': ['tier0', ]}, + replication_rules=[{'rse_expression': rse_expression, 'copies': 2, 'activity': self.activity}], + lifetime=100000, + retroactive=False, + dry_run=False, + comments='This is a comment', + issuer='root', + vo=vo) + result = [sub['id'] for sub in list_subscriptions(name=subscription_name, account='root', vo=vo)] + assert sub_id == result[0] + result = update_subscription(name=subscription_name, account='root', metadata={'filter': {'project': ['toto', ]}}, issuer='root', vo=vo) + assert result is None + result = list_subscriptions(name=subscription_name, account='root', vo=vo) + sub = [] + for res in result: + sub.append(res) + assert len(sub) == 1 + assert loads(sub[0]['filter'])['project'][0] == 'toto' + # Check that the history was properly updated + stmt = select(models.SubscriptionHistory) + history = [hist for hist in db_session.execute(stmt)] + print(history) + stmt = select(models.SubscriptionHistory).where(models.SubscriptionHistory.name == subscription_name).order_by(models.SubscriptionHistory.updated_at) + history = [hist[0] for hist in db_session.execute(stmt)] + assert len(history) == 2 + assert (loads(history[0]['filter'])['project']) == self.projects + assert (loads(history[1]['filter'])['project']) == ['toto', ] + @pytest.mark.noparallel(reason='uses pre-defined RSE') def test_create_list_subscription_by_id(self, vo, rse_factory): """ SUBSCRIPTION (Gateway): Test the creation of a new subscription and list it by id """