diff --git a/cache/disk/disk.go b/cache/disk/disk.go
index c955a07f1..7049a5dc2 100644
--- a/cache/disk/disk.go
+++ b/cache/disk/disk.go
@@ -77,8 +77,9 @@ type diskCache struct {
     accessLogger  *log.Logger
     containsQueue chan proxyCheck
 
-    // Limit the number of simultaneous file removals.
-    fileRemovalSem *semaphore.Weighted
+    // Limit the number of simultaneous file removals and filesystem write
+    // operations (apart from atime updates, which we hope are fast).
+    diskWaitSem *semaphore.Weighted
 
     mu  sync.Mutex
     lru SizedLRU
@@ -103,6 +104,11 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
     }
 }
 
+var ErrOverloaded = &cache.Error{
+    Code: http.StatusInsufficientStorage,
+    Text: "Out of disk space, due to too large or too many concurrent cache requests. Please try again later.",
+}
+
 // Non-test users must call this to expose metrics.
 func (c *diskCache) RegisterMetrics() {
     c.lru.RegisterMetrics()
@@ -114,6 +120,21 @@
     // but since the updater func must lock the cache mu, it was deemed
     // necessary to have greater control of when to get the cache age
     go c.pollCacheAge()
+
+    go c.shiftMetricPeriodContinuously()
+}
+
+// Shift to a new metric period every 30 seconds. A period of
+// 30 seconds should give enough margin to catch all peaks (with, for example,
+// a 10 second scrape interval) even in cases of delayed or missed
+// scrapes from Prometheus.
+func (c *diskCache) shiftMetricPeriodContinuously() {
+    ticker := time.NewTicker(30 * time.Second)
+    for ; true; <-ticker.C {
+        c.mu.Lock()
+        c.lru.shiftToNextMetricPeriod()
+        c.mu.Unlock()
+    }
 }
 
 // Update metric every minute with the idle time of the least recently used item in the cache
@@ -167,12 +188,6 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string {
 }
 
 func (c *diskCache) removeFile(f string) {
-    if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
-        log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f)
-        return
-    }
-    defer c.fileRemovalSem.Release(1)
-
     err := os.Remove(f)
     if err != nil {
         log.Printf("ERROR: failed to remove evicted cache file: %s", f)
@@ -240,6 +255,17 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
         return nil
     }
 
+    // Put requests are processed using blocking file syscalls, which
+    // consume one operating system thread per put request. Throttle the
+    // Put requests with a semaphore to avoid requiring too many
+    // operating system threads. Get requests do not seem to consume any
+    // significant number of OS threads and are therefore not throttled.
+    if err := c.diskWaitSem.Acquire(context.Background(), 1); err != nil {
+        log.Printf("ERROR: failed to acquire semaphore: %v", err)
+        return internalErr(err)
+    }
+    defer c.diskWaitSem.Release(1)
+
     key := cache.LookupKey(kind, hash)
 
     var tf *os.File // Tempfile.
@@ -282,11 +308,7 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
     }
     if !ok {
         c.mu.Unlock()
-        return &cache.Error{
-            Code: http.StatusInsufficientStorage,
-            Text: fmt.Sprintf("The item (%d) + reserved space is larger than the cache's maximum size (%d).",
-                size, c.lru.MaxSize()),
-        }
+        return ErrOverloaded
     }
     c.mu.Unlock()
     unreserve = true
diff --git a/cache/disk/load.go b/cache/disk/load.go
index 0ffb4652a..676abd2ef 100644
--- a/cache/disk/load.go
+++ b/cache/disk/load.go
@@ -43,18 +43,14 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
     }
 
     // Go defaults to a limit of 10,000 operating system threads.
-    // We probably don't need half of those for file removals at
-    // any given point in time, unless the disk/fs can't keep up.
-    // I suppose it's better to slow down processing than to crash
-    // when hitting the 10k limit or to run out of disk space.
+    // Violating that limit would result in a crash, and therefore we use
+    // a semaphore to throttle the number of concurrently running blocking
+    // file syscalls. A semaphore weight of 5,000 should give plenty of
+    // margin. The weight should not be set too low, because the
+    // average latency could increase if a few slow clients were able to
+    // block all other clients.
     semaphoreWeight := int64(5000)
-
-    if strings.HasPrefix(runtime.GOOS, "darwin") {
-        // Mac seems to fail to create os threads when removing
-        // lots of files, so allow fewer than linux.
-        semaphoreWeight = 3000
-    }
-    log.Printf("Limiting concurrent file removals to %d\n", semaphoreWeight)
+    log.Printf("Limiting concurrent disk waiting requests to %d\n", semaphoreWeight)
 
     zi, err := zstdimpl.Get("go")
     if err != nil {
@@ -70,7 +66,11 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
         maxBlobSize:      math.MaxInt64,
         maxProxyBlobSize: math.MaxInt64,
 
-        fileRemovalSem: semaphore.NewWeighted(semaphoreWeight),
+        // Acquire 1 of these before starting filesystem writes/deletes, or
+        // reject filesystem writes upon failure (since this will create a
+        // new OS thread and we don't want to hit Go's default 10,000 OS
+        // thread limit).
+        diskWaitSem: semaphore.NewWeighted(semaphoreWeight),
 
         gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{
             Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds",
@@ -112,7 +112,7 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
     if err != nil {
         return nil, fmt.Errorf("Attempting to migrate the old directory structure failed: %w", err)
     }
-    err = c.loadExistingFiles(maxSizeBytes)
+    err = c.loadExistingFiles(maxSizeBytes, cc)
     if err != nil {
         return nil, fmt.Errorf("Loading of existing cache entries failed due to error: %w", err)
     }
@@ -537,7 +537,7 @@ func (c *diskCache) scanDir() (scanResult, error) {
 // loadExistingFiles lists all files in the cache directory, and adds them to the
 // LRU index so that they can be served. Files are sorted by access time first,
 // so that the eviction behavior is preserved across server restarts.
-func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
+func (c *diskCache) loadExistingFiles(maxSizeBytes int64, cc CacheConfig) error {
     log.Printf("Loading existing files in %s.\n", c.dir)
 
     result, err := c.scanDir()
@@ -550,18 +550,40 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
     sort.Sort(result)
 
     // The eviction callback deletes the file from disk.
-    // This function is only called while the lock is held
+    // This function is only called while the lock is not held
     // by the current goroutine.
     onEvict := func(key Key, value lruItem) {
         f := c.getElementPath(key, value)
-        // Run in a goroutine so we can release the lock sooner.
-        go c.removeFile(f)
+        c.removeFile(f)
     }
 
     log.Println("Building LRU index.")
     c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))
+    log.Printf("Will evict at max_size: %.2f GB", bytesToGigaBytes(maxSizeBytes))
+
+    if cc.diskSizeLimit > 0 {
+        // Only set and print if the optional limit is enabled.
+        c.lru.diskSizeLimit = cc.diskSizeLimit
+        log.Printf("Will reject at disk_size_limit: %.2f GB",
+            bytesToGigaBytes(c.lru.diskSizeLimit))
+    }
+
+    // Start a single goroutine running in the background, continuously
+    // waiting for files to be removed and removing them. Benchmarks on
+    // Linux with the XFS file system have, surprisingly, shown that removing
+    // files sequentially with a single goroutine is much faster than starting
+    // a separate goroutine for each file and removing them in parallel,
+    // despite SSDs with high IOPS performance. Benchmarks have also shown
+    // that the single background goroutine is still slightly faster even
+    // if the parallel goroutines would be serialized with a semaphore.
+    // Sequentially evicting all files helps ensure that Go's default
+    // limit of 10,000 operating system threads is not violated. Otherwise,
+    // the number of concurrent removals could explode when a large new
+    // file suddenly evicts thousands of old small files.
+    go c.lru.performQueuedEvictionsContinuously()
+
     for i := 0; i < len(result.item); i++ {
         ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
         if !ok {
@@ -572,7 +594,19 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
         }
     }
 
+    // Printing progress gives awareness of slow operations, and waiting
+    // for evictions to complete before accepting client connections
+    // reduces the risk of confusing overload errors at runtime.
+    log.Println("Waiting for evictions...")
+    for c.lru.queuedEvictionsSize.Load() > 0 {
+        time.Sleep(200 * time.Millisecond)
+    }
+
     log.Println("Finished loading disk cache files.")
 
     return nil
 }
+
+func bytesToGigaBytes(bytes int64) float64 {
+    return float64(bytes) / (1024.0 * 1024.0 * 1024.0)
+}
diff --git a/cache/disk/lru.go b/cache/disk/lru.go
index faeb60be6..48672e881 100644
--- a/cache/disk/lru.go
+++ b/cache/disk/lru.go
@@ -4,6 +4,7 @@ import (
     "container/list"
     "errors"
     "fmt"
+    "sync/atomic"
 
     "github.com/prometheus/client_golang/prometheus"
 )
@@ -42,12 +43,49 @@ type SizedLRU struct {
     // cache below maxSize.
     maxSize int64
 
+    // Channel containing evicted entries removed from ll, but not yet
+    // removed from the file system.
+    //
+    // The entries are wrapped in a slice to allow the queue to grow
+    // dynamically and not be limited by the channel's max size. Note
+    // that one single new large file can result in evicting thousands of
+    // small old files. And under high load, with many new files, the
+    // queue of evicted entries aggregates and can grow quickly.
+    //
+    // The consumer of the channel does not need to care about the
+    // diskCache.mu mutex.
+    //
+    // Removing evicted files asynchronously improves the latency of
+    // Put requests, which can start writing the new file earlier. In
+    // addition, it improves latency for all requests by not having to
+    // hold the diskCache.mu mutex during file system remove syscalls.
+    queuedEvictionsChan chan []*entry
+
     onEvict EvictCallback
 
     gaugeCacheSizeBytes      prometheus.Gauge
+    gaugeCacheSizeBytesLimit *prometheus.GaugeVec
     gaugeCacheLogicalBytes   prometheus.Gauge
     counterEvictedBytes      prometheus.Counter
     counterOverwrittenBytes  prometheus.Counter
+
+    // Peak value of: currentSize + queuedEvictionsSize
+    // The type is uint64 instead of int64 in order to also allow representing
+    // large, rejected reservations that would have resulted in values above
+    // the int64 diskSizeLimit.
+    totalDiskSizePeak uint64
+
+    // Configured maximum allowed bytes on disk for the cache, including files
+    // queued for eviction but not yet removed. A value <= 0 means no
+    // limit. The diskSizeLimit is expected to be configured higher than
+    // maxSize (e.g., 5% higher) to allow the asynchronous removal to catch
+    // up after peaks of file writes.
+    diskSizeLimit int64
+
+    // Number of bytes currently being evicted (removed from the lru but not
+    // yet removed from disk). May be accessed and changed without
+    // holding the diskCache.mu lock.
+    queuedEvictionsSize atomic.Int64
 }
 
 type entry struct {
@@ -69,8 +107,15 @@ func NewSizedLRU(maxSize int64, onEvict EvictCallback, initialCapacity int) Size
 
         gaugeCacheSizeBytes: prometheus.NewGauge(prometheus.GaugeOpts{
             Name: "bazel_remote_disk_cache_size_bytes",
-            Help: "The current number of bytes in the disk backend",
+            Help: "The peak number of bytes in the disk backend for the previous 30 second period.",
         }),
+        gaugeCacheSizeBytesLimit: prometheus.NewGaugeVec(
+            prometheus.GaugeOpts{
+                Name: "bazel_remote_disk_cache_size_bytes_limit",
+                Help: "The currently configured limits of different types, e.g. for when the disk cache evicts data or rejects requests.",
+            },
+            []string{"type"},
+        ),
         gaugeCacheLogicalBytes: prometheus.NewGauge(prometheus.GaugeOpts{
             Name: "bazel_remote_disk_cache_logical_bytes",
             Help: "The current number of bytes in the disk backend if they were uncompressed",
@@ -83,14 +128,26 @@
             Name: "bazel_remote_disk_cache_overwritten_bytes_total",
             Help: "The total number of bytes removed from disk backend, due to put of already existing key",
         }),
+
+        queuedEvictionsChan: make(chan []*entry, 1),
     }
 }
 
 func (c *SizedLRU) RegisterMetrics() {
     prometheus.MustRegister(c.gaugeCacheSizeBytes)
+    prometheus.MustRegister(c.gaugeCacheSizeBytesLimit)
     prometheus.MustRegister(c.gaugeCacheLogicalBytes)
     prometheus.MustRegister(c.counterEvictedBytes)
     prometheus.MustRegister(c.counterOverwrittenBytes)
+
+    // Set gauges to the constant configured values to help visualize the configured
+    // limits, and in particular to help tune the disk_size_limit configuration by
+    // comparing it against peak values of the bazel_remote_disk_cache_size_bytes
+    // gauge, giving awareness of how close the cache is to rejecting requests.
+    c.gaugeCacheSizeBytesLimit.WithLabelValues("evict").Set(float64(c.maxSize))
+    if c.diskSizeLimit > 0 {
+        c.gaugeCacheSizeBytesLimit.WithLabelValues("reject").Set(float64(c.diskSizeLimit))
+    }
 }
 
 // Add adds a (key, value) to the cache, evicting items as necessary.
@@ -109,6 +166,16 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
         return false
     }
 
+    // The files are already stored on disk when Add is invoked, therefore
+    // there is no reason to reject based on diskSizeLimit. The check
+    // against maxSize is considered sufficient. However, invoke
+    // calcTotalDiskSizeAndUpdatePeak to update the peak value. The peak
+    // value is updated BEFORE triggering new evictions, to make the
+    // metrics reflect that both the new file and the files it
+    // evicts/replaces exist on disk at the same time for a short period
+    // (unless the Reserve method was used and already evicted them).
+    c.calcTotalDiskSizeAndUpdatePeak(roundedUpSizeOnDisk)
+
     var sizeDelta, uncompressedSizeDelta int64
     if ee, ok := c.cache[key]; ok {
         sizeDelta = roundedUpSizeOnDisk - roundUp4k(ee.Value.(*entry).value.sizeOnDisk)
@@ -119,10 +186,9 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
         c.ll.MoveToFront(ee)
         c.counterOverwrittenBytes.Add(float64(ee.Value.(*entry).value.sizeOnDisk))
 
-        prevValue := ee.Value.(*entry).value
-        if c.onEvict != nil {
-            c.onEvict(key, prevValue)
-        }
+        kv := ee.Value.(*entry)
+        kvCopy := &entry{kv.key, kv.value}
+        c.appendEvictionToQueue(kvCopy)
 
         ee.Value.(*entry).value = value
     } else {
@@ -147,7 +213,6 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
     c.currentSize += sizeDelta
     c.uncompressedSize += uncompressedSizeDelta
 
-    c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
     c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))
 
     return true
@@ -167,7 +232,6 @@ func (c *SizedLRU) Get(key Key) (value lruItem, ok bool) {
 func (c *SizedLRU) Remove(key Key) {
     if ele, hit := c.cache[key]; hit {
         c.removeElement(ele)
-        c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
         c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))
     }
 }
@@ -226,6 +290,35 @@ func (c *SizedLRU) Reserve(size int64) (bool, error) {
         return false, nil
     }
 
+    // Note that the calculated value, and the potentially updated peak
+    // value, include the value being reserved. In other words,
+    // the peak value is updated even if the limit is exceeded and the
+    // reservation rejected. That is on purpose, to allow using the
+    // prometheus gauge of the peak value to understand why reservations
+    // are rejected. That gauge is an aid for tuning the disk size limit,
+    // and it is therefore beneficial that the same calculated
+    // value (returned as totalDiskSizeNow) is used both for the metrics
+    // gauge and for the logic deciding about rejection.
+    totalDiskSizeNow := c.calcTotalDiskSizeAndUpdatePeak(size)
+
+    if c.diskSizeLimit > 0 && totalDiskSizeNow > (uint64(c.diskSizeLimit)) {
+
+        // Reject and let the client decide about retries. E.g., a bazel
+        // client either building locally or with
+        // --remote_local_fallback can choose a minimal number
+        // of retries, since uploading the build result is not
+        // critical. And a client depending on remote execution,
+        // where upload is critical, can choose a large number of
+        // retries. Retrying only critical writes increases the chance
+        // that bazel-remote recovers from the overload more quickly.
+        // Note that bazel-remote can continue serving reads even when
+        // overloaded by writes, e.g., when the SSD's write IOPS capacity
+        // is overloaded but reads can be served from the operating
+        // system's file system cache in RAM.
+
+        return false, nil
+    }
+
     // Evict elements until we are able to reserve enough space.
     for sumLargerThan(size, c.currentSize, c.maxSize) {
         ele := c.ll.Back()
@@ -270,10 +363,7 @@ func (c *SizedLRU) removeElement(e *list.Element) {
     c.currentSize -= roundUp4k(kv.value.sizeOnDisk)
     c.uncompressedSize -= roundUp4k(kv.value.size)
     c.counterEvictedBytes.Add(float64(kv.value.sizeOnDisk))
-
-    if c.onEvict != nil {
-        c.onEvict(kv.key, kv.value)
-    }
+    c.appendEvictionToQueue(kv)
 }
 
 // Round n up to the nearest multiple of BlockSize (4096).
@@ -290,3 +380,54 @@ func (c *SizedLRU) getTailItem() (Key, lruItem) {
     }
     return nil, lruItem{}
 }
+
+// Append an entry to the eviction queue. The entry must have been removed
+// from SizedLRU.ll before being sent to this method.
+// Note that this method is guaranteed to never block on a full channel
+// buffer, as long as it is invoked only while holding the diskCache.mu
+// mutex and no one else tries to send to queuedEvictionsChan
+// concurrently.
+func (c *SizedLRU) appendEvictionToQueue(e *entry) {
+    c.queuedEvictionsSize.Add(e.value.sizeOnDisk)
+    select {
+    case queuedEvictions := <-c.queuedEvictionsChan:
+        c.queuedEvictionsChan <- append(queuedEvictions, e)
+    default:
+        c.queuedEvictionsChan <- []*entry{e}
+    }
+}
+
+// Block waiting for a slice of evicted entries and then remove them from
+// the file system. Note that one single slice could theoretically contain
+// millions of entries in overload situations.
+// Note that this method may be invoked without holding the diskCache.mu mutex.
+func (c *SizedLRU) performQueuedEvictions() {
+    for _, kv := range <-c.queuedEvictionsChan {
+        c.onEvict(kv.key, kv.value)
+        c.queuedEvictionsSize.Add(-kv.value.sizeOnDisk)
+    }
+}
+
+// Note that this method may be invoked without holding the diskCache.mu mutex.
+func (c *SizedLRU) performQueuedEvictionsContinuously() {
+    for {
+        c.performQueuedEvictions()
+    }
+}
+
+// Note that this function only needs to be called when the disk size usage
+// can grow (e.g., from Reserve and Add, but not from Remove).
+// Note that the diskCache.mu mutex must be held when invoking this method.
+func (c *SizedLRU) calcTotalDiskSizeAndUpdatePeak(sizeOfNewFile int64) uint64 {
+    totalDiskSizeNow := uint64(c.currentSize) + uint64(c.queuedEvictionsSize.Load()) + uint64(sizeOfNewFile)
+    if totalDiskSizeNow > c.totalDiskSizePeak {
+        c.totalDiskSizePeak = totalDiskSizeNow
+    }
+    return totalDiskSizeNow
+}
+
+// Note that the diskCache.mu mutex must be held when invoking this method.
+func (c *SizedLRU) shiftToNextMetricPeriod() {
+    c.gaugeCacheSizeBytes.Set(float64(c.totalDiskSizePeak))
+    c.totalDiskSizePeak = uint64(c.currentSize) + uint64(c.queuedEvictionsSize.Load())
+}
diff --git a/cache/disk/lru_test.go b/cache/disk/lru_test.go
index f7ae928f0..1b28e5136 100644
--- a/cache/disk/lru_test.go
+++ b/cache/disk/lru_test.go
@@ -89,7 +89,9 @@ func TestEviction(t *testing.T) {
         if !ok {
             t.Fatalf("Add: failed adding %d", i)
         }
-
+        if len(lru.queuedEvictionsChan) > 0 {
+            lru.performQueuedEvictions()
+        }
         checkSizeAndNumItems(t, lru, thisExpected.expBlocks*BlockSize, thisExpected.expNumItems)
 
         expectedEvictions = append(expectedEvictions, thisExpected.expEvicted...)
diff --git a/cache/disk/options.go b/cache/disk/options.go
index 25339234e..24e2b3e2e 100644
--- a/cache/disk/options.go
+++ b/cache/disk/options.go
@@ -16,6 +16,7 @@ type Option func(*CacheConfig) error
 type CacheConfig struct {
     diskCache *diskCache        // Assumed to be non-nil.
     metrics   *metricsDecorator // May be nil.
+    diskSizeLimit int64
 }
 
 func WithStorageMode(mode string) Option {
@@ -101,3 +102,10 @@ func WithEndpointMetrics() Option {
         return nil
     }
 }
+
+func WithDiskSizeLimit(diskSizeLimit int64) Option {
+    return func(cc *CacheConfig) error {
+        cc.diskSizeLimit = diskSizeLimit
+        return nil
+    }
+}
diff --git a/config/config.go b/config/config.go
index c2509d3bc..1088e5256 100644
--- a/config/config.go
+++ b/config/config.go
@@ -41,6 +41,7 @@ type Config struct {
     ProfileAddress     string `yaml:"profile_address"`
     Dir                string `yaml:"dir"`
     MaxSize            int    `yaml:"max_size"`
+    DiskSizeLimit      int    `yaml:"disk_size_limit"`
     StorageMode        string `yaml:"storage_mode"`
     ZstdImplementation string `yaml:"zstd_implementation"`
     HtpasswdFile       string `yaml:"htpasswd_file"`
@@ -119,6 +120,7 @@ func newFromArgs(dir string, maxSize int, storageMode string, zstdImplementation
     httpWriteTimeout time.Duration,
     accessLogLevel string,
     logTimezone string,
+    diskSizeLimit int,
     maxBlobSize int64,
     maxProxyBlobSize int64) (*Config, error) {
 
@@ -128,6 +130,7 @@ func newFromArgs(dir string, maxSize int, storageMode string, zstdImplementation
         ProfileAddress:     profileAddress,
         Dir:                dir,
         MaxSize:            maxSize,
+        DiskSizeLimit:      diskSizeLimit,
         StorageMode:        storageMode,
         ZstdImplementation: zstdImplementation,
         HtpasswdFile:       htpasswdFile,
@@ -521,6 +524,7 @@ func get(ctx *cli.Context) (*Config, error) {
         ctx.Duration("http_write_timeout"),
         ctx.String("access_log_level"),
         ctx.String("log_timezone"),
+        ctx.Int("disk_size_limit"),
         ctx.Int64("max_blob_size"),
         ctx.Int64("max_proxy_blob_size"),
     )
diff --git a/main.go b/main.go
index b20260c49..c54e57d7e 100644
--- a/main.go
+++ b/main.go
@@ -140,6 +140,7 @@ func run(ctx *cli.Context) error {
         disk.WithZstdImplementation(c.ZstdImplementation),
         disk.WithMaxBlobSize(c.MaxBlobSize),
         disk.WithProxyMaxBlobSize(c.MaxProxyBlobSize),
+        disk.WithDiskSizeLimit(int64(c.DiskSizeLimit) * 1024 * 1024 * 1024),
         disk.WithAccessLogger(c.AccessLogger),
     }
     if c.ProxyBackend != nil {
diff --git a/server/grpc.go b/server/grpc.go
index 42eefb262..8c033d904 100644
--- a/server/grpc.go
+++ b/server/grpc.go
@@ -238,6 +238,10 @@ func gRPCErrCode(err error, dflt codes.Code) codes.Code {
         return codes.OK
     }
 
+    if err == disk.ErrOverloaded {
+        return codes.ResourceExhausted
+    }
+
     cerr, ok := err.(*cache.Error)
     if ok && cerr.Code == http.StatusBadRequest {
         return codes.InvalidArgument
@@ -245,3 +249,37 @@ func gRPCErrCode(err error, dflt codes.Code) codes.Code {
 
     return dflt
 }
+
+// Translate error codes received by the server when streaming back to the client
+// into an error code suitable to return as the result of the original server
+// invocation that originated the streaming.
+func translateGRPCErrCodeFromClient(err error) codes.Code {
+
+    resultingCode := status.Code(err)
+
+    // A client rejecting the streaming with
+    // "code = Unavailable desc = transport is closing"
+    // indicates that the client canceled the call and is closing down. The client
+    // being unavailable should not be confused with the server being unavailable,
+    // and is therefore mapped to Canceled instead.
+    if resultingCode == codes.Unavailable {
+        return codes.Canceled
+    }
+
+    // An internal error from the client should not be mapped to an internal error
+    // in the server, and is therefore translated to Unknown.
+    if resultingCode == codes.Internal {
+        return codes.Unknown
+    }
+
+    return resultingCode
+}
+
+func (s *grpcServer) logErrorPrintf(err error, format string, a ...any) {
+    if err == disk.ErrOverloaded {
+        // Use accessLogger to avoid overly verbose logging to errorLogger.
+        s.accessLogger.Printf(format, a...)
+    } else {
+        s.errorLogger.Printf(format, a...)
+    }
+}
diff --git a/server/grpc_ac.go b/server/grpc_ac.go
index a5256522a..b76cc0ceb 100644
--- a/server/grpc_ac.go
+++ b/server/grpc_ac.go
@@ -171,6 +171,12 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte
         return nil // Not inlined, nothing to do.
     }
 
+    if (*digest).SizeBytes <= 0 {
+        // Unexpected corner case?
+        *slice = []byte{}
+        return nil
+    }
+
     if *digest == nil {
         hash := sha256.Sum256(*slice)
         *digest = &pb.Digest{
@@ -183,10 +189,10 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte
     if !found {
         err := s.cache.Put(ctx, cache.CAS, (*digest).Hash, (*digest).SizeBytes,
             bytes.NewReader(*slice))
-        if err != nil && err != io.EOF {
-            return err
+        if err == nil || err == io.EOF {
+            s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
         }
-        s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
+        // De-inline failed (possibly due to "resource overload"), that's OK though.
     }
 
     *slice = []byte{}
@@ -261,7 +267,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
     err = s.cache.Put(ctx, cache.AC, req.ActionDigest.Hash, int64(len(data)),
         bytes.NewReader(data))
     if err != nil && err != io.EOF {
-        s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
+        s.logErrorPrintf(err, "%s %s %s", logPrefix, req.ActionDigest.Hash, err)
         code := gRPCErrCode(err, codes.Internal)
         return nil, status.Error(code, err.Error())
     }
@@ -285,7 +291,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
         err = s.cache.Put(ctx, cache.CAS, f.Digest.Hash, f.Digest.SizeBytes,
             bytes.NewReader(f.Contents))
         if err != nil && err != io.EOF {
-            s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
+            s.logErrorPrintf(err, "%s %s %s", logPrefix, req.ActionDigest.Hash, err)
             code := gRPCErrCode(err, codes.Internal)
             return nil, status.Error(code, err.Error())
         }
@@ -308,7 +314,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
         err = s.cache.Put(ctx, cache.CAS, hash, sizeBytes,
             bytes.NewReader(req.ActionResult.StdoutRaw))
         if err != nil && err != io.EOF {
-            s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
+            s.logErrorPrintf(err, "%s %s %s", logPrefix, req.ActionDigest.Hash, err)
             code := gRPCErrCode(err, codes.Internal)
             return nil, status.Error(code, err.Error())
         }
@@ -330,7 +336,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
         err = s.cache.Put(ctx, cache.CAS, hash, sizeBytes,
             bytes.NewReader(req.ActionResult.StderrRaw))
         if err != nil && err != io.EOF {
-            s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
+            s.logErrorPrintf(err, "%s %s %s", logPrefix, req.ActionDigest.Hash, err)
             code := gRPCErrCode(err, codes.Internal)
             return nil, status.Error(code, err.Error())
         }
diff --git a/server/grpc_asset.go b/server/grpc_asset.go
index 9a51436e6..ec187454c 100644
--- a/server/grpc_asset.go
+++ b/server/grpc_asset.go
@@ -6,6 +6,7 @@ import (
     "crypto/sha256"
     "encoding/base64"
     "encoding/hex"
+    "fmt"
     "io"
     "net/http"
     "net/url"
@@ -19,6 +20,7 @@ import (
     pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2"
 
     "github.com/buchgr/bazel-remote/cache"
+    "github.com/buchgr/bazel-remote/cache/disk"
 )
 
 // FetchServer implementation
@@ -26,6 +28,13 @@ import (
 var errNilFetchBlobRequest = grpc_status.Error(codes.InvalidArgument, "expected a non-nil *FetchBlobRequest")
 
+var resourceExhaustedResponse = asset.FetchBlobResponse{
+    Status: &status.Status{
+        Code:    int32(codes.ResourceExhausted),
+        Message: "Storage appears to be falling behind",
+    },
+}
+
 func (s *grpcServer) FetchBlob(ctx context.Context,
     req *asset.FetchBlobRequest) (*asset.FetchBlobResponse, error) {
 
     var sha256Str string
@@ -114,8 +123,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 
     // See if we can download one of the URIs.
     for _, uri := range req.GetUris() {
-        ok, actualHash, size := s.fetchItem(ctx, uri, sha256Str)
-        if ok {
+        actualHash, size, err := s.fetchItem(ctx, uri, sha256Str)
+        if err == nil {
             return &asset.FetchBlobResponse{
                 Status: &status.Status{Code: int32(codes.OK)},
                 BlobDigest: &pb.Digest{
@@ -126,6 +135,10 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
             }, nil
         }
 
+        if err == disk.ErrOverloaded {
+            return &resourceExhaustedResponse, nil
+        }
+
         // Not a simple file. Not yet handled...
     }
 
@@ -134,29 +147,29 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
     }, nil
 }
 
-func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (bool, string, int64) {
+func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (string, int64, error) {
     u, err := url.Parse(uri)
     if err != nil {
         s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err)
-        return false, "", int64(-1)
+        return "", int64(-1), err
     }
     if u.Scheme != "http" && u.Scheme != "https" {
         s.errorLogger.Printf("unsupported URI: %s", uri)
-        return false, "", int64(-1)
+        return "", int64(-1), fmt.Errorf("Unknown URL scheme: %q", u.Scheme)
     }
 
     resp, err := http.Get(uri)
     if err != nil {
         s.errorLogger.Printf("failed to get URI: %s err: %v", uri, err)
-        return false, "", int64(-1)
+        return "", int64(-1), err
     }
     defer resp.Body.Close()
     rc := resp.Body
 
     s.accessLogger.Printf("GRPC ASSET FETCH %s %s", uri, resp.Status)
 
     if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-        return false, "", int64(-1)
+        return "", int64(-1), fmt.Errorf("Unsuccessful HTTP status code: %d", resp.StatusCode)
     }
 
     expectedSize := resp.ContentLength
@@ -166,7 +179,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
         data, err := io.ReadAll(resp.Body)
         if err != nil {
             s.errorLogger.Printf("failed to read data: %v", uri)
-            return false, "", int64(-1)
+            return "", int64(-1), err
         }
         expectedSize = int64(len(data))
 
@@ -176,7 +189,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
 
         if expectedHash != "" && hashStr != expectedHash {
             s.errorLogger.Printf("URI data has hash %s, expected %s", hashStr, expectedHash)
-            return false, "", int64(-1)
+            return "", int64(-1), fmt.Errorf("URI data has hash %s, expected %s", hashStr, expectedHash)
         }
         expectedHash = hashStr
 
@@ -186,10 +199,10 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
     err = s.cache.Put(ctx, cache.CAS, expectedHash, expectedSize, rc)
     if err != nil && err != io.EOF {
         s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err)
-        return false, "", int64(-1)
+        return "", int64(-1), err
     }
 
-    return true, expectedHash, expectedSize
+    return expectedHash, expectedSize, nil
 }
 
 func (s *grpcServer) FetchDirectory(context.Context, *asset.FetchDirectoryRequest) (*asset.FetchDirectoryResponse, error) {
diff --git a/server/grpc_bytestream.go b/server/grpc_bytestream.go
index d9b3d8e49..78a5c4027 100644
--- a/server/grpc_bytestream.go
+++ b/server/grpc_bytestream.go
@@ -544,6 +544,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
             }
             return nil
         }
+
         if err == nil {
             // Unexpected early return. Should not happen.
             msg := fmt.Sprintf("GRPC BYTESTREAM WRITE INTERNAL ERROR %s", resourceName)
@@ -552,8 +553,8 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
         }
 
         msg := fmt.Sprintf("GRPC BYTESTREAM WRITE CACHE ERROR: %s %v", resourceName, err)
-        s.accessLogger.Printf(msg)
-        return status.Error(codes.Internal, msg)
+        s.logErrorPrintf(err, msg)
+        return status.Error(gRPCErrCode(err, codes.Internal), msg)
     }
 
     select {
diff --git a/server/grpc_cas.go b/server/grpc_cas.go
index 52518a189..0bb09d8a9 100644
--- a/server/grpc_cas.go
+++ b/server/grpc_cas.go
@@ -121,9 +121,10 @@ func (s *grpcServer) BatchUpdateBlobs(ctx context.Context,
         err = s.cache.Put(ctx, cache.CAS, req.Digest.Hash, int64(len(req.Data)),
             bytes.NewReader(req.Data))
         if err != nil && err != io.EOF {
-            s.errorLogger.Printf("%s %s %s", errorPrefix, req.Digest.Hash, err)
+            s.logErrorPrintf(err, "%s %s %s", errorPrefix, req.Digest.Hash, err)
             rr.Status.Code = int32(gRPCErrCode(err, codes.Internal))
             continue
+
         }
 
         s.accessLogger.Printf("GRPC CAS PUT %s OK", req.Digest.Hash)
diff --git a/server/http.go b/server/http.go
index 81db487d4..4fbe4ace9 100644
--- a/server/http.go
+++ b/server/http.go
@@ -416,10 +416,17 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {
         if err != nil {
             if cerr, ok := err.(*cache.Error); ok {
                 http.Error(w, err.Error(), cerr.Code)
+                if err == disk.ErrOverloaded {
+                    // Using accessLogger to avoid overly verbose logging
+                    // to errorLogger.
+                    h.logResponse(cerr.Code, r)
+                } else {
+                    h.errorLogger.Printf("PUT %s: %s", path(kind, hash), err)
+                }
             } else {
                 http.Error(w, err.Error(), http.StatusInternalServerError)
+                h.errorLogger.Printf("PUT %s: %s", path(kind, hash), err)
             }
-            h.errorLogger.Printf("PUT %s: %s", path(kind, hash), err)
         } else {
             h.logResponse(http.StatusOK, r)
         }
diff --git a/utils/flags/flags.go b/utils/flags/flags.go
index 02ba4be6f..08c5b1921 100644
--- a/utils/flags/flags.go
+++ b/utils/flags/flags.go
@@ -41,6 +41,12 @@ func GetCliFlags() []cli.Flag {
             Usage:   "The maximum size of bazel-remote's disk cache in GiB. This flag is required.",
             EnvVars: []string{"BAZEL_REMOTE_MAX_SIZE"},
         },
+        &cli.Int64Flag{
+            Name:    "disk_size_limit",
+            Value:   -1,
+            Usage:   "The maximum size of bazel-remote's disk cache, including files queued for eviction, in GiB. The limit is disabled by default.",
+            EnvVars: []string{"BAZEL_REMOTE_DISK_SIZE_LIMIT"},
+        },
         &cli.StringFlag{
             Name:  "storage_mode",
             Value: "zstd",