Skip to content

Commit

Permalink
110 improve in code documentation (#111)
Browse files Browse the repository at this point in the history
* improving documententation and comments

* improving documententation and comments

* improving documententation and commentas for dsh module

* improving documententation and commentas for certificates module

* improving documententation and commentas for datastream module

* [FIX] closure problem

* improving documententation and commentas. also added test to the errors

* remove tests from error

* improving documententation and comments for management api

* [FIX] test error problem

* [FIX] typo

* remove error example in documentation

* remove error example in documentation

* Update dsh_sdk/src/datastream/mod.rs

* Apply suggestions from code review

---------

Co-authored-by: Frank Hol <[email protected]>
  • Loading branch information
Arend-Jan and toelo3 authored Jan 13, 2025
1 parent e164f48 commit a04f85f
Show file tree
Hide file tree
Showing 5 changed files with 443 additions and 274 deletions.
170 changes: 105 additions & 65 deletions dsh_sdk/src/certificates/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
//! Handle DSH Certificates and bootstrap process
//! Handles DSH certificates and the bootstrap process.
//!
//! The certificate struct holds the DSH CA certificate, the DSH Kafka certificate and
//! the private key. It also has methods to create a reqwest client with the DSH Kafka
//! certificate included and to retrieve the certificates and keys as PEM strings. Also
//! it is possible to create the ca.crt, client.pem, and client.key files in a desired
//! directory.
//! The [`Cert`] struct holds the DSH CA certificate, the DSH Kafka certificate, and
//! the corresponding private key. It provides methods to:
//! - Create Reqwest clients (async/blocking) that embed the Kafka certificate for secure connections
//! - Retrieve certificates and keys as PEM strings
//! - Generate certificate files (`ca.crt`, `client.pem`, and `client.key`) in a target directory
//!
//! ## Create files
//! # Usage Flow
//! Typically, you either:
//! 1. **Bootstrap**: Generate and sign certificates using [`Cert::from_bootstrap`] or [`Cert::from_env`],
//! which fetches or creates certificates at runtime.
//! 2. **Load**: Read existing certificates from a directory using [`Cert::from_pki_config_dir`].
//!
//! To create the ca.crt, client.pem, and client.key files in a desired directory, use the
//! `to_files` method.
//! After obtaining a [`Cert`] instance, you can create HTTP clients or retrieve the raw certificate/key data.
//!
//! ## Creating Files
//! To create the `ca.crt`, `client.pem`, and `client.key` files in a desired directory, use the
//! [`Cert::to_files`] method.
//! ```no_run
//! use dsh_sdk::certificates::Cert;
//! use std::path::PathBuf;
Expand Down Expand Up @@ -38,7 +45,12 @@ mod bootstrap;
mod error;
mod pki_config_dir;

/// Hold all relevant certificates and keys to connect to DSH Kafka Cluster and Schema Store.
/// Holds all relevant certificates and private keys to connect to the DSH Kafka cluster and the Schema Store.
///
/// This struct includes:
/// - `dsh_ca_certificate_pem`: The CA certificate (equivalent to `ca.crt`)
/// - `dsh_client_certificate_pem`: The client (Kafka) certificate (equivalent to `client.pem`)
/// - `key_pair`: The private key used for Kafka connections (equivalent to `client.key`)
#[derive(Debug, Clone)]
pub struct Cert {
dsh_ca_certificate_pem: String,
Expand All @@ -47,7 +59,7 @@ pub struct Cert {
}

impl Cert {
/// Create new [Cert] struct
/// Creates a new [`Cert`] struct from the given certificate strings and key pair.
fn new(
dsh_ca_certificate_pem: String,
dsh_client_certificate_pem: String,
Expand All @@ -60,34 +72,41 @@ impl Cert {
}
}

/// Bootstrap to DSH and sign the certificates.
/// Bootstraps to DSH and signs the certificates.
///
/// This fetches the DSH CA certificate, creates/signs a Kafka certificate, and generates a private key.
///
/// This method will get DSH CA certificate, sign the Kafka certificate and generate a private key.
/// # Recommended Approach
/// Use [`Cert::from_env`] if you rely on environment variables injected by DSH (e.g., `KAFKA_CONFIG_HOST`,
/// `MESOS_TASK_ID`). This allows an easier switch between Kafka Proxy, VPN connection, etc.
///
/// ## Recommended
/// Use [Cert::from_env] to get the certificates and keys. As this method will check based on the injected environment variables by DSH.
/// This method also allows you to easily switch between Kafka Proxy or VPN connection, based on `PKI_CONFIG_DIR` environment variable.
/// # Arguments
/// - `config_host`: The DSH config host where the CSR is sent.
/// - `tenant_name`: The tenant name.
/// - `task_id`: The running container’s task ID.
///
/// ## Arguments
/// * `config_host` - The DSH config host where the CSR can be send to.
/// * `tenant_name` - The tenant name.
/// * `task_id` - The task id of running container.
/// # Errors
/// Returns a [`CertificatesError`] if the bootstrap process fails (e.g., network issues or invalid inputs).
pub fn from_bootstrap(
config_host: &str,
tenant_name: &str,
task_id: &str,
) -> Result<Self, CertificatesError> {
bootstrap::bootstrap(&config_host, tenant_name, task_id)
bootstrap::bootstrap(config_host, tenant_name, task_id)
}

Check warning on line 96 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L90-L96

Added lines #L90 - L96 were not covered by tests

/// Bootstrap to DSH and sign the certificates based on the injected environment variables by DSH.
/// Bootstraps to DSH and signs certificates based on environment variables injected by DSH.
///
/// This method will first check if `PKI_CONFIG_DIR` environment variable is set. If set, it will use the certificates from the directory.
/// This is usefull when you want to use Kafka Proxy, VPN or when a different process that already created the certificates. More info at [CONNECT_PROXY_VPN_LOCAL.md](https://github.com/kpn-dsh/dsh-sdk-platform-rs/blob/main/dsh_sdk/CONNECT_PROXY_VPN_LOCAL.md).
/// This method checks if `PKI_CONFIG_DIR` is set:
/// - If it is, certificates are loaded from that directory (e.g., when using Kafka Proxy or VPN).
/// - Otherwise, it uses `KAFKA_CONFIG_HOST`, `MESOS_TASK_ID`, and `MARATHON_APP_ID` to bootstrap
/// and sign certificates.
///
/// Else it will check `KAFKA_CONFIG_HOST`, `MESOS_TASK_ID` and `MARATHON_APP_ID` environment variables to bootstrap to DSH and sign the certificates.
/// These environment variables are injected by DSH.
/// # Errors
/// Returns a [`CertificatesError::MisisngInjectedVariables`] if required environment variables are absent,
/// or if the bootstrap operation fails for another reason.
pub fn from_env() -> Result<Self, CertificatesError> {

Check warning on line 108 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L108

Added line #L108 was not covered by tests
// Attempt to load from PKI_CONFIG_DIR
if let Ok(cert) = Self::from_pki_config_dir::<std::path::PathBuf>(None) {
Ok(cert)
} else if let (Ok(config_host), Ok(task_id), Ok(tenant_name)) = (
Expand All @@ -101,29 +120,35 @@ impl Cert {
}
}

Check warning on line 121 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L121

Added line #L121 was not covered by tests

/// Get the certificates from a directory.
/// Loads the certificates from a specified directory (or from `PKI_CONFIG_DIR` if set).
///
/// Useful if certificates are already created and stored locally (e.g., Kafka Proxy, VPN usage).
///
/// This method is usefull if you already have the certificates in a directory.
/// For example if you are using Kafka Proxy, VPN or when a different process already
/// created the certificates.
/// # Arguments
/// - `path`: An optional path to the directory containing the certificates in PEM format.
///
/// ## Arguments
/// * `path` - Path to the directory where the certificates are stored (Optional).
/// If omitted, the `PKI_CONFIG_DIR` environment variable is used.
///
/// path can be overruled by setting the environment variable `PKI_CONFIG_DIR`.
/// # Note
/// - Only PEM format for certificates is supported.
/// - Key files should be in PKCS#8 format and can be in DER or PEM.
///
/// ## Note
/// Only certificates in PEM format are supported.
/// Key files should be in PKCS8 format and can be DER or PEM files.
/// # Errors
/// Returns a [`CertificatesError`] if files are missing, malformed, or cannot be read.
pub fn from_pki_config_dir<P>(path: Option<P>) -> Result<Self, CertificatesError>
where
P: AsRef<std::path::Path>,
{
pki_config_dir::get_pki_certificates(path)
}

/// Build an async reqwest client with the DSH Kafka certificate included.
/// With this client we can retrieve datastreams.json and conenct to Schema Registry.
/// Builds an **async** Reqwest client with the DSH Kafka certificate included.
///
/// This client can be used to securely fetch `datastreams.json` or connect to the Schema Registry.
///
/// # Panics
/// Panics if the certificate or private key is invalid. In practice, this should not occur if
/// the [`Cert`] was instantiated successfully.
pub fn reqwest_client_config(&self) -> reqwest::ClientBuilder {
let (pem_identity, reqwest_cert) = Self::prepare_reqwest_client(
self.dsh_kafka_certificate_pem(),
Expand All @@ -136,8 +161,13 @@ impl Cert {
.use_rustls_tls()
}

Check warning on line 162 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L152-L162

Added lines #L152 - L162 were not covered by tests

/// Build a reqwest client with the DSH Kafka certificate included.
/// With this client we can retrieve datastreams.json and conenct to Schema Registry.
/// Builds a **blocking** Reqwest client with the DSH Kafka certificate included.
///
/// This client can be used to securely fetch `datastreams.json` or connect to the Schema Registry.
///
/// # Panics
/// Panics if the certificate or private key is invalid. This should not occur if
/// the [`Cert`] was instantiated successfully.
pub fn reqwest_blocking_client_config(&self) -> ClientBuilder {
let (pem_identity, reqwest_cert) = Self::prepare_reqwest_client(
self.dsh_kafka_certificate_pem(),
Expand All @@ -150,42 +180,41 @@ impl Cert {
.use_rustls_tls()
}

Check warning on line 181 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L171-L181

Added lines #L171 - L181 were not covered by tests

/// Get the root certificate as PEM string. Equivalent to ca.crt.
/// Returns the root CA certificate as a PEM string (equivalent to `ca.crt`).
pub fn dsh_ca_certificate_pem(&self) -> &str {
self.dsh_ca_certificate_pem.as_str()
&self.dsh_ca_certificate_pem
}

/// Get the kafka certificate as PEM string. Equivalent to client.pem.
/// Returns the Kafka certificate as a PEM string (equivalent to `client.pem`).
pub fn dsh_kafka_certificate_pem(&self) -> &str {
self.dsh_client_certificate_pem.as_str()
&self.dsh_client_certificate_pem
}

/// Get the private key as PKCS8 and return bytes based on asn1 DER format.
/// Returns the private key in PKCS#8 ASN.1 DER-encoded bytes.
pub fn private_key_pkcs8(&self) -> Vec<u8> {
self.key_pair.serialize_der()
}

/// Get the private key as PEM string. Equivalent to client.key.
/// Returns the private key as a PEM string (equivalent to `client.key`).
pub fn private_key_pem(&self) -> String {
self.key_pair.serialize_pem()
}

/// Get the public key as PEM string.
/// Returns the public key in PEM format.
pub fn public_key_pem(&self) -> String {
self.key_pair.public_key_pem()
}

/// Get the public key as DER bytes.
/// Returns the public key as DER bytes.
pub fn public_key_der(&self) -> Vec<u8> {
self.key_pair.public_key_der()
}

/// Create the ca.crt, client.pem, and client.key files in a desired directory.
/// Creates `ca.crt`, `client.pem`, and `client.key` files in the specified directory.
///
/// This method will create the directory if it does not exist.
/// This method also creates the directory if it doesn't exist.
///
/// # Example
///
/// ```no_run
/// use dsh_sdk::certificates::Cert;
/// use std::path::PathBuf;
Expand All @@ -197,6 +226,9 @@ impl Cert {
/// # Ok(())
/// # }
/// ```
///
/// # Errors
/// Returns a [`CertificatesError`] if files cannot be created or written.
pub fn to_files(&self, dir: &PathBuf) -> Result<(), CertificatesError> {
std::fs::create_dir_all(dir)?;
Self::create_file(dir.join("ca.crt"), self.dsh_ca_certificate_pem())?;
Expand All @@ -205,12 +237,17 @@ impl Cert {
Ok(())
}

/// Internal helper to create a file with the specified contents.
fn create_file<C: AsRef<[u8]>>(path: PathBuf, contents: C) -> Result<(), CertificatesError> {
std::fs::write(&path, contents)?;
info!("File created ({})", path.display());
Ok(())
}

/// Creates a [`reqwest::Identity`] from the certificate and private key bytes.
///
/// # Errors
/// Returns a `reqwest::Error` if the provided bytes are invalid.
fn create_identity(
cert: &[u8],
private_key: &[u8],
Expand All @@ -221,25 +258,28 @@ impl Cert {
reqwest::Identity::from_pem(&ident)
}

/// Panics when the certificate or key is not valid.
/// However, these are already validated during the creation of the `Cert` struct and converted if nedded.
/// Internal helper to set up the [`reqwest::Identity`] and root certificate.
///
/// # Panics
/// Panics if the certificate or key is invalid, but they should already be validated
/// during [`Cert`] construction.
fn prepare_reqwest_client(
kafka_certificate: &str,
private_key: &str,
ca_certificate: &str,
) -> (reqwest::Identity, reqwest::tls::Certificate) {
let pem_identity =
Cert::create_identity(kafka_certificate.as_bytes(), private_key.as_bytes()).expect(
"Error creating identity. The kafka certificate or key is not valid. Please check the certificate and key.",
);
let reqwest_cert = reqwest::tls::Certificate::from_pem(ca_certificate.as_bytes()).expect(
"Error parsing CA certificate as PEM to be used in Reqwest. The certificate is not valid. Please check the certificate.",
);
Cert::create_identity(kafka_certificate.as_bytes(), private_key.as_bytes())
.expect("Error creating identity. The Kafka certificate or key is invalid.");

let reqwest_cert = reqwest::tls::Certificate::from_pem(ca_certificate.as_bytes())
.expect("Error parsing CA certificate as PEM. The certificate is invalid.");

(pem_identity, reqwest_cert)
}

Check warning on line 279 in dsh_sdk/src/certificates/mod.rs

View check run for this annotation

Codecov / codecov/patch

dsh_sdk/src/certificates/mod.rs#L266-L279

Added lines #L266 - L279 were not covered by tests
}

/// Helper function to ensure that the host starts with `https://` (or `http://`)
/// Helper function to ensure that the host starts with `https://` or `http://`.
pub(crate) fn ensure_https_prefix(host: impl AsRef<str>) -> String {
if host.as_ref().starts_with("http://") || host.as_ref().starts_with("https://") {
host.as_ref().to_string()
Expand Down Expand Up @@ -273,27 +313,27 @@ mod tests {
let pkey_pem_bytes = pkey.private_key_to_pem_pkcs8().unwrap();

let key_pem = cert.private_key_pem();
let pkey_pem = String::from_utf8_lossy(pkey_pem_bytes.as_slice());
let pkey_pem = String::from_utf8_lossy(&pkey_pem_bytes);
assert_eq!(key_pem, pkey_pem);
}

#[test]
fn test_public_key_pem() {
let cert = TEST_CERTIFICATES.get_or_init(set_test_cert);
let der = cert.key_pair.serialize_der();
let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
let pkey = PKey::private_key_from_der(&der).unwrap();
let pkey_pub_pem_bytes = pkey.public_key_to_pem().unwrap();

let pub_pem = cert.public_key_pem();
let pkey_pub_pem = String::from_utf8_lossy(pkey_pub_pem_bytes.as_slice());
let pkey_pub_pem = String::from_utf8_lossy(&pkey_pub_pem_bytes);
assert_eq!(pub_pem, pkey_pub_pem);
}

#[test]
fn test_public_key_der() {
let cert = TEST_CERTIFICATES.get_or_init(set_test_cert);
let der = cert.key_pair.serialize_der();
let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
let pkey = PKey::private_key_from_der(&der).unwrap();
let pkey_pub_der = pkey.public_key_to_der().unwrap();

let pub_der = cert.public_key_der();
Expand All @@ -304,7 +344,7 @@ mod tests {
fn test_private_key_pkcs8() {
let cert = TEST_CERTIFICATES.get_or_init(set_test_cert);
let der = cert.key_pair.serialize_der();
let pkey = PKey::private_key_from_der(der.as_slice()).unwrap();
let pkey = PKey::private_key_from_der(&der).unwrap();
let pkey = pkey.private_key_to_pkcs8().unwrap();

let key = cert.private_key_pkcs8();
Expand Down
Loading

0 comments on commit a04f85f

Please sign in to comment.