ctk load table: Add support for MongoDB Change Streams
amotl committed Jul 17, 2024
1 parent 302c44f commit f538572
Showing 8 changed files with 336 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -2,6 +2,7 @@
 
 
 ## Unreleased
+- `ctk load table`: Added support for MongoDB Change Streams
 
 ## 2024/07/08 v0.0.15
 - IO: Added the `if-exists` query parameter by updating to influxio 0.4.0.
16 changes: 11 additions & 5 deletions cratedb_toolkit/api/main.py
@@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
                 logger.error(msg)
                 raise OperationFailed(msg)
         elif source_url.startswith("mongodb"):
-            from cratedb_toolkit.io.mongodb.api import mongodb_copy
+            if "+cdc" in source_url:
+                source_url = source_url.replace("+cdc", "")
+                from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc
 
-            if not mongodb_copy(source_url, target_url, progress=True):
-                msg = "Data loading failed"
-                logger.error(msg)
-                raise OperationFailed(msg)
+                mongodb_relay_cdc(source_url, target_url, progress=True)
+            else:
+                from cratedb_toolkit.io.mongodb.api import mongodb_copy
+
+                if not mongodb_copy(source_url, target_url, progress=True):
+                    msg = "Data loading failed"
+                    logger.error(msg)
+                    raise OperationFailed(msg)
         else:
             raise NotImplementedError("Importing resource not implemented yet")
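The branch above selects the CDC relay when the source URL carries a `+cdc` marker, and removes the marker with a plain `str.replace` over the whole URL. The following is a minimal sketch of a narrower variant that only touches the URL scheme. It is not part of this commit; the helper name `split_cdc_marker` and the sample URLs are made up for illustration.

```python
from typing import Tuple


def split_cdc_marker(url: str) -> Tuple[str, bool]:
    """
    Hypothetical helper: report whether the URL scheme carries a `+cdc`
    marker, and return the URL with that marker removed from the scheme.
    """
    scheme, separator, rest = url.partition("://")
    if not separator:
        # No `scheme://` prefix found, so leave the value untouched.
        return url, False
    parts = scheme.split("+")
    if "cdc" not in parts:
        return url, False
    plain_scheme = "+".join(part for part in parts if part != "cdc")
    return f"{plain_scheme}://{rest}", True


print(split_cdc_marker("mongodb+cdc://localhost/testdrive/demo"))
print(split_cdc_marker("mongodb+srv+cdc://user:pw+cdc@host.example/testdrive/demo"))
```

Restricting the change to the scheme keeps a `+cdc` substring inside credentials or paths intact, which a whole-string replacement would also strip.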
39 changes: 39 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
@@ -1,6 +1,7 @@
 import argparse
 import logging
 
+from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
 from cratedb_toolkit.io.mongodb.core import export, extract, translate
 from cratedb_toolkit.model import DatabaseAddress
 from cratedb_toolkit.util.cr8 import cr8_insert_json
@@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
     cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)
 
     return True
+
+
+def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
+    """
+    Synopsis
+    --------
+    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc
+    ctk load table mongodb+cdc://localhost:27017/testdrive/demo
+
+    Backlog
+    -------
+    TODO: Run on multiple collections.
+    TODO: Run on the whole database.
+    TODO: Accept parameters like `if_exists="append,replace"`.
+    TODO: Propagate parameters like `scan="full"`.
+    """
+    logger.info("Running MongoDB CDC relay")
+
+    # Decode database URL.
+    mongodb_address = DatabaseAddress.from_string(source_url)
+    mongodb_uri, mongodb_collection_address = mongodb_address.decode()
+    mongodb_database = mongodb_collection_address.schema
+    mongodb_collection = mongodb_collection_address.table
+
+    cratedb_address = DatabaseAddress.from_string(target_url)
+    cratedb_uri, cratedb_table_address = cratedb_address.decode()
+
+    # Configure machinery.
+    relay = MongoDBCDCRelayCrateDB(
+        mongodb_url=str(mongodb_uri),
+        mongodb_database=mongodb_database,
+        mongodb_collection=mongodb_collection,
+        cratedb_sqlalchemy_url=str(cratedb_uri),
+        cratedb_table=cratedb_table_address.fullname,
+    )
+
+    # Invoke machinery.
+    relay.start()
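`mongodb_relay_cdc` relies on `DatabaseAddress.decode()` to split each URL into a server URI plus a database (schema) and a collection (table) name. The sketch below illustrates that decomposition with the standard library only. It is not the `DatabaseAddress` implementation, and `decode_address` is a hypothetical name.

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit


def decode_address(url: str) -> Tuple[str, Optional[str], Optional[str]]:
    """
    Hypothetical helper: split `scheme://host/database/collection` into
    the server URI, the database name, and the collection name.
    """
    parsed = urlsplit(url)
    segments = [segment for segment in parsed.path.split("/") if segment]
    database = segments[0] if len(segments) > 0 else None
    collection = segments[1] if len(segments) > 1 else None
    # Rebuild the URL without the path, keeping credentials, host, and query.
    server_uri = f"{parsed.scheme}://{parsed.netloc}"
    if parsed.query:
        server_uri += f"?{parsed.query}"
    return server_uri, database, collection


print(decode_address("mongodb://localhost:27017/testdrive/demo"))
print(decode_address("crate://crate@localhost:4200/testdrive/demo-cdc"))
```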
9 changes: 6 additions & 3 deletions cratedb_toolkit/model.py
@@ -85,12 +85,15 @@ class TableAddress:
 
     @property
     def fullname(self):
-        if self.schema is None and self.table is None:
+        if self.table is None:
             raise ValueError("Uninitialized table address can not be serialized")
         if self.schema and self.table:
-            return f'"{self.schema}"."{self.table}"'
+            schema = self.schema.strip('"')
+            table = self.table.strip('"')
+            return f'"{schema}"."{table}"'
         else:
-            return f'"{self.table}"'
+            table = self.table.strip('"')
+            return f'"{table}"'
 
 
 @dataclasses.dataclass
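The revised `fullname` property strips surrounding double quotes before adding its own, so names that arrive already quoted are not quoted twice. A standalone sketch of that behavior follows; `TableAddressSketch` is a reduced stand-in written for this illustration, not the toolkit's `TableAddress` class.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass
class TableAddressSketch:
    """Stand-in for `TableAddress`, reduced to the `fullname` property."""

    schema: Optional[str] = None
    table: Optional[str] = None

    @property
    def fullname(self) -> str:
        if self.table is None:
            raise ValueError("Uninitialized table address can not be serialized")
        if self.schema and self.table:
            # Remove quotes that are already present before quoting again.
            schema = self.schema.strip('"')
            table = self.table.strip('"')
            return f'"{schema}"."{table}"'
        else:
            table = self.table.strip('"')
            return f'"{table}"'


print(TableAddressSketch(schema="testdrive", table="demo-cdc").fullname)
print(TableAddressSketch(schema='"testdrive"', table='"demo-cdc"').fullname)
```

Both calls yield the same quoted identifier, which is what allows `cratedb_table_address.fullname` to be passed on safely regardless of how the table name was spelled in the URL.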
268 changes: 268 additions & 0 deletions doc/io/mongodb/cdc.md
@@ -0,0 +1,268 @@
(mongodb-cdc-relay)=
# MongoDB CDC Relay

## About
Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command
`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas.

Use it for data transfers within data pipelines or for ad hoc operations.
It is available both as a command-line program and as a library.
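For library use, the entry point added alongside this document is `mongodb_relay_cdc` in `cratedb_toolkit.io.mongodb.api`. The following is a minimal sketch, assuming the `+cdc` marker has already been removed from the source URL (the CLI does this before calling the function) and that the call blocks while the relay is running. The wrapper name `relay_collection` and its default URLs are illustrative only.

```python
import os


def relay_collection(source_url: str = "mongodb://localhost/testdrive/demo") -> None:
    """Relay one MongoDB collection into the table named by the target URL."""
    # Imported lazily, so this sketch can be loaded without the package installed.
    from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

    # Assumption: reuse the same environment variable the CLI examples export.
    target_url = os.environ.get(
        "CRATEDB_SQLALCHEMY_URL",
        "crate://crate@localhost/testdrive/demo-cdc",
    )
    mongodb_relay_cdc(source_url, target_url, progress=True)
```

Call `relay_collection()` once both databases are reachable.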


## Install
```shell
pip install --upgrade 'cratedb-toolkit[mongodb]'
```

:::{tip}
The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash`
is installed together with CrateDB Toolkit, but `mongosh` and `atlas` must be
installed separately. If you are using Docker anyway, you can use these
command aliases to make them available in your environment without installing
them.

```shell
alias mongosh='docker run -i --rm --network=host mongo:7 mongosh'
```

The `atlas` program needs to store authentication information between invocations,
therefore you need to supply a storage volume.
```shell
mkdir atlas-config
alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas'
```
:::


## Usage

(mongodb-cdc-workstation)=
### Workstation
The guidelines assume that both services, CrateDB and MongoDB, are listening on
`localhost`.
To run them on your workstation using Docker or Podman, see the
{ref}`mongodb-cdc-services-standalone` section below.
```shell
export MONGODB_URL=mongodb://localhost/testdrive
export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc
ctk load table "${MONGODB_URL_CTK}"
```

Insert document into MongoDB collection, and update it.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

Query data in CrateDB.
```shell
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

Invoke a delete operation, and check data in CrateDB once more.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})'
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```
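The walkthrough above produces three change events: an insert, an update, and a delete. As a rough illustration of what a relay has to do with them, the sketch below applies such events to an in-memory dictionary standing in for the target table. The event fields (`operationType`, `documentKey`, `fullDocument`, `updateDescription`) follow the MongoDB change event format. The function `apply_change_event` and the sample `_id` value are made up, nested field paths are ignored, and this is not how CrateDB Toolkit translates events into SQL.

```python
from typing import Any, Dict

Table = Dict[Any, Dict[str, Any]]


def apply_change_event(table: Table, event: Dict[str, Any]) -> None:
    """Apply a single change event to `table`, keyed by document `_id`."""
    operation = event["operationType"]
    key = event["documentKey"]["_id"]
    if operation in ("insert", "replace"):
        table[key] = dict(event["fullDocument"])
    elif operation == "update":
        description = event["updateDescription"]
        document = table.setdefault(key, {"_id": key})
        document.update(description.get("updatedFields", {}))
        for field in description.get("removedFields", []):
            document.pop(field, None)
    elif operation == "delete":
        table.pop(key, None)
    else:
        raise ValueError(f"Unsupported operation type: {operation}")


table: Table = {}
events = [
    {"operationType": "insert", "documentKey": {"_id": 1},
     "fullDocument": {"_id": 1, "foo": "bar"}},
    {"operationType": "update", "documentKey": {"_id": 1},
     "updateDescription": {"updatedFields": {"status": "D"}, "removedFields": []}},
    {"operationType": "delete", "documentKey": {"_id": 1}},
]
for event in events:
    apply_change_event(table, event)
    print(event["operationType"], table)
```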

(mongodb-cdc-cloud)=
### Cloud
The guidelines assume the cloud variants of both services, CrateDB Cloud
and MongoDB Atlas.
To provision the relevant cloud resources, see the
{ref}`mongodb-cdc-services-cloud` section below.

:::{rubric} Invoke pipeline
:::
A canonical invocation for ingesting MongoDB Atlas Change Streams into
CrateDB Cloud.

```shell
export MONGODB_URL=mongodb+srv://user:PASSWORD@ATLAS_HOSTNAME/testdrive
export MONGODB_URL_CTK=mongodb+srv+cdc://user:PASSWORD@ATLAS_HOSTNAME/testdrive/demo
export CRATEDB_HTTP_URL="https://admin:PASSWORD@CRATEDB_HOSTNAME:4200/"
export CRATEDB_SQLALCHEMY_URL="crate://admin:PASSWORD@CRATEDB_HOSTNAME:4200/testdrive/demo-cdc?ssl=true"
```
```shell
ctk load table "${MONGODB_URL_CTK}"
```

:::{note}
Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the
`ssl=true` query parameter. Both are needed to establish connectivity with
MongoDB Atlas and CrateDB.
:::
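Because a missing `+srv` or `ssl=true` tends to surface only as a connection error, a small pre-flight check can be handy. The sketch below uses the standard library only; `check_cloud_urls` is a hypothetical helper, not a CrateDB Toolkit function, and the host names in the example are placeholders.

```python
from typing import List
from urllib.parse import parse_qs, urlsplit


def check_cloud_urls(source_url: str, target_url: str) -> List[str]:
    """Hypothetical pre-flight check; returns a list of problems found."""
    problems = []
    source_scheme = urlsplit(source_url).scheme.split("+")
    if "srv" not in source_scheme:
        problems.append("source URL scheme lacks `+srv`")
    if "cdc" not in source_scheme:
        problems.append("source URL scheme lacks `+cdc`")
    target_query = parse_qs(urlsplit(target_url).query)
    if target_query.get("ssl") != ["true"]:
        problems.append("target URL lacks `ssl=true`")
    return problems


print(check_cloud_urls(
    "mongodb+srv+cdc://user:secret@cluster.example/testdrive/demo",
    "crate://admin:secret@cratedb.example:4200/testdrive/demo-cdc?ssl=true",
))
```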

:::{rubric} Trigger CDC events
:::
Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

:::{rubric} Query data in CrateDB
:::
```shell
crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";'
```


## Appendix

### Database Operations
A few operations that are handy when exploring this exercise.

Reset MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.drop()'
```

Reset CrateDB table.
```shell
crash --command 'DELETE FROM "testdrive"."demo-cdc";'
```

Display documents in MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.find()'
```

(mongodb-cdc-services-standalone)=
### Standalone Services
Quickly start CrateDB and MongoDB using Docker or Podman.

#### CrateDB
Start CrateDB.
```shell
docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \
crate:5.7 -Cdiscovery.type=single-node
```

#### MongoDB
Start MongoDB.
Please note that change streams are only available for replica sets and
sharded clusters, so let's define a replica set by using the
`--replSet rs-testdrive` option when starting the MongoDB server.
```shell
docker run -it --rm --name=mongodb --publish=27017:27017 \
mongo:7 mongod --replSet rs-testdrive
```

Now, initialize the replica set, by using the `mongosh` command to invoke
the `rs.initiate()` operation.
```shell
export MONGODB_URL="mongodb://localhost/"
docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <<EOF
config = {
_id: "rs-testdrive",
members: [{ _id : 0, host : "localhost:27017"}]
};
rs.initiate(config);
EOF
```
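The same initialization can be done from Python. This is a sketch under assumptions: it requires the `pymongo` package in a version that supports the `directConnection` option, and a server started with `--replSet rs-testdrive` as shown above. The helper names are made up; `replSetInitiate` is the database command behind `rs.initiate()`.

```python
from typing import Any, Dict


def replica_set_config(name: str, host: str) -> Dict[str, Any]:
    """Build a single-member replica set configuration document."""
    return {"_id": name, "members": [{"_id": 0, "host": host}]}


def initiate_replica_set(url: str = "mongodb://localhost:27017/") -> None:
    # Imported lazily; needs the `pymongo` package and a running server.
    from pymongo import MongoClient

    # Connect to the single node directly, because it is not yet
    # part of an initialized replica set.
    client = MongoClient(url, directConnection=True)
    config = replica_set_config("rs-testdrive", "localhost:27017")
    client.admin.command("replSetInitiate", config)
```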


(mongodb-cdc-services-cloud)=
### Cloud Services
Quickly provision [CrateDB Cloud] and [MongoDB Atlas].

#### CrateDB Cloud
To provision a database cluster, use either the [croud CLI], or the
[CrateDB Cloud Web Console].

Invoke CLI login.
```shell
croud login
```
Create organization.
```shell
croud organizations create --name samplecroudorganization
```
Create project.
```shell
croud projects create --name sampleproject
```
Deploy cluster.
```shell
croud clusters deploy \
--product-name crfree \
--tier default \
--cluster-name testdrive \
--subscription-id 782dfc00-7b25-4f48-8381-b1b096dd1619 \
--project-id 952cd102-91c1-4837-962a-12ecb71a6ba8 \
--version 5.8.0 \
--username admin \
--password "as6da9ddasfaad7i902jcv780dmcba"
```

Finally, use the username and password given to the deploy command to populate
`CRATEDB_HTTP_URL` and `CRATEDB_SQLALCHEMY_URL` at {ref}`mongodb-cdc-cloud`.

When shutting down your workbench, you may want to clean up any cloud resources
you just used.
```shell
croud clusters delete --cluster-id CLUSTER_ID
```

#### MongoDB Atlas
To provision a database cluster, use either the [Atlas CLI], or the
Atlas User Interface.

Create an API key.
```shell
atlas projects apiKeys create --desc "Ondemand Testdrive" --role GROUP_OWNER
```
```text
API Key '889727cb5bfe8830d0f8a203' created.
Public API Key bksttjep
Private API Key 9f8c1c41-b5f7-4d2a-b1a0-a1d2ef457796
```
Enter authentication key information.
```shell
atlas config init
```
Create database cluster.
```shell
atlas clusters create testdrive --provider AWS --region EU_CENTRAL_1 --tier M0 --tag env=dev
```
Retrieve the connection string.
```shell
atlas clusters connectionStrings describe testdrive
```
```text
mongodb+srv://testdrive.jaxmmfp.mongodb.net
```

Finally, create a "Database Access" user and use the credentials to populate
`MONGODB_URL` and `MONGODB_URL_CTK` at {ref}`mongodb-cdc-cloud` properly.

When shutting down your workbench, you may want to clean up any cloud resources
you just used.
```shell
atlas clusters delete testdrive
```


## Backlog
:::{todo}
- Improve UX/DX.
- Provide `ctk shell`.
- Provide [SDK and CLI for CrateDB Cloud Cluster APIs].

[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81
:::


[Atlas CLI]: https://www.mongodb.com/docs/atlas/cli/
[commons-codec]: https://pypi.org/project/commons-codec/
[CrateDB]: https://cratedb.com/docs/guide/home/
[CrateDB Cloud]: https://cratedb.com/docs/cloud/
[MongoDB Atlas]: https://www.mongodb.com/atlas
[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/
[croud CLI]: https://cratedb.com/docs/cloud/en/latest/tutorials/deploy/croud.html
[CrateDB Cloud Web Console]: https://cratedb.com/docs/cloud/en/latest/tutorials/quick-start.html#deploy-cluster
6 changes: 5 additions & 1 deletion doc/io/mongodb/index.md
@@ -9,5 +9,9 @@ Using the MongoDB subsystem, you can transfer data from and to MongoDB.
 :maxdepth: 1
 
 loader
-migr8
+cdc
 ```
+
+:::{note}
+The MongoDB Table Loader is an improvement of the traditional {doc}`migr8`.
+:::
6 changes: 5 additions & 1 deletion doc/io/mongodb/migr8.md
@@ -1,5 +1,9 @@
+---
+orphan: true
+---
+
 (migr8)=
-# migr8
+# migr8 migration utility
 
 ## About
 
1 change: 1 addition & 0 deletions pyproject.toml
@@ -151,6 +151,7 @@ io = [
   "sqlalchemy>=2",
 ]
 mongodb = [
+  "commons-codec[mongodb] @ git+https://github.com/daq-tools/commons-codec.git@mongodb",
   "cratedb-toolkit[io]",
   "orjson<4,>=3.3.1",
   "pymongo<5,>=3.10.1",
