Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

correct exponential backoff on Listener #3572

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ postgrest logLevel appState connWorker =
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getRetryNextIn appState
delay <- AppState.getNextDelay appState
return $ addRetryHint delay response
respond resp

Expand Down
27 changes: 19 additions & 8 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
, getSchemaCache
, getMainThreadId
, getPgVersion
, getRetryNextIn
, getNextDelay
, getNextListenerDelay
, getTime
, getJwtCache
, getSocketREST
, getSocketAdmin
, init
, initSockets
, initWithPool
, putNextListenerDelay
, putSchemaCache
, putPgVersion
, putIsListenerOn
Expand Down Expand Up @@ -102,8 +104,10 @@
, stateGetTime :: IO UTCTime
-- | Used for killing the main thread in case a subthread fails
, stateMainThreadId :: ThreadId
-- | Keeps track of when the next retry for connecting to database is scheduled
, stateRetryNextIn :: IORef Int
-- | Keeps track of the next delay for db connection retry
, stateNextDelay :: IORef Int
-- | Keeps track of the next delay for the listener
, stateNextListenerDelay :: IORef Int
-- | JWT Cache
, jwtCache :: C.Cache ByteString AuthResult
-- | Network socket for REST API
Expand Down Expand Up @@ -156,6 +160,7 @@
<*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
<*> myThreadId
<*> newIORef 0
<*> newIORef 1
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock
Expand Down Expand Up @@ -300,11 +305,17 @@
connectionWorker :: AppState -> IO ()
connectionWorker = debouncedConnectionWorker

getRetryNextIn :: AppState -> IO Int
getRetryNextIn = readIORef . stateRetryNextIn
getNextDelay :: AppState -> IO Int
getNextDelay = readIORef . stateNextDelay

putRetryNextIn :: AppState -> Int -> IO ()
putRetryNextIn = atomicWriteIORef . stateRetryNextIn
putNextDelay :: AppState -> Int -> IO ()
putNextDelay = atomicWriteIORef . stateNextDelay

getNextListenerDelay :: AppState -> IO Int
getNextListenerDelay = readIORef . stateNextListenerDelay

putNextListenerDelay :: AppState -> Int -> IO ()
putNextListenerDelay = atomicWriteIORef . stateNextListenerDelay

Check warning on line 318 in src/PostgREST/AppState.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/AppState.hs#L318

Added line #L318 was not covered by tests

getConfig :: AppState -> IO AppConfig
getConfig = readIORef . stateConf
Expand Down Expand Up @@ -474,7 +485,7 @@
delay = fromMaybe 0 (rsPreviousDelay rs) `div` oneSecondInUs
itShould = ConnPending == isConnSucc && configDbPoolAutomaticRecovery
when itShould $ observer $ ConnectionRetryObs delay
when itShould $ putRetryNextIn appState delay
when itShould $ putNextDelay appState delay
return itShould

retryPolicy :: RetryPolicy
Expand Down
84 changes: 32 additions & 52 deletions src/PostgREST/Listener.hs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}

module PostgREST.Listener (runListener) where

import qualified Data.ByteString.Char8 as BS

import Control.Exception (throw)
import Data.Either.Combinators (whenLeft)

import qualified Hasql.Connection as SQL
import qualified Hasql.Notifications as SQL
import PostgREST.AppState (AppState, getConfig)
import PostgREST.Config (AppConfig (..))
import PostgREST.Observation (Observation (..))
import PostgREST.Version (prettyVersion)

import Control.Retry (RetryPolicy, RetryStatus (..),
capDelay, exponentialBackoff,
recoverAll, rsPreviousDelay)
import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config

Expand All @@ -31,55 +24,53 @@
when configDbChannelEnabled $
void . forkIO $ retryingListen appState

-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff if the LISTEN connection is lost.
-- TODO Once the listen channel is recovered, the retry status is not reset. So if the last backoff was 4 seconds, the next time recovery kicks in the backoff will be 8 seconds.
-- This is because `Hasql.Notifications.waitForNotifications` uses a forever loop that only finishes when it throws an exception.
-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff with a cap of 32 seconds, if the LISTEN connection is lost.
retryingListen :: AppState -> IO ()
retryingListen appState = do
AppConfig{..} <- AppState.getConfig appState
let
dbChannel = toS configDbChannel
-- Try, catch and rethrow the exception. This is done so we can observe the failure message and let Control.Retry.recoverAll do its work.
-- There's a `Control.Retry.recovering` we could use to avoid this rethrowing, but it's more complex to use.
-- The root cause of these workarounds is that `Hasql.Notifications.waitForNotifications` uses exceptions.
tryRethrow :: IO () -> IO ()
tryRethrow action = do
act <- try action
whenLeft act (\ex -> do
AppState.putIsListenerOn appState False
observer $ DBListenFail dbChannel (Right $ Left ex)
unless configDbPoolAutomaticRecovery $ do
killThread mainThreadId
throw ex)

recoverAll retryPolicy (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do

when (rsIterNumber > 0) $
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs in
handleFinally err = do
AppState.putIsListenerOn appState False
observer $ DBListenFail dbChannel (Right err)
unless configDbPoolAutomaticRecovery $
killThread mainThreadId

-- retry the listener
delay <- AppState.getNextListenerDelay appState
observer $ DBListenRetry delay

connection <- SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri)
case connection of
Right conn -> do

tryRethrow $ SQL.listen conn $ SQL.toPgIdentifier dbChannel

threadDelay (delay * oneSecondInMicro)
unless (delay == maxDelay) $
AppState.putNextListenerDelay appState (delay * 2)
retryingListen appState

Check warning on line 45 in src/PostgREST/Listener.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/Listener.hs#L43-L45

Added lines #L43 - L45 were not covered by tests

-- forkFinally allows to detect if the thread dies
void . flip forkFinally handleFinally $ do
dbOrError <- SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri)
case dbOrError of
Right db -> do
SQL.listen db $ SQL.toPgIdentifier dbChannel
AppState.putIsListenerOn appState True
observer $ DBListenStart dbChannel

when (rsIterNumber > 0) $ do
-- once we can LISTEN again, we might have lost schema cache notificacions, so reload
delay <- AppState.getNextListenerDelay appState
when (delay > 1) $ do -- if we did a retry
-- assume we lost notifications, call the connection worker which will also reload the schema cache
AppState.connectionWorker appState
-- reset the delay
AppState.putNextListenerDelay appState 1

Check warning on line 60 in src/PostgREST/Listener.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/Listener.hs#L60

Added line #L60 was not covered by tests

tryRethrow $ SQL.waitForNotifications handleNotification conn
observer $ DBListenStart dbChannel
SQL.waitForNotifications handleNotification db

Left err -> do
observer $ DBListenFail dbChannel (Left err)
-- throw an exception so recoverAll works
exitFailure
)

where
observer = AppState.getObserver appState
mainThreadId = AppState.getMainThreadId appState
oneSecondInMicro = 1000000
maxDelay = 32

Check warning on line 72 in src/PostgREST/Listener.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/Listener.hs#L72

Added line #L72 was not covered by tests

handleNotification channel msg =
if | BS.null msg -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
| msg == "reload schema" -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
Expand All @@ -88,14 +79,3 @@

cacheReloader =
AppState.connectionWorker appState

observer = AppState.getObserver appState
mainThreadId = AppState.getMainThreadId appState

retryPolicy :: RetryPolicy
retryPolicy =
let
delayMicroseconds = 32000000 -- 32 seconds
in
capDelay delayMicroseconds $ exponentialBackoff oneSecondInUs
oneSecondInUs = 1000000 -- | One second in microseconds