Add write buffering #76

Merged (1 commit) on Aug 22, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@
*.zip
integration/tls/cert
resources
timestream-prometheus-connector
13 changes: 1 addition & 12 deletions README.md
@@ -25,8 +25,6 @@ The Prometheus Connector receives and sends time series data between Prometheus
- [Prometheus Connector Specific Errors](#prometheus-connector-specific-errors)
- [Write API Errors](#write-api-errors)
- [Query API Errors](#query-api-errors)
- [Limitations](#limitations)
- [Maximum Prometheus Samples Per Remote Write Request](#maximum-prometheus-samples-per-remote-write-request)
- [Caveats](#caveats)
- [Unsupported SigV4 Authentication](#unsupported-sigv4-authentication)
- [Unsupported Temporary Security Credentials](#unsupported-temporary-security-credentials)
@@ -67,8 +65,6 @@ The Prometheus Connector is available in the following formats:

To configure Prometheus to read and write to remote storage, configure the `remote_read` and `remote_write` sections in `prometheus.yml`. To learn more, see the [remote read](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_read) and [remote write](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) sections on Prometheus' configuration page.

> **NOTE**: It is recommended to use the default number of samples per write request through `max_samples_per_send`. For more details see [Maximum Prometheus Samples Per Remote Write Request](#maximum-prometheus-samples-per-remote-write-request).

1. Configure Prometheus' remote read and remote write destination by setting the `url` options to the Prometheus Connector's listening URLs, e.g. `"http://localhost:9201/write"`.

2. Configure the basic authentication header for Prometheus read and write requests with valid IAM credentials.
@@ -952,7 +948,7 @@ All connector-specific errors can be found in [`errors/errors.go`](./errors/erro
| `AccessDeniedException` | 403 | You are not authorized to perform this action | Ensure you have sufficient access to Amazon Timestream. |
| `ResourceNotFoundException` | 404 | The operation tried to access a non-existent resource. | Specify the resource correctly, or check whether its status is not ACTIVE. |
| `ConflictException` | 409 | Amazon Timestream was unable to process this request because it contains a resource that already exists. | Update the request with the correct resource. |
| `RejectedRecordsException` | 419 | Amazon Timestream will throw this exception in the following cases: <br> 1. Records with duplicate data where there are multiple records with the same dimensions, timestamps, and measure names but different measure values.<br>2. Records with timestamps that lie outside the retention duration of the memory store. <br>3. Records with dimensions or measures that exceed the Amazon Timestream defined limits. | 1. Check and process the data to ensure that there are no different measure values at the same timestamp given other labels/filters are the same. <br>2. Check or update the retention duration in database. <br>3. Set the maximum number of samples per write request in prometheus.yml to 100. |
| `RejectedRecordsException` | 419 | Amazon Timestream will throw this exception in the following cases: <br> 1. Records with duplicate data where there are multiple records with the same dimensions, timestamps, and measure names but different measure values.<br>2. Records with timestamps that lie outside the retention duration of the memory store. <br>3. Records with dimensions or measures that exceed the Amazon Timestream defined limits. | 1. Check and process the data to ensure that there are no different measure values at the same timestamp given other labels/filters are the same. <br>2. Check or update the retention duration in database. |
| `InvalidEndpointException` | 421 | The requested endpoint was invalid. | Check whether the endpoint is NIL or in an incorrect format. |
| `ThrottlingException` | 429 | Too many requests were made by a user exceeding service quotas. The request was throttled. | Continue to send data at the same (or higher) throughput. Go to [Data Ingestion](https://docs.aws.amazon.com/timestream/latest/developerguide/data-ingest.html) for more information. |
| `InternalServerException` | 500 | Amazon Timestream was unable to fully process this request because of an internal server error. | Please send the request again later. |
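
The updated `RejectedRecordsException` row drops the old advice to cap `max_samples_per_send` at 100, since the connector now splits oversized writes itself (see `timestream/client.go` below). When the exception still occurs, for instance for duplicate measure values or timestamps outside the memory store retention window, the per-record rejection reasons can be read from the SDK error. A rough sketch, assuming aws-sdk-go v1's generated `timestreamwrite.RejectedRecordsException` type; the `inspectWriteError` helper and the fabricated error are illustrative only, not connector code:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/timestreamwrite"
)

// inspectWriteError prints why Timestream rejected individual records, or the
// generic request failure details otherwise. Illustrative helper only.
func inspectWriteError(err error) {
	if rre, ok := err.(*timestreamwrite.RejectedRecordsException); ok {
		for _, rec := range rre.RejectedRecords {
			fmt.Printf("record %d rejected: %s\n", aws.Int64Value(rec.RecordIndex), aws.StringValue(rec.Reason))
		}
		return
	}
	if reqErr, ok := err.(awserr.RequestFailure); ok {
		fmt.Printf("%s (HTTP %d): %s\n", reqErr.Code(), reqErr.StatusCode(), reqErr.Message())
	}
}

func main() {
	// Fabricated error purely for demonstration.
	inspectWriteError(&timestreamwrite.RejectedRecordsException{
		RejectedRecords: []*timestreamwrite.RejectedRecord{
			{RecordIndex: aws.Int64(7), Reason: aws.String("Duplicate measure value")},
		},
	})
}
```
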
@@ -969,13 +965,6 @@ All connector-specific errors can be found in [`errors/errors.go`](./errors/erro
| `ThrottlingException` | 429 | The request was denied due to request throttling. | Continue to send queries at the same (or higher) throughput. |
| `InternalServerException` | 500 | Amazon Timestream was unable to fully process this request because of an internal server error. | Please send the request again later. |

# Limitations

### Maximum Prometheus Samples Per Remote Write Request

Ingesting more time series than the `Records per WriteRecords API request` value specified in the [Timestream Quotas](https://docs.aws.amazon.com/timestream/latest/developerguide/ts-limits.html) will return a `RejectedRecordsException`, and none of the time series in the Prometheus write request will be ingested to Timestream.
It is recommended to use the default value for `max_samples_per_send` in Prometheus' [remote write configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).

# Caveats

### Unsupported SigV4 Authentication
1 change: 1 addition & 0 deletions integration/constants.go
@@ -27,5 +27,6 @@ const (
invalidMatcher = 10
value = 1.0
numRecords = 100
largeNumRecords = 452
memStoreRetentionHour = 5
)
8 changes: 8 additions & 0 deletions integration/integration_test.go
@@ -79,6 +79,13 @@ func TestWriteClient(t *testing.T) {
}
reqBatch := &prompb.WriteRequest{Timeseries: timeSeriesBatch}

// Request with more than 100 samples
var largeTimeSeriesBatch []*prompb.TimeSeries
for i := 0; i < largeNumRecords; i++ {
largeTimeSeriesBatch = append(largeTimeSeriesBatch, createTimeSeriesTemplate())
}
largeReqBatch := &prompb.WriteRequest{Timeseries: largeTimeSeriesBatch}

timeSeriesBatchFail := append(timeSeriesBatch, createTimeSeriesTemplate())
timeSeriesBatchFail = append(timeSeriesBatchFail, createTimeSeriesTemplate())
reqBatchFail := &prompb.WriteRequest{Timeseries: timeSeriesBatchFail}
@@ -98,6 +105,7 @@ func TestWriteClient(t *testing.T) {
{"write request with long metric name", reqLongMetric, awsCredentials},
{"write request with long label value", reqLongLabel, awsCredentials},
{"write request with 100 samples per request", reqBatch, awsCredentials},
{"write request with more than 100 samples per request", largeReqBatch, awsCredentials},
}
for _, test := range successTestCase {
t.Run(test.testName, func(t *testing.T) {
2 changes: 0 additions & 2 deletions integration/tls/config/prometheus.yml
@@ -11,8 +11,6 @@ scrape_configs:

remote_write:
- url: "https://host.docker.internal:9201/write"
queue_config:
max_samples_per_send: 100

tls_config:
ca_file: "/etc/prometheus/RootCA.pem"
2 changes: 0 additions & 2 deletions serverless/DEVELOPER_README.md
@@ -172,8 +172,6 @@ To view the full set of `sam deploy` options see the [sam deploy documentation](
remote_write:
# Update the value to the InvokeWriteURL returned when deploying the stack.
- url: "InvokeWriteURL"
queue_config:
max_samples_per_send: 100

# Update the username and password to a valid IAM access key and secret access key.
basic_auth:
66 changes: 38 additions & 28 deletions timestream/client.go
@@ -22,7 +22,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/timestreamquery"
"github.com/aws/aws-sdk-go/service/timestreamquery/timestreamqueryiface"
@@ -45,8 +45,8 @@ type labelOperation string
type longMetricsOperation func(measureValueName string) (labelOperation, error)

var addUserAgent = request.NamedHandler {
Name: "UserAgentHandler",
Fn: request.MakeAddToUserAgentHandler("Prometheus Connector", Version),
Name: "UserAgentHandler",
Fn: request.MakeAddToUserAgentHandler("Prometheus Connector", Version),
}

// Store the initialization function calls to allow unit tests to mock the creation of real clients.
@@ -55,15 +55,15 @@ var initWriteClient = func(config *aws.Config) (timestreamwriteiface.TimestreamW
if err != nil {
return nil, err
}
sess.Handlers.Build.PushFrontNamed(addUserAgent)
return timestreamwrite.New(sess), nil
}
var initQueryClient = func(config *aws.Config) (timestreamqueryiface.TimestreamQueryAPI, error) {
sess, err := session.NewSession(config)
if err != nil {
return nil, err
}
sess.Handlers.Build.PushFrontNamed(addUserAgent)
return timestreamquery.New(sess), nil
}

@@ -82,6 +82,7 @@ var initQueryClient = func(config *aws.Config) (timestreamqueryiface.TimestreamQ
type recordDestinationMap map[string]map[string][]*timestreamwrite.Record

const (
maxWriteBatchLength int = 100
maxMeasureNameLength int = 60
ignored labelOperation = "Ignored"
failed labelOperation = "Failed"
@@ -211,25 +212,34 @@ func (wc *WriteClient) Write(req *prompb.WriteRequest, credentials *credentials.
var sdkErr error
for database, tableMap := range recordMap {
for table, records := range tableMap {
writeRecordsInput := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(database),
TableName: aws.String(table),
Records: records,
}
begin := time.Now()
_, err = wc.timestreamWrite.WriteRecords(writeRecordsInput)
duration := time.Since(begin).Seconds()
if err != nil {
sdkErr = wc.handleSDKErr(req, err, sdkErr)
} else {
LogInfo(wc.logger, fmt.Sprintf("Successfully wrote %d records to database: %s table: %s", len(writeRecordsInput.Records), database, table))
recordsIgnored := getCounterValue(wc.ignoredSamples)
if (recordsIgnored > 0) {
LogInfo(wc.logger, fmt.Sprintf("%d number of records were rejected for ingestion to Timestream. See Troubleshooting in the README for why these may be rejected, or turn on debug logging for additional info.", recordsIgnored))
// Timestream will return an error if more than 100 records are sent in a single request,
// so each table's records are written in chunks of at most maxWriteBatchLength.
var chunkEndIndex int
for chunkStartIndex := 0; chunkStartIndex < len(records); chunkStartIndex += maxWriteBatchLength {
chunkEndIndex += maxWriteBatchLength
if chunkEndIndex > len(records) {
chunkEndIndex = len(records)
}
writeRecordsInput := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(database),
TableName: aws.String(table),
Records: records[chunkStartIndex:chunkEndIndex],
}
begin := time.Now()
_, err = wc.timestreamWrite.WriteRecords(writeRecordsInput)
duration := time.Since(begin).Seconds()
if err != nil {
sdkErr = wc.handleSDKErr(req, err, sdkErr)
} else {
LogInfo(wc.logger, fmt.Sprintf("Successfully wrote %d records to database: %s table: %s", len(writeRecordsInput.Records), database, table))
recordsIgnored := getCounterValue(wc.ignoredSamples)
if (recordsIgnored > 0) {
LogInfo(wc.logger, fmt.Sprintf("%d number of records were rejected for ingestion to Timestream. See Troubleshooting in the README for why these may be rejected, or turn on debug logging for additional info.", recordsIgnored))
}
}
wc.writeExecutionTime.Observe(duration)
wc.writeRequests.Inc()
}
wc.writeExecutionTime.Observe(duration)
wc.writeRequests.Inc()
}
}

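The loop above walks each table's record slice in steps of `maxWriteBatchLength` and clamps the final chunk to the end of the slice, so every `WriteRecords` call carries at most 100 records. A minimal standalone sketch of the same chunking idea (the `chunkRecords` helper and the 452-record input mirror the integration test but are not part of the connector):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/timestreamwrite"
)

const maxWriteBatchLength = 100

// chunkRecords splits records into consecutive slices of at most
// maxWriteBatchLength, mirroring the batching loop in WriteClient.Write.
func chunkRecords(records []*timestreamwrite.Record) [][]*timestreamwrite.Record {
	var chunks [][]*timestreamwrite.Record
	for start := 0; start < len(records); start += maxWriteBatchLength {
		end := start + maxWriteBatchLength
		if end > len(records) {
			end = len(records)
		}
		chunks = append(chunks, records[start:end])
	}
	return chunks
}

func main() {
	records := make([]*timestreamwrite.Record, 452) // largeNumRecords in the integration test
	chunks := chunkRecords(records)
	fmt.Println(len(chunks), len(chunks[len(chunks)-1])) // 5 52: four full batches plus a 52-record remainder
}
```

Each chunk becomes its own `WriteRecordsInput`, so a single large Prometheus remote write no longer trips Timestream's 100-records-per-request limit, and a failure on one chunk does not stop the remaining chunks from being attempted.
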
@@ -297,7 +307,7 @@ func (qc *QueryClient) Read(req *prompb.ReadRequest, credentials *credentials.Cr
func (wc *WriteClient) handleSDKErr(req *prompb.WriteRequest, currErr error, errToReturn error) error {
requestError, ok := currErr.(awserr.RequestFailure)
if !ok {
LogError(wc.logger, fmt.Sprintf("Error occurred while ingesting Timestream Records. %d records failed to be written", len(req.Timeseries)), currErr)
return errors.NewSDKNonRequestError(currErr)
}

@@ -711,9 +721,9 @@ func (c *Client) Collect(ch chan<- prometheus.Metric) {

// Get the value of a counter
func getCounterValue(collector prometheus.Collector) int {
channel := make(chan prometheus.Metric, 1) // 1 denotes no Vector
collector.Collect(channel)
metric := prometheusClientModel.Metric{}
_ = (<-channel).Write(&metric)
return int(*metric.Counter.Value)
}
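
For reference, `getCounterValue` drains a single metric from the collector through a buffered channel; capacity 1 is enough because a plain counter, unlike a `CounterVec`, emits exactly one metric, which is what the `1 denotes no Vector` comment means. A rough usage sketch with a throwaway counter that is not part of the connector:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Illustrative counter only; the connector reads wc.ignoredSamples this way.
	ignored := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_ignored_samples_total",
		Help: "Samples ignored during conversion (example).",
	})
	ignored.Add(3)

	ch := make(chan prometheus.Metric, 1) // one metric expected from a plain counter
	ignored.Collect(ch)

	var m dto.Metric
	_ = (<-ch).Write(&m)
	fmt.Println(int(m.GetCounter().GetValue())) // 3
}
```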