Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Add Readers and Consumers SDK #33

Merged
merged 11 commits into from
Jan 13, 2025
6 changes: 3 additions & 3 deletions apidocs/openapi/notifiers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1

servers:
- url: http://localhost:9014
Expand Down Expand Up @@ -44,7 +44,7 @@ paths:
"400":
description: Failed due to malformed JSON.
"401":
description: Missing or invalid access token provided.
description: Missing or invalid access token provided.
"403":
description: Failed to perform authorization over the entity.
"409":
Expand Down Expand Up @@ -278,7 +278,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"

securitySchemes:
bearerAuth:
Expand Down
4 changes: 2 additions & 2 deletions apidocs/openapi/readers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1

servers:
- url: http://localhost:9003
Expand Down Expand Up @@ -292,7 +292,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"

securitySchemes:
bearerAuth:
Expand Down
2 changes: 1 addition & 1 deletion cmd/postgres-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/postgres"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
Expand All @@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion cmd/timescale-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/timescale"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
Expand All @@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package api
import (
"context"

notifiers "github.com/absmach/magistrala/consumers/notifiers"
apiutil "github.com/absmach/supermq/api/http/util"
notifiers "github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-kit/kit/endpoint"
)
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"strings"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/api"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
smqlog "github.com/absmach/supermq/logger"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/uuid"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"log/slog"
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
)

var _ notifiers.Service = (*loggingMiddleware)(nil)
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"context"
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/go-kit/kit/metrics"
)

Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"net/http"
"strings"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi/v5"
kithttp "github.com/go-kit/kit/transport/http"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consumers/notifiers/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions consumers/notifiers/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package notifiers

import (
"errors"

"github.com/absmach/supermq/pkg/messaging"
)

// ErrNotify wraps sending notification errors.
var ErrNotify = errors.New("error sending notification")

// Notifier represents an API for sending notification.
//
//go:generate mockery --name Notifier --output=./mocks --filename notifier.go --quiet --note "Copyright (c) Abstract Machines"
type Notifier interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's OK to keep the Notifier service (CRUD for Subscriptions) in Magistrala, but please move the Notifier interface to SuperMQ /consumers package.

// Notify method is used to send notification for the
// received message to the provided list of receivers.
Notify(from string, to []string, msg *messaging.Message) error
}
2 changes: 1 addition & 1 deletion consumers/notifiers/postgres/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"
"strings"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/jackc/pgerrcode"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/postgres/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"fmt"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/postgres"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/stretchr/testify/assert"
Expand Down
9 changes: 4 additions & 5 deletions consumers/notifiers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
notif "github.com/absmach/supermq/consumers/notifiers"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
Expand Down Expand Up @@ -47,13 +46,13 @@
authn smqauthn.Authentication
subs SubscriptionsRepository
idp supermq.IDProvider
notifier notif.Notifier
notifier Notifier
errCh chan error
from string
}

// New instantiates the subscriptions service implementation.
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier notif.Notifier, from string) Service {
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier Notifier, from string) Service {
return &notifierService{
authn: authn,
subs: subs,
Expand Down Expand Up @@ -132,7 +131,7 @@
if len(to) > 0 {
err := ns.notifier.Notify(ns.from, to, msg)
if err != nil {
return errors.Wrap(notif.ErrNotify, err)
return errors.Wrap(ErrNotify, err)

Check warning on line 134 in consumers/notifiers/service.go

View check run for this annotation

Codecov / codecov/patch

consumers/notifiers/service.go#L134

Added line #L134 was not covered by tests
}
}

Expand Down Expand Up @@ -166,7 +165,7 @@
}
if len(to) > 0 {
if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
ns.errCh <- errors.Wrap(notif.ErrNotify, err)
ns.errCh <- errors.Wrap(ErrNotify, err)

Check warning on line 168 in consumers/notifiers/service.go

View check run for this annotation

Codecov / codecov/patch

consumers/notifiers/service.go#L168

Added line #L168 was not covered by tests
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"fmt"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/supermq/consumers/notifiers"
smqauthn "github.com/absmach/supermq/pkg/authn"
authnmocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/errors"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/smpp/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package smpp
import (
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
"github.com/absmach/supermq/pkg/transformers/json"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/smtp/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package smtp
import (
"fmt"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/internal/email"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/messaging"
)

Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/tracing/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package tracing
import (
"context"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20241209153600-91270de67b5a
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c
github.com/authzed/authzed-go v1.2.1
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/caarlos0/env/v11 v11.3.1
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fiorix/go-smpp v0.0.0-20210403173735-2894b96e70ba
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-chi/chi/v5 v5.2.0
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid/v5 v5.3.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk=
github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50 h1:ndn1Z9wxUIH5chingm2hy3ZhIMt0+lDjD/CFaBEULbY=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50/go.mod h1:VihyvWijocoz2yhXGAL+qHtid24O+qL/N2lxP2vRf/c=
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c h1:s2OxO+rV1PMm/H2jqWVG8IF+HCVfawt8nGN/gY+SIa0=
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c/go.mod h1:As0UgktURYeC5/SvC269WfdG9satLst8CQcxc2dC02E=
github.com/authzed/authzed-go v1.2.1 h1:o54aIs0ocDfVJl/rfIt/75vrb6z+tgPuXjMlSsSEwH0=
github.com/authzed/authzed-go v1.2.1/go.mod h1:/+NblSrzA6Lm6vUO3fqZyLh8MDCLUQq2AyJMlHb32DE=
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b h1:wbh8IK+aMLTCey9sZasO7b6BWLAJnHHvb79fvWCXwxw=
Expand Down Expand Up @@ -99,8 +99,6 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs=
Expand Down
87 changes: 87 additions & 0 deletions pkg/sdk/consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package sdk

import (
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/absmach/supermq/pkg/errors"
)

const subscriptionEndpoint = "subscriptions"

type Subscription struct {
ID string `json:"id,omitempty"`
OwnerID string `json:"owner_id,omitempty"`
Topic string `json:"topic,omitempty"`
Contact string `json:"contact,omitempty"`
}

func (sdk mgSDK) CreateSubscription(topic, contact, token string) (string, errors.SDKError) {
sub := Subscription{
Topic: topic,
Contact: contact,
}
data, err := json.Marshal(sub)
if err != nil {
return "", errors.NewSDKError(err)
}

url := fmt.Sprintf("%s/%s", sdk.usersURL, subscriptionEndpoint)

headers, _, sdkerr := sdk.processRequest(http.MethodPost, url, token, data, nil, http.StatusCreated)
if sdkerr != nil {
return "", sdkerr
}

id := strings.TrimPrefix(headers.Get("Location"), fmt.Sprintf("/%s/", subscriptionEndpoint))

return id, nil
}

func (sdk mgSDK) ListSubscriptions(pm PageMetadata, token string) (SubscriptionPage, errors.SDKError) {
url, err := sdk.withQueryParams(sdk.usersURL, subscriptionEndpoint, pm)
if err != nil {
return SubscriptionPage{}, errors.NewSDKError(err)
}

_, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, nil, http.StatusOK)
if sdkerr != nil {
return SubscriptionPage{}, sdkerr
}

var sp SubscriptionPage
if err := json.Unmarshal(body, &sp); err != nil {
return SubscriptionPage{}, errors.NewSDKError(err)
}

return sp, nil
}

func (sdk mgSDK) ViewSubscription(id, token string) (Subscription, errors.SDKError) {
url := fmt.Sprintf("%s/%s/%s", sdk.usersURL, subscriptionEndpoint, id)

_, body, err := sdk.processRequest(http.MethodGet, url, token, nil, nil, http.StatusOK)
if err != nil {
return Subscription{}, err
}

var sub Subscription
if err := json.Unmarshal(body, &sub); err != nil {
return Subscription{}, errors.NewSDKError(err)
}

return sub, nil
}

func (sdk mgSDK) DeleteSubscription(id, token string) errors.SDKError {
url := fmt.Sprintf("%s/%s/%s", sdk.usersURL, subscriptionEndpoint, id)

_, _, err := sdk.processRequest(http.MethodDelete, url, token, nil, nil, http.StatusNoContent)

return err
}
Loading
Loading