diff --git a/.github/workflows/test-coverage.yml b/.github/workflows/test-coverage.yml index 9b8be23ef2..38c2ab86b4 100644 --- a/.github/workflows/test-coverage.yml +++ b/.github/workflows/test-coverage.yml @@ -236,6 +236,27 @@ jobs: coverage-artifact-name: "coverage_encryption" coverage-path: coverage.txt + # This job runs the database with telemetry tests using default configuration, on linux. + test-telemetry: + name: Test telemetry job + + runs-on: ubuntu-latest + + env: + GOFLAGS: -tags=telemetry + + steps: + - name: Checkout code into the directory + uses: actions/checkout@v4 + + - name: Setup defradb + uses: ./.github/composites/setup-defradb + + - name: Test coverage & save coverage report in an artifact + uses: ./.github/composites/test-coverage-with-artifact + with: + coverage-artifact-name: "coverage_telemetry" + coverage-path: coverage.txt ## This job gathers all the coverage reports and uploads them to code-cov upload-coverage: @@ -248,6 +269,7 @@ jobs: - test-lens # 2 test(s) - test-view # 1 test(s) - test-encryption # 1 test(s) + - test-telemetry # 1 test(s) # Important to know: # - We didn't use `if: always()` here, so this job doesn't run if we manually canceled. diff --git a/README.md b/README.md index 368dbe4e0c..49a7e20c24 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Read the documentation on [docs.source.network](https://docs.source.network/). * [Access Control System](#access-control-system) * [Supporting CORS](#supporting-cors) * [Backing up and restoring](#backing-up-and-restoring) + * [Telemetry](#telemetry) * [Community](#community) * [Licensing](#licensing) * [Contributors](#contributors) @@ -473,6 +474,14 @@ To restore the data, run the following command: defradb client backup import path/to/backup.json ``` +## Telemetry + +DefraDB has no telemetry reporting by default. To enable OpenTelemetry in DefraDB you must build with the `telemetry` tag set. To configure the HTTP exporters use the environment variables in the links below. + +[Metric exporter documentation](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp) + +[Trace exporter documentation](https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp) + ## Community Discuss on [Discord](https://discord.gg/w7jYQVJ) or [Github Discussions](https://github.com/sourcenetwork/defradb/discussions). The Source project is on [Twitter](https://twitter.com/sourcenetwrk). diff --git a/cli/config.go b/cli/config.go index f17b3e5ead..113349bf74 100644 --- a/cli/config.go +++ b/cli/config.go @@ -68,6 +68,7 @@ var configFlags = map[string]string{ "source-hub-address": "acp.sourceHub.address", "development": "development", "secret-file": "secretfile", + "no-telemetry": "telemetry.disabled", } // configDefaults contains default values for config entries. @@ -96,6 +97,7 @@ var configDefaults = map[string]any{ "log.source": false, "log.stacktrace": false, "secretfile": ".env", + "telemetry.disabled": false, } // defaultConfig returns a new config with default values. diff --git a/cli/config_test.go b/cli/config_test.go index 36421bd42f..6c6befefd0 100644 --- a/cli/config_test.go +++ b/cli/config_test.go @@ -70,4 +70,5 @@ func TestLoadConfigNotExist(t *testing.T) { assert.Equal(t, "file", cfg.GetString("keyring.backend")) assert.Equal(t, false, cfg.GetBool("development")) + assert.Equal(t, false, cfg.GetBool("telemetry.disabled")) } diff --git a/cli/start.go b/cli/start.go index ad2f79c928..385bfc91ec 100644 --- a/cli/start.go +++ b/cli/start.go @@ -26,9 +26,11 @@ import ( "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/http" "github.com/sourcenetwork/defradb/internal/db" + "github.com/sourcenetwork/defradb/internal/telemetry" "github.com/sourcenetwork/defradb/keyring" "github.com/sourcenetwork/defradb/net" "github.com/sourcenetwork/defradb/node" + "github.com/sourcenetwork/defradb/version" ) const devModeBanner = ` @@ -149,6 +151,17 @@ func MakeStartCommand() *cobra.Command { } } + if !cfg.GetBool("no-telemetry") { + ver, err := version.NewDefraVersion() + if err != nil { + return err + } + err = telemetry.ConfigureTelemetry(cmd.Context(), ver.String()) + if err != nil { + log.ErrorContextE(cmd.Context(), "failed to configure telemetry", err) + } + } + signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) @@ -250,6 +263,11 @@ func MakeStartCommand() *cobra.Command { "no-encryption", cfg.GetBool(configFlags["no-encryption"]), "Skip generating an encryption key. Encryption at rest will be disabled. WARNING: This cannot be undone.") + cmd.PersistentFlags().Bool( + "no-telemetry", + cfg.GetBool(configFlags["no-telemetry"]), + "Disables telemetry reporting. Telemetry is only enabled in builds that use the telemetry flag.", + ) return cmd } diff --git a/docs/config.md b/docs/config.md index b6df8e03a8..1b86fecf10 100644 --- a/docs/config.md +++ b/docs/config.md @@ -164,3 +164,7 @@ SourceHub ACP requests in order to create protected data. ## `secretfile` Path to the file containing secrets. Defaults to `.env`. + +## `telemetry.disabled` + +Disables telemetry reporting. Telemetry is only enabled in builds that use the `telemetry` flag. Defaults to `false`. diff --git a/docs/website/references/cli/defradb_start.md b/docs/website/references/cli/defradb_start.md index 5aea7e8ed0..6428e22112 100644 --- a/docs/website/references/cli/defradb_start.md +++ b/docs/website/references/cli/defradb_start.md @@ -21,6 +21,7 @@ defradb start [flags] --max-txn-retries int Specify the maximum number of retries per transaction (default 5) --no-encryption Skip generating an encryption key. Encryption at rest will be disabled. WARNING: This cannot be undone. --no-p2p Disable the peer-to-peer network synchronization system + --no-telemetry Disables telemetry reporting. Telemetry is only enabled in builds that use the telemetry flag. --p2paddr strings Listen addresses for the p2p network (formatted as a libp2p MultiAddr) (default [/ip4/127.0.0.1/tcp/9171]) --peers stringArray List of peers to connect to --privkeypath string Path to the private key for tls diff --git a/go.mod b/go.mod index 2671639f76..48dfb00630 100644 --- a/go.mod +++ b/go.mod @@ -57,8 +57,13 @@ require ( github.com/valyala/fastjson v1.6.4 github.com/vito/go-sse v1.1.2 github.com/zalando/go-keyring v0.2.6 - go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 + go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.32.0 golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 @@ -188,7 +193,7 @@ require ( github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -353,9 +358,9 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/sdk v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/dig v1.18.0 // indirect go.uber.org/fx v1.23.0 // indirect go.uber.org/mock v0.5.0 // indirect @@ -372,8 +377,8 @@ require ( gonum.org/v1/gonum v0.15.1 // indirect google.golang.org/api v0.171.0 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect google.golang.org/protobuf v1.36.4 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index de9464689f..0e08ccc5e1 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= @@ -1528,8 +1528,16 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= +go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0 h1:rfi2MMujBc4yowE0iHckZX4o4jg6SA67EnFVL8ldVvU= +go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0/go.mod h1:IO/gfPEcQYpOpPxn1OXFp1DvRY0viP8ONMedXLjjHIU= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 h1:opwv08VbCZ8iecIWs+McMdHRcAXzjAeda3uG2kI/hcA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0/go.mod h1:oOP3ABpW7vFHulLpE8aYtNBodrHhMTrvfxUXGvqm7Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 h1:lUsI2TYsQw2r1IASwoROaCnjdj2cvC2+Jbxvk6nHnWU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0/go.mod h1:2HpZxxQurfGxJlJDblybejHB6RX6pmExPNe517hREw4= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= @@ -1539,6 +1547,8 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6Yv go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -2144,10 +2154,10 @@ google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a/go.mod h1:1vXfmgAz google.golang.org/genproto v0.0.0-20221025140454-527a21cfbd71/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= -google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= -google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= diff --git a/internal/core/parser.go b/internal/core/parser.go index accb192373..38fe2e696b 100644 --- a/internal/core/parser.go +++ b/internal/core/parser.go @@ -36,16 +36,16 @@ type SchemaDefinition struct { // This includes schema and request parsing, and introspection. type Parser interface { // BuildRequestAST builds and return AST for the given request. - BuildRequestAST(request string) (*ast.Document, error) + BuildRequestAST(ctx context.Context, request string) (*ast.Document, error) // Returns true if the given request ast is an introspection request. IsIntrospection(*ast.Document) bool // Executes the given introspection request. - ExecuteIntrospection(request string) *client.RequestResult + ExecuteIntrospection(ctx context.Context, request string) *client.RequestResult // Parses the given request, returning a strongly typed model of that request. - Parse(*ast.Document, *client.GQLOptions) (*request.Request, []error) + Parse(context.Context, *ast.Document, *client.GQLOptions) (*request.Request, []error) // NewFilterFromString creates a new filter from a string. NewFilterFromString(collectionType string, body string) (immutable.Option[request.Filter], error) @@ -55,7 +55,7 @@ type Parser interface { // The parsing should validate the syntax, but not validate what that syntax expresses // is valid or not, i.e. we don't want the parser to make remote calls to verify the // policy description is valid or not (that is the callers responsiblity). - ParseSDL(sdl string) ([]client.CollectionDefinition, error) + ParseSDL(ctx context.Context, sdl string) ([]client.CollectionDefinition, error) // Adds the given schema to this parser's model. // diff --git a/internal/db/collection.go b/internal/db/collection.go index 88b455668f..24b33bf338 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -268,6 +268,9 @@ func (db *DB) getAllActiveDefinitions(ctx context.Context) ([]client.CollectionD func (c *collection) GetAllDocIDs( ctx context.Context, ) (<-chan client.DocIDResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, _, err := ensureContextTxn(ctx, c.db, true) if err != nil { return nil, err @@ -382,6 +385,9 @@ func (c *collection) Create( ctx context.Context, doc *client.Document, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return err @@ -402,6 +408,9 @@ func (c *collection) CreateMany( ctx context.Context, docs []*client.Document, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return err @@ -485,6 +494,9 @@ func (c *collection) Update( ctx context.Context, doc *client.Document, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return err @@ -546,6 +558,9 @@ func (c *collection) Save( ctx context.Context, doc *client.Document, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return err @@ -841,6 +856,9 @@ func (c *collection) Delete( ctx context.Context, docID client.DocID, ) (bool, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return false, err @@ -866,6 +884,9 @@ func (c *collection) Exists( ctx context.Context, docID client.DocID, ) (bool, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return false, err diff --git a/internal/db/collection_delete.go b/internal/db/collection_delete.go index a3963be2f4..41ba3e195b 100644 --- a/internal/db/collection_delete.go +++ b/internal/db/collection_delete.go @@ -27,6 +27,9 @@ func (c *collection) DeleteWithFilter( ctx context.Context, filter any, ) (*client.DeleteResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return nil, err diff --git a/internal/db/collection_get.go b/internal/db/collection_get.go index 94806d47de..f1b426ca48 100644 --- a/internal/db/collection_get.go +++ b/internal/db/collection_get.go @@ -27,6 +27,9 @@ func (c *collection) Get( docID client.DocID, showDeleted bool, ) (*client.Document, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + // create txn ctx, txn, err := ensureContextTxn(ctx, c.db, true) if err != nil { diff --git a/internal/db/collection_index.go b/internal/db/collection_index.go index 227942394c..4f0cf83546 100644 --- a/internal/db/collection_index.go +++ b/internal/db/collection_index.go @@ -220,6 +220,9 @@ func (c *collection) CreateIndex( ctx context.Context, desc client.IndexDescriptionCreateRequest, ) (client.IndexDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return client.IndexDescription{}, err @@ -373,6 +376,9 @@ func (c *collection) indexExistingDocs( // // All index artifacts for existing documents related the index will be removed. func (c *collection) DropIndex(ctx context.Context, indexName string) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return err @@ -464,6 +470,9 @@ func (c *collection) loadIndexes(ctx context.Context) error { // GetIndexes returns all indexes for the collection. func (c *collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return nil, err diff --git a/internal/db/collection_update.go b/internal/db/collection_update.go index 29619c48cc..0918a4d045 100644 --- a/internal/db/collection_update.go +++ b/internal/db/collection_update.go @@ -30,6 +30,9 @@ func (c *collection) UpdateWithFilter( filter any, updater string, ) (*client.UpdateResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, c.db, false) if err != nil { return nil, err diff --git a/internal/db/db.go b/internal/db/db.go index c3bb574da9..a4cd835d63 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -22,7 +22,6 @@ import ( ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" - "github.com/sourcenetwork/corelog" "github.com/sourcenetwork/immutable" @@ -36,10 +35,12 @@ import ( "github.com/sourcenetwork/defradb/internal/db/permission" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/request/graphql" + "github.com/sourcenetwork/defradb/internal/telemetry" ) var ( - log = corelog.NewLogger("db") + log = corelog.NewLogger("db") + tracer = telemetry.NewTracer() ) // make sure we match our client interface @@ -212,6 +213,9 @@ func (db *DB) AddPolicy( ctx context.Context, policy string, ) (client.AddPolicyResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + if !db.acp.HasValue() { return client.AddPolicyResult{}, client.ErrACPOperationButACPNotAvailable } @@ -264,6 +268,9 @@ func (db *DB) AddDocActorRelationship( relation string, targetActor string, ) (client.AddDocActorRelationshipResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + if !db.acp.HasValue() { return client.AddDocActorRelationshipResult{}, client.ErrACPOperationButACPNotAvailable } @@ -309,6 +316,9 @@ func (db *DB) DeleteDocActorRelationship( relation string, targetActor string, ) (client.DeleteDocActorRelationshipResult, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + if !db.acp.HasValue() { return client.DeleteDocActorRelationshipResult{}, client.ErrACPOperationButACPNotAvailable } diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index feebdd8a40..d935404a46 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -40,6 +40,9 @@ const ( ) func (db *DB) SetReplicator(ctx context.Context, rep client.ReplicatorParams) error { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -214,6 +217,9 @@ func (db *DB) getDocsHeads( } func (db *DB) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) error { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -307,6 +313,9 @@ func (db *DB) DeleteReplicator(ctx context.Context, rep client.ReplicatorParams) } func (db *DB) GetAllReplicators(ctx context.Context) ([]client.Replicator, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, true) if err != nil { return nil, err diff --git a/internal/db/p2p_schema_root.go b/internal/db/p2p_schema_root.go index b1df8ae421..bb256adac0 100644 --- a/internal/db/p2p_schema_root.go +++ b/internal/db/p2p_schema_root.go @@ -26,6 +26,9 @@ import ( const marker = byte(0xff) func (db *DB) AddP2PCollections(ctx context.Context, collectionIDs []string) error { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -93,6 +96,9 @@ func (db *DB) AddP2PCollections(ctx context.Context, collectionIDs []string) err } func (db *DB) RemoveP2PCollections(ctx context.Context, collectionIDs []string) error { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, false) if err != nil { return err @@ -152,6 +158,9 @@ func (db *DB) RemoveP2PCollections(ctx context.Context, collectionIDs []string) } func (db *DB) GetAllP2PCollections(ctx context.Context) ([]string, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + txn, err := db.NewTxn(ctx, true) if err != nil { return nil, err diff --git a/internal/db/request.go b/internal/db/request.go index e476ab50eb..bdf5ef5d6e 100644 --- a/internal/db/request.go +++ b/internal/db/request.go @@ -21,16 +21,16 @@ import ( // execRequest executes a request against the database. func (db *DB) execRequest(ctx context.Context, request string, options *client.GQLOptions) *client.RequestResult { res := &client.RequestResult{} - ast, err := db.parser.BuildRequestAST(request) + ast, err := db.parser.BuildRequestAST(ctx, request) if err != nil { res.GQL.Errors = append(res.GQL.Errors, err) return res } if db.parser.IsIntrospection(ast) { - return db.parser.ExecuteIntrospection(request) + return db.parser.ExecuteIntrospection(ctx, request) } - parsedRequest, errors := db.parser.Parse(ast, options) + parsedRequest, errors := db.parser.Parse(ctx, ast, options) if len(errors) > 0 { res.GQL.Errors = append(res.GQL.Errors, errors...) return res @@ -57,8 +57,3 @@ func (db *DB) execRequest(ctx context.Context, request string, options *client.G res.GQL.Data = results return res } - -// ExecIntrospection executes an introspection request against the database. -func (db *DB) ExecIntrospection(request string) *client.RequestResult { - return db.parser.ExecuteIntrospection(request) -} diff --git a/internal/db/schema.go b/internal/db/schema.go index e465f98a2e..69d16a61e4 100644 --- a/internal/db/schema.go +++ b/internal/db/schema.go @@ -38,7 +38,7 @@ func (db *DB) addSchema( ctx context.Context, schemaString string, ) ([]client.CollectionDescription, error) { - newDefinitions, err := db.parser.ParseSDL(schemaString) + newDefinitions, err := db.parser.ParseSDL(ctx, schemaString) if err != nil { return nil, err } diff --git a/internal/db/store.go b/internal/db/store.go index b36cd15c35..c839eaaab3 100644 --- a/internal/db/store.go +++ b/internal/db/store.go @@ -22,6 +22,9 @@ import ( // ExecRequest executes a request against the database. func (db *DB) ExecRequest(ctx context.Context, request string, opts ...client.RequestOption) *client.RequestResult { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { res := &client.RequestResult{} @@ -50,6 +53,9 @@ func (db *DB) ExecRequest(ctx context.Context, request string, opts ...client.Re // GetCollectionByName returns an existing collection within the database. func (db *DB) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return nil, err @@ -64,6 +70,9 @@ func (db *DB) GetCollections( ctx context.Context, options client.CollectionFetchOptions, ) ([]client.Collection, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return nil, err @@ -78,6 +87,9 @@ func (db *DB) GetCollections( // // Will return an error if it is not found. func (db *DB) GetSchemaByVersionID(ctx context.Context, versionID string) (client.SchemaDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return client.SchemaDescription{}, err @@ -93,6 +105,9 @@ func (db *DB) GetSchemas( ctx context.Context, options client.SchemaFetchOptions, ) ([]client.SchemaDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return nil, err @@ -106,6 +121,9 @@ func (db *DB) GetSchemas( func (db *DB) GetAllIndexes( ctx context.Context, ) (map[client.CollectionName][]client.IndexDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return nil, err @@ -121,6 +139,9 @@ func (db *DB) GetAllIndexes( // All schema types provided must not exist prior to calling this, and they may not reference existing // types previously defined. func (db *DB) AddSchema(ctx context.Context, schemaString string) ([]client.CollectionDescription, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return nil, err @@ -155,6 +176,9 @@ func (db *DB) PatchSchema( migration immutable.Option[model.Lens], setAsDefaultVersion bool, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -173,6 +197,9 @@ func (db *DB) PatchCollection( ctx context.Context, patchString string, ) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -188,6 +215,9 @@ func (db *DB) PatchCollection( } func (db *DB) SetActiveSchemaVersion(ctx context.Context, schemaVersionID string) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -203,6 +233,9 @@ func (db *DB) SetActiveSchemaVersion(ctx context.Context, schemaVersionID string } func (db *DB) SetMigration(ctx context.Context, cfg client.LensConfig) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -223,6 +256,9 @@ func (db *DB) AddView( sdl string, transform immutable.Option[model.Lens], ) ([]client.CollectionDefinition, error) { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return nil, err @@ -243,6 +279,9 @@ func (db *DB) AddView( } func (db *DB) RefreshViews(ctx context.Context, opts client.CollectionFetchOptions) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -265,6 +304,9 @@ func (db *DB) RefreshViews(ctx context.Context, opts client.CollectionFetchOptio // BasicImport imports a json dataset. // filepath must be accessible to the node. func (db *DB) BasicImport(ctx context.Context, filepath string) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -281,6 +323,9 @@ func (db *DB) BasicImport(ctx context.Context, filepath string) error { // BasicExport exports the current data or subset of data to file in json format. func (db *DB) BasicExport(ctx context.Context, config *client.BackupConfig) error { + ctx, span := tracer.Start(ctx) + defer span.End() + ctx, txn, err := ensureContextTxn(ctx, db, true) if err != nil { return err diff --git a/internal/db/view.go b/internal/db/view.go index 55828021aa..d030f9b79b 100644 --- a/internal/db/view.go +++ b/internal/db/view.go @@ -40,17 +40,17 @@ func (db *DB) addView( // with the all calls to the parser appart from `ParseSDL` when we implement the DQL stuff. query := fmt.Sprintf(`query { %s }`, inputQuery) - newDefinitions, err := db.parser.ParseSDL(sdl) + newDefinitions, err := db.parser.ParseSDL(ctx, sdl) if err != nil { return nil, err } - ast, err := db.parser.BuildRequestAST(query) + ast, err := db.parser.BuildRequestAST(ctx, query) if err != nil { return nil, err } - req, errs := db.parser.Parse(ast, &client.GQLOptions{}) + req, errs := db.parser.Parse(ctx, ast, &client.GQLOptions{}) if len(errs) > 0 { return nil, errors.Join(errs...) } diff --git a/internal/metric/metric.go b/internal/metric/metric.go deleted file mode 100644 index f267a7ed5d..0000000000 --- a/internal/metric/metric.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -/* -Package metric provides the observability system. - -It is a wrapper around the opentelemetry metric package. -*/ -package metric - -import ( - "context" - "encoding/json" - - "go.opentelemetry.io/otel/metric" - otelMetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -var _ Metric = (*Meter)(nil) - -// Metric interface attempts to abstract high-level aspects of the observability features, -// this would make it much easier to swap the base sdk without having to change too -// many implementation details. -type Metric interface { - // Register gives a name to the metric and initializes the provider. - Register(name string) - - // Dump is responsible to read the metrics and output all the gathered data. - Dump(ctx context.Context) (*metricdata.ResourceMetrics, error) - - // Close shutsdown the meter. - Close(ctx context.Context) error -} - -// Meter is currently wrapping opentelemetry meter functionalities while adhering to -// the Metric interface functionality. -type Meter struct { - reader otelMetric.Reader - provider *otelMetric.MeterProvider - meter metric.Meter -} - -// NewMeter returns a new meter. -func NewMeter() Meter { - return Meter{} -} - -// Register gives a name to the metric and initializes the provider. -func (m *Meter) Register(name string) { - m.provider = m.newManualProvider() - m.meter = m.provider.Meter(name) -} - -// Dump is responsible to read the metrics and output all the gathered data. -func (m *Meter) Dump(ctx context.Context) (*metricdata.ResourceMetrics, error) { - out := &metricdata.ResourceMetrics{} - if err := m.reader.Collect(ctx, out); err != nil { - return nil, err - } - return out, nil -} - -// Close shutsdown the meter. -func (m *Meter) Close(ctx context.Context) error { - return m.provider.Shutdown(ctx) -} - -// GetSyncHistogram returns a new histogram with the given name and unit. -func (m *Meter) GetSyncHistogram( - name string, - unit string, -) (metric.Int64Histogram, error) { - return m.meter.Int64Histogram( - name, - metric.WithUnit(unit), - ) -} - -// GetSyncCounter returns a new counter with the given name and unit. -func (m *Meter) GetSyncCounter( - name string, - unit string, -) (metric.Int64Counter, error) { - return m.meter.Int64Counter( - name, - metric.WithUnit(unit), - ) -} - -// DumpScopeMetricsString returns a string representation of the metrics. -func (m *Meter) DumpScopeMetricsString(ctx context.Context) (string, error) { - out := &metricdata.ResourceMetrics{} - if err := m.reader.Collect(ctx, out); err != nil { - return "", err - } - - jsonBytes, err := json.MarshalIndent(out.ScopeMetrics, "", " ") - if err != nil { - return "", err - } - - return string(jsonBytes), nil -} - -// Get returns the meter. -func (m *Meter) Get() metric.Meter { - return m.meter -} - -func (m *Meter) newManualProvider() *otelMetric.MeterProvider { - // Register a manual reader. - m.reader = otelMetric.NewManualReader() - return otelMetric.NewMeterProvider( - otelMetric.WithReader(m.reader), - ) -} diff --git a/internal/metric/metric_test.go b/internal/metric/metric_test.go deleted file mode 100644 index b89700fd74..0000000000 --- a/internal/metric/metric_test.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package metric - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -func TestMetricSyncHistogram(t *testing.T) { - meter := NewMeter() - meter.Register("HistogramOnly") - workDuration, err := meter.GetSyncHistogram( - "workDuration", - "ms", - ) - if err != nil { - t.Error(err) - } - - ctx := context.Background() - - // Note: Bucket bounds = [0 5 10 25 ...] - elapsedTime := 2 * time.Nanosecond - // Goes in second bucket. - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - elapsedTime = 4 * time.Nanosecond - // Goes in second bucket. - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - elapsedTime = 6 * time.Nanosecond - // Goes in third bucket. - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - data, err := meter.Dump(ctx) - if err != nil { - t.Error(err) - } - - assert.Equal(t, 1, len(data.ScopeMetrics)) - assert.Equal(t, "HistogramOnly", data.ScopeMetrics[0].Scope.Name) - assert.Equal(t, 1, len(data.ScopeMetrics[0].Metrics)) - assert.Equal(t, "workDuration", data.ScopeMetrics[0].Metrics[0].Name) - - firstMetricData := data.ScopeMetrics[0].Metrics[0].Data - histData, isHistData := firstMetricData.(metricdata.Histogram[int64]) - if !isHistData { - t.Error(err) - } - - assert.Equal(t, 1, len(histData.DataPoints)) - assert.Equal(t, uint64(3), histData.DataPoints[0].Count) - assert.Equal(t, int64(12), histData.DataPoints[0].Sum) // 2 + 4 + 6 - assert.Equal( - t, - []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - histData.DataPoints[0].Bounds, - ) - assert.Equal( - t, - []uint64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - histData.DataPoints[0].BucketCounts, - ) - - if meter.Close(ctx) != nil { - t.Error(err) - } -} - -func TestMetricSyncCounter(t *testing.T) { - meter := NewMeter() - meter.Register("CounterOnly") - stuffCounter, err := meter.GetSyncCounter( - "countStuff", - "1", - ) - if err != nil { - t.Error(err) - } - - ctx := context.Background() - stuffCounter.Add(ctx, 12) - stuffCounter.Add(ctx, 1) - - data, err := meter.Dump(ctx) - if err != nil { - t.Error(err) - } - - assert.Equal(t, 1, len(data.ScopeMetrics)) - assert.Equal(t, "CounterOnly", data.ScopeMetrics[0].Scope.Name) - assert.Equal(t, 1, len(data.ScopeMetrics[0].Metrics)) - assert.Equal(t, "countStuff", data.ScopeMetrics[0].Metrics[0].Name) - - firstMetricData := data.ScopeMetrics[0].Metrics[0].Data - sumData, isSum := firstMetricData.(metricdata.Sum[int64]) - if !isSum { - t.Error(err) - } - assert.Equal(t, "CumulativeTemporality", sumData.Temporality.String()) - assert.Equal(t, 1, len(sumData.DataPoints)) - assert.Equal(t, int64(13), sumData.DataPoints[0].Value) // 12 + 1 - - if meter.Close(ctx) != nil { - t.Error(err) - } -} - -func TestMetricWithCounterAndHistogramIntrumentOnOneMeter(t *testing.T) { - meter := NewMeter() - - meter.Register("CounterAndHistogram") - - stuffCounter, err := meter.GetSyncCounter( - "countStuff", - "1", - ) - if err != nil { - t.Error(err) - } - - workDuration, err := meter.GetSyncHistogram( - "workDuration", - "ms", - ) - if err != nil { - t.Error(err) - } - - ctx := context.Background() - - elapsedTime := 2 * time.Nanosecond - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - stuffCounter.Add(ctx, 12) - - elapsedTime = 4 * time.Nanosecond - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - elapsedTime = 6 * time.Nanosecond - workDuration.Record(ctx, elapsedTime.Nanoseconds()) - - stuffCounter.Add(ctx, 1) - - data, err := meter.Dump(ctx) - if err != nil { - t.Error(err) - } - - assert.Equal(t, 1, len(data.ScopeMetrics)) - assert.Equal(t, "CounterAndHistogram", data.ScopeMetrics[0].Scope.Name) - - metrics := data.ScopeMetrics[0].Metrics - assert.Equal(t, 2, len(metrics)) - - // Assert Counter - assert.Equal(t, "countStuff", metrics[0].Name) - countMetricData := metrics[0].Data - sumData, isSum := countMetricData.(metricdata.Sum[int64]) - if !isSum { - t.Error(err) - } - assert.Equal(t, "CumulativeTemporality", sumData.Temporality.String()) - assert.Equal(t, 1, len(sumData.DataPoints)) - assert.Equal(t, int64(13), sumData.DataPoints[0].Value) // 12 + 1 - - // Assert Histogram - assert.Equal(t, "workDuration", metrics[1].Name) - - histMetricData := metrics[1].Data - histData, isHistData := histMetricData.(metricdata.Histogram[int64]) - if !isHistData { - t.Error(err) - } - - assert.Equal(t, 1, len(histData.DataPoints)) - assert.Equal(t, uint64(3), histData.DataPoints[0].Count) - assert.Equal(t, int64(12), histData.DataPoints[0].Sum) // 2 + 4 + 6 - assert.Equal( - t, - []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, - histData.DataPoints[0].Bounds, - ) - assert.Equal( - t, - []uint64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, - histData.DataPoints[0].BucketCounts, - ) - - if meter.Close(ctx) != nil { - t.Error(err) - } -} diff --git a/internal/request/graphql/parser.go b/internal/request/graphql/parser.go index 7ff23394fa..40ca2c68e0 100644 --- a/internal/request/graphql/parser.go +++ b/internal/request/graphql/parser.go @@ -25,10 +25,13 @@ import ( "github.com/sourcenetwork/defradb/internal/core" defrap "github.com/sourcenetwork/defradb/internal/request/graphql/parser" "github.com/sourcenetwork/defradb/internal/request/graphql/schema" + "github.com/sourcenetwork/defradb/internal/telemetry" ) var _ core.Parser = (*parser)(nil) +var tracer = telemetry.NewTracer() + type parser struct { schemaManager *schema.SchemaManager } @@ -46,7 +49,10 @@ func NewParser() (*parser, error) { return p, nil } -func (p *parser) BuildRequestAST(request string) (*ast.Document, error) { +func (p *parser) BuildRequestAST(ctx context.Context, request string) (*ast.Document, error) { + _, span := tracer.Start(ctx) + defer span.End() + source := source.NewSource(&source.Source{ Body: []byte(request), Name: "GraphQL request", @@ -65,7 +71,10 @@ func (p *parser) IsIntrospection(ast *ast.Document) bool { return defrap.IsIntrospectionQuery(*schema, ast) } -func (p *parser) ExecuteIntrospection(request string) *client.RequestResult { +func (p *parser) ExecuteIntrospection(ctx context.Context, request string) *client.RequestResult { + _, span := tracer.Start(ctx) + defer span.End() + schema := p.schemaManager.Schema() params := gql.Params{Schema: *schema, RequestString: request} r := gql.Do(params) @@ -83,7 +92,10 @@ func (p *parser) ExecuteIntrospection(request string) *client.RequestResult { return res } -func (p *parser) Parse(ast *ast.Document, options *client.GQLOptions) (*request.Request, []error) { +func (p *parser) Parse(ctx context.Context, ast *ast.Document, options *client.GQLOptions) (*request.Request, []error) { + _, span := tracer.Start(ctx) + defer span.End() + schema := p.schemaManager.Schema() validationResult := gql.ValidateDocument(schema, ast, nil) if !validationResult.IsValid { @@ -97,11 +109,17 @@ func (p *parser) Parse(ast *ast.Document, options *client.GQLOptions) (*request. return defrap.ParseRequest(*schema, ast, options) } -func (p *parser) ParseSDL(sdl string) ([]client.CollectionDefinition, error) { +func (p *parser) ParseSDL(ctx context.Context, sdl string) ([]client.CollectionDefinition, error) { + _, span := tracer.Start(ctx) + defer span.End() + return p.schemaManager.ParseSDL(sdl) } func (p *parser) SetSchema(ctx context.Context, txn datastore.Txn, collections []client.CollectionDefinition) error { + ctx, span := tracer.Start(ctx) + defer span.End() + schemaManager, err := schema.NewSchemaManager() if err != nil { return err diff --git a/internal/telemetry/noop.go b/internal/telemetry/noop.go new file mode 100644 index 0000000000..8a0f6d1041 --- /dev/null +++ b/internal/telemetry/noop.go @@ -0,0 +1,40 @@ +// Copyright 2025 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build !telemetry + +package telemetry + +import ( + "context" +) + +var ( + _ Tracer = (*noopTracer)(nil) + _ Span = (*noopSpan)(nil) +) + +type noopTracer struct{} + +func NewTracer() Tracer { + return &noopTracer{} +} + +func (t noopTracer) Start(ctx context.Context) (context.Context, Span) { + return ctx, &noopSpan{} +} + +type noopSpan struct{} + +func (s *noopSpan) End() {} + +func ConfigureTelemetry(ctx context.Context, version string) error { + return nil +} diff --git a/internal/telemetry/otel.go b/internal/telemetry/otel.go new file mode 100644 index 0000000000..206b666a50 --- /dev/null +++ b/internal/telemetry/otel.go @@ -0,0 +1,106 @@ +// Copyright 2025 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build telemetry + +package telemetry + +import ( + "context" + + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +var ( + _ Tracer = (*otelTracer)(nil) + _ Span = (*otelSpan)(nil) +) + +type otelTracer struct { + inner trace.Tracer +} + +func NewTracer() Tracer { + name, _ := callerInfo(2) + tracer := otel.Tracer(name) + return &otelTracer{tracer} +} + +func (t otelTracer) Start(ctx context.Context) (context.Context, Span) { + _, name := callerInfo(2) + ctx, span := t.inner.Start(ctx, name) + return ctx, &otelSpan{span} +} + +type otelSpan struct { + inner trace.Span +} + +func (s *otelSpan) End() { + s.inner.End() +} + +// ConfigureTelemetry configures the global telemetry providers for +// defradb and any dependencies that use the OpenTelemetry SDK. +func ConfigureTelemetry(ctx context.Context, version string) error { + opts := []resource.Option{ + resource.WithSchemaURL(semconv.SchemaURL), + resource.WithAttributes( + semconv.ServiceNameKey.String("DefraDB"), + semconv.ServiceVersionKey.String(version), + ), + // include all OS info + resource.WithOS(), + // include all process info + resource.WithProcess(), + } + res, err := resource.New(ctx, opts...) + if err != nil { + return err + } + // default to http exporter for traces + spanExporter, err := otlptracehttp.New(ctx) + if err != nil { + return err + } + // default to http exporter for metrics + metricExporter, err := otlpmetrichttp.New(ctx) + if err != nil { + return err + } + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithBatcher(spanExporter), + ) + // runtime metrics adds info from the Go runtime + // for more info see the link below: + // https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/runtime + runtimeReader := sdkmetric.NewPeriodicReader( + metricExporter, + sdkmetric.WithProducer(runtime.NewProducer()), + ) + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(runtimeReader), + ) + // set the global meter provider for all otel instances + otel.SetMeterProvider(meterProvider) + // set the global trace provider for all otel instances + otel.SetTracerProvider(tracerProvider) + return nil +} diff --git a/internal/telemetry/otel_test.go b/internal/telemetry/otel_test.go new file mode 100644 index 0000000000..d031ff6c2a --- /dev/null +++ b/internal/telemetry/otel_test.go @@ -0,0 +1,25 @@ +// Copyright 2025 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build telemetry + +package telemetry + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfigureTelemetry(t *testing.T) { + err := ConfigureTelemetry(context.Background(), "v0") + assert.NoError(t, err) +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 0000000000..0143f0ca08 --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,47 @@ +// Copyright 2025 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package telemetry + +import ( + "context" + "runtime" + "strings" +) + +// Tracer is used to create span telemetry. +type Tracer interface { + // Start creates a new span. + Start(context.Context) (context.Context, Span) +} + +// Span represents a node in a function call tree. +type Span interface { + // End completes the span. + End() +} + +// callerInfo returns the calling package name and calling func name. +func callerInfo(skip int) (string, string) { + pc, _, _, ok := runtime.Caller(skip) + if !ok { + return "", "" + } + fn := runtime.FuncForPC(pc) + if fn == nil { + return "", "" + } + name := fn.Name() + index := strings.LastIndex(name, ".") + if index < 0 { + return "", "" + } + return name[:index], name[index+1:] +} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go new file mode 100644 index 0000000000..aee6f89169 --- /dev/null +++ b/internal/telemetry/telemetry_test.go @@ -0,0 +1,23 @@ +// Copyright 2025 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCallerInfo(t *testing.T) { + pkg, fn := callerInfo(1) + assert.Equal(t, "TestCallerInfo", fn) + assert.Equal(t, "github.com/sourcenetwork/defradb/internal/telemetry", pkg) +} diff --git a/tests/bench/query/planner/utils.go b/tests/bench/query/planner/utils.go index a4fc69e091..6cea739d84 100644 --- a/tests/bench/query/planner/utils.go +++ b/tests/bench/query/planner/utils.go @@ -40,8 +40,8 @@ func runQueryParserBench( b.ResetTimer() for i := 0; i < b.N; i++ { - ast, _ := parser.BuildRequestAST(query) - _, errs := parser.Parse(ast, &client.GQLOptions{}) + ast, _ := parser.BuildRequestAST(ctx, query) + _, errs := parser.Parse(ctx, ast, &client.GQLOptions{}) if errs != nil { return errors.Wrap("failed to parse query string", errors.New(fmt.Sprintf("%v", errs))) } @@ -68,8 +68,8 @@ func runMakePlanBench( return err } - ast, _ := parser.BuildRequestAST(query) - q, errs := parser.Parse(ast, &client.GQLOptions{}) + ast, _ := parser.BuildRequestAST(ctx, query) + q, errs := parser.Parse(ctx, ast, &client.GQLOptions{}) if len(errs) > 0 { return errors.Wrap("failed to parse query string", errors.New(fmt.Sprintf("%v", errs))) } @@ -115,7 +115,7 @@ func buildParser( return nil, err } - collectionDescriptions, err := parser.ParseSDL(schema) + collectionDescriptions, err := parser.ParseSDL(ctx, schema) if err != nil { return nil, err } diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 2f6a47dd6c..e806a61954 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -36,7 +36,6 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/internal/db" "github.com/sourcenetwork/defradb/internal/encryption" - "github.com/sourcenetwork/defradb/internal/request/graphql" "github.com/sourcenetwork/defradb/internal/request/graphql/schema/types" "github.com/sourcenetwork/defradb/net" "github.com/sourcenetwork/defradb/node" @@ -2331,22 +2330,6 @@ func skipIfNetworkTest(t testing.TB, actions []any) { } } -func ParseSDL(gqlSDL string) (map[string]client.CollectionDefinition, error) { - parser, err := graphql.NewParser() - if err != nil { - return nil, err - } - cols, err := parser.ParseSDL(gqlSDL) - if err != nil { - return nil, err - } - result := make(map[string]client.CollectionDefinition) - for _, col := range cols { - result[col.Description.Name.Value()] = col - } - return result, nil -} - func MustParseTime(timeString string) time.Time { t, err := time.Parse(time.RFC3339, timeString) if err != nil {