Skip to content

Commit

Permalink
[upgrade] Client enhanced with new methods
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamdipt committed Nov 25, 2021
1 parent cc2161f commit f3a6b7d
Show file tree
Hide file tree
Showing 7 changed files with 27,499 additions and 44 deletions.
68 changes: 51 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## Dependencies

- Python3
- Python >= 3.6
- GPG (also known as GnuPG) software

## Configuration
Expand All @@ -32,6 +32,39 @@ Or as a dictionary

## Usage


### Import GPG keys from Passbolt

The first step will be to import the private and public keys using gpg for encryption.

Note: Do not keep private and public files. Rather just import them using gpg command one time and delete those files.

#### Using Python
To import new keys using Python:

>>>import passboltapi
>>>passbolt = passboltapi.PassboltAPI(config_path="config.ini", new_keys=True)

To delete old keys and import only the new ones.

>>>import passboltapi
>>>passbolt = passboltapi.PassboltAPI(config_path="config.ini", new_keys=True, delete_old_keys=True)

#### Using GPG

Import new keys:

$gpg --import public.asc
$gpg --batch --import private.asc

Deleting existing keys:

$gpg --delete-secret-keys <fingerprint>
$gpg --delete-key <fingerprint>


## How to use PassboltAPI client

>>>import passboltapi
>>>passbolt = passboltapi.PassboltAPI(config_path="config.ini")
# Or pass the configuration settings as a dict
Expand All @@ -44,31 +77,32 @@ Or as a dictionary
# One can also use it as context manager
>>>with passboltapi.PassboltAPI(config_path="config.ini") as passbolt:

Check test.py for an example.

If new keys needs to be imported, then USER_PUBLIC_KEY_FILE and USER_PRIVATE_KEY_FILE settings
should be in the config ini having the path of the public and private keys file respectively.
To get all resources

To import new keys:
resources = {record.username: record for record in passbolt.list_resources(folder_id=folder_id)}

>>>import passboltapi
>>>passbolt = passboltapi.PassboltAPI(config_path="config.ini", new_keys=True)
To create new resource (optional: folder)

To delete old keys and import only the new ones.
response = passbolt.create_resource(
name=name,
username=username,
password=password,
uri=uri, # optional
description=description, # optional
folder=passbolt_folder_id # optional
)

>>>import passboltapi
>>>passbolt = passboltapi.PassboltAPI(config_path="config.ini", new_keys=True, delete_old_keys=True)
To move resource to folder

Recommended to do: Do not keep private and public files.
Rather just import them using gpg command one time and delete those files.
passbolt.move_resource_to_folder(resource_id, folder_id)

$gpg --import public.asc
$gpg --batch --import private.asc

For deleting gpg keys
### Sample test
Check test.py for an example.

$gpg --delete-secret-keys <fingerprint>
$gpg --delete-key <fingerprint>
If new keys needs to be imported, then USER_PUBLIC_KEY_FILE and USER_PRIVATE_KEY_FILE settings
should be in the config ini having the path of the public and private keys file respectively.


### Passbolt API
Expand Down
27,081 changes: 27,081 additions & 0 deletions get-pip.py

Large diffs are not rendered by default.

165 changes: 149 additions & 16 deletions passboltapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import requests
import configparser
import gnupg
import urllib.parse
from typing import List, Union, Mapping, Optional

import gnupg
import requests

from passboltapi.schema import (
constructor,
PassboltFolderIdType,
PassboltGroupIdType,
PassboltUserIdType,
PassboltResourceTypeIdType,
PassboltResourceTuple,
PassboltFolderTuple,
PassboltOpenPgpKeyTuple,
PassboltUserTuple,
PassboltSecretTuple,
PassboltDateTimeType,
PassboltRoleIdType,
PassboltSecretIdType,
PassboltGroupTuple,
PassboltResourceIdType,
PassboltPermissionTuple,
PassboltPermissionIdType,
PassboltFavoriteDetailsType,
PassboltOpenPgpKeyIdType,
AllPassboltTupleTypes
)

LOGIN_URL = "/auth/login.json"
VERIFY_URL = "/auth/verify.json"


class PassboltAPI:
class APIClient:

def __init__(self, config=None, config_path=None, new_keys=False, delete_old_keys=False):
def __init__(self, config: Optional[str] = None, config_path: Optional[str] = None, new_keys: bool = False,
delete_old_keys: bool = False):
"""
:param config: Config as a dictionary
:param config_path: Path to the config file.
Expand All @@ -26,7 +52,7 @@ def __init__(self, config=None, config_path=None, new_keys=False, delete_old_key
if not self.config["PASSBOLT"]["SERVER"]:
raise ValueError("Missing value for SERVER in config.ini")

self.server_url = self.config["PASSBOLT"]["SERVER"]
self.server_url = self.config["PASSBOLT"]["SERVER"].rstrip("/")
self.user_fingerprint = self.config["PASSBOLT"]["USER_FINGERPRINT"].upper()
self.gpg = gnupg.GPG()
if delete_old_keys:
Expand Down Expand Up @@ -79,11 +105,15 @@ def _login(self):
"user_token_result": token
},
})
self._get_csrf_token()

def encrypt(self, text):
def _get_csrf_token(self):
self.get("/users/me.json", return_response_object=True) # Fetches the X-CSRF-Token header for future requests

def encrypt(self, text, recipients=None):
return str(self.gpg.encrypt(
data=text,
recipients=self.gpg_fingerprint,
recipients=recipients or self.gpg_fingerprint,
always_trust=True
))

Expand All @@ -95,29 +125,132 @@ def decrypt(self, text):
))

def get_headers(self):
return {"X-CSRF-Token": self.requests_session.cookies['csrfToken'] if 'csrfToken' in self.requests_session.cookies else ''}
return {"X-CSRF-Token": self.requests_session.cookies[
'csrfToken'] if 'csrfToken' in self.requests_session.cookies else ''}

def get_server_public_key(self):
r = self.requests_session.get(self.server_url + VERIFY_URL)
return r.json()["body"]["fingerprint"], r.json()["body"]["keydata"]

def get(self, url):
r = self.requests_session.get(self.server_url + url)
def delete(self, url):
r = self.requests_session.delete(self.server_url + url, headers=self.get_headers())
return r.json()

def post(self, url, data):
r = self.requests_session.post(self.server_url + url, json=data, headers=self.get_headers())
def get(self, url, return_response_object=False, **kwargs):
r = self.requests_session.get(self.server_url + url, headers=self.get_headers(), **kwargs)
if return_response_object:
return r
return r.json()

def put(self, url, data):
r = self.requests_session.put(self.server_url + url, json=data, headers=self.get_headers())
def put(self, url, data, return_response_object=False, **kwargs):
r = self.requests_session.put(self.server_url + url, json=data, headers=self.get_headers(), **kwargs)
if return_response_object:
return r
return r.json()

def delete(self, url):
r = self.requests_session.delete(self.server_url + url, headers=self.get_headers())
def post(self, url, data, return_response_object=False, **kwargs):
r = self.requests_session.post(self.server_url + url, json=data, headers=self.get_headers(), **kwargs)
if return_response_object:
return r
return r.json()

def close_session(self):
self.requests_session.close()


class PassboltAPI(APIClient):
"""Adding a convenience method for getting resources.
Design Principle: All passbolt aware public methods must accept or output one of PassboltTupleTypes"""

def iterate_resources(self, params: Optional[dict] = None):
params = params or {}
url_params = urllib.parse.urlencode(params)
if url_params:
url_params = "?" + url_params
response = self.get('/resources.json' + url_params)
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
resources = response["body"]
for resource in resources:
yield resource

def list_resources(self, folder_id: Union[None, PassboltFolderIdType] = None):
params = {
**({"filter[has-id][]": folder_id} if folder_id else {}),
"contain[children_resources]": True,
}
url_params = urllib.parse.urlencode(params)
if url_params:
url_params = "?" + url_params
response = self.get('/folders.json' + url_params)
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
response = response["body"][0]
assert "children_resources" in response.keys(), f"Key 'body[].children_resources' not found in response " \
f"keys: {response.keys()} "
return constructor(PassboltResourceTuple)(response["children_resources"])

def get_secret(self, resource: PassboltResourceTuple) -> PassboltSecretTuple:
response = self.get(f"/secrets/resource/{resource.id}.json")
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"
return PassboltSecretTuple(**response["body"])

def update_secret(self, resource: PassboltResourceTuple, new_secret):
return self.put(f"/resources/{resource.id}.json", {
"secrets": new_secret
}, return_response_object=True)

def list_users(self, can_access: Union[None, PassboltResourceTuple, PassboltFolderTuple] = None) \
-> List[PassboltUserTuple]:
if can_access is not None:
params = {"filter[has-access]": can_access.id}
else:
params = {}
response = self.get(f"/users.json", params=params)
assert "body" in response.keys(), f"Key 'body' not found in response keys: {response.keys()}"

response["body"]: List[Mapping]

return constructor(
PassboltUserTuple,
subconstructors={"gpgkey": constructor(PassboltOpenPgpKeyTuple)},
)(response["body"])

def import_public_keys(self, trustlevel='TRUST_FULLY'):
# get all users
users = self.list_users()
for user in users:
self.gpg.import_keys(user.gpgkey.armored_key)
self.gpg.trust_keys(user.gpgkey.fingerprint, trustlevel)

def describe_folder(self, folder_id):
return self.get(f"/folders/{folder_id}.json")

def create_resource(self, name: str, username: str, password: str, uri: Optional[str] = None,
description: Optional[str] = None, folder: Optional[str] = None):
"""Creates a new resource on passbolt and shares it with the provided group and folder"""
r = self.post("/resources.json", {
"name": name,
"username": username,
"description": description or "",
"uri": uri or "",
"secrets": [
{
"data": self.encrypt(password)
}
],
**({"folder_parent_id": folder} if folder else {}),
}, return_response_object=True)
r.raise_for_status()
r_json = r.json()
new_resource_id = r_json.get("body", {}).get("id")
if new_resource_id is None:
raise ValueError(f"Unexpected resource creation: {r_json}")
if folder:
self.move_resource_to_folder(new_resource_id, folder)
return r_json

def move_resource_to_folder(self, resource_id, folder_id):
r = self.post(f"/move/resource/{resource_id}.json", {"folder_parent_id": folder_id},
return_response_object=True)
r.raise_for_status()
return r.json()
Loading

0 comments on commit f3a6b7d

Please sign in to comment.