-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcvs.py
executable file
·87 lines (72 loc) · 3.15 KB
/
cvs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python3
#
import gcpcvs
import logging
import typer
import json
from tabulate import tabulate
app = typer.Typer(no_args_is_help=True)
volume_app = typer.Typer(no_args_is_help=True)
app.add_typer(volume_app, name="volume")
snapshot_app = typer.Typer(no_args_is_help=True)
app.add_typer(snapshot_app, name="snapshot")
activedirectory_app = typer.Typer(no_args_is_help=True)
app.add_typer(activedirectory_app, name="activedirectory")
backup_app = typer.Typer(no_args_is_help=True)
app.add_typer(backup_app, name="backup")
replication_app = typer.Typer(no_args_is_help=True)
app.add_typer(replication_app, name="replication")
kms_app = typer.Typer(no_args_is_help=True)
app.add_typer(kms_app, name="kms")
cvs: gcpcvs
def print_results(entries: list, fields: list, style: str = "text"):
if style == "text":
table = []
for i in entries:
table.append([i[k] for k in fields])
print(tabulate(table, headers=fields))
if style == "json":
print(json.dumps(entries, indent=4))
return
@volume_app.command()
def list(format: str = typer.Option("text", help="Specify output format: text/json")):
result = cvs._API_getAll("-", "Volumes")
if result.status_code != 200:
logging.error(f"HTTP code: {result.status_code} {result.reason.decode('utf-8')} for url: {result.url}")
return
print_results(result.json(), ['volumeId', 'name', 'region', 'lifeCycleState', 'quotaInBytes', 'protocolTypes', 'serviceLevel', 'network'], format)
return
@snapshot_app.command()
def list(format: str = typer.Option("text", help="Specify output format: text/json")):
result = cvs._API_getAll("-", "Snapshots")
if result.status_code != 200:
logging.error(f"HTTP code: {result.status_code} {result.reason.decode('utf-8')} for url: {result.url}")
return
print_results(result.json(), ['ownerId', 'name', 'region', 'usedBytes'], format)
return
@activedirectory_app.command()
def list(format: str = typer.Option("text", help="Specify output format: text/json")):
result = cvs._API_getAll("-", "Storage/ActiveDirectory")
if result.status_code != 200:
logging.error(f"HTTP code: {result.status_code} {result.reason.decode('utf-8')} for url: {result.url}")
return
print_results(result.json(), ['UUID', 'domain', 'netBIOS', 'region', 'DNS', 'username'], format)
return
@kms_app.command()
def list(format: str = typer.Option("text", help="Specify output format: text/json")):
result = cvs._API_getAll("-", "Storage/KmsConfig")
if result.status_code != 200:
logging.error(f"HTTP code: {result.status_code} {result.reason.decode('utf-8')} for url: {result.url}")
return
print_results(result.json(), ['uuid', 'keyRingLocation', 'keyRing', 'keyName', 'region', 'network'], format)
return
if __name__ == "__main__":
import sys
from os import getenv
logging.basicConfig(level=logging.ERROR)
credentials = getenv('SERVICE_ACCOUNT_CREDENTIAL', None)
if credentials == None:
logging.error('Missing service account credentials. Please set SERVICE_ACCOUNT_CREDENTIAL.')
sys.exit(2)
cvs = gcpcvs.gcpcvs(credentials)
app()