Skip to content

Commit

Permalink
Extracts raw data #1
Browse files Browse the repository at this point in the history
  • Loading branch information
bwalsh committed Feb 3, 2023
1 parent 9e4b8a1 commit 9a01fe9
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ dmypy.json

# Pyre type checker
.pyre/

# project data
.idea
Secrets/.env
70 changes: 70 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,72 @@

# MCF10A
LINCS MCF10A Molecular Deep Dive (MDD)

## Startup

Setup python env
```
python3 -m venv venv ; source venv/bin/activate
pip install -r requirements.txt
pip install -e .
```


Create your own `Secrets/.env` file (see `Secrets/.env-sample`) containing a personal access token generated at `https://www.synapse.org/#!PersonalAccessTokens:`

```
cat Secrets/.env
SYNAPSE_AUTH_TOKEN=ey...
```

Load your environment:

```
export $(cat Secrets/.env | xargs)
```


## Extract

Download raw data

```
export $(cat Secrets/.env | xargs)
mcf10a_etl extract --help
Usage: mcf10a_etl extract [OPTIONS] COMMAND [ARGS]...
Extract data from Synapse.
Options:
--help Show this message and exit.
Commands:
files Synchronizes all the files in a folder (including...
hierarchy Extract project hierarchy.
project Extract project.
sample Extract sample annotations.
table Extract project summary.
```

## Transform

Transform raw data into FHIR

```
$ mcf10a_etl transform --help
Usage: mcf10a_etl transform [OPTIONS] COMMAND [ARGS]...
Transform raw data from Synapse.
Options:
--help Show this message and exit.
Commands:
specimens FHIR Specimen
study FHIR ResearchStudy
subjects FHIR Patient, ResearchSubject
tasks FHIR Task, DocumentReference
```
2 changes: 2 additions & 0 deletions Secrets/.env-sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# https://python-docs.synapse.org/build/html/Credentials.html
SYNAPSE_AUTH_TOKEN=
1 change: 1 addition & 0 deletions Secrets/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.env
29 changes: 29 additions & 0 deletions mcf10a_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import csv
import pathlib
import uuid
from typing import Iterator, Dict

from orjson import orjson

# Default directory the `extract` commands write raw Synapse data into.
RAW_DATA_PATH = pathlib.Path('data/raw')
# Synapse ids used as the default targets of the `extract` commands.
PROJECT_ID = 'syn21577710'
SUMMARY_TABLE_ID = 'syn18486042'
SAMPLE_ANNOTATIONS_ID = 'syn18662790'
# NOTE(review): presumably the output directory of the `transform` commands - confirm against transform.py.
FHIR_DATA_PATH = pathlib.Path('data/fhir')

# Name-based (uuid3) namespace UUID derived from the 'aced-ipd.org' DNS name.
ACED_NAMESPACE = uuid.uuid3(uuid.NAMESPACE_DNS, 'aced-ipd.org')


def read_ndjson(path: str) -> Iterator[Dict]:
    """Read an ndjson file, yielding one decoded JSON document per line.

    The file is streamed line by line instead of being loaded in full, and
    blank lines (for example a trailing empty line) are skipped rather than
    handed to the JSON parser.

    :param path: location of the newline-delimited JSON file.
    :return: iterator over the decoded documents, in file order.
    """
    with open(path) as jsonfile:
        # Iterating the file object keeps only one line in memory at a time.
        for line in jsonfile:
            if not line.strip():
                continue
            yield orjson.loads(line)


def read_tsv(path: str, delimiter="\t") -> Iterator[Dict]:
    """Read a delimited text file, yielding one dict per data row.

    The first row is used as the header; every following row is returned as
    a mapping of column name to cell value.

    :param path: location of the file to read.
    :param delimiter: column separator, a tab by default (pass "," for csv).
    :return: iterator over the rows, in file order.
    """
    # newline='' lets the csv module do its own line-ending handling, so
    # newlines embedded in quoted fields are preserved unchanged.
    with open(path, newline='') as tsv_file:
        yield from csv.DictReader(tsv_file, delimiter=delimiter)
23 changes: 23 additions & 0 deletions mcf10a_etl/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging

import click

from mcf10a_etl.extract import extract
from mcf10a_etl.transform import transform

logging.basicConfig(format='%(asctime)s %(message)s', encoding='utf-8', level=logging.INFO)
logger = logging.getLogger(__name__)


@click.group()
def cli():
    """Extract, Transform MCF10A data."""
    # Root command group: the docstring above is its --help text and the
    # sub-commands are attached below via cli.add_command(), so no body is needed.


cli.add_command(extract)
cli.add_command(transform)


if __name__ == '__main__':
cli()
144 changes: 144 additions & 0 deletions mcf10a_etl/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import datetime
import pathlib
from typing import List, Any
import dataclasses

import click
import orjson
import synapseclient
import synapseutils

from synapseclient import Project, Folder, File, Link
import shutil

from mcf10a_etl import RAW_DATA_PATH, PROJECT_ID, SUMMARY_TABLE_ID, SAMPLE_ANNOTATIONS_ID, read_tsv


@click.group()
def extract():
    """Extract data from Synapse."""
    # Container group only: the individual extract sub-commands register
    # themselves on it through @extract.command(...), so no body is needed.


@extract.command('project')
@click.argument('output_path', default=RAW_DATA_PATH)
@click.option('--project_id', default=PROJECT_ID)
def extract_project(output_path, project_id):
    """Extract project."""
    # The docstring above is also the command's --help text, so the details
    # are kept in comments:
    #   output_path - directory that receives `project.ndjson`
    #   project_id  - Synapse id of the project entity to fetch
    syn = synapseclient.Synapse()
    syn.login(silent=True)
    project = syn.get(project_id)

    output_path = pathlib.Path(output_path)
    # The directory may not exist yet; without this, open() below raises
    # FileNotFoundError.
    output_path.mkdir(parents=True, exist_ok=True)

    with open(output_path / "project.ndjson", "w") as fp:
        # orjson returns bytes; the file is opened in text mode, so decode first.
        bytes_ = orjson.dumps(project.__dict__, option=orjson.OPT_NAIVE_UTC | orjson.OPT_APPEND_NEWLINE)
        fp.write(str(bytes_, 'UTF-8'))


@extract.command('hierarchy')
@click.argument('output_path', default=RAW_DATA_PATH)
@click.option('--project_id', default=PROJECT_ID)
def extract_tree(output_path, project_id):
    """Extract project hierarchy."""
    # The docstring above is also the command's --help text, so the details
    # are kept in comments:
    #   output_path - directory that receives `hierarchy.ndjson`
    #   project_id  - Synapse id whose hierarchy is walked
    # One JSON line is written per path yielded by the walk.
    syn = synapseclient.Synapse()
    syn.login(silent=True)
    output_path = pathlib.Path(output_path)
    # The directory may not exist yet; create it before the walk so a bad
    # location fails early instead of at the final open().
    output_path.mkdir(parents=True, exist_ok=True)

    # Traverse through the hierarchy of files and folders stored under the synId.
    # https://python-docs.synapse.org/build/html/synapseutils.html#synapseutils.walk_functions.walk
    walker = synapseutils.walk_functions.walk(syn, project_id,
                                              includeTypes=['folder', 'file', 'table', 'link', 'entityview',
                                                            'dockerrepo', 'submissionview', 'dataset',
                                                            'materializedview'])

    @dataclasses.dataclass
    class NamedId:
        # First and second element of a walked 2-item tuple, plus the entity
        # metadata fetched for it (None when fetching was skipped).
        name: str
        id_: str
        entity: Any

    @dataclasses.dataclass
    class WalkedPath:
        # One triple yielded by the walker, with every tuple mapped to a NamedId.
        dir_path: NamedId
        items: List[NamedId]
        file_names: List[NamedId]

    def _map_item(item: tuple, fetch=True):
        """Transform a 2 item tuple to a NamedId"""
        # Getting the entity retrieves an object that holds metadata describing it,
        # and by default also downloads the file to a local cache.
        # We _don't_ want the file here, that is done separately.
        entity = None
        if fetch:
            e_ = syn.get(item[1], downloadFile=False)
            # itemize keys and values to make the entity json serializable
            entity = {
                'properties': e_.properties,
                'annotations': e_.annotations
            }
            if hasattr(e_, '_file_handle'):
                entity['file_handle'] = e_._file_handle  # noqa
        return NamedId(item[0], item[1], entity)

    def _map_items(items: List[tuple], fetch=True):
        """Transform a list of 2 item tuples to a list of NamedId"""
        return [_map_item(i, fetch) for i in items]

    # Consume the walker completely before writing, so the output file is only
    # created once the whole hierarchy has been fetched.
    path_hierarchy = [WalkedPath(_map_item(dir_path), _map_items(items), _map_items(file_names)) for
                      dir_path, items, file_names in walker]

    with open(output_path / "hierarchy.ndjson", "w") as fp:
        for p in path_hierarchy:
            # orjson returns bytes; the file is opened in text mode, so decode first.
            bytes_ = orjson.dumps(p.__dict__, option=orjson.OPT_NAIVE_UTC | orjson.OPT_APPEND_NEWLINE)
            fp.write(str(bytes_, 'UTF-8'))


@extract.command('files')
@click.argument('output_path', default=RAW_DATA_PATH)
@click.option('--project_id', default=PROJECT_ID)
def extract_files(output_path, project_id):
    """Synchronizes all the files in a folder (including subfolders) from Synapse."""
    client = synapseclient.Synapse()
    client.login(silent=True)

    # syncFromSynapse mirrors every file below the entity (subfolders included)
    # into the target directory and adds a readme manifest with file metadata.
    # https://python-docs.synapse.org/build/html/synapseutils.html#synapseutils.sync.syncFromSynapse
    target_dir = str(pathlib.Path(output_path))
    synapseutils.syncFromSynapse(client, project_id, path=target_dir)


@extract.command('table')
@click.argument('output_path', default=RAW_DATA_PATH)
@click.option('--table_id', default=SUMMARY_TABLE_ID)
def extract_table(output_path, table_id):
    """Extract project summary."""
    # The docstring above is also the command's --help text, so the details
    # are kept in comments:
    #   output_path - directory that receives `summary_table.ndjson`
    #   table_id    - Synapse id of the table to query
    syn = synapseclient.Synapse(silent=True)
    syn.login(silent=True)
    output_path = pathlib.Path(output_path)
    # The directory may not exist yet; without this, open() below raises
    # FileNotFoundError.
    output_path.mkdir(parents=True, exist_ok=True)

    # The query result is requested as csv; its `filepath` is then re-read
    # row by row and each row is written as one JSON line.
    table_csv = syn.tableQuery(f"select * from {table_id}", resultsAs="csv")
    with open(output_path / "summary_table.ndjson", "w") as fp:
        for annotation in read_tsv(table_csv.filepath, delimiter=","):
            # orjson returns bytes; the file is opened in text mode, so decode first.
            bytes_ = orjson.dumps(annotation, option=orjson.OPT_NAIVE_UTC | orjson.OPT_APPEND_NEWLINE)
            fp.write(str(bytes_, 'UTF-8'))


@extract.command('sample')
@click.argument('output_path', default=RAW_DATA_PATH)
@click.option('--file_id', default=SAMPLE_ANNOTATIONS_ID)
def extract_sample_annotations(output_path, file_id):
    """Extract sample annotations."""
    # The docstring above is also the command's --help text, so the details
    # are kept in comments:
    #   output_path - directory that receives the downloaded file and
    #                 `sample_annotations.ndjson`
    #   file_id     - Synapse id of the annotations file to download
    syn = synapseclient.Synapse(silent=True)
    syn.login(silent=True)
    output_path = pathlib.Path(output_path)
    # Make sure the directory exists before anything is written into it;
    # open() below raises FileNotFoundError otherwise.
    output_path.mkdir(parents=True, exist_ok=True)

    # Unlike the hierarchy walk, the file content itself is needed here.
    entity = syn.get(file_id, downloadFile=True, downloadLocation=output_path)
    with open(output_path / "sample_annotations.ndjson", "w") as fp:
        # The downloaded file is parsed as comma-separated; one JSON line per row.
        for annotation in read_tsv(entity.path, delimiter=","):
            # orjson returns bytes; the file is opened in text mode, so decode first.
            bytes_ = orjson.dumps(annotation, option=orjson.OPT_NAIVE_UTC | orjson.OPT_APPEND_NEWLINE)
            fp.write(str(bytes_, 'UTF-8'))


if __name__ == '__main__':
extract()
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
synapseclient[pandas, pysftp]
click
orjson
https://github.com/bmeg/fhir.resources/archive/feature/backref.zip
Loading

0 comments on commit 9a01fe9

Please sign in to comment.