Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tags uploading optimization #257

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lomik/zapwriter"

"github.com/lomik/graphite-clickhouse/cache"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/date"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/limiter"
Expand Down Expand Up @@ -260,11 +261,15 @@ func clickhouseURLValidate(chURL string) (*url.URL, error) {

// Tags config
type Tags struct {
Rules string `toml:"rules" json:"rules"`
Date string `toml:"date" json:"date"`
ExtraWhere string `toml:"extra-where" json:"extra-where"`
InputFile string `toml:"input-file" json:"input-file"`
OutputFile string `toml:"output-file" json:"output-file"`
Rules string `toml:"rules" json:"rules"`
Date string `toml:"date" json:"date"`
ExtraWhere string `toml:"extra-where" json:"extra-where"`
InputFile string `toml:"input-file" json:"input-file"`
OutputFile string `toml:"output-file" json:"output-file"`
Threads int `toml:"threads" json:"threads" comment:"number of threads for uploading tags to clickhouse (1 by default)"`
Compression clickhouse.ContentEncoding `toml:"compression" json:"compression" comment:"compression method for tags before sending them to clickhouse (i.e. content encoding): gzip (default), none, zstd"`
Version uint32 `toml:"version" json:"version" comment:"fixed tags version for testing purposes (by default the current timestamp is used for each upload)"`
SelectChunksCount int `toml:"select-chunks-count" json:"select-chunks-count" comment:"number of chunks for selecting metrics from clickhouse (10 by default)"`
}

// Carbonlink configuration
Expand Down Expand Up @@ -385,7 +390,10 @@ func New() *Config {
FindLimiter: limiter.NoopLimiter{},
TagsLimiter: limiter.NoopLimiter{},
},
Tags: Tags{},
Tags: Tags{
Threads: 1,
Compression: "gzip",
},
Carbonlink: Carbonlink{
Threads: 10,
Retries: 2,
Expand Down
18 changes: 15 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15

[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -382,7 +386,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down Expand Up @@ -504,6 +508,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15

[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -682,7 +690,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down Expand Up @@ -814,6 +822,10 @@ date = "2012-12-12"
extra-where = "AND case"
input-file = "input"
output-file = "output"
threads = 5
compression = "zstd"
version = 42
select-chunks-count = 15

[carbonlink]
server = "server:3333"
Expand Down Expand Up @@ -997,7 +1009,7 @@ sample-thereafter = 12
assert.Equal(t, expected.ClickHouse, config.ClickHouse)

// Tags
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output"}
expected.Tags = Tags{"filename", "2012-12-12", "AND case", "input", "output", 5, "zstd", 42, 15}
assert.Equal(t, expected.Tags, config.Tags)

// Carbonlink
Expand Down
8 changes: 8 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
# extra-where = ""
# input-file = ""
# output-file = ""
# number of threads for uploading tags to clickhouse (1 by default)
# threads = 1
# compression method for tags before sending them to clickhouse (i.e. content encoding): gzip (default), none, zstd
# compression = "gzip"
# fixed tags version for testing purposes (by default the current timestamp is used for each upload)
# version = 0
# number of chunks for selecting metrics from clickhouse (10 by default)
# select-chunks-count = 0

[carbonlink]
server = ""
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/lomik/stop v0.0.0-20161127103810-188e98d969bd // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
38 changes: 31 additions & 7 deletions helper/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -30,6 +31,14 @@ type ErrWithDescr struct {
data string
}

// ContentEncoding names the encoding of a request body. The request code in
// this package switches on it to decide which Content-Encoding header, if any,
// is added to the outgoing request; values other than the constants below are
// rejected there with an "unknown encoding" error.
type ContentEncoding string

// Supported ContentEncoding values.
const (
// ContentEncodingNone adds no Content-Encoding header to the request.
ContentEncodingNone ContentEncoding = "none"
// ContentEncodingGzip adds the header "Content-Encoding: gzip".
ContentEncodingGzip ContentEncoding = "gzip"
// ContentEncodingZstd adds the header "Content-Encoding: zstd".
ContentEncodingZstd ContentEncoding = "zstd"
)

func NewErrWithDescr(err string, data string) error {
return &ErrWithDescr{err, data}
}
Expand Down Expand Up @@ -183,18 +192,23 @@ func Query(ctx context.Context, dsn string, query string, opts Options, extData
}

func Post(ctx context.Context, dsn string, query string, postBody io.Reader, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, false, opts, extData)
return do(ctx, dsn, query, postBody, ContentEncodingNone, opts, extData)
}

// PostGzip calls do with ContentEncodingGzip.
//
// Deprecated: use PostWithEncoding instead.
func PostGzip(ctx context.Context, dsn string, query string, postBody io.Reader, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, true, opts, extData)
return do(ctx, dsn, query, postBody, ContentEncodingGzip, opts, extData)
}

// PostWithEncoding forwards all of its arguments to do and returns do's
// results unchanged. encoding selects the Content-Encoding header attached to
// the request (ContentEncodingNone, ContentEncodingGzip or
// ContentEncodingZstd); any other value makes the call fail with an
// "unknown encoding" error.
//
// NOTE(review): the visible code only sets the header for the chosen encoding;
// postBody is presumably expected to be encoded by the caller already — confirm.
func PostWithEncoding(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
return do(ctx, dsn, query, postBody, encoding, opts, extData)
}

func Reader(ctx context.Context, dsn string, query string, opts Options, extData *ExternalData) (*LoggedReader, error) {
return reader(ctx, dsn, query, nil, false, opts, extData)
return reader(ctx, dsn, query, nil, ContentEncodingNone, opts, extData)
}

func reader(ctx context.Context, dsn string, query string, postBody io.Reader, gzip bool, opts Options, extData *ExternalData) (bodyReader *LoggedReader, err error) {
func reader(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) (bodyReader *LoggedReader, err error) {
if postBody != nil && extData != nil {
err = fmt.Errorf("postBody and extData could not be passed in one request")
return
Expand Down Expand Up @@ -265,8 +279,15 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
req.Header.Add("Content-Type", contentHeader)
}

if gzip {
switch encoding {
case ContentEncodingNone:
// no encoding
case ContentEncodingGzip:
req.Header.Add("Content-Encoding", "gzip")
case ContentEncodingZstd:
req.Header.Add("Content-Encoding", "zstd")
default:
return nil, fmt.Errorf("unknown encoding: %s", encoding)
}

client := &http.Client{
Expand Down Expand Up @@ -305,6 +326,9 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
read_bytes, _ = strconv.ParseInt(v, 10, 64)
}
}
sort.Slice(fields, func(i int, j int) bool {
return fields[i].Key < fields[j].Key
})
logger = logger.With(fields...)
} else {
logger.Warn("query", zap.Error(err), zap.String("clickhouse-summary", summaryHeader))
Expand Down Expand Up @@ -337,8 +361,8 @@ func reader(ctx context.Context, dsn string, query string, postBody io.Reader, g
return
}

func do(ctx context.Context, dsn string, query string, postBody io.Reader, gzip bool, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
bodyReader, err := reader(ctx, dsn, query, postBody, gzip, opts, extData)
func do(ctx context.Context, dsn string, query string, postBody io.Reader, encoding ContentEncoding, opts Options, extData *ExternalData) ([]byte, int64, int64, error) {
bodyReader, err := reader(ctx, dsn, query, postBody, encoding, opts, extData)
if err != nil {
return nil, 0, 0, err
}
Expand Down
Loading
Loading