Skip to content

Commit

Permalink
WIP. DB size request response is stored in backend database
Browse files Browse the repository at this point in the history
  • Loading branch information
justinclift committed Oct 5, 2023
1 parent 8b6047b commit efdda0b
Show file tree
Hide file tree
Showing 4 changed files with 392 additions and 403 deletions.
96 changes: 41 additions & 55 deletions common/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (

// MQDebug controls whether to output - via Log.Print*() functions - useful messages during processing. Mostly
// useful for development / debugging purposes
MQDebug = 1
MQDebug = 2
)

//// CloseMQChannel closes an open AMQP channel
Expand Down Expand Up @@ -307,31 +307,56 @@ func LiveRowData(liveNode, loggedInUser, dbOwner, dbName string, reqData LiveDBR
return
}

// LiveSize asks our AMQP backend for the file size of a database
// LiveSize asks our MQ backend for the file size of a database
func LiveSize(liveNode, loggedInUser, dbOwner, dbName string) (size int64, err error) {
// Send the size request to our AMQP backend
var rawResponse []byte
rawResponse, err = MQRequest(liveNode, "size", loggedInUser, dbOwner, dbName, "")
//var rawResponse []byte
//rawResponse, err = MQRequest(liveNode, "size", loggedInUser, dbOwner, dbName, "")
//rawResponse, err = MQRequest(AmqpChan, liveNode, "size", loggedInUser, dbOwner, dbName, "")

// Create a job to place on the queue. A job must specify a queue, but its payload may be empty
payload := make(map[string]any)
payload["operation"] = "size"
payload["requestinguser"] = loggedInUser
payload["dbowner"] = dbOwner
payload["dbname"] = dbName
j := &jobs.Job{Queue: liveNode, Payload: payload}

done := make(chan bool)
jobID, err := Nq.Enqueue(context.Background(), j)
if err != nil {
log.Printf("Unable to enqueue job: %s", err)
return
}

// Decode the response
var resp LiveDBSizeResponse
err = json.Unmarshal(rawResponse, &resp)
// Wait for the queued job to complete
<-done

// Look for the job response in our database backend
foo, err := MQGetResponse(jobID)
if err != nil {
return
}
if resp.Error != "" {
err = errors.New(resp.Error)
return
}
if resp.Node == "" {
log.Println("A node responded to a 'size' request, but didn't identify itself.")
return
}
size = resp.Size

log.Printf("%s", foo)


//// Decode the response
//var resp LiveDBSizeResponse
//err = json.Unmarshal(rawResponse, &resp)
//if err != nil {
// return
//}
//if resp.Error != "" {
// err = errors.New(resp.Error)
// return
//}
//if resp.Node == "" {
// log.Println("A node responded to a 'size' request, but didn't identify itself.")
// return
//}

//size = resp.Size
return
}

Expand Down Expand Up @@ -411,45 +436,6 @@ func LiveViews(liveNode, loggedInUser, dbOwner, dbName string) (views []string,
return
}

// MQResponse sends an AMQP response back to its requester
func MQResponse(requestType string, nodeName string, responseData interface{}) (err error) {
//func MQResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, responseData interface{}) (err error) {
var z []byte
z, err = json.Marshal(responseData)
if err != nil {
log.Println(err)
// It's super unlikely we can safely return here without ack-ing the message. So as something has gone
// wrong with json.Marshall() we'd better just attempt passing back info about that error message instead (!)
z = []byte(fmt.Sprintf(`{"node":"%s","error":"%s"}`, nodeName, err.Error())) // This is a LiveDBErrorResponse structure
}

log.Println(z)

//// Send the message
//ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
//defer cancel()
//err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
// amqp.Publishing{
// ContentType: "text/json",
// CorrelationId: msg.CorrelationId,
// Body: z,
// })
//if err != nil {
// log.Println(err)
//}

//// Acknowledge the request, so it doesn't stick around in the queue
//err = msg.Ack(false)
//if err != nil {
// log.Println(err)
//}
//
//if MQDebug > 0 {
// log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo)
//}
return
}

//// MQCreateDBQueue creates a queue on the MQ server for "create database" messages
//func MQCreateDBQueue(Nq neoq.Neoq) (queue amqp.Queue, err error) {
// queue, err = channel.QueueDeclare("create_queue", true, false, false, false, nil)
Expand Down
32 changes: 32 additions & 0 deletions common/postgresql_mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package common

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
)

Expand Down Expand Up @@ -33,3 +35,33 @@ func MQGetResponse(jobID string) (resp map[string]any, err error){
}
return
}

// MQResponse stores an MQ response in the backend database
func MQResponse(jobID string, nodeName string, responseData any) (err error) {
var z []byte
z, err = json.Marshal(responseData)
if err != nil {
log.Println(err)
// It's super unlikely we can safely return here without ack-ing the message. So as something has gone
// wrong with json.Marshall() we'd better just attempt passing back info about that error message instead (!)
z = []byte(fmt.Sprintf(`{"node":"%s","error":"%s"}`, nodeName, err.Error())) // This is a LiveDBErrorResponse structure
}

log.Printf("MQResponse(): %s", z)

dbQuery := `
INSERT INTO neoq_job_responses (job_id, status, payload)
VALUES ($1, $2, $3)`
_, err = pdb.Exec(context.Background(), dbQuery, jobID, 0, responseData)
if err != nil {
log.Printf("Error when storing an MQ job response in the database: %v", err)
return
}

// FIXME: Mark the request as completed

if MQDebug > 0 {
log.Printf("Live node '%s' responded to request with jobID: '%s'", nodeName, jobID)
}
return
}
Loading

0 comments on commit efdda0b

Please sign in to comment.