Skip to content

Commit

Permalink
database_observability: log schema name in query samples (#2444)
Browse files Browse the repository at this point in the history
database_observability: log schema name in query samples

Capture and log schema name in query samples fetched from mysql.
  • Loading branch information
cristiangreco authored Jan 20, 2025
1 parent 51176f4 commit fa8c111
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Main (unreleased)

- (_Experimental_) Improve parsing of truncated queries in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Capture schema name for query samples in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

- Add livedebugging support for `prometheus.remote_write` (@ravishankar15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
const selectQuerySamples = `
SELECT
digest,
schema_name,
query_sample_text,
query_sample_seen,
query_sample_timer_wait
Expand Down Expand Up @@ -127,8 +128,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
break
}

var digest, sampleText, sampleSeen, sampleTimerWait string
err := rs.Scan(&digest, &sampleText, &sampleSeen, &sampleTimerWait)
var digest, schemaName, sampleText, sampleSeen, sampleTimerWait string
err := rs.Scan(&digest, &schemaName, &sampleText, &sampleSeen, &sampleTimerWait)
if err != nil {
level.Error(c.logger).Log("msg", "failed to scan result set for query samples", "err", err)
continue
Expand All @@ -142,20 +143,20 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
sampleText = sampleText[:idx]
}
} else {
level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
level.Debug(c.logger).Log("msg", "skipping parsing truncated query", "schema", schemaName, "digest", digest)
continue
}
}

stmt, err := sqlparser.Parse(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse sql query", "digest", digest, "err", err)
level.Error(c.logger).Log("msg", "failed to parse sql query", "schema", schemaName, "digest", digest, "err", err)
continue
}

sampleRedactedText, err := sqlparser.RedactSQLQuery(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to redact sql query", "digest", digest, "err", err)
level.Error(c.logger).Log("msg", "failed to redact sql query", "schema", schemaName, "digest", digest, "err", err)
continue
}

Expand All @@ -164,8 +165,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="query samples fetched" op="%s" instance="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, c.instanceKey, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
`level=info msg="query samples fetched" op="%s" instance="%s" schema="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
OP_QUERY_SAMPLE, c.instanceKey, schemaName, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
),
},
}
Expand All @@ -177,8 +178,8 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="table name parsed" op="%s" instance="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, digest, table,
`level=info msg="table name parsed" op="%s" instance="%s" schema="%s" digest="%s" table="%s"`,
OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, schemaName, digest, table,
),
},
}
Expand Down
Loading

0 comments on commit fa8c111

Please sign in to comment.