forked from UtrechtUniversity/yoda-portal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
106 lines (76 loc) · 2.91 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
__copyright__ = 'Copyright (c) 2021-2022, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
import json
import re
from typing import Any, Dict, Optional, Tuple

from flask import Blueprint, g, jsonify, request, Response
from irods import message, rule

from errors import UnauthorizedAPIAccessError
from util import log_error
# Flask blueprint on which this module registers its API route and its
# blueprint-level error handler.
api_bp = Blueprint('api_bp', __name__)
@api_bp.route('/<fn>', methods=['POST'])
def _call(fn: str) -> Response:
    """Handle a POST to ``/<fn>`` by forwarding it to :func:`call`.

    The optional ``data`` form field is parsed as JSON and passed on as the
    arguments. The ``status`` field of the result selects the HTTP status
    code: ``ok`` gives 200, ``error_internal`` gives 500, anything else 400.

    :param fn: API function name taken from the URL path.

    :returns: JSON response carrying the result of the call.

    :raises UnauthorizedAPIAccessError: If the request is not authenticated.
    """
    if not authenticated():
        raise UnauthorizedAPIAccessError

    # A missing 'data' field means "no arguments".
    raw_payload = request.form.get('data')
    payload: Dict[str, Any] = {} if raw_payload is None else json.loads(raw_payload)

    outcome: Dict[str, Any] = call(fn, payload)

    status = outcome['status']
    http_code: int
    if status == 'ok':
        http_code = 200
    elif status == 'error_internal':
        http_code = 500
    else:
        http_code = 400

    reply = jsonify(outcome)
    reply.status_code = http_code
    return reply
def call(fn: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Execute the iRODS rule ``api_<fn>`` and return its JSON output.

    The arguments are JSON-encoded and embedded in the rule body as a string
    expression assigned to ``*x``, which is then passed to ``api_<fn>``.
    The rule's stdout buffer is decoded and parsed as JSON.

    :param fn:   API function name without the ``api_`` prefix. Only ASCII
                 letters, digits and underscores are accepted.
    :param data: JSON-serialisable arguments; ``None`` is treated as ``{}``.

    :returns: The JSON document the rule wrote to its stdout buffer.

    :raises ValueError: If ``fn`` is not a non-empty string made up solely
                        of ASCII letters, digits and underscores.
    """
    # fn is interpolated verbatim into the rule body below and can come
    # straight from the request URL. Reject anything that is not a plain
    # name so it cannot inject extra rule-language statements.
    if not isinstance(fn, str) or not re.fullmatch(r'[A-Za-z0-9_]+', fn):
        raise ValueError('Invalid API function name: {!r}'.format(fn))

    def bytesbuf_to_bytes(buf: message.BinBytesBuf) -> bytes:
        """Return the first ``buflen`` bytes of the buffer, cut at the first NUL."""
        raw = buf.buf[:buf.buflen]
        nul = raw.find(b'\x00')
        return raw if nul < 0 else raw[:nul]

    def escape_quotes(s: str) -> str:
        """Backslash-escape backslashes and double quotes in s."""
        # NOTE(review): only these two characters are escaped; confirm that no
        # other character is special inside a rule-language string literal.
        return s.replace('\\', '\\\\').replace('"', '\\"')

    def break_strings(N: int, m: int) -> int:
        """Return the number of chunks of size m needed for N characters."""
        return (N - 1) // m + 1

    def nrep_string_expr(s: str, m: int = 64) -> str:
        """Render s as quoted chunks of at most m characters joined by ``++``.

        Each chunk is escaped after slicing, so an escape sequence is never
        split across two literals. One extra iteration is made on purpose:
        it yields a trailing empty literal, which also guarantees a
        non-empty expression when s itself is empty.
        """
        chunk_count = break_strings(len(s), m) + 1
        literals = ('"{}"'.format(escape_quotes(s[i * m:(i + 1) * m]))
                    for i in range(chunk_count))
        return ' ++\n'.join(literals)

    if data is None:
        data = {}

    params = json.dumps(data)
    arg_str_expr = nrep_string_expr(params)

    # Set parameters as variable instead of parameter input to circumvent iRODS string limits.
    rule_body = (
        ' *x={}\n'
        'api_{}(*x)\n'
    ).format(arg_str_expr, fn)

    irods_rule = rule.Rule(
        g.irods,
        instance_name='irods_rule_engine_plugin-irods_rule_language-instance',
        body=rule_body,
        params={},
        output='ruleExecOut')

    # Cleanup session for vault actions calling msiExecCmd.
    if fn in ('vault_submit', 'vault_approve', 'vault_cancel', 'vault_depublish', 'vault_republish'):
        g.irods.cleanup()

    output = irods_rule.execute(session_cleanup=False)
    stdout = bytesbuf_to_bytes(
        output._values['MsParam_PI'][0]._values['inOutStruct']._values['stdoutBuf'])

    return json.loads(stdout.decode())
def authenticated() -> bool:
    """Return True when both ``user`` and ``irods`` are set (not None) on ``g``."""
    for attr in ('user', 'irods'):
        if g.get(attr) is None:
            return False
    return True
@api_bp.errorhandler(Exception)
def api_error_handler(error: Exception) -> Tuple[Response, int]:
    """Turn any exception raised by a blueprint view into a JSON error reply.

    The exception is logged, and a body with ``status``, ``status_info`` and
    an empty ``data`` object is returned. Unauthorized API access yields
    HTTP 401; every other exception yields HTTP 500.

    :param error: The exception raised while handling the request.

    :returns: Tuple of the JSON response and the HTTP status code.
    """
    log_error(f'API Error: {error}', True)

    # isinstance rather than an exact type comparison, so subclasses of
    # UnauthorizedAPIAccessError are reported as 401 as well.
    if isinstance(error, UnauthorizedAPIAccessError):
        code = 401
        status_info = "Not authorized to use the API"
    else:
        code = 500
        status_info = "Something went wrong"

    # NOTE(review): the status here is "internal_error", whereas _call tests
    # rule results for 'error_internal' - confirm which spelling clients expect.
    status = "internal_error"
    data: Dict[str, Any] = {}

    return jsonify(
        {
            "status": status,
            "status_info": status_info,
            "data": data
        }), code