Skip to content

Commit

Permalink
fix to be more precise about removing custom event handlers so that w…
Browse files Browse the repository at this point in the history
…hen we stop a gather we dont also inadvertently stop a background transcribe as well
  • Loading branch information
davehorton committed Dec 27, 2023
1 parent 9d8291f commit be2bea6
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 121 deletions.
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const disconnect = () => {
httpServer?.on('close', resolve);
httpServer?.close();
srf.disconnect();
srf.locals.mediaservers.forEach((ms) => ms.disconnect());
srf.locals.mediaservers?.forEach((ms) => ms.disconnect());
});
};

Expand Down
77 changes: 43 additions & 34 deletions lib/tasks/gather.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class TaskGather extends SttTask {
} catch (err) {
this.logger.error(err, 'TaskGather:exec error');
}
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
}

kill(cs) {
Expand Down Expand Up @@ -261,7 +261,7 @@ class TaskGather extends SttTask {
else if (this.input.includes('digits')) {
if (this.digitBuffer.length === 0 && this.needsStt) {
// DTMF is higher priority than STT.
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
Expand Down Expand Up @@ -305,33 +305,37 @@ class TaskGather extends SttTask {
switch (this.vendor) {
case 'google':
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
this.addCustomEventListener(
ep, GoogleTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;

case 'aws':
case 'polly':
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, AwsTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'microsoft':
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this.addCustomEventListener(
ep, AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected,
this._onNoSpeechDetected.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
this.addCustomEventListener(ep, AzureTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'nuance':
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.StartOfSpeech,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.StartOfSpeech,
this._onStartOfSpeech.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.TranscriptionComplete,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.TranscriptionComplete,
this._onTranscriptionComplete.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.VadDetected,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.VadDetected,
this._onVadDetected.bind(this, cs, ep));

/* stall timers until prompt finishes playing */
Expand All @@ -342,9 +346,10 @@ class TaskGather extends SttTask {

case 'deepgram':
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, DeepgramTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));

/* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */
Expand All @@ -353,12 +358,14 @@ class TaskGather extends SttTask {

case 'soniox':
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, SonioxTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
break;

case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, CobaltTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));

/* cobalt doesnt have language, it has model, which is required */
if (!this.data.recognizer.model) {
Expand Down Expand Up @@ -388,21 +395,21 @@ class TaskGather extends SttTask {

case 'ibm':
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, IbmTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;

case 'nvidia':
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.StartOfSpeech,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.StartOfSpeech,
this._onStartOfSpeech.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.TranscriptionComplete,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.TranscriptionComplete,
this._onTranscriptionComplete.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.VadDetected,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.VadDetected,
this._onVadDetected.bind(this, cs, ep));

/* I think nvidia has this (??) - stall timers until prompt finishes playing */
Expand All @@ -413,19 +420,21 @@ class TaskGather extends SttTask {

case 'assemblyai':
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(
ep, JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
}
Expand All @@ -437,7 +446,7 @@ class TaskGather extends SttTask {
}

/* common handler for all stt engine errors */
ep.addCustomEventListener(JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
Expand Down
13 changes: 10 additions & 3 deletions lib/tasks/stt-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ class SttTask extends Task {
const {
setChannelVarsForStt,
normalizeTranscription,
removeSpeechListeners,
setSpeechCredentialsAtRuntime,
compileSonioxTranscripts,
consolidateTranscripts
} = require('../utils/transcription-utils')(logger);
this.setChannelVarsForStt = setChannelVarsForStt;
this.normalizeTranscription = normalizeTranscription;
this.removeSpeechListeners = removeSpeechListeners;
this.compileSonioxTranscripts = compileSonioxTranscripts;
this.consolidateTranscripts = consolidateTranscripts;

this.eventHandlers = [];
this.isHandledByPrimaryProvider = true;
if (this.data.recognizer) {
const recognizer = this.data.recognizer;
Expand Down Expand Up @@ -140,6 +138,15 @@ class SttTask extends Task {
}
}

addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
ep.addCustomEventListener(event, handler);
}

removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
}

async _initSpeechCredentials(cs, vendor, label) {
const {getNuanceAccessToken, getIbmAccessToken} = cs.srf.locals.dbHelpers;
let credentials = cs.getSpeechCredentials(vendor, 'stt', label);
Expand Down
55 changes: 28 additions & 27 deletions lib/tasks/transcribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TaskTranscribe extends SttTask {
this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
}
this.removeSpeechListeners(ep);
this.removeCustomEventListeners();
}

async _stopTranscription() {
Expand Down Expand Up @@ -142,43 +142,43 @@ class TaskTranscribe extends SttTask {
switch (this.vendor) {
case 'google':
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.NoAudioDetected,
this._onNoAudio.bind(this, cs, ep, channel));
ep.addCustomEventListener(GoogleTranscriptionEvents.MaxDurationExceeded,
this.addCustomEventListener(ep, GoogleTranscriptionEvents.MaxDurationExceeded,
this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break;

case 'aws':
case 'polly':
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AwsTranscriptionEvents.NoAudioDetected,
this.addCustomEventListener(ep, AwsTranscriptionEvents.NoAudioDetected,
this._onNoAudio.bind(this, cs, ep, channel));
ep.addCustomEventListener(AwsTranscriptionEvents.MaxDurationExceeded,
this.addCustomEventListener(ep, AwsTranscriptionEvents.MaxDurationExceeded,
this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break;
case 'microsoft':
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AzureTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected,
this._onNoAudio.bind(this, cs, ep, channel));
break;
case 'nuance':
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'deepgram':
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));

/* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */
Expand All @@ -187,12 +187,12 @@ class TaskTranscribe extends SttTask {
break;
case 'soniox':
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, SonioxTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, CobaltTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));

/* cobalt doesnt have language, it has model, which is required */
Expand Down Expand Up @@ -222,37 +222,38 @@ class TaskTranscribe extends SttTask {

case 'ibm':
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect,
this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, IbmTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;

case 'nvidia':
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;

case 'assemblyai':
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep,
AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;

default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription,
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
}
Expand All @@ -264,7 +265,7 @@ class TaskTranscribe extends SttTask {
}

/* common handler for all stt engine errors */
ep.addCustomEventListener(JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
Expand Down
Loading

0 comments on commit be2bea6

Please sign in to comment.