Skip to content

Commit

Permalink
fix(js/plugins/google-cloud): Make metric export time intervals mutua…
Browse files Browse the repository at this point in the history
…lly exclusive (#1869)

* fix(js/plugins/google-cloud): Implement forceFlush and shutdown hooks for metrics.

* fix(js/plugins/google-cloud): Modify start times to ensure no overlap in exports

* fix(js/plugins/google-cloud): Update startTime mutation comment to include more of the why
  • Loading branch information
schnecle authored Feb 11, 2025
1 parent f7f8701 commit 198e91e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 109 deletions.
78 changes: 60 additions & 18 deletions js/plugins/google-cloud/src/gcpOpenTelemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,6 @@ export class GcpOpenTelemetry {
)
)
: new InMemoryMetricExporter(AggregationTemporality.DELTA);
exporter.selectAggregation = (instrumentType: InstrumentType) => {
if (instrumentType === InstrumentType.HISTOGRAM) {
return new ExponentialHistogramAggregation();
}
return new DefaultAggregation();
};
exporter.selectAggregationTemporality = (
instrumentType: InstrumentType
) => {
return AggregationTemporality.DELTA;
};
return exporter;
}
}
Expand All @@ -218,24 +207,77 @@ export class GcpOpenTelemetry {
* helpful information about how to set up metrics/telemetry in GCP.
*/
class MetricExporterWrapper extends MetricExporter {
private promise = new Promise<void>((resolve) => resolve());

constructor(
options?: ExporterOptions,
private errorHandler?: (error: Error) => void
) {
super(options);
}

export(
async export(
metrics: ResourceMetrics,
resultCallback: (result: ExportResult) => void
): void {
super.export(metrics, (result) => {
if (this.errorHandler && result.error) {
this.errorHandler(result.error);
}
resultCallback(result);
): Promise<void> {
await this.promise;
this.modifyStartTimes(metrics);
this.promise = new Promise<void>((resolve) => {
super.export(metrics, (result) => {
try {
if (this.errorHandler && result.error) {
this.errorHandler(result.error);
}
resultCallback(result);
} finally {
resolve();
}
});
});
}

selectAggregation(instrumentType: InstrumentType) {
if (instrumentType === InstrumentType.HISTOGRAM) {
return new ExponentialHistogramAggregation();
}
return new DefaultAggregation();
}

selectAggregationTemporality(instrumentType: InstrumentType) {
return AggregationTemporality.DELTA;
}

/**
* Modify the start times of each data point to ensure no
* overlap with previous exports.
*
* Cloud metrics do not support delta metrics for custom metrics
* and will convert any DELTA aggregations to CUMULATIVE ones on
* export. There is implicit overlap in the start/end times that
* the Metric reader is sending -- the end_time of the previous
* export will become the start_time of the current export. The
* overlap in times means that only one of those records will
* persist and the other will be overwritten. This
* method adds a thousandth of a second to ensure discrete export
* timeframes.
*/
private modifyStartTimes(metrics: ResourceMetrics): void {
metrics.scopeMetrics.forEach((scopeMetric) => {
scopeMetric.metrics.forEach((metric) => {
metric.dataPoints.forEach((dataPoint) => {
dataPoint.startTime[1] = dataPoint.startTime[1] + 1_000_000;
});
});
});
}

async shutdown(): Promise<void> {
return await this.forceFlush();
}

async forceFlush(): Promise<void> {
await this.promise;
}
}

/**
Expand Down
91 changes: 2 additions & 89 deletions js/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion js/testapps/dev-ui-gallery/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"dependencies": {
"@genkit-ai/dev-local-vectorstore": "workspace:*",
"@genkit-ai/evaluator": "workspace:*",
"@genkit-ai/firebase": "^0.9.12",
"@genkit-ai/firebase": "workspace:*",
"@genkit-ai/google-cloud": "workspace:*",
"@genkit-ai/googleai": "workspace:*",
"@genkit-ai/vertexai": "workspace:*",
Expand Down
3 changes: 2 additions & 1 deletion js/testapps/dev-ui-gallery/src/genkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ logger.setLogLevel('info');

enableFirebaseTelemetry({
forceDevExport: false,
metricExportIntervalMillis: 5000,
metricExportIntervalMillis: 5_000,
metricExportTimeoutMillis: 5_000,
autoInstrumentation: true,
autoInstrumentationConfig: {
'@opentelemetry/instrumentation-fs': { enabled: false },
Expand Down

0 comments on commit 198e91e

Please sign in to comment.