Skip to content

Commit

Permalink
Merge pull request #911 from mysteriumnetwork/feature/794-tequilapi-n…
Browse files Browse the repository at this point in the history
…at-status

Add tequilapi endpoint for NAT traversal status
  • Loading branch information
vkuznecovas authored Apr 26, 2019
2 parents bc16393 + 3d24215 commit 0184237
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 56 deletions.
3 changes: 3 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 33 additions & 11 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/mysteriumnetwork/node/metrics"
"github.com/mysteriumnetwork/node/money"
"github.com/mysteriumnetwork/node/nat"
"github.com/mysteriumnetwork/node/nat/event"
"github.com/mysteriumnetwork/node/nat/mapping"
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/nat/traversal/config"
Expand Down Expand Up @@ -95,14 +96,20 @@ type NatPinger interface {

// NatEventTracker is responsible for tracking NAT events
type NatEventTracker interface {
ConsumeNATEvent(event traversal.Event)
LastEvent() *traversal.Event
WaitForEvent() traversal.Event
ConsumeNATEvent(event event.Event)
LastEvent() *event.Event
WaitForEvent() event.Event
}

// NatEventSender is responsible for sending NAT events to metrics server
type NatEventSender interface {
ConsumeNATEvent(event traversal.Event)
ConsumeNATEvent(event event.Event)
}

// NATStatusTracker tracks status of NAT traversal by consuming NAT events
type NATStatusTracker interface {
Status() nat.Status
ConsumeNATEvent(event event.Event)
}

// Dependencies is DI container for top level components which is reused in several places
Expand Down Expand Up @@ -140,9 +147,10 @@ type Dependencies struct {
ServiceRegistry *service.Registry
ServiceSessionStorage *session.StorageMemory

NATPinger NatPinger
NATTracker NatEventTracker
NATEventSender NatEventSender
NATPinger NatPinger
NATTracker NatEventTracker
NATEventSender NatEventSender
NATStatusTracker NATStatusTracker

PortPool *port.Pool

Expand Down Expand Up @@ -299,11 +307,15 @@ func (di *Dependencies) subscribeEventConsumers() error {
}

// NAT events
err = di.EventBus.Subscribe(traversal.EventTopic, di.NATEventSender.ConsumeNATEvent)
err = di.EventBus.Subscribe(event.Topic, di.NATEventSender.ConsumeNATEvent)
if err != nil {
return err
}
err = di.EventBus.Subscribe(event.Topic, di.NATTracker.ConsumeNATEvent)
if err != nil {
return err
}
return di.EventBus.Subscribe(traversal.EventTopic, di.NATTracker.ConsumeNATEvent)
return di.EventBus.Subscribe(event.Topic, di.NATStatusTracker.ConsumeNATEvent)
}

func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options) {
Expand Down Expand Up @@ -344,6 +356,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options) {
tequilapi_endpoints.AddRoutesForServiceSessions(router, di.ServiceSessionStorage)
tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI)
tequilapi_endpoints.AddRoutesForAccessPolicies(router, nodeOptions.AccessPolicyEndpointAddress)
tequilapi_endpoints.AddRoutesForNAT(router, di.NATStatusTracker.Status)
identity_registry.AddIdentityRegistrationEndpoint(router, di.IdentityRegistration, di.IdentityRegistry)

corsPolicy := tequilapi.NewMysteriumCorsPolicy()
Expand Down Expand Up @@ -485,8 +498,7 @@ func (di *Dependencies) bootstrapMetrics(options node.Options) {
}

func (di *Dependencies) bootstrapNATComponents(options node.Options) {
di.NATTracker = traversal.NewEventsTracker()
di.NATEventSender = traversal.NewEventsSender(di.MetricsSender, di.IPResolver.GetPublicIP)
di.NATTracker = event.NewTracker()
if options.ExperimentNATPunching {
di.NATPinger = traversal.NewPingerFactory(
di.NATTracker,
Expand All @@ -499,4 +511,14 @@ func (di *Dependencies) bootstrapNATComponents(options node.Options) {
} else {
di.NATPinger = &traversal.NoopPinger{}
}

di.NATEventSender = event.NewSender(di.MetricsSender, di.IPResolver.GetPublicIP)

var lastStageName string
if options.ExperimentNATPunching {
lastStageName = traversal.StageName
} else {
lastStageName = mapping.StageName
}
di.NATStatusTracker = nat.NewStatusTracker(lastStageName)
}
24 changes: 12 additions & 12 deletions nat/traversal/events_sender.go → nat/event/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package traversal
package event

import (
log "github.com/cihub/seelog"
)

const eventsSenderLogPrefix = "[traversal-events-sender] "
const senderLogPrefix = "[traversal-events-sender] "

// EventsSender allows subscribing to NAT events and sends them to server
type EventsSender struct {
// Sender allows subscribing to NAT events and sends them to server
type Sender struct {
metricsSender metricsSender
ipResolver ipResolver
lastIp string
Expand All @@ -38,16 +38,16 @@ type metricsSender interface {

type ipResolver func() (string, error)

// NewEventsSender returns a new instance of events sender
func NewEventsSender(metricsSender metricsSender, ipResolver ipResolver) *EventsSender {
return &EventsSender{metricsSender: metricsSender, ipResolver: ipResolver, lastIp: ""}
// NewSender returns a new instance of events sender
func NewSender(metricsSender metricsSender, ipResolver ipResolver) *Sender {
return &Sender{metricsSender: metricsSender, ipResolver: ipResolver, lastIp: ""}
}

// ConsumeNATEvent sends received event to server
func (es *EventsSender) ConsumeNATEvent(event Event) {
func (es *Sender) ConsumeNATEvent(event Event) {
publicIP, err := es.ipResolver()
if err != nil {
log.Warnf(eventsSenderLogPrefix, "resolving public ip failed: ", err)
log.Warnf(senderLogPrefix, "resolving public ip failed: ", err)
return
}
if publicIP == es.lastIp && es.matchesLastEvent(event) {
Expand All @@ -56,22 +56,22 @@ func (es *EventsSender) ConsumeNATEvent(event Event) {

err = es.sendNATEvent(event)
if err != nil {
log.Warnf(eventsSenderLogPrefix, "sending event failed: ", err)
log.Warnf(senderLogPrefix, "sending event failed: ", err)
}

es.lastIp = publicIP
es.lastEvent = &event
}

func (es *EventsSender) sendNATEvent(event Event) error {
func (es *Sender) sendNATEvent(event Event) error {
if event.Successful {
return es.metricsSender.SendNATMappingSuccessEvent(event.Stage)
}

return es.metricsSender.SendNATMappingFailEvent(event.Stage, event.Error)
}

func (es *EventsSender) matchesLastEvent(event Event) bool {
func (es *Sender) matchesLastEvent(event Event) bool {
if es.lastEvent == nil {
return false
}
Expand Down
16 changes: 8 additions & 8 deletions nat/traversal/events_sender_test.go → nat/event/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package traversal
package event

import (
"errors"
Expand Down Expand Up @@ -60,7 +60,7 @@ func (resolver *mockIPResolver) GetPublicIP() (string, error) {
func Test_EventsSender_ConsumeNATEvent_SendsSuccessEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Stage: "hole_punching", Successful: true})

Expand All @@ -71,7 +71,7 @@ func Test_EventsSender_ConsumeNATEvent_SendsSuccessEvent(t *testing.T) {
func Test_EventsSender_ConsumeNATEvent_WithSameIp_DoesNotSendSuccessEventAgain(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -84,7 +84,7 @@ func Test_EventsSender_ConsumeNATEvent_WithSameIp_DoesNotSendSuccessEventAgain(t
func Test_EventsSender_ConsumeNATEvent_WithDifferentIP_SendsSuccessEventAgain(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -98,7 +98,7 @@ func Test_EventsSender_ConsumeNATEvent_WithDifferentIP_SendsSuccessEventAgain(t
func Test_EventsSender_ConsumeNATEvent_WhenIPResolverFails_DoesNotSendEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockErr: errors.New("mock error")}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})

Expand All @@ -108,7 +108,7 @@ func Test_EventsSender_ConsumeNATEvent_WhenIPResolverFails_DoesNotSendEvent(t *t
func Test_EventsSender_ConsumeNATEvent_SendsFailureEvent(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

testErr := errors.New("test error")
sender.ConsumeNATEvent(Event{Stage: "hole_punching", Successful: false, Error: testErr})
Expand All @@ -120,7 +120,7 @@ func Test_EventsSender_ConsumeNATEvent_SendsFailureEvent(t *testing.T) {
func Test_EventsSender_ConsumeNATEvent_WithFailuresOfDifferentStages_SendsBothEvents(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

testErr1 := errors.New("test error 1")
sender.ConsumeNATEvent(Event{Successful: false, Error: testErr1, Stage: "test 1"})
Expand All @@ -133,7 +133,7 @@ func Test_EventsSender_ConsumeNATEvent_WithFailuresOfDifferentStages_SendsBothEv
func Test_EventsSender_ConsumeNATEvent_WithSuccessAndFailureOnSameIp_SendsBothEvents(t *testing.T) {
mockMetricsSender := buildMockMetricsSender(nil)
mockIPResolver := &mockIPResolver{mockIp: "1st ip"}
sender := NewEventsSender(mockMetricsSender, mockIPResolver.GetPublicIP)
sender := NewSender(mockMetricsSender, mockIPResolver.GetPublicIP)

sender.ConsumeNATEvent(Event{Successful: true})
testErr := errors.New("test error")
Expand Down
26 changes: 13 additions & 13 deletions nat/traversal/events_tracker.go → nat/event/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package traversal
package event

import (
"time"

log "github.com/cihub/seelog"
)

// EventTopic the topic that traversal events are published on
const EventTopic = "Traversal"
// Topic the topic that traversal events are published on
const Topic = "Traversal"

const eventsTrackerLogPrefix = "[traversal-events-tracker] "

// EventsTracker is able to track NAT traversal events
type EventsTracker struct {
// Tracker is able to track NAT traversal events
type Tracker struct {
lastEvent *Event
eventChan chan Event
}

// BuildSuccessEvent returns new event for successful NAT traversal
func BuildSuccessEvent(stage string) Event {
// BuildSuccessfulEvent returns new event for successful NAT traversal
func BuildSuccessfulEvent(stage string) Event {
return Event{Stage: stage, Successful: true}
}

Expand All @@ -44,13 +44,13 @@ func BuildFailureEvent(stage string, err error) Event {
return Event{Stage: stage, Successful: false, Error: err}
}

// NewEventsTracker returns a new instance of event tracker
func NewEventsTracker() *EventsTracker {
return &EventsTracker{eventChan: make(chan Event, 1)}
// NewTracker returns a new instance of event tracker
func NewTracker() *Tracker {
return &Tracker{eventChan: make(chan Event, 1)}
}

// ConsumeNATEvent consumes a NAT event
func (et *EventsTracker) ConsumeNATEvent(event Event) {
func (et *Tracker) ConsumeNATEvent(event Event) {
log.Info(eventsTrackerLogPrefix, "got NAT event: ", event)

et.lastEvent = &event
Expand All @@ -61,13 +61,13 @@ func (et *EventsTracker) ConsumeNATEvent(event Event) {
}

// LastEvent returns the last known event and boolean flag, indicating if such event exists
func (et *EventsTracker) LastEvent() *Event {
func (et *Tracker) LastEvent() *Event {
log.Info(eventsTrackerLogPrefix, "getting last NAT event: ", et.lastEvent)
return et.lastEvent
}

// WaitForEvent waits for event to occur
func (et *EventsTracker) WaitForEvent() Event {
func (et *Tracker) WaitForEvent() Event {
if et.lastEvent != nil {
return *et.lastEvent
}
Expand Down
6 changes: 3 additions & 3 deletions nat/mapping/port_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

log "github.com/cihub/seelog"
portmap "github.com/ethereum/go-ethereum/p2p/nat"
"github.com/mysteriumnetwork/node/nat/traversal"
"github.com/mysteriumnetwork/node/nat/event"
)

const logPrefix = "[port mapping] "
Expand Down Expand Up @@ -88,11 +88,11 @@ func addMapping(m portmap.Interface, protocol string, extPort, intPort int, name
log.Debugf("%s Couldn't add port mapping for port %d: %v, retrying with permanent lease", logPrefix, extPort, err)
if err := m.AddMapping(protocol, extPort, intPort, name, 0); err != nil {
// some gateways support only permanent leases
publisher.Publish(traversal.EventTopic, traversal.BuildFailureEvent(StageName, err))
publisher.Publish(event.Topic, event.BuildFailureEvent(StageName, err))
log.Debugf("%s Couldn't add port mapping for port %d: %v", logPrefix, extPort, err)
return
}
}
publisher.Publish(traversal.EventTopic, traversal.BuildSuccessEvent(StageName))
publisher.Publish(event.Topic, event.BuildSuccessfulEvent(StageName))
log.Info(logPrefix, "Mapped network port: ", extPort)
}
Loading

0 comments on commit 0184237

Please sign in to comment.