Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert data to Mongo #37

Merged
merged 10 commits into from
Jul 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions config.toml

This file was deleted.

20 changes: 11 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ func parseConfigFile(config *AppConfig, filename string) error {

// AppConfig contains the app config variables.
type AppConfig struct {
EmailUsername string
EmailPassword string
IBMUsername string
IBMPassword string
AccountID string
ApplicationKey string
BucketName string
Debug bool
SecretKey string
BackblazeAccountID string
BackblazeApplicationKey string
BackblazeBucket string
Debug bool
EmailUsername string
EmailPassword string
IBMUsername string
IBMPassword string
MongoURL string
Port int
SecretKey string
}
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package main

import (
"fmt"
"net/http"
_ "net/http/pprof" // import for side effects
"os"
Expand All @@ -27,7 +28,10 @@ func main() {
// serve http
http.Handle("/", middlewareRouter)
http.Handle("/static/", http.FileServer(http.Dir(".")))
if err := http.ListenAndServe(":8080", nil); err != nil {

log.Infof("Server is running at http://localhost:%d", config.Config.Port)
addr := fmt.Sprintf(":%d", config.Config.Port)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error(err)
}
}
4 changes: 3 additions & 1 deletion tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks

import (
"math/rand"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -130,7 +131,8 @@ func (ex *defaultExecuter) completeTask(id string, task func(string) error, onFa
defer func() {
if r := recover(); r != nil {
log.WithField("task", id).
Error("Task failed")
Errorln("Task failed", r)
debug.PrintStack()
go onFailure(id, "The error message is below. Please check logs for more details."+"\n\n"+"panic occurred")
ex.cMap.setStatus(id, FAILURE)
}
Expand Down
36 changes: 31 additions & 5 deletions transcription/ibm.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,37 @@ func keepConnectionOpen(ws *websocket.Conn, ticker *time.Ticker, quit chan struc
}
}

// GetTranscript gets the full transcript from an IBMResult.
func GetTranscript(res *IBMResult) string {
var buffer bytes.Buffer
// GetTranscription gets the full transcript from an IBMResult.
func GetTranscription(res *IBMResult) *Transcription {
timestamps := []Timestamp{}
confidences := []Confidence{}

var transcriptBuffer bytes.Buffer
for _, subResult := range res.Results {
buffer.WriteString(subResult.Alternatives[0].Transcript)
bestHypothesis := subResult.Alternatives[0]
transcriptBuffer.WriteString(bestHypothesis.Transcript)
for _, timestamp := range bestHypothesis.Timestamps {
timestamps = append(timestamps, Timestamp{
Word: timestamp[0].(string),
StartTime: timestamp[1].(float64),
EndTime: timestamp[2].(float64),
})
}
for _, confidence := range bestHypothesis.WordConfidence {
confidences = append(confidences, Confidence{
Word: confidence[0].(string),
Score: confidence[1].(float64),
})
}
}

transcription := &Transcription{
Transcript: transcriptBuffer.String(),
CompletedAt: time.Now(),
Metadata: Metadata{
Timestamps: timestamps,
Confidences: confidences,
},
}
return buffer.String()
return transcription
}
144 changes: 109 additions & 35 deletions transcription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"gopkg.in/kothar/go-backblaze.v0"
"gopkg.in/mgo.v2"

log "github.com/Sirupsen/logrus"
"github.com/jordan-wright/email"
Expand Down Expand Up @@ -86,6 +87,8 @@ func DownloadFileFromURL(url string) (string, error) {
func filePathFromURL(url string) string {
tokens := strings.Split(url, "/")
filePath := tokens[len(tokens)-1]
filePath = strings.Split(filePath, ".")[0]

// ensure the filePath is unique by appending timestamp
filePath = filePath + strconv.Itoa(int(time.Now().UnixNano()))
return filePath
Expand All @@ -104,20 +107,23 @@ func SplitWavFile(wavFilePath string) ([]string, error) {
// As a chunk of the Wav file is extracted using FFMPEG, it is converted back into Flac format.
numChunks, err := getNumChunks(wavFilePath)
if err != nil {
return []string{}, err
return []string{}, errors.Trace(err)
}
if numChunks == 1 {
return []string{wavFilePath}, nil
}

chunkLengthInSeconds := 2968
names := make([]string, numChunks)
for i := 0; i < numChunks; i++ {
// 5 seconds of redundancy for each chunk after the first
startingSecond := i*chunkLengthInSeconds - (i-1)*5
newFilePath := wavFilePath + strconv.Itoa(i)
if err := extractAudioSegment(newFilePath, startingSecond, chunkLengthInSeconds); err != nil {
return []string{}, err
newFilePath := strconv.Itoa(i) + "_" + wavFilePath
if err := extractAudioSegment(wavFilePath, newFilePath, startingSecond, chunkLengthInSeconds); err != nil {
return []string{}, errors.Trace(err)
}
if _, err := ConvertAudioIntoFormat(newFilePath, "flac"); err != nil {
return []string{}, err
return []string{}, errors.Trace(err)
}
names[i] = newFilePath
}
Expand All @@ -129,13 +135,13 @@ func SplitWavFile(wavFilePath string) ([]string, error) {
func getNumChunks(filePath string) (int, error) {
file, err := os.Open(filePath)
if err != nil {
return -1, err
return -1, errors.Trace(err)
}
defer file.Close()

stat, err := file.Stat()
if err != nil {
return -1, err
return -1, errors.Trace(err)
}

wavFileSize := int(stat.Size())
Expand All @@ -147,16 +153,17 @@ func getNumChunks(filePath string) (int, error) {
}

// extractAudioSegment uses FFMPEG to write a new audio file starting at a given time of a given length
func extractAudioSegment(filePath string, ss int, t int) error {
func extractAudioSegment(inFilePath string, outFilePath string, ss int, t int) error {
// -ss: starting second, -t: time in seconds
cmd := exec.Command("ffmpeg", "-i", filePath, "-ss", strconv.Itoa(ss), "-t", strconv.Itoa(t), filePath)
if err := cmd.Run(); err != nil {
return err
cmd := exec.Command("ffmpeg", "-i", inFilePath, "-ss", strconv.Itoa(ss), "-t", strconv.Itoa(t), outFilePath)
if out, err := cmd.CombinedOutput(); err != nil {
return errors.New(err.Error() + "\nOutput:\n" + string(out))
}
return nil
}

// MakeIBMTaskFunction returns a task function for transcription using IBM transcription functions.
// TODO(#52): Quite a lot of the transcription process could be done concurrently.
func MakeIBMTaskFunction(audioURL string, emailAddresses []string, searchWords []string) (task func(string) error, onFailure func(string, string)) {
task = func(id string) error {
filePath, err := DownloadFileFromURL(audioURL)
Expand Down Expand Up @@ -186,42 +193,53 @@ func MakeIBMTaskFunction(audioURL string, emailAddresses []string, searchWords [
}

log.WithField("task", id).
Debugf("Split file %s into %d file(s)", filePath, len(wavPath))
Debugf("Split file %s into %d file(s)", filePath, len(wavPaths))

for i := 0; i < len(wavPaths); i++ {
filePath := wavPaths[i]
flacPath, err := ConvertAudioIntoFormat(filePath, "flac")
if err != nil {
return errors.Trace(err)
}
defer os.Remove(flacPath)
// for i := 0; i < len(wavPaths); i++ {
// wavPath := wavPaths[i]
flacPath, err := ConvertAudioIntoFormat(wavPath, "flac")
if err != nil {
return errors.Trace(err)
}
defer os.Remove(flacPath)

log.WithField("task", id).
Debugf("Converted file %s to %s", filePath, flacPath)
log.WithField("task", id).
Debugf("Converted file %s to %s", wavPath, flacPath)

ibmResult, err := TranscribeWithIBM(flacPath, config.Config.IBMUsername, config.Config.IBMPassword)
if err != nil {
return errors.Trace(err)
}
// }
transcription := GetTranscription(ibmResult)

ibmResult, err := TranscribeWithIBM(flacPath, config.Config.IBMUsername, config.Config.IBMPassword)
if err != nil {
return errors.Trace(err)
}
transcript := GetTranscript(ibmResult)
audioURL, err := UploadFileToBackblaze(filePath, config.Config.BackblazeAccountID, config.Config.BackblazeApplicationKey, config.Config.BackblazeBucket)
if err != nil {
return errors.Trace(err)
}
transcription.AudioURL = audioURL

log.WithField("task", id).
Info(transcript)
log.WithField("task", id).
Debugf("Uploaded %s to backblaze", filePath)

// TODO: save data to MongoDB and file to Backblaze.
if err := WriteToMongo(transcription, config.Config.MongoURL); err != nil {
return errors.Trace(err)
}

if err := SendEmail(config.Config.EmailUsername, config.Config.EmailPassword, "smtp.gmail.com", 25, emailAddresses, fmt.Sprintf("IBM Transcription %s Complete", id), "The transcript is below. It can also be found in the database."+"\n\n"+transcript); err != nil {
return errors.Trace(err)
}
log.WithField("task", id).
Debugf("Wrote to mongo")

log.WithField("task", id).
Debugf("Sent email to %v", emailAddresses)
if err := SendEmail(config.Config.EmailUsername, config.Config.EmailPassword, "smtp.gmail.com", 587, emailAddresses, fmt.Sprintf("IBM Transcription %s Complete", id), "The transcript is below. It can also be found in the database."+"\n\n"+transcription.Transcript); err != nil {
return errors.Trace(err)
}

log.WithField("task", id).
Debugf("Sent email to %v", emailAddresses)
return nil
}

onFailure = func(id string, errMessage string) {
err := SendEmail(config.Config.EmailUsername, config.Config.EmailPassword, "smtp.gmail.com", 25, emailAddresses, fmt.Sprintf("IBM Transcription %s Failed", id), errMessage)
err := SendEmail(config.Config.EmailUsername, config.Config.EmailPassword, "smtp.gmail.com", 587, emailAddresses, fmt.Sprintf("IBM Transcription %s Failed", id), errMessage)
if err != nil {
log.WithField("task", id).
Debugf("Could not send error email to %v because of the error %v", emailAddresses, err.Error())
Expand Down Expand Up @@ -267,3 +285,59 @@ func UploadFileToBackblaze(filePath string, accountID string, applicationKey str
}
return url, nil
}

// mgoLogger is a stateless adapter that lets the application logger be handed
// to mgo.SetLogger.
type mgoLogger struct{}

// Output forwards a log line produced by mgo to the application logger at
// debug level. The call-depth argument is ignored and the returned error is
// always nil.
func (mgoLogger) Output(_ int, msg string) error {
	log.Debug(msg)
	return nil
}

// Transcription is the record produced for one transcribed audio file: the
// full transcript text, where the audio is stored, when the transcription was
// assembled, and per-word metadata.
type Transcription struct {
// Transcript is the complete transcript text.
Transcript string
// AudioURL is the URL of the stored audio file.
AudioURL string
// CompletedAt is the time at which the transcription was assembled.
CompletedAt time.Time
// Metadata holds the per-word timestamps and confidences.
Metadata Metadata
}

// Metadata contains the per-word timestamps and confidences of a
// Transcription.
type Metadata struct {
// Timestamps lists when each transcribed word occurs in the audio.
Timestamps []Timestamp
// Confidences lists the confidence score of each transcribed word.
Confidences []Confidence
}

// Timestamp is when a word occurs in the audio.
type Timestamp struct {
// Word is the transcribed word.
Word string
// StartTime is when the word begins.
// NOTE(review): presumably seconds from the start of the audio — confirm
// against the transcription service's response format.
StartTime float64
// EndTime is when the word ends (same unit as StartTime).
EndTime float64
}

// Confidence is the estimated likelihood (from 0 to 1) that the transcribed
// word is correct.
type Confidence struct {
// Word is the transcribed word.
Word string
// Score is the likelihood that Word was transcribed correctly.
Score float64
}

// WriteToMongo inserts a Transcription as one document into the
// "transcriptions" collection of the "database" database on the MongoDB
// server at url.
//
// A new session is dialed on every call and closed before returning. An error
// is returned if data is nil, if the server cannot be dialed, or if the insert
// fails.
func WriteToMongo(data *Transcription, url string) error {
	// Reject nil up front instead of handing a nil document to the driver.
	if data == nil {
		return errors.New("cannot write nil transcription to mongo")
	}

	// Route mgo's internal log output through the application logger.
	mgo.SetLogger(mgoLogger{})

	session, err := mgo.Dial(url)
	if err != nil {
		return err
	}
	defer session.Close()

	session.SetMode(mgo.Monotonic, true)

	// data is already a pointer to the document, so pass it as-is rather than
	// taking its address a second time.
	return session.DB("database").C("transcriptions").Insert(data)
}
3 changes: 1 addition & 2 deletions vendor/gopkg.in/kothar/go-backblaze.v0/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/gopkg.in/kothar/go-backblaze.v0/apitypes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading