diff --git a/.travis-data/test_daemon.py b/.travis-data/test_daemon.py index 3c73f7514b..161ca78fb0 100644 --- a/.travis-data/test_daemon.py +++ b/.travis-data/test_daemon.py @@ -24,7 +24,6 @@ def print_daemon_log(): def have_finished(pks): finished_list = [load_node(pk).has_finished() for pk in pks] - print {pk: (load_node(pk).get_state(), load_node(pk).has_finished()) for pk in pks} num_finished = len([_ for _ in finished_list if _]) print "{}/{} finished".format(num_finished, len(finished_list)) return not (False in finished_list) diff --git a/aiida/backends/sqlalchemy/__init__.py b/aiida/backends/sqlalchemy/__init__.py index 9d674ee9a0..ba3b68d156 100644 --- a/aiida/backends/sqlalchemy/__init__.py +++ b/aiida/backends/sqlalchemy/__init__.py @@ -8,11 +8,19 @@ # For further information please visit http://www.aiida.net # ########################################################################### - +# The next two serve as 'global' variables, set in the load_dbenv +# call. They are properly reset upon forking. engine = None scopedsessionclass = None def get_scoped_session(): + """ + Return a scoped session (according to SQLAlchemy docs, + this returns always the same object within a thread, and + a different object in a different thread. + Moreover, since we update the scopedsessionclass upon + forking, also forks have different session objects. + """ if scopedsessionclass is None: s = None else: diff --git a/aiida/backends/sqlalchemy/globalsettings.py b/aiida/backends/sqlalchemy/globalsettings.py index 8030441d4a..3065291c7a 100644 --- a/aiida/backends/sqlalchemy/globalsettings.py +++ b/aiida/backends/sqlalchemy/globalsettings.py @@ -14,15 +14,9 @@ from aiida.backends.sqlalchemy.models.settings import DbSetting from sqlalchemy.orm.exc import NoResultFound +from aiida.backends.sqlalchemy import get_scoped_session -def get_session(): - """ - Return the global session for SQLA - """ - import aiida.backends.sqlalchemy - return aiida.backends.sqlalchemy.get_scoped_session() - def set_global_setting(key, value, description=None): """ Set a global setting in the DbSetting table (therefore, stored at the DB @@ -39,7 +33,7 @@ def del_global_setting(key): :raise KeyError: if the setting does not exist in the DB """ try: - setting = get_session().query(DbSetting).filter_by(key=key).one() + setting = get_scoped_session().query(DbSetting).filter_by(key=key).one() setting.delete() except NoResultFound: raise KeyError("No global setting with key={}".format(key)) @@ -59,7 +53,7 @@ def get_global_setting(key): try: return get_value_of_sub_field( - key, lambda given_key: get_session().query(DbSetting).filter_by( + key, lambda given_key: get_scoped_session().query(DbSetting).filter_by( key=given_key).one().getvalue()) except NoResultFound: raise KeyError("No global setting with key={}".format(key)) @@ -78,7 +72,7 @@ def get_global_setting_description(key): validate_key(key) try: - return (get_session().query(DbSetting).filter_by(key=key). + return (get_scoped_session().query(DbSetting).filter_by(key=key). one().get_description()) except NoResultFound: raise KeyError("No global setting with key={}".format(key)) @@ -91,7 +85,7 @@ def table_check_test(): """ from sqlalchemy.engine import reflection from aiida.backends import sqlalchemy as sa - inspector = reflection.Inspector.from_engine(get_session().bind) + inspector = reflection.Inspector.from_engine(get_scoped_session().bind) if 'db_dbsetting' not in inspector.get_table_names(): raise KeyError("No table found") diff --git a/aiida/backends/sqlalchemy/models/node.py b/aiida/backends/sqlalchemy/models/node.py index c2adf4bf2f..561799058f 100644 --- a/aiida/backends/sqlalchemy/models/node.py +++ b/aiida/backends/sqlalchemy/models/node.py @@ -33,9 +33,6 @@ from aiida.common.datastructures import calc_states -# Magic to make the most recent state given in DbCalcState an attribute -# of the DbNode (None if node is not a calculation) - class DbCalcState(Base): __tablename__ = "db_dbcalcstate" @@ -50,7 +47,6 @@ class DbCalcState(Base): ) dbnode = relationship( 'DbNode', backref=backref('dbstates', passive_deletes=True), - #order_by='DbCalcState.time' ) state = Column(ChoiceType((_, _) for _ in calc_states), index=True) @@ -260,6 +256,9 @@ def __str__(self): @hybrid_property def state(self): + """ + Return the most recent state from DbCalcState + """ if not self.id: return None all_states = DbCalcState.query.filter(DbCalcState.dbnode_id == self.id).all() @@ -270,7 +269,10 @@ def state(self): @state.expression def state(cls): - + """ + Return the expression to get the most recent state from DbCalcState, + to be used in queries + """ subq = select( [ DbCalcState.dbnode_id.label('dbnode_id'), @@ -290,7 +292,6 @@ def state(cls): label('laststate') - class DbLink(Base): __tablename__ = "db_dblink" diff --git a/aiida/backends/sqlalchemy/querybuilder_sqla.py b/aiida/backends/sqlalchemy/querybuilder_sqla.py index 2764028ab7..0e70cf84ce 100644 --- a/aiida/backends/sqlalchemy/querybuilder_sqla.py +++ b/aiida/backends/sqlalchemy/querybuilder_sqla.py @@ -316,8 +316,8 @@ def get_filter_expr( def _get_filter_expr_from_column(self, operator, value, column): - # Label is used because is what is returned for the - # 'state' column by the hybrid_column + # Label is used because it is what is returned for the + # 'state' column by the hybrid_column construct if not isinstance(column, (Cast, InstrumentedAttribute, Label)): raise TypeError( 'column ({}) {} is not a valid column'.format( diff --git a/aiida/backends/sqlalchemy/tests/generic.py b/aiida/backends/sqlalchemy/tests/generic.py index 745915fb53..c63e8c1d6f 100644 --- a/aiida/backends/sqlalchemy/tests/generic.py +++ b/aiida/backends/sqlalchemy/tests/generic.py @@ -43,16 +43,16 @@ def test_deletion(self): _ = JobCalculation(**calc_params).store() - #print "Node stored with pk:", _.dbnode.pk + session = aiida.backends.sqlalchemy.get_scoped_session() # This should fail, because there is at least a calculation # using this computer (the one created just above) try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(InvalidOperation): delete_computer(self.computer) finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() class TestGroupsSqla(AiidaTestCase): diff --git a/aiida/backends/sqlalchemy/tests/nodes.py b/aiida/backends/sqlalchemy/tests/nodes.py index 35576c2393..48799ba662 100644 --- a/aiida/backends/sqlalchemy/tests/nodes.py +++ b/aiida/backends/sqlalchemy/tests/nodes.py @@ -165,6 +165,7 @@ def test_load_nodes(self): from aiida.orm import load_node from aiida.common.exceptions import NotExistent import aiida.backends.sqlalchemy + from aiida.backends.sqlalchemy import get_scoped_session a = Node() a.store() @@ -173,41 +174,43 @@ def test_load_nodes(self): self.assertEquals(a.pk, load_node(node_id=a.uuid).pk) self.assertEquals(a.pk, load_node(pk=a.pk).pk) self.assertEquals(a.pk, load_node(uuid=a.uuid).pk) + + session = get_scoped_session() try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(ValueError): load_node(node_id=a.pk, pk=a.pk) finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(ValueError): load_node(pk=a.pk, uuid=a.uuid) finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(ValueError): load_node(pk=a.uuid) finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(ValueError): load_node(uuid=a.pk) finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() with self.assertRaises(ValueError): load_node() finally: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() def test_multiple_node_creation(self): """ @@ -227,17 +230,19 @@ def test_multiple_node_creation(self): node_uuid = get_new_uuid() DbNode(user=user, uuid=node_uuid, type=None) + session = aiida.backends.sqlalchemy.get_scoped_session() + # Query the session before commit - res = aiida.backends.sqlalchemy.get_scoped_session().query(DbNode.uuid).filter( + res = session.query(DbNode.uuid).filter( DbNode.uuid == node_uuid).all() self.assertEqual(len(res), 0, "There should not be any nodes with this" "UUID in the session/DB.") # Commit the transaction - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.commit() # Check again that the node is not in the DB - res = aiida.backends.sqlalchemy.get_scoped_session().query(DbNode.uuid).filter( + res = session.query(DbNode.uuid).filter( DbNode.uuid == node_uuid).all() self.assertEqual(len(res), 0, "There should not be any nodes with this" "UUID in the session/DB.") @@ -247,20 +252,20 @@ def test_multiple_node_creation(self): # Create a new node but now add it to the session node_uuid = get_new_uuid() node = DbNode(user=user, uuid=node_uuid, type=None) - aiida.backends.sqlalchemy.get_scoped_session().add(node) + session.add(node) # Query the session before commit - res = aiida.backends.sqlalchemy.get_scoped_session().query(DbNode.uuid).filter( + res = session.query(DbNode.uuid).filter( DbNode.uuid == node_uuid).all() self.assertEqual(len(res), 1, "There should be a node in the session/DB with the " "UUID {}".format(node_uuid)) # Commit the transaction - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.commit() # Check again that the node is in the db - res = aiida.backends.sqlalchemy.get_scoped_session().query(DbNode.uuid).filter( + res = session.query(DbNode.uuid).filter( DbNode.uuid == node_uuid).all() self.assertEqual(len(res), 1, "There should be a node in the session/DB with the " diff --git a/aiida/backends/sqlalchemy/tests/schema.py b/aiida/backends/sqlalchemy/tests/schema.py index bc9d32dd15..95514e30ac 100644 --- a/aiida/backends/sqlalchemy/tests/schema.py +++ b/aiida/backends/sqlalchemy/tests/schema.py @@ -130,9 +130,10 @@ def test_User_node_1(self): self.assertIsNone(dbu1.id) self.assertIsNone(dbn1.id) + session = aiida.backends.sqlalchemy.get_scoped_session() # Add only the node and commit - aiida.backends.sqlalchemy.get_scoped_session().add(dbn1) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(dbn1) + session.commit() # Check that a pk has been assigned, which means that things have # been flushed into the database @@ -162,13 +163,15 @@ def test_User_node_2(self): self.assertIsNone(dbu1.id) self.assertIsNone(dbn1.id) + session = aiida.backends.sqlalchemy.get_scoped_session() + # Catch all the SQLAlchemy warnings generated by the following code with warnings.catch_warnings(): warnings.simplefilter("ignore", category=sa_exc.SAWarning) # Add only the user and commit - aiida.backends.sqlalchemy.get_scoped_session().add(dbu1) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(dbu1) + session.commit() # Check that a pk has been assigned (or not), which means that things # have been flushed into the database @@ -199,9 +202,11 @@ def test_User_node_3(self): self.assertIsNone(dbn1.id) self.assertIsNone(dbn2.id) + session = aiida.backends.sqlalchemy.get_scoped_session() + # Add only first node and commit - aiida.backends.sqlalchemy.get_scoped_session().add(dbn1) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(dbn1) + session.commit() # Check for which object a pk has been assigned, which means that # things have been at least flushed into the database @@ -236,9 +241,11 @@ def test_User_node_4(self): self.assertIsNone(dbu1.id) self.assertIsNone(dbn1.id) + session = aiida.backends.sqlalchemy.get_scoped_session() + # Add only first node and commit - aiida.backends.sqlalchemy.get_scoped_session().add(dbn1) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(dbn1) + session.commit() # Check for which object a pk has been assigned, which means that # things have been at least flushed into the database diff --git a/aiida/backends/sqlalchemy/tests/session.py b/aiida/backends/sqlalchemy/tests/session.py index b275d49ab3..203a168577 100644 --- a/aiida/backends/sqlalchemy/tests/session.py +++ b/aiida/backends/sqlalchemy/tests/session.py @@ -36,6 +36,9 @@ class TestSessionSqla(AiidaTestCase): def set_connection(self, expire_on_commit=True): # Creating a sessionmaker with the desired parameters + ## Note: to check if this is still correct with the new + ## way of managing connections and sessions in SQLA... + ## For instance, we should use probably a scopedsession wrapper Session = sessionmaker(expire_on_commit=expire_on_commit) aiida.backends.sqlalchemy.sessionfactory = Session( bind=self._AiidaTestCase__backend_instance.connection) @@ -45,8 +48,9 @@ def set_connection(self, expire_on_commit=True): aiida.backends.sqlalchemy.get_scoped_session().expunge_all() def drop_connection(self): - aiida.backends.sqlalchemy.get_scoped_session().expunge_all() - aiida.backends.sqlalchemy.get_scoped_session().close() + session = aiida.backends.sqlalchemy.get_scoped_session() + session.expunge_all() + session.close() aiida.backends.sqlalchemy.sessionfactory = None def test_session_update_and_expiration_1(self): @@ -60,9 +64,11 @@ def test_session_update_and_expiration_1(self): self.set_connection(expire_on_commit=True) + session = aiida.backends.sqlalchemy.get_scoped_session() + user = User(email=get_configured_user_email()) - aiida.backends.sqlalchemy.get_scoped_session().add(user._dbuser) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(user._dbuser) + session.commit() defaults = dict(name='localhost', hostname='localhost', @@ -70,13 +76,13 @@ def test_session_update_and_expiration_1(self): scheduler_type='pbspro', workdir='/tmp/aiida') computer = Computer(**defaults) - aiida.backends.sqlalchemy.get_scoped_session().add(computer._dbcomputer) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(computer._dbcomputer) + session.commit() code = Code() code.set_remote_computer_exec((computer, '/x.x')) - aiida.backends.sqlalchemy.get_scoped_session().add(code.dbnode) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(code.dbnode) + session.commit() self.drop_connection() @@ -90,11 +96,13 @@ def test_session_update_and_expiration_2(self): from aiida.orm.code import Code from aiida.orm.user import User + session = aiida.backends.sqlalchemy.get_scoped_session() + self.set_connection(expire_on_commit=True) user = User(email=get_configured_user_email()) - aiida.backends.sqlalchemy.get_scoped_session().add(user._dbuser) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(user._dbuser) + session.commit() defaults = dict(name='localhost', hostname='localhost', @@ -121,9 +129,11 @@ def test_session_update_and_expiration_3(self): self.set_connection(expire_on_commit=False) + session = aiida.backends.sqlalchemy.get_scoped_session() + user = User(email=get_configured_user_email()) - aiida.backends.sqlalchemy.get_scoped_session().add(user._dbuser) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(user._dbuser) + session.commit() defaults = dict(name='localhost', hostname='localhost', @@ -131,13 +141,13 @@ def test_session_update_and_expiration_3(self): scheduler_type='pbspro', workdir='/tmp/aiida') computer = Computer(**defaults) - aiida.backends.sqlalchemy.get_scoped_session().add(computer._dbcomputer) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(computer._dbcomputer) + session.commit() code = Code() code.set_remote_computer_exec((computer, '/x.x')) - aiida.backends.sqlalchemy.get_scoped_session().add(code.dbnode) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(code.dbnode) + session.commit() self.drop_connection() @@ -152,9 +162,11 @@ def test_session_update_and_expiration_4(self): from aiida.orm.code import Code from aiida.orm.user import User + session = aiida.backends.sqlalchemy.get_scoped_session() + user = User(email=get_configured_user_email()) - aiida.backends.sqlalchemy.get_scoped_session().add(user._dbuser) - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.add(user._dbuser) + session.commit() defaults = dict(name='localhost', hostname='localhost', diff --git a/aiida/backends/sqlalchemy/transition_06dj_to_07sqla.py b/aiida/backends/sqlalchemy/transition_06dj_to_07sqla.py index 0ca53986cc..0f4123a04c 100644 --- a/aiida/backends/sqlalchemy/transition_06dj_to_07sqla.py +++ b/aiida/backends/sqlalchemy/transition_06dj_to_07sqla.py @@ -128,19 +128,21 @@ def select_from_key(key, d): def modify_link_table(): - with sa.get_scoped_session().begin(subtransactions=True): + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): print("\nStaring the modification of link table.") - inspector = reflection.Inspector.from_engine(sa.get_scoped_session().bind) + inspector = reflection.Inspector.from_engine(session.bind) table_cols = inspector.get_columns("db_dblink") col_type_list = [_["type"] for _ in table_cols if _["name"] == "type"] if len(col_type_list) == 0: print("Creating link type column") - sa.get_scoped_session().execute('ALTER TABLE db_dblink ADD COLUMN ' + session.execute('ALTER TABLE db_dblink ADD COLUMN ' 'type varchar(255)') else: print("Link type column already exists.") - sa.get_scoped_session().commit() + session.commit() def transition_extras(profile=None, group_size=1000, delete_table=False): @@ -190,12 +192,14 @@ class DbExtra(Base): .format(EXTRAS_COL_NAME, NODE_TABLE_NAME)) return - with sa.get_scoped_session().begin(subtransactions=True): + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): print("Creating columns..") - sa.get_scoped_session().execute('ALTER TABLE db_dbnode ADD COLUMN extras ' + session.execute('ALTER TABLE db_dbnode ADD COLUMN extras ' 'JSONB DEFAULT \'{}\'') from aiida.backends.sqlalchemy.models.node import DbNode - total_nodes = sa.get_scoped_session().query(func.count(DbNode.id)).scalar() + total_nodes = session.query(func.count(DbNode.id)).scalar() total_groups = int(math.ceil(total_nodes/float(group_size))) error = False @@ -213,20 +217,20 @@ class DbExtra(Base): error |= err_ node.extras = attrs - sa.get_scoped_session().add(node) + session.add(node) - sa.get_scoped_session().flush() - sa.get_scoped_session().expunge_all() + session.flush() + session.expunge_all() if error: cont = query_yes_no("There has been some errors during the " "migration. Do you want to continue?", "no") if not cont: - sa.get_scoped_session().rollback() + session.rollback() sys.exit(-1) if delete_table: - sa.get_scoped_session().execute('DROP TABLE db_dbextra') - sa.get_scoped_session().commit() + session.execute('DROP TABLE db_dbextra') + session.commit() print("Migration of extras finished.") @@ -278,12 +282,14 @@ class DbAttribute(Base): .format(ATTR_COL_NAME, NODE_TABLE_NAME)) return - with sa.get_scoped_session().begin(subtransactions=True): + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): print("Creating columns..") - sa.get_scoped_session().execute('ALTER TABLE db_dbnode ADD COLUMN attributes ' + session.execute('ALTER TABLE db_dbnode ADD COLUMN attributes ' 'JSONB DEFAULT \'{}\'') from aiida.backends.sqlalchemy.models.node import DbNode - total_nodes = sa.get_scoped_session().query(func.count(DbNode.id)).scalar() + total_nodes = session.query(func.count(DbNode.id)).scalar() total_groups = int(math.ceil(total_nodes/float(group_size))) error = False @@ -301,12 +307,12 @@ class DbAttribute(Base): error |= err_ node.attributes = attrs - sa.get_scoped_session().add(node) + session.add(node) # Remove the db_dbnode from sqlalchemy, to allow the GC to do its # job. - sa.get_scoped_session().flush() - sa.get_scoped_session().expunge_all() + session.flush() + session.expunge_all() del nodes gc.collect() @@ -315,11 +321,11 @@ class DbAttribute(Base): cont = query_yes_no("There has been some errors during the " "migration. Do you want to continue?", "no") if not cont: - sa.get_scoped_session().rollback() + session.rollback() sys.exit(-1) if delete_table: - sa.get_scoped_session().execute('DROP TABLE db_dbattribute') - sa.get_scoped_session().commit() + session.execute('DROP TABLE db_dbattribute') + session.commit() print("Migration of attributes finished.") @@ -363,14 +369,16 @@ class DbSetting(Base): .format(SETTINGS_VAL_COL_NAME, SETTINGS_TABLE_NAME)) return - with sa.get_scoped_session().begin(subtransactions=True): + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): print("Creating columns..") - sa.get_scoped_session().execute('ALTER TABLE db_dbsetting ADD COLUMN val ' + session.execute('ALTER TABLE db_dbsetting ADD COLUMN val ' 'JSONB DEFAULT \'{}\'') - sa.get_scoped_session().commit() + session.commit() - with sa.get_scoped_session().begin(subtransactions=True): - total_settings = sa.get_scoped_session().query(DbSetting).all() + with session.begin(subtransactions=True): + total_settings = session.query(DbSetting).all() for settings in total_settings: @@ -391,17 +399,17 @@ class DbSetting(Base): settings.val = val flag_modified(settings, "val") - sa.get_scoped_session().flush() + session.flush() - sa.get_scoped_session().commit() + session.commit() - with sa.get_scoped_session().begin(subtransactions=True): + with session.begin(subtransactions=True): for col_name in ["datatype", "tval", "fval", "ival", "bval", "dval"]: sql = ("ALTER TABLE {table} DROP COLUMN {column}") - sa.get_scoped_session().execute(sql.format(table=SETTINGS_TABLE_NAME, + session.execute(sql.format(table=SETTINGS_TABLE_NAME, column=col_name)) - sa.get_scoped_session().commit() + session.commit() print("Migration of settings finished.") @@ -428,7 +436,9 @@ def transition_json_column(profile=None): sql = ("ALTER TABLE {table} ALTER COLUMN {column} TYPE JSONB " "USING {column}::JSONB") - with sa.get_scoped_session().begin(subtransactions=True): + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): for table, col in table_col: table_cols = inspector.get_columns(table) @@ -446,9 +456,9 @@ def transition_json_column(profile=None): print("Changing column {} of table {} in JSON format." .format(table, col)) - sa.get_scoped_session().execute(sql.format(table=table, column=col)) + session.execute(sql.format(table=table, column=col)) - sa.get_scoped_session().commit() + session.commit() def create_gin_index(): @@ -485,24 +495,27 @@ def set_correct_schema_version_and_backend(): from aiida.utils import timezone # Setting the correct backend and schema version SQLA_SCHEMA_VERSION = 0.1 - with sa.get_scoped_session().begin(subtransactions=True): + + session = sa.get_scoped_session() + + with session.begin(subtransactions=True): # Setting manually the correct schema version - sa.get_scoped_session().execute( + session.execute( 'DELETE FROM db_dbsetting WHERE key=\'db|schemaversion\'') - sa.get_scoped_session().execute( + session.execute( 'INSERT INTO db_dbsetting (key, val, description, time) values ' '(\'db|schemaversion\', \'{}\', ' '\'The version of the schema used in this database.\', \'{}\')' .format(SQLA_SCHEMA_VERSION, timezone.datetime.now())) # Setting the correct backend - sa.get_scoped_session().execute('DELETE FROM db_dbsetting WHERE key=\'db|backend\'') - sa.get_scoped_session().execute( + session.execute('DELETE FROM db_dbsetting WHERE key=\'db|backend\'') + session.execute( 'INSERT INTO db_dbsetting (key, val, description, time) values ' '(\'db|backend\', \'"{}"\', ' '\'The backend used to communicate with database.\', \'{}\')' .format(BACKEND_SQLA, timezone.datetime.now())) - sa.get_scoped_session().commit() + session.commit() def transition_db(profile=None, group_size=1000, delete_table=False): diff --git a/aiida/backends/sqlalchemy/utils.py b/aiida/backends/sqlalchemy/utils.py index 57c0ddcef5..e2396560b0 100644 --- a/aiida/backends/sqlalchemy/utils.py +++ b/aiida/backends/sqlalchemy/utils.py @@ -48,14 +48,22 @@ def recreate_after_fork(engine): """ + :param engine: the engine that will be used by the sessionmaker + Callback called after a fork. Not only disposes the engine, but also recreates a new scoped session to use independent sessions in the forked process. """ sa.engine.dispose() sa.scopedsessionclass = scoped_session(sessionmaker(bind=sa.engine, expire_on_commit=True)) - print "after fork", sa.engine, id(sa.engine), sa.scopedsessionclass def reset_session(config): + """ + :param config: the configuration of the profile from the + configuration file + + Resets (global) engine and sessionmaker classes, to create a new one + (or creates a new one from scratch if not already available) + """ from multiprocessing.util import register_after_fork engine_url = ( diff --git a/aiida/daemon/tasks.py b/aiida/daemon/tasks.py index 456f2d0116..1158282835 100644 --- a/aiida/daemon/tasks.py +++ b/aiida/daemon/tasks.py @@ -9,13 +9,10 @@ ########################################################################### from datetime import timedelta -from aiida.backends import settings from aiida.backends.utils import load_dbenv, is_dbenv_loaded from celery import Celery from celery.task import periodic_task -import os - from aiida.backends import settings from aiida.backends.profile import BACKEND_SQLA, BACKEND_DJANGO from aiida.common.setup import AIIDA_CONFIG_FOLDER, DAEMON_SUBDIR @@ -23,8 +20,6 @@ if not is_dbenv_loaded(): load_dbenv(process="daemon") -print "THE BACKEND:", settings.BACKEND - from aiida.common.setup import get_profile_config from aiida.common.exceptions import ConfigurationError from aiida.daemon.timestamps import set_daemon_timestamp,get_last_daemon_timestamp @@ -66,12 +61,6 @@ def submitter(): from aiida.daemon.execmanager import submit_jobs print "aiida.daemon.tasks.submitter: Checking for calculations to submit" - - if settings.BACKEND == BACKEND_SQLA: - from aiida.backends.sqlalchemy import get_scoped_session - s = get_scoped_session() - print 'submitter [SQLA]:', s.hash_key, s, s.connection(), id(s.connection().engine), s.bind, id(s.bind) - set_daemon_timestamp(task_name='submitter', when='start') submit_jobs() set_daemon_timestamp(task_name='submitter', when='stop') @@ -84,13 +73,6 @@ def submitter(): ) def updater(): from aiida.daemon.execmanager import update_jobs - - if settings.BACKEND == BACKEND_SQLA: - from aiida.backends.sqlalchemy import get_scoped_session - s = get_scoped_session() - print 'updater [SQLA]:', s.hash_key, s, s.connection(), id(s.connection().engine), s.bind, id(s.bind) - - print "aiida.daemon.tasks.update: Checking for calculations to update" set_daemon_timestamp(task_name='updater', when='start') update_jobs() @@ -105,12 +87,6 @@ def updater(): ) def retriever(): from aiida.daemon.execmanager import retrieve_jobs - - if settings.BACKEND == BACKEND_SQLA: - from aiida.backends.sqlalchemy import get_scoped_session - s = get_scoped_session() - print 'retriever [SQLA]:', s.hash_key, s, s.connection(), id(s.connection().engine), s.bind, id(s.bind) - print "aiida.daemon.tasks.retrieve: Checking for calculations to retrieve" set_daemon_timestamp(task_name='retriever', when='start') retrieve_jobs() @@ -135,13 +111,6 @@ def tick_work(): ) def workflow_stepper(): # daemon for legacy workflow from aiida.daemon.workflowmanager import execute_steps - - if settings.BACKEND == BACKEND_SQLA: - from aiida.backends.sqlalchemy import get_scoped_session - s = get_scoped_session() - print 'submitter [SQLA]:', s.hash_key, s, s.connection(), id(s.connection().engine), s.bind, id(s.bind) - - print "aiida.daemon.tasks.workflowmanager: Checking for workflows to manage" # RUDIMENTARY way to check if this task is already running (to avoid acting # again and again on the same workflow steps) diff --git a/aiida/orm/implementation/sqlalchemy/group.py b/aiida/orm/implementation/sqlalchemy/group.py index 210811559c..191b73dc57 100644 --- a/aiida/orm/implementation/sqlalchemy/group.py +++ b/aiida/orm/implementation/sqlalchemy/group.py @@ -245,6 +245,8 @@ def query(cls, name=None, type_string="", pk=None, uuid=None, nodes=None, name_filters=None, **kwargs): from aiida.orm.implementation.sqlalchemy.node import Node + session = sa.get_scoped_session() + filters = [] if name is not None: @@ -268,7 +270,7 @@ def query(cls, name=None, type_string="", pk=None, uuid=None, nodes=None, # In the case of the Node orm from Sqlalchemy, there is an id # property on it. - sub_query = (sa.get_scoped_session().query(table_groups_nodes).filter( + sub_query = (session.query(table_groups_nodes).filter( table_groups_nodes.c["dbnode_id"].in_( map(lambda n: n.id, nodes)), table_groups_nodes.c["dbgroup_id"] == DbGroup.id @@ -298,15 +300,18 @@ def query(cls, name=None, type_string="", pk=None, uuid=None, nodes=None, pass # TODO SP: handle **kwargs - groups = (sa.get_scoped_session().query(DbGroup.id).filter(*filters) + groups = (session.query(DbGroup.id).filter(*filters) .order_by(DbGroup.id).distinct().all()) return [cls(dbgroup=g[0]) for g in groups] def delete(self): + + session = sa.get_scoped_session() + if self.pk is not None: - sa.get_scoped_session().delete(self._dbgroup) - sa.get_scoped_session().commit() + session.delete(self._dbgroup) + session.commit() new_group = copy(self._dbgroup) make_transient(new_group) diff --git a/aiida/orm/implementation/sqlalchemy/lock.py b/aiida/orm/implementation/sqlalchemy/lock.py index 168ea29743..43814bf997 100644 --- a/aiida/orm/implementation/sqlalchemy/lock.py +++ b/aiida/orm/implementation/sqlalchemy/lock.py @@ -23,10 +23,11 @@ class LockManager(AbstractLockManager): def aquire(self, key, timeout=3600, owner="None"): + session = get_scoped_session() try: - with get_scoped_session().begin(subtransactions=True): + with session.begin(subtransactions=True): dblock = DbLock(key=key, timeout=timeout, owner=owner) - get_scoped_session().add(dblock) + session.add(dblock) return Lock(dblock) @@ -46,19 +47,21 @@ def aquire(self, key, timeout=3600, owner="None"): raise InternalError("Something went wrong, try to keep on.") def clear_all(self): - with get_scoped_session().begin(subtransactions=True): + session = get_scoped_session() + with session.begin(subtransactions=True): DbLock.query.delete() class Lock(AbstractLock): def release(self, owner="None"): - if self.dblock == None: + session = get_scoped_session() + if self.dblock is None: raise InternalError("No dblock present.") try: if self.dblock.owner == owner: - get_scoped_session().delete(self.dblock) - get_scoped_session().commit() + session.delete(self.dblock) + session.commit() self.dblock = None else: raise ModificationNotAllowed("Only the owner can release the lock.") diff --git a/aiida/orm/implementation/sqlalchemy/workflow.py b/aiida/orm/implementation/sqlalchemy/workflow.py index ff2e030dc3..6bdded06ef 100644 --- a/aiida/orm/implementation/sqlalchemy/workflow.py +++ b/aiida/orm/implementation/sqlalchemy/workflow.py @@ -204,9 +204,11 @@ def _increment_version_number_db(self): # every save; moreover in this way I should do the right thing for concurrent writings # I use self._dbnode because this will not do a query to update the node; here I only # need to get its pk + session = sa.get_scoped_session() + self.dbworkflowinstance.nodeversion = DbWorkflow.nodeversion + 1 - sa.get_scoped_session().add(self.dbworkflowinstance) - sa.get_scoped_session().commit() + session.add(self.dbworkflowinstance) + session.commit() @classmethod def query(cls, *args, **kwargs): @@ -528,11 +530,14 @@ def get_subclass_from_dbnode(cls, wf_db): @classmethod def get_subclass_from_pk(cls, pk): import aiida.backends.sqlalchemy + + session = aiida.backends.sqlalchemy.get_scoped_session() + try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() dbworkflowinstance = DbWorkflow.query.filter_by(id=pk).first() except: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() raise if not dbworkflowinstance: @@ -544,11 +549,14 @@ def get_subclass_from_pk(cls, pk): @classmethod def get_subclass_from_uuid(cls, uuid): import aiida.backends.sqlalchemy + + session = aiida.backends.sqlalchemy.get_scoped_session() + try: - aiida.backends.sqlalchemy.get_scoped_session().begin_nested() + session.begin_nested() dbworkflowinstance = DbWorkflow.query.filter_by(uuid=uuid).first() except: - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() raise if not dbworkflowinstance: diff --git a/aiida/orm/importexport.py b/aiida/orm/importexport.py index fb7679418e..05c29014ee 100644 --- a/aiida/orm/importexport.py +++ b/aiida/orm/importexport.py @@ -944,6 +944,9 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): # with transaction.atomic(): # if True import aiida.backends.sqlalchemy + + session = aiida.backends.sqlalchemy.get_scoped_session() + try: foreign_ids_reverse_mappings = {} new_entries = {} @@ -1111,7 +1114,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): # Store them all in once; however, the PK are not set in this way... # Model.objects.bulk_create(objects_to_create) if objects_to_create: - aiida.backends.sqlalchemy.get_scoped_session().add_all(objects_to_create) + session.add_all(objects_to_create) # session.commit() # Get back the just-saved entries @@ -1142,7 +1145,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): DbCalcState(dbnode_id=new_pk, state=calc_states.IMPORTED)) - aiida.backends.sqlalchemy.get_scoped_session().add_all(imported_states) + session.add_all(imported_states) # session.commit() # models.DbCalcState.objects.bulk_create(imported_states) @@ -1173,7 +1176,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): # Needed for fast checks of existing links from aiida.backends.sqlalchemy.models.node import DbLink - existing_links_raw = aiida.backends.sqlalchemy.get_scoped_session().query(DbLink.input, DbLink.output, DbLink.label).all() + existing_links_raw = session.query(DbLink.input, DbLink.output, DbLink.label).all() # existing_links_raw = DbLink.objects.all().values_list( # 'input', 'output', 'label') existing_links_labels = {(l[0], l[1]): l[2] for l in existing_links_raw} @@ -1225,7 +1228,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): if links_to_store: if not silent: print " ({} new links...)".format(len(links_to_store)) - aiida.backends.sqlalchemy.get_scoped_session().add_all(links_to_store) + session.add_all(links_to_store) # session.commit() # models.DbLink.objects.bulk_create(links_to_store) else: @@ -1239,7 +1242,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): # TODO: cache these to avoid too many queries from aiida.backends.sqlalchemy.models.group import DbGroup from uuid import UUID - group = aiida.backends.sqlalchemy.get_scoped_session().query(DbGroup).filter( + group = session.query(DbGroup).filter( DbGroup.uuid == UUID(groupuuid)).all() # group = DbGroup.objects.get(uuid=groupuuid) @@ -1279,9 +1282,9 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): group = Group(name=group_name, type_string=IMPORTGROUP_TYPE) from aiida.backends.sqlalchemy.models.group import DbGroup - if aiida.backends.sqlalchemy.get_scoped_session().query(DbGroup).filter( + if session.query(DbGroup).filter( DbGroup.name == group._dbgroup.name).count() == 0: - aiida.backends.sqlalchemy.get_scoped_session().add(group._dbgroup) + session.add(group._dbgroup) created = True else: counter += 1 @@ -1289,7 +1292,7 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): # Add all the nodes to the new group # TODO: decide if we want to return the group name from aiida.backends.sqlalchemy.models.node import DbNode - group.add_nodes(aiida.backends.sqlalchemy.get_scoped_session().query(DbNode).filter( + group.add_nodes(session.query(DbNode).filter( DbNode.id.in_(pks_for_group)).distinct().all()) # group.add_nodes(models.DbNode.objects.filter( @@ -1301,10 +1304,10 @@ def import_data_sqla(in_path, ignore_unknown_nodes=False, silent=False): if not silent: print "NO DBNODES TO IMPORT, SO NO GROUP CREATED" - aiida.backends.sqlalchemy.get_scoped_session().commit() + session.commit() except: print "Rolling back" - aiida.backends.sqlalchemy.get_scoped_session().rollback() + session.rollback() raise if not silent: diff --git a/open_source_licenses.txt b/open_source_licenses.txt index 0a616866df..bf6ae7eb34 100644 --- a/open_source_licenses.txt +++ b/open_source_licenses.txt @@ -15,6 +15,9 @@ Python: Fastep: * fastentrypoints.py +wait-for-it: + * .travis-data/wait-for-it.sh + The respective copyright notices follow below. -------------------------------------------------------------------------- @@ -361,3 +364,28 @@ fastentrypoints.py Licence: LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +------------------------------------------------------------------------------- + +wait-for-it License (https://github.com/vishnubob/wait-for-it/blob/master/LICENSE): + +The MIT License (MIT) +Copyright (c) 2016 Giles Hall + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/setup_requirements.py b/setup_requirements.py index c6f1849055..67a0f007d6 100644 --- a/setup_requirements.py +++ b/setup_requirements.py @@ -25,6 +25,11 @@ # SQLA for us... probably switch to using rabbitmq? # Note that however this requires a new server process. 'celery==3.1.25', + # The next two are internal dependencies of celery, but since + # in the past we had version mismatch problems, we freeze them + # as well + 'billiard==3.3.0.23', + 'amqp==1.4.9', 'anyjson==0.3.3', 'supervisor==3.1.3', 'meld3==1.0.0',