diff --git a/.gitignore b/.gitignore index 38ac29fd..399b38b5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.o *.a *.so +*.env # Folders _obj diff --git a/enums.go b/enums.go index 01beb24e..350fd6dd 100644 --- a/enums.go +++ b/enums.go @@ -120,7 +120,7 @@ func (s PNChannelMembersInclude) String() string { return [...]string{"custom", "uuid", "uuid.custom"}[s-1] } -// PNMessageType is used as an enum to catgorize the Subscribe response. +// PNMessageType is used as an enum to categorize the Subscribe response. type PNMessageType int const ( @@ -136,8 +136,9 @@ const ( ) const ( + PNMessageTypeMessage PNMessageType = iota // PNMessageTypeSignal is to identify Signal the Subscribe response - PNMessageTypeSignal PNMessageType = 1 + iota + PNMessageTypeSignal // PNMessageTypeObjects is to identify Objects the Subscribe response PNMessageTypeObjects // PNMessageTypeMessageActions is to identify Actions the Subscribe response diff --git a/fetch_request.go b/fetch_request.go index de4253cd..5f1a3d9d 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -96,6 +96,18 @@ func (b *fetchBuilder) IncludeMessageType(withMessageType bool) *fetchBuilder { return b } +// IncludeType fetches the Message Type associated with the message +func (b *fetchBuilder) IncludeType(withType bool) *fetchBuilder { + b.opts.WithType = withType + return b +} + +// IncludeSpaceId fetches the Space Id associated with the message +func (b *fetchBuilder) IncludeSpaceId(withSpaceId bool) *fetchBuilder { + b.opts.WithSpaceId = withSpaceId + return b +} + // QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. func (b *fetchBuilder) QueryParam(queryParam map[string]string) *fetchBuilder { b.opts.QueryParam = queryParam @@ -126,6 +138,7 @@ func newFetchOpts(pubnub *PubNub, ctx Context, opts fetchOpts) *fetchOpts { } opts.WithUUID = true opts.WithMessageType = true + opts.WithType = true return &opts } @@ -139,6 +152,8 @@ type fetchOpts struct { WithMeta bool WithUUID bool WithMessageType bool + WithType bool + WithSpaceId bool // default: 100 Count int @@ -214,6 +229,8 @@ func (o *fetchOpts) buildQuery() (*url.Values, error) { q.Set("reverse", strconv.FormatBool(o.Reverse)) q.Set("include_meta", strconv.FormatBool(o.WithMeta)) q.Set("include_message_type", strconv.FormatBool(o.WithMessageType)) + q.Set("include_type", strconv.FormatBool(o.WithType)) + q.Set("include_space_id", strconv.FormatBool(o.WithSpaceId)) q.Set("include_uuid", strconv.FormatBool(o.WithUUID)) SetQueryParam(q, o.QueryParam) @@ -318,6 +335,12 @@ func (o *fetchOpts) fetchMessages(channels map[string]interface{}) map[string][] } } } + if t, ok := histResponse["type"]; ok { + histItem.Type = t.(string) + } + if sid, ok := histResponse["space_id"]; ok { + histItem.SpaceId = SpaceId(sid.(string)) + } if d, ok := histResponse["uuid"]; ok { histItem.UUID = d.(string) } @@ -395,16 +418,18 @@ type FetchResponseItem struct { Timetoken string `json:"timetoken"` UUID string `json:"uuid"` MessageType int `json:"message_type"` + Type string `json:"custom_message_type"` + SpaceId SpaceId `TODO: remove` Error error } // PNHistoryMessageActionsTypeMap is the struct used in the Fetch request that includes Message Actions type PNHistoryMessageActionsTypeMap struct { - ActionsTypeValues map[string][]PNHistoryMessageActionTypeVal `json:"-"` + ActionsTypeValues map[string][]PNHistoryMessageActionTypeVal } // PNHistoryMessageActionTypeVal is the struct used in the Fetch request that includes Message Actions type PNHistoryMessageActionTypeVal struct { - UUID string `json:"uuid"` - ActionTimetoken string `json:"actionTimetoken"` + UUID string + ActionTimetoken string } diff --git a/files_send_file.go b/files_send_file.go index 8f08cc75..f33527e9 100644 --- a/files_send_file.go +++ b/files_send_file.go @@ -48,6 +48,18 @@ func (b *sendFileBuilder) Meta(meta interface{}) *sendFileBuilder { return b } +func (b *sendFileBuilder) SpaceId(id SpaceId) *sendFileBuilder { + b.opts.SpaceId = id + + return b +} + +func (b *sendFileBuilder) Type(typ string) *sendFileBuilder { + b.opts.Type = typ + + return b +} + // ShouldStore if true the messages are stored in History func (b *sendFileBuilder) ShouldStore(store bool) *sendFileBuilder { b.opts.ShouldStore = store @@ -117,6 +129,8 @@ type sendFileOpts struct { CipherKey string TTL int Meta interface{} + SpaceId SpaceId + Type string ShouldStore bool QueryParam map[string]string @@ -236,7 +250,15 @@ func newPNSendFileResponse(jsonBytes []byte, o *sendFileOpts, maxCount := o.config().FileMessagePublishRetryLimit for !sent && tryCount < maxCount { tryCount++ - pubFileMessageResponse, pubFileResponseStatus, errPubFileResponse := o.pubnub.PublishFileMessage().TTL(o.TTL).Meta(o.Meta).ShouldStore(o.ShouldStore).Channel(o.Channel).Message(message).Execute() + pubFileMessageResponse, pubFileResponseStatus, errPubFileResponse := o.pubnub.PublishFileMessage(). + TTL(o.TTL). + Meta(o.Meta). + ShouldStore(o.ShouldStore). + Channel(o.Channel). + Message(message). + Type(o.Type). + SpaceId(o.SpaceId). + Execute() if errPubFileResponse != nil { if tryCount >= maxCount { pubFileResponseStatus.AdditionalData = file diff --git a/listener_manager.go b/listener_manager.go index c4f279ab..3fc71c64 100644 --- a/listener_manager.go +++ b/listener_manager.go @@ -270,6 +270,8 @@ type PNMessage struct { Subscription string Publisher string Timetoken int64 + Type string + SpaceId SpaceId // TODO: remove Error error } @@ -361,5 +363,7 @@ type PNFilesEvent struct { Subscription string Publisher string Timetoken int64 + Type string + SpaceId SpaceId // TODO: remove Error error } diff --git a/publish_file_message.go b/publish_file_message.go index 53cceb23..86c9fdbe 100644 --- a/publish_file_message.go +++ b/publish_file_message.go @@ -51,6 +51,18 @@ func (b *publishFileMessageBuilder) Meta(meta interface{}) *publishFileMessageBu return b } +func (b *publishFileMessageBuilder) SpaceId(id SpaceId) *publishFileMessageBuilder { + b.opts.SpaceId = id + + return b +} + +func (b *publishFileMessageBuilder) Type(typ string) *publishFileMessageBuilder { + b.opts.Type = typ + + return b +} + // ShouldStore if true the messages are stored in History func (b *publishFileMessageBuilder) ShouldStore(store bool) *publishFileMessageBuilder { b.opts.ShouldStore = store @@ -129,6 +141,8 @@ type publishFileMessageOpts struct { UsePost bool TTL int Meta interface{} + SpaceId SpaceId + Type string ShouldStore bool setTTL bool setShouldStore bool @@ -244,6 +258,14 @@ func (o *publishFileMessageOpts) buildPath() (string, error) { func (o *publishFileMessageOpts) buildQuery() (*url.Values, error) { q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) + if o.Type != "" { + q.Set(publishTypeQueryParam, o.Type) + } + + if o.SpaceId != "" { + q.Set(publishSpaceIdQueryParam, string(o.SpaceId)) + } + SetQueryParam(q, o.QueryParam) return q, nil diff --git a/publish_file_message_test.go b/publish_file_message_test.go index 04274f5e..8bd88928 100644 --- a/publish_file_message_test.go +++ b/publish_file_message_test.go @@ -298,3 +298,37 @@ func TestPublishFileMessageResponseValuePass(t *testing.T) { assert.Nil(err) } + +func TestPublishFileMessageSpaceIdQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedSpaceId := SpaceId("spaceId") + queryValues, _ := pn.PublishFileMessage().SpaceId(expectedSpaceId).opts.buildQuery() + + a.Equal(expectedSpaceId, SpaceId(queryValues.Get(publishSpaceIdQueryParam))) +} + +func TestPublishFileMessageMissingSpaceIdQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.PublishFileMessage().opts.buildQuery() + + a.Equal("", queryValues.Get(publishSpaceIdQueryParam)) +} + +func TestPublishFileMessageTypeQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedType := "customMessageType" + queryValues, _ := pn.PublishFileMessage().Type(expectedType).opts.buildQuery() + + a.Equal(expectedType, queryValues.Get(publishTypeQueryParam)) +} + +func TestPublishFileMessageMissingMessageTypeQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.PublishFileMessage().opts.buildQuery() + + a.Equal("", queryValues.Get(publishTypeQueryParam)) +} diff --git a/publish_request.go b/publish_request.go index c625f3e5..8fc01515 100644 --- a/publish_request.go +++ b/publish_request.go @@ -20,6 +20,9 @@ const publishPostPath = "/publish/%s/%s/0/%s/%s" var emptyPublishResponse *PublishResponse +const publishSpaceIdQueryParam = "space-id" +const publishTypeQueryParam = "type" + type publishOpts struct { endpointOpts @@ -27,6 +30,8 @@ type publishOpts struct { Channel string Message interface{} Meta interface{} + SpaceId SpaceId + Type string UsePost bool ShouldStore bool @@ -125,6 +130,17 @@ func (b *publishBuilder) Meta(meta interface{}) *publishBuilder { return b } +func (b *publishBuilder) SpaceId(id SpaceId) *publishBuilder { + b.opts.SpaceId = id + return b +} + +func (b *publishBuilder) Type(typ string) *publishBuilder { + b.opts.Type = typ + + return b +} + // UsePost sends the Publish request using HTTP POST. func (b *publishBuilder) UsePost(post bool) *publishBuilder { b.opts.UsePost = post @@ -306,6 +322,14 @@ func (o *publishOpts) buildQuery() (*url.Values, error) { q.Set("meta", string(meta)) } + if o.Type != "" { + q.Set(publishTypeQueryParam, o.Type) + } + + if o.SpaceId != "" { + q.Set(publishSpaceIdQueryParam, string(o.SpaceId)) + } + if o.setShouldStore { if o.ShouldStore { q.Set("store", "1") diff --git a/publish_request_test.go b/publish_request_test.go index b14e2e0d..25fefd7c 100644 --- a/publish_request_test.go +++ b/publish_request_test.go @@ -2,15 +2,15 @@ package pubnub import ( "fmt" + "github.com/stretchr/testify/assert" "net/url" "testing" h "github.com/pubnub/go/v7/tests/helpers" - "github.com/stretchr/testify/assert" ) func AssertSuccessPublishGet(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) @@ -22,7 +22,7 @@ func AssertSuccessPublishGet(t *testing.T, expectedString string, message interf o.DoNotReplicate(true) path, err := o.opts.buildPath() - assert.Nil(err) + a.Nil(err) h.AssertPathsEqual(t, fmt.Sprintf("/publish/demo/demo/0/ch/0/%s", expectedString), @@ -30,15 +30,15 @@ func AssertSuccessPublishGet(t *testing.T, expectedString string, message interf body, err := o.opts.buildBody() - assert.Nil(err) - assert.Empty(body) - assert.Equal(10, o.opts.TTL) - assert.Equal(true, o.opts.ShouldStore) - assert.Equal(true, o.opts.DoNotReplicate) + a.Nil(err) + a.Empty(body) + a.Equal(10, o.opts.TTL) + a.Equal(true, o.opts.ShouldStore) + a.Equal(true, o.opts.DoNotReplicate) } func AssertSuccessPublishGetContext(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) @@ -50,7 +50,7 @@ func AssertSuccessPublishGetContext(t *testing.T, expectedString string, message o.DoNotReplicate(true) path, err := o.opts.buildPath() - assert.Nil(err) + a.Nil(err) h.AssertPathsEqual(t, fmt.Sprintf("/publish/demo/demo/0/ch/0/%s", expectedString), @@ -58,15 +58,15 @@ func AssertSuccessPublishGetContext(t *testing.T, expectedString string, message body, err := o.opts.buildBody() - assert.Nil(err) - assert.Empty(body) - assert.Equal(10, o.opts.TTL) - assert.Equal(true, o.opts.ShouldStore) - assert.Equal(true, o.opts.DoNotReplicate) + a.Nil(err) + a.Empty(body) + a.Equal(10, o.opts.TTL) + a.Equal(true, o.opts.ShouldStore) + a.Equal(true, o.opts.DoNotReplicate) } func AssertSuccessPublishGet2(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.AuthKey = "a" @@ -79,7 +79,7 @@ func AssertSuccessPublishGet2(t *testing.T, expectedString string, message inter o.DoNotReplicate(true) path, err := o.opts.buildPath() - assert.Nil(err) + a.Nil(err) h.AssertPathsEqual(t, fmt.Sprintf("/publish/demo/demo/0/ch/0/%s", expectedString), @@ -88,7 +88,7 @@ func AssertSuccessPublishGet2(t *testing.T, expectedString string, message inter query, err := o.opts.buildQuery() //log.Println(query) - assert.Nil(err) + a.Nil(err) expected := &url.Values{} expected.Set("seqn", "1") expected.Set("uuid", pn.Config.UUID) @@ -103,7 +103,7 @@ func AssertSuccessPublishGet2(t *testing.T, expectedString string, message inter } func AssertSuccessPublishGet3(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.AuthKey = "a" @@ -124,7 +124,7 @@ func AssertSuccessPublishGet3(t *testing.T, expectedString string, message inter query, err := o.opts.buildQuery() //log.Println(query) - assert.Nil(err) + a.Nil(err) expected := &url.Values{} expected.Set("seqn", "1") expected.Set("uuid", pn.Config.UUID) @@ -142,7 +142,7 @@ func AssertSuccessPublishGet3(t *testing.T, expectedString string, message inter func AssertSuccessPublishGetAuth(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.AuthKey = "PubAuthKey" @@ -155,7 +155,7 @@ func AssertSuccessPublishGetAuth(t *testing.T, expectedString string, message in o.DoNotReplicate(true) path, err := o.opts.buildPath() - assert.Nil(err) + a.Nil(err) h.AssertPathsEqual(t, fmt.Sprintf("/publish/demo/demo/0/ch/0/%s", expectedString), @@ -163,36 +163,16 @@ func AssertSuccessPublishGetAuth(t *testing.T, expectedString string, message in body, err := o.opts.buildBody() - assert.Nil(err) - assert.Empty(body) - assert.Equal(10, o.opts.TTL) - assert.Equal(true, o.opts.ShouldStore) - assert.Equal(true, o.opts.DoNotReplicate) - -} - -func AssertSuccessPublishGetMeta(t *testing.T, expectedString string, message interface{}) { - assert := assert.New(t) - - pn := NewPubNub(NewDemoConfig()) - - o := newPublishBuilder(pn) - o.Meta(nil) - - path, err := o.opts.buildPath() - assert.Nil(err) - - h.AssertPathsEqual(t, - fmt.Sprintf("/publish/demo/demo/0/ch/0/%s", expectedString), - path, []int{}) - - _, err1 := o.opts.buildQuery() + a.Nil(err) + a.Empty(body) + a.Equal(10, o.opts.TTL) + a.Equal(true, o.opts.ShouldStore) + a.Equal(true, o.opts.DoNotReplicate) - assert.Nil(err1) } func AssertSuccessPublishPost(t *testing.T, pn *PubNub, expectedBody string, message interface{}) { - assert := assert.New(t) + a := assert.New(t) opts := newPublishOpts(pn, pn.ctx) opts.Channel = "ch" @@ -201,7 +181,7 @@ func AssertSuccessPublishPost(t *testing.T, pn *PubNub, expectedBody string, mes opts.Serialize = true path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) u := &url.URL{ Path: path, } @@ -210,8 +190,8 @@ func AssertSuccessPublishPost(t *testing.T, pn *PubNub, expectedBody string, mes u.EscapedPath(), []int{}) body, err := opts.buildBody() - assert.Nil(err) - assert.Equal(expectedBody, string(body)) + a.Nil(err) + a.Equal(expectedBody, string(body)) } func TestPublishMixedGet(t *testing.T) { @@ -299,7 +279,7 @@ func TestPublishMixedPost(t *testing.T) { } func TestPublishDoNotSerializePost(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) message := "{\"one\":\"hey\"}" @@ -309,7 +289,7 @@ func TestPublishDoNotSerializePost(t *testing.T) { opts.UsePost = true path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) u := &url.URL{ Path: path, } @@ -318,12 +298,12 @@ func TestPublishDoNotSerializePost(t *testing.T) { u.EscapedPath(), []int{}) body, err := opts.buildBody() - assert.Nil(err) - assert.NotEmpty(body) + a.Nil(err) + a.NotEmpty(body) } func TestPublishDoNotSerializeInvalidPost(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) msgMap := make(map[string]string) @@ -338,7 +318,7 @@ func TestPublishDoNotSerializeInvalidPost(t *testing.T) { opts.Serialize = false path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) u := &url.URL{ Path: path, } @@ -347,12 +327,12 @@ func TestPublishDoNotSerializeInvalidPost(t *testing.T) { u.EscapedPath(), []int{}) body, err := opts.buildBody() - assert.Contains(err.Error(), "Message is not JSON serialized.") - assert.Empty(body) + a.Contains(err.Error(), "Message is not JSON serialized.") + a.Empty(body) } func TestPublishMeta(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) meta := make(map[string]string) @@ -367,7 +347,7 @@ func TestPublishMeta(t *testing.T) { opts.Meta = meta query, err := opts.buildQuery() - assert.Nil(err) + a.Nil(err) expected := &url.Values{} expected.Set("meta", @@ -377,12 +357,12 @@ func TestPublishMeta(t *testing.T) { []string{"seqn", "pnsdk", "uuid", "store"}, []string{}) body, err := opts.buildBody() - assert.Nil(err) - assert.Empty(body) + a.Nil(err) + a.Empty(body) } func TestPublishStore(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) opts := newPublishOpts(pubnub, pubnub.ctx) opts.Channel = "ch" @@ -391,7 +371,7 @@ func TestPublishStore(t *testing.T) { opts.setShouldStore = true query, err := opts.buildQuery() - assert.Nil(err) + a.Nil(err) expected := &url.Values{} expected.Set("store", "1") @@ -400,12 +380,12 @@ func TestPublishStore(t *testing.T) { []string{"seqn", "pnsdk", "uuid"}, []string{}) body, err := opts.buildBody() - assert.Nil(err) - assert.Empty(body) + a.Nil(err) + a.Empty(body) } func TestPublishEncrypt(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.UseRandomInitializationVector = false @@ -416,16 +396,16 @@ func TestPublishEncrypt(t *testing.T) { opts.Message = "hey" path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) - assert.Equal( + a.Equal( "/publish/demo/demo/0/ch/0/%22MnwzPGdVgz2osQCIQJviGg%3D%3D%22", path) pnconfig.CipherKey = "" } func TestPublishEncryptPNOther(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.UseRandomInitializationVector = false @@ -441,16 +421,16 @@ func TestPublishEncryptPNOther(t *testing.T) { opts.Serialize = true path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) - assert.Equal( + a.Equal( "/publish/demo/demo/0/ch/0/%7B%22not_other%22%3A%221234%22%2C%22pn_other%22%3A%22Wi24KS4pcTzvyuGOHubiXg%3D%3D%22%7D", path) pn.Config.CipherKey = "" } func TestPublishEncryptPNOtherDisable(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.UseRandomInitializationVector = false pn.Config.CipherKey = "enigma" @@ -467,15 +447,15 @@ func TestPublishEncryptPNOtherDisable(t *testing.T) { opts.Serialize = true path, err := opts.buildPath() - assert.Nil(err) + a.Nil(err) - assert.Equal( + a.Equal( "/publish/demo/demo/0/ch/0/%22bCC%2FkQbGdScQ0teYcawUsnASfRpUioutNKQfUAQNc46gWR%2FJnz8Ks5n%2FvfKnDkE6%22", path) pn.Config.CipherKey = "" } func TestPublishSequenceCounter(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) meta := make(map[string]string) @@ -491,41 +471,75 @@ func TestPublishSequenceCounter(t *testing.T) { for i := 1; i <= MaxSequence; i++ { counter := opts.pubnub.getPublishSequence() if counter == MaxSequence { - assert.Equal(1, opts.pubnub.getPublishSequence()) + a.Equal(1, opts.pubnub.getPublishSequence()) break } } } func TestNewPublishResponse(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) jsonBytes := []byte(`s`) _, _, err := newPublishResponse(jsonBytes, StatusResponse{}) - assert.Equal("pubnub/parsing: Error unmarshalling response: {s}", err.Error()) + a.Equal("pubnub/parsing: Error unmarshalling response: {s}", err.Error()) } func TestNewPublishResponseTimestamp(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) jsonBytes := []byte(`[1, Sent, "a"]`) _, _, err := newPublishResponse(jsonBytes, StatusResponse{}) - assert.Equal("pubnub/parsing: Error unmarshalling response: {[1, Sent, \"a\"]}", err.Error()) + a.Equal("pubnub/parsing: Error unmarshalling response: {[1, Sent, \"a\"]}", err.Error()) } func TestNewPublishResponseTimestamp2(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) jsonBytes := []byte(`[1, "Sent", "a"]`) _, _, err := newPublishResponse(jsonBytes, StatusResponse{}) - assert.Contains(err.Error(), "parsing \"a\": invalid syntax") + a.Contains(err.Error(), "parsing \"a\": invalid syntax") } func TestPublishValidateSubscribeKey(t *testing.T) { - assert := assert.New(t) + a := assert.New(t) pn := NewPubNub(NewDemoConfig()) pn.Config.SubscribeKey = "" opts := newPublishOpts(pn, pn.ctx) - assert.Equal("pubnub/validation: pubnub: Publish: Missing Subscribe Key", opts.validate().Error()) + a.Equal("pubnub/validation: pubnub: Publish: Missing Subscribe Key", opts.validate().Error()) +} + +func TestPublishSpaceIdQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedSpaceId := SpaceId("spaceId") + queryValues, _ := pn.Publish().SpaceId(expectedSpaceId).opts.buildQuery() + + a.Equal(expectedSpaceId, SpaceId(queryValues.Get(publishSpaceIdQueryParam))) +} + +func TestPublishMissingSpaceIdQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.Publish().opts.buildQuery() + + a.Equal("", queryValues.Get(publishSpaceIdQueryParam)) +} + +func TestPublishMessageTypeQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedType := "customMessageType" + queryValues, _ := pn.Publish().Type(expectedType).opts.buildQuery() + + a.Equal(expectedType, queryValues.Get(publishTypeQueryParam)) +} + +func TestPublishMissingMessageTypeQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.Publish().opts.buildQuery() + + a.Equal("", queryValues.Get(publishTypeQueryParam)) } diff --git a/pubnub.go b/pubnub.go index 93c2a585..12dfefd3 100644 --- a/pubnub.go +++ b/pubnub.go @@ -745,7 +745,7 @@ func (pn *PubNub) Destroy() { pn.requestWorkers.Close() pn.Config.Log.Println("after close requestWorkers") pn.tokenManager.CleanUp() - pn.client.CloseIdleConnections() + pn.GetClient().CloseIdleConnections() } diff --git a/signal_request.go b/signal_request.go index 4e7dde1e..94f7a22d 100644 --- a/signal_request.go +++ b/signal_request.go @@ -69,6 +69,18 @@ func (b *signalBuilder) QueryParam(queryParam map[string]string) *signalBuilder return b } +func (b *signalBuilder) SpaceId(id SpaceId) *signalBuilder { + b.opts.SpaceId = id + + return b +} + +func (b *signalBuilder) Type(typ string) *signalBuilder { + b.opts.Type = typ + + return b +} + // Execute runs the Signal request. func (b *signalBuilder) Execute() (*SignalResponse, StatusResponse, error) { rawJSON, status, err := executeRequest(b.opts) @@ -85,6 +97,8 @@ type signalOpts struct { Channel string UsePost bool QueryParam map[string]string + SpaceId SpaceId + Type string Transport http.RoundTripper } @@ -130,6 +144,14 @@ func (o *signalOpts) buildQuery() (*url.Values, error) { SetQueryParam(q, o.QueryParam) + if o.Type != "" { + q.Set(publishTypeQueryParam, o.Type) + } + + if o.SpaceId != "" { + q.Set(publishSpaceIdQueryParam, string(o.SpaceId)) + } + return q, nil } diff --git a/signal_request_test.go b/signal_request_test.go index faea4ea6..b9652ed3 100644 --- a/signal_request_test.go +++ b/signal_request_test.go @@ -134,7 +134,7 @@ func TestSignalResponseValueError(t *testing.T) { assert.Equal("pubnub/parsing: Error unmarshalling response: {s}", err.Error()) } -//[1, "Sent", "1232423423423"] +// [1, "Sent", "1232423423423"] func TestSignalResponseValuePass(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig()) @@ -144,3 +144,41 @@ func TestSignalResponseValuePass(t *testing.T) { _, _, err := newSignalResponse(jsonBytes, opts, StatusResponse{}) assert.Nil(err) } + +func TestSignalSpaceIdQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedSpaceId := SpaceId("spaceId") + queryValues, _ := pn.Signal().SpaceId(expectedSpaceId).opts.buildQuery() + + a.Equal(expectedSpaceId, SpaceId(queryValues.Get(publishSpaceIdQueryParam))) +} + +func TestSignalMissingSpaceIdQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.Signal().opts.buildQuery() + + a.Equal("", queryValues.Get(publishSpaceIdQueryParam)) +} + +func TestSignalMessageTypeQueryParamIsPassed(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + expectedType := "customMessageType" + queryValues, _ := pn.Signal().Type(expectedType).opts.buildQuery() + + a.Equal(expectedType, queryValues.Get(publishTypeQueryParam)) +} + +func MessageType(s string) { + panic("unimplemented") +} + +func TestSignalMissingMessageTypeQueryParamIsNotSet(t *testing.T) { + a := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + queryValues, _ := pn.Signal().opts.buildQuery() + + a.Equal("", queryValues.Get(publishTypeQueryParam)) +} diff --git a/subscription_manager.go b/subscription_manager.go index cbbe4913..e1b1282d 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -481,6 +481,8 @@ type subscribeMessage struct { UserMetadata interface{} `json:"u"` MessageType PNMessageType `json:"e"` SequenceNumber int `json:"s"` + SpaceId SpaceId `json:"si"` + Type string `json:"mt"` PublishMetaData publishMetadata `json:"p"` } @@ -638,7 +640,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage, switch payload.MessageType { case PNMessageTypeSignal: - pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, /*no error*/nil) + pnMessageResult := createPNMessageResult(payload.Payload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.Type, /*TODO: to remove*/payload.SpaceId, /*no error*/nil) m.pubnub.Config.Log.Println("announceSignal,", pnMessageResult) m.listenerManager.announceSignal(pnMessageResult) case PNMessageTypeObjects: @@ -675,7 +677,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage, } - pnFilesEvent := createPNFilesEvent(messagePayload, m, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, err) + pnFilesEvent := createPNFilesEvent(messagePayload, m, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.Type, /* TODO: to remove*/payload.SpaceId, err) m.pubnub.Config.Log.Println("PNMessageTypeFile:", PNMessageTypeFile) m.listenerManager.announceFile(pnFilesEvent) default: @@ -693,7 +695,7 @@ func processNonPresencePayload(m *SubscriptionManager, payload subscribeMessage, m.listenerManager.announceStatus(pnStatus) } - pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, err) + pnMessageResult := createPNMessageResult(messagePayload, actualCh, subscribedCh, channel, subscriptionMatch, payload.IssuingClientID, payload.UserMetadata, timetoken, payload.Type, /*TODO: to remove*/payload.SpaceId, err) m.pubnub.Config.Log.Println("announceMessage,", pnMessageResult) m.listenerManager.announceMessage(pnMessageResult) } @@ -716,7 +718,7 @@ func processSubscribePayload(m *SubscriptionManager, payload subscribeMessage) { } } -func createPNFilesEvent(filePayload interface{}, m *SubscriptionManager, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, err error) *PNFilesEvent { +func createPNFilesEvent(filePayload interface{}, m *SubscriptionManager, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, typ string, /*TODO: to remove*/spaceId SpaceId, err error) *PNFilesEvent { var filesPayload map[string]interface{} var ok bool if filesPayload, ok = filePayload.(map[string]interface{}); !ok { @@ -747,6 +749,8 @@ func createPNFilesEvent(filePayload interface{}, m *SubscriptionManager, actualC Timetoken: timetoken, Publisher: issuingClientID, UserMetadata: userMetadata, + SpaceId: spaceId, /*TODO: to remove*/ + Type: typ, Error: err, } return pnFilesEvent @@ -934,7 +938,7 @@ func createPNObjectsResult(objPayload interface{}, m *SubscriptionManager, actua return pnUUIDEvent, pnChannelEvent, pnMembershipEvent, eventType } -func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, error error) *PNMessage { +func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, channel, subscriptionMatch, issuingClientID string, userMetadata interface{}, timetoken int64, typ string, /* TODO: to remove */spaceId SpaceId, error error) *PNMessage { pnMessageResult := &PNMessage{ Message: messagePayload, @@ -945,6 +949,8 @@ func createPNMessageResult(messagePayload interface{}, actualCh, subscribedCh, c Timetoken: timetoken, Publisher: issuingClientID, UserMetadata: userMetadata, + Type: typ, + SpaceId: spaceId, // TODO: to remove Error: error, } diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 47f8243f..ce2d5e99 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -571,6 +571,61 @@ func TestProcessSubscribePayloadCipherErr(t *testing.T) { //pn.Destroy() } +func Test_processNonPresencePayload_spaceId(t *testing.T) { + tests := []struct { + name string + inputSpaceId SpaceId + expectedSpaceId SpaceId + }{ + { + name: "If SpaceId is not set the default is empty SpaceId", + inputSpaceId: "", + expectedSpaceId: "", + }, + { + name: "If SpaceId is set it's value of SpaceId", + inputSpaceId: "spaceId", + expectedSpaceId: "spaceId", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pn := NewPubNubDemo() + + payload := payload() + payload.SpaceId = tt.inputSpaceId + msg := pn.payloadToMsg(payload) + + assert.Equal(t, tt.expectedSpaceId, msg.SpaceId) + }) + } +} + +func payload() subscribeMessage { + return subscribeMessage{ + Shard: "shard", + SubscriptionMatch: "subscriptionMatch", + Channel: "channel", + IssuingClientID: "issueClientId", + SubscribeKey: "sub", + Flags: 0, + Payload: nil, + UserMetadata: nil, + MessageType: 0, + SequenceNumber: 42, + SpaceId: "", + Type: "", + PublishMetaData: publishMetadata{PublishTimetoken: "10", Region: 42}, + } +} + +func (pn *PubNub) payloadToMsg(payload subscribeMessage) *PNMessage { + listener := NewListener() + pn.AddListener(listener) + processNonPresencePayload(pn.subscriptionManager, payload, "channel", "subscriptionMatch", payload.PublishMetaData) + return <-listener.Message +} + func TestDecryptionProcessOnEncryptedMessage(t *testing.T) { assert := assert.New(t) pn := NewPubNub(NewDemoConfig()) diff --git a/tests/contract/access_state_test.go b/tests/contract/access_state_test.go index 0b5fa1f5..b8dd23ca 100644 --- a/tests/contract/access_state_test.go +++ b/tests/contract/access_state_test.go @@ -1,6 +1,9 @@ package contract -import pubnub "github.com/pubnub/go/v7" +import ( + "context" + pubnub "github.com/pubnub/go/v7" +) type accessStateKey struct{} @@ -21,7 +24,7 @@ type accessState struct { RevokeTokenResult pubnub.PNRevokeTokenResponse } -func newAccessState(pn *pubnub.PubNub) *accessState { +func newAccessState() *accessState { return &accessState{ TTL: 0, ChannelPermissions: make(map[string]*pubnub.ChannelPermissions), @@ -32,3 +35,7 @@ func newAccessState(pn *pubnub.PubNub) *accessState { UUIDPatternPermissions: make(map[string]*pubnub.UUIDPermissions), CurrentPermissions: nil} } + +func getAccessState(ctx context.Context) *accessState { + return ctx.Value(accessStateKey{}).(*accessState) +} diff --git a/tests/contract/common_given_steps_test.go b/tests/contract/common_given_steps_test.go index 8581d2f0..d05c5434 100644 --- a/tests/contract/common_given_steps_test.go +++ b/tests/contract/common_given_steps_test.go @@ -2,7 +2,6 @@ package contract import ( "context" - pubnub "github.com/pubnub/go/v7" ) @@ -30,3 +29,16 @@ func iHaveAKeysetWithAccessManagerEnabledWithoutSecretKey(ctx context.Context) e state.pubNub = pubnub.NewPubNub(config) return nil } + +func theDemoKeyset(ctx context.Context) error { + state := getCommonState(ctx) + config := pubnub.NewConfigWithUserId(pubnub.UserId(pubnub.GenerateUUID())) + config.PublishKey = state.contractTestConfig.publishKey + config.SubscribeKey = state.contractTestConfig.subscribeKey + config.SecretKey = state.contractTestConfig.secretKey + config.Origin = state.contractTestConfig.hostPort + config.Secure = state.contractTestConfig.secure + + state.pubNub = pubnub.NewPubNub(config) + return nil +} diff --git a/tests/contract/common_then_steps_test.go b/tests/contract/common_then_steps_test.go index 52de3c3c..83c45f88 100644 --- a/tests/contract/common_then_steps_test.go +++ b/tests/contract/common_then_steps_test.go @@ -61,3 +61,11 @@ func theResultIsSuccessful(ctx context.Context) error { return nil } + +func iReceiveASuccessfulResponse(ctx context.Context) error { + return theResultIsSuccessful(ctx) +} + +func iReceiveErrorResponse(ctx context.Context) error { + return anErrorIsReturned(ctx) +} diff --git a/tests/contract/files_steps_test.go b/tests/contract/files_steps_test.go new file mode 100644 index 00000000..6ffdc693 --- /dev/null +++ b/tests/contract/files_steps_test.go @@ -0,0 +1,31 @@ +package contract + +import ( + "context" + pubnub "github.com/pubnub/go/v7" + "os" +) + +func iSendAFileWithSpaceidAndType(ctx context.Context, spaceId string, typ string) error { + commonState := getCommonState(ctx) + + file, err := os.Open("test_file.txt") + defer file.Close() + if err != nil { + return err + } + + _, s, err := commonState.pubNub.SendFile(). + Message("This is a message"). + Type(typ). + SpaceId(pubnub.SpaceId(spaceId)). + File(file). + Name("name"). + Channel("channel"). + Execute() + + commonState.err = err + commonState.statusResponse = s + + return nil +} diff --git a/tests/contract/history_state_test.go b/tests/contract/history_state_test.go new file mode 100644 index 00000000..ab586457 --- /dev/null +++ b/tests/contract/history_state_test.go @@ -0,0 +1,20 @@ +package contract + +import ( + "context" + pubnub "github.com/pubnub/go/v7" +) + +type historyStateKey struct{} + +type historyState struct { + fetchResponse *pubnub.FetchResponse +} + +func newHistoryState() *historyState { + return &historyState{} +} + +func getHistoryState(ctx context.Context) *historyState { + return ctx.Value(historyStateKey{}).(*historyState) +} diff --git a/tests/contract/history_steps_test.go b/tests/contract/history_steps_test.go new file mode 100644 index 00000000..8059522d --- /dev/null +++ b/tests/contract/history_steps_test.go @@ -0,0 +1,107 @@ +package contract + +import ( + "context" + "fmt" +) + +func historyResponseContainsMessagesWithProvidedTypes(ctx context.Context, firstType string, secondType string) error { + historyState := getHistoryState(ctx) + + for _, fetchResponseItems := range historyState.fetchResponse.Messages { + for _, item := range fetchResponseItems { + if item.Type != firstType && item.Type != secondType { + return fmt.Errorf("expected type to be %s or %s, but found %s", firstType, secondType, item.Type) + } + } + } + return nil +} + +func historyResponseContainsMessagesWithProvidedMessageTypes(ctx context.Context, firstType int, secondType int) error { + historyState := getHistoryState(ctx) + + for _, fetchResponseItems := range historyState.fetchResponse.Messages { + for _, item := range fetchResponseItems { + if item.MessageType != firstType && item.MessageType != secondType { + return fmt.Errorf("expected message type to be %d or %d, but found %d", firstType, secondType, item.MessageType) + } + } + } + return nil +} + +func historyResponseContainsMessagesWithSpaceIds(ctx context.Context) error { + historyState := getHistoryState(ctx) + + for _, fetchResponseItems := range historyState.fetchResponse.Messages { + for _, item := range fetchResponseItems { + if item.SpaceId == "" { + return fmt.Errorf("expected non empty space id") + } + } + } + return nil + +} + +func historyResponseContainsMessagesWithoutType(ctx context.Context) error { + historyState := getHistoryState(ctx) + + for _, fetchResponseItems := range historyState.fetchResponse.Messages { + for _, item := range fetchResponseItems { + if item.Type != "" { + return fmt.Errorf("expected empty message type, but found %s", item.Type) + } + } + } + return nil +} + +func historyResponseContainsMessagesWithoutSpaceIds(ctx context.Context) error { + historyState := getHistoryState(ctx) + + for _, fetchResponseItems := range historyState.fetchResponse.Messages { + for _, item := range fetchResponseItems { + if item.SpaceId != "" { + return fmt.Errorf("expected empty space id, but found %s", item.SpaceId) + } + } + } + return nil +} + +func iFetchMessageHistoryForChannel(ctx context.Context, channel string) error { + commonState := getCommonState(ctx) + historyState := getHistoryState(ctx) + r, s, err := commonState.pubNub.Fetch().Channels([]string{channel}).Execute() + + commonState.err = err + commonState.statusResponse = s + historyState.fetchResponse = r + return nil +} + +func iFetchMessageHistoryWithIncludeTypeSetToFalseForChannel(ctx context.Context, channel string) error { + commonState := getCommonState(ctx) + historyState := getHistoryState(ctx) + + r, s, err := commonState.pubNub.Fetch().Channels([]string{channel}).IncludeType(false).Execute() + + commonState.err = err + commonState.statusResponse = s + historyState.fetchResponse = r + return nil +} + +func iFetchMessageHistoryWithIncludeSpaceIdSetToTrueForChannel(ctx context.Context, channel string) error { + commonState := getCommonState(ctx) + historyState := getHistoryState(ctx) + + r, s, err := commonState.pubNub.Fetch().Channels([]string{channel}).IncludeSpaceId(true).Execute() + + commonState.err = err + commonState.statusResponse = s + historyState.fetchResponse = r + return nil +} diff --git a/tests/contract/publish_steps_test.go b/tests/contract/publish_steps_test.go new file mode 100644 index 00000000..de68b9bb --- /dev/null +++ b/tests/contract/publish_steps_test.go @@ -0,0 +1,19 @@ +package contract + +import ( + "context" + pubnub "github.com/pubnub/go/v7" +) + +func iPublishMessageWithSpaceIdAndType(ctx context.Context, spaceId string, typ string) error { + commonState := getCommonState(ctx) + + _, s, err := commonState.pubNub.Publish(). + Message("whatever"). + Channel("whatever"). + Type(typ). + SpaceId(pubnub.SpaceId(spaceId)).Execute() + commonState.err = err + commonState.statusResponse = s + return nil +} diff --git a/tests/contract/signal_steps_test.go b/tests/contract/signal_steps_test.go new file mode 100644 index 00000000..d3bb5d01 --- /dev/null +++ b/tests/contract/signal_steps_test.go @@ -0,0 +1,19 @@ +package contract + +import ( + "context" + pubnub "github.com/pubnub/go/v7" +) + +func iSendASignalWithSpaceidSpaceIdAndType(ctx context.Context, spaceId string, typ string) error { + commonState := getCommonState(ctx) + + _, s, err := commonState.pubNub.Signal(). + Message("whatever"). + Channel("whatever"). + Type(typ). + SpaceId(pubnub.SpaceId(spaceId)).Execute() + commonState.err = err + commonState.statusResponse = s + return nil +} diff --git a/tests/contract/steps_mapping_test.go b/tests/contract/steps_mapping_test.go index c69a3442..23d74412 100644 --- a/tests/contract/steps_mapping_test.go +++ b/tests/contract/steps_mapping_test.go @@ -56,6 +56,35 @@ func MapSteps(ctx *godog.ScenarioContext) { ctx.Step(`^the result is successful$`, theResultIsSuccessful) ctx.Step(`^the token string \'(.*)\'$`, theTokenString) + ctx.Step(`^the demo keyset$`, theDemoKeyset) + ctx.Step(`^the demo keyset with enabled storage$`, theDemoKeyset) + + ctx.Step(`^I publish message with \'(.*)\' space id and \'(.*)\' type$`, iPublishMessageWithSpaceIdAndType) + ctx.Step(`^I send a signal with \'(.*)\' space id and \'(.*)\' type$`, iSendASignalWithSpaceidSpaceIdAndType) + ctx.Step(`^I receive a successful response$`, iReceiveASuccessfulResponse) + ctx.Step(`^I receive error response$`, iReceiveErrorResponse) + ctx.Step(`^I receive an error response$`, iReceiveErrorResponse) + + ctx.Step(`^I receive the message in my subscribe response$`, iReceiveTheMessageInMySubscribeResponse) + ctx.Step(`^I subscribe to \'(.*)\' channel$`, iSubscribeToChannel) + ctx.Step(`^subscribe response contains messages with space ids$`, subscribeResponseContainsMessagesWithSpaceIds) + ctx.Step(`^subscribe response contains messages without space ids$`, subscribeResponseContainsMessagesWithoutSpaceIds) + + ctx.Step(`^history response contains messages with \'(.*)\' and \'(.*)\' types$`, historyResponseContainsMessagesWithProvidedTypes) + ctx.Step(`^history response contains messages with \'(\d+)\' and \'(\d+)\' message types$`, historyResponseContainsMessagesWithProvidedMessageTypes) + ctx.Step(`^history response contains messages with space ids$`, historyResponseContainsMessagesWithSpaceIds) + ctx.Step(`^history response contains messages without types$`, historyResponseContainsMessagesWithoutType) + ctx.Step(`^history response contains messages without space ids$`, historyResponseContainsMessagesWithoutSpaceIds) + ctx.Step(`^I fetch message history for \'(.*)\' channel$`, iFetchMessageHistoryForChannel) + ctx.Step(`^I fetch message history with \'includeType\' set to \'false\' for \'(.*)\' channel$`, iFetchMessageHistoryWithIncludeTypeSetToFalseForChannel) + ctx.Step(`^I fetch message history with \'includeSpaceId\' set to \'true\' for \'(.*)\' channel$`, iFetchMessageHistoryWithIncludeSpaceIdSetToTrueForChannel) + + ctx.Step(`^I send a file with \'(.*)\' space id and \'(.*)\' message type$`, iSendAFileWithSpaceidAndType) + + ctx.Step(`^I receive (\d+) messages in my subscribe response$`, iReceiveMessagesInMySubscribeResponse) + ctx.Step(`^response contains messages with \'(.*)\' and \'(.*)\' types$`, responseContainsMessagesWithTypes) + ctx.Step(`^response contains messages with space ids$`, responseContainsMessagesWithSpaceIds) + ctx.Step(`^Decrypted file content equal to the \'(.*)\' file content$`, decryptedFileContentEqualToFileContent) ctx.Step(`^I decrypt \'(.*)\' file as \'(.*)\'$`, iDecryptFileAs) ctx.Step(`^I decrypt \'(.*)\' file$`, iDecryptFile) diff --git a/tests/contract/subscribe_state_test.go b/tests/contract/subscribe_state_test.go new file mode 100644 index 00000000..a76c8ccd --- /dev/null +++ b/tests/contract/subscribe_state_test.go @@ -0,0 +1,33 @@ +package contract + +import ( + "sync" +) + +type subscribeStateKey struct{} + +type subscribeState struct { + sync.RWMutex + allSubscribeMessages chan interface{} + all []interface{} +} + +func (s *subscribeState) addSubscribeMessage(msg interface{}) { + s.Lock() + defer s.Unlock() + s.all = append(s.all, msg) +} + +func (s *subscribeState) readAllSubscribeMessages() []interface{} { + s.RLock() + defer s.RUnlock() + r := make([]interface{}, len(s.all)) + copy(r, s.all) + return r +} + +func newSubscribeState() *subscribeState { + return &subscribeState{ + allSubscribeMessages: make(chan interface{}), + } +} diff --git a/tests/contract/subscribe_steps_test.go b/tests/contract/subscribe_steps_test.go new file mode 100644 index 00000000..bd29c644 --- /dev/null +++ b/tests/contract/subscribe_steps_test.go @@ -0,0 +1,99 @@ +package contract + +import ( + "context" + "errors" + "fmt" + pubnub "github.com/pubnub/go/v7" + "time" +) + +func responseContainsMessagesWithTypes(ctx context.Context, firstType string, secondType string) error { + subscribeState := getSubscribeState(ctx) + + return allMessagesMatch(subscribeState.readAllSubscribeMessages(), func(t pubnub.PNMessage) error { + if t.Type != firstType && t.Type != secondType { + return errors.New(fmt.Sprintf("expected %s or %s but found %s", firstType, secondType, t.Type)) + } + return nil + }) +} + +func subscribeResponseContainsMessagesWithSpaceIds(ctx context.Context) error { + subscribeState := getSubscribeState(ctx) + + return allMessagesMatch(subscribeState.readAllSubscribeMessages(), func(t pubnub.PNMessage) error { + if t.SpaceId != "" { + return errors.New("expected spaceId in the element but found empty") + } + return nil + }) +} + +func subscribeResponseContainsMessagesWithoutSpaceIds(ctx context.Context) error { + subscribeState := getSubscribeState(ctx) + + return allMessagesMatch(subscribeState.readAllSubscribeMessages(), func(t pubnub.PNMessage) error { + if t.SpaceId == "" { + return errors.New(fmt.Sprintf("expected empty spaceId in the element but found %s", t.SpaceId)) + } + return nil + }) +} + +func iReceiveTheMessageInMySubscribeResponse(ctx context.Context) error { + return iReceiveMessagesInMySubscribeResponse(ctx, 1) +} + +func iSubscribeToChannel(ctx context.Context, channel string) error { + commonState := getCommonState(ctx) + listener := pubnub.NewListener() + commonState.pubNub.AddListener(listener) + commonState.pubNub.Subscribe().Channels([]string{channel}).Execute() + + subscribeState := getSubscribeState(ctx) + + go func() { + for true { + select { + case <-listener.Status: + //ignore + case item := <-listener.Message: + subscribeState.addSubscribeMessage(item) + case item := <-listener.Presence: + subscribeState.addSubscribeMessage(item) + case item := <-listener.File: + subscribeState.addSubscribeMessage(item) + case item := <-listener.MessageActionsEvent: + subscribeState.addSubscribeMessage(item) + case item := <-listener.Signal: + subscribeState.addSubscribeMessage(item) + case item := <-listener.ChannelEvent: + subscribeState.addSubscribeMessage(item) + case item := <-listener.MembershipEvent: + subscribeState.addSubscribeMessage(item) + case item := <-listener.UUIDEvent: + subscribeState.addSubscribeMessage(item) + } + + } + }() + + return nil +} + +func iReceiveMessagesInMySubscribeResponse(ctx context.Context, numberOfMessages int) error { + subscribeState := getSubscribeState(ctx) + err := checkFor(time.Millisecond*500, time.Millisecond*50, func() error { + if len(subscribeState.readAllSubscribeMessages()) < numberOfMessages { + return fmt.Errorf("received less messages than %d", numberOfMessages) + } else { + return nil + } + }) + return err +} + +func responseContainsMessagesWithSpaceIds(ctx context.Context) error { + return subscribeResponseContainsMessagesWithoutSpaceIds(ctx) +} diff --git a/tests/contract/test_file.txt b/tests/contract/test_file.txt new file mode 100644 index 00000000..a5d1ee3d --- /dev/null +++ b/tests/contract/test_file.txt @@ -0,0 +1 @@ +This is just a dummy content diff --git a/tests/contract/utils_test.go b/tests/contract/utils_test.go index b26c8ba7..79e4d299 100644 --- a/tests/contract/utils_test.go +++ b/tests/contract/utils_test.go @@ -5,11 +5,13 @@ import ( "encoding/json" "errors" "fmt" + pubnub "github.com/pubnub/go/v7" "io/ioutil" "net/http" "os" "strconv" "strings" + "time" "github.com/cucumber/godog" ) @@ -100,14 +102,14 @@ func after(ctx context.Context, sc *godog.Scenario, err error) (context.Context, return ctx, nil } -func getAccessState(ctx context.Context) *accessState { - return ctx.Value(accessStateKey{}).(*accessState) -} - func getCommonState(ctx context.Context) *commonState { return ctx.Value(commonStateKey{}).(*commonState) } +func getSubscribeState(ctx context.Context) *subscribeState { + return ctx.Value(subscribeStateKey{}).(*subscribeState) +} + type contractTestConfigKey struct{} type contractTestConfig struct { @@ -163,3 +165,36 @@ func getenvBoolWithDefault(name string, defaultValue bool) (bool, error) { return value, nil } + +func checkFor(maxTime, intervalTime time.Duration, fun func() error) error { + maxTimeout := time.NewTimer(maxTime) + interval := time.NewTicker(intervalTime) + lastErr := fun() +ForLoop: + for { + select { + case <-interval.C: + lastErr := fun() + if lastErr != nil { + continue + } else { + break ForLoop + } + case <-maxTimeout.C: + return lastErr + } + } + return nil +} + +func allMessagesMatch(msgs []interface{}, predicate func(t pubnub.PNMessage) error) error { + for _, item := range msgs { + switch t := item.(type) { + case pubnub.PNMessage: + if err := predicate(t); err != nil { + return err + } + } + } + return nil +} diff --git a/tests/e2e/files_test.go b/tests/e2e/files_test.go index fc3ae1fd..d3bcf010 100644 --- a/tests/e2e/files_test.go +++ b/tests/e2e/files_test.go @@ -1,15 +1,11 @@ package e2e import ( - "bytes" - "crypto/aes" - "crypto/cipher" "errors" "fmt" "io" "io/ioutil" "log" - "math/rand" "os" "strings" "testing" @@ -20,23 +16,6 @@ import ( "github.com/stretchr/testify/assert" ) -func processInterface(in interface{}) { - switch v := in.(type) { - case map[string]interface{}: - for s, b := range v { - fmt.Printf("%s: b=%v\n", s, b) - } - case map[string]*pubnub.PNFileMessageAndDetails: - - fmt.Printf("*pubnub.PNFileMessageAndDetails") - case map[string]pubnub.PNFileMessageAndDetails: - - fmt.Printf("pubnub.PNFileMessageAndDetails") - default: - fmt.Println("unknown type") - } -} - func TestFileUpload(t *testing.T) { FileUploadCommon(t, false, "", "file_upload_test.txt", "file_upload_test_output.txt") } @@ -51,12 +30,15 @@ func TestFileUploadWithCustomCipher(t *testing.T) { type FileData struct { id, url, name, message string + typ string + spaceId pubnub.SpaceId } func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepathInput, filepathOutput string) { assert := assert.New(t) - - pn := pubnub.NewPubNub(pamConfigCopy()) + config := pamConfigCopy() + config.Log = log.Default() + pn := pubnub.NewPubNub(config) if enableDebuggingInTests { pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) } @@ -87,6 +69,8 @@ func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepat ch := fmt.Sprintf("test_file_upload_channel_%d", rno) name := fmt.Sprintf("test_file_upload_name_%d.txt", rno) message := fmt.Sprintf("test file %s", name) + expectedType := "This_is_messageType" + expectedSpaceId := pubnub.SpaceId("This_is_spaceId") listener := pubnub.NewListener() exitListener := make(chan bool) @@ -115,7 +99,7 @@ func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepat fmt.Printf("file.SubscribedChannel: %s\n", file.SubscribedChannel) fmt.Printf("file.Publisher: %s\n", file.Publisher) } - fileDataChannel <- FileData{file.File.PNFile.ID, file.File.PNFile.URL, file.File.PNFile.Name, file.File.PNMessage.Text} + fileDataChannel <- FileData{file.File.PNFile.ID, file.File.PNFile.URL, file.File.PNFile.Name, file.File.PNMessage.Text, file.Type, file.SpaceId} case <-exitListener: break ExitLabel @@ -130,14 +114,23 @@ func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepat // Sleep a bit, to give client some time to subscribe on channels firs. time.Sleep(100 * time.Millisecond) - resSendFile, statusSendFile, _ := pn.SendFile().Channel(ch).Message(message).CipherKey(cipherKey).Name(name).File(file).Execute() + resSendFile, statusSendFile, err := pn.SendFile(). + Channel(ch). + Message(message). + CipherKey(cipherKey). + Name(name). + File(file). + Type(expectedType). + SpaceId(expectedSpaceId). + Execute() assert.Equal(200, statusSendFile.StatusCode) if enableDebuggingInTests { fmt.Println("statusSendFile.AdditionalData:", statusSendFile.AdditionalData) } - if resSendFile == nil { + if err != nil { close(fileDataChannel) + t.Error(err) assert.Fail("resSendFile nil") return } @@ -193,6 +186,8 @@ func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepat assert.Equal(id, fileData.id) assert.Equal(message, fileData.message) assert.Equal(retURL, location) + assert.Equal(expectedType, fileData.typ) + assert.Equal(expectedSpaceId, fileData.spaceId) out, errDL := os.Create(filepathOutput) defer out.Close() @@ -289,6 +284,39 @@ func FileUploadCommon(t *testing.T, useCipher bool, customCipher string, filepat } +func TestFileEncryptionDecryption(t *testing.T) { + assert := assert.New(t) + filepathInput := "file_upload_test.txt" + filepathOutput := "file_upload_test_output.txt" + filepathSampleOutput := "file_upload_sample_encrypted.txt" + filepathOutputDec := "file_upload_dec_output.txt" + + out, _ := os.Create(filepathOutput) + file, err := os.Open(filepathInput) + if err != nil { + panic(err) + } + utils.EncryptFile("enigma", []byte{133, 126, 158, 123, 43, 95, 96, 90, 215, 178, 17, 73, 166, 130, 79, 156}, out, file) + fileText, _ := ioutil.ReadFile(filepathOutput) + + fileTextSample, _ := ioutil.ReadFile(filepathSampleOutput) + assert.Equal(string(fileTextSample), string(fileText)) + + outDec, _ := os.Open(filepathSampleOutput) + fi, _ := outDec.Stat() + contentLenEnc := fi.Size() + defer outDec.Close() + + fileDec, _ := os.Create(filepathOutputDec) + defer fileDec.Close() + r, w := io.Pipe() + utils.DecryptFile("enigma", contentLenEnc, outDec, w) + io.Copy(fileDec, r) + fileTextDec, _ := ioutil.ReadFile(filepathOutputDec) + fileTextIn, _ := ioutil.ReadFile(filepathInput) + assert.Equal(fileTextIn, fileTextDec) +} + func unpadPKCS7(data []byte) ([]byte, error) { blocklen := 16 if len(data)%blocklen != 0 || len(data) == 0 { @@ -721,5 +749,4 @@ func FileDownload3Test(t *testing.T) { fmt.Println("after reader") fmt.Println("---FileTextOut---") // fmt.Println(string(fileTextOut)) - } diff --git a/tests/e2e/helper.go b/tests/e2e/helper.go index a0c2c036..668cb110 100644 --- a/tests/e2e/helper.go +++ b/tests/e2e/helper.go @@ -14,6 +14,7 @@ import ( "testing" "time" + godotenv "github.com/joho/godotenv" pubnub "github.com/pubnub/go/v7" "github.com/stretchr/testify/assert" ) @@ -100,6 +101,30 @@ func logInTest(format string, a ...interface{}) (n int, err error) { return 0, nil } +func subscribeWithATimeout(t *testing.T, pn *pubnub.PubNub, channel string, duration time.Duration) error { + listener := pubnub.NewListener() + pn.AddListener(listener) + pn.Subscribe().Channels([]string{channel}).Execute() + timer := time.NewTimer(duration) + select { + case s := <-listener.Status: + timer.Stop() + if s.Category == pubnub.PNConnectedCategory { + pn.RemoveListener(listener) + return nil + } else { + errMsg := fmt.Sprintf("didn't receive connected but %s", s.Category) + t.Error(errMsg) + return errors.New(errMsg) + } + case <-timer.C: + timer.Stop() + errMsg := "connected didn't came in desired time" + t.Error(errMsg) + return errors.New(errMsg) + } +} + func checkForAsserted(t *testing.T, maxTime, intervalTime time.Duration, fun func() error) { } diff --git a/tests/e2e/subscribe_test.go b/tests/e2e/subscribe_test.go index 64719a90..813dfd4d 100644 --- a/tests/e2e/subscribe_test.go +++ b/tests/e2e/subscribe_test.go @@ -2360,83 +2360,45 @@ func TestSubscribeSuperCall(t *testing.T) { exitListener <- true } -func ReconnectionExhaustion(t *testing.T) { - assert := assert.New(t) - doneSubscribe := make(chan bool) - errChan := make(chan string) +func TestPublishAndSubscribeWithSpaceIdAndType(t *testing.T) { + pn := pubnub.NewPubNub(configCopy()) + channel := randomized("channel") + expectedSpaceId := pubnub.SpaceId("spaceId") + expectedType := "typ" - interceptor := stubs.NewInterceptor() - interceptor.AddStub(&stubs.Stub{ - Method: "GET", - Path: fmt.Sprintf("/v2/subscribe/%s/ch/0", config.SubscribeKey), - ResponseBody: "", - Query: "heartbeat=300", - IgnoreQueryKeys: []string{"pnsdk", "uuid"}, - ResponseStatusCode: 400, - }) - interceptor.AddStub(&stubs.Stub{ - Method: "GET", - Path: fmt.Sprintf("/v2/presence/sub-key/%s/channel/ch/leave", config.SubscribeKey), - ResponseBody: `{"status": 200, "message": "OK", "action": "leave", "service": "Presence"}`, - Query: "", - IgnoreQueryKeys: []string{"pnsdk", "uuid"}, - ResponseStatusCode: 200, - }) - config.MaximumReconnectionRetries = 1 - config.PNReconnectionPolicy = pubnub.PNLinearPolicy + err := subscribeWithATimeout(t, pn, channel, time.Second*1) + if err != nil { + return + } - pn := pubnub.NewPubNub(config) - //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) - pn.Config.AuthKey = "myAuthKey" - pn.SetSubscribeClient(interceptor.GetClient()) listener := pubnub.NewListener() - count := 0 - exitListener := make(chan bool) - - go func() { - ExitLabel: - for { - select { - case status := <-listener.Status: - - switch status.Category { - case pubnub.PNReconnectionAttemptsExhausted: - doneSubscribe <- true - default: - //if count > 1 { - //errChan <- fmt.Sprintf("Non PNReconnectedCategory event, %s", status) - //fmt.Println(status) - //} - } - count++ - case <-listener.Message: - errChan <- "Got message while awaiting for a status event" - return - case <-listener.Presence: - errChan <- "Got presence while awaiting for a status event" - return - case <-exitListener: - break ExitLabel - - } - } - }() - pn.AddListener(listener) - pn.Subscribe(). - Channels([]string{"ch"}). + _, _, err = pn.Publish(). + Channel(channel). + Message("msg"). + SpaceId(expectedSpaceId). + Type(expectedType). Execute() - tic := time.NewTicker(time.Duration(timeout) * time.Second) - select { - case <-doneSubscribe: - fmt.Println("doneSubscribe") - case err := <-errChan: - assert.Fail(err) - case <-tic.C: - tic.Stop() - assert.Fail("timeout") + if err != nil { + t.Error("Publish failed", err) + return + } + timer := time.NewTimer(time.Second * 3) + stop := false + for !stop { + select { + case <-listener.Status: + //just ignore + case msg := <-listener.Message: + timer.Stop() + stop = true + assert.Equal(t, expectedSpaceId, msg.SpaceId) + assert.Equal(t, expectedType, msg.Type) + case <-timer.C: + t.Error("Didn't receive message in expected time") + return + } } - exitListener <- true }