Skip to content

Commit

Permalink
Fix SUBSCRIPTIONS_HISTORY integrity error
Browse files Browse the repository at this point in the history
  • Loading branch information
serfon committed Jul 9, 2024
1 parent 98b40a9 commit 7e8e3e9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision
ALEMBIC_REVISION = 'b5493606bbf5' # the current alembic head revision
8 changes: 4 additions & 4 deletions lib/rucio/core/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sqlalchemy.exc import IntegrityError, NoResultFound, StatementError
from sqlalchemy.orm import aliased

from rucio.common.config import config_get
from rucio.common.config import config_get_bool
from rucio.common.exception import RucioException, SubscriptionDuplicate, SubscriptionNotFound
from rucio.db.sqla import models
from rucio.db.sqla.constants import SubscriptionState
Expand Down Expand Up @@ -78,7 +78,7 @@ def add_subscription(name: str,
:returns: The subscriptionid
"""
try:
keep_history = config_get('subscriptions', 'keep_history')
keep_history = config_get_bool('subscriptions', 'keep_history')
except (NoOptionError, NoSectionError, RuntimeError):
keep_history = False

Expand Down Expand Up @@ -146,7 +146,7 @@ def update_subscription(name: str,
:raises: SubscriptionNotFound if subscription is not found
"""
try:
keep_history = config_get('subscriptions', 'keep_history')
keep_history = config_get_bool('subscriptions', 'keep_history')
except (NoOptionError, NoSectionError, RuntimeError):
keep_history = False
values = {'state': SubscriptionState.UPDATED}
Expand Down Expand Up @@ -196,7 +196,7 @@ def update_subscription(name: str,
comments=subscription.comments,
last_processed=subscription.last_processed,
expired_at=subscription.expired_at,
updated_at=subscription.updated_at,
updated_at=datetime.datetime.utcnow(),
created_at=subscription.created_at)
subscription_history.save(session=session)
except NoResultFound:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

''' Fix primary key for subscription_history '''


from alembic import context
from alembic.op import create_primary_key, drop_constraint

# Alembic revision identifiers
revision = 'b5493606bbf5'
down_revision = 'a08fa8de1545'


def upgrade():
'''
Upgrade the database to this revision
'''
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
drop_constraint(constraint_name='SUBSCRIPTIONS_PK', table_name='subscriptions_history', type_='primary')
create_primary_key('SUBSCRIPTIONS_HISTORY_PK', 'subscriptions_history', ['id', 'updated_at'])


def downgrade():
'''
Downgrade the database to the previous revision
'''
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
drop_constraint(constraint_name='SUBSCRIPTIONS_HISTORY_PK', table_name='subscriptions_history', type_='primary')
create_primary_key('SUBSCRIPTIONS_PK', 'subscriptions_history', ['id', 'updated_at'])
2 changes: 1 addition & 1 deletion lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ class SubscriptionHistory(BASE, ModelBase):
retroactive: Mapped[bool] = mapped_column(Boolean(name='SUBS_HISTORY_RETROACTIVE_CHK', create_constraint=True),
default=False)
expired_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
_table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_PK'),)
_table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_HISTORY_PK'),)


class Token(BASE, ModelBase):
Expand Down
41 changes: 41 additions & 0 deletions tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from json.decoder import JSONDecodeError

import pytest
from sqlalchemy import select

from rucio.common.constants import RseAttr
from rucio.common.exception import InvalidObject, SubscriptionDuplicate, SubscriptionNotFound
Expand All @@ -30,6 +31,7 @@
from rucio.core.rule import add_rule
from rucio.core.scope import add_scope
from rucio.daemons.transmogrifier.transmogrifier import get_subscriptions, run
from rucio.db.sqla import models
from rucio.db.sqla.constants import AccountType, DIDType, RuleState
from rucio.gateway.subscription import add_subscription, get_subscription_by_id, list_subscription_rule_states, list_subscriptions, update_subscription
from rucio.tests.common import auth, did_name_generator, headers, rse_name_generator
Expand Down Expand Up @@ -86,6 +88,45 @@ def test_create_and_update_and_list_subscription(self, vo, rse_factory):
with pytest.raises(SubscriptionNotFound):
get_subscription_by_id(sub_id, vo=vo)

@pytest.mark.parametrize("file_config_mock", [
{"overrides": [('subscriptions', 'keep_history', 'True')]},
], indirect=True)
def test_keep_history_subscription(self, vo,rse_factory, db_session, file_config_mock):
""" SUBSCRIPTION (CLIENT): Test the keep_history oprtion """
subscription_name = uuid()
rse1, _ = rse_factory.make_mock_rse()
rse2, _ = rse_factory.make_mock_rse()
rse_expression = '%s|%s' % (rse1, rse2)
sub_id = add_subscription(name=subscription_name,
account='root',
filter_={'project': self.projects, 'datatype': ['AOD', ], 'excluded_pattern': self.pattern1, 'account': ['tier0', ]},
replication_rules=[{'rse_expression': rse_expression, 'copies': 2, 'activity': self.activity}],
lifetime=100000,
retroactive=False,
dry_run=False,
comments='This is a comment',
issuer='root',
vo=vo)
result = [sub['id'] for sub in list_subscriptions(name=subscription_name, account='root', vo=vo)]
assert sub_id == result[0]
result = update_subscription(name=subscription_name, account='root', metadata={'filter': {'project': ['toto', ]}}, issuer='root', vo=vo)
assert result is None
result = list_subscriptions(name=subscription_name, account='root', vo=vo)
sub = []
for res in result:
sub.append(res)
assert len(sub) == 1
assert loads(sub[0]['filter'])['project'][0] == 'toto'
# Check that the history was properly updated
stmt = select(models.SubscriptionHistory)
history = [hist for hist in db_session.execute(stmt)]
print(history)
stmt = select(models.SubscriptionHistory).where(models.SubscriptionHistory.name == subscription_name).order_by(models.SubscriptionHistory.updated_at)
history = [hist[0] for hist in db_session.execute(stmt)]
assert len(history) == 2
assert (loads(history[0]['filter'])['project']) == self.projects
assert (loads(history[1]['filter'])['project']) == ['toto', ]

@pytest.mark.noparallel(reason='uses pre-defined RSE')
def test_create_list_subscription_by_id(self, vo, rse_factory):
""" SUBSCRIPTION (Gateway): Test the creation of a new subscription and list it by id """
Expand Down

0 comments on commit 7e8e3e9

Please sign in to comment.