diff --git a/common/live.go b/common/live.go index 79359b44a..c3ef18566 100644 --- a/common/live.go +++ b/common/live.go @@ -25,7 +25,7 @@ var ( // MQDebug controls whether to output - via Log.Print*() functions - useful messages during processing. Mostly // useful for development / debugging purposes - MQDebug = 1 + MQDebug = 2 ) //// CloseMQChannel closes an open AMQP channel @@ -307,31 +307,56 @@ func LiveRowData(liveNode, loggedInUser, dbOwner, dbName string, reqData LiveDBR return } -// LiveSize asks our AMQP backend for the file size of a database +// LiveSize asks our MQ backend for the file size of a database func LiveSize(liveNode, loggedInUser, dbOwner, dbName string) (size int64, err error) { // Send the size request to our AMQP backend - var rawResponse []byte - rawResponse, err = MQRequest(liveNode, "size", loggedInUser, dbOwner, dbName, "") + //var rawResponse []byte + //rawResponse, err = MQRequest(liveNode, "size", loggedInUser, dbOwner, dbName, "") //rawResponse, err = MQRequest(AmqpChan, liveNode, "size", loggedInUser, dbOwner, dbName, "") + + // Create a job to place on the queue. A job must specify a queue, but its payload may be empty + payload := make(map[string]any) + payload["operation"] = "size" + payload["requestinguser"] = loggedInUser + payload["dbowner"] = dbOwner + payload["dbname"] = dbName + j := &jobs.Job{Queue: liveNode, Payload: payload} + + done := make(chan bool) + jobID, err := Nq.Enqueue(context.Background(), j) if err != nil { + log.Printf("Unable to enqueue job: %s", err) return } - // Decode the response - var resp LiveDBSizeResponse - err = json.Unmarshal(rawResponse, &resp) + // Wait for the queued job to complete + <-done + + // Look for the job response in our database backend + foo, err := MQGetResponse(jobID) if err != nil { return } - if resp.Error != "" { - err = errors.New(resp.Error) - return - } - if resp.Node == "" { - log.Println("A node responded to a 'size' request, but didn't identify itself.") - return - } - size = resp.Size + + log.Printf("%s", foo) + + + //// Decode the response + //var resp LiveDBSizeResponse + //err = json.Unmarshal(rawResponse, &resp) + //if err != nil { + // return + //} + //if resp.Error != "" { + // err = errors.New(resp.Error) + // return + //} + //if resp.Node == "" { + // log.Println("A node responded to a 'size' request, but didn't identify itself.") + // return + //} + + //size = resp.Size return } @@ -411,45 +436,6 @@ func LiveViews(liveNode, loggedInUser, dbOwner, dbName string) (views []string, return } -// MQResponse sends an AMQP response back to its requester -func MQResponse(requestType string, nodeName string, responseData interface{}) (err error) { -//func MQResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, responseData interface{}) (err error) { - var z []byte - z, err = json.Marshal(responseData) - if err != nil { - log.Println(err) - // It's super unlikely we can safely return here without ack-ing the message. So as something has gone - // wrong with json.Marshall() we'd better just attempt passing back info about that error message instead (!) - z = []byte(fmt.Sprintf(`{"node":"%s","error":"%s"}`, nodeName, err.Error())) // This is a LiveDBErrorResponse structure - } - - log.Println(z) - - //// Send the message - //ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - //defer cancel() - //err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false, - // amqp.Publishing{ - // ContentType: "text/json", - // CorrelationId: msg.CorrelationId, - // Body: z, - // }) - //if err != nil { - // log.Println(err) - //} - - //// Acknowledge the request, so it doesn't stick around in the queue - //err = msg.Ack(false) - //if err != nil { - // log.Println(err) - //} - // - //if MQDebug > 0 { - // log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo) - //} - return -} - //// MQCreateDBQueue creates a queue on the MQ server for "create database" messages //func MQCreateDBQueue(Nq neoq.Neoq) (queue amqp.Queue, err error) { // queue, err = channel.QueueDeclare("create_queue", true, false, false, false, nil) diff --git a/common/postgresql_mq.go b/common/postgresql_mq.go index 3c8a10bac..cbe13db4b 100644 --- a/common/postgresql_mq.go +++ b/common/postgresql_mq.go @@ -2,7 +2,9 @@ package common import ( "context" + "encoding/json" "errors" + "fmt" "log" ) @@ -33,3 +35,33 @@ func MQGetResponse(jobID string) (resp map[string]any, err error){ } return } + +// MQResponse stores an MQ response in the backend database +func MQResponse(jobID string, nodeName string, responseData any) (err error) { + var z []byte + z, err = json.Marshal(responseData) + if err != nil { + log.Println(err) + // It's super unlikely we can safely return here without ack-ing the message. So as something has gone + // wrong with json.Marshall() we'd better just attempt passing back info about that error message instead (!) + z = []byte(fmt.Sprintf(`{"node":"%s","error":"%s"}`, nodeName, err.Error())) // This is a LiveDBErrorResponse structure + } + +log.Printf("MQResponse(): %s", z) + + dbQuery := ` + INSERT INTO neoq_job_responses (job_id, status, payload) + VALUES ($1, $2, $3)` + _, err = pdb.Exec(context.Background(), dbQuery, jobID, 0, responseData) + if err != nil { + log.Printf("Error when storing an MQ job response in the database: %v", err) + return + } + + // FIXME: Mark the request as completed + + if MQDebug > 0 { + log.Printf("Live node '%s' responded to request with jobID: '%s'", nodeName, jobID) + } + return +} diff --git a/live/main.go b/live/main.go index b222e774e..777455a00 100644 --- a/live/main.go +++ b/live/main.go @@ -2,23 +2,19 @@ package main // Internal daemon for running SQLite queries sent by the other DBHub.io daemons -// FIXME: Note that all incoming AMQP requests _other_ than for database creation -// are handled by the same single goroutine. This should be changed to -// something smarter, such as using a pool of worker goroutines to handle -// the requests. - import ( "context" "encoding/json" "errors" "fmt" - "github.com/acaloiaro/neoq/handler" - "github.com/acaloiaro/neoq/jobs" "io/fs" "log" "os" "path/filepath" + "strconv" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" sqlite "github.com/gwenn/gosqlite" com "github.com/sqlitebrowser/dbhub.io/common" ) @@ -68,457 +64,431 @@ func main() { } // Connect to MQ server - Nq, err := com.ConnectMQ() + com.Nq, err = com.ConnectMQ() if err != nil { log.Fatal(err) } - //// Create queue for receiving new database creation requests - //createQueue, err := com.MQCreateDBQueue(Nq) - //if err != nil { - // log.Fatal(err) - //} + // Set up the handler that listens for database creation requests + h := handler.New("create_queue", LiveCreateDBHandler) + ctx := context.Background() + com.Nq.Start(ctx, h) - // Start consuming database creation requests - // create a handler that listens for new job on the "create_queue" queue - h := handler.New("create_queue", func(ctx context.Context) (err error) { - j, _ := jobs.FromContext(ctx) + // Start consuming database query requests + queryHandler := handler.New(com.Conf.Live.Nodename, LiveQueryDBHandler) + ctx2 := context.Background() // TODO: Can this re-use the same context (ctx) as above? + com.Nq.Start(ctx2, queryHandler) - log.Printf("Hello, %s!", j.Payload["Name"]) + log.Printf("Live server '%s' listening for requests", com.Conf.Live.Nodename) - // Decode JSON request - var req com.LiveDBRequest - err = json.Unmarshal(j.Payload["body"].([]byte), &req) - if err != nil { - log.Println(err) - err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") - //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") - if err != nil { - log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + // Endless loop + var forever chan struct{} + <-forever - // Verify that the object ID was passed through the interface correctly - objectID, ok := req.Data.(string) - if !ok { - err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") - //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") - if err != nil { - log.Printf("Error: occurred on live node '%s' in the create db code, while converting the Minio object ID to a string: '%s'", com.Conf.Live.Nodename, err) - } - return - } + // Close the channel to the MQ server + com.Nq.Shutdown(ctx) + com.Nq.Shutdown(ctx2) +} - // Set up the live database locally - _, err = com.LiveRetrieveDatabaseMinio(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, objectID) - if err != nil { - log.Println(err) - err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") - //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") - if err != nil { - log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response (location 2): '%s'", com.Conf.Live.Nodename, err) +// RemoveLiveDB deletes a live database from the local node. For example, when the user deletes it from +// their account. +// Be aware, it leaves the database owners directory in place, to avoid any potential race condition of +// trying to delete that directory while other databases in their account are being worked with +func removeLiveDB(dbOwner, dbName string) (err error) { + // Get the path to the database file, and it's containing directory + dbDir := filepath.Join(com.Conf.Live.StorageDir, dbOwner, dbName) + dbPath := filepath.Join(dbDir, "live.sqlite") + if _, err = os.Stat(dbPath); err != nil { + if errors.Is(err, fs.ErrNotExist) { + if com.MQDebug > 0 { + log.Printf("Live node '%s': database file '%s/%s' was supposed to get deleted here, but was "+ + "missing from filesystem path: '%s'", com.Conf.Live.Nodename, dbOwner, dbName, dbPath) } return } - // Respond to the creation request with a success message - err = com.MQCreateResponse(com.Conf.Live.Nodename, "success") - //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "success") - if err != nil { - return - } - + // Something wrong with the database file + log.Println(err) return - }) + } - ctx := context.Background() - Nq.Start(ctx, h) + // Delete the "live.sqlite" file + // NOTE: If this seems to leave wal or other files hanging around in actual production use, we could + // instead use filepath.RemoveAll(dbDir). That should kill the containing directory and + // all files within, thus not leave anything hanging around + err = os.Remove(dbPath) + if err != nil { + log.Println(err) + return + } + // Remove the containing directory + err = os.Remove(dbDir) + if err != nil { + log.Println(err) + return + } - //createDBMsgs, err := ch.Consume(createQueue.Name, "", false, false, false, false, nil) - //if err != nil { - // log.Fatal(err) - //} + if com.MQDebug > 0 { + log.Printf("Live node '%s': Database file '%s/%s' removed from filesystem path: '%s'", + com.Conf.Live.Nodename, dbOwner, dbName, dbPath) + } + return +} - //// Create the queue for receiving database queries - //queryQueue, err := com.MQCreateQueryQueue(ch, com.Conf.Live.Nodename) - //if err != nil { - // log.Fatal(err) - //} +// LiveCreateDBHandler receives database creation requests for the Live daemon +func LiveCreateDBHandler(ctx context.Context) (err error) { + j, err := jobs.FromContext(ctx) + if err != nil { + return + } - // Start consuming database query requests - //requests, err := ch.Consume(queryQueue.Name, "", false, false, false, false, nil) - //if err != nil { - // log.Fatal(err) - //} - //go func() { - // for msg := range requests { + log.Printf("Hello, %s!", j.Payload["Name"]) - queryHandler := handler.New(com.Conf.Live.Nodename, func(ctx context.Context) (err error) { - j, _ := jobs.FromContext(ctx) + // Decode JSON request + var req com.LiveDBRequest + err = json.Unmarshal(j.Payload["body"].([]byte), &req) + if err != nil { + log.Println(err) + err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") + //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") + if err != nil { + log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) + } + return + } - if com.MQDebug > 1 { - log.Printf("'%s' received AMQP REQUEST (of not-yet-determined type)", com.Conf.Live.Nodename) + // Verify that the object ID was passed through the interface correctly + objectID, ok := req.Data.(string) + if !ok { + err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") + //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") + if err != nil { + log.Printf("Error: occurred on live node '%s' in the create db code, while converting the Minio object ID to a string: '%s'", com.Conf.Live.Nodename, err) } + return + } - // Decode JSON request - var req com.LiveDBRequest - err = json.Unmarshal(j.Payload["body"].([]byte), &req) + // Set up the live database locally + _, err = com.LiveRetrieveDatabaseMinio(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, objectID) + if err != nil { + log.Println(err) + err = com.MQCreateResponse(com.Conf.Live.Nodename, "failure") + //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "failure") if err != nil { - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} - err = com.MQResponse("NOT-YET-DETERMINED", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("NOT-YET-DETERMINED", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' the main live node switch{} while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return + log.Printf("Error: occurred on live node '%s' in the create db code, while constructing an AMQP error message response (location 2): '%s'", com.Conf.Live.Nodename, err) } + return + } - //if com.MQDebug > 1 { - // log.Printf("Decoded request on '%s'. Correlation ID: '%s', request operation: '%s', request query: '%v'", com.Conf.Live.Nodename, msg.CorrelationId, req.Operation, req.Data) - //} else if com.MQDebug == 1 { - // log.Printf("Decoded request on '%s'. Correlation ID: '%s', request operation: '%s'", com.Conf.Live.Nodename, msg.CorrelationId, req.Operation) - //} + // Respond to the creation request with a success message + err = com.MQCreateResponse(com.Conf.Live.Nodename, "success") + //err = com.MQCreateResponse(d, ch, com.Conf.Live.Nodename, "success") + if err != nil { + return + } - // Handle each operation - switch req.Operation { - case "backup": - err = com.SQLiteBackupLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) - if err != nil { - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} - err = com.MQResponse("BACKUP", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + return +} - if com.MQDebug > 0 { - log.Printf("Running [BACKUP] on '%s/%s'", req.DBOwner, req.DBName) - } +// LiveQueryDBHandler receives database query requests for the Live daemon +func LiveQueryDBHandler(ctx context.Context) (err error) { + j, _ := jobs.FromContext(ctx) + + if com.MQDebug > 1 { + log.Printf("'%s' received MQ REQUEST (of not-yet-determined type)", com.Conf.Live.Nodename) + } + + // Assemble the request + var req com.LiveDBRequest + req.DBOwner = j.Payload["dbowner"].(string) + req.DBName = j.Payload["dbname"].(string) + req.RequestingUser = j.Payload["requestinguser"].(string) + req.Operation = j.Payload["operation"].(string) + + if com.MQDebug > 1 { + log.Printf("Decoded request on '%s'. Job ID: '%d', request operation: '%s', request query: '%v'", com.Conf.Live.Nodename, j.ID, req.Operation, req.Data) + } else if com.MQDebug == 1 { + log.Printf("Decoded request on '%s'. Job ID: '%d', request operation: '%s'", com.Conf.Live.Nodename, j.ID, req.Operation) + } - // Return a success message to the caller - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} // Use an empty error message to indicate success + // Handle each operation + switch j.Payload["operation"] { + case "backup": + err = com.SQLiteBackupLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} err = com.MQResponse("BACKUP", com.Conf.Live.Nodename, resp) //err = com.MQResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP backup response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "columns": - columns, pk, err, errCode := com.SQLiteGetColumnsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, fmt.Sprintf("%s", req.Data)) - if err != nil { - resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: []sqlite.Column{}, PkColumns: nil, Error: err.Error(), ErrCode: errCode} - err = com.MQResponse("COLUMNS", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("COLUMNS", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [BACKUP] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [COLUMNS] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) - } + // Return a success message to the caller + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} // Use an empty error message to indicate success + err = com.MQResponse("BACKUP", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP backup response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the columns list to the caller - resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: columns, PkColumns: pk, Error: "", ErrCode: com.AMQPNoError} + case "columns": + var columns []sqlite.Column + var pk []string + var errCode com.AMQPErrorCode + columns, pk, err, errCode = com.SQLiteGetColumnsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: []sqlite.Column{}, PkColumns: nil, Error: err.Error(), ErrCode: errCode} err = com.MQResponse("COLUMNS", com.Conf.Live.Nodename, resp) //err = com.MQResponse("COLUMNS", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP columns list response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "delete": - // Delete the database file on the node - err = removeLiveDB(req.DBOwner, req.DBName) - if err != nil { - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} - err = com.MQResponse("DELETE", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("DELETE", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [COLUMNS] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } - if com.MQDebug > 0 { - log.Printf("Running [DELETE] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return the columns list to the caller + resp := com.LiveDBColumnsResponse{Node: com.Conf.Live.Nodename, Columns: columns, PkColumns: pk, Error: "", ErrCode: com.AMQPNoError} + err = com.MQResponse("COLUMNS", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("COLUMNS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP columns list response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return a success message (empty string in this case) to the caller - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} + case "delete": + // Delete the database file on the node + err = removeLiveDB(req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} err = com.MQResponse("DELETE", com.Conf.Live.Nodename, resp) //err = com.MQResponse("DELETE", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP delete database response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "execute": - // Execute a SQL statement on the database file - var rowsChanged int - rowsChanged, err = com.SQLiteExecuteQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) - if err != nil { - resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: 0, Error: err.Error()} - err = com.MQResponse("EXECUTE", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("EXECUTE", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [DELETE] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [EXECUTE] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) - } + // Return a success message (empty string in this case) to the caller + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: ""} + err = com.MQResponse("DELETE", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("DELETE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP delete database response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return a success message to the caller - resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: rowsChanged, Error: ""} + case "execute": + // Execute a SQL statement on the database file + var rowsChanged int + rowsChanged, err = com.SQLiteExecuteQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: 0, Error: err.Error()} err = com.MQResponse("EXECUTE", com.Conf.Live.Nodename, resp) //err = com.MQResponse("EXECUTE", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP execute query response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "indexes": - var indexes []com.APIJSONIndex - indexes, err = com.SQLiteGetIndexesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) - if err != nil { - resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: []com.APIJSONIndex{}, Error: err.Error()} - err = com.MQResponse("INDEXES", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("INDEXES", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [EXECUTE] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } - if com.MQDebug > 0 { - log.Printf("Running [INDEXES] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return a success message to the caller + resp := com.LiveDBExecuteResponse{Node: com.Conf.Live.Nodename, RowsChanged: rowsChanged, Error: ""} + err = com.MQResponse("EXECUTE", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("EXECUTE", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP execute query response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the indexes list to the caller - resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: indexes, Error: ""} + case "indexes": + var indexes []com.APIJSONIndex + indexes, err = com.SQLiteGetIndexesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: []com.APIJSONIndex{}, Error: err.Error()} err = com.MQResponse("INDEXES", com.Conf.Live.Nodename, resp) //err = com.MQResponse("INDEXES", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP indexes list response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "query": - var rows com.SQLiteRecordSet - rows, err = com.SQLiteRunQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) - if err != nil { - resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: com.SQLiteRecordSet{}, Error: err.Error()} - err = com.MQResponse("QUERY", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("QUERY", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [INDEXES] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [QUERY] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) - } + // Return the indexes list to the caller + resp := com.LiveDBIndexesResponse{Node: com.Conf.Live.Nodename, Indexes: indexes, Error: ""} + err = com.MQResponse("INDEXES", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("INDEXES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP indexes list response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the query response to the caller - resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: rows, Error: ""} + case "query": + var rows com.SQLiteRecordSet + rows, err = com.SQLiteRunQueryLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.RequestingUser, fmt.Sprintf("%s", req.Data)) + if err != nil { + resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: com.SQLiteRecordSet{}, Error: err.Error()} err = com.MQResponse("QUERY", com.Conf.Live.Nodename, resp) //err = com.MQResponse("QUERY", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "rowdata": - // Extract the request information - // FIXME: Add type checks for safety instead of blind coercing - reqData := req.Data.(map[string]interface{}) - dbTable := reqData["db_table"].(string) - sortCol := reqData["sort_col"].(string) - sortDir := reqData["sort_dir"].(string) - commitID := reqData["commit_id"].(string) - maxRows := int(reqData["max_rows"].(float64)) - rowOffset := int(reqData["row_offset"].(float64)) - - // Open the SQLite database and read the row data - resp := com.LiveDBRowsResponse{Node: com.Conf.Live.Nodename, RowData: com.SQLiteRecordSet{}} - resp.Tables, resp.DefaultTable, resp.RowData, resp.DatabaseSize, err = - com.SQLiteReadDatabasePage("", "", req.RequestingUser, req.DBOwner, req.DBName, dbTable, sortCol, sortDir, commitID, rowOffset, maxRows, true) - if err != nil { - resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} - err = com.MQResponse("ROWDATA", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("ROWDATA", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [QUERY] on '%s/%s': '%s'", req.DBOwner, req.DBName, req.Data) + } - if com.MQDebug > 0 { - log.Printf("Running [ROWDATA] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return the query response to the caller + resp := com.LiveDBQueryResponse{Node: com.Conf.Live.Nodename, Results: rows, Error: ""} + err = com.MQResponse("QUERY", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("QUERY", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the row data to the caller + case "rowdata": + // Extract the request information + // FIXME: Add type checks for safety instead of blind coercing + reqData := req.Data.(map[string]interface{}) + dbTable := reqData["db_table"].(string) + sortCol := reqData["sort_col"].(string) + sortDir := reqData["sort_dir"].(string) + commitID := reqData["commit_id"].(string) + maxRows := int(reqData["max_rows"].(float64)) + rowOffset := int(reqData["row_offset"].(float64)) + + // Open the SQLite database and read the row data + resp := com.LiveDBRowsResponse{Node: com.Conf.Live.Nodename, RowData: com.SQLiteRecordSet{}} + resp.Tables, resp.DefaultTable, resp.RowData, resp.DatabaseSize, err = + com.SQLiteReadDatabasePage("", "", req.RequestingUser, req.DBOwner, req.DBName, dbTable, sortCol, sortDir, commitID, rowOffset, maxRows, true) + if err != nil { + resp := com.LiveDBErrorResponse{Node: com.Conf.Live.Nodename, Error: err.Error()} err = com.MQResponse("ROWDATA", com.Conf.Live.Nodename, resp) //err = com.MQResponse("ROWDATA", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "size": - dbPath := filepath.Join(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, "live.sqlite") - var db os.FileInfo - db, err = os.Stat(dbPath) - if err != nil { - resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: 0, Error: err.Error()} - err = com.MQResponse("SIZE", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("SIZE", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [ROWDATA] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [SIZE] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return the row data to the caller + err = com.MQResponse("ROWDATA", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("ROWDATA", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP query response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the database size to the caller - resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: db.Size(), Error: ""} - err = com.MQResponse("SIZE", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("SIZE", msg, ch, com.Conf.Live.Nodename, resp) + case "size": + dbPath := filepath.Join(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, "live.sqlite") + var db os.FileInfo + db, err = os.Stat(dbPath) + if err != nil { + resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: 0, Error: err.Error()} + err = com.MQResponse(strconv.FormatInt(j.ID, 10), com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP size response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "tables": - var tables []string - tables, err = com.SQLiteGetTablesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) - if err != nil { - resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: nil, Error: err.Error()} - err = com.MQResponse("TABLES", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("TABLES", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [SIZE] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [TABLES] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return the database size to the caller + resp := com.LiveDBSizeResponse{Node: com.Conf.Live.Nodename, Size: db.Size(), Error: ""} + err = com.MQResponse(strconv.FormatInt(j.ID, 10), com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing a size response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the tables list to the caller - resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: tables, Error: ""} + case "tables": + var tables []string + tables, err = com.SQLiteGetTablesLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: nil, Error: err.Error()} err = com.MQResponse("TABLES", com.Conf.Live.Nodename, resp) //err = com.MQResponse("TABLES", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP tables list response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return + } - case "views": - var views []string - views, err = com.SQLiteGetViewsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) - if err != nil { - resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: nil, Error: err.Error()} - err = com.MQResponse("VIEWS", com.Conf.Live.Nodename, resp) - //err = com.MQResponse("VIEWS", msg, ch, com.Conf.Live.Nodename, resp) - if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) - } - return - } + if com.MQDebug > 0 { + log.Printf("Running [TABLES] on '%s/%s'", req.DBOwner, req.DBName) + } - if com.MQDebug > 0 { - log.Printf("Running [VIEWS] on '%s/%s'", req.DBOwner, req.DBName) - } + // Return the tables list to the caller + resp := com.LiveDBTablesResponse{Node: com.Conf.Live.Nodename, Tables: tables, Error: ""} + err = com.MQResponse("TABLES", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("TABLES", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP tables list response: '%s'", com.Conf.Live.Nodename, err) + } + return - // Return the views list to the caller - resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: views, Error: ""} + case "views": + var views []string + views, err = com.SQLiteGetViewsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName) + if err != nil { + resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: nil, Error: err.Error()} err = com.MQResponse("VIEWS", com.Conf.Live.Nodename, resp) //err = com.MQResponse("VIEWS", msg, ch, com.Conf.Live.Nodename, resp) if err != nil { - log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP views list response: '%s'", com.Conf.Live.Nodename, err) + log.Printf("Error: occurred on '%s' in MQResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err) } return - - default: - log.Printf("'%s' received unknown '%s' request on this queue for %s/%s", com.Conf.Live.Nodename, req.Operation, req.DBOwner, req.DBName) } - return - }) - - - ctx2 := context.Background() - Nq.Start(ctx2, queryHandler) - - log.Printf("Live server '%s' listening for requests", com.Conf.Live.Nodename) - - // Endless loop - var forever chan struct{} - <-forever - - // Close the channel to the MQ server - Nq.Shutdown(ctx) - Nq.Shutdown(ctx2) - //_ = com.CloseMQChannel(ch) -} - -// RemoveLiveDB deletes a live database from the local node. For example, when the user deletes it from -// their account. -// Be aware, it leaves the database owners directory in place, to avoid any potential race condition of -// trying to delete that directory while other databases in their account are being worked with -func removeLiveDB(dbOwner, dbName string) (err error) { - // Get the path to the database file, and it's containing directory - dbDir := filepath.Join(com.Conf.Live.StorageDir, dbOwner, dbName) - dbPath := filepath.Join(dbDir, "live.sqlite") - if _, err = os.Stat(dbPath); err != nil { - if errors.Is(err, fs.ErrNotExist) { - if com.MQDebug > 0 { - log.Printf("Live node '%s': database file '%s/%s' was supposed to get deleted here, but was "+ - "missing from filesystem path: '%s'", com.Conf.Live.Nodename, dbOwner, dbName, dbPath) - } - return + if com.MQDebug > 0 { + log.Printf("Running [VIEWS] on '%s/%s'", req.DBOwner, req.DBName) } - // Something wrong with the database file - log.Println(err) - return - } - - // Delete the "live.sqlite" file - // NOTE: If this seems to leave wal or other files hanging around in actual production use, we could - // instead use filepath.RemoveAll(dbDir). That should kill the containing directory and - // all files within, thus not leave anything hanging around - err = os.Remove(dbPath) - if err != nil { - log.Println(err) + // Return the views list to the caller + resp := com.LiveDBViewsResponse{Node: com.Conf.Live.Nodename, Views: views, Error: ""} + err = com.MQResponse("VIEWS", com.Conf.Live.Nodename, resp) + //err = com.MQResponse("VIEWS", msg, ch, com.Conf.Live.Nodename, resp) + if err != nil { + log.Printf("Error: occurred on '%s' in MQResponse() while constructing the AMQP views list response: '%s'", com.Conf.Live.Nodename, err) + } return - } - // Remove the containing directory - err = os.Remove(dbDir) - if err != nil { - log.Println(err) - return + default: + log.Printf("'%s' received unknown '%s' request on this queue for %s/%s", com.Conf.Live.Nodename, req.Operation, req.DBOwner, req.DBName) } - if com.MQDebug > 0 { - log.Printf("Live node '%s': Database file '%s/%s' removed from filesystem path: '%s'", - com.Conf.Live.Nodename, dbOwner, dbName, dbPath) - } return -} +} \ No newline at end of file diff --git a/webui/pages.go b/webui/pages.go index dbd21e16d..934843ee4 100644 --- a/webui/pages.go +++ b/webui/pages.go @@ -1524,6 +1524,7 @@ func prefPage(w http.ResponseWriter, r *http.Request, loggedInUser string) { } } +// profilePage generates the web page for the logged in user looking at their own profile page func profilePage(w http.ResponseWriter, r *http.Request, userName string) { var pageData struct { PageMeta PageMetaInfo