Skip to content

Commit

Permalink
NOISSUE - Update timescale reader (#2085)
Browse files Browse the repository at this point in the history
Signed-off-by: Musilah <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Co-authored-by: Rodney Osodo <[email protected]>
  • Loading branch information
Musilah and rodneyosodo authored Mar 5, 2024
1 parent 2be34c4 commit 42d433a
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 27 deletions.
45 changes: 38 additions & 7 deletions api/openapi/readers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ servers:
- url: https://localhost:9009
- url: http://localhost:9011
- url: https://localhost:9011

tags:
- name: readers
description: Everything about your Readers
Expand Down Expand Up @@ -57,24 +57,26 @@ paths:
- $ref: "#/components/parameters/DataValue"
- $ref: "#/components/parameters/From"
- $ref: "#/components/parameters/To"
- $ref: "#/components/parameters/Aggregation"
- $ref: "#/components/parameters/Interval"
responses:
'200':
"200":
$ref: "#/components/responses/MessagesPageRes"
'400':
"400":
description: Failed due to malformed query parameters.
'401':
"401":
description: Missing or invalid access token provided.
'500':
"500":
$ref: "#/components/responses/ServiceError"
/health:
get:
summary: Retrieves service health check info.
tags:
- health
responses:
'200':
"200":
$ref: "#/components/responses/HealthRes"
'500':
"500":
$ref: "#/components/responses/ServiceError"

components:
Expand Down Expand Up @@ -226,13 +228,42 @@ components:
in: query
schema:
type: number
example: 1709218556069
required: false
To:
name: to
description: SenML message time in nanoseconds (integer part represents seconds).
in: query
schema:
type: number
example: 1709218757503
required: false
Aggregation:
name: aggregation
description: Aggregation function.
in: query
schema:
type: string
enum:
- MAX
- AVG
- MIN
- SUM
- COUNT
- max
- min
- sum
- avg
- count
example: MAX
required: false
Interval:
name: interval
description: Aggregation interval.
in: query
schema:
type: string
example: 10s
required: false

responses:
Expand Down
8 changes: 5 additions & 3 deletions cli/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ var cmdMessages = []cobra.Command{
logUsage(cmd.Use)
return
}
pageMetadata := mgxsdk.PageMetadata{
Offset: Offset,
Limit: Limit,
pageMetadata := mgxsdk.MessagePageMetadata{
PageMetadata: mgxsdk.PageMetadata{
Offset: Offset,
Limit: Limit,
},
}

m, err := sdk.ReadMessages(pageMetadata, args[0], args[1])
Expand Down
12 changes: 12 additions & 0 deletions internal/apiutil/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,16 @@ var (

// ErrRollbackTx indicates failed to rollback transaction.
ErrRollbackTx = errors.New("failed to rollback transaction")

// ErrInvalidAggregation indicates invalid aggregation value.
ErrInvalidAggregation = errors.New("invalid aggregation value")

// ErrInvalidInterval indicates invalid interval value.
ErrInvalidInterval = errors.New("invalid interval value")

// ErrMissingFrom indicates missing from value.
ErrMissingFrom = errors.New("missing from time value")

// ErrMissingTo indicates missing to value.
ErrMissingTo = errors.New("missing to time value")
)
43 changes: 38 additions & 5 deletions pkg/sdk/go/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/absmach/magistrala/internal/apiutil"
Expand All @@ -23,14 +25,14 @@ func (sdk mgSDK) SendMessage(chanName, msg, key string) errors.SDKError {
subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
}

url := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart)
reqURL := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart)

_, _, err := sdk.processRequest(http.MethodPost, url, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted)
_, _, err := sdk.processRequest(http.MethodPost, reqURL, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted)

return err
}

func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (MessagesPage, errors.SDKError) {
func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
Expand All @@ -39,15 +41,15 @@ func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (Messages
}

readMessagesEndpoint := fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart)
url, err := sdk.withQueryParams(sdk.readerURL, readMessagesEndpoint, pm)
msgURL, err := sdk.withMessageQueryParams(sdk.readerURL, readMessagesEndpoint, pm)
if err != nil {
return MessagesPage{}, errors.NewSDKError(err)
}

header := make(map[string]string)
header["Content-Type"] = string(sdk.msgContentType)

_, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, header, http.StatusOK)
_, body, sdkerr := sdk.processRequest(http.MethodGet, msgURL, token, nil, header, http.StatusOK)
if sdkerr != nil {
return MessagesPage{}, sdkerr
}
Expand All @@ -69,3 +71,34 @@ func (sdk *mgSDK) SetContentType(ct ContentType) errors.SDKError {

return nil
}

func (sdk mgSDK) withMessageQueryParams(baseURL, endpoint string, mpm MessagePageMetadata) (string, error) {
b, err := json.Marshal(mpm)
if err != nil {
return "", err
}
q := map[string]interface{}{}
if err := json.Unmarshal(b, &q); err != nil {
return "", err
}
ret := url.Values{}
for k, v := range q {
switch t := v.(type) {
case string:
ret.Add(k, t)
case float64:
ret.Add(k, strconv.FormatFloat(t, 'f', -1, 64))
case uint64:
ret.Add(k, strconv.FormatUint(t, 10))
case int64:
ret.Add(k, strconv.FormatInt(t, 10))
case json.Number:
ret.Add(k, t.String())
case bool:
ret.Add(k, strconv.FormatBool(t))
}
}
qs := ret.Encode()

return fmt.Sprintf("%s/%s?%s", baseURL, endpoint, qs), nil
}
20 changes: 18 additions & 2 deletions pkg/sdk/go/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ var (
ErrInvalidJWT = errors.New("invalid JWT")
)

type MessagePageMetadata struct {
PageMetadata
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Comparator string `json:"comparator,omitempty"`
BoolValue *bool `json:"vb,omitempty"`
StringValue string `json:"vs,omitempty"`
DataValue string `json:"vd,omitempty"`
From float64 `json:"from,omitempty"`
To float64 `json:"to,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
Interval string `json:"interval,omitempty"`
Value float64 `json:"value,omitempty"`
Protocol string `json:"protocol,omitempty"`
}

type PageMetadata struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Expand Down Expand Up @@ -828,13 +844,13 @@ type SDK interface {
// ReadMessages read messages of specified channel.
//
// example:
// pm := sdk.PageMetadata{
// pm := sdk.MessagePageMetadata{
// Offset: 0,
// Limit: 10,
// }
// msgs, _ := sdk.ReadMessages(pm,"channelID", "token")
// fmt.Println(msgs)
ReadMessages(pm PageMetadata, chanID, token string) (MessagesPage, errors.SDKError)
ReadMessages(pm MessagePageMetadata, chanID, token string) (MessagesPage, errors.SDKError)

// SetContentType sets message content type.
//
Expand Down
8 changes: 4 additions & 4 deletions pkg/sdk/mocks/sdk.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 42d433a

Please sign in to comment.