diff --git a/cmd/main.go b/cmd/main.go index 6dc7670..dbd7c3b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -26,6 +26,7 @@ import ( "github.com/mysteriumnetwork/discovery/location" "github.com/mysteriumnetwork/discovery/price" "github.com/mysteriumnetwork/discovery/price/pricing" + "github.com/mysteriumnetwork/discovery/price/pricingbyservice" "github.com/mysteriumnetwork/discovery/proposal" "github.com/mysteriumnetwork/discovery/quality" "github.com/mysteriumnetwork/discovery/quality/oracleapi" @@ -81,21 +82,36 @@ func main() { locationProvider := location.NewLocationProvider(cfg.LocationAddress.String(), cfg.LocationUser, cfg.LocationPass) v3 := r.Group("/api/v3") + v4 := r.Group("/api/v4") + proposal.NewAPI(proposalService, proposalRepo, locationProvider).RegisterRoutes(v3) + proposal.NewAPI(proposalService, proposalRepo, locationProvider).RegisterRoutes(v4) + health.NewAPI(rdb).RegisterRoutes(v3) + health.NewAPI(rdb).RegisterRoutes(v4) cfger := pricing.NewConfigProviderDB(rdb) _, err = cfger.Get() if err != nil { log.Fatal().Err(err).Msg("Failed to load cfg") } + cfgerByService := pricingbyservice.NewConfigProviderDB(rdb) + _, err = cfger.Get() + if err != nil { + log.Fatal().Err(err).Msg("Failed to load cfg by service") + } getter, err := pricing.NewPriceGetter(rdb) if err != nil { log.Fatal().Err(err).Msg("failed to initialize price getter") } + getterByService, err := pricingbyservice.NewPriceGetter(rdb) + if err != nil { + log.Fatal().Err(err).Msg("failed to initialize price getter by service") + } price.NewAPI(getter, cfger, cfg.UniverseJWTSecret).RegisterRoutes(v3) + price.NewAPIByService(getterByService, cfgerByService, cfg.UniverseJWTSecret).RegisterRoutes(v4) brokerListener := listener.New(cfg.BrokerURL.String(), proposalRepo) diff --git a/metrics/metrics.go b/metrics/metrics.go index 3f61f2f..bc2481c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -7,7 +7,7 @@ var CurrentPriceByCountry = prometheus.NewGaugeVec( Name: "discovery_current_price", Help: "Current pricing by country", }, - []string{"country_code", "node_type", "price_type"}, + []string{"country_code", "node_type", "service_type", "price_type"}, ) func InitialiseMonitoring() { diff --git a/price/api_by_service.go b/price/api_by_service.go new file mode 100644 index 0000000..2da1a52 --- /dev/null +++ b/price/api_by_service.go @@ -0,0 +1,83 @@ +package price + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/mysteriumnetwork/discovery/gorest" + "github.com/mysteriumnetwork/discovery/price/pricingbyservice" + "github.com/rs/zerolog/log" +) + +type APIByService struct { + pricer *pricingbyservice.PriceGetter + jwtSecret string + cfger pricingbyservice.ConfigProvider +} + +func NewAPIByService(pricer *pricingbyservice.PriceGetter, cfger pricingbyservice.ConfigProvider, jwtSecret string) *APIByService { + return &APIByService{ + pricer: pricer, + cfger: cfger, + jwtSecret: jwtSecret, + } +} + +// LatestPrices returns latest prices +// @Summary Latest Prices +// @Description Latest Prices +// @Product json +// @Success 200 {array} pricing.LatestPrices +// @Router /prices [get] +// @Tags prices +func (a *APIByService) LatestPrices(c *gin.Context) { + c.JSON(200, a.pricer.GetPrices()) +} + +// GetConfig returns the base pricing config +// @Summary Price config +// @Description price config +// @Product json +// @Success 200 {array} pricing.Config +// @Router /prices/config [get] +// @Tags prices +func (a *APIByService) GetConfig(c *gin.Context) { + cfg, err := a.cfger.Get() + if err != nil { + log.Err(err).Msg("Failed to get config") + c.JSON(http.StatusInternalServerError, gorest.NewErrResponse(err.Error())) + return + } + c.JSON(http.StatusOK, cfg) +} + +// UpdateConfig updates the pricing config +// @Summary update price config +// @Description update price config +// @Product json +// @Success 202 +// @Param config body pricing.Config true "config object" +// @Router /prices/config [post] +// @Tags prices +func (a *APIByService) UpdateConfig(c *gin.Context) { + var cfg pricingbyservice.Config + if err := c.BindJSON(&cfg); err != nil { + c.JSON(http.StatusBadRequest, gorest.NewErrResponse(err.Error())) + return + } + + err := a.cfger.Update(cfg) + if err != nil { + log.Err(err).Msg("Failed to update config") + c.JSON(http.StatusBadRequest, gorest.NewErrResponse(err.Error())) + return + } + + c.Data(http.StatusAccepted, gin.MIMEJSON, nil) +} + +func (a *APIByService) RegisterRoutes(r gin.IRoutes) { + r.GET("/prices/config", JWTAuthorized(a.jwtSecret), a.GetConfig) + r.POST("/prices/config", JWTAuthorized(a.jwtSecret), a.UpdateConfig) + r.GET("/prices", a.LatestPrices) +} diff --git a/price/pricing/price_updater.go b/price/pricing/price_updater.go index 3b178e4..a04c46e 100644 --- a/price/pricing/price_updater.go +++ b/price/pricing/price_updater.go @@ -30,7 +30,11 @@ type PriceUpdater struct { priceAPI FiatPriceAPI priceLifetime time.Duration mystBound Bound +<<<<<<< HEAD db redis.UniversalClient +======= + db *redis.Client +>>>>>>> add support for different pricing lock sync.Mutex lp LatestPrices @@ -130,10 +134,10 @@ func (p *PriceUpdater) submitMetrics() { } func (p *PriceUpdater) submitPriceMetric(country string, price *PriceByType) { - metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "per_gib").Set(price.Other.PricePerGiBHumanReadable) - metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "per_hour").Set(price.Other.PricePerHourHumanReadable) - metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "per_gib").Set(price.Residential.PricePerGiBHumanReadable) - metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "per_hour").Set(price.Residential.PricePerHourHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "wireguard", "per_gib").Set(price.Other.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "wireguard", "per_hour").Set(price.Other.PricePerHourHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "wireguard", "per_gib").Set(price.Residential.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "wireguard", "per_hour").Set(price.Residential.PricePerHourHumanReadable) } func (p *PriceUpdater) Stop() { diff --git a/price/pricingbyservice/config.go b/price/pricingbyservice/config.go new file mode 100644 index 0000000..b2f621c --- /dev/null +++ b/price/pricingbyservice/config.go @@ -0,0 +1,214 @@ +package pricingbyservice + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/mysteriumnetwork/discovery/price/pricing" + "github.com/rs/zerolog/log" +) + +const PricingConfigRedisKey = "DISCOVERY_PRICE_BASE_CONFIG_BY_SERVICE" + +type ConfigProvider interface { + Get() (Config, error) + Update(Config) error +} + +type ConfigProviderDB struct { + db *redis.Client + lock sync.Mutex +} + +func NewConfigProviderDB(redis *redis.Client) *ConfigProviderDB { + return &ConfigProviderDB{ + db: redis, + } +} + +func (cpd *ConfigProviderDB) Get() (Config, error) { + cfg, err := cpd.fetchConfig() + if err != nil { + log.Err(err).Msg("could not fetch config") + return Config{}, errors.New("internal error") + } + + return cfg, nil +} + +func (cpd *ConfigProviderDB) Update(in Config) error { + cpd.lock.Lock() + defer cpd.lock.Unlock() + + err := in.Validate() + if err != nil { + return err + } + + cfgJSON, err := json.Marshal(in) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + err = cpd.db.Set(ctx, PricingConfigRedisKey, string(cfgJSON), 0).Err() + if err != nil { + return err + } + + return nil +} + +func (cpd *ConfigProviderDB) fetchConfig() (Config, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + val, err := cpd.db.Get(ctx, PricingConfigRedisKey).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + err = cpd.db.Set(ctx, PricingConfigRedisKey, defaultPriceConfig, 0).Err() + if err != nil { + return Config{}, err + } + val = defaultPriceConfig + } else { + return Config{}, err + } + } + + res := Config{} + return res, json.Unmarshal([]byte(val), &res) +} + +type Config struct { + BasePrices PriceByTypeUSD `json:"base_prices"` + CountryModifiers map[pricing.ISO3166CountryCode]Modifier `json:"country_modifiers"` +} + +func (c Config) Validate() error { + err := c.BasePrices.Validate() + if err != nil { + return fmt.Errorf("base price invalid: %w", err) + } + + for k, v := range c.CountryModifiers { + err := k.Validate() + if err != nil { + return err + } + + err = v.Validate() + if err != nil { + return fmt.Errorf("country %v contains invalid pricing: %w", k, err) + } + } + + return nil +} + +type PriceByTypeUSD struct { + Residential *PriceByServiceTypeUSD `json:"residential"` + Other *PriceByServiceTypeUSD `json:"other"` +} + +func (p PriceByTypeUSD) Validate() error { + if p.Residential == nil || p.Other == nil { + return errors.New("residential and other pricing should not be nil") + } + + err := p.Residential.Validate() + if err != nil { + return err + } + + return p.Other.Validate() +} + +type PriceByServiceTypeUSD struct { + Wireguard *PriceUSD `json:"wireguard"` + Scraping *PriceUSD `json:"scraping"` + DataTransfer *PriceUSD `json:"data_transfer"` +} + +func (p PriceByServiceTypeUSD) Validate() error { + if p.Wireguard == nil || p.Scraping == nil || p.DataTransfer == nil { + return errors.New("wireguard, scraping and data_transfer pricing should not be nil") + } + + if err := p.Wireguard.Validate(); err != nil { + return err + } + if err := p.Scraping.Validate(); err != nil { + return err + } + return p.DataTransfer.Validate() +} + +type PriceUSD struct { + PricePerHour float64 `json:"price_per_hour_usd"` + PricePerGiB float64 `json:"price_per_gib_usd"` +} + +func (p PriceUSD) Validate() error { + if p.PricePerGiB < 0 || p.PricePerHour < 0 { + return errors.New("prices should be non negative") + } + + return nil +} + +type Modifier struct { + Residential float64 `json:"residential"` + Other float64 `json:"other"` +} + +func (m Modifier) Validate() error { + if m.Residential < 0 || m.Other < 0 { + return errors.New("modifiers should be non negative") + } + return nil +} + +var defaultPriceConfig = `{ + "base_prices": { + "residential": { + "wireguard": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + }, + "scraping": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + }, + "data_transfer": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + } + }, + "other": { + "wireguard": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + }, + "scraping": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + }, + "data_transfer": { + "price_per_hour_usd": 0.00036, + "price_per_gib_usd": 0.06 + } + } + }, + "country_modifiers": { + "US": { + "residential": 1.5, + "other": 1.2 + } + } + }` diff --git a/price/pricingbyservice/price_getter.go b/price/pricingbyservice/price_getter.go new file mode 100644 index 0000000..3a53d6e --- /dev/null +++ b/price/pricingbyservice/price_getter.go @@ -0,0 +1,159 @@ +package pricingbyservice + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" +) + +type PriceGetter struct { + db *redis.Client + lp LatestPrices + mutex sync.Mutex +} + +func NewPriceGetter(db *redis.Client) (*PriceGetter, error) { + loaded, err := loadPricing(db) + if err != nil { + return nil, fmt.Errorf("could not laod initial price %w", err) + } + + return &PriceGetter{ + db: db, + lp: loaded, + }, nil +} + +func loadPricing(db *redis.Client) (LatestPrices, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + redisRes, err := db.Get(ctx, PriceRedisKey).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + log.Info().Msg("no pricing found in redis, will use defaults") + cp := defaultPrices + now := time.Now().UTC() + cp.CurrentValidUntil = now.Add(time.Second * 1) + cp.PreviousValidUntil = now.Add(time.Second * -1) + return cp, nil + } + return LatestPrices{}, err + } + + var res LatestPrices + return res, json.Unmarshal([]byte(redisRes), &res) +} + +func (pg *PriceGetter) GetPrices() LatestPrices { + pg.mutex.Lock() + defer pg.mutex.Unlock() + + if time.Now().UTC().After(pg.lp.CurrentValidUntil) { + loaded, err := loadPricing(pg.db) + if err != nil { + log.Err(err).Msg("could not load pricing from db") + return pg.lp.WithCurrentTime() + } + log.Info().Msg("pricing loaded from db") + pg.lp = loaded + } + + return pg.lp.WithCurrentTime() +} + +var defaultPrices = LatestPrices{ + Defaults: &PriceHistory{ + Current: &PriceByType{ + Residential: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + Scraping: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + DataTransfer: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + }, + Other: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + Scraping: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + DataTransfer: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + }, + }, + Previous: &PriceByType{ + Residential: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + Scraping: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + DataTransfer: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + }, + Other: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + Scraping: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + DataTransfer: &Price{ + PricePerHour: big.NewInt(900000000000000), + PricePerHourHumanReadable: 0.0009, + PricePerGiB: big.NewInt(150000000000000000), + PricePerGiBHumanReadable: 0.15, + }, + }, + }, + }, +} diff --git a/price/pricingbyservice/price_updater.go b/price/pricingbyservice/price_updater.go new file mode 100644 index 0000000..189a17e --- /dev/null +++ b/price/pricingbyservice/price_updater.go @@ -0,0 +1,371 @@ +package pricingbyservice + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "sync" + "time" + + "github.com/fln/pprotect" + "github.com/go-redis/redis/v8" + + "github.com/mysteriumnetwork/discovery/metrics" + "github.com/mysteriumnetwork/discovery/price/pricing" + "github.com/mysteriumnetwork/payments/crypto" + "github.com/rs/zerolog/log" +) + +const PriceRedisKey = "DISCOVERY_CURRENT_PRICE_BY_SERVICE" + +type Bound struct { + Min, Max float64 +} + +type FiatPriceAPI interface { + MystUSD() float64 +} + +type PriceUpdater struct { + priceAPI FiatPriceAPI + priceLifetime time.Duration + mystBound Bound + db *redis.Client + + lock sync.Mutex + lp LatestPrices + cfgProvider ConfigProvider + + stop chan struct{} + once sync.Once +} + +func NewPricer( + cfgProvider ConfigProvider, + priceAPI FiatPriceAPI, + priceLifetime time.Duration, + sensibleMystBound Bound, + db *redis.Client, +) (*PriceUpdater, error) { + pricer := &PriceUpdater{ + cfgProvider: cfgProvider, + priceAPI: priceAPI, + priceLifetime: priceLifetime, + mystBound: sensibleMystBound, + stop: make(chan struct{}), + db: db, + } + + go pricer.schedulePriceUpdate(priceLifetime) + if err := pricer.threadSafePriceUpdate(); err != nil { + return nil, err + } + return pricer, nil +} + +func (p *PriceUpdater) schedulePriceUpdate(priceLifetime time.Duration) { + log.Info().Msg("price update be service started") + for { + select { + case <-p.stop: + log.Info().Msg("price update by service stopped") + return + case <-time.After(priceLifetime): + pprotect.CallLoop(func() { + err := p.threadSafePriceUpdate() + if err != nil { + log.Err(err).Msg("failed to update prices by service") + } + }, time.Second, func(val interface{}, stack []byte) { + log.Warn().Msg("panic on scheduled price update by service: " + fmt.Sprint(val)) + }) + } + } +} + +func (p *PriceUpdater) threadSafePriceUpdate() error { + p.lock.Lock() + defer p.lock.Unlock() + + return p.updatePrices() +} + +func (p *PriceUpdater) updatePrices() error { + mystUSD, err := p.fetchMystPrice() + if err != nil { + return err + } + + cfg, err := p.cfgProvider.Get() + if err != nil { + return err + } + + p.lp = p.generateNewLatestPrice(mystUSD, cfg) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + marshalled, err := json.Marshal(p.lp) + if err != nil { + return err + } + + err = p.db.Set(ctx, PriceRedisKey, string(marshalled), 0).Err() + if err != nil { + return err + } + + p.submitMetrics() + + log.Info().Msgf("price update complete by service") + return nil +} + +func (p *PriceUpdater) submitMetrics() { + p.submitPriceMetric("DEFAULTS", p.lp.Defaults.Current) + + for k, v := range p.lp.PerCountry { + p.submitPriceMetric(k, v.Current) + } +} + +func (p *PriceUpdater) submitPriceMetric(country string, price *PriceByType) { + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "wireguard", "per_gib").Set(price.Other.Wireguard.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "wireguard", "per_hour").Set(price.Other.Wireguard.PricePerHourHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "wireguard", "per_gib").Set(price.Residential.Wireguard.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "wireguard", "per_hour").Set(price.Residential.Wireguard.PricePerHourHumanReadable) + + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "scraping", "per_gib").Set(price.Other.Scraping.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "scraping", "per_hour").Set(price.Other.Scraping.PricePerHourHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "scraping", "per_gib").Set(price.Residential.Scraping.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "scraping", "per_hour").Set(price.Residential.Scraping.PricePerHourHumanReadable) + + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "data_transfer", "per_gib").Set(price.Other.DataTransfer.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "other", "data_transfer", "per_hour").Set(price.Other.DataTransfer.PricePerHourHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "data_transfer", "per_gib").Set(price.Residential.DataTransfer.PricePerGiBHumanReadable) + metrics.CurrentPriceByCountry.WithLabelValues(country, "residential", "data_transfer", "per_hour").Set(price.Residential.DataTransfer.PricePerHourHumanReadable) +} + +func (p *PriceUpdater) Stop() { + p.once.Do(func() { close(p.stop) }) +} + +func (p *PriceUpdater) generateNewLatestPrice(mystUSD float64, cfg Config) LatestPrices { + tm := time.Now().UTC() + + newLP := LatestPrices{ + Defaults: p.generateNewDefaults(mystUSD, cfg), + PerCountry: p.generateNewPerCountry(mystUSD, cfg), + CurrentValidUntil: tm.Add(p.priceLifetime), + } + + if !p.lp.isInitialized() { + newLP.Defaults.Previous = newLP.Defaults.Current + newLP.PreviousValidUntil = tm.Add(-p.priceLifetime) + } else { + newLP.Defaults.Previous = p.lp.Defaults.Current + newLP.PreviousValidUntil = p.lp.CurrentValidUntil + } + return newLP +} + +func (p *PriceUpdater) generateNewDefaults(mystUSD float64, cfg Config) *PriceHistory { + ph := &PriceHistory{ + Current: &PriceByType{ + Residential: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerGiB, 1), + }, + Scraping: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerGiB, 1), + }, + DataTransfer: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerGiB, 1), + }, + }, + Other: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerGiB, 1), + }, + Scraping: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Scraping.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Scraping.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Scraping.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Scraping.PricePerGiB, 1), + }, + DataTransfer: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerHour, 1), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerHour, 1), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerGiB, 1), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerGiB, 1), + }, + }, + }, + } + if !p.lp.isInitialized() { + ph.Previous = ph.Current + } else { + ph.Previous = p.lp.Defaults.Current + } + return ph +} + +func (p *PriceUpdater) generateNewPerCountry(mystUSD float64, cfg Config) map[string]*PriceHistory { + countries := make(map[string]*PriceHistory) + for countryCode := range pricing.CountryCodeToName { + mod, ok := cfg.CountryModifiers[pricing.ISO3166CountryCode(countryCode)] + if !ok { + mod = Modifier{ + Residential: 1, + Other: 1, + } + } + + ph := &PriceHistory{ + Current: &PriceByType{ + Residential: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerHour, mod.Residential), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerHour, mod.Residential), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerGiB, mod.Residential), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Wireguard.PricePerGiB, mod.Residential), + }, + Scraping: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerHour, mod.Residential), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerHour, mod.Residential), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerGiB, mod.Residential), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.Scraping.PricePerGiB, mod.Residential), + }, + DataTransfer: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerHour, mod.Residential), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerHour, mod.Residential), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerGiB, mod.Residential), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Residential.DataTransfer.PricePerGiB, mod.Residential), + }, + }, + Other: &PriceByServiceType{ + Wireguard: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerHour, mod.Other), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerHour, mod.Other), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerGiB, mod.Other), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Wireguard.PricePerGiB, mod.Other), + }, + Scraping: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Scraping.PricePerHour, mod.Other), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Scraping.PricePerHour, mod.Other), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.Scraping.PricePerGiB, mod.Other), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.Scraping.PricePerGiB, mod.Other), + }, + DataTransfer: &Price{ + PricePerHour: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerHour, mod.Other), + PricePerHourHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerHour, mod.Other), + PricePerGiB: calculatePriceMYST(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerGiB, mod.Other), + PricePerGiBHumanReadable: calculatePriceMystFloat(mystUSD, cfg.BasePrices.Other.DataTransfer.PricePerGiB, mod.Other), + }, + }, + }, + } + + // if current exists in previous lp, take it, otherwise set it to current + if p.lp.isInitialized() { + older, ok := p.lp.PerCountry[string(countryCode)] + if ok { + ph.Previous = older.Current + } else { + ph.Previous = ph.Current + } + } else { + ph.Previous = ph.Current + } + + countries[string(countryCode)] = ph + } + return countries +} + +// Take note that this is not 100% correct as we're rounding a bit due to accuracy issues with floats. +// This, however, is not important here as the accuracy will be more than good enough to a few zeroes after the dot. +func calculatePriceMYST(mystUSD, priceUSD, multiplier float64) *big.Int { + return crypto.FloatToBigMyst((priceUSD / mystUSD) * multiplier) +} + +func calculatePriceMystFloat(mystUSD, priceUSD, multiplier float64) float64 { + return crypto.BigMystToFloat(calculatePriceMYST(mystUSD, priceUSD, multiplier)) +} + +func (p *PriceUpdater) fetchMystPrice() (float64, error) { + mystUSD := p.priceAPI.MystUSD() + if err := p.withinBounds(mystUSD); err != nil { + return 0, err + } + + return mystUSD, nil +} + +// withinBounds used to filter out any possible nonsense that the external pricing services might return. +func (p *PriceUpdater) withinBounds(price float64) error { + if price > p.mystBound.Max || price < p.mystBound.Min { + return fmt.Errorf("myst exceeds sensible bounds: %.6f < %.6f(current price) < %.6f", p.mystBound.Min, price, p.mystBound.Max) + } + return nil +} + +// LatestPrices holds two sets of prices. The Previous should be used in case +// a race condition between obtaining prices by Consumer and Provider +// upon agreement +type LatestPrices struct { + Defaults *PriceHistory `json:"defaults"` + PerCountry map[string]*PriceHistory `json:"per_country"` + CurrentValidUntil time.Time `json:"current_valid_until"` + PreviousValidUntil time.Time `json:"previous_valid_until"` + CurrentServerTime time.Time `json:"current_server_time"` +} + +func (lp LatestPrices) WithCurrentTime() LatestPrices { + lp.CurrentServerTime = time.Now().UTC() + return lp +} + +func (lp *LatestPrices) isInitialized() bool { + return lp.Defaults != nil +} + +func (lp *LatestPrices) isValid() bool { + return lp.CurrentValidUntil.UTC().After(time.Now().UTC()) +} + +type PriceHistory struct { + Current *PriceByType `json:"current"` + Previous *PriceByType `json:"previous"` +} + +type PriceByType struct { + Residential *PriceByServiceType `json:"residential"` + Other *PriceByServiceType `json:"other"` +} + +type PriceByServiceType struct { + Wireguard *Price `json:"wireguard"` + Scraping *Price `json:"scraping"` + DataTransfer *Price `json:"data_transfer"` +} + +type Price struct { + PricePerHour *big.Int `json:"price_per_hour" swaggertype:"integer"` + PricePerHourHumanReadable float64 `json:"price_per_hour_human_readable" swaggertype:"number"` + PricePerGiB *big.Int `json:"price_per_gib" swaggertype:"integer"` + PricePerGiBHumanReadable float64 `json:"price_per_gib_human_readable" swaggertype:"number"` +} diff --git a/sidecar/cmd/main.go b/sidecar/cmd/main.go index 0352f4e..b1d0fb3 100644 --- a/sidecar/cmd/main.go +++ b/sidecar/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/mysteriumnetwork/discovery/config" "github.com/mysteriumnetwork/discovery/metrics" "github.com/mysteriumnetwork/discovery/price/pricing" + "github.com/mysteriumnetwork/discovery/price/pricingbyservice" mlog "github.com/mysteriumnetwork/logger" payprice "github.com/mysteriumnetwork/payments/fees/price" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -63,6 +64,7 @@ func main() { log.Info().Msg("cfger started") metrics.InitialiseMonitoring() + pricer, err := pricing.NewPricer( cfger, mrkt, @@ -77,6 +79,27 @@ func main() { log.Info().Msg("pricer started") defer pricer.Stop() + cfgerByService := pricingbyservice.NewConfigProviderDB(rdb) + _, err = cfger.Get() + if err != nil { + log.Fatal().Err(err).Msg("Failed to load cfg") + } + log.Info().Msg("cfger by service started") + + pricerByService, err := pricingbyservice.NewPricer( + cfgerByService, + mrkt, + time.Minute*5, + pricingbyservice.Bound{Min: 0.1, Max: 3.0}, + rdb, + ) + if err != nil { + log.Fatal().Err(err).Msg("Failed to initialize Pricer by service") + return + } + log.Info().Msg("pricer by service started") + defer pricerByService.Stop() + router := gin.New() router.Use(gin.Recovery()) router.GET("/metrics", gin.WrapH(promhttp.Handler()))