diff --git a/.gitignore b/.gitignore
index 2140260..22df5f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
*.zip
integration/tls/cert
resources
+timestream-prometheus-connector
diff --git a/README.md b/README.md
index 4b67ee8..5b8d3ff 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,6 @@ The Prometheus Connector receives and sends time series data between Prometheus
- [Prometheus Connector Specific Errors](#prometheus-connector-specific-errors)
- [Write API Errors](#write-api-errors)
- [Query API Errors](#query-api-errors)
-- [Limitations](#limitations)
- - [Maximum Prometheus Samples Per Remote Write Request](#maximum-prometheus-samples-per-remote-write-request)
- [Caveats](#caveats)
- [Unsupported SigV4 Authentication](#unsupported-sigv4-authentication)
- [Unsupported Temporary Security Credentials](#unsupported-temporary-security-credentials)
@@ -67,8 +65,6 @@ The Prometheus Connector is available in the following formats:
To configure Prometheus to read and write to remote storage, configure the `remote_read` and `remote_write` sections in `prometheus.yml`. To learn more, see the [remote read](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_read) and [remote write](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) sections on Prometheus' configuration page.
-> **NOTE**: It is recommended to use the default number of samples per write request through `max_samples_per_send`. For more details see [Maximum Prometheus Samples Per Remote Write Request](#maximum-prometheus-samples-per-remote-write-request).
-
1. Configure Prometheus' remote read and remote write destination by setting the `url` options to the Prometheus Connector's listening URLs, e.g. `"http://localhost:9201/write"`.
2. Configure the basic authentication header for Prometheus read and write requests with valid IAM credentials.
@@ -952,7 +948,7 @@ All connector-specific errors can be found in [`errors/errors.go`](./errors/erro
| `AccessDeniedException` | 403 | You are not authorized to perform this action | Ensure you have sufficient access to Amazon Timestream. |
| `ResourceNotFoundException` | 404 | The operation tried to access a non-existent resource. | Specify the resource correctly, or check whether its status is not ACTIVE. |
| `ConflictException` | 409 | Amazon Timestream was unable to process this request because it contains a resource that already exists. | Update the request with correct resource. |
-| `RejectedRecordsException` | 419 | Amazon Timestream will throw this exception in the following cases: <br/>1. Records with duplicate data where there are multiple records with the same dimensions, timestamps, and measure names but different measure values.<br/>2. Records with timestamps that lie outside the retention duration of the memory store.<br/>3. Records with dimensions or measures that exceed the Amazon Timestream defined limits. | 1. Check and process the data to ensure that there are no different measure values at the same timestamp given other labels/filters are the same.<br/>2. Check or update the retention duration in database.<br/>3. Set the maximum number of samples per write request in prometheus.yml to 100. |
+| `RejectedRecordsException` | 419 | Amazon Timestream will throw this exception in the following cases: <br/>1. Records with duplicate data where there are multiple records with the same dimensions, timestamps, and measure names but different measure values.<br/>2. Records with timestamps that lie outside the retention duration of the memory store.<br/>3. Records with dimensions or measures that exceed the Amazon Timestream defined limits. | 1. Check and process the data to ensure that there are no different measure values at the same timestamp given other labels/filters are the same.<br/>2. Check or update the retention duration in database. |
| `InvalidEndpointException` | 421 | The requested endpoint was invalid. | Check whether the endpoint is NIL or in an incorrect format. |
| `ThrottlingException` | 429 | Too many requests were made by a user exceeding service quotas. The request was throttled. | Continue to send data at the same (or higher) throughput. Go to [Data Ingestion](https://docs.aws.amazon.com/timestream/latest/developerguide/data-ingest.html) for more information. |
| `InternalServerException` | 500 | Amazon Timestream was unable to fully process this request because of an internal server error. | Please send the request again later. |
@@ -969,13 +965,6 @@ All connector-specific errors can be found in [`errors/errors.go`](./errors/erro
| `ThrottlingException` | 429 | The request was denied due to request throttling. | Continue to send queries at the same (or higher) throughput. |
| `InternalServerException` | 500 | Amazon Timestream was unable to fully process this request because of an internal server error. | Please send the request again later. |
-# Limitations
-
-### Maximum Prometheus Samples Per Remote Write Request
-
-Ingesting more time series than the `Records per WriteRecords API request` value specified in the [Timestream Quotas](https://docs.aws.amazon.com/timestream/latest/developerguide/ts-limits.html) will return a `RejectedRecordsException`, and none of the time series in the Prometheus write request will be ingested to Timestream.
-It is recommended to use the default value for `max_samples_per_send` in Prometheus' [remote write configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
-
# Caveats
### Unsupported SigV4 Authentication
diff --git a/integration/constants.go b/integration/constants.go
index 23b26e2..3c1d4eb 100644
--- a/integration/constants.go
+++ b/integration/constants.go
@@ -27,5 +27,6 @@ const (
invalidMatcher = 10
value = 1.0
numRecords = 100
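+	// largeNumRecords exceeds Timestream's limit of 100 records per WriteRecords request, forcing the connector to chunk the batch.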
+ largeNumRecords = 452
memStoreRetentionHour = 5
)
diff --git a/integration/integration_test.go b/integration/integration_test.go
index 6de6aaa..93fbcda 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -79,6 +79,13 @@ func TestWriteClient(t *testing.T) {
}
reqBatch := &prompb.WriteRequest{Timeseries: timeSeriesBatch}
+ // Request with more than 100 samples
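+	// to exercise the write client's chunking of batches into at most 100 records per request.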
+ var largeTimeSeriesBatch []*prompb.TimeSeries
+ for i := 0; i < largeNumRecords; i++ {
+ largeTimeSeriesBatch = append(largeTimeSeriesBatch, createTimeSeriesTemplate())
+ }
+ largeReqBatch := &prompb.WriteRequest{Timeseries: largeTimeSeriesBatch}
+
timeSeriesBatchFail := append(timeSeriesBatch, createTimeSeriesTemplate())
timeSeriesBatchFail = append(timeSeriesBatchFail, createTimeSeriesTemplate())
reqBatchFail := &prompb.WriteRequest{Timeseries: timeSeriesBatchFail}
@@ -98,6 +105,7 @@ func TestWriteClient(t *testing.T) {
{"write request with long metric name", reqLongMetric, awsCredentials},
{"write request with long label value", reqLongLabel, awsCredentials},
{"write request with 100 samples per request", reqBatch, awsCredentials},
+ {"write request with more than 100 samples per request", largeReqBatch, awsCredentials},
}
for _, test := range successTestCase {
t.Run(test.testName, func(t *testing.T) {
diff --git a/integration/tls/config/prometheus.yml b/integration/tls/config/prometheus.yml
index ed5c68c..66f2020 100644
--- a/integration/tls/config/prometheus.yml
+++ b/integration/tls/config/prometheus.yml
@@ -11,8 +11,6 @@ scrape_configs:
remote_write:
- url: "https://host.docker.internal:9201/write"
- queue_config:
- max_samples_per_send: 100
tls_config:
ca_file: "/etc/prometheus/RootCA.pem"
diff --git a/serverless/DEVELOPER_README.md b/serverless/DEVELOPER_README.md
index a2e2827..f9fae3e 100644
--- a/serverless/DEVELOPER_README.md
+++ b/serverless/DEVELOPER_README.md
@@ -172,8 +172,6 @@ To view the full set of `sam deploy` options see the [sam deploy documentation](
remote_write:
# Update the value to the InvokeWriteURL returned when deploying the stack.
- url: "InvokeWriteURL"
- queue_config:
- max_samples_per_send: 100
# Update the username and password to a valid IAM access key and secret access key.
basic_auth:
diff --git a/timestream/client.go b/timestream/client.go
index 8c72fd8..bbed550 100644
--- a/timestream/client.go
+++ b/timestream/client.go
@@ -22,7 +22,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
- "github.com/aws/aws-sdk-go/aws/request"
+ "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/timestreamquery"
"github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface"
@@ -45,8 +45,8 @@ type labelOperation string
type longMetricsOperation func(measureValueName string) (labelOperation, error)
var addUserAgent = request.NamedHandler {
- Name: "UserAgentHandler",
- Fn: request.MakeAddToUserAgentHandler("Prometheus Connector", Version),
+ Name: "UserAgentHandler",
+ Fn: request.MakeAddToUserAgentHandler("Prometheus Connector", Version),
}
// Store the initialization function calls to allow unit tests to mock the creation of real clients.
@@ -55,7 +55,7 @@ var initWriteClient = func(config *aws.Config) (timestreamwriteiface.TimestreamW
if err != nil {
return nil, err
}
- sess.Handlers.Build.PushFrontNamed(addUserAgent)
+ sess.Handlers.Build.PushFrontNamed(addUserAgent)
return timestreamwrite.New(sess), nil
}
var initQueryClient = func(config *aws.Config) (timestreamqueryiface.TimestreamQueryAPI, error) {
@@ -63,7 +63,7 @@ var initQueryClient = func(config *aws.Config) (timestreamqueryiface.TimestreamQ
if err != nil {
return nil, err
}
- sess.Handlers.Build.PushFrontNamed(addUserAgent)
+ sess.Handlers.Build.PushFrontNamed(addUserAgent)
return timestreamquery.New(sess), nil
}
@@ -82,6 +82,7 @@ var initQueryClient = func(config *aws.Config) (timestreamqueryiface.TimestreamQ
type recordDestinationMap map[string]map[string][]*timestreamwrite.Record
const (
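+	// maxWriteBatchLength is the maximum number of records Timestream accepts in a single WriteRecords request.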
+ maxWriteBatchLength int = 100
maxMeasureNameLength int = 60
ignored labelOperation = "Ignored"
failed labelOperation = "Failed"
@@ -211,25 +212,34 @@ func (wc *WriteClient) Write(req *prompb.WriteRequest, credentials *credentials.
var sdkErr error
for database, tableMap := range recordMap {
for table, records := range tableMap {
-			writeRecordsInput := &timestreamwrite.WriteRecordsInput{
- DatabaseName: aws.String(database),
- TableName: aws.String(table),
- Records: records,
- }
- begin := time.Now()
- _, err = wc.timestreamWrite.WriteRecords(writeRecordsInput)
- duration := time.Since(begin).Seconds()
- if err != nil {
- sdkErr = wc.handleSDKErr(req, err, sdkErr)
- } else {
- LogInfo(wc.logger, fmt.Sprintf("Successfully wrote %d records to database: %s table: %s", len(writeRecordsInput.Records), database, table))
- recordsIgnored := getCounterValue(wc.ignoredSamples)
- if (recordsIgnored > 0) {
- LogInfo(wc.logger, fmt.Sprintf("%d number of records were rejected for ingestion to Timestream. See Troubleshooting in the README for why these may be rejected, or turn on debug logging for additional info.", recordsIgnored))
+				// Timestream will return an error if more than 100 records are sent in a batch.
+				// Therefore, records are chunked when there are more than 100 of them.
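+				// For example, 452 records are written as records[0:100], records[100:200], records[200:300], records[300:400], and records[400:452].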
+ var chunkEndIndex int
+ for chunkStartIndex := 0; chunkStartIndex < len(records); chunkStartIndex += maxWriteBatchLength {
+ chunkEndIndex += maxWriteBatchLength
+ if chunkEndIndex > len(records) {
+ chunkEndIndex = len(records)
+ }
+				writeRecordsInput := &timestreamwrite.WriteRecordsInput{
+ DatabaseName: aws.String(database),
+ TableName: aws.String(table),
+ Records: records[chunkStartIndex:chunkEndIndex],
+ }
+ begin := time.Now()
+ _, err = wc.timestreamWrite.WriteRecords(writeRecordsInput)
+ duration := time.Since(begin).Seconds()
+ if err != nil {
+ sdkErr = wc.handleSDKErr(req, err, sdkErr)
+ } else {
+ LogInfo(wc.logger, fmt.Sprintf("Successfully wrote %d records to database: %s table: %s", len(writeRecordsInput.Records), database, table))
+ recordsIgnored := getCounterValue(wc.ignoredSamples)
+					if recordsIgnored > 0 {
+						LogInfo(wc.logger, fmt.Sprintf("%d records were rejected for ingestion to Timestream. See Troubleshooting in the README for why these may be rejected, or turn on debug logging for additional info.", recordsIgnored))
+ }
}
+ wc.writeExecutionTime.Observe(duration)
+ wc.writeRequests.Inc()
}
- wc.writeExecutionTime.Observe(duration)
- wc.writeRequests.Inc()
}
}
@@ -297,7 +307,7 @@ func (qc *QueryClient) Read(req *prompb.ReadRequest, credentials *credentials.Cr
func (wc *WriteClient) handleSDKErr(req *prompb.WriteRequest, currErr error, errToReturn error) error {
requestError, ok := currErr.(awserr.RequestFailure)
if !ok {
- LogError(wc.logger, fmt.Sprintf("Error occurred while ingesting Timestream Records. %d records failed to be written", len(req.Timeseries)), currErr)
+ LogError(wc.logger, fmt.Sprintf("Error occurred while ingesting Timestream Records. %d records failed to be written", len(req.Timeseries)), currErr)
return errors.NewSDKNonRequestError(currErr)
}
@@ -711,9 +721,9 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {
// Get the value of a counter
func getCounterValue(collector prometheus.Collector) int {
- channel := make(chan prometheus.Metric, 1) // 1 denotes no Vector
- collector.Collect(channel)
- metric := prometheusClientModel.Metric{}
- _ = (<-channel).Write(&metric)
- return int(*metric.Counter.Value)
+	channel := make(chan prometheus.Metric, 1) // a buffer of 1 suffices: the collector is a single counter, not a vector
+ collector.Collect(channel)
+ metric := prometheusClientModel.Metric{}
+ _ = (<-channel).Write(&metric)
+ return int(*metric.Counter.Value)
}