Skip to content

Commit

Permalink
impr(S3UTILS-181): Add scuba backend to service-level-sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
tmacro committed Nov 18, 2024
1 parent 5ffeb9c commit 73124f1
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 27 deletions.
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,14 +1972,31 @@ REST API to provide service level reports for UtapiV2

## Usage


**Using Warp 10 backend**

```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_WARP10_NODE="md1-cluster1:4802@127.0.0.1:4802" \
scality/s3utils service-level-sidecar/index.js
```

**Using Scuba backend**
```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_ENABLE_SCUBA=true \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

**Example output**
```shell
curl -X POST -H "Authorization: Bearer dev_key_change_me" localhost:24742/api/report | jq
{
"account": [
Expand Down Expand Up @@ -2120,7 +2137,7 @@ docker run -d \
scality/s3utils service-level-sidecar/index.js
```

#### Warp10
#### Warp 10

The Warp 10 address is configured using `SIDECAR_WARP10_NODE`.
The Warp 10 `nodeId` must be included (it normally matches the Ansible inventory name plus port, e.g. `md1-cluster1:4802`).
Expand All @@ -2133,6 +2150,21 @@ docker run -d \
scality/s3utils service-level-sidecar/index.js
```

#### Scuba

The Scuba backend can be enabled by setting `SIDECAR_ENABLE_SCUBA` to `true`.
A bucketd address can be provided using `SIDECAR_SCUBA_BUCKETD_BOOTSTRAP`.
If a bucketd address is not provided, `localhost:19000` will be used.
Internal TLS support can be enabled by setting `SIDECAR_SCUBA_BUCKETD_ENABLE_TLS` to `true`.

```shell
docker run -d \
--network=host \
-e SIDECAR_ENABLE_SCUBA=true \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

### Other Settings

- Log level can be set using `SIDECAR_LOG_LEVEL` (defaults to `info`)
Expand Down
16 changes: 16 additions & 0 deletions service-level-sidecar/bucketd.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ async function* listBuckets(log) {
}
}

/**
 * Retrieve the ids of all raft sessions reported by bucketd.
 *
 * @param {object} log - werelogs logger instance
 * @returns {Promise<Array<string>>} - raft session ids, each converted to a string
 * @throws rejects with the bucketd error when the request fails, or with the
 * parsing error when the response is not a JSON array of raft descriptions
 */
async function getRaftSessionIds(log) {
    return new Promise((resolve, reject) => {
        metadata.client.getAllRafts(log.getSerializedUids(), (error, res) => {
            if (error) {
                log.error('error getting raft session ids', { error });
                return reject(error);
            }

            // This code runs inside the client callback. When that callback is
            // invoked asynchronously, an exception thrown here is not captured
            // by the promise and would leave it pending forever, so decoding
            // failures are turned into an explicit rejection.
            try {
                const data = JSON.parse(res);
                return resolve(data.map(raft => `${raft.id}`));
            } catch (parseError) {
                log.error('error parsing raft session ids', { error: parseError.message });
                return reject(parseError);
            }
        }, log);
    });
}

module.exports = {
listBuckets,
getRaftSessionIds,
};
6 changes: 6 additions & 0 deletions service-level-sidecar/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ const defaults = {
host: 'localhost',
port: 8500,
},
enableScuba: false,
scubaBucketd: 'localhost:19000',
scubaBucketdTls: false,
};

module.exports = {
Expand All @@ -123,4 +126,7 @@ module.exports = {
readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
},
enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
};
59 changes: 36 additions & 23 deletions service-level-sidecar/report.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
const async = require('async');
const util = require('util');

const arsenal = require('arsenal');

const { splitter } = arsenal.constants;

const bucketd = require('./bucketd');
const scuba = require('./scuba');
const env = require('./env');
const warp10 = require('./warp10');
const { getAccountIdForCanonicalId } = require('./vault');
Expand Down Expand Up @@ -43,34 +48,36 @@ class MetricReport {


/**
*
* @param {Array<string>} sessionIds - raft session ids to retrieve metrics for (only populated when the scuba backend is enabled)
* @param {integer} timestamp - timestamp to retrieve metrics in microseconds
* @param {string} bucket - bucket name to retrieve metrics for
* @param {object} bucket - bucket to retrieve metrics for
* @param {string} bucket.name - bucket name
* @param {string} bucket.account - bucket owner account canonical id
* @param {object} log - werelogs logger instance
* @returns {object} - object count and bytes stored for bucket
*/
async function getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: env.warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await warp10.exec(params);
async function getMetricsForBucket(sessionIds, timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });

if (env.enableScuba) {
const resourceName = `${bucket.account}${splitter}${bucket.name}`;
const logResults = await util.promisify(async.mapLimit)(sessionIds, env.concurrencyLimit, async logId => {
try {
return await scuba.getMetrics('bucket', resourceName, logId, new Date(timestamp), log);
} catch (err) {
log.error('error getting metrics for bucket', { bucket: bucket.name, logId, error: err.message });
throw err;
}
});

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
return logResults
.filter(result => result !== null)
.reduce((acc, result) => ({
count: acc.count + result.value.metrics.objectsTotal,
bytes: acc.bytes + result.value.metrics.bytesTotal,
}), { count: 0, bytes: 0 });
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
return warp10.getMetricsForBucket(timestamp, bucket.name, log);
}

/**
Expand All @@ -92,10 +99,16 @@ async function getServiceReport(timestamp, log) {
const bucketReports = {};
const accountInfoCache = {};

let sessionIds = [];
if (env.enableScuba) {
sessionIds = await bucketd.getRaftSessionIds(log);
sessionIds = sessionIds.filter(id => id !== '0');
}

for await (const buckets of bucketd.listBuckets(log)) {
log.debug('got response from bucketd', { numBuckets: buckets.length });
await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
const metrics = await getMetricsForBucket(sessionIds, timestamp, bucket, log);

log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });

Expand Down
66 changes: 66 additions & 0 deletions service-level-sidecar/scuba.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
const arsenal = require('arsenal');
const bucketclient = require('bucketclient');

const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
const { splitter } = arsenal.constants;

const rootLogger = require('./log');
const env = require('./env');
const utils = require('./utils');

const params = {
bucketdBootstrap: [env.scubaBucketd],
https: env.scubaBucketdTls ? env.tls.certs : undefined,
};

const metadata = new BucketClientInterface(params, bucketclient, rootLogger);

const listObjects = utils.retryable(metadata.listObject.bind(metadata));

/**
 * Move a date to the last millisecond (23:59:59.999) of its UTC day.
 *
 * @param {Date} timestamp - date to round
 * @returns {Date} - new Date set to the end of the same UTC day
 */
function roundToDay(timestamp) {
    const year = timestamp.getUTCFullYear();
    const month = timestamp.getUTCMonth();
    const day = timestamp.getUTCDate();
    const endOfDayMs = Date.UTC(year, month, day, 23, 59, 59, 999);
    return new Date(endOfDayMs);
}

// Number of digits used for the timestamp part of a metrics key.
const LENGTH_TS = 14;
// Largest value that fits in LENGTH_TS digits (all nines).
const MAX_TS = Number.parseInt('9'.repeat(LENGTH_TS), 10);

/**
 * Build the metrics key for a resource at a given date.
 *
 * The timestamp part is MAX_TS minus the end of the UTC day (in milliseconds),
 * left-padded with zeros to LENGTH_TS characters.
 *
 * @param {string} resourceName - name of the resource the key belongs to
 * @param {Date} timestamp - date the key is built for
 * @returns {string} - key in the form `<resourceName>/<timestamp part>`
 */
function formatMetricsKey(resourceName, timestamp) {
    const endOfDayMs = roundToDay(timestamp).getTime();
    const reversed = String(MAX_TS - endOfDayMs).padStart(LENGTH_TS, '0');
    return `${resourceName}/${reversed}`;
}

/**
 * Fetch a single metrics entry for a resource from the listing of the
 * `<classType><splitter><sessionId>` bucket.
 *
 * @param {string} classType - resource class, used as the bucket name prefix
 * @param {string} resourceName - resource to retrieve metrics for
 * @param {string} sessionId - raft session id, used as the bucket name suffix
 * @param {Date} timestamp - date used to build the lower bound of the listing
 * @param {object} log - werelogs logger instance
 * @returns {Promise<object|null>} - `{ key, value }` with the JSON-decoded value,
 * or null when the listing is empty or the bucket does not exist
 */
async function getMetrics(classType, resourceName, sessionId, timestamp, log) {
    const query = {
        maxKeys: 1,
        listingType: 'Basic',
        gte: formatMetricsKey(resourceName, timestamp),
        lte: `${resourceName}/${MAX_TS}`,
    };
    const metricsBucket = `${classType}${splitter}${sessionId}`;

    try {
        const entries = await listObjects(metricsBucket, query, log);
        if (entries.length !== 0) {
            const first = entries[0];
            return {
                key: first.key,
                value: JSON.parse(first.value),
            };
        }
        return null;
    } catch (err) {
        // A missing bucket is reported as "no metrics" rather than a failure.
        if (!err.NoSuchBucket) {
            log.error('error during metric listing', { error: err.message });
            throw err;
        }
        return null;
    }
}

module.exports = {
getMetrics,
};
6 changes: 3 additions & 3 deletions service-level-sidecar/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
}

// Hash our api key once for reuse during the request
// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();

// Any handler mounted under `/api/` requires an Authorization header
Expand Down Expand Up @@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
const timestamp = Date.now() * 1000;
getServiceReportCb(timestamp, req.log, (err, report) => {
if (err) {
req.log.error('error generating metrics report', { error: err });
req.log.error('error generating metrics report', { error: err.message });
next(makeError(500, 'Internal Server Error'));
return;
}
Expand All @@ -119,7 +119,7 @@ app.use((req, res, next) => {

// Catch all error handler
app.use((err, req, res, next) => {
req.log.error('error during request', { error: err });
req.log.error('error during request', { error: err.message });
// Default errors to `500 Internal Server Error` in case something unexpected slips through
const data = {
code: err.status || 500,
Expand Down
24 changes: 24 additions & 0 deletions service-level-sidecar/warp10.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ class Warp10Client {
log.debug('warpscript executed', { ...params, stats: resp.meta });
return resp;
}

async getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: this.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await this.exec(params);

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
}
}


Expand Down

0 comments on commit 73124f1

Please sign in to comment.