Skip to content

Commit

Permalink
Collect some metrics from the gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Nov 29, 2024
1 parent 687dd2a commit 99f2502
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
83 changes: 83 additions & 0 deletions tools/walletextension/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package metrics

import (
"sync"
"sync/atomic"
"time"
)

type MetricsTracker struct {
totalUsers atomic.Uint64
accountsRegistered atomic.Uint64
activeUsers map[string]time.Time
activeUserLock sync.RWMutex
}

func NewMetricsTracker() *MetricsTracker {
mt := &MetricsTracker{
activeUsers: make(map[string]time.Time),
}

// Start cleanup routine for inactive users
go mt.cleanupInactiveUsers()
return mt
}

// RecordNewUser increments the total user count
func (mt *MetricsTracker) RecordNewUser() {
mt.totalUsers.Add(1)
}

// RecordAccountRegistered increments the total number of registered accounts
func (mt *MetricsTracker) RecordAccountRegistered() {
mt.accountsRegistered.Add(1)
}

// RecordUserActivity updates the last activity timestamp for a user
func (mt *MetricsTracker) RecordUserActivity(anonymousID string) {
mt.activeUserLock.Lock()
defer mt.activeUserLock.Unlock()
mt.activeUsers[anonymousID] = time.Now()
}

// GetTotalUsers returns the total number of registered users
func (mt *MetricsTracker) GetTotalUsers() uint64 {
return mt.totalUsers.Load()
}

// GetTotalAccountsRegistered returns the total number of registered accounts
func (mt *MetricsTracker) GetTotalAccountsRegistered() uint64 {
return mt.accountsRegistered.Load()
}

// GetMonthlyActiveUsers returns the number of users active in the last 30 days
func (mt *MetricsTracker) GetMonthlyActiveUsers() int {
mt.activeUserLock.RLock()
defer mt.activeUserLock.RUnlock()

count := 0
thirtyDaysAgo := time.Now().AddDate(0, 0, -30)

for _, lastActive := range mt.activeUsers {
if lastActive.After(thirtyDaysAgo) {
count++
}
}
return count
}

// cleanupInactiveUsers removes users that haven't been active for more than 30 days
func (mt *MetricsTracker) cleanupInactiveUsers() {
ticker := time.NewTicker(24 * time.Hour)
for range ticker.C {
mt.activeUserLock.Lock()
thirtyDaysAgo := time.Now().AddDate(0, 0, -30)

for userID, lastActive := range mt.activeUsers {
if lastActive.Before(thirtyDaysAgo) {
delete(mt.activeUsers, userID)
}
}
mt.activeUserLock.Unlock()
}
}
2 changes: 2 additions & 0 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func ExecAuthRPC[R any](ctx context.Context, w *services.Services, cfg *AuthExec
return nil, err
}

w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(user.ID))

rateLimitAllowed, requestUUID := w.RateLimiter.Allow(gethcommon.Address(user.ID))
defer w.RateLimiter.SetRequestEnd(gethcommon.Address(user.ID), requestUUID)
if !rateLimitAllowed {
Expand Down
9 changes: 8 additions & 1 deletion tools/walletextension/services/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/status-im/keycard-go/hexutils"

"github.com/ten-protocol/go-ten/tools/walletextension/cache"
"github.com/ten-protocol/go-ten/tools/walletextension/metrics"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -48,6 +49,7 @@ type Services struct {
SKManager SKManager
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
MetricsTracker *metrics.MetricsTracker
}

type NewHeadNotifier interface {
Expand All @@ -57,7 +59,7 @@ type NewHeadNotifier interface {
// number of rpc responses to cache
const rpcResponseCacheSize = 1_000_000

func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services {
func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, metricsTracker *metrics.MetricsTracker, config *common.Config) *Services {
newGatewayCache, err := cache.NewCache(rpcResponseCacheSize, logger)
if err != nil {
logger.Error(fmt.Errorf("could not create cache. Cause: %w", err).Error())
Expand All @@ -78,6 +80,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto
SKManager: NewSKManager(storage, config, logger),
RateLimiter: rateLimiter,
Config: config,
MetricsTracker: metricsTracker,
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
Expand Down Expand Up @@ -154,6 +157,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) {
w.Logger().Error(fmt.Sprintf("failed to save user to the database: %s", err))
return nil, err
}
w.MetricsTracker.RecordNewUser()

requestEndTime := time.Now()
duration := requestEndTime.Sub(requestStartTime)
Expand All @@ -163,6 +167,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) {

// AddAddressToUser checks if a message is in correct format and if signature is valid. If all checks pass we save address and signature against userID
func (w *Services) AddAddressToUser(userID []byte, address string, signature []byte, signatureType viewingkey.SignatureType) error {
w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID))
audit(w, "Adding address to user: %s, address: %s", hexutils.BytesToHex(userID), address)
requestStartTime := time.Now()
addressFromMessage := gethcommon.HexToAddress(address)
Expand All @@ -182,13 +187,15 @@ func (w *Services) AddAddressToUser(userID []byte, address string, signature []b
w.Logger().Error(fmt.Errorf("error while storing account (%s) for user (%s): %w", addressFromMessage.Hex(), userID, err).Error())
return err
}
w.MetricsTracker.RecordAccountRegistered()

audit(w, "Storing new address for user: %s, address: %s, duration: %d ", hexutils.BytesToHex(userID), address, time.Since(requestStartTime).Milliseconds())
return nil
}

// UserHasAccount checks if provided account exist in the database for given userID
func (w *Services) UserHasAccount(userID []byte, address string) (bool, error) {
w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID))
audit(w, "Checking if user has account: %s, address: %s", hexutils.BytesToHex(userID), address)
addressBytes, err := hex.DecodeString(address[2:]) // remove 0x prefix from address
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion tools/walletextension/walletextension_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"time"

"github.com/ten-protocol/go-ten/tools/walletextension/metrics"
"github.com/ten-protocol/go-ten/tools/walletextension/services"

"github.com/ten-protocol/go-ten/go/common/subscription"
Expand Down Expand Up @@ -43,6 +44,8 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont
os.Exit(1)
}

metricsTracker := metrics.NewMetricsTracker()

// start the database with the encryption key
userStorage, err := storage.New(config.DBType, config.DBConnectionURL, config.DBPathOverride, encryptionKey, logger)
if err != nil {
Expand All @@ -57,7 +60,7 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont
}

stopControl := stopcontrol.New()
walletExt := services.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, userStorage, stopControl, version, logger, &config)
walletExt := services.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, userStorage, stopControl, version, logger, metricsTracker, &config)
cfg := &node.RPCConfig{
EnableHTTP: true,
HTTPPort: config.WalletExtensionPortHTTP,
Expand Down

0 comments on commit 99f2502

Please sign in to comment.