Skip to content

Commit

Permalink
initial fs cache test with tests (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
lleadbet authored Sep 8, 2024
1 parent c831034 commit 46421cd
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ config.yaml
.idea

dist/
tempcache
1 change: 1 addition & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Cache interface {
Get(key string) ([]byte, bool) // Get retrieves an item from the cache if it exists and hasn't expired.
Set(key string, content string, duration int) error // Set adds an item to the cache with a specified duration until expiration.
DeleteWithPrefix(prefix string) error
Name() string
}

type keyType string
Expand Down
6 changes: 6 additions & 0 deletions cache/in_memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"fmt"
"sync"
"time"
)
Expand All @@ -25,6 +26,7 @@ func (c *MemoryCache) Get(key string) ([]byte, bool) {

item, found := c.items[key]

fmt.Printf("item: %v\n", item)
// If the item is not found or has expired, return a cache miss.
// The special case of time.Unix(1<<63-1, 0) is used to indicate that an item never expires- and
// time.Before will always return true for this case.
Expand Down Expand Up @@ -81,3 +83,7 @@ func (c *MemoryCache) DeleteWithPrefix(prefix string) error {

return nil
}

func (c *MemoryCache) Name() string {
return "Memory"
}
3 changes: 3 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ cache:
# address: "localhost:6379"
# password: admin
# database: 0
filesystem:
enabled: true
directory: ./tempcache/
supergraphs:
- graphRef: "${APOLLO_GRAPH_REF}"
apolloKey: "${APOLLO_KEY}"
Expand Down
23 changes: 15 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
// Config represents the application's configuration structure,
// housing Relay, Uplink, and Cache configurations.
type Config struct {
Relay RelayConfig `yaml:"relay"` // RelayConfig for incoming connections.
Uplink UplinkConfig `yaml:"uplink"` // UplinkConfig for managing uplink configuration.
Cache CacheConfig `yaml:"cache"` // CacheConfig for cache settings.
Redis RedisConfig `yaml:"redis"` // RedisConfig for using redis as cache.
Supergraphs []SupergraphConfig `yaml:"supergraphs"` // SupergraphConfig for supergraph settings.
Webhook WebhookConfig `yaml:"webhook"` // WebhookConfig for webhook handling.
Polling PollingConfig `yaml:"polling"` // PollingConfig for polling settings.
ManagementAPI ManagementAPIConfig `yaml:"managementAPI"` // ManagementAPIConfig for management API settings.
Relay RelayConfig `yaml:"relay"` // RelayConfig for incoming connections.
Uplink UplinkConfig `yaml:"uplink"` // UplinkConfig for managing uplink configuration.
Cache CacheConfig `yaml:"cache"` // CacheConfig for cache settings.
Redis RedisConfig `yaml:"redis"` // RedisConfig for using redis as cache.
FilesystemCache FilesystemCacheConfig `yaml:"filesystem"` // FilesystemCacheConfig for using filesystem as cache.
Supergraphs []SupergraphConfig `yaml:"supergraphs"` // SupergraphConfig for supergraph settings.
Webhook WebhookConfig `yaml:"webhook"` // WebhookConfig for webhook handling.
Polling PollingConfig `yaml:"polling"` // PollingConfig for polling settings.
ManagementAPI ManagementAPIConfig `yaml:"managementAPI"` // ManagementAPIConfig for management API settings.
}

// RelayConfig defines the address the proxy server listens on.
Expand Down Expand Up @@ -61,6 +62,12 @@ type RedisConfig struct {
Database int `yaml:"database"` // Database to use in the Redis server.
}

// FilesystemCacheConfig defines the configuration for connecting to a Redis cache.
type FilesystemCacheConfig struct {
Enabled bool `yaml:"enabled"` // Whether Redis caching is enabled.
Directory string `yaml:"directory"` // Path to the filesystem cache.
}

// WebhookConfig defines the configuration for webhook handling.
type WebhookConfig struct {
Enabled bool `yaml:"enabled"` // Whether webhook handling is enabled.
Expand Down
82 changes: 82 additions & 0 deletions filesystem_cache/filesystem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package filesystem_cache

import (
"fmt"
"os"
"path"
)

const PERMISSIONS = 0644

type FilesystemCache struct {
path string
}

func NewFilesystemCache(path string) (*FilesystemCache, error) {
f, err := os.Stat(path)
if os.IsNotExist(err) {
// if the path does not exist, we can create it
err := os.MkdirAll(path, 0755)
if err != nil {
return nil, fmt.Errorf("failed to create directory %s: %v", path, err)
}
} else if !f.Mode().IsDir() {
return nil, fmt.Errorf("path %s is not a directory", path)
}
return &FilesystemCache{path}, nil
}

func (c *FilesystemCache) Get(key string) ([]byte, bool) {
// Read the content of the file with the given key
// If the file does not exist, return false
// If the file exists, return the content as a byte slice
content, err := os.ReadFile(fmt.Sprintf("%v/%v", c.path, key))
if err != nil {
return nil, false
}
return content, true
}

func (c *FilesystemCache) Set(key string, content string, duration int) error {
// Write the content to a file with the given key
// duration is not used in this implementation as pruning is not implemented
cachePath := fmt.Sprintf("%v/%v", c.path, key)
if _, err := os.Stat(cachePath); os.IsNotExist(err) {
dir := path.Dir(cachePath)
err := os.MkdirAll(dir, 0755)
if err != nil {
return fmt.Errorf("failed to create directory %s: %v", dir, err)
}
}
return os.WriteFile(cachePath, []byte(content), PERMISSIONS)
}

func (c *FilesystemCache) DeleteWithPrefix(prefix string) error {
// Delete all files with the given prefix from the cache.
// We can use the filepath.Glob function to get all files with the given prefix
files, err := os.ReadDir(c.path)
if err != nil {
return fmt.Errorf("failed to read directory %s: %v", c.path, err)
}
for _, file := range files {
if file.IsDir() {
continue
}
if !file.Type().IsRegular() {
continue
}

if file.Name()[:len(prefix)] == prefix {
err := os.Remove(fmt.Sprintf("%v/%v", c.path, file.Name()))
if err != nil {
return fmt.Errorf("failed to delete file %s: %v", file.Name(), err)
}
}
}

return nil
}

func (c *FilesystemCache) Name() string {
return "Filesystem"
}
156 changes: 156 additions & 0 deletions filesystem_cache/filesystem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package filesystem_cache

import (
"os"
"path/filepath"
"testing"
)

func TestNewFilesystemCache(t *testing.T) {
cachePath, _ := os.MkdirTemp("", "filesystem_cache_test")
defer os.RemoveAll(cachePath)
cache, err := NewFilesystemCache(cachePath)
if err != nil {
t.Errorf("Failed to create filesystem cache: %v", err)
}

// Verify that the cache path is set correctly
if cache.path != cachePath {
t.Errorf("Expected cache path %s, got %s", cachePath, cache.path)
}

// Verify that the cache directory is created
_, err = os.Stat(cachePath)
if os.IsNotExist(err) {
t.Errorf("Cache directory %s does not exist", cachePath)
}
}

func TestFilesystemCache_Get(t *testing.T) {
cachePath, _ := os.MkdirTemp("", "filesystem_cache_test")
defer os.RemoveAll(cachePath)
cache, _ := NewFilesystemCache(cachePath)

// Create a test file
testKey := "test_key"
testContent := "test_content"
testFilePath := filepath.Join(cachePath, testKey)
err := os.WriteFile(testFilePath, []byte(testContent), PERMISSIONS)
if err != nil {
t.Errorf("Failed to create test file: %v", err)
}

// Retrieve the content from the cache
content, ok := cache.Get(testKey)
if !ok {
t.Errorf("Expected cache hit for key %s, got cache miss", testKey)
}

// Verify that the retrieved content matches the test content
if string(content) != testContent {
t.Errorf("Expected content %s, got %s", testContent, string(content))
}

// Clean up the test file
err = os.Remove(testFilePath)
if err != nil {
t.Errorf("Failed to remove test file: %v", err)
}

// Test that it'll create subdirectories if needed
nestedDir := filepath.Join(cachePath, "nested")
_, err = NewFilesystemCache(nestedDir)
if err != nil {
t.Errorf("Failed to create nested cache: %v", err)
}
if f, err := os.Stat(nestedDir); os.IsNotExist(err) || !f.IsDir() {
t.Errorf("Expected nested cache directory to be created")
}
}

func TestFilesystemCache_Set(t *testing.T) {
cachePath, _ := os.MkdirTemp("", "filesystem_cache_test")
defer os.RemoveAll(cachePath)
cache, _ := NewFilesystemCache(cachePath)

// Set a test key-value pair in the cache
testKey := "test_key"
testContent := "test_content"
err := cache.Set(testKey, testContent, 0)
if err != nil {
t.Errorf("Failed to set key-value pair in cache: %v", err)
}

// Verify that the test file is created in the cache directory
testFilePath := filepath.Join(cachePath, testKey)
_, err = os.Stat(testFilePath)
if os.IsNotExist(err) {
t.Errorf("Test file %s does not exist in cache directory", testFilePath)
}

// Clean up the test file
err = os.Remove(testFilePath)
if err != nil {
t.Errorf("Failed to remove test file: %v", err)
}
}

func TestFilesystemCache_DeleteWithPrefix(t *testing.T) {
cachePath, _ := os.MkdirTemp("", "filesystem_cache_test")
defer os.RemoveAll(cachePath)
cache, _ := NewFilesystemCache(cachePath)

// Create test files with different prefixes
testPrefix1 := "prefix1"
testPrefix2 := "prefix2"
testKey1 := testPrefix1 + "_key"
testKey2 := testPrefix2 + "_key"
testContent := "test_content"
testFilePath1 := filepath.Join(cachePath, testKey1)
testFilePath2 := filepath.Join(cachePath, testKey2)
err := os.WriteFile(testFilePath1, []byte(testContent), PERMISSIONS)
if err != nil {
t.Errorf("Failed to create test file: %v", err)
}
err = os.WriteFile(testFilePath2, []byte(testContent), PERMISSIONS)
if err != nil {
t.Errorf("Failed to create test file: %v", err)
}

// Delete files with prefix "prefix1"
err = cache.DeleteWithPrefix(testPrefix1)
if err != nil {
t.Errorf("Failed to delete files with prefix %s: %v", testPrefix1, err)
}

// Verify that the file with prefix "prefix1" is deleted
_, err = os.Stat(testFilePath1)
if !os.IsNotExist(err) {
t.Errorf("Expected file %s to be deleted, but it still exists", testFilePath1)
}

// Verify that the file with prefix "prefix2" still exists
_, err = os.Stat(testFilePath2)
if os.IsNotExist(err) {
t.Errorf("Expected file %s to exist, but it does not", testFilePath2)
}

// Clean up the remaining test file
err = os.Remove(testFilePath2)
if err != nil {
t.Errorf("Failed to remove test file: %v", err)
}
}

func TestFilesystemCache_Name(t *testing.T) {
cachePath, _ := os.MkdirTemp("", "filesystem_cache_test")
defer os.RemoveAll(cachePath)
cache, _ := NewFilesystemCache(cachePath)

// Verify that the cache name is returned correctly
expectedName := "Filesystem"
name := cache.Name()
if name != expectedName {
t.Errorf("Expected cache name %s, got %s", expectedName, name)
}
}
36 changes: 33 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import (

"apollosolutions/uplink-relay/cache"
"apollosolutions/uplink-relay/config"
"apollosolutions/uplink-relay/filesystem_cache"
"apollosolutions/uplink-relay/graph"
"apollosolutions/uplink-relay/logger"
persistedqueries "apollosolutions/uplink-relay/persisted_queries"
"apollosolutions/uplink-relay/pinning"
"apollosolutions/uplink-relay/polling"
"apollosolutions/uplink-relay/proxy"
apolloredis "apollosolutions/uplink-relay/redis"
"apollosolutions/uplink-relay/tiered_cache"
"apollosolutions/uplink-relay/uplink"
"apollosolutions/uplink-relay/webhooks"

Expand Down Expand Up @@ -65,7 +67,23 @@ func main() {
}

// Initialize caching based on the configuration.
var uplinkCaches = make([]cache.Cache, 0)

var uplinkCache cache.Cache
// Initialize the cache based on the configuration.
// We want to use the first cache that is enabled, which should be the in-memory cache
if mergedConfig.Cache.Enabled {
uplinkCaches = append(uplinkCaches, cache.NewMemoryCache(mergedConfig.Cache.MaxSize))
}
if mergedConfig.FilesystemCache.Enabled {
logger.Info("Using filesystem cache", "directory", mergedConfig.FilesystemCache.Directory)
filesystemCache, err := filesystem_cache.NewFilesystemCache(mergedConfig.FilesystemCache.Directory)
if err != nil {
logger.Error("Failed to create filesystem cache", "err", err)
os.Exit(1)
}
uplinkCaches = append(uplinkCaches, filesystemCache)
}
if mergedConfig.Redis.Enabled {
logger.Info("Using Redis cache", "address", mergedConfig.Redis.Address)
redisClient := redis.NewClient(&redis.Options{
Expand All @@ -74,11 +92,23 @@ func main() {
DB: mergedConfig.Redis.Database,
})
redisClient.Ping()
uplinkCache = apolloredis.NewRedisCache(redisClient)
} else {
uplinkCache = cache.NewMemoryCache(mergedConfig.Cache.MaxSize)
uplinkCaches = append(uplinkCaches, apolloredis.NewRedisCache(redisClient))
}

if len(uplinkCaches) == 0 {
logger.Error("No cache configured")
os.Exit(1)
} else if len(uplinkCaches) == 1 {
logger.Debug("Using single cache")
uplinkCache = uplinkCaches[0]
} else {
logger.Debug("Using tiered cache")
uplinkCache, err = tiered_cache.NewTieredCache(uplinkCaches, logger, mergedConfig.Cache.Duration)
if err != nil {
logger.Error("Failed to create tiered cache", "err", err)
os.Exit(1)
}
}
// Create a channel to stop polling on SIGHUP to avoid duplicate polling.
stopPolling := make(chan bool, 1)

Expand Down
4 changes: 4 additions & 0 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (c *RedisCache) DeleteWithPrefix(prefix string) error {
}
return nil
}

func (c *RedisCache) Name() string {
return "Redis"
}
Loading

0 comments on commit 46421cd

Please sign in to comment.