Skip to content

Commit

Permalink
separate the clientid and sourceid for mqtt source client (#305)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey authored Dec 21, 2023
1 parent 5aa27d6 commit 66fe32e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
18 changes: 10 additions & 8 deletions cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ type mqttAgentOptions struct {
}

func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
mqttAgentOptions := &mqttAgentOptions{
MQTTOptions: *mqttOptions,
errorChan: make(chan error),
clusterName: clusterName,
agentID: agentID,
}

return &options.CloudEventsAgentOptions{
CloudEventsOptions: &mqttAgentOptions{
MQTTOptions: *mqttOptions,
errorChan: make(chan error),
clusterName: clusterName,
agentID: agentID,
},
AgentID: agentID,
ClusterName: clusterName,
CloudEventsOptions: mqttAgentOptions,
AgentID: mqttAgentOptions.agentID,
ClusterName: mqttAgentOptions.clusterName,
}
}

Expand Down
20 changes: 12 additions & 8 deletions cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ type mqttSourceOptions struct {
MQTTOptions
errorChan chan error
sourceID string
clientID string
}

func NewSourceOptions(mqttOptions *MQTTOptions, sourceID string) *options.CloudEventsSourceOptions {
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
mqttSourceOptions := &mqttSourceOptions{
MQTTOptions: *mqttOptions,
errorChan: make(chan error),
sourceID: sourceID,
clientID: clientID,
}

return &options.CloudEventsSourceOptions{
CloudEventsOptions: &mqttSourceOptions{
MQTTOptions: *mqttOptions,
errorChan: make(chan error),
sourceID: sourceID,
},
SourceID: sourceID,
CloudEventsOptions: mqttSourceOptions,
SourceID: mqttSourceOptions.sourceID,
}
}

Expand Down Expand Up @@ -56,7 +60,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) {
receiver, err := o.GetCloudEventsClient(
ctx,
fmt.Sprintf("%s-client", o.sourceID),
o.clientID,
func(err error) {
o.errorChan <- err
},
Expand Down
3 changes: 2 additions & 1 deletion test/integration/cloudevents/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ func statusHashGetter(obj *Resource) (string, error) {
}

func StartMQTTResourceSourceClient(ctx context.Context, config *mqtt.MQTTOptions, eventHub *EventHub) (generic.CloudEventsClient[*Resource], error) {
sourceID := "integration-test"
client, err := generic.NewCloudEventSourceClient[*Resource](
ctx,
mqtt.NewSourceOptions(config, "integration-test"),
mqtt.NewSourceOptions(config, fmt.Sprintf("%s-client", sourceID), sourceID),
&resourceLister{},
statusHashGetter,
&resourceCodec{},
Expand Down

0 comments on commit 66fe32e

Please sign in to comment.