From 257124ce41cf7395943555cf5b7757492067f251 Mon Sep 17 00:00:00 2001
From: steve-chavez
Date: Mon, 3 Jun 2024 18:45:59 -0500
Subject: [PATCH] correct exponential backoff on Listener

Clears the limitation mentioned in
https://github.com/PostgREST/postgrest/pull/3536

The Listener no longer uses the
https://hackage.haskell.org/package/retry package and instead uses a
much simpler IORef in AppState for the delays.

Additionally, it no longer uses exception throwing/catching, which is
rather messy and brings some concerns
(https://github.com/PostgREST/postgrest/issues/3569#issuecomment-2146013327).
---
 src/PostgREST/App.hs      |  2 +-
 src/PostgREST/AppState.hs | 27 +++++++++----
 src/PostgREST/Listener.hs | 84 +++++++++++++++------------------------
 3 files changed, 52 insertions(+), 61 deletions(-)

diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs
index d7a3d4eac8..aeb43ffd2b 100644
--- a/src/PostgREST/App.hs
+++ b/src/PostgREST/App.hs
@@ -120,7 +120,7 @@ postgrest logLevel appState connWorker =
       -- the connWorker is done.
       when (isServiceUnavailable response) connWorker
       resp <- do
-        delay <- AppState.getRetryNextIn appState
+        delay <- AppState.getNextDelay appState
         return $ addRetryHint delay response
       respond resp
 
diff --git a/src/PostgREST/AppState.hs b/src/PostgREST/AppState.hs
index 62d63f8e99..391b1f0e5a 100644
--- a/src/PostgREST/AppState.hs
+++ b/src/PostgREST/AppState.hs
@@ -10,7 +10,8 @@ module PostgREST.AppState
   , getSchemaCache
   , getMainThreadId
   , getPgVersion
-  , getRetryNextIn
+  , getNextDelay
+  , getNextListenerDelay
   , getTime
   , getJwtCache
   , getSocketREST
@@ -18,6 +19,7 @@ module PostgREST.AppState
   , init
   , initSockets
   , initWithPool
+  , putNextListenerDelay
   , putSchemaCache
   , putPgVersion
   , putIsListenerOn
@@ -102,8 +104,10 @@ data AppState = AppState
   , stateGetTime :: IO UTCTime
   -- | Used for killing the main thread in case a subthread fails
   , stateMainThreadId :: ThreadId
-  -- | Keeps track of when the next retry for connecting to database is scheduled
-  , stateRetryNextIn :: IORef Int
+  -- | Keeps track of the next delay for db connection retry
+  , stateNextDelay :: IORef Int
+  -- | Keeps track of the next delay for the listener
+  , stateNextListenerDelay :: IORef Int
   -- | JWT Cache
   , jwtCache :: C.Cache ByteString AuthResult
   -- | Network socket for REST API
@@ -156,6 +160,7 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
     <*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
     <*> myThreadId
     <*> newIORef 0
+    <*> newIORef 1
     <*> C.newCache Nothing
     <*> pure sock
     <*> pure adminSock
@@ -300,11 +305,17 @@ putSchemaCache appState = atomicWriteIORef (stateSchemaCache appState)
 connectionWorker :: AppState -> IO ()
 connectionWorker = debouncedConnectionWorker
 
-getRetryNextIn :: AppState -> IO Int
-getRetryNextIn = readIORef . stateRetryNextIn
+getNextDelay :: AppState -> IO Int
+getNextDelay = readIORef . stateNextDelay
 
-putRetryNextIn :: AppState -> Int -> IO ()
-putRetryNextIn = atomicWriteIORef . stateRetryNextIn
+putNextDelay :: AppState -> Int -> IO ()
+putNextDelay = atomicWriteIORef . stateNextDelay
+
+getNextListenerDelay :: AppState -> IO Int
+getNextListenerDelay = readIORef . stateNextListenerDelay
+
+putNextListenerDelay :: AppState -> Int -> IO ()
+putNextListenerDelay = atomicWriteIORef . stateNextListenerDelay
 
 getConfig :: AppState -> IO AppConfig
 getConfig = readIORef . stateConf
@@ -474,7 +485,7 @@ establishConnection appState@AppState{stateObserver=observer} =
         delay = fromMaybe 0 (rsPreviousDelay rs) `div` oneSecondInUs
         itShould = ConnPending == isConnSucc && configDbPoolAutomaticRecovery
       when itShould $ observer $ ConnectionRetryObs delay
-      when itShould $ putRetryNextIn appState delay
+      when itShould $ putNextDelay appState delay
       return itShould
 
     retryPolicy :: RetryPolicy
diff --git a/src/PostgREST/Listener.hs b/src/PostgREST/Listener.hs
index 9a8f68fe0e..7438000e39 100644
--- a/src/PostgREST/Listener.hs
+++ b/src/PostgREST/Listener.hs
@@ -1,14 +1,10 @@
 {-# LANGUAGE MultiWayIf      #-}
-{-# LANGUAGE NamedFieldPuns  #-}
 {-# LANGUAGE RecordWildCards #-}
 module PostgREST.Listener (runListener) where
 
 import qualified Data.ByteString.Char8 as BS
 
-import Control.Exception       (throw)
-import Data.Either.Combinators (whenLeft)
-
 import qualified Hasql.Connection    as SQL
 import qualified Hasql.Notifications as SQL
 
 import PostgREST.AppState (AppState, getConfig)
@@ -16,9 +12,6 @@ import PostgREST.Config (AppConfig (..))
 import PostgREST.Observation (Observation (..))
 import PostgREST.Version (prettyVersion)
 
-import Control.Retry (RetryPolicy, RetryStatus (..),
-                      capDelay, exponentialBackoff,
-                      recoverAll, rsPreviousDelay)
 
 import qualified PostgREST.AppState as AppState
 import qualified PostgREST.Config as Config
@@ -31,55 +24,53 @@ runListener appState = do
   when configDbChannelEnabled $
     void . forkIO $ retryingListen appState
 
--- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff if the LISTEN connection is lost.
--- TODO Once the listen channel is recovered, the retry status is not reset. So if the last backoff was 4 seconds, the next time recovery kicks in the backoff will be 8 seconds.
--- This is because `Hasql.Notifications.waitForNotifications` uses a forever loop that only finishes when it throws an exception.
+-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff, capped at 32 seconds, if the LISTEN connection is lost.
 retryingListen :: AppState -> IO ()
 retryingListen appState = do
   AppConfig{..} <- AppState.getConfig appState
   let
     dbChannel = toS configDbChannel
-    -- Try, catch and rethrow the exception. This is done so we can observe the failure message and let Control.Retry.recoverAll do its work.
-    -- There's a `Control.Retry.recovering` we could use to avoid this rethrowing, but it's more complex to use.
-    -- The root cause of these workarounds is that `Hasql.Notifications.waitForNotifications` uses exceptions.
-    tryRethrow :: IO () -> IO ()
-    tryRethrow action = do
-      act <- try action
-      whenLeft act (\ex -> do
-        AppState.putIsListenerOn appState False
-        observer $ DBListenFail dbChannel (Right $ Left ex)
-        unless configDbPoolAutomaticRecovery $ do
-          killThread mainThreadId
-          throw ex)
-
-  recoverAll retryPolicy (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
-
-    when (rsIterNumber > 0) $
-      let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs in
+    handleFinally err = do
+      AppState.putIsListenerOn appState False
+      observer $ DBListenFail dbChannel (Right err)
+      unless configDbPoolAutomaticRecovery $
+        killThread mainThreadId
+
+      -- retry the listener
+      delay <- AppState.getNextListenerDelay appState
       observer $ DBListenRetry delay
-
-    connection <- SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri)
-    case connection of
-      Right conn -> do
-
-        tryRethrow $ SQL.listen conn $ SQL.toPgIdentifier dbChannel
-
+      threadDelay (delay * oneSecondInMicro)
+      unless (delay == maxDelay) $
+        AppState.putNextListenerDelay appState (delay * 2)
+      retryingListen appState
+
+  -- forkFinally lets us detect when the thread dies
+  void . flip forkFinally handleFinally $ do
+    dbOrError <- SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri)
+    case dbOrError of
+      Right db -> do
+        SQL.listen db $ SQL.toPgIdentifier dbChannel
         AppState.putIsListenerOn appState True
-        observer $ DBListenStart dbChannel
 
-        when (rsIterNumber > 0) $ do -- once we can LISTEN again, we might have lost schema cache notificacions, so reload
+        delay <- AppState.getNextListenerDelay appState
+        when (delay > 1) $ do -- if we did a retry
+          -- assume we lost notifications, call the connection worker which will also reload the schema cache
           AppState.connectionWorker appState
+          -- reset the delay
+          AppState.putNextListenerDelay appState 1
 
-        tryRethrow $ SQL.waitForNotifications handleNotification conn
+        observer $ DBListenStart dbChannel
+        SQL.waitForNotifications handleNotification db
       Left err -> do
        observer $ DBListenFail dbChannel (Left err)
-        -- throw an exception so recoverAll works
        exitFailure
-    )
-
   where
+    observer = AppState.getObserver appState
+    mainThreadId = AppState.getMainThreadId appState
+    oneSecondInMicro = 1000000
+    maxDelay = 32
+
     handleNotification channel msg =
       if | BS.null msg            -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
         | msg == "reload schema" -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
@@ -88,14 +79,3 @@ retryingListen appState = do
 
     cacheReloader =
       AppState.connectionWorker appState
-
-    observer = AppState.getObserver appState
-    mainThreadId = AppState.getMainThreadId appState
-
-    retryPolicy :: RetryPolicy
-    retryPolicy =
-      let
-        delayMicroseconds = 32000000 -- 32 seconds
-      in
-        capDelay delayMicroseconds $ exponentialBackoff oneSecondInUs
-    oneSecondInUs = 1000000 -- | One second in microseconds
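
For reference, below is a minimal standalone sketch of the retry pattern the patch adopts: the current delay lives in an IORef, forkFinally re-runs the listener whenever its thread dies, the delay doubles up to a 32-second cap, and it resets to 1 once a connection succeeds again. acquireConnection, waitForNotifications and the main driver are hypothetical stand-ins, not PostgREST code (the real implementation uses Hasql and AppState as in the diff above); the connection is made to always fail so the backoff is observable.

module Main where

import Control.Concurrent (forkFinally, threadDelay)
import Control.Monad (unless, void, when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-ins for acquiring the LISTEN connection and waiting on
-- notifications; here the connection always fails so the backoff is visible.
acquireConnection :: IO Bool
acquireConnection = pure False

waitForNotifications :: IO ()
waitForNotifications = threadDelay maxBound

oneSecondInMicro, maxDelay :: Int
oneSecondInMicro = 1000000
maxDelay = 32

-- Retry loop in the style of the patch: the finalizer sleeps for the current
-- delay, doubles it up to the cap, and re-forks the listener; a successful
-- reconnection resets the delay to 1.
retryingListen :: IORef Int -> IO ()
retryingListen delayRef = do
  let handleFinally _ = do
        delay <- readIORef delayRef
        putStrLn $ "listener lost, retrying in " <> show delay <> "s"
        threadDelay (delay * oneSecondInMicro)
        unless (delay == maxDelay) $
          writeIORef delayRef (delay * 2)
        retryingListen delayRef

  -- forkFinally runs handleFinally when the forked thread ends, whether it
  -- returned normally or died with an exception.
  void . flip forkFinally handleFinally $ do
    ok <- acquireConnection
    if ok
      then do
        delay <- readIORef delayRef
        when (delay > 1) $
          writeIORef delayRef 1 -- reconnected after retries: reset the backoff
        waitForNotifications
      else ioError (userError "could not connect")

main :: IO ()
main = do
  delayRef <- newIORef 1
  retryingListen delayRef
  threadDelay (10 * oneSecondInMicro) -- observe a few retries, then exit

Running the sketch prints a retry message whose delay doubles on each failed attempt until it reaches the cap, which is the behavior the new handleFinally in Listener.hs implements with AppState.getNextListenerDelay and AppState.putNextListenerDelay.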