diff --git a/.gitignore b/.gitignore index f5e625c..c07ba7b 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ config.yaml .idea dist/ +tempcache \ No newline at end of file diff --git a/cache/cache.go b/cache/cache.go index e0f92c0..d2782ef 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -13,6 +13,7 @@ type Cache interface { Get(key string) ([]byte, bool) // Get retrieves an item from the cache if it exists and hasn't expired. Set(key string, content string, duration int) error // Set adds an item to the cache with a specified duration until expiration. DeleteWithPrefix(prefix string) error + Name() string } type keyType string diff --git a/cache/in_memory.go b/cache/in_memory.go index 0dd764a..834db39 100644 --- a/cache/in_memory.go +++ b/cache/in_memory.go @@ -1,6 +1,7 @@ package cache import ( + "fmt" "sync" "time" ) @@ -25,6 +26,7 @@ func (c *MemoryCache) Get(key string) ([]byte, bool) { item, found := c.items[key] + fmt.Printf("item: %v\n", item) // If the item is not found or has expired, return a cache miss. // The special case of time.Unix(1<<63-1, 0) is used to indicate that an item never expires- and // time.Before will always return true for this case. @@ -81,3 +83,7 @@ func (c *MemoryCache) DeleteWithPrefix(prefix string) error { return nil } + +func (c *MemoryCache) Name() string { + return "Memory" +} diff --git a/config.example.yaml b/config.example.yaml index 893577b..f9fcf47 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -18,6 +18,9 @@ cache: # address: "localhost:6379" # password: admin # database: 0 +filesystem: + enabled: true + directory: ./tempcache/ supergraphs: - graphRef: "${APOLLO_GRAPH_REF}" apolloKey: "${APOLLO_KEY}" diff --git a/config/config.go b/config/config.go index 2f4d01d..7b4dfd1 100644 --- a/config/config.go +++ b/config/config.go @@ -15,14 +15,15 @@ import ( // Config represents the application's configuration structure, // housing Relay, Uplink, and Cache configurations. type Config struct { - Relay RelayConfig `yaml:"relay"` // RelayConfig for incoming connections. - Uplink UplinkConfig `yaml:"uplink"` // UplinkConfig for managing uplink configuration. - Cache CacheConfig `yaml:"cache"` // CacheConfig for cache settings. - Redis RedisConfig `yaml:"redis"` // RedisConfig for using redis as cache. - Supergraphs []SupergraphConfig `yaml:"supergraphs"` // SupergraphConfig for supergraph settings. - Webhook WebhookConfig `yaml:"webhook"` // WebhookConfig for webhook handling. - Polling PollingConfig `yaml:"polling"` // PollingConfig for polling settings. - ManagementAPI ManagementAPIConfig `yaml:"managementAPI"` // ManagementAPIConfig for management API settings. + Relay RelayConfig `yaml:"relay"` // RelayConfig for incoming connections. + Uplink UplinkConfig `yaml:"uplink"` // UplinkConfig for managing uplink configuration. + Cache CacheConfig `yaml:"cache"` // CacheConfig for cache settings. + Redis RedisConfig `yaml:"redis"` // RedisConfig for using redis as cache. + FilesystemCache FilesystemCacheConfig `yaml:"filesystem"` // FilesystemCacheConfig for using filesystem as cache. + Supergraphs []SupergraphConfig `yaml:"supergraphs"` // SupergraphConfig for supergraph settings. + Webhook WebhookConfig `yaml:"webhook"` // WebhookConfig for webhook handling. + Polling PollingConfig `yaml:"polling"` // PollingConfig for polling settings. + ManagementAPI ManagementAPIConfig `yaml:"managementAPI"` // ManagementAPIConfig for management API settings. } // RelayConfig defines the address the proxy server listens on. @@ -61,6 +62,12 @@ type RedisConfig struct { Database int `yaml:"database"` // Database to use in the Redis server. } +// FilesystemCacheConfig defines the configuration for connecting to a Redis cache. +type FilesystemCacheConfig struct { + Enabled bool `yaml:"enabled"` // Whether Redis caching is enabled. + Directory string `yaml:"directory"` // Path to the filesystem cache. +} + // WebhookConfig defines the configuration for webhook handling. type WebhookConfig struct { Enabled bool `yaml:"enabled"` // Whether webhook handling is enabled. diff --git a/filesystem_cache/filesystem.go b/filesystem_cache/filesystem.go new file mode 100644 index 0000000..5742789 --- /dev/null +++ b/filesystem_cache/filesystem.go @@ -0,0 +1,82 @@ +package filesystem_cache + +import ( + "fmt" + "os" + "path" +) + +const PERMISSIONS = 0644 + +type FilesystemCache struct { + path string +} + +func NewFilesystemCache(path string) (*FilesystemCache, error) { + f, err := os.Stat(path) + if os.IsNotExist(err) { + // if the path does not exist, we can create it + err := os.MkdirAll(path, 0755) + if err != nil { + return nil, fmt.Errorf("failed to create directory %s: %v", path, err) + } + } else if !f.Mode().IsDir() { + return nil, fmt.Errorf("path %s is not a directory", path) + } + return &FilesystemCache{path}, nil +} + +func (c *FilesystemCache) Get(key string) ([]byte, bool) { + // Read the content of the file with the given key + // If the file does not exist, return false + // If the file exists, return the content as a byte slice + content, err := os.ReadFile(fmt.Sprintf("%v/%v", c.path, key)) + if err != nil { + return nil, false + } + return content, true +} + +func (c *FilesystemCache) Set(key string, content string, duration int) error { + // Write the content to a file with the given key + // duration is not used in this implementation as pruning is not implemented + cachePath := fmt.Sprintf("%v/%v", c.path, key) + if _, err := os.Stat(cachePath); os.IsNotExist(err) { + dir := path.Dir(cachePath) + err := os.MkdirAll(dir, 0755) + if err != nil { + return fmt.Errorf("failed to create directory %s: %v", dir, err) + } + } + return os.WriteFile(cachePath, []byte(content), PERMISSIONS) +} + +func (c *FilesystemCache) DeleteWithPrefix(prefix string) error { + // Delete all files with the given prefix from the cache. + // We can use the filepath.Glob function to get all files with the given prefix + files, err := os.ReadDir(c.path) + if err != nil { + return fmt.Errorf("failed to read directory %s: %v", c.path, err) + } + for _, file := range files { + if file.IsDir() { + continue + } + if !file.Type().IsRegular() { + continue + } + + if file.Name()[:len(prefix)] == prefix { + err := os.Remove(fmt.Sprintf("%v/%v", c.path, file.Name())) + if err != nil { + return fmt.Errorf("failed to delete file %s: %v", file.Name(), err) + } + } + } + + return nil +} + +func (c *FilesystemCache) Name() string { + return "Filesystem" +} diff --git a/filesystem_cache/filesystem_test.go b/filesystem_cache/filesystem_test.go new file mode 100644 index 0000000..809ecb7 --- /dev/null +++ b/filesystem_cache/filesystem_test.go @@ -0,0 +1,156 @@ +package filesystem_cache + +import ( + "os" + "path/filepath" + "testing" +) + +func TestNewFilesystemCache(t *testing.T) { + cachePath, _ := os.MkdirTemp("", "filesystem_cache_test") + defer os.RemoveAll(cachePath) + cache, err := NewFilesystemCache(cachePath) + if err != nil { + t.Errorf("Failed to create filesystem cache: %v", err) + } + + // Verify that the cache path is set correctly + if cache.path != cachePath { + t.Errorf("Expected cache path %s, got %s", cachePath, cache.path) + } + + // Verify that the cache directory is created + _, err = os.Stat(cachePath) + if os.IsNotExist(err) { + t.Errorf("Cache directory %s does not exist", cachePath) + } +} + +func TestFilesystemCache_Get(t *testing.T) { + cachePath, _ := os.MkdirTemp("", "filesystem_cache_test") + defer os.RemoveAll(cachePath) + cache, _ := NewFilesystemCache(cachePath) + + // Create a test file + testKey := "test_key" + testContent := "test_content" + testFilePath := filepath.Join(cachePath, testKey) + err := os.WriteFile(testFilePath, []byte(testContent), PERMISSIONS) + if err != nil { + t.Errorf("Failed to create test file: %v", err) + } + + // Retrieve the content from the cache + content, ok := cache.Get(testKey) + if !ok { + t.Errorf("Expected cache hit for key %s, got cache miss", testKey) + } + + // Verify that the retrieved content matches the test content + if string(content) != testContent { + t.Errorf("Expected content %s, got %s", testContent, string(content)) + } + + // Clean up the test file + err = os.Remove(testFilePath) + if err != nil { + t.Errorf("Failed to remove test file: %v", err) + } + + // Test that it'll create subdirectories if needed + nestedDir := filepath.Join(cachePath, "nested") + _, err = NewFilesystemCache(nestedDir) + if err != nil { + t.Errorf("Failed to create nested cache: %v", err) + } + if f, err := os.Stat(nestedDir); os.IsNotExist(err) || !f.IsDir() { + t.Errorf("Expected nested cache directory to be created") + } +} + +func TestFilesystemCache_Set(t *testing.T) { + cachePath, _ := os.MkdirTemp("", "filesystem_cache_test") + defer os.RemoveAll(cachePath) + cache, _ := NewFilesystemCache(cachePath) + + // Set a test key-value pair in the cache + testKey := "test_key" + testContent := "test_content" + err := cache.Set(testKey, testContent, 0) + if err != nil { + t.Errorf("Failed to set key-value pair in cache: %v", err) + } + + // Verify that the test file is created in the cache directory + testFilePath := filepath.Join(cachePath, testKey) + _, err = os.Stat(testFilePath) + if os.IsNotExist(err) { + t.Errorf("Test file %s does not exist in cache directory", testFilePath) + } + + // Clean up the test file + err = os.Remove(testFilePath) + if err != nil { + t.Errorf("Failed to remove test file: %v", err) + } +} + +func TestFilesystemCache_DeleteWithPrefix(t *testing.T) { + cachePath, _ := os.MkdirTemp("", "filesystem_cache_test") + defer os.RemoveAll(cachePath) + cache, _ := NewFilesystemCache(cachePath) + + // Create test files with different prefixes + testPrefix1 := "prefix1" + testPrefix2 := "prefix2" + testKey1 := testPrefix1 + "_key" + testKey2 := testPrefix2 + "_key" + testContent := "test_content" + testFilePath1 := filepath.Join(cachePath, testKey1) + testFilePath2 := filepath.Join(cachePath, testKey2) + err := os.WriteFile(testFilePath1, []byte(testContent), PERMISSIONS) + if err != nil { + t.Errorf("Failed to create test file: %v", err) + } + err = os.WriteFile(testFilePath2, []byte(testContent), PERMISSIONS) + if err != nil { + t.Errorf("Failed to create test file: %v", err) + } + + // Delete files with prefix "prefix1" + err = cache.DeleteWithPrefix(testPrefix1) + if err != nil { + t.Errorf("Failed to delete files with prefix %s: %v", testPrefix1, err) + } + + // Verify that the file with prefix "prefix1" is deleted + _, err = os.Stat(testFilePath1) + if !os.IsNotExist(err) { + t.Errorf("Expected file %s to be deleted, but it still exists", testFilePath1) + } + + // Verify that the file with prefix "prefix2" still exists + _, err = os.Stat(testFilePath2) + if os.IsNotExist(err) { + t.Errorf("Expected file %s to exist, but it does not", testFilePath2) + } + + // Clean up the remaining test file + err = os.Remove(testFilePath2) + if err != nil { + t.Errorf("Failed to remove test file: %v", err) + } +} + +func TestFilesystemCache_Name(t *testing.T) { + cachePath, _ := os.MkdirTemp("", "filesystem_cache_test") + defer os.RemoveAll(cachePath) + cache, _ := NewFilesystemCache(cachePath) + + // Verify that the cache name is returned correctly + expectedName := "Filesystem" + name := cache.Name() + if name != expectedName { + t.Errorf("Expected cache name %s, got %s", expectedName, name) + } +} diff --git a/main.go b/main.go index 64ff4a7..aee7ae7 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "apollosolutions/uplink-relay/cache" "apollosolutions/uplink-relay/config" + "apollosolutions/uplink-relay/filesystem_cache" "apollosolutions/uplink-relay/graph" "apollosolutions/uplink-relay/logger" persistedqueries "apollosolutions/uplink-relay/persisted_queries" @@ -23,6 +24,7 @@ import ( "apollosolutions/uplink-relay/polling" "apollosolutions/uplink-relay/proxy" apolloredis "apollosolutions/uplink-relay/redis" + "apollosolutions/uplink-relay/tiered_cache" "apollosolutions/uplink-relay/uplink" "apollosolutions/uplink-relay/webhooks" @@ -65,7 +67,23 @@ func main() { } // Initialize caching based on the configuration. + var uplinkCaches = make([]cache.Cache, 0) + var uplinkCache cache.Cache + // Initialize the cache based on the configuration. + // We want to use the first cache that is enabled, which should be the in-memory cache + if mergedConfig.Cache.Enabled { + uplinkCaches = append(uplinkCaches, cache.NewMemoryCache(mergedConfig.Cache.MaxSize)) + } + if mergedConfig.FilesystemCache.Enabled { + logger.Info("Using filesystem cache", "directory", mergedConfig.FilesystemCache.Directory) + filesystemCache, err := filesystem_cache.NewFilesystemCache(mergedConfig.FilesystemCache.Directory) + if err != nil { + logger.Error("Failed to create filesystem cache", "err", err) + os.Exit(1) + } + uplinkCaches = append(uplinkCaches, filesystemCache) + } if mergedConfig.Redis.Enabled { logger.Info("Using Redis cache", "address", mergedConfig.Redis.Address) redisClient := redis.NewClient(&redis.Options{ @@ -74,11 +92,23 @@ func main() { DB: mergedConfig.Redis.Database, }) redisClient.Ping() - uplinkCache = apolloredis.NewRedisCache(redisClient) - } else { - uplinkCache = cache.NewMemoryCache(mergedConfig.Cache.MaxSize) + uplinkCaches = append(uplinkCaches, apolloredis.NewRedisCache(redisClient)) } + if len(uplinkCaches) == 0 { + logger.Error("No cache configured") + os.Exit(1) + } else if len(uplinkCaches) == 1 { + logger.Debug("Using single cache") + uplinkCache = uplinkCaches[0] + } else { + logger.Debug("Using tiered cache") + uplinkCache, err = tiered_cache.NewTieredCache(uplinkCaches, logger, mergedConfig.Cache.Duration) + if err != nil { + logger.Error("Failed to create tiered cache", "err", err) + os.Exit(1) + } + } // Create a channel to stop polling on SIGHUP to avoid duplicate polling. stopPolling := make(chan bool, 1) diff --git a/redis/redis.go b/redis/redis.go index 34c02ae..9c8a0e3 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -56,3 +56,7 @@ func (c *RedisCache) DeleteWithPrefix(prefix string) error { } return nil } + +func (c *RedisCache) Name() string { + return "Redis" +} diff --git a/tiered_cache/tiered.go b/tiered_cache/tiered.go new file mode 100644 index 0000000..260ce78 --- /dev/null +++ b/tiered_cache/tiered.go @@ -0,0 +1,77 @@ +package tiered_cache + +import ( + "apollosolutions/uplink-relay/cache" + "log/slog" +) + +const PERMISSIONS = 0644 + +type TieredCache struct { + caches []cache.Cache + logger *slog.Logger + duration int +} + +func NewTieredCache(caches []cache.Cache, logger *slog.Logger, duration int) (*TieredCache, error) { + return &TieredCache{caches, logger, duration}, nil +} + +func (c *TieredCache) Get(key string) ([]byte, bool) { + /// Attempt to get the content from each cache in the order they were provided + /// If the content is found in any cache, return it + /// If the content is not found in any cache, return false + missedCaches := []cache.Cache{} + var updateContent []byte + for index, cache := range c.caches { + content, ok := cache.Get(key) + c.logger.Debug("Got content from cache", "content", content, "ok", ok, "cache", cache.Name()) + if ok { + if index > 0 { + updateContent = content + } + return content, true + } else { + missedCaches = append(missedCaches, cache) + } + } + if len(missedCaches) > 0 && len(updateContent) > 0 { + go func() { + for _, cache := range missedCaches { + c.logger.Debug("Setting content into missed cache", "cache", cache, "cache", cache.Name()) + err := cache.Set(key, string(updateContent), c.duration) + if err != nil { + c.logger.Error("Failed to set content in cache", "err", err, "cache", cache.Name()) + } + } + }() + } + return nil, false +} + +func (c *TieredCache) Set(key string, content string, duration int) error { + /// Set the content in each cache in the order they were provided + /// If an error occurs while setting the content in any cache, return the error after trying each cache + /// This ensures that the content is set in all caches if possible instead of stopping at the first error + var err error + for _, cache := range c.caches { + err = cache.Set(key, content, duration) + if err != nil { + c.logger.Error("Failed to set content in cache", "err", err, "cache", cache.Name()) + } + } + return err +} + +func (c *TieredCache) DeleteWithPrefix(prefix string) error { + var err error + for _, cache := range c.caches { + err = cache.DeleteWithPrefix(prefix) + c.logger.Error("Failed to delete content from cache", "err", err, "cache", cache.Name()) + } + return err +} + +func (c *TieredCache) Name() string { + return "Tiered" +} diff --git a/tiered_cache/tiered_test.go b/tiered_cache/tiered_test.go new file mode 100644 index 0000000..96cd7a8 --- /dev/null +++ b/tiered_cache/tiered_test.go @@ -0,0 +1,146 @@ +package tiered_cache + +import ( + "apollosolutions/uplink-relay/cache" + "apollosolutions/uplink-relay/logger" + apolloredis "apollosolutions/uplink-relay/redis" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redis" +) + +func TestNewTieredCache(t *testing.T) { + // Create a mock logger + logger := logger.MakeLogger(nil) + + // Create a test Redis server + server := miniredis.RunT(t) + // Create a Redis client for testing + client := redis.NewClient(&redis.Options{ + Addr: server.Addr(), + }) + + // Create mock caches + cache1 := cache.NewMemoryCache(100) + cache2 := apolloredis.NewRedisCache(client) + + // Create a new TieredCache + tc, err := NewTieredCache([]cache.Cache{cache1, cache2}, logger, 60) + if err != nil { + t.Errorf("Failed to create TieredCache: %v", err) + } + + // Verify that the caches are set correctly + if len(tc.caches) != 2 { + t.Errorf("Expected 2 caches, got %d", len(tc.caches)) + } + if tc.caches[0] != cache1 { + t.Errorf("Expected cache1, got %v", tc.caches[0]) + } + if tc.caches[1] != cache2 { + t.Errorf("Expected cache2, got %v", tc.caches[1]) + } + + // Verify that the logger is set correctly + if tc.logger != logger { + t.Errorf("Expected logger, got %v", tc.logger) + } + + // Verify that the duration is set correctly + if tc.duration != 60 { + t.Errorf("Expected duration 60, got %d", tc.duration) + } +} + +func TestTieredCache_Get(t *testing.T) { + // Create a mock logger + logger := logger.MakeLogger(nil) + + // Create a mock cache + cache1 := cache.NewMemoryCache(100) + + // Create a new TieredCache + tc, _ := NewTieredCache([]cache.Cache{cache1}, logger, 60) + + // Set a value in the cache + cache1.Set("key", "value", 60) + + // Retrieve the value from the TieredCache + content, found := tc.Get("key") + + // Verify that the value is retrieved correctly + if !found { + t.Errorf("Expected value to be found") + } + if string(content) != "value" { + t.Errorf("Expected value 'value', got '%s'", string(content)) + } +} + +func TestTieredCache_Set(t *testing.T) { + // Create a mock logger + logger := logger.MakeLogger(nil) + + // Create a mock cache + cache1 := cache.NewMemoryCache(100) + + // Create a new TieredCache + tc, _ := NewTieredCache([]cache.Cache{cache1}, logger, 60) + + // Set a value in the TieredCache + err := tc.Set("key", "value", 60) + + // Verify that the value is set correctly + if err != nil { + t.Errorf("Failed to set value: %v", err) + } +} + +func TestTieredCache_DeleteWithPrefix(t *testing.T) { + // Create a mock logger + logger := logger.MakeLogger(nil) + + // Create a mock cache + cache1 := cache.NewMemoryCache(100) + + // Create a new TieredCache + tc, _ := NewTieredCache([]cache.Cache{cache1}, logger, 60) + + // Set values in the cache + cache1.Set("key1", "value1", 60) + cache1.Set("key2", "value2", 60) + cache1.Set("prefix1_key", "value3", 60) + cache1.Set("prefix2_key", "value4", 60) + + // Delete values with prefix "prefix1_" + err := tc.DeleteWithPrefix("prefix1_") + + // Verify that the values are deleted correctly + if err != nil { + t.Errorf("Failed to delete values: %v", err) + } + if _, found := cache1.Get("prefix1_key"); found { + t.Errorf("Expected 'prefix1_key' to be deleted") + } + if _, found := cache1.Get("prefix2_key"); !found { + t.Errorf("Expected 'prefix2_key' to be present") + } +} + +func TestTieredCache_Name(t *testing.T) { + // Create a mock logger + logger := logger.MakeLogger(nil) + + // Create a mock cache + cache1 := cache.NewMemoryCache(100) + + // Create a new TieredCache + tc, _ := NewTieredCache([]cache.Cache{cache1}, logger, 60) + + // Verify the name of the TieredCache + name := tc.Name() + if name != "Tiered" { + t.Errorf("Expected name 'TieredCache', got '%s'", name) + } +}