Skip to content

Commit

Permalink
Fix SUBSCRIPTIONS_HISTORY integrity error
Browse files Browse the repository at this point in the history
  • Loading branch information
serfon committed Jul 9, 2024
1 parent 98b40a9 commit 974e2f8
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 7 deletions.
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision
ALEMBIC_REVISION = 'b5493606bbf5' # the current alembic head revision
12 changes: 7 additions & 5 deletions lib/rucio/core/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sqlalchemy.exc import IntegrityError, NoResultFound, StatementError
from sqlalchemy.orm import aliased

from rucio.common.config import config_get
from rucio.common.config import config_get_bool
from rucio.common.exception import RucioException, SubscriptionDuplicate, SubscriptionNotFound
from rucio.db.sqla import models
from rucio.db.sqla.constants import SubscriptionState
Expand Down Expand Up @@ -78,7 +78,7 @@ def add_subscription(name: str,
:returns: The subscriptionid
"""
try:
keep_history = config_get('subscriptions', 'keep_history')
keep_history = config_get_bool('subscriptions', 'keep_history')
except (NoOptionError, NoSectionError, RuntimeError):
keep_history = False

Expand Down Expand Up @@ -109,7 +109,9 @@ def add_subscription(name: str,
lifetime=new_subscription.lifetime,
retroactive=new_subscription.retroactive,
policyid=new_subscription.policyid,
comments=new_subscription.comments)
comments=new_subscription.comments,
created_at=datetime.datetime.utcnow(),
updated_at=datetime.datetime.utcnow())
try:
new_subscription.save(session=session)
if keep_history:
Expand Down Expand Up @@ -146,7 +148,7 @@ def update_subscription(name: str,
:raises: SubscriptionNotFound if subscription is not found
"""
try:
keep_history = config_get('subscriptions', 'keep_history')
keep_history = config_get_bool('subscriptions', 'keep_history')
except (NoOptionError, NoSectionError, RuntimeError):
keep_history = False
values = {'state': SubscriptionState.UPDATED}
Expand Down Expand Up @@ -196,7 +198,7 @@ def update_subscription(name: str,
comments=subscription.comments,
last_processed=subscription.last_processed,
expired_at=subscription.expired_at,
updated_at=subscription.updated_at,
updated_at=datetime.datetime.utcnow(),
created_at=subscription.created_at)
subscription_history.save(session=session)
except NoResultFound:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

''' Fix primary key for subscription_history '''


from alembic import context
from alembic.op import create_primary_key, drop_constraint

# Alembic revision identifiers
revision = 'b5493606bbf5'
down_revision = 'a08fa8de1545'


def upgrade():
'''
Upgrade the database to this revision
'''
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
drop_constraint(constraint_name='SUBSCRIPTIONS_PK', table_name='subscriptions_history', type_='primary')
create_primary_key('SUBSCRIPTIONS_HISTORY_PK', 'subscriptions_history', ['id', 'updated_at'])


def downgrade():
'''
Downgrade the database to the previous revision
'''
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
drop_constraint(constraint_name='SUBSCRIPTIONS_HISTORY_PK', table_name='subscriptions_history', type_='primary')
create_primary_key('SUBSCRIPTIONS_PK', 'subscriptions_history', ['id', 'updated_at'])
2 changes: 1 addition & 1 deletion lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ class SubscriptionHistory(BASE, ModelBase):
retroactive: Mapped[bool] = mapped_column(Boolean(name='SUBS_HISTORY_RETROACTIVE_CHK', create_constraint=True),
default=False)
expired_at: Mapped[Optional[datetime]] = mapped_column(DateTime)
_table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_PK'),)
_table_args = (PrimaryKeyConstraint('id', 'updated_at', name='SUBSCRIPTIONS_HISTORY_PK'),)


class Token(BASE, ModelBase):
Expand Down
48 changes: 48 additions & 0 deletions tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from datetime import datetime
from json import loads
from json.decoder import JSONDecodeError

import pytest
from sqlalchemy import select

from rucio.common.constants import RseAttr
from rucio.common.exception import InvalidObject, SubscriptionDuplicate, SubscriptionNotFound
Expand All @@ -30,11 +32,19 @@
from rucio.core.rule import add_rule
from rucio.core.scope import add_scope
from rucio.daemons.transmogrifier.transmogrifier import get_subscriptions, run
from rucio.db.sqla import models
from rucio.db.sqla.constants import AccountType, DIDType, RuleState
from rucio.db.sqla.session import read_session
from rucio.gateway.subscription import add_subscription, get_subscription_by_id, list_subscription_rule_states, list_subscriptions, update_subscription
from rucio.tests.common import auth, did_name_generator, headers, rse_name_generator


@read_session
def get_subscription_history(subscription_name, *, session=None):
stmt = select(models.SubscriptionHistory).where(models.SubscriptionHistory.name == subscription_name).order_by(models.SubscriptionHistory.updated_at)
return [hist[0] for hist in session.execute(stmt)]


class TestSubscriptionCoreGateway:
projects = ['data12_900GeV', 'data12_8TeV', 'data13_900GeV', 'data13_8TeV']
pattern1 = r'(_tid|physics_(Muons|JetTauEtmiss|Egamma)\..*\.ESD|express_express(?!.*NTUP|.*\.ESD|.*RAW)|(physics|express)(?!.*NTUP).* \
Expand Down Expand Up @@ -86,6 +96,44 @@ def test_create_and_update_and_list_subscription(self, vo, rse_factory):
with pytest.raises(SubscriptionNotFound):
get_subscription_by_id(sub_id, vo=vo)

@pytest.mark.dirty
@pytest.mark.parametrize("file_config_mock", [
{"overrides": [('subscriptions', 'keep_history', 'True')]},
], indirect=True)
def test_keep_history_subscription(self, vo, rse_factory, file_config_mock):
""" SUBSCRIPTION (CLIENT): Test the keep_history oprtion """
subscription_name = uuid()
rse1, _ = rse_factory.make_mock_rse()
rse2, _ = rse_factory.make_mock_rse()
rse_expression = '%s|%s' % (rse1, rse2)
sub_id = add_subscription(name=subscription_name,
account='root',
filter_={'project': self.projects, 'datatype': ['AOD', ], 'excluded_pattern': self.pattern1, 'account': ['tier0', ]},
replication_rules=[{'rse_expression': rse_expression, 'copies': 2, 'activity': self.activity}],
lifetime=100000,
retroactive=False,
dry_run=False,
comments='This is a comment',
issuer='root',
vo=vo)
result = [sub['id'] for sub in list_subscriptions(name=subscription_name, account='root', vo=vo)]
assert sub_id == result[0]
time.sleep(1)
result = update_subscription(name=subscription_name, account='root', metadata={'filter': {'project': ['toto', ]}}, issuer='root', vo=vo)
assert result is None
result = list_subscriptions(name=subscription_name, account='root', vo=vo)
sub = []
for res in result:
sub.append(res)
assert len(sub) == 1
assert loads(sub[0]['filter'])['project'][0] == 'toto'
# Check that the history was properly updated
history = get_subscription_history(subscription_name)
assert len(history) == 2
assert history[0]['updated_at'] < history[1]['updated_at']
assert (loads(history[0]['filter'])['project']) == self.projects
assert (loads(history[1]['filter'])['project']) == ['toto', ]

@pytest.mark.noparallel(reason='uses pre-defined RSE')
def test_create_list_subscription_by_id(self, vo, rse_factory):
""" SUBSCRIPTION (Gateway): Test the creation of a new subscription and list it by id """
Expand Down

0 comments on commit 974e2f8

Please sign in to comment.