-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #16932 from jdavcs/dev_sa20_fix21
SQLAlchemy 2.0 upgrades (part 5)
- Loading branch information
Showing
4 changed files
with
95 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -356,6 +356,22 @@ def test_collections_in_library_folders(self): | |
# assert len(loaded_dataset_collection.datasets) == 2 | ||
# assert loaded_dataset_collection.collection_type == "pair" | ||
|
||
def test_dataset_action_tuples(self): | ||
u = model.User(email="foo", password="foo") | ||
h1 = model.History(user=u) | ||
hda1 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) | ||
hda2 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) | ||
r1 = model.Role() | ||
dp1 = model.DatasetPermissions(action="action1", dataset=hda1.dataset, role=r1) | ||
dp2 = model.DatasetPermissions(action=None, dataset=hda1.dataset, role=r1) | ||
dp3 = model.DatasetPermissions(action="action3", dataset=hda1.dataset, role=r1) | ||
c1 = model.DatasetCollection(collection_type="type1") | ||
dce1 = model.DatasetCollectionElement(collection=c1, element=hda1) | ||
dce2 = model.DatasetCollectionElement(collection=c1, element=hda2) | ||
self.model.session.add_all([u, h1, hda1, hda2, r1, dp1, dp2, dp3, c1, dce1, dce2]) | ||
self.model.session.flush() | ||
assert c1.dataset_action_tuples == [("action1", r1.id), ("action3", r1.id)] | ||
|
||
def test_nested_collection_attributes(self): | ||
u = model.User(email="[email protected]", password="password") | ||
h1 = model.History(name="History 1", user=u) | ||
|
@@ -392,18 +408,31 @@ def test_nested_collection_attributes(self): | |
) | ||
self.model.session.add_all([d1, d2, c1, dce1, dce2, c2, dce3, c3, c4, dce4]) | ||
self.model.session.flush() | ||
q = c2._get_nested_collection_attributes( | ||
|
||
stmt = c2._build_nested_collection_attributes_stmt( | ||
element_attributes=("element_identifier",), hda_attributes=("extension",), dataset_attributes=("state",) | ||
) | ||
assert [(r._fields) for r in q] == [ | ||
result = self.model.session.execute(stmt).all() | ||
assert [(r._fields) for r in result] == [ | ||
("element_identifier_0", "element_identifier_1", "extension", "state"), | ||
("element_identifier_0", "element_identifier_1", "extension", "state"), | ||
] | ||
assert q.all() == [("inner_list", "forward", "bam", "new"), ("inner_list", "reverse", "txt", "new")] | ||
q = c2._get_nested_collection_attributes(return_entities=(model.HistoryDatasetAssociation,)) | ||
assert q.all() == [d1, d2] | ||
q = c2._get_nested_collection_attributes(return_entities=(model.HistoryDatasetAssociation, model.Dataset)) | ||
assert q.all() == [(d1, d1.dataset), (d2, d2.dataset)] | ||
|
||
stmt = c2._build_nested_collection_attributes_stmt( | ||
element_attributes=("element_identifier",), hda_attributes=("extension",), dataset_attributes=("state",) | ||
) | ||
result = self.model.session.execute(stmt).all() | ||
assert result == [("inner_list", "forward", "bam", "new"), ("inner_list", "reverse", "txt", "new")] | ||
|
||
stmt = c2._build_nested_collection_attributes_stmt(return_entities=(model.HistoryDatasetAssociation,)) | ||
result = self.model.session.execute(stmt).all() | ||
assert result == [(d1,), (d2,)] | ||
|
||
stmt = c2._build_nested_collection_attributes_stmt( | ||
return_entities=(model.HistoryDatasetAssociation, model.Dataset) | ||
) | ||
result = self.model.session.execute(stmt).all() | ||
assert result == [(d1, d1.dataset), (d2, d2.dataset)] | ||
# Assert properties that use _get_nested_collection_attributes return correct content | ||
assert c2.dataset_instances == [d1, d2] | ||
assert c2.dataset_elements == [dce1, dce2] | ||
|
@@ -422,13 +451,14 @@ def test_nested_collection_attributes(self): | |
assert c3.dataset_instances == [] | ||
assert c3.dataset_elements == [] | ||
assert c3.dataset_states_and_extensions_summary == (set(), set()) | ||
q = c4._get_nested_collection_attributes(element_attributes=("element_identifier",)) | ||
assert q.all() == [("outer_list", "inner_list", "forward"), ("outer_list", "inner_list", "reverse")] | ||
assert c4.dataset_elements == [dce1, dce2] | ||
assert c4.element_identifiers_extensions_and_paths == [ | ||
(("outer_list", "inner_list", "forward"), "bam", "mock_dataset_14.dat"), | ||
(("outer_list", "inner_list", "reverse"), "txt", "mock_dataset_14.dat"), | ||
|
||
stmt = c4._build_nested_collection_attributes_stmt(element_attributes=("element_identifier",)) | ||
result = self.model.session.execute(stmt).all() | ||
assert result == [ | ||
("outer_list", "inner_list", "forward"), | ||
("outer_list", "inner_list", "reverse"), | ||
] | ||
assert c4.dataset_elements == [dce1, dce2] | ||
|
||
def test_dataset_dbkeys_and_extensions_summary(self): | ||
u = model.User(email="[email protected]", password="password") | ||
|