Skip to content

Commit

Permalink
chore: add some shared llm-type plugin utilities (#5109)
Browse files Browse the repository at this point in the history
* wip utilities

* fixes

* add tests

* update comment

* update

* add math.random stubs to test hooks

* fix
  • Loading branch information
sabrenner authored and szegedi committed Jan 30, 2025
1 parent 246b7f7 commit 406b905
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 72 deletions.
36 changes: 6 additions & 30 deletions packages/datadog-plugin-langchain/src/handlers/default.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
'use strict'

const Sampler = require('../../../dd-trace/src/sampler')
const makeUtilities = require('../../../dd-trace/src/plugins/util/llm')

const RE_NEWLINE = /\n/g
const RE_TAB = /\t/g

// TODO: should probably refactor the OpenAI integration to use a shared LLMTracingPlugin base class
// This logic isn't particular to LangChain
class LangChainHandler {
constructor (config) {
this.config = config
this.sampler = new Sampler(config.spanPromptCompletionSampleRate)
constructor (tracerConfig) {
const utilities = makeUtilities('langchain', tracerConfig)

this.normalize = utilities.normalize
this.isPromptCompletionSampled = utilities.isPromptCompletionSampled
}

// no-op for default handler
Expand All @@ -27,27 +24,6 @@ class LangChainHandler {

// no-op for default handler
extractModel (instance) {}

normalize (text) {
if (!text) return
if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return

const max = this.config.spanCharLimit

text = text
.replace(RE_NEWLINE, '\\n')
.replace(RE_TAB, '\\t')

if (text.length > max) {
return text.substring(0, max) + '...'
}

return text
}

isPromptCompletionSampled () {
return this.sampler.isSampled()
}
}

module.exports = LangChainHandler
11 changes: 5 additions & 6 deletions packages/datadog-plugin-langchain/src/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ class LangChainTracingPlugin extends TracingPlugin {
constructor () {
super(...arguments)

const langchainConfig = this._tracerConfig.langchain || {}
this.handlers = {
chain: new LangChainChainHandler(langchainConfig),
chat_model: new LangChainChatModelHandler(langchainConfig),
llm: new LangChainLLMHandler(langchainConfig),
embedding: new LangChainEmbeddingHandler(langchainConfig),
default: new LangChainHandler(langchainConfig)
chain: new LangChainChainHandler(this._tracerConfig),
chat_model: new LangChainChatModelHandler(this._tracerConfig),
llm: new LangChainLLMHandler(this._tracerConfig),
embedding: new LangChainEmbeddingHandler(this._tracerConfig),
default: new LangChainHandler(this._tracerConfig)
}
}

Expand Down
47 changes: 14 additions & 33 deletions packages/datadog-plugin-openai/src/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ const Sampler = require('../../dd-trace/src/sampler')
const { MEASURED } = require('../../../ext/tags')
const { estimateTokens } = require('./token-estimator')

// String#replaceAll unavailable on Node.js@v14 (dd-trace@<=v3)
const RE_NEWLINE = /\n/g
const RE_TAB = /\t/g
const makeUtilities = require('../../dd-trace/src/plugins/util/llm')

// TODO: In the future we should refactor config.js to make it requirable
let MAX_TEXT_LEN = 128
let normalize

function safeRequire (path) {
try {
Expand Down Expand Up @@ -44,9 +41,11 @@ class OpenAiTracingPlugin extends TracingPlugin {

this.sampler = new Sampler(0.1) // default 10% log sampling

// hoist the max length env var to avoid making all of these functions a class method
// hoist the normalize function to avoid making all of these functions a class method
if (this._tracerConfig) {
MAX_TEXT_LEN = this._tracerConfig.openaiSpanCharLimit
const utilities = makeUtilities('openai', this._tracerConfig)

normalize = utilities.normalize
}
}

Expand Down Expand Up @@ -116,7 +115,7 @@ class OpenAiTracingPlugin extends TracingPlugin {
// createEdit, createEmbedding, createModeration
if (payload.input) {
const normalized = normalizeStringOrTokenArray(payload.input, false)
tags['openai.request.input'] = truncateText(normalized)
tags['openai.request.input'] = normalize(normalized)
openaiStore.input = normalized
}

Expand Down Expand Up @@ -594,7 +593,7 @@ function commonImageResponseExtraction (tags, body) {
for (let i = 0; i < body.data.length; i++) {
const image = body.data[i]
// exactly one of these two options is provided
tags[`openai.response.images.${i}.url`] = truncateText(image.url)
tags[`openai.response.images.${i}.url`] = normalize(image.url)
tags[`openai.response.images.${i}.b64_json`] = image.b64_json && 'returned'
}
}
Expand Down Expand Up @@ -731,14 +730,14 @@ function commonCreateResponseExtraction (tags, body, openaiStore, methodName) {

tags[`openai.response.choices.${choiceIdx}.finish_reason`] = choice.finish_reason
tags[`openai.response.choices.${choiceIdx}.logprobs`] = specifiesLogProb ? 'returned' : undefined
tags[`openai.response.choices.${choiceIdx}.text`] = truncateText(choice.text)
tags[`openai.response.choices.${choiceIdx}.text`] = normalize(choice.text)

// createChatCompletion only
const message = choice.message || choice.delta // delta for streamed responses
if (message) {
tags[`openai.response.choices.${choiceIdx}.message.role`] = message.role
tags[`openai.response.choices.${choiceIdx}.message.content`] = truncateText(message.content)
tags[`openai.response.choices.${choiceIdx}.message.name`] = truncateText(message.name)
tags[`openai.response.choices.${choiceIdx}.message.content`] = normalize(message.content)
tags[`openai.response.choices.${choiceIdx}.message.name`] = normalize(message.name)
if (message.tool_calls) {
const toolCalls = message.tool_calls
for (let toolIdx = 0; toolIdx < toolCalls.length; toolIdx++) {
Expand Down Expand Up @@ -795,24 +794,6 @@ function truncateApiKey (apiKey) {
return apiKey && `sk-...${apiKey.substr(apiKey.length - 4)}`
}

/**
* for cleaning up prompt and response
*/
function truncateText (text) {
if (!text) return
if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return

text = text
.replace(RE_NEWLINE, '\\n')
.replace(RE_TAB, '\\t')

if (text.length > MAX_TEXT_LEN) {
return text.substring(0, MAX_TEXT_LEN) + '...'
}

return text
}

function tagChatCompletionRequestContent (contents, messageIdx, tags) {
if (typeof contents === 'string') {
tags[`openai.request.messages.${messageIdx}.content`] = contents
Expand All @@ -824,10 +805,10 @@ function tagChatCompletionRequestContent (contents, messageIdx, tags) {
const type = content.type
tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.type`] = content.type
if (type === 'text') {
tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.text`] = truncateText(content.text)
tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.text`] = normalize(content.text)
} else if (type === 'image_url') {
tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.image_url.url`] =
truncateText(content.image_url.url)
normalize(content.image_url.url)
}
// unsupported type otherwise, won't be tagged
}
Expand Down Expand Up @@ -1004,7 +985,7 @@ function normalizeStringOrTokenArray (input, truncate) {
const normalized = Array.isArray(input)
? `[${input.join(', ')}]` // "[1, 2, 999]"
: input // "foo"
return truncate ? truncateText(normalized) : normalized
return truncate ? normalize(normalized) : normalized
}

function defensiveArrayLength (maybeArray) {
Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ class Config {
this._setValue(defaults, 'inferredProxyServicesEnabled', false)
this._setValue(defaults, 'memcachedCommandEnabled', false)
this._setValue(defaults, 'openAiLogsEnabled', false)
this._setValue(defaults, 'openaiSpanCharLimit', 128)
this._setValue(defaults, 'openai.spanCharLimit', 128)
this._setValue(defaults, 'peerServiceMapping', {})
this._setValue(defaults, 'plugins', true)
this._setValue(defaults, 'port', '8126')
Expand Down Expand Up @@ -805,7 +805,7 @@ class Config {
// Requires an accompanying DD_APM_OBFUSCATION_MEMCACHED_KEEP_COMMAND=true in the agent
this._setBoolean(env, 'memcachedCommandEnabled', DD_TRACE_MEMCACHED_COMMAND_ENABLED)
this._setBoolean(env, 'openAiLogsEnabled', DD_OPENAI_LOGS_ENABLED)
this._setValue(env, 'openaiSpanCharLimit', maybeInt(DD_OPENAI_SPAN_CHAR_LIMIT))
this._setValue(env, 'openai.spanCharLimit', maybeInt(DD_OPENAI_SPAN_CHAR_LIMIT))
this._envUnprocessed.openaiSpanCharLimit = DD_OPENAI_SPAN_CHAR_LIMIT
if (DD_TRACE_PEER_SERVICE_MAPPING) {
this._setValue(env, 'peerServiceMapping', fromEntries(
Expand Down
35 changes: 35 additions & 0 deletions packages/dd-trace/src/plugins/util/llm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const Sampler = require('../../sampler')

const RE_NEWLINE = /\n/g
const RE_TAB = /\t/g

function normalize (text, limit = 128) {
if (!text) return
if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return

text = text
.replace(RE_NEWLINE, '\\n')
.replace(RE_TAB, '\\t')

if (text.length > limit) {
return text.substring(0, limit) + '...'
}

return text
}

function isPromptCompletionSampled (sampler) {
return sampler.isSampled()
}

module.exports = function (integrationName, tracerConfig) {
const integrationConfig = tracerConfig[integrationName] || {}
const { spanCharLimit, spanPromptCompletionSampleRate } = integrationConfig

const sampler = new Sampler(spanPromptCompletionSampleRate ?? 1.0)

return {
normalize: str => normalize(str, spanCharLimit),
isPromptCompletionSampled: () => isPromptCompletionSampled(sampler)
}
}
2 changes: 1 addition & 1 deletion packages/dd-trace/test/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ describe('Config', () => {
{ name: 'logInjection', value: false, origin: 'default' },
{ name: 'lookup', value: undefined, origin: 'default' },
{ name: 'openAiLogsEnabled', value: false, origin: 'default' },
{ name: 'openaiSpanCharLimit', value: 128, origin: 'default' },
{ name: 'openai.spanCharLimit', value: 128, origin: 'default' },
{ name: 'peerServiceMapping', value: {}, origin: 'default' },
{ name: 'plugins', value: true, origin: 'default' },
{ name: 'port', value: '8126', origin: 'default' },
Expand Down
80 changes: 80 additions & 0 deletions packages/dd-trace/test/plugins/util/llm.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict'

require('../../setup/tap')

const makeUtilities = require('../../../src/plugins/util/llm')

// Registers beforeEach/afterEach hooks that pin Math.random to a fixed
// value for every test in the enclosing describe block, restoring the
// original implementation afterwards.
function withStubbedRandom (value) {
  beforeEach(() => {
    sinon.stub(Math, 'random').returns(value)
  })

  afterEach(() => {
    Math.random.restore()
  })
}

describe('llm utils', () => {
  let llmUtils

  describe('with default configuration', () => {
    beforeEach(() => {
      llmUtils = makeUtilities('langchain', {})
    })

    it('should normalize text to 128 characters', () => {
      const longText = 'a'.repeat(256)
      const expected = `${'a'.repeat(128)}...`
      expect(llmUtils.normalize(longText)).to.equal(expected)
    })

    it('should return undefined for empty text', () => {
      expect(llmUtils.normalize('')).to.be.undefined
    })

    it('should return undefined for a non-string', () => {
      expect(llmUtils.normalize(42)).to.be.undefined
    })

    it('should replace special characters', () => {
      expect(llmUtils.normalize('a\nb\tc')).to.equal('a\\nb\\tc')
    })

    it('should always sample prompt completion', () => {
      expect(llmUtils.isPromptCompletionSampled()).to.be.true
    })
  })

  describe('with custom configuration available', () => {
    beforeEach(() => {
      const tracerConfig = {
        langchain: {
          spanCharLimit: 100,
          spanPromptCompletionSampleRate: 0.6
        }
      }
      llmUtils = makeUtilities('langchain', tracerConfig)
    })

    it('should normalize text to 100 characters', () => {
      const longText = 'a'.repeat(256)
      const expected = `${'a'.repeat(100)}...`
      expect(llmUtils.normalize(longText)).to.equal(expected)
    })

    describe('with a random value greater than 0.6', () => {
      withStubbedRandom(0.7)

      it('should not sample prompt completion', () => {
        expect(llmUtils.isPromptCompletionSampled()).to.be.false
      })
    })

    describe('with a random value less than 0.6', () => {
      withStubbedRandom(0.5)

      it('should sample prompt completion', () => {
        expect(llmUtils.isPromptCompletionSampled()).to.be.true
      })
    })
  })
})

0 comments on commit 406b905

Please sign in to comment.