Skip to content

Commit

Permalink
fix to be more precise about removing custom event handlers so that w… (
Browse files Browse the repository at this point in the history
#580)

* fix to be more precise about removing custom event handlers so that when we stop a gather we dont also inadvertently stop a background transcribe as well

* test fixes

* fix: endpointing=false was being ignored for Deepgram
  • Loading branch information
davehorton authored Dec 28, 2023
1 parent 9d8291f commit 2c48083
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 122 deletions.
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const disconnect = () => {
httpServer?.on('close', resolve);
httpServer?.close();
srf.disconnect();
srf.locals.mediaservers.forEach((ms) => ms.disconnect());
srf.locals.mediaservers?.forEach((ms) => ms.disconnect());
});
};

Expand Down
77 changes: 43 additions & 34 deletions lib/tasks/gather.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class TaskGather extends SttTask {
} catch (err) {
this.logger.error(err, 'TaskGather:exec error');
}
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
}

kill(cs) {
Expand Down Expand Up @@ -261,7 +261,7 @@ class TaskGather extends SttTask {
else if (this.input.includes('digits')) {
if (this.digitBuffer.length === 0 && this.needsStt) {
// DTMF is higher priority than STT.
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
Expand Down Expand Up @@ -305,33 +305,37 @@ class TaskGather extends SttTask {
switch (this.vendor) {
case 'google':
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;

case 'aws':
case 'polly':
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, AwsTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'microsoft':
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this.addCustomEventListener(
ep, AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected,
this._onNoSpeechDetected.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(ep, AzureTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'nuance':
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.StartOfSpeech,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.StartOfSpeech,
this._onStartOfSpeech.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.TranscriptionComplete,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.TranscriptionComplete,
this._onTranscriptionComplete.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.VadDetected,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.VadDetected,
this._onVadDetected.bind(this, cs, ep));

/* stall timers until prompt finishes playing */
Expand All @@ -342,9 +346,10 @@ class TaskGather extends SttTask {

case 'deepgram':
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, DeepgramTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));

/* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */
Expand All @@ -353,12 +358,14 @@ class TaskGather extends SttTask {

case 'soniox':
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, SonioxTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
break;

case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, CobaltTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));

/* cobalt doesnt have language, it has model, which is required */
if (!this.data.recognizer.model) {
Expand Down Expand Up @@ -388,21 +395,21 @@ class TaskGather extends SttTask {

case 'ibm':
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, IbmTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;

case 'nvidia':
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.StartOfSpeech,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.StartOfSpeech,
this._onStartOfSpeech.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.TranscriptionComplete,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.TranscriptionComplete,
this._onTranscriptionComplete.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.VadDetected,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.VadDetected,
this._onVadDetected.bind(this, cs, ep));

/* I think nvidia has this (??) - stall timers until prompt finishes playing */
Expand All @@ -413,19 +420,21 @@ class TaskGather extends SttTask {

case 'assemblyai':
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
}
Expand All @@ -437,7 +446,7 @@ class TaskGather extends SttTask {
}

/* common handler for all stt engine errors */
ep.addCustomEventListener(JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
Expand Down
13 changes: 10 additions & 3 deletions lib/tasks/stt-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ class SttTask extends Task {
const {
setChannelVarsForStt,
normalizeTranscription,
removeSpeechListeners,
setSpeechCredentialsAtRuntime,
compileSonioxTranscripts,
consolidateTranscripts
} = require('../utils/transcription-utils')(logger);
this.setChannelVarsForStt = setChannelVarsForStt;
this.normalizeTranscription = normalizeTranscription;
this.removeSpeechListeners = removeSpeechListeners;
this.compileSonioxTranscripts = compileSonioxTranscripts;
this.consolidateTranscripts = consolidateTranscripts;

this.eventHandlers = [];
this.isHandledByPrimaryProvider = true;
if (this.data.recognizer) {
const recognizer = this.data.recognizer;
Expand Down Expand Up @@ -140,6 +138,15 @@ class SttTask extends Task {
}
}

addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
ep.addCustomEventListener(event, handler);
}

removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
}

async _initSpeechCredentials(cs, vendor, label) {
const {getNuanceAccessToken, getIbmAccessToken} = cs.srf.locals.dbHelpers;
let credentials = cs.getSpeechCredentials(vendor, 'stt', label);
Expand Down
55 changes: 28 additions & 27 deletions lib/tasks/transcribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TaskTranscribe extends SttTask {
this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
}
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
}

async _stopTranscription() {
Expand Down Expand Up @@ -142,43 +142,43 @@ class TaskTranscribe extends SttTask {
switch (this.vendor) {
case 'google':
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.NoAudioDetected,
this._onNoAudio.bind(this, cs, ep, channel));
ep.addCustomEventListener(GoogleTranscriptionEvents.MaxDurationExceeded,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.MaxDurationExceeded,
this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break;

case 'aws':
case 'polly':
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AwsTranscriptionEvents.NoAudioDetected,
this.addCustomEventListener(ep, AwsTranscriptionEvents.NoAudioDetected,
this._onNoAudio.bind(this, cs, ep, channel));
ep.addCustomEventListener(AwsTranscriptionEvents.MaxDurationExceeded,
this.addCustomEventListener(ep, AwsTranscriptionEvents.MaxDurationExceeded,
this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break;
case 'microsoft':
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AzureTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected,
this._onNoAudio.bind(this, cs, ep, channel));
break;
case 'nuance':
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'deepgram':
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));

/* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */
Expand All @@ -187,12 +187,12 @@ class TaskTranscribe extends SttTask {
break;
case 'soniox':
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, SonioxTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, CobaltTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));

/* cobalt doesnt have language, it has model, which is required */
Expand Down Expand Up @@ -222,37 +222,38 @@ class TaskTranscribe extends SttTask {

case 'ibm':
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, IbmTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;

case 'nvidia':
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;

case 'assemblyai':
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep,
AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;

default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
}
Expand All @@ -264,7 +265,7 @@ class TaskTranscribe extends SttTask {
}

/* common handler for all stt engine errors */
ep.addCustomEventListener(JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
Expand Down
Loading

0 comments on commit 2c48083

Please sign in to comment.