diff --git a/.env.example b/.env.example
index 7b904d2ba8b..c266dc74f60 100644
--- a/.env.example
+++ b/.env.example
@@ -43,6 +43,9 @@ LIVEPEER_IMAGE_MODEL= # Default: ByteDance/SDXL-Lightning
 # Speech Synthesis
 ELEVENLABS_XI_API_KEY= # API key from elevenlabs
 
+# Transcription Provider
+TRANSCRIPTION_PROVIDER= # Default: local (possible values: openai, deepgram, local)
+
 # Direct Client Setting
 EXPRESS_MAX_PAYLOAD= # Default: 100kb
 
diff --git a/packages/client-twitter/src/index.ts b/packages/client-twitter/src/index.ts
index 39ee853e828..6da648636ec 100644
--- a/packages/client-twitter/src/index.ts
+++ b/packages/client-twitter/src/index.ts
@@ -1,4 +1,8 @@
-import { Client, elizaLogger, IAgentRuntime } from "@elizaos/core";
+import {
+    Client,
+    elizaLogger,
+    IAgentRuntime,
+} from "@elizaos/core";
 import { ClientBase } from "./base.ts";
 import { validateTwitterConfig, TwitterConfig } from "./environment.ts";
 import { TwitterInteractionClient } from "./interactions.ts";
diff --git a/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts
new file mode 100644
index 00000000000..3ecd5c95059
--- /dev/null
+++ b/packages/client-twitter/src/plugins/SttTtsSpacesPlugin.ts
@@ -0,0 +1,532 @@
+// src/plugins/SttTtsSpacesPlugin.ts
+
+import fs from 'fs';
+import path from 'path';
+import { spawn } from 'child_process';
+import { ITranscriptionService } from '@elizaos/core';
+import { Space, JanusClient, AudioDataWithUser } from 'agent-twitter-client';
+
+interface PluginConfig {
+    openAiApiKey?: string; // for STT & ChatGPT
+    elevenLabsApiKey?: string; // for TTS
+    sttLanguage?: string; // e.g. "en" for Whisper
+    gptModel?: string; // e.g. "gpt-3.5-turbo"
+    silenceThreshold?: number; // amplitude threshold for ignoring silence
+    voiceId?: string; // specify which ElevenLabs voice to use
+    elevenLabsModel?: string; // e.g. "eleven_monolingual_v1"
+    systemPrompt?: string; // ex. "You are a helpful AI assistant"
+    chatContext?: Array<{
+        role: 'system' | 'user' | 'assistant';
+        content: string;
+    }>;
+    transcriptionService: ITranscriptionService;
+}
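+
+// Rough usage sketch (illustrative only): TwitterSpaceClient in spaces.ts wires this
+// plugin into a live Space. The call below assumes the Space#use(plugin, config) API
+// from agent-twitter-client; the variable names and values are placeholders.
+//
+//   const plugin = new SttTtsPlugin();
+//   space.use(plugin, {
+//       openAiApiKey: process.env.OPENAI_API_KEY,
+//       elevenLabsApiKey: process.env.ELEVENLABS_XI_API_KEY,
+//       transcriptionService: runtime.getService<ITranscriptionService>(
+//           ServiceType.TRANSCRIPTION,
+//       ),
+//   });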
"You are a helpful AI assistant" + chatContext?: Array<{ + role: 'system' | 'user' | 'assistant'; + content: string; + }>; + transcriptionService: ITranscriptionService; +} + +/** + * MVP plugin for speech-to-text (OpenAI) + conversation + TTS (ElevenLabs) + * Approach: + * - Collect each speaker's unmuted PCM in a memory buffer (only if above silence threshold) + * - On speaker mute -> flush STT -> GPT -> TTS -> push to Janus + */ +export class SttTtsPlugin implements Plugin { + private space?: Space; + private janus?: JanusClient; + + private openAiApiKey?: string; + private elevenLabsApiKey?: string; + + private sttLanguage = 'en'; + private gptModel = 'gpt-3.5-turbo'; + private voiceId = '21m00Tcm4TlvDq8ikWAM'; + private elevenLabsModel = 'eleven_monolingual_v1'; + private systemPrompt = 'You are a helpful AI assistant.'; + private chatContext: Array<{ + role: 'system' | 'user' | 'assistant'; + content: string; + }> = []; + + private transcriptionService: ITranscriptionService; + + /** + * userId => arrayOfChunks (PCM Int16) + */ + private pcmBuffers = new Map(); + + /** + * Track mute states: userId => boolean (true=unmuted) + */ + private speakerUnmuted = new Map(); + + /** + * For ignoring near-silence frames (if amplitude < threshold) + */ + private silenceThreshold = 50; + + // TTS queue for sequentially speaking + private ttsQueue: string[] = []; + private isSpeaking = false; + + onAttach(space: Space) { + console.log('[SttTtsPlugin] onAttach => space was attached'); + } + + init(params: { space: Space; pluginConfig?: Record }): void { + console.log( + '[SttTtsPlugin] init => Space fully ready. Subscribing to events.', + ); + + this.space = params.space; + this.janus = (this.space as any)?.janusClient as JanusClient | undefined; + + const config = params.pluginConfig as PluginConfig; + this.openAiApiKey = config?.openAiApiKey; + this.elevenLabsApiKey = config?.elevenLabsApiKey; + this.transcriptionService = config.transcriptionService; + if (config?.sttLanguage) this.sttLanguage = config.sttLanguage; + if (config?.gptModel) this.gptModel = config.gptModel; + if (typeof config?.silenceThreshold === 'number') { + this.silenceThreshold = config.silenceThreshold; + } + if (config?.voiceId) { + this.voiceId = config.voiceId; + } + if (config?.elevenLabsModel) { + this.elevenLabsModel = config.elevenLabsModel; + } + if (config?.systemPrompt) { + this.systemPrompt = config.systemPrompt; + } + if (config?.chatContext) { + this.chatContext = config.chatContext; + } + console.log('[SttTtsPlugin] Plugin config =>', config); + + // Listen for mute events + this.space.on( + 'muteStateChanged', + (evt: { userId: string; muted: boolean }) => { + console.log('[SttTtsPlugin] Speaker muteStateChanged =>', evt); + if (evt.muted) { + this.handleMute(evt.userId).catch((err) => + console.error('[SttTtsPlugin] handleMute error =>', err), + ); + } else { + this.speakerUnmuted.set(evt.userId, true); + if (!this.pcmBuffers.has(evt.userId)) { + this.pcmBuffers.set(evt.userId, []); + } + } + }, + ); + } + + /** + * Called whenever we receive PCM from a speaker + */ + onAudioData(data: AudioDataWithUser): void { + if (!this.speakerUnmuted.get(data.userId)) return; + + let maxVal = 0; + for (let i = 0; i < data.samples.length; i++) { + const val = Math.abs(data.samples[i]); + if (val > maxVal) maxVal = val; + } + if (maxVal < this.silenceThreshold) { + return; + } + + let arr = this.pcmBuffers.get(data.userId); + if (!arr) { + arr = []; + this.pcmBuffers.set(data.userId, arr); + } + arr.push(data.samples); + 
+    private async convertPcmToWavInMemory(
+        pcmData: Int16Array,
+        sampleRate: number
+    ): Promise<ArrayBuffer> {
+        // number of channels
+        const numChannels = 1;
+        // byte rate = (sampleRate * numChannels * bitsPerSample/8)
+        const byteRate = sampleRate * numChannels * 2;
+        const blockAlign = numChannels * 2;
+        // data chunk size = pcmData.length * (bitsPerSample/8)
+        const dataSize = pcmData.length * 2;
+
+        // WAV header is 44 bytes
+        const buffer = new ArrayBuffer(44 + dataSize);
+        const view = new DataView(buffer);
+
+        // RIFF chunk descriptor
+        this.writeString(view, 0, 'RIFF');
+        view.setUint32(4, 36 + dataSize, true); // file size - 8
+        this.writeString(view, 8, 'WAVE');
+
+        // fmt sub-chunk
+        this.writeString(view, 12, 'fmt ');
+        view.setUint32(16, 16, true); // Subchunk1Size (16 for PCM)
+        view.setUint16(20, 1, true); // AudioFormat (1 = PCM)
+        view.setUint16(22, numChannels, true); // NumChannels
+        view.setUint32(24, sampleRate, true); // SampleRate
+        view.setUint32(28, byteRate, true); // ByteRate
+        view.setUint16(32, blockAlign, true); // BlockAlign
+        view.setUint16(34, 16, true); // BitsPerSample (16)
+
+        // data sub-chunk
+        this.writeString(view, 36, 'data');
+        view.setUint32(40, dataSize, true);
+
+        // Write PCM samples
+        let offset = 44;
+        for (let i = 0; i < pcmData.length; i++, offset += 2) {
+            view.setInt16(offset, pcmData[i], true);
+        }
+
+        return buffer;
+    }
+
+    private writeString(view: DataView, offset: number, text: string) {
+        for (let i = 0; i < text.length; i++) {
+            view.setUint8(offset + i, text.charCodeAt(i));
+        }
+    }
+
+    /**
+     * On speaker mute => flush STT => GPT => TTS => push to Janus
+     */
+    private async handleMute(userId: string): Promise<void> {
+        this.speakerUnmuted.set(userId, false);
+        const chunks = this.pcmBuffers.get(userId) || [];
+        this.pcmBuffers.set(userId, []);
+
+        if (!chunks.length) {
+            console.log('[SttTtsPlugin] No audio chunks for user =>', userId);
+            return;
+        }
+        console.log(
+            `[SttTtsPlugin] Flushing STT buffer for user=${userId}, chunks=${chunks.length}`,
+        );
+
+        const totalLen = chunks.reduce((acc, c) => acc + c.length, 0);
+        const merged = new Int16Array(totalLen);
+        let offset = 0;
+        for (const c of chunks) {
+            merged.set(c, offset);
+            offset += c.length;
+        }
+
+        // Convert PCM to WAV for STT
+        const wavBuffer = await this.convertPcmToWavInMemory(merged, 48000);
+
+        // STT via the shared transcription service
+        const sttText = await this.transcriptionService.transcribe(wavBuffer);
+
+        if (!sttText || !sttText.trim()) {
+            console.log('[SttTtsPlugin] No speech recognized for user =>', userId);
+            return;
+        }
+        console.log(`[SttTtsPlugin] STT => user=${userId}, text="${sttText}"`);
+
+        // GPT answer
+        const replyText = await this.askChatGPT(sttText);
+        console.log(`[SttTtsPlugin] GPT => user=${userId}, reply="${replyText}"`);
+
+        // Use the standard speak method with queue
+        await this.speakText(replyText);
+    }
+
+    /**
+     * Public method to queue a TTS request
+     */
+    public async speakText(text: string): Promise<void> {
+        this.ttsQueue.push(text);
+        if (!this.isSpeaking) {
+            this.isSpeaking = true;
+            this.processTtsQueue().catch((err) => {
+                console.error('[SttTtsPlugin] processTtsQueue error =>', err);
+            });
+        }
+    }
+
+    /**
+     * Process TTS requests one by one
+     */
+    private async processTtsQueue(): Promise<void> {
+        while (this.ttsQueue.length > 0) {
+            const text = this.ttsQueue.shift();
+            if (!text) continue;
+
+            try {
+                const ttsAudio = await this.elevenLabsTts(text);
+                const pcm = await this.convertMp3ToPcm(ttsAudio, 48000);
+                await this.streamToJanus(pcm, 48000);
+            } catch (err) {
+                console.error('[SttTtsPlugin] TTS streaming error =>', err);
+            }
+        }
+        this.isSpeaking = false;
+    }
+
+    private convertPcmToWav(
+        samples: Int16Array,
+        sampleRate: number,
+    ): Promise<string> {
+        return new Promise((resolve, reject) => {
+            const tmpPath = path.resolve('/tmp', `stt-${Date.now()}.wav`);
+            const ff = spawn('ffmpeg', [
+                '-f',
+                's16le',
+                '-ar',
+                sampleRate.toString(),
+                '-ac',
+                '1',
+                '-i',
+                'pipe:0',
+                '-y',
+                tmpPath,
+            ]);
+            ff.stdin.write(Buffer.from(samples.buffer));
+            ff.stdin.end();
+            ff.on('close', (code) => {
+                if (code === 0) resolve(tmpPath);
+                else reject(new Error(`ffmpeg error code=${code}`));
+            });
+        });
+    }
+
+    /**
+     * OpenAI Whisper STT
+     */
+    private async transcribeWithOpenAI(wavPath: string, language: string) {
+        if (!this.openAiApiKey) {
+            throw new Error('[SttTtsPlugin] No OpenAI API key available');
+        }
+
+        try {
+            console.log('[SttTtsPlugin] Transcribe =>', wavPath);
+
+            // Read file into buffer
+            const fileBuffer = fs.readFileSync(wavPath);
+            console.log(
+                '[SttTtsPlugin] File read, size:',
+                fileBuffer.length,
+                'bytes',
+            );
+
+            // Create blob from buffer
+            const blob = new Blob([fileBuffer], { type: 'audio/wav' });
+
+            // Create FormData
+            const formData = new FormData();
+            formData.append('file', blob, path.basename(wavPath));
+            formData.append('model', 'whisper-1');
+            formData.append('language', language);
+            formData.append('temperature', '0');
+
+            // Call OpenAI API
+            const response = await fetch(
+                'https://api.openai.com/v1/audio/transcriptions',
+                {
+                    method: 'POST',
+                    headers: {
+                        Authorization: `Bearer ${this.openAiApiKey}`,
+                    },
+                    body: formData,
+                },
+            );
+            if (!response.ok) {
+                const errorText = await response.text();
+                console.error('[SttTtsPlugin] OpenAI API Error:', errorText);
+                throw new Error(`OpenAI API error: ${response.status} ${errorText}`);
+            }
+            const data = (await response.json()) as { text: string };
+            return data.text?.trim() || '';
+        } catch (err) {
+            console.error('[SttTtsPlugin] OpenAI STT Error =>', err);
+            throw new Error('OpenAI STT failed');
+        }
+    }
+
+    /**
+     * Simple ChatGPT call
+     */
+    private async askChatGPT(userText: string): Promise<string> {
+        if (!this.openAiApiKey) {
+            throw new Error('[SttTtsPlugin] No OpenAI API key for ChatGPT');
+        }
+        const url = 'https://api.openai.com/v1/chat/completions';
+        const messages = [
+            { role: 'system', content: this.systemPrompt },
+            ...this.chatContext,
+            { role: 'user', content: userText },
+        ];
+
+        const resp = await fetch(url, {
+            method: 'POST',
+            headers: {
+                Authorization: `Bearer ${this.openAiApiKey}`,
+                'Content-Type': 'application/json',
+            },
+            body: JSON.stringify({
+                model: this.gptModel,
+                messages,
+            }),
+        });
+
+        if (!resp.ok) {
+            const errText = await resp.text();
+            throw new Error(
+                `[SttTtsPlugin] ChatGPT error => ${resp.status} ${errText}`,
+            );
+        }
+
+        const json = await resp.json();
+        const reply = json.choices?.[0]?.message?.content || '';
+        this.chatContext.push({ role: 'user', content: userText });
+        this.chatContext.push({ role: 'assistant', content: reply });
+        return reply.trim();
+    }
+
+    /**
+     * ElevenLabs TTS => returns MP3 Buffer
+     */
+    private async elevenLabsTts(text: string): Promise<Buffer> {
+        if (!this.elevenLabsApiKey) {
+            throw new Error('[SttTtsPlugin] No ElevenLabs API key');
+        }
+        const url = `https://api.elevenlabs.io/v1/text-to-speech/${this.voiceId}`;
+        const resp = await fetch(url, {
+            method: 'POST',
+            headers: {
+                'Content-Type': 'application/json',
+                'xi-api-key': this.elevenLabsApiKey,
+            },
+            body: JSON.stringify({
+                text,
+                model_id: this.elevenLabsModel,
+                voice_settings: { stability: 0.4, similarity_boost: 0.8 },
+            }),
+        });
+        if (!resp.ok) {
+            const errText = await resp.text();
+            throw new Error(
+                `[SttTtsPlugin] ElevenLabs TTS error => ${resp.status} ${errText}`,
+            );
+        }
+        const arrayBuf = await resp.arrayBuffer();
+        return Buffer.from(arrayBuf);
+    }
+
+    /**
+     * Convert MP3 => PCM via ffmpeg
+     */
+    private convertMp3ToPcm(
+        mp3Buf: Buffer,
+        outRate: number,
+    ): Promise<Int16Array> {
+        return new Promise((resolve, reject) => {
+            const ff = spawn('ffmpeg', [
+                '-i',
+                'pipe:0',
+                '-f',
+                's16le',
+                '-ar',
+                outRate.toString(),
+                '-ac',
+                '1',
+                'pipe:1',
+            ]);
+            let raw = Buffer.alloc(0);
+
+            ff.stdout.on('data', (chunk: Buffer) => {
+                raw = Buffer.concat([raw, chunk]);
+            });
+            ff.stderr.on('data', () => {
+                // ignoring ffmpeg logs
+            });
+            ff.on('close', (code) => {
+                if (code !== 0) {
+                    reject(new Error(`ffmpeg error code=${code}`));
+                    return;
+                }
+                const samples = new Int16Array(
+                    raw.buffer,
+                    raw.byteOffset,
+                    raw.byteLength / 2,
+                );
+                resolve(samples);
+            });
+
+            ff.stdin.write(mp3Buf);
+            ff.stdin.end();
+        });
+    }
+
+    /**
+     * Push PCM back to Janus in small frames.
+     * We send 10ms @ 48kHz => 480 samples per frame.
+     */
+    private async streamToJanus(
+        samples: Int16Array,
+        sampleRate: number,
+    ): Promise<void> {
+        // TODO: Check if better than 480 fixed
+        const FRAME_SIZE = Math.floor(sampleRate * 0.01); // 10ms frames => 480 @48kHz
+
+        for (
+            let offset = 0;
+            offset + FRAME_SIZE <= samples.length;
+            offset += FRAME_SIZE
+        ) {
+            const frame = new Int16Array(FRAME_SIZE);
+            frame.set(samples.subarray(offset, offset + FRAME_SIZE));
+            this.janus?.pushLocalAudio(frame, sampleRate, 1);
+
+            // Short pause so we don't overload
+            await new Promise((r) => setTimeout(r, 10));
+        }
+    }
+
+    public setSystemPrompt(prompt: string) {
+        this.systemPrompt = prompt;
+        console.log('[SttTtsPlugin] setSystemPrompt =>', prompt);
+    }
+
+    /**
+     * Change the GPT model at runtime (e.g. "gpt-4", "gpt-3.5-turbo", etc.).
+     */
+    public setGptModel(model: string) {
+        this.gptModel = model;
+        console.log('[SttTtsPlugin] setGptModel =>', model);
+    }
+
+    /**
+     * Add a message (system, user or assistant) to the chat context,
+     * e.g. to store conversation history or inject a persona.
+     */
+    public addMessage(role: 'system' | 'user' | 'assistant', content: string) {
+        this.chatContext.push({ role, content });
+        console.log(
+            `[SttTtsPlugin] addMessage => role=${role}, content=${content}`,
+        );
+    }
+
+    /**
+     * Clear the chat context if needed.
+     */
+    public clearChatContext() {
+        this.chatContext = [];
+        console.log('[SttTtsPlugin] clearChatContext => done');
+    }
+
+    cleanup(): void {
+        console.log('[SttTtsPlugin] cleanup => releasing resources');
+        this.pcmBuffers.clear();
+        this.speakerUnmuted.clear();
+        this.ttsQueue = [];
+        this.isSpeaking = false;
+    }
+}
diff --git a/packages/client-twitter/src/spaces.ts b/packages/client-twitter/src/spaces.ts
index a67430191c7..6076b80e8fd 100644
--- a/packages/client-twitter/src/spaces.ts
+++ b/packages/client-twitter/src/spaces.ts
@@ -4,6 +4,8 @@ import {
     composeContext,
     generateText,
     ModelClass,
+    ServiceType,
+    ITranscriptionService,
 } from "@elizaos/core";
 import { ClientBase } from "./base";
 import {
@@ -11,10 +13,12 @@ import {
     Space,
     SpaceConfig,
     RecordToDiskPlugin,
-    SttTtsPlugin,
     IdleMonitorPlugin,
     SpeakerRequest,
 } from "agent-twitter-client";
+import {
+    SttTtsPlugin
+} from './plugins/SttTtsSpacesPlugin.ts';
 
 interface SpaceDecisionOptions {
     maxSpeakers?: number;
@@ -305,6 +309,9 @@ export class TwitterSpaceClient {
                 gptModel: this.decisionOptions.gptModel,
                 systemPrompt: this.decisionOptions.systemPrompt,
                 sttLanguage: this.decisionOptions.sttLanguage,
+                transcriptionService: this.client.runtime.getService<ITranscriptionService>(
+                    ServiceType.TRANSCRIPTION,
+                )
             });
         }
diff --git a/packages/plugin-node/src/services/transcription.ts b/packages/plugin-node/src/services/transcription.ts
index 5b734061524..7ffb441ff1d 100644
--- a/packages/plugin-node/src/services/transcription.ts
+++ b/packages/plugin-node/src/services/transcription.ts
@@ -34,14 +34,35 @@ export class TranscriptionService
     private isCudaAvailable: boolean = false;
     private openai: OpenAI | null = null;
     private deepgram?: DeepgramClient;
+    private preferredProvider?: string; // "deepgram", "openai", "local"
     private queue: { audioBuffer: ArrayBuffer; resolve: Function }[] = [];
     private processing: boolean = false;
 
     async initialize(_runtime: IAgentRuntime): Promise<void> {
         this.runtime = _runtime;
+
+        /**
+         * We set preferredProvider only if TRANSCRIPTION_PROVIDER is defined.
+         * The old logic remains in place (Deepgram > OpenAI > Local) for those
+         * who haven't configured TRANSCRIPTION_PROVIDER yet.
+         * This way, existing users relying on Deepgram without updating .env
+         * won't have their workflow broken.
+         */
+        const provider = this.runtime.getSetting("TRANSCRIPTION_PROVIDER");
+        if (provider) {
+            this.preferredProvider = provider; // "deepgram", "openai", "local" ...
+        }
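+
+        // For example: with TRANSCRIPTION_PROVIDER=deepgram but no DEEPGRAM_API_KEY set,
+        // transcribeUsingPreferredOrFallback() below falls through to OpenAI and, failing
+        // that, to the local transcription path (transcribeLocally).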
 
         const deepgramKey = this.runtime.getSetting("DEEPGRAM_API_KEY");
         this.deepgram = deepgramKey ? createClient(deepgramKey) : null;
+
+        const openaiKey = this.runtime.getSetting("OPENAI_API_KEY");
+        this.openai = openaiKey
+            ? new OpenAI({
+                  apiKey: openaiKey,
+              })
+            : null;
     }
 
     constructor() {
@@ -92,7 +113,7 @@
         } else if (platform === "win32") {
             const cudaPath = path.join(
                 settings.CUDA_PATH ||
-                "C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v11.0",
+                    "C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v11.0",
                 "bin",
                 "nvcc.exe"
             );
@@ -192,21 +213,24 @@
     }
 
     private async processQueue(): Promise<void> {
-        if (this.processing || this.queue.length === 0) {
-            return;
-        }
-
+        // Exit if already processing or if the queue is empty
+        if (this.processing || this.queue.length === 0) return;
         this.processing = true;
 
         while (this.queue.length > 0) {
            const { audioBuffer, resolve } = this.queue.shift()!;
            let result: string | null = null;
-            if (this.deepgram) {
-                result = await this.transcribeWithDeepgram(audioBuffer);
-            } else if (this.openai) {
-                result = await this.transcribeWithOpenAI(audioBuffer);
+
+            /**
+             * If TRANSCRIPTION_PROVIDER is set, we use the new approach.
+             * Otherwise, we preserve the original fallback logic (Deepgram > OpenAI > Local).
+             * This ensures we don't break existing configurations where Deepgram is expected
+             * but TRANSCRIPTION_PROVIDER isn't set in the .env.
+             */
+            if (this.preferredProvider) {
+                result = await this.transcribeUsingPreferredOrFallback(audioBuffer);
             } else {
-                result = await this.transcribeLocally(audioBuffer);
+                result = await this.transcribeUsingDefaultLogic(audioBuffer);
             }
 
             resolve(result);
@@ -215,6 +239,45 @@ export class TranscriptionService
         this.processing = false;
     }
 
+    /**
+     * New approach (preferred provider + fallback).
+     * This can still handle a missing provider setting gracefully.
+     */
+    private async transcribeUsingPreferredOrFallback(audioBuffer: ArrayBuffer): Promise<string | null> {
+        let result: string | null = null;
+
+        switch (this.preferredProvider) {
+            case "deepgram":
+                if (this.deepgram) {
+                    result = await this.transcribeWithDeepgram(audioBuffer);
+                    if (result) return result;
+                }
+            // fallback to openai
+            case "openai":
+                if (this.openai) {
+                    result = await this.transcribeWithOpenAI(audioBuffer);
+                    if (result) return result;
+                }
+            // fallback to local
+            case "local":
+            default:
+                return await this.transcribeLocally(audioBuffer);
+        }
+    }
+
+    /**
+     * Original logic: Deepgram -> OpenAI -> Local.
+     * We keep it untouched for backward compatibility.
+     */
+    private async transcribeUsingDefaultLogic(audioBuffer: ArrayBuffer): Promise<string | null> {
+        if (this.deepgram) {
+            return await this.transcribeWithDeepgram(audioBuffer);
+        } else if (this.openai) {
+            return await this.transcribeWithOpenAI(audioBuffer);
+        }
+        return await this.transcribeLocally(audioBuffer);
+    }
+
     private async transcribeWithDeepgram(
         audioBuffer: ArrayBuffer
     ): Promise<string | null> {