Skip to content

Commit

Permalink
rework buffering of tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
davehorton committed Feb 13, 2025
1 parent df88aa2 commit 6f9cfdf
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions lib/utils/tts-streaming-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class TtsStreamingBuffer extends Emitter {
this._isFull = false;
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
this.timer = null;
// Record the last time the text buffer was updated.
this.lastUpdateTime = 0;
}

get isEmpty() {
Expand Down Expand Up @@ -114,6 +116,8 @@ class TtsStreamingBuffer extends Emitter {
);
this.queue.push({ type: 'text', value: tokens });
this.bufferedLength += totalLength;
// Update the last update time each time new text is buffered.
this.lastUpdateTime = Date.now();

await this._feedQueue();
return { status: 'ok' };
Expand Down Expand Up @@ -175,17 +179,17 @@ class TtsStreamingBuffer extends Emitter {
* Then, remove the exact tokens (or portions thereof) that were consumed.
*/
async _feedQueue(handlingTimeout = false) {
this.logger.debug({ queue: this.queue }, '_feedQueue');
this.logger.debug({ queue: this.queue }, 'TtsStreamingBuffer:_feedQueue');
try {
if (!this.cs.isTtsStreamOpen || !this.ep) {
this.logger.debug('TTS stream is not open or no endpoint available');
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not open or no endpoint available');
return;
}
if (
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected ||
this._connectionStatus === TtsStreamingConnectionStatus.Failed
) {
this.logger.debug('TTS stream is not connected');
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not connected');
return;
}

Expand Down Expand Up @@ -229,7 +233,10 @@ class TtsStreamingBuffer extends Emitter {
}

// --- Phase 2: Process remaining text tokens ---
if (this.queue.length === 0) return;
if (this.queue.length === 0) {
this._removeTimer();
return;
}

// Accumulate contiguous text tokens (from the front) up to MAX_CHUNK_SIZE.
let combinedText = '';
Expand All @@ -238,7 +245,10 @@ class TtsStreamingBuffer extends Emitter {
combinedText += item.value;
if (combinedText.length >= MAX_CHUNK_SIZE) break;
}
if (combinedText.length === 0) return;
if (combinedText.length === 0) {
this._removeTimer();
return;
}

const limit = Math.min(MAX_CHUNK_SIZE, combinedText.length);
let chunkEnd = findSentenceBoundary(combinedText, limit);
Expand Down Expand Up @@ -334,12 +344,29 @@ class TtsStreamingBuffer extends Emitter {

_setTimerIfNeeded() {
if (this.bufferedLength > 0 && !this.timer) {
this.logger.debug({queue: this.queue},
`TtsStreamingBuffer:_setTimerIfNeeded setting timer because ${this.bufferedLength} buffered`);
this.timer = setTimeout(this._onTimeout.bind(this), TIMEOUT_RETRY_MSECS);
}
}

_removeTimer() {
if (this.timer) {
this.logger.debug('TtsStreamingBuffer:_removeTimer clearing timer');
clearTimeout(this.timer);
this.timer = null;
}
}

_onTimeout() {
this.logger.info('TtsStreamingBuffer:_onTimeout Timeout waiting for sentence boundary');
this.logger.debug('TtsStreamingBuffer:_onTimeout Timeout waiting for sentence boundary');
// Check if new text has been added since the timer was set.
const now = Date.now();
if (now - this.lastUpdateTime < TIMEOUT_RETRY_MSECS) {
this.logger.debug('TtsStreamingBuffer:_onTimeout New text received recently; postponing flush.');
this._setTimerIfNeeded();
return;
}
this.timer = null;
this._feedQueue(true);
}
Expand Down

0 comments on commit 6f9cfdf

Please sign in to comment.