Skip to content

Commit

Permalink
43 router fix 2 dataignore (#46)
Browse files Browse the repository at this point in the history
* rmeermei router fix

* readding the Audio info

* compiling router

* final changes in refactor

* refactor of dataIgnore

* forgotten for refactor

* finish refactor of rm's router fix

* a couple of minor debug changes

* add audio info to test json

* Update .travis.yml

python update

Co-authored-by: fbernard <[email protected]>
Co-authored-by: fbernard <[email protected]>
Co-authored-by: fbernard <[email protected]>
  • Loading branch information
4 people authored Nov 23, 2020
1 parent 349495a commit ba91d2c
Show file tree
Hide file tree
Showing 22 changed files with 219 additions and 396 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ matrix:
- mkdir $HOME/jre
- tar xf data.tar.xz -C $HOME/jre
- mkdir $HOME/pydev
- wget -q http://raspbian.raspberrypi.org/raspbian/pool/main/p/python3.5/libpython3.5_3.5.3-1+deb9u2_armhf.deb
- ar x libpython3.5_3.5.3-1+deb9u2_armhf.deb
- wget -q http://raspbian.raspberrypi.org/raspbian/pool/main/p/python3.5/libpython3.5_3.5.3-1+deb9u3_armhf.deb
- ar x libpython3.5_3.5.3-1+deb9u3_armhf.deb
- tar xf data.tar.xz -C $HOME/pydev
- wget -q http://raspbian.raspberrypi.org/raspbian/pool/main/p/python3.5/libpython3.5-dev_3.5.3-1+deb9u2_armhf.deb
- ar x libpython3.5-dev_3.5.3-1+deb9u2_armhf.deb
- wget -q http://raspbian.raspberrypi.org/raspbian/pool/main/p/python3.5/libpython3.5-dev_3.5.3-1+deb9u3_armhf.deb
- ar x libpython3.5-dev_3.5.3-1+deb9u3_armhf.deb
- tar xf data.tar.xz -C $HOME/pydev
- wget -q http://raspbian.raspberrypi.org/raspbian/pool/main/p/python-numpy/python3-numpy_1.12.1-3_armhf.deb
- ar x python3-numpy_1.12.1-3_armhf.deb
Expand Down
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 3.7)

#set(VERBOSE)

if (NOT (PLATFORM STREQUAL "Linux" OR PLATFORM STREQUAL "Android" OR PLATFORM STREQUAL "RaspberryPi" OR PLATFORM STREQUAL "Windows"))
message(FATAL_ERROR "Please specify the PLATFORM variable <Linux, Android, RaspberryPi, Windows")
endif()
Expand Down Expand Up @@ -207,7 +209,7 @@ install (TARGETS godec DESTINATION bin)
install (TARGETS godec_dynamic DESTINATION bin)
install (TARGETS godec_static DESTINATION lib)
install (DIRECTORY "${PROJECT_SOURCE_DIR}/src/include" DESTINATION .)
install (FILES "${PROJECT_SOURCE_DIR}/src/core_components/GodecMessages.h" DESTINATION include/godec/core_components)
install (FILES "${PROJECT_SOURCE_DIR}/src/core_components/GodecMessages.h" DESTINATION include/godec)
install (FILES "${PROJECT_SOURCE_DIR}/src/core_components/AccumCovariance.h" DESTINATION include/godec/core_components)
install (FILES "${PROJECT_SOURCE_DIR}/src/core_components/cnpy.h" DESTINATION include/godec/core_components)
install (FILES "${PROJECT_SOURCE_DIR}/tools/env.sh" DESTINATION .)
Expand Down
7 changes: 4 additions & 3 deletions doc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cmake_minimum_required(VERSION 3.7)
project(godec_doc)

execute_process(COMMAND perl "GenDoc.pl" "../src/core_components" "../src/core_components,../src" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_FILE "CoreComponents.md")


execute_process(COMMAND perl "GenDoc.pl" "../src/core_components" "../src/core_components,../src" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_FILE "CoreComponents.md" RESULT_VARIABLE ret)
if(NOT (ret EQUAL "0"))
message( FATAL_ERROR "Bad exit status from GenDoc.pl")
endif()
23 changes: 11 additions & 12 deletions doc/CoreComponents.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ This component supports both BinaryDecoderMessage and AudioDecoderMessage
| --- | --- |
| streamed\_audio | AudioDecoderMessage,BinaryDecoderMessage|

#### Outputs
| Output slot |
| --- |
| audio\_info |


## Average
Expand Down Expand Up @@ -214,7 +218,7 @@ Component that writes stream to file
### Extended description:
The writing equivalent to the FileFeeder component, for saving output. Available "input_type":

"audio": For writing AudioDecoderMessage messages. "output_file_prefix" specifies the path prefix that each utterance gets written to
"audio": For writing AudioDecoderMessage messages. "output_file_prefix" specifies the path prefix that each utterance gets written to. The incoming audio are float values expected to be normalied to -1.0/1.0 range

"raw_text": BinaryDecoderMessage expected that gets converted into text and written into "output_file"

Expand Down Expand Up @@ -320,7 +324,7 @@ Applies a matrix (from a stream) to an incoming feature stream
Merges streams that were created by Router. Streams are specified as input\_stream\_0, input\_stream\_1 etc, up to \"num\_streams\". Time map is the one created by the Router

### Extended description:
Refer to the Router extended description to a more detailed description
Refer to the Router extended description for a more detailed description. This component expects all conversation states from the upstream Router, as well as all the processed streams



Expand All @@ -332,8 +336,8 @@ Refer to the Router extended description to a more detailed description
#### Inputs
| Input slot | Message Type |
| --- | --- |
| conversation\_state\_[0-9] | ConversationStateDecoderMessage|
| input\_streams\_[0-9] | AnyDecoderMessage|
| time\_map | TimeMapDecoderMessage|

#### Outputs
| Output slot |
Expand Down Expand Up @@ -410,15 +414,13 @@ Look at `test/python_test.json` for an example.
Splits an incoming stream into separate streams, based on the binary decision of another input stream

### Extended description:
The router component splits an incoming stream into several streams ("num_outputs"), with two major modes:
The router component can be used to split streams across several branches (e.g. if you have a language detector upstream and want to direct the streams to the language-specific component). The mechanism is simple: the Router creates N output conversation state message streams, each of which has "ignore_data" set to true or false, depending on the routing at that point. The branches' components can check this flag and do some optimization (while still emitting empty messages that account for the stream time). So, all branch components see all the data. The Merger component can be used downstream to merge the output together (it chooses the right results from each stream according to those conversation state messages).

"sad_nbest": "routing_stream" is expected to be an NbestDecoderMessage, where the 0-th nbest entry is expected to contain a sequence of 0 (nonspeech) or 1 (speech), which the router will use to route the stream

"utterance_round_robin": Simple round robin on an utterance-by-utterance basis (pass in ConversationStateDecoerMessage for "routing_stream")
There are two modes by which the Router decides where to route:

Very important note: The output streams have completely new timestamps, that's why for each "output_stream_" stream there is a corresponding "conversation_state_" for that stream. If you from there on you only care about these downstream streams, there is no issue, HOWEVER, you can't combine one of these streams with another stream from further upstream, since they have entirely different timestamps. If you WANT to combine things again, you need to re-merge these redefined streams together with the `Merger` component. The `Merger` component takes in the `time_map` stream which tells it how to combine the separate stream into one original one again.
"sad_nbest": "routing_stream" is expected to be an NbestDecoderMessage, where the 0-th nbest entry is expected to contain a sequence of 0 (nonspeech) or 1 (speech), which the router will use to route the stream

One more subtle note: At the end of a "conversation" (as defined by the ConversationStateDecoderMessage), in order for all substreams to see this signal, the Router holds off a small amount so it can spread it among all streams at the end with that signal. When you re-merge the streams with the `Merger` component, this will result in a sequence of very short utterances that the downstream components should be able to deal with. This is an obscure edge case that only occurs at the of conversations.
"utterance_round_robin": Simple round robin on an utterance-by-utterance basis



Expand All @@ -432,14 +434,11 @@ One more subtle note: At the end of a "conversation" (as defined by the Conversa
| Input slot | Message Type |
| --- | --- |
| routing\_stream | AnyDecoderMessage|
| stream\_to\_route | AnyDecoderMessage|

#### Outputs
| Output slot |
| --- |
| Slot: conversation\_state\_[0-9] |
| Slot: output\_stream\_[0-9] |
| time\_map |


## SoundcardPlayer
Expand Down
6 changes: 4 additions & 2 deletions doc/GenDoc.pl
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
my $slotDefs = GetSlotDefs($ARGV[1]);
my @filesToScan;
foreach my $scanDir (@scanDirs) {
push @filesToScan, glob($scanDir."/*.h*");
my @newFiles = grep /.*\.h[^\.]*$/, glob($scanDir."/*.h*");
push @filesToScan, @newFiles;
}

@filesToScan = sort @filesToScan;

sub GetSlotDefs {
Expand Down Expand Up @@ -40,7 +42,7 @@ sub ScanFile {
my $path = shift;
my $slotDefs = shift;
my @hFiles = glob($path."/".$fileBase.".h*");
if (scalar(@hFiles) == 0) {die "No matching .h* file found";}
if (scalar(@hFiles) == 0) {die "No matching .h* file found for ".$path."/".$fileBase;}
my $hFile = $hFiles[0];
open(HIN,"<$hFile") || die "Can't open ".$hFile;
my %outHash;
Expand Down
4 changes: 0 additions & 4 deletions doc/Profiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,3 @@ Do the following steps:

Note that there is a difference between "algorithmic" latency and "realtime" latency. Algorithmic latency is the latency the algorithm of a component incurs, e.g. a component might need 1 second of input audio to create some output. Realtime latency is the combination of the algorithmic latency plus whatever the CPU incurs (thus making it machine-dependent). In order to only measure the algorithmic latency, slow down the input to a very slow pace.

**Gotcha**

Some networks use Router and Merger components (e.g. to separate speech and nonspeech audio). The components between the Merger and the Router have different timestamps than the rest of the network, so if you for example measure between the initial FileFeeder and a Decoder that is inside the Router-Merge combo, the latency will be nonsensical. In this case the reference point needs to also be inside that Router-Merge span.

24 changes: 24 additions & 0 deletions src/ChannelMessenger.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <godec/ChannelMessenger.h>
#include <godec/ComponentGraph.h>
#include "core_components/GodecMessages.h"
#include "core_components/Router.h"
#include <godec/json.hpp>
#include <boost/bimap.hpp>
#include <boost/foreach.hpp>
Expand Down Expand Up @@ -40,6 +41,9 @@ std::string LoopProcessor::SlotTimeMap = "time_map";
std::string LoopProcessor::SlotControl = "control";
std::string LoopProcessor::SlotSearchOutput = "fst_search_output";
std::string LoopProcessor::SlotAudioInfo = "audio_info";
std::string LoopProcessor::SlotInputStreamPrefix = "input_stream_";
std::string LoopProcessor::SlotOutputStream = "output_stream";


std::string DecoderMessage::describeThyself() const {
std::stringstream ss;
Expand Down Expand Up @@ -84,6 +88,7 @@ void DecoderMessage::PythonGetDecoderMessageVals(PyObject* pMsg, std::string& ta
*/
LoopProcessor::LoopProcessor(std::string id, ComponentGraphConfig* pt) : mVerbose(false), mIsFinished(false) {
mId = id;

mComponentGraph = pt->GetComponentGraph();

if (pt->get_optional_READ_DECLARATION_BEFORE_USE<bool>("verbose")) {
Expand Down Expand Up @@ -140,6 +145,7 @@ void LoopProcessor::initOutputs(std::list<std::string> requiredSlots) {

void LoopProcessor::connectInputs(unordered_map<std::string, std::set<uuid>> requiredSlots) {
bool convoStateAlreadyDefined = false;

for (auto it = mInputSlots.begin(); it != mInputSlots.end(); it++) {
if (it->first == SlotConversationState) convoStateAlreadyDefined = true;
}
Expand Down Expand Up @@ -358,6 +364,24 @@ void LoopProcessor::pushToOutputs(std::string slot, DecoderMessage_ptr msg) {
}
}

// Forwards routed stream messages downstream, honoring the upstream Router's
// "ignore_data" flag. For each stream index i in [0, mNumStreams), reads the
// "input_stream_<i>" payload message and the matching "conversation_state_<i>"
// ConversationStateDecoderMessage, and pushes the payload to the output slot
// only when the Router marked that stream segment as ignore_data == "false".
// Errors out if a conversation state lacks the Router's IgnoreData descriptor,
// since that means the stream was not wired through a Router at all.
void LoopProcessor::ProcessIgnoreDataMessageBlock(const DecoderMessageBlock& msgBlock) {
for(int streamIdx = 0; streamIdx < mNumStreams; streamIdx++) {
// Slot name of the routed payload for this stream, e.g. "input_stream_0"
std::stringstream inputStreamSs;
inputStreamSs << SlotInputStreamPrefix << streamIdx;
auto streamBaseMsg = msgBlock.getBaseMsg(inputStreamSs.str());
// Matching per-stream conversation state, e.g. "conversation_state_0"
std::stringstream convStateSs;
convStateSs << SlotConversationState << "_" << streamIdx;
auto convStateMsg = msgBlock.get<ConversationStateDecoderMessage>(convStateSs.str());
// Empty descriptor is the sentinel for "not produced by a Router"
auto ignoreData = convStateMsg->getDescriptor(RouterComponent::IgnoreData);
if (ignoreData == "") GODEC_ERR << getLPId() << ": Stream " << streamIdx << " has a conversation state connected to it that did not come from a Router!";
if (ignoreData == "false") {
// Only forward payload for segments the Router routed to this branch
pushToOutputs(SlotOutputStream, streamBaseMsg);
}
}
}



GlobalComponentGraphVals::GlobalComponentGraphVals() {
put<bool>(LoopProcessor::GatherRuntimeStats, false);
put<bool>(LoopProcessor::QuietGodec, false);
Expand Down
19 changes: 9 additions & 10 deletions src/core_components/FileWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,19 @@ void FileWriterComponent::WriteJsonOutput(boost::shared_ptr<const JsonDecoderMes
auto audioMsg =msgBlock.get<AudioDecoderMessage>(SlotStreamedAudio);
float secondsPerTick = 1.0 / (audioMsg->mSampleRate*audioMsg->mTicksPerSample);

// Ralf: This is an incredibly ugly allowance for the fact that CTMs are in reference to the analist's audio file
utterance_offset_in_file = audioMsg->getDescriptor("utterance_offset_in_file") != "" ? boost::lexical_cast<float>(audioMsg->getDescriptor("utterance_offset_in_file")) : 0;
// This is an incredibly ugly allowance for the fact that CTMs are in reference to the analist's audio file
if (audioMsg->getDescriptor("utterance_offset_in_file") != "" && boost::lexical_cast<float>(audioMsg->getDescriptor("utterance_offset_in_file")) != mPrevConvoEndTimeInSecondsStreamBased) {
mPrevConvoEndTimeInSecondsStreamBased = boost::lexical_cast<float>(audioMsg->getDescriptor("utterance_offset_in_file"));
}
std::string file = audioMsg->getDescriptor("wave_file_name") != "" ? audioMsg->getDescriptor("wave_file_name") : "dummy";
std::string channel = audioMsg->getDescriptor("channel") == "1" ? "A" : "B";
json_output_writer << std::fixed << std::setprecision(2);
json j = jsonMsg->getJsonObj();
if (jsonOutputFormat == "ctm") {
for (size_t i = 0; i < j["words"].size(); ++i) {
json w = j["words"][i];
float wordBeginInSeconds = utterance_offset_in_file + mPrevConvoEndTimeInSecondsStreamBased + (w["beginTime"].get<int64_t>() - mPrevConvoEndTimeInTicksStreamBased + 1) * secondsPerTick;

float wordBeginInSeconds = mPrevConvoEndTimeInSecondsStreamBased + (w["beginTime"].get<int64_t>() - mPrevConvoEndTimeInTicksStreamBased + 1) * secondsPerTick;
float wordDurationInSeconds = w["duration"].get<int64_t>() * secondsPerTick;

json_output_writer << file << " "
Expand All @@ -215,7 +218,7 @@ void FileWriterComponent::WriteJsonOutput(boost::shared_ptr<const JsonDecoderMes
json_output_writer << std::endl;
}
} else if (jsonOutputFormat == "fst_search") {
float beginTime = utterance_offset_in_file + mPrevConvoEndTimeInSecondsStreamBased + (j["beginTime"].get<int64_t>() - mPrevConvoEndTimeInTicksStreamBased + 1) * secondsPerTick;
float beginTime = mPrevConvoEndTimeInSecondsStreamBased + (j["beginTime"].get<int64_t>() - mPrevConvoEndTimeInTicksStreamBased + 1) * secondsPerTick;
for (size_t i = 0; i < j["searchOutput"].size(); ++i) {
json& w = j["searchOutput"][i];
float wordBeginInSeconds = beginTime + w["relativeBeginTime"].get<int64_t>() * secondsPerTick;
Expand All @@ -230,15 +233,11 @@ void FileWriterComponent::WriteJsonOutput(boost::shared_ptr<const JsonDecoderMes
}
}

// Ralf: This is an incredibly ugly allowance for the fact that CTMs are in reference to the analist's audio file. Pt II
// This is an incredibly ugly allowance for the fact that CTMs are in reference to the analist's audio file. Pt II
auto origConvMsg = msgBlock.get<ConversationStateDecoderMessage>(SlotFileFeederConvstate);
if (origConvMsg->mLastChunkInUtt) {
mPrevConvoEndTimeInSecondsStreamBased = 0.0;
} else {
int64_t timeDeltaInTicks = origConvMsg->getTime() - mPrevConvoEndTimeInTicksStreamBased;
mPrevConvoEndTimeInSecondsStreamBased += timeDeltaInTicks * secondsPerTick;
mPrevConvoEndTimeInTicksStreamBased = convMsg->getTime();
}
mPrevConvoEndTimeInTicksStreamBased = origConvMsg->getTime();
} else if (jsonOutputFormat == "mt") {
json j = jsonMsg->getJsonObj();
const std::string& translatedText = j["translatedText"].get<std::string>();
Expand Down
1 change: 0 additions & 1 deletion src/core_components/FileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class FileWriterComponent : public LoopProcessor {
static std::string SlotFileFeederConvstate;
float mPrevConvoEndTimeInSecondsStreamBased;
int64_t mPrevConvoEndTimeInTicksStreamBased;
float utterance_offset_in_file;

std::string mControlType;
boost::shared_ptr<FileWriterHolder> mCurrentFWH;
Expand Down
71 changes: 0 additions & 71 deletions src/core_components/GodecMessages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1016,77 +1016,6 @@ DecoderMessage_ptr ConversationStateDecoderMessage::fromPython(PyObject* pMsg) {
}
#endif


/*
############ TimeMap decoder message ###################
*/

std::string TimeMapDecoderMessage::describeThyself() const {
std::stringstream ss;
ss << DecoderMessage::describeThyself();
ss << "TimeMap, route " << mMapping.routeIndex << " (" << mMapping.startOrigTime << "," << mMapping.endOrigTime << ")->(" << mMapping.startMappedTime << "," << mMapping.endMappedTime << ")" << std::endl;
return ss.str();
}

DecoderMessage_ptr TimeMapDecoderMessage::create(uint64_t _time, TimeMapEntry timeMapping) {
TimeMapDecoderMessage* msg = new TimeMapDecoderMessage();
msg->setTime(_time);
msg->mMapping = timeMapping;
return DecoderMessage_ptr(msg);
}



DecoderMessage_ptr TimeMapDecoderMessage::clone() const { return DecoderMessage_ptr(new TimeMapDecoderMessage(*this)); }
bool TimeMapDecoderMessage::mergeWith(DecoderMessage_ptr msg, DecoderMessage_ptr &remainingMsg, bool verbose) {
auto newTimeMapMsg = boost::static_pointer_cast<const TimeMapDecoderMessage>(msg);
if (mMapping.routeIndex == newTimeMapMsg->mMapping.routeIndex) {
mMapping.endMappedTime = newTimeMapMsg->mMapping.endMappedTime;
mMapping.endOrigTime = newTimeMapMsg->mMapping.endOrigTime;
setTime(newTimeMapMsg->getTime());
return false;
} else {
remainingMsg = msg;
return true;
}
}
bool TimeMapDecoderMessage::canSliceAt(uint64_t sliceTime, std::vector<DecoderMessage_ptr>& msgList, uint64_t streamStartOffset, bool verbose) {
auto firstMsg = boost::static_pointer_cast<const TimeMapDecoderMessage>(msgList[0]);
// Convo state can only be sliced at boundary
return firstMsg->getTime() == sliceTime;
}
bool TimeMapDecoderMessage::sliceOut(uint64_t sliceTime, DecoderMessage_ptr& sliceMsg, std::vector<DecoderMessage_ptr>& msgList, int64_t streamStartOffset, bool verbose) {
auto firstMsg = boost::static_pointer_cast<const TimeMapDecoderMessage>(msgList[0]);
if (firstMsg->getTime() == sliceTime) {
sliceMsg = msgList[0];
msgList.erase(msgList.begin());
return true;
} else {
GODEC_ERR << "We should never be here";
}
return false;
}

void TimeMapDecoderMessage::shiftInTime(int64_t deltaT) {
GODEC_ERR << "shiftInTime() was called on a TimeMapDecoderMessage. This likely happened because the timemap stream was passed into a Router component as the stream to be routed. This is probably a mistake.";
}

jobject TimeMapDecoderMessage::toJNI(JNIEnv* env) {
GODEC_ERR << "TimeMapDecoderMessage::toJNI not implemented yet!";
return NULL;
};

#ifndef ANDROID
PyObject* TimeMapDecoderMessage::toPython() {
GODEC_ERR << "TimeMapDecoderMessage::toPython not implemented yet";
return nullptr;
}
DecoderMessage_ptr TimeMapDecoderMessage::fromPython(PyObject* pMsg) {
GODEC_ERR << "TimeMapDecoderMessage::fromPython not implemented yet";
return DecoderMessage_ptr();
}
#endif

//############ Raw text message ###################

std::string BinaryDecoderMessage::describeThyself() const {
Expand Down
Loading

0 comments on commit ba91d2c

Please sign in to comment.