diff --git a/cmd/di.go b/cmd/di.go index f5c1b5dd11..bff18c2f56 100644 --- a/cmd/di.go +++ b/cmd/di.go @@ -37,10 +37,9 @@ import ( "github.com/mysteriumnetwork/node/core/auth" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/discovery" - discovery_api "github.com/mysteriumnetwork/node/core/discovery/api" - discovery_broker "github.com/mysteriumnetwork/node/core/discovery/broker" - discovery_composite "github.com/mysteriumnetwork/node/core/discovery/composite" - discovery_noop "github.com/mysteriumnetwork/node/core/discovery/noop" + "github.com/mysteriumnetwork/node/core/discovery/apidiscovery" + "github.com/mysteriumnetwork/node/core/discovery/brokerdiscovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/core/location" "github.com/mysteriumnetwork/node/core/node" @@ -132,9 +131,9 @@ type Dependencies struct { IdentityRegistry identity_registry.IdentityRegistry IdentitySelector identity_selector.Handler - DiscoveryFactory service.DiscoveryFactory - DiscoveryStorage *discovery.ProposalStorage - DiscoveryFinder discovery.ProposalFinder + DiscoveryFactory service.DiscoveryFactory + ProposalRepository proposal.Repository + DiscoveryWorker brokerdiscovery.Worker QualityMetricsSender *quality.Sender QualityClient *quality.MysteriumMORQA @@ -266,7 +265,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error { if err = di.subscribeEventConsumers(); err != nil { return err } - if err = di.DiscoveryFinder.Start(); err != nil { + if err = di.DiscoveryWorker.Start(); err != nil { return err } if err := di.Node.Start(); err != nil { @@ -367,8 +366,8 @@ func (di *Dependencies) Shutdown() (err error) { errs = append(errs, err) } } - if di.DiscoveryFinder != nil { - di.DiscoveryFinder.Stop() + if di.DiscoveryWorker != nil { + di.DiscoveryWorker.Stop() } if di.Storage != nil { if err := di.Storage.Close(); err != nil { @@ -557,10 +556,10 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne tequilapi_endpoints.AddRouteForStop(router, utils.SoftKiller(di.Shutdown)) tequilapi_endpoints.AddRoutesForAuthentication(router, di.Authenticator, di.JWTAuthenticator) tequilapi_endpoints.AddRoutesForIdentities(router, di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, nodeOptions.Transactor.RegistryAddress, channelImplementation, di.ConsumerBalanceTracker.GetBalance) - tequilapi_endpoints.AddRoutesForConnection(router, di.ConnectionManager, di.StatisticsTracker, di.DiscoveryStorage, di.IdentityRegistry) + tequilapi_endpoints.AddRoutesForConnection(router, di.ConnectionManager, di.StatisticsTracker, di.ProposalRepository, di.IdentityRegistry) tequilapi_endpoints.AddRoutesForConnectionSessions(router, di.SessionStorage) tequilapi_endpoints.AddRoutesForConnectionLocation(router, di.ConnectionManager, di.IPResolver, di.LocationResolver, di.LocationResolver) - tequilapi_endpoints.AddRoutesForProposals(router, di.DiscoveryStorage, di.QualityClient) + tequilapi_endpoints.AddRoutesForProposals(router, di.ProposalRepository, di.QualityClient) tequilapi_endpoints.AddRoutesForService(router, di.ServicesManager, serviceTypesRequestParser, nodeOptions.AccessPolicyEndpointAddress) tequilapi_endpoints.AddRoutesForServiceSessions(router, di.StateKeeper) tequilapi_endpoints.AddRoutesForPayout(router, di.IdentityManager, di.SignerFactory, di.MysteriumAPI) @@ -729,37 +728,23 @@ func (di *Dependencies) bootstrapIdentityComponents(options node.Options) { } func (di *Dependencies) bootstrapDiscoveryComponents(options node.OptionsDiscovery) error { - di.DiscoveryStorage = discovery.NewStorage() - - discoveryRegistry := discovery_composite.NewRegistry() - discoveryFinder := discovery_composite.NewFinder() + proposalRepository := discovery.NewRepository() + discoveryRegistry := discovery.NewRegistry() for _, discoveryType := range options.Types { switch discoveryType { case node.DiscoveryTypeAPI: - discoveryRegistry.AddRegistry( - discovery_api.NewRegistry(di.MysteriumAPI), - ) - - if !options.ProposalFetcherEnabled { - discoveryFinder.AddFinder(discovery_noop.NewFinder()) - } else { - discoveryFinder.AddFinder( - discovery_api.NewFinder(di.DiscoveryStorage, di.MysteriumAPI.Proposals, options.FetchInterval), - ) - } + discoveryRegistry.AddRegistry(apidiscovery.NewRegistry(di.MysteriumAPI)) + proposalRepository.Add(apidiscovery.NewRepository(di.MysteriumAPI)) case node.DiscoveryTypeBroker: - discoveryRegistry.AddRegistry( - discovery_broker.NewRegistry(di.BrokerConnection), - ) - discoveryFinder.AddFinder( - discovery_broker.NewFinder(di.DiscoveryStorage, di.BrokerConnection, options.PingInterval+time.Second, 1*time.Second), - ) + discoveryRegistry.AddRegistry(brokerdiscovery.NewRegistry(di.BrokerConnection)) + brokerRepository := brokerdiscovery.NewRepository(di.BrokerConnection, options.PingInterval+time.Second, 1*time.Second) + proposalRepository.Add(brokerRepository) + di.DiscoveryWorker = brokerRepository default: return errors.Errorf("unknown discovery adapter: %s", discoveryType) } } - - di.DiscoveryFinder = discoveryFinder + di.ProposalRepository = proposalRepository di.DiscoveryFactory = func() service.Discovery { return discovery.NewService(di.IdentityRegistry, discoveryRegistry, options.PingInterval, di.SignerFactory, di.EventBus) } diff --git a/core/discovery/api/finder.go b/core/discovery/api/finder.go deleted file mode 100644 index 4f0bf13cca..0000000000 --- a/core/discovery/api/finder.go +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package api - -import ( - "time" - - "github.com/mysteriumnetwork/node/core/discovery" - "github.com/mysteriumnetwork/node/market" - "github.com/rs/zerolog/log" -) - -// FetchCallback does real fetch of proposals through Mysterium API -type FetchCallback func() ([]market.ServiceProposal, error) - -// finderAPI represents async proposal fetcher from Mysterium API -type finderAPI struct { - fetch FetchCallback - fetchInterval time.Duration - fetchShutdown chan bool - - proposalStorage *discovery.ProposalStorage -} - -// NewFinder create instance of API finder -func NewFinder(proposalsStorage *discovery.ProposalStorage, callback FetchCallback, interval time.Duration) *finderAPI { - return &finderAPI{ - fetch: callback, - fetchInterval: interval, - - proposalStorage: proposalsStorage, - } -} - -// Start begins fetching proposals to storage -func (fa *finderAPI) Start() error { - go func() { - if err := fa.fetchDo(); err != nil { - log.Warn().Err(err).Msg("Initial proposal fetch failed, continuing") - } - }() - - fa.fetchShutdown = make(chan bool, 1) - go fa.fetchLoop() - - return nil -} - -// Stop ends fetching proposals to storage -func (fa *finderAPI) Stop() { - fa.fetchShutdown <- true -} - -func (fa *finderAPI) fetchLoop() { - for { - select { - case <-fa.fetchShutdown: - break - case <-time.After(fa.fetchInterval): - _ = fa.fetchDo() - } - } -} - -func (fa *finderAPI) fetchDo() error { - proposals, err := fa.fetch() - if err != nil { - log.Warn().Err(err).Msg("Failed to fetch proposals") - return err - } - - log.Debug().Msgf("Proposals fetched: %d", len(proposals)) - fa.proposalStorage.AddProposal(proposals...) - return nil -} diff --git a/core/discovery/api/finder_test.go b/core/discovery/api/finder_test.go deleted file mode 100644 index a4f45c7cb3..0000000000 --- a/core/discovery/api/finder_test.go +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package api - -import ( - "sync" - "testing" - "time" - - "github.com/mysteriumnetwork/node/core/discovery" - "github.com/mysteriumnetwork/node/market" - "github.com/stretchr/testify/assert" -) - -var ( - proposalFirst = market.ServiceProposal{ProviderID: "0x1"} - proposalSecond = market.ServiceProposal{ProviderID: "0x2"} - proposalsCurrent = fetchCallback{} -) - -type fetchCallback struct { - proposalsMock []market.ServiceProposal - mutex sync.RWMutex -} - -func (callback *fetchCallback) Mock(proposals ...market.ServiceProposal) { - callback.mutex.Lock() - defer callback.mutex.Unlock() - callback.proposalsMock = proposals -} - -func (callback *fetchCallback) Fetch() ([]market.ServiceProposal, error) { - callback.mutex.Lock() - defer callback.mutex.Unlock() - return callback.proposalsMock, nil -} - -func waitForInitialFetch() { - time.Sleep(2*time.Second + 100*time.Millisecond) -} - -func Test_Fetcher_StartFetchesInitialProposals(t *testing.T) { - storage := discovery.NewStorage() - fetcher := NewFinder(storage, proposalsCurrent.Fetch, time.Hour) - - proposalsCurrent.Mock(proposalFirst, proposalSecond) - err := fetcher.Start() - defer fetcher.Stop() - assert.NoError(t, err) - - waitForInitialFetch() - - // Initial fetch should be done after initial delay - assert.Len(t, storage.Proposals(), 2) - assert.Exactly(t, []market.ServiceProposal{proposalFirst, proposalSecond}, storage.Proposals()) -} - -func Test_Fetcher_StartFetchesNewProposals(t *testing.T) { - storage := discovery.NewStorage() - fetcher := NewFinder(storage, proposalsCurrent.Fetch, time.Millisecond) - - err := fetcher.Start() - defer fetcher.Stop() - assert.NoError(t, err) - - waitForInitialFetch() - - // Following fetches should be done asynchronously - proposalsCurrent.Mock(proposalFirst, proposalSecond) - waitABit() - - assert.Len(t, storage.Proposals(), 2) - assert.Exactly(t, []market.ServiceProposal{proposalFirst, proposalSecond}, storage.Proposals()) -} - -func waitForProposal(t *testing.T, proposalsChan chan market.ServiceProposal) market.ServiceProposal { - select { - case proposal := <-proposalsChan: - return proposal - case <-time.After(20 * time.Millisecond): - t.Log("Proposal not fetched") - return market.ServiceProposal{} - } -} - -func waitABit() { - //usually time.Sleep call gives a chance for other goroutines to kick in - //important when testing async code - time.Sleep(1 * time.Millisecond) -} diff --git a/core/discovery/api/registry.go b/core/discovery/apidiscovery/registry.go similarity index 98% rename from core/discovery/api/registry.go rename to core/discovery/apidiscovery/registry.go index bca052aaa5..337103ae2c 100644 --- a/core/discovery/api/registry.go +++ b/core/discovery/apidiscovery/registry.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package api +package apidiscovery import ( "github.com/mysteriumnetwork/node/identity" diff --git a/core/discovery/apidiscovery/repository.go b/core/discovery/apidiscovery/repository.go new file mode 100644 index 0000000000..bde80ed14c --- /dev/null +++ b/core/discovery/apidiscovery/repository.go @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package apidiscovery + +import ( + "fmt" + + "github.com/mysteriumnetwork/node/core/discovery/proposal" + "github.com/mysteriumnetwork/node/market" + "github.com/mysteriumnetwork/node/market/mysterium" +) + +type apiRepository struct { + discoveryAPI *mysterium.MysteriumAPI +} + +// NewRepository constructs a new proposal repository (backed by API). +func NewRepository(api *mysterium.MysteriumAPI) *apiRepository { + return &apiRepository{discoveryAPI: api} +} + +// Proposal returns proposal by ID. +func (a *apiRepository) Proposal(id market.ProposalID) (*market.ServiceProposal, error) { + proposals, err := a.discoveryAPI.QueryProposals(mysterium.ProposalsQuery{ + NodeKey: id.ProviderID, + ServiceType: id.ServiceType, + }) + if err != nil { + return nil, err + } + if len(proposals) != 1 { + return nil, fmt.Errorf("proposal does not exist: %+v", id) + } + return &proposals[0], nil +} + +// Proposals returns proposals matching filter. +func (a *apiRepository) Proposals(filter *proposal.Filter) ([]market.ServiceProposal, error) { + proposals, err := a.discoveryAPI.Proposals() + if err != nil { + return nil, err + } + var filtered []market.ServiceProposal + for _, p := range proposals { + if filter.Matches(p) { + filtered = append(filtered, p) + } + } + return filtered, nil +} diff --git a/core/discovery/broker/messaging_ping.go b/core/discovery/brokerdiscovery/messaging_ping.go similarity index 98% rename from core/discovery/broker/messaging_ping.go rename to core/discovery/brokerdiscovery/messaging_ping.go index 2cca997846..e10b2c278d 100644 --- a/core/discovery/broker/messaging_ping.go +++ b/core/discovery/brokerdiscovery/messaging_ping.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "github.com/mysteriumnetwork/node/communication" diff --git a/core/discovery/broker/messaging_register.go b/core/discovery/brokerdiscovery/messaging_register.go similarity index 98% rename from core/discovery/broker/messaging_register.go rename to core/discovery/brokerdiscovery/messaging_register.go index 979b4603e7..ebc909542e 100644 --- a/core/discovery/broker/messaging_register.go +++ b/core/discovery/brokerdiscovery/messaging_register.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "github.com/mysteriumnetwork/node/communication" diff --git a/core/discovery/broker/messaging_unregister.go b/core/discovery/brokerdiscovery/messaging_unregister.go similarity index 98% rename from core/discovery/broker/messaging_unregister.go rename to core/discovery/brokerdiscovery/messaging_unregister.go index a4f45acb9c..6d221e4ac5 100644 --- a/core/discovery/broker/messaging_unregister.go +++ b/core/discovery/brokerdiscovery/messaging_unregister.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "github.com/mysteriumnetwork/node/communication" diff --git a/core/discovery/broker/registry.go b/core/discovery/brokerdiscovery/registry.go similarity index 98% rename from core/discovery/broker/registry.go rename to core/discovery/brokerdiscovery/registry.go index d9a2831531..4850fe5973 100644 --- a/core/discovery/broker/registry.go +++ b/core/discovery/brokerdiscovery/registry.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "github.com/mysteriumnetwork/node/communication" diff --git a/core/discovery/broker/registry_test.go b/core/discovery/brokerdiscovery/registry_test.go similarity index 99% rename from core/discovery/broker/registry_test.go rename to core/discovery/brokerdiscovery/registry_test.go index 1bcee35fe7..374dec75e0 100644 --- a/core/discovery/broker/registry_test.go +++ b/core/discovery/brokerdiscovery/registry_test.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "encoding/json" diff --git a/core/discovery/broker/finder.go b/core/discovery/brokerdiscovery/repository.go similarity index 72% rename from core/discovery/broker/finder.go rename to core/discovery/brokerdiscovery/repository.go index 4afcdfec7e..1590a8def7 100644 --- a/core/discovery/broker/finder.go +++ b/core/discovery/brokerdiscovery/repository.go @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "sync" @@ -23,13 +23,13 @@ import ( "github.com/mysteriumnetwork/node/communication" "github.com/mysteriumnetwork/node/communication/nats" - "github.com/mysteriumnetwork/node/core/discovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/market" ) -// finderBroker responsible for handling proposal events through Broker (Mysterium Communication) -type finderBroker struct { - storage *discovery.ProposalStorage +// Repository provides proposals from the broker. +type Repository struct { + storage *ProposalStorage receiver communication.Receiver timeoutInterval time.Duration @@ -39,15 +39,14 @@ type finderBroker struct { watchdogSeen map[market.ProposalID]time.Time } -// NewFinder returns new instance of broker finder. -func NewFinder( - proposalsStorage *discovery.ProposalStorage, +// NewRepository constructs a new proposal repository (backed by the broker). +func NewRepository( connection nats.Connection, proposalTimeoutInterval time.Duration, proposalCheckInterval time.Duration, -) *finderBroker { - return &finderBroker{ - storage: proposalsStorage, +) *Repository { + return &Repository{ + storage: NewStorage(), receiver: nats.NewReceiver(connection, communication.NewCodecJSON(), "*"), timeoutInterval: proposalTimeoutInterval, @@ -57,8 +56,18 @@ func NewFinder( } } +// Proposal returns a single proposal by its ID. +func (s *Repository) Proposal(id market.ProposalID) (*market.ServiceProposal, error) { + return s.storage.GetProposal(id) +} + +// Proposals returns proposals matching the filter. +func (s *Repository) Proposals(filter *proposal.Filter) ([]market.ServiceProposal, error) { + return s.storage.FindProposals(*filter) +} + // Start begins proposals synchronization to storage -func (s *finderBroker) Start() error { +func (s *Repository) Start() error { err := s.receiver.Receive(®isterConsumer{Callback: s.proposalRegisterMessage}) if err != nil { return err @@ -79,11 +88,11 @@ func (s *finderBroker) Start() error { } // Stop ends proposals synchronization to storage -func (s *finderBroker) Stop() { +func (s *Repository) Stop() { close(s.watchdogStop) } -func (s *finderBroker) proposalRegisterMessage(message registerMessage) error { +func (s *Repository) proposalRegisterMessage(message registerMessage) error { s.storage.AddProposal(message.Proposal) s.watchdogLock.Lock() @@ -93,7 +102,7 @@ func (s *finderBroker) proposalRegisterMessage(message registerMessage) error { return nil } -func (s *finderBroker) proposalUnregisterMessage(message unregisterMessage) error { +func (s *Repository) proposalUnregisterMessage(message unregisterMessage) error { s.storage.RemoveProposal(message.Proposal.UniqueID()) s.watchdogLock.Lock() @@ -103,7 +112,7 @@ func (s *finderBroker) proposalUnregisterMessage(message unregisterMessage) erro return nil } -func (s *finderBroker) proposalPingMessage(message pingMessage) error { +func (s *Repository) proposalPingMessage(message pingMessage) error { s.storage.AddProposal(message.Proposal) s.watchdogLock.Lock() @@ -113,7 +122,7 @@ func (s *finderBroker) proposalPingMessage(message pingMessage) error { return nil } -func (s *finderBroker) proposalWatchdog() { +func (s *Repository) proposalWatchdog() { for { select { case <-s.watchdogStop: diff --git a/core/discovery/broker/finder_test.go b/core/discovery/brokerdiscovery/repository_test.go similarity index 75% rename from core/discovery/broker/finder_test.go rename to core/discovery/brokerdiscovery/repository_test.go index 4fa8f1f15b..c1e99e1c07 100644 --- a/core/discovery/broker/finder_test.go +++ b/core/discovery/brokerdiscovery/repository_test.go @@ -15,14 +15,13 @@ * along with this program. If not, see . */ -package broker +package brokerdiscovery import ( "testing" "time" "github.com/mysteriumnetwork/node/communication/nats" - "github.com/mysteriumnetwork/node/core/discovery" "github.com/mysteriumnetwork/node/market" "github.com/stretchr/testify/assert" ) @@ -46,9 +45,7 @@ func Test_Subscriber_StartSyncsNewProposals(t *testing.T) { connection := nats.StartConnectionMock() defer connection.Close() - storage := discovery.NewStorage() - - subscriber := NewFinder(storage, connection, 10*time.Millisecond, 1*time.Millisecond) + subscriber := NewRepository(connection, 10*time.Millisecond, 1*time.Millisecond) err := subscriber.Start() defer subscriber.Stop() assert.NoError(t, err) @@ -58,25 +55,23 @@ func Test_Subscriber_StartSyncsNewProposals(t *testing.T) { }`) time.Sleep(5 * time.Millisecond) - assert.Len(t, storage.Proposals(), 1) - assert.Exactly(t, []market.ServiceProposal{proposalFirst}, storage.Proposals()) + assert.Len(t, subscriber.storage.Proposals(), 1) + assert.Exactly(t, []market.ServiceProposal{proposalFirst}, subscriber.storage.Proposals()) proposalRegister(connection, `{ "proposal": {"provider_id": "0x2"} }`) time.Sleep(1 * time.Millisecond) - assert.Len(t, storage.Proposals(), 2) - assert.Exactly(t, []market.ServiceProposal{proposalFirst, proposalSecond}, storage.Proposals()) + assert.Len(t, subscriber.storage.Proposals(), 2) + assert.Exactly(t, []market.ServiceProposal{proposalFirst, proposalSecond}, subscriber.storage.Proposals()) } func Test_Subscriber_StartSyncsIdleProposals(t *testing.T) { connection := nats.StartConnectionMock() defer connection.Close() - storage := discovery.NewStorage() - - subscriber := NewFinder(storage, connection, 10*time.Millisecond, 1*time.Millisecond) + subscriber := NewRepository(connection, 10*time.Millisecond, 1*time.Millisecond) err := subscriber.Start() defer subscriber.Stop() assert.NoError(t, err) @@ -86,16 +81,14 @@ func Test_Subscriber_StartSyncsIdleProposals(t *testing.T) { }`) time.Sleep(15 * time.Millisecond) - assert.Len(t, storage.Proposals(), 0) + assert.Empty(t, subscriber.storage.Proposals()) } func Test_Subscriber_StartSyncsHealthyProposals(t *testing.T) { connection := nats.StartConnectionMock() defer connection.Close() - storage := discovery.NewStorage() - - subscriber := NewFinder(storage, connection, 10*time.Millisecond, 1*time.Millisecond) + subscriber := NewRepository(connection, 10*time.Millisecond, 1*time.Millisecond) err := subscriber.Start() defer subscriber.Stop() assert.NoError(t, err) @@ -110,19 +103,16 @@ func Test_Subscriber_StartSyncsHealthyProposals(t *testing.T) { }`) time.Sleep(1 * time.Millisecond) - assert.Len(t, storage.Proposals(), 1) - assert.Exactly(t, []market.ServiceProposal{proposalFirst}, storage.Proposals()) + assert.Len(t, subscriber.storage.Proposals(), 1) + assert.Exactly(t, []market.ServiceProposal{proposalFirst}, subscriber.storage.Proposals()) } func Test_Subscriber_StartSyncsStoppedProposals(t *testing.T) { connection := nats.StartConnectionMock() defer connection.Close() - storage := discovery.NewStorage() - storage.AddProposal(proposalFirst) - storage.AddProposal(proposalSecond) - - subscriber := NewFinder(storage, connection, 10*time.Millisecond, 1*time.Millisecond) + subscriber := NewRepository(connection, 10*time.Millisecond, 1*time.Millisecond) + subscriber.storage.AddProposal(proposalFirst, proposalSecond) err := subscriber.Start() defer subscriber.Stop() assert.NoError(t, err) @@ -132,8 +122,8 @@ func Test_Subscriber_StartSyncsStoppedProposals(t *testing.T) { }`) time.Sleep(1 * time.Millisecond) - assert.Len(t, storage.Proposals(), 1) - assert.Exactly(t, []market.ServiceProposal{proposalSecond}, storage.Proposals()) + assert.Len(t, subscriber.storage.Proposals(), 1) + assert.Exactly(t, []market.ServiceProposal{proposalSecond}, subscriber.storage.Proposals()) } func proposalRegister(connection nats.Connection, payload string) { diff --git a/core/discovery/storage.go b/core/discovery/brokerdiscovery/storage.go similarity index 84% rename from core/discovery/storage.go rename to core/discovery/brokerdiscovery/storage.go index f34b9f202a..e0c88fabaa 100644 --- a/core/discovery/storage.go +++ b/core/discovery/brokerdiscovery/storage.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,18 +15,18 @@ * along with this program. If not, see . */ -package discovery +package brokerdiscovery import ( "fmt" "sync" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/market" - "github.com/mysteriumnetwork/node/market/mysterium" ) -// ProposalFinder continuously tracks service proposals from discovery service to storage -type ProposalFinder interface { +// Worker continuously tracks service proposals from discovery service to storage +type Worker interface { Start() error Stop() } @@ -34,12 +34,6 @@ type ProposalFinder interface { // ProposalReducer proposal match function type ProposalReducer func(proposal market.ServiceProposal) bool -// ProposalFilter defines interface with proposal match function -type ProposalFilter interface { - Matches(proposal market.ServiceProposal) bool - ToAPIQuery() mysterium.ProposalsQuery -} - // NewStorage creates new instance of ProposalStorage func NewStorage() *ProposalStorage { return &ProposalStorage{ @@ -67,16 +61,16 @@ func (s *ProposalStorage) MatchProposals(match ProposalReducer) ([]market.Servic defer s.mutex.Unlock() proposals := make([]market.ServiceProposal, 0) - for _, proposal := range s.proposals { - if match(proposal) { - proposals = append(proposals, proposal) + for _, p := range s.proposals { + if match(p) { + proposals = append(proposals, p) } } return proposals, nil } // FindProposals fetches currently active service proposals from storage by given filter -func (s *ProposalStorage) FindProposals(filter ProposalFilter) ([]market.ServiceProposal, error) { +func (s *ProposalStorage) FindProposals(filter proposal.Filter) ([]market.ServiceProposal, error) { return s.MatchProposals(filter.Matches) } diff --git a/core/discovery/storage_test.go b/core/discovery/brokerdiscovery/storage_test.go similarity index 90% rename from core/discovery/storage_test.go rename to core/discovery/brokerdiscovery/storage_test.go index 13cc5264c2..25179ff4ea 100644 --- a/core/discovery/storage_test.go +++ b/core/discovery/brokerdiscovery/storage_test.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,11 +15,12 @@ * along with this program. If not, see . */ -package discovery +package brokerdiscovery import ( "testing" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/discovery/reducer" "github.com/mysteriumnetwork/node/market" "github.com/mysteriumnetwork/node/market/mysterium" @@ -90,7 +91,7 @@ func Test_Finder_MatchProposals(t *testing.T) { func Test_Finder_FindProposals(t *testing.T) { storage := createFullStorage() - proposals, err := storage.FindProposals(&filter{}) + proposals, err := storage.FindProposals(proposal.Filter{}) assert.NoError(t, err) assert.Len(t, proposals, 3) assert.Exactly(t, @@ -98,7 +99,7 @@ func Test_Finder_FindProposals(t *testing.T) { proposals, ) - proposals, err = storage.FindProposals(&filter{"streaming"}) + proposals, err = storage.FindProposals(proposal.Filter{ServiceType: "streaming"}) assert.NoError(t, err) assert.Len(t, proposals, 2) assert.Exactly(t, @@ -118,18 +119,18 @@ func Test_Storage_HasProposal(t *testing.T) { func Test_Storage_GetProposal(t *testing.T) { storage := createEmptyStorage() - proposal, err := storage.GetProposal(market.ProposalID{ServiceType: "unknown", ProviderID: "0x1"}) + p, err := storage.GetProposal(market.ProposalID{ServiceType: "unknown", ProviderID: "0x1"}) assert.EqualError(t, err, "proposal does not exist: {unknown 0x1 0}") - assert.Nil(t, proposal) + assert.Nil(t, p) storage = createFullStorage() - proposal, err = storage.GetProposal(market.ProposalID{ServiceType: "unknown", ProviderID: "0x1"}) + p, err = storage.GetProposal(market.ProposalID{ServiceType: "unknown", ProviderID: "0x1"}) assert.EqualError(t, err, "proposal does not exist: {unknown 0x1 0}") - assert.Nil(t, proposal) + assert.Nil(t, p) - proposal, err = storage.GetProposal(market.ProposalID{ServiceType: "streaming", ProviderID: "0x1"}) + p, err = storage.GetProposal(market.ProposalID{ServiceType: "streaming", ProviderID: "0x1"}) assert.NoError(t, err) - assert.Exactly(t, proposalProvider1Streaming, *proposal) + assert.Exactly(t, proposalProvider1Streaming, *p) } func Test_Storage_Set(t *testing.T) { diff --git a/core/discovery/composite/finder.go b/core/discovery/composite/finder.go deleted file mode 100644 index 0ee99268d9..0000000000 --- a/core/discovery/composite/finder.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package composite - -import ( - "github.com/mysteriumnetwork/node/core/discovery" - "github.com/mysteriumnetwork/node/utils" -) - -type finderComposite struct { - finders []discovery.ProposalFinder -} - -// NewFinder creates an instance of composite finder -func NewFinder(finders ...discovery.ProposalFinder) *finderComposite { - return &finderComposite{finders: finders} -} - -// AddRegistry adds registry to set of registries -func (fc *finderComposite) AddFinder(finder discovery.ProposalFinder) { - fc.finders = append(fc.finders, finder) -} - -// Start begins proposals synchronization to storage -func (fc *finderComposite) Start() error { - var err utils.ErrorCollection - for _, finder := range fc.finders { - err.Add(finder.Start()) - } - return err.Error() -} - -// Stop ends proposals synchronization to storage -func (fc *finderComposite) Stop() { - for _, finder := range fc.finders { - finder.Stop() - } -} diff --git a/core/discovery/discovery_test.go b/core/discovery/discovery_test.go index 77995ddbb2..08905a56aa 100644 --- a/core/discovery/discovery_test.go +++ b/core/discovery/discovery_test.go @@ -24,14 +24,14 @@ import ( "github.com/mysteriumnetwork/node/eventbus" "github.com/mysteriumnetwork/node/identity" - identity_registry "github.com/mysteriumnetwork/node/identity/registry" + identityregistry "github.com/mysteriumnetwork/node/identity/registry" "github.com/mysteriumnetwork/node/market" "github.com/stretchr/testify/assert" ) var ( - providerID = identity.FromAddress("my-identity") - proposal = market.ServiceProposal{ + providerID = identity.FromAddress("my-identity") + serviceProposal = market.ServiceProposal{ ProviderID: providerID.Address, } ) @@ -52,9 +52,9 @@ func discoveryWithMockedDependencies() *Discovery { func TestStartRegistersProposal(t *testing.T) { d := discoveryWithMockedDependencies() - d.identityRegistry = &identity_registry.FakeRegistry{RegistrationStatus: identity_registry.RegisteredProvider} + d.identityRegistry = &identityregistry.FakeRegistry{RegistrationStatus: identityregistry.RegisteredProvider} - d.Start(providerID, proposal) + d.Start(providerID, serviceProposal) actualStatus := observeStatus(d, PingProposal) assert.Equal(t, PingProposal, actualStatus) @@ -62,16 +62,16 @@ func TestStartRegistersProposal(t *testing.T) { func TestStartRegistersIdentitySuccessfully(t *testing.T) { d := discoveryWithMockedDependencies() - d.identityRegistry = &identity_registry.FakeRegistry{RegistrationStatus: identity_registry.Unregistered} + d.identityRegistry = &identityregistry.FakeRegistry{RegistrationStatus: identityregistry.Unregistered} - d.Start(providerID, proposal) + d.Start(providerID, serviceProposal) actualStatus := observeStatus(d, WaitingForRegistration) assert.Equal(t, WaitingForRegistration, actualStatus) - d.eventBus.Publish(identity_registry.RegistrationEventTopic, identity_registry.RegistrationEventPayload{ + d.eventBus.Publish(identityregistry.RegistrationEventTopic, identityregistry.RegistrationEventPayload{ ID: providerID, - Status: identity_registry.RegisteredProvider, + Status: identityregistry.RegisteredProvider, }) actualStatus = observeStatus(d, PingProposal) @@ -80,18 +80,18 @@ func TestStartRegistersIdentitySuccessfully(t *testing.T) { func TestStartRegisterIdentityCancelled(t *testing.T) { d := discoveryWithMockedDependencies() - mockRegistry := &identity_registry.FakeRegistry{RegistrationStatus: identity_registry.Unregistered} + mockRegistry := &identityregistry.FakeRegistry{RegistrationStatus: identityregistry.Unregistered} d.identityRegistry = mockRegistry - d.Start(providerID, proposal) + d.Start(providerID, serviceProposal) defer d.Stop() actualStatus := observeStatus(d, WaitingForRegistration) assert.Equal(t, WaitingForRegistration, actualStatus) - d.eventBus.Publish(identity_registry.RegistrationEventTopic, identity_registry.RegistrationEventPayload{ + d.eventBus.Publish(identityregistry.RegistrationEventTopic, identityregistry.RegistrationEventPayload{ ID: providerID, - Status: identity_registry.RegistrationError, + Status: identityregistry.RegistrationError, }) actualStatus = observeStatus(d, IdentityRegisterFailed) @@ -100,9 +100,9 @@ func TestStartRegisterIdentityCancelled(t *testing.T) { func TestStartStopUnregisterProposal(t *testing.T) { d := discoveryWithMockedDependencies() - d.identityRegistry = &identity_registry.FakeRegistry{RegistrationStatus: identity_registry.RegisteredProvider} + d.identityRegistry = &identityregistry.FakeRegistry{RegistrationStatus: identityregistry.RegisteredProvider} - d.Start(providerID, proposal) + d.Start(providerID, serviceProposal) actualStatus := observeStatus(d, PingProposal) assert.Equal(t, PingProposal, actualStatus) diff --git a/core/discovery/noop/registry.go b/core/discovery/noop/registry.go deleted file mode 100644 index 077ba9b157..0000000000 --- a/core/discovery/noop/registry.go +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package noop - -import ( - "github.com/mysteriumnetwork/node/identity" - "github.com/mysteriumnetwork/node/market" -) - -type registryNoop struct{} - -// NewRegistry creates an instance of composite registry -func NewRegistry() *registryNoop { - return ®istryNoop{} -} - -// RegisterProposal registers service proposal to discovery service -func (rc *registryNoop) RegisterProposal(proposal market.ServiceProposal, signer identity.Signer) error { - return nil -} - -// UnregisterProposal unregisters a service proposal when client disconnects -func (rc *registryNoop) UnregisterProposal(proposal market.ServiceProposal, signer identity.Signer) error { - return nil -} - -// PingProposal pings service proposal as being alive -func (rc *registryNoop) PingProposal(proposal market.ServiceProposal, signer identity.Signer) error { - return nil -} diff --git a/core/discovery/proposal/filter.go b/core/discovery/proposal/filter.go new file mode 100644 index 0000000000..cc11facd6d --- /dev/null +++ b/core/discovery/proposal/filter.go @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package proposal + +import ( + "github.com/mysteriumnetwork/node/core/discovery/reducer" + "github.com/mysteriumnetwork/node/market" + "github.com/mysteriumnetwork/node/market/mysterium" +) + +// Filter defines all flags for proposal filtering in discovery of Mysterium Network +type Filter struct { + ProviderID string + ServiceType string + LocationType string + AccessPolicyID string + AccessPolicySource string +} + +// Matches return flag if filter matches given proposal +func (filter *Filter) Matches(proposal market.ServiceProposal) bool { + conditions := make([]reducer.AndCondition, 0) + if filter.ProviderID != "" { + conditions = append(conditions, reducer.Equal(reducer.ProviderID, filter.ProviderID)) + } + if filter.ServiceType != "" { + conditions = append(conditions, reducer.Equal(reducer.ServiceType, filter.ServiceType)) + } + if filter.LocationType != "" { + conditions = append(conditions, reducer.Equal(reducer.LocationType, filter.LocationType)) + } + if filter.AccessPolicyID != "" || filter.AccessPolicySource != "" { + conditions = append(conditions, reducer.AccessPolicy(filter.AccessPolicyID, filter.AccessPolicySource)) + } + if len(conditions) > 0 { + return reducer.And(conditions...)(proposal) + } + return true +} + +// ToAPIQuery serialises filter to query of Mysterium API +func (filter *Filter) ToAPIQuery() mysterium.ProposalsQuery { + return mysterium.ProposalsQuery{ + NodeKey: filter.ProviderID, + ServiceType: filter.ServiceType, + AccessPolicyID: filter.AccessPolicyID, + AccessPolicySource: filter.AccessPolicySource, + } +} diff --git a/tequilapi/endpoints/proposals_filter_test.go b/core/discovery/proposal/filter_test.go similarity index 87% rename from tequilapi/endpoints/proposals_filter_test.go rename to core/discovery/proposal/filter_test.go index 0f316245bd..118d86878d 100644 --- a/tequilapi/endpoints/proposals_filter_test.go +++ b/core/discovery/proposal/filter_test.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package endpoints +package proposal import ( "testing" @@ -69,7 +69,7 @@ func (service mockService) GetLocation() market.Location { } func Test_ProposalFilter_FiltersAll(t *testing.T) { - filter := &proposalsFilter{} + filter := &Filter{} assert.True(t, filter.Matches(proposalEmpty)) assert.True(t, filter.Matches(proposalProvider1Streaming)) assert.True(t, filter.Matches(proposalProvider1Noop)) @@ -77,8 +77,8 @@ func Test_ProposalFilter_FiltersAll(t *testing.T) { } func Test_ProposalFilter_FiltersByProviderID(t *testing.T) { - filter := &proposalsFilter{ - providerID: provider1, + filter := &Filter{ + ProviderID: provider1, } assert.False(t, filter.Matches(proposalEmpty)) assert.True(t, filter.Matches(proposalProvider1Streaming)) @@ -87,16 +87,16 @@ func Test_ProposalFilter_FiltersByProviderID(t *testing.T) { } func Test_ProposalFilter_FiltersByServiceType(t *testing.T) { - filter := &proposalsFilter{ - serviceType: serviceTypeNoop, + filter := &Filter{ + ServiceType: serviceTypeNoop, } assert.False(t, filter.Matches(proposalEmpty)) assert.False(t, filter.Matches(proposalProvider1Streaming)) assert.True(t, filter.Matches(proposalProvider1Noop)) assert.False(t, filter.Matches(proposalProvider2Streaming)) - filter = &proposalsFilter{ - serviceType: serviceTypeStreaming, + filter = &Filter{ + ServiceType: serviceTypeStreaming, } assert.False(t, filter.Matches(proposalEmpty)) assert.True(t, filter.Matches(proposalProvider1Streaming)) @@ -105,16 +105,16 @@ func Test_ProposalFilter_FiltersByServiceType(t *testing.T) { } func Test_ProposalFilter_FiltersByLocationType(t *testing.T) { - filter := &proposalsFilter{ - locationType: "datacenter", + filter := &Filter{ + LocationType: "datacenter", } assert.False(t, filter.Matches(proposalEmpty)) assert.True(t, filter.Matches(proposalProvider1Streaming)) assert.False(t, filter.Matches(proposalProvider1Noop)) assert.False(t, filter.Matches(proposalProvider2Streaming)) - filter = &proposalsFilter{ - locationType: "residential", + filter = &Filter{ + LocationType: "residential", } assert.False(t, filter.Matches(proposalEmpty)) assert.False(t, filter.Matches(proposalProvider1Streaming)) @@ -123,25 +123,25 @@ func Test_ProposalFilter_FiltersByLocationType(t *testing.T) { } func Test_ProposalFilter_FiltersByAccessID(t *testing.T) { - filter := &proposalsFilter{ - accessPolicyID: "whitelist", + filter := &Filter{ + AccessPolicyID: "whitelist", } assert.False(t, filter.Matches(proposalEmpty)) assert.True(t, filter.Matches(proposalProvider1Streaming)) assert.False(t, filter.Matches(proposalProvider1Noop)) assert.True(t, filter.Matches(proposalProvider2Streaming)) - filter = &proposalsFilter{ - accessPolicyID: "blacklist", + filter = &Filter{ + AccessPolicyID: "blacklist", } assert.False(t, filter.Matches(proposalEmpty)) assert.False(t, filter.Matches(proposalProvider1Streaming)) assert.False(t, filter.Matches(proposalProvider1Noop)) assert.True(t, filter.Matches(proposalProvider2Streaming)) - filter = &proposalsFilter{ - accessPolicyID: "whitelist", - accessPolicySource: "unknown.txt", + filter = &Filter{ + AccessPolicyID: "whitelist", + AccessPolicySource: "unknown.txt", } assert.False(t, filter.Matches(proposalEmpty)) assert.False(t, filter.Matches(proposalProvider1Streaming)) diff --git a/core/discovery/noop/finder.go b/core/discovery/proposal/proposal.go similarity index 61% rename from core/discovery/noop/finder.go rename to core/discovery/proposal/proposal.go index ae98bd14ed..fdafa946da 100644 --- a/core/discovery/noop/finder.go +++ b/core/discovery/proposal/proposal.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,18 +15,16 @@ * along with this program. If not, see . */ -package noop +package proposal -// NewFinder returns noop instance of noop finder which is used to -// disable proposals auto fetching. -func NewFinder() *finderNoop { - return &finderNoop{} -} - -type finderNoop struct{} +import ( + "github.com/mysteriumnetwork/node/market" +) -func (fn finderNoop) Start() error { - return nil +// Repository provides proposals. +type Repository interface { + // Proposal returns a single proposal by its ID. + Proposal(id market.ProposalID) (*market.ServiceProposal, error) + // Proposals returns proposals matching the filter. + Proposals(filter *Filter) ([]market.ServiceProposal, error) } - -func (fn finderNoop) Stop() {} diff --git a/core/discovery/composite/registry.go b/core/discovery/registry.go similarity index 86% rename from core/discovery/composite/registry.go rename to core/discovery/registry.go index 38ecd4429a..0272ed34ce 100644 --- a/core/discovery/composite/registry.go +++ b/core/discovery/registry.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019 The "MysteriumNetwork/node" Authors. + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -15,26 +15,25 @@ * along with this program. If not, see . */ -package composite +package discovery import ( - "github.com/mysteriumnetwork/node/core/discovery" "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/market" "github.com/pkg/errors" ) type registryComposite struct { - registries []discovery.ProposalRegistry + registries []ProposalRegistry } // NewRegistry creates an instance of composite registry -func NewRegistry(registries ...discovery.ProposalRegistry) *registryComposite { +func NewRegistry(registries ...ProposalRegistry) *registryComposite { return ®istryComposite{registries: registries} } // AddRegistry adds registry to set of registries -func (rc *registryComposite) AddRegistry(registry discovery.ProposalRegistry) { +func (rc *registryComposite) AddRegistry(registry ProposalRegistry) { rc.registries = append(rc.registries, registry) } diff --git a/core/discovery/repository.go b/core/discovery/repository.go new file mode 100644 index 0000000000..17a2c440a0 --- /dev/null +++ b/core/discovery/repository.go @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 The "MysteriumNetwork/node" Authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package discovery + +import ( + "sync" + + "github.com/mysteriumnetwork/node/core/discovery/proposal" + "github.com/mysteriumnetwork/node/market" + "github.com/mysteriumnetwork/node/utils" + "github.com/rs/zerolog/log" +) + +// repository provides proposals from multiple other repositories. +type repository struct { + delegates []proposal.Repository +} + +// NewRepository constructs a new composite repository. +func NewRepository() *repository { + return &repository{} +} + +// Add adds a delegate repositories from which proposals can be acquired. +func (c *repository) Add(repository proposal.Repository) { + c.delegates = append(c.delegates, repository) +} + +// Proposal returns a single proposal by its ID. +func (c *repository) Proposal(id market.ProposalID) (*market.ServiceProposal, error) { + allErrors := utils.ErrorCollection{} + + for _, delegate := range c.delegates { + serviceProposal, err := delegate.Proposal(id) + if err == nil { + return serviceProposal, nil + } + allErrors.Add(err) + } + + return nil, allErrors.Error() +} + +// Proposals returns proposals matching the filter. +func (c *repository) Proposals(filter *proposal.Filter) ([]market.ServiceProposal, error) { + log.Debug().Msgf("Retrieving proposals from %d repositories", len(c.delegates)) + proposals := make([][]market.ServiceProposal, len(c.delegates)) + errors := make([]error, len(c.delegates)) + + var wg sync.WaitGroup + for i, delegate := range c.delegates { + wg.Add(1) + go func(idx int, repo proposal.Repository) { + defer wg.Done() + proposals[idx], errors[idx] = repo.Proposals(filter) + }(i, delegate) + } + wg.Wait() + + uniqueProposals := make(map[market.ProposalID]market.ServiceProposal) + for i, repoProposals := range proposals { + log.Trace().Msgf("Retrieved %d proposals from repository %d", len(repoProposals), i) + for _, p := range repoProposals { + uniqueProposals[p.UniqueID()] = p + } + } + + var result []market.ServiceProposal + for _, val := range uniqueProposals { + result = append(result, val) + } + + allErrors := utils.ErrorCollection{} + allErrors.Add(errors...) + + log.Err(allErrors.Error()).Msgf("Returning %d unique proposals", len(result)) + return result, allErrors.Error() +} diff --git a/mobile/mysterium/entrypoint.go b/mobile/mysterium/entrypoint.go index cb1cdbcc31..31fe7b9864 100644 --- a/mobile/mysterium/entrypoint.go +++ b/mobile/mysterium/entrypoint.go @@ -30,7 +30,6 @@ import ( "github.com/mysteriumnetwork/node/cmd" "github.com/mysteriumnetwork/node/consumer/statistics" "github.com/mysteriumnetwork/node/core/connection" - "github.com/mysteriumnetwork/node/core/discovery" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/core/location" "github.com/mysteriumnetwork/node/core/node" @@ -52,7 +51,6 @@ type MobileNode struct { node *node.Node connectionManager connection.Manager locationResolver *location.Cache - discoveryStorage *discovery.ProposalStorage identitySelector selector.Handler signerFactory identity.SignerFactory natPinger natPinger @@ -168,7 +166,6 @@ func NewNode(appPath string, optionsNetwork *MobileNetworkOptions) (*MobileNode, node: di.Node, connectionManager: di.ConnectionManager, locationResolver: di.LocationResolver, - discoveryStorage: di.DiscoveryStorage, identitySelector: di.IdentitySelector, signerFactory: di.SignerFactory, natPinger: di.NATPinger, @@ -184,7 +181,7 @@ func NewNode(appPath string, optionsNetwork *MobileNetworkOptions) (*MobileNode, channelImplementationAddress: options.Transactor.ChannelImplementation, registryAddress: options.Transactor.RegistryAddress, proposalsManager: newProposalsManager( - di.DiscoveryStorage, + di.ProposalRepository, di.MysteriumAPI, di.QualityClient, ), @@ -300,7 +297,7 @@ type ConnectRequest struct { // Connect connects to given provider. func (mb *MobileNode) Connect(req *ConnectRequest) error { - proposal, err := mb.discoveryStorage.GetProposal(market.ProposalID{ + proposal, err := mb.proposalsManager.repository.Proposal(market.ProposalID{ ProviderID: req.ProviderID, ServiceType: req.ServiceType, }) diff --git a/mobile/mysterium/proposals_manager.go b/mobile/mysterium/proposals_manager.go index 4cd7d99795..304f97d4c8 100644 --- a/mobile/mysterium/proposals_manager.go +++ b/mobile/mysterium/proposals_manager.go @@ -20,7 +20,7 @@ package mysterium import ( "encoding/json" - "github.com/mysteriumnetwork/node/core/discovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/quality" "github.com/mysteriumnetwork/node/market" "github.com/mysteriumnetwork/node/market/mysterium" @@ -55,7 +55,7 @@ type GetProposalRequest struct { ServiceType string } -type proposal struct { +type proposalDTO struct { ID int `json:"id"` ProviderID string `json:"providerId"` ServiceType string `json:"serviceType"` @@ -64,15 +64,11 @@ type proposal struct { } type getProposalsResponse struct { - Proposals []*proposal `json:"proposals"` + Proposals []*proposalDTO `json:"proposals"` } type getProposalResponse struct { - Proposal *proposal `json:"proposal"` -} - -type proposalStorage interface { - Set(proposals ...market.ServiceProposal) + Proposal *proposalDTO `json:"proposal"` } type mysteriumAPI interface { @@ -84,35 +80,35 @@ type qualityFinder interface { } func newProposalsManager( - proposalsStore *discovery.ProposalStorage, + repository proposal.Repository, mysteriumAPI mysteriumAPI, qualityFinder qualityFinder, ) *proposalsManager { return &proposalsManager{ - proposalsStore: proposalsStore, - mysteriumAPI: mysteriumAPI, - qualityFinder: qualityFinder, + repository: repository, + mysteriumAPI: mysteriumAPI, + qualityFinder: qualityFinder, } } type proposalsManager struct { - proposalsStore *discovery.ProposalStorage - mysteriumAPI mysteriumAPI - qualityFinder qualityFinder + repository proposal.Repository + cache []market.ServiceProposal + mysteriumAPI mysteriumAPI + qualityFinder qualityFinder } func (m *proposalsManager) getProposals(req *GetProposalsRequest) ([]byte, error) { // Get proposals from cache if exists. if !req.Refresh { - cachedProposals, err := m.getFromCache() - if err != nil { - return nil, err + cachedProposals := m.getFromCache() + if len(cachedProposals) > 0 { + return m.mapToProposalsResponse(cachedProposals) } - return m.mapToProposalsResponse(cachedProposals) } // Get proposals from remote discovery api and store in cache. - apiProposals, err := m.getFromAPI(req.ShowOpenvpnProposals, req.ShowWireguardProposals) + apiProposals, err := m.getFromRepository() if err != nil { return nil, err } @@ -122,40 +118,27 @@ func (m *proposalsManager) getProposals(req *GetProposalsRequest) ([]byte, error } func (m *proposalsManager) getProposal(req *GetProposalRequest) ([]byte, error) { - proposal, err := m.proposalsStore.GetProposal(market.ProposalID{ + result, err := m.repository.Proposal(market.ProposalID{ ProviderID: req.ProviderID, ServiceType: req.ServiceType, }) if err != nil { return nil, err } - - if proposal == nil { + if result == nil { return nil, nil } - return m.mapToProposalResponse(proposal) + return m.mapToProposalResponse(result) } -func (m *proposalsManager) getFromCache() ([]market.ServiceProposal, error) { - return m.proposalsStore.MatchProposals(func(v market.ServiceProposal) bool { - return true - }) +func (m *proposalsManager) getFromCache() []market.ServiceProposal { + return m.cache } -func (m *proposalsManager) getFromAPI(showOpenvpnProposals, showWireguardProposals bool) ([]market.ServiceProposal, error) { - var serviceType string - if showOpenvpnProposals && showWireguardProposals { - serviceType = "all" - } else if showOpenvpnProposals { - serviceType = openvpn.ServiceType - } else if showWireguardProposals { - serviceType = wireguard.ServiceType - } - query := mysterium.ProposalsQuery{ - ServiceType: serviceType, - AccessPolicyAll: false, - } - allProposals, err := m.mysteriumAPI.QueryProposals(query) +func (m *proposalsManager) getFromRepository() ([]market.ServiceProposal, error) { + allProposals, err := m.repository.Proposals(&proposal.Filter{ + ServiceType: "all", + }) if err != nil { return nil, err } @@ -172,13 +155,13 @@ func (m *proposalsManager) getFromAPI(showOpenvpnProposals, showWireguardProposa } func (m *proposalsManager) addToCache(proposals []market.ServiceProposal) { - m.proposalsStore.Set(proposals) + m.cache = proposals } func (m *proposalsManager) mapToProposalsResponse(serviceProposals []market.ServiceProposal) ([]byte, error) { - var proposals []*proposal + var proposals []*proposalDTO for _, p := range serviceProposals { - proposals = append(proposals, &proposal{ + proposals = append(proposals, &proposalDTO{ ID: p.ID, ProviderID: p.ProviderID, ServiceType: p.ServiceType, @@ -197,13 +180,13 @@ func (m *proposalsManager) mapToProposalsResponse(serviceProposals []market.Serv } func (m *proposalsManager) mapToProposalResponse(p *market.ServiceProposal) ([]byte, error) { - proposal := &proposal{ + dto := &proposalDTO{ ID: p.ID, ProviderID: p.ProviderID, ServiceType: p.ServiceType, CountryCode: m.getServiceCountryCode(p), } - res := &getProposalResponse{Proposal: proposal} + res := &getProposalResponse{Proposal: dto} bytes, err := json.Marshal(res) if err != nil { return nil, err @@ -218,7 +201,7 @@ func (m *proposalsManager) getServiceCountryCode(p *market.ServiceProposal) stri return p.ServiceDefinition.GetLocation().Country } -func (m *proposalsManager) addQualityData(proposals []*proposal) { +func (m *proposalsManager) addQualityData(proposals []*proposalDTO) { metrics := m.qualityFinder.ProposalsMetrics() // Convert metrics slice to map for fast lookup. diff --git a/mobile/mysterium/proposals_manager_test.go b/mobile/mysterium/proposals_manager_test.go index e2930f5d2a..ad761f1e7f 100644 --- a/mobile/mysterium/proposals_manager_test.go +++ b/mobile/mysterium/proposals_manager_test.go @@ -20,7 +20,7 @@ package mysterium import ( "testing" - "github.com/mysteriumnetwork/node/core/discovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/quality" "github.com/mysteriumnetwork/node/market" "github.com/mysteriumnetwork/node/market/mysterium" @@ -31,29 +31,29 @@ import ( type proposalManagerTestSuite struct { suite.Suite - proposalsStore *discovery.ProposalStorage - mysteriumAPI mysteriumAPI - qualityFinder qualityFinder + repository *mockRepository + mysteriumAPI mysteriumAPI + qualityFinder qualityFinder proposalsManager *proposalsManager } func (s *proposalManagerTestSuite) SetupTest() { - s.proposalsStore = discovery.NewStorage() + s.repository = &mockRepository{} s.mysteriumAPI = &mockMysteriumAPI{} s.qualityFinder = &mockQualityFinder{} s.proposalsManager = newProposalsManager( - s.proposalsStore, + s.repository, s.mysteriumAPI, s.qualityFinder, ) } func (s *proposalManagerTestSuite) TestGetProposalsFromCache() { - s.proposalsStore.Set([]market.ServiceProposal{ + s.proposalsManager.cache = []market.ServiceProposal{ {ProviderID: "p1", ServiceType: "openvpn"}, - }) + } s.proposalsManager.qualityFinder = &mockQualityFinder{ metrics: []quality.ConnectMetric{ { @@ -81,12 +81,10 @@ func (s *proposalManagerTestSuite) TestGetProposalsFromCache() { } func (s *proposalManagerTestSuite) TestGetProposalsFromAPIWhenNotFoundInCache() { - s.proposalsStore.Set([]market.ServiceProposal{}) - s.proposalsManager.mysteriumAPI = &mockMysteriumAPI{ - proposals: []market.ServiceProposal{ - {ProviderID: "p1", ServiceType: "wireguard"}, - }, + s.repository.data = []market.ServiceProposal{ + {ProviderID: "p1", ServiceType: "wireguard"}, } + s.proposalsManager.mysteriumAPI = &mockMysteriumAPI{} bytes, err := s.proposalsManager.getProposals(&GetProposalsRequest{ ShowOpenvpnProposals: false, ShowWireguardProposals: false, @@ -98,9 +96,9 @@ func (s *proposalManagerTestSuite) TestGetProposalsFromAPIWhenNotFoundInCache() } func (s *proposalManagerTestSuite) TestGetSingleProposal() { - s.proposalsStore.Set([]market.ServiceProposal{ + s.repository.data = []market.ServiceProposal{ {ProviderID: "p1", ServiceType: "wireguard"}, - }) + } bytes, err := s.proposalsManager.getProposal(&GetProposalRequest{ ProviderID: "p1", ServiceType: "wireguard", @@ -114,6 +112,21 @@ func TestProposalManagerSuite(t *testing.T) { suite.Run(t, new(proposalManagerTestSuite)) } +type mockRepository struct { + data []market.ServiceProposal +} + +func (m *mockRepository) Proposal(id market.ProposalID) (*market.ServiceProposal, error) { + if len(m.data) == 0 { + return nil, nil + } + return &m.data[0], nil +} + +func (m *mockRepository) Proposals(filter *proposal.Filter) ([]market.ServiceProposal, error) { + return m.data, nil +} + type mockMysteriumAPI struct { proposals []market.ServiceProposal } diff --git a/tequilapi/endpoints/connection.go b/tequilapi/endpoints/connection.go index 731fe5494a..2d38f54aa2 100644 --- a/tequilapi/endpoints/connection.go +++ b/tequilapi/endpoints/connection.go @@ -26,6 +26,7 @@ import ( "github.com/julienschmidt/httprouter" "github.com/mysteriumnetwork/node/consumer" "github.com/mysteriumnetwork/node/core/connection" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/identity/registry" "github.com/mysteriumnetwork/node/market" @@ -90,7 +91,7 @@ type connectionResponse struct { SessionID string `json:"sessionId,omitempty"` // example: {"id":1,"providerId":"0x71ccbdee7f6afe85a5bc7106323518518cd23b94","serviceType":"openvpn","serviceDefinition":{"locationOriginate":{"asn":"","country":"CA"}}} - Proposal *proposal `json:"proposal,omitempty"` + Proposal *proposalDTO `json:"proposal,omitempty"` } // swagger:model IPDTO @@ -133,17 +134,17 @@ type ConnectionEndpoint struct { manager connection.Manager statisticsTracker SessionStatisticsTracker //TODO connection should use concrete proposal from connection params and avoid going to marketplace - proposalProvider ProposalGetter - identityRegistry identityRegistry + proposalRepository proposal.Repository + identityRegistry identityRegistry } // NewConnectionEndpoint creates and returns connection endpoint -func NewConnectionEndpoint(manager connection.Manager, statsKeeper SessionStatisticsTracker, proposalProvider ProposalGetter, identityRegistry identityRegistry) *ConnectionEndpoint { +func NewConnectionEndpoint(manager connection.Manager, statsKeeper SessionStatisticsTracker, proposalRepository proposal.Repository, identityRegistry identityRegistry) *ConnectionEndpoint { return &ConnectionEndpoint{ - manager: manager, - statisticsTracker: statsKeeper, - proposalProvider: proposalProvider, - identityRegistry: identityRegistry, + manager: manager, + statisticsTracker: statsKeeper, + proposalRepository: proposalRepository, + identityRegistry: identityRegistry, } } @@ -232,7 +233,7 @@ func (ce *ConnectionEndpoint) Create(resp http.ResponseWriter, req *http.Request } // TODO Pass proposal ID directly in request - proposal, err := ce.proposalProvider.GetProposal(market.ProposalID{ + proposal, err := ce.proposalRepository.Proposal(market.ProposalID{ ProviderID: cr.ProviderID, ServiceType: cr.ServiceType, }) @@ -324,8 +325,8 @@ func (ce *ConnectionEndpoint) GetStatistics(writer http.ResponseWriter, request // AddRoutesForConnection adds connections routes to given router func AddRoutesForConnection(router *httprouter.Router, manager connection.Manager, - statsKeeper SessionStatisticsTracker, proposalProvider ProposalGetter, identityRegistry identityRegistry) { - connectionEndpoint := NewConnectionEndpoint(manager, statsKeeper, proposalProvider, identityRegistry) + statsKeeper SessionStatisticsTracker, proposalRepository proposal.Repository, identityRegistry identityRegistry) { + connectionEndpoint := NewConnectionEndpoint(manager, statsKeeper, proposalRepository, identityRegistry) router.GET("/connection", connectionEndpoint.Status) router.PUT("/connection", connectionEndpoint.Create) router.DELETE("/connection", connectionEndpoint.Kill) diff --git a/tequilapi/endpoints/connection_test.go b/tequilapi/endpoints/connection_test.go index ed9990ff37..df73679367 100644 --- a/tequilapi/endpoints/connection_test.go +++ b/tequilapi/endpoints/connection_test.go @@ -80,7 +80,7 @@ func (ssk *StubStatisticsTracker) GetSessionDuration() time.Duration { return ssk.duration } -func getMockProposalProviderWithSpecifiedProposal(providerID, serviceType string) *mockProposalProvider { +func mockRepositoryWithProposal(providerID, serviceType string) *mockProposalRepository { sampleProposal := market.ServiceProposal{ ID: 1, ServiceType: serviceType, @@ -88,7 +88,7 @@ func getMockProposalProviderWithSpecifiedProposal(providerID, serviceType string ProviderID: providerID, } - return &mockProposalProvider{ + return &mockProposalRepository{ proposals: []market.ServiceProposal{sampleProposal}, } } @@ -100,7 +100,7 @@ func TestAddRoutesForConnectionAddsRoutes(t *testing.T) { duration: time.Minute, } - mockedProposalProvider := getMockProposalProviderWithSpecifiedProposal("node1", "noop") + mockedProposalProvider := mockRepositoryWithProposal("node1", "noop") AddRoutesForConnection(router, &fakeManager, statsKeeper, mockedProposalProvider, mockIdentityRegistryInstance) tests := []struct { @@ -152,7 +152,7 @@ func TestDisconnectingState(t *testing.T) { SessionID: "", } - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodGet, "/irrelevant", nil) resp := httptest.NewRecorder() @@ -174,7 +174,7 @@ func TestNotConnectedStateIsReturnedWhenNoConnection(t *testing.T) { SessionID: "", } - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodGet, "/irrelevant", nil) resp := httptest.NewRecorder() @@ -196,7 +196,7 @@ func TestStateConnectingIsReturnedWhenIsConnectionInProgress(t *testing.T) { State: connection.Connecting, } - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodGet, "/irrelevant", nil) resp := httptest.NewRecorder() @@ -219,7 +219,7 @@ func TestConnectedStateAndSessionIdIsReturnedWhenIsConnected(t *testing.T) { SessionID: "My-super-session", } - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodGet, "/irrelevant", nil) resp := httptest.NewRecorder() @@ -239,7 +239,7 @@ func TestConnectedStateAndSessionIdIsReturnedWhenIsConnected(t *testing.T) { func TestPutReturns400ErrorIfRequestBodyIsNotJSON(t *testing.T) { fakeManager := mockConnectionManager{} - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodPut, "/irrelevant", strings.NewReader("a")) resp := httptest.NewRecorder() @@ -258,7 +258,7 @@ func TestPutReturns400ErrorIfRequestBodyIsNotJSON(t *testing.T) { func TestPutReturns422ErrorIfRequestBodyIsMissingFieldValues(t *testing.T) { fakeManager := mockConnectionManager{} - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodPut, "/irrelevant", strings.NewReader("{}")) resp := httptest.NewRecorder() @@ -281,7 +281,7 @@ func TestPutReturns422ErrorIfRequestBodyIsMissingFieldValues(t *testing.T) { func TestPutWithValidBodyCreatesConnection(t *testing.T) { fakeManager := mockConnectionManager{} - proposalProvider := getMockProposalProviderWithSpecifiedProposal("required-node", "openvpn") + proposalProvider := mockRepositoryWithProposal("required-node", "openvpn") connEndpoint := NewConnectionEndpoint(&fakeManager, nil, proposalProvider, mockIdentityRegistryInstance) req := httptest.NewRequest( http.MethodPut, @@ -307,7 +307,7 @@ func TestPutWithValidBodyCreatesConnection(t *testing.T) { func TestPutUnregisteredIdentityReturnsError(t *testing.T) { fakeManager := mockConnectionManager{} - proposalProvider := getMockProposalProviderWithSpecifiedProposal("required-node", "openvpn") + proposalProvider := mockRepositoryWithProposal("required-node", "openvpn") mir := *mockIdentityRegistryInstance mir.RegistrationStatus = registry.Unregistered @@ -336,7 +336,7 @@ func TestPutUnregisteredIdentityReturnsError(t *testing.T) { func TestPutFailedRegistrationCheckReturnsError(t *testing.T) { fakeManager := mockConnectionManager{} - proposalProvider := getMockProposalProviderWithSpecifiedProposal("required-node", "openvpn") + proposalProvider := mockRepositoryWithProposal("required-node", "openvpn") mir := *mockIdentityRegistryInstance mir.RegistrationCheckError = errors.New("explosions everywhere") @@ -365,7 +365,7 @@ func TestPutFailedRegistrationCheckReturnsError(t *testing.T) { func TestPutWithServiceTypeOverridesDefault(t *testing.T) { fakeManager := mockConnectionManager{} - mystAPI := getMockProposalProviderWithSpecifiedProposal("required-node", "noop") + mystAPI := mockRepositoryWithProposal("required-node", "noop") connEndpoint := NewConnectionEndpoint(&fakeManager, nil, mystAPI, mockIdentityRegistryInstance) req := httptest.NewRequest( http.MethodPut, @@ -392,7 +392,7 @@ func TestPutWithServiceTypeOverridesDefault(t *testing.T) { func TestDeleteCallsDisconnect(t *testing.T) { fakeManager := mockConnectionManager{} - connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&fakeManager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest(http.MethodDelete, "/irrelevant", nil) resp := httptest.NewRecorder() @@ -410,7 +410,7 @@ func TestGetStatisticsEndpointReturnsStatistics(t *testing.T) { } manager := mockConnectionManager{} - connEndpoint := NewConnectionEndpoint(&manager, statsKeeper, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&manager, statsKeeper, &mockProposalRepository{}, mockIdentityRegistryInstance) resp := httptest.NewRecorder() connEndpoint.GetStatistics(resp, nil, nil) @@ -431,7 +431,7 @@ func TestGetStatisticsEndpointReturnsStatisticsWhenSessionIsNotStarted(t *testin } manager := mockConnectionManager{} - connEndpoint := NewConnectionEndpoint(&manager, statsKeeper, &mockProposalProvider{}, mockIdentityRegistryInstance) + connEndpoint := NewConnectionEndpoint(&manager, statsKeeper, &mockProposalRepository{}, mockIdentityRegistryInstance) resp := httptest.NewRecorder() connEndpoint.GetStatistics(resp, nil, nil) @@ -450,7 +450,7 @@ func TestEndpointReturnsConflictStatusIfConnectionAlreadyExists(t *testing.T) { manager := mockConnectionManager{} manager.onConnectReturn = connection.ErrAlreadyExists - mystAPI := getMockProposalProviderWithSpecifiedProposal("required-node", "openvpn") + mystAPI := mockRepositoryWithProposal("required-node", "openvpn") connectionEndpoint := NewConnectionEndpoint(&manager, nil, mystAPI, mockIdentityRegistryInstance) req := httptest.NewRequest( @@ -480,7 +480,7 @@ func TestDisconnectReturnsConflictStatusIfConnectionDoesNotExist(t *testing.T) { manager := mockConnectionManager{} manager.onDisconnectReturn = connection.ErrNoConnection - connectionEndpoint := NewConnectionEndpoint(&manager, nil, &mockProposalProvider{}, mockIdentityRegistryInstance) + connectionEndpoint := NewConnectionEndpoint(&manager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest( http.MethodDelete, @@ -505,7 +505,7 @@ func TestConnectReturnsConnectCancelledStatusWhenErrConnectionCancelledIsEncount manager := mockConnectionManager{} manager.onConnectReturn = connection.ErrConnectionCancelled - mockProposalProvider := getMockProposalProviderWithSpecifiedProposal("required-node", "openvpn") + mockProposalProvider := mockRepositoryWithProposal("required-node", "openvpn") connectionEndpoint := NewConnectionEndpoint(&manager, nil, mockProposalProvider, mockIdentityRegistryInstance) req := httptest.NewRequest( http.MethodPut, @@ -534,7 +534,7 @@ func TestConnectReturnsErrorIfNoProposals(t *testing.T) { manager := mockConnectionManager{} manager.onConnectReturn = connection.ErrConnectionCancelled - connectionEndpoint := NewConnectionEndpoint(&manager, nil, &mockProposalProvider{proposals: make([]market.ServiceProposal, 0)}, mockIdentityRegistryInstance) + connectionEndpoint := NewConnectionEndpoint(&manager, nil, &mockProposalRepository{}, mockIdentityRegistryInstance) req := httptest.NewRequest( http.MethodPut, "/irrelevant", diff --git a/tequilapi/endpoints/proposals.go b/tequilapi/endpoints/proposals.go index 5eef6cda40..9d203550eb 100644 --- a/tequilapi/endpoints/proposals.go +++ b/tequilapi/endpoints/proposals.go @@ -21,7 +21,7 @@ import ( "net/http" "github.com/julienschmidt/httprouter" - "github.com/mysteriumnetwork/node/core/discovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/quality" "github.com/mysteriumnetwork/node/market" "github.com/mysteriumnetwork/node/tequilapi/utils" @@ -29,7 +29,7 @@ import ( // swagger:model ProposalsList type proposalsRes struct { - Proposals []*proposal `json:"proposals"` + Proposals []*proposalDTO `json:"proposals"` } // swagger:model ServiceLocationDTO @@ -60,7 +60,7 @@ type metricsRes struct { } // swagger:model ProposalDTO -type proposal struct { +type proposalDTO struct { // per provider unique serial number of service description provided // example: 5 ID int `json:"id"` @@ -83,8 +83,8 @@ type proposal struct { AccessPolicies *[]market.AccessPolicy `json:"accessPolicies,omitempty"` } -func proposalToRes(p market.ServiceProposal) *proposal { - return &proposal{ +func proposalToRes(p market.ServiceProposal) *proposalDTO { + return &proposalDTO{ ID: p.ID, ProviderID: p.ProviderID, ServiceType: p.ServiceType, @@ -103,26 +103,21 @@ func proposalToRes(p market.ServiceProposal) *proposal { } } -// ProposalFinder defines interface to fetch currently active service proposals from discovery by given filter -type ProposalFinder interface { - FindProposals(filter discovery.ProposalFilter) ([]market.ServiceProposal, error) -} - // QualityFinder allows to fetch proposal quality data type QualityFinder interface { ProposalsMetrics() []quality.ConnectMetric } type proposalsEndpoint struct { - proposalProvider ProposalFinder - qualityProvider QualityFinder + proposalRepository proposal.Repository + qualityProvider QualityFinder } // NewProposalsEndpoint creates and returns proposal creation endpoint -func NewProposalsEndpoint(proposalProvider ProposalFinder, qualityProvider QualityFinder) *proposalsEndpoint { +func NewProposalsEndpoint(proposalRepository proposal.Repository, qualityProvider QualityFinder) *proposalsEndpoint { return &proposalsEndpoint{ - proposalProvider: proposalProvider, - qualityProvider: qualityProvider, + proposalRepository: proposalRepository, + qualityProvider: qualityProvider, } } @@ -163,18 +158,18 @@ func NewProposalsEndpoint(proposalProvider ProposalFinder, qualityProvider Quali func (pe *proposalsEndpoint) List(resp http.ResponseWriter, req *http.Request, params httprouter.Params) { fetchConnectCounts := req.URL.Query().Get("fetchConnectCounts") - proposals, err := pe.proposalProvider.FindProposals(&proposalsFilter{ - providerID: req.URL.Query().Get("providerId"), - serviceType: req.URL.Query().Get("serviceType"), - accessPolicyID: req.URL.Query().Get("accessPolicyId"), - accessPolicySource: req.URL.Query().Get("accessPolicySource"), + proposals, err := pe.proposalRepository.Proposals(&proposal.Filter{ + ProviderID: req.URL.Query().Get("providerId"), + ServiceType: req.URL.Query().Get("serviceType"), + AccessPolicyID: req.URL.Query().Get("accessPolicyId"), + AccessPolicySource: req.URL.Query().Get("accessPolicySource"), }) if err != nil { utils.SendError(resp, err, http.StatusInternalServerError) return } - proposalsRes := proposalsRes{Proposals: []*proposal{}} + proposalsRes := proposalsRes{Proposals: []*proposalDTO{}} for _, p := range proposals { proposalsRes.Proposals = append(proposalsRes.Proposals, proposalToRes(p)) } @@ -188,13 +183,13 @@ func (pe *proposalsEndpoint) List(resp http.ResponseWriter, req *http.Request, p } // AddRoutesForProposals attaches proposals endpoints to router -func AddRoutesForProposals(router *httprouter.Router, proposalProvider ProposalFinder, qualityProvider QualityFinder) { - pe := NewProposalsEndpoint(proposalProvider, qualityProvider) +func AddRoutesForProposals(router *httprouter.Router, proposalRepository proposal.Repository, qualityProvider QualityFinder) { + pe := NewProposalsEndpoint(proposalRepository, qualityProvider) router.GET("/proposals", pe.List) } // addProposalMetrics adds quality metrics to proposals. -func addProposalMetrics(proposals []*proposal, metrics []quality.ConnectMetric) { +func addProposalMetrics(proposals []*proposalDTO, metrics []quality.ConnectMetric) { // Convert metrics slice to map for fast lookup. metricsMap := map[string]quality.ConnectMetric{} for _, m := range metrics { diff --git a/tequilapi/endpoints/proposals_filter.go b/tequilapi/endpoints/proposals_filter.go index 977dbd9133..ef435667d7 100644 --- a/tequilapi/endpoints/proposals_filter.go +++ b/tequilapi/endpoints/proposals_filter.go @@ -16,50 +16,3 @@ */ package endpoints - -import ( - "github.com/mysteriumnetwork/node/core/discovery/reducer" - "github.com/mysteriumnetwork/node/market" - "github.com/mysteriumnetwork/node/market/mysterium" -) - -// proposalsFilter defines all flags for proposal filtering in discovery of Mysterium Network -type proposalsFilter struct { - providerID string - serviceType string - locationType string - accessPolicyID string - accessPolicySource string -} - -// Matches return flag if filter matches given proposal -func (filter *proposalsFilter) Matches(proposal market.ServiceProposal) bool { - conditions := make([]reducer.AndCondition, 0) - if filter.providerID != "" { - conditions = append(conditions, reducer.Equal(reducer.ProviderID, filter.providerID)) - } - if filter.serviceType != "" { - conditions = append(conditions, reducer.Equal(reducer.ServiceType, filter.serviceType)) - } - if filter.locationType != "" { - conditions = append(conditions, reducer.Equal(reducer.LocationType, filter.locationType)) - } - if filter.accessPolicyID != "" || filter.accessPolicySource != "" { - conditions = append(conditions, reducer.AccessPolicy(filter.accessPolicyID, filter.accessPolicySource)) - } - if len(conditions) > 0 { - return reducer.And(conditions...)(proposal) - } - - return reducer.All()(proposal) -} - -// ToAPIQuery serialises filter to query of Mysterium API -func (filter *proposalsFilter) ToAPIQuery() mysterium.ProposalsQuery { - return mysterium.ProposalsQuery{ - NodeKey: filter.providerID, - ServiceType: filter.serviceType, - AccessPolicyID: filter.accessPolicyID, - AccessPolicySource: filter.accessPolicySource, - } -} diff --git a/tequilapi/endpoints/proposals_test.go b/tequilapi/endpoints/proposals_test.go index bad7209185..0a688c0e36 100644 --- a/tequilapi/endpoints/proposals_test.go +++ b/tequilapi/endpoints/proposals_test.go @@ -22,7 +22,7 @@ import ( "net/http/httptest" "testing" - "github.com/mysteriumnetwork/node/core/discovery" + "github.com/mysteriumnetwork/node/core/discovery/proposal" "github.com/mysteriumnetwork/node/core/quality" "github.com/mysteriumnetwork/node/market" "github.com/stretchr/testify/assert" @@ -50,7 +50,7 @@ var serviceProposals = []market.ServiceProposal{ } func TestProposalsEndpointListByNodeId(t *testing.T) { - mockProposalProvider := &mockProposalProvider{ + repository := &mockProposalRepository{ //we assume that underling component does correct filtering proposals: []market.ServiceProposal{serviceProposals[0]}, } @@ -67,7 +67,7 @@ func TestProposalsEndpointListByNodeId(t *testing.T) { req.URL.RawQuery = query.Encode() resp := httptest.NewRecorder() - handlerFunc := NewProposalsEndpoint(mockProposalProvider, &mockQualityProvider{}).List + handlerFunc := NewProposalsEndpoint(repository, &mockQualityProvider{}).List handlerFunc(resp, req, nil) assert.JSONEq( @@ -90,11 +90,11 @@ func TestProposalsEndpointListByNodeId(t *testing.T) { }`, resp.Body.String(), ) - assert.Equal(t, &proposalsFilter{providerID: "0xProviderId"}, mockProposalProvider.recordedFilter) + assert.Equal(t, &proposal.Filter{ProviderID: "0xProviderId"}, repository.recordedFilter) } func TestProposalsEndpointAcceptsAccessPolicyParams(t *testing.T) { - mockProposalProvider := &mockProposalProvider{ + repository := &mockProposalRepository{ proposals: []market.ServiceProposal{serviceProposals[0]}, } @@ -111,7 +111,7 @@ func TestProposalsEndpointAcceptsAccessPolicyParams(t *testing.T) { req.URL.RawQuery = query.Encode() resp := httptest.NewRecorder() - handlerFunc := NewProposalsEndpoint(mockProposalProvider, &mockQualityProvider{}).List + handlerFunc := NewProposalsEndpoint(repository, &mockQualityProvider{}).List handlerFunc(resp, req, nil) assert.JSONEq( @@ -135,16 +135,16 @@ func TestProposalsEndpointAcceptsAccessPolicyParams(t *testing.T) { resp.Body.String(), ) assert.Equal(t, - &proposalsFilter{ - accessPolicyID: "accessPolicyId", - accessPolicySource: "accessPolicySource", + &proposal.Filter{ + AccessPolicyID: "accessPolicyId", + AccessPolicySource: "accessPolicySource", }, - mockProposalProvider.recordedFilter, + repository.recordedFilter, ) } func TestProposalsEndpointList(t *testing.T) { - proposalProvider := &mockProposalProvider{ + repository := &mockProposalRepository{ proposals: serviceProposals, } @@ -156,7 +156,7 @@ func TestProposalsEndpointList(t *testing.T) { assert.Nil(t, err) resp := httptest.NewRecorder() - handlerFunc := NewProposalsEndpoint(proposalProvider, &mockQualityProvider{}).List + handlerFunc := NewProposalsEndpoint(repository, &mockQualityProvider{}).List handlerFunc(resp, req, nil) assert.JSONEq( @@ -194,7 +194,7 @@ func TestProposalsEndpointList(t *testing.T) { } func TestProposalsEndpointListFetchConnectCounts(t *testing.T) { - proposalProvider := &mockProposalProvider{ + repository := &mockProposalRepository{ proposals: serviceProposals, } req, err := http.NewRequest( @@ -205,7 +205,7 @@ func TestProposalsEndpointListFetchConnectCounts(t *testing.T) { assert.Nil(t, err) resp := httptest.NewRecorder() - handlerFunc := NewProposalsEndpoint(proposalProvider, &mockQualityProvider{}).List + handlerFunc := NewProposalsEndpoint(repository, &mockQualityProvider{}).List handlerFunc(resp, req, nil) assert.JSONEq( @@ -268,21 +268,19 @@ func (m *mockQualityProvider) ProposalsMetrics() []quality.ConnectMetric { } } -type mockProposalProvider struct { - recordedFilter discovery.ProposalFilter +type mockProposalRepository struct { proposals []market.ServiceProposal + recordedFilter *proposal.Filter } -func (mpp *mockProposalProvider) GetProposal(id market.ProposalID) (*market.ServiceProposal, error) { - if len(mpp.proposals) == 0 { +func (m *mockProposalRepository) Proposal(id market.ProposalID) (*market.ServiceProposal, error) { + if len(m.proposals) == 0 { return nil, nil } - return &mpp.proposals[0], nil + return &m.proposals[0], nil } -func (mpp *mockProposalProvider) FindProposals(filter discovery.ProposalFilter) ([]market.ServiceProposal, error) { - mpp.recordedFilter = filter - return mpp.proposals, nil +func (m *mockProposalRepository) Proposals(filter *proposal.Filter) ([]market.ServiceProposal, error) { + m.recordedFilter = filter + return m.proposals, nil } - -var _ ProposalFinder = &mockProposalProvider{} diff --git a/tequilapi/endpoints/service.go b/tequilapi/endpoints/service.go index b4ea9479bc..68c9317d8a 100644 --- a/tequilapi/endpoints/service.go +++ b/tequilapi/endpoints/service.go @@ -83,7 +83,7 @@ type serviceInfo struct { // example: Running Status string `json:"status"` - Proposal proposal `json:"proposal"` + Proposal proposalDTO `json:"proposal"` AccessPolicies *[]market.AccessPolicy `json:"accessPolicies,omitempty"` }