Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor code interacting with pool for consistency, and misc fixes #2398

Merged
merged 9 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nix/tools/style.nix
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ let

trap "echo postgrest-style-check failed. Run postgrest-style to fix issues automatically." ERR

${git}/bin/git diff-index --exit-code HEAD -- '*.hs' '*.lhs' '*.nix'
${git}/bin/git diff-index --exit-code HEAD -- '*.hs' '*.lhs' '*.nix' '*.py'
'';

lint =
Expand Down
3 changes: 1 addition & 2 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import Network.Socket.ByteString
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai

import qualified Hasql.Pool as SQL
import qualified Hasql.Session as SQL

import qualified PostgREST.AppState as AppState
Expand All @@ -27,7 +26,7 @@ postgrestAdmin appState appConfig req respond = do
isConnectionUp <-
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> SQL.use (AppState.getPool appState) (SQL.sql "SELECT 1")
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")

case Wai.pathInfo req of
["ready"] ->
Expand Down
17 changes: 8 additions & 9 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as HM
import qualified Data.Set as S
import qualified Hasql.DynamicStatements.Snippet as SQL (Snippet)
import qualified Hasql.Pool as SQL
import qualified Hasql.Transaction as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.HTTP.Types.Header as HTTP
Expand Down Expand Up @@ -167,7 +166,7 @@ postgrest logLevel appState connWorker =
let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse conf maybeDbStructure jsonDbS pgVer (AppState.getPool appState) authResult req
runExceptT $ postgrestResponse appState conf maybeDbStructure jsonDbS pgVer authResult req

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
Expand All @@ -185,15 +184,15 @@ addRetryHint shouldAdd appState response = do
return $ Wai.mapResponseHeaders (\hs -> if shouldAdd then h:hs else hs) response

postgrestResponse
:: AppConfig
:: AppState.AppState
-> AppConfig
-> Maybe DbStructure
-> ByteString
-> PgVersion
-> SQL.Pool
-> AuthResult
-> Wai.Request
-> Handler IO Wai.Response
postgrestResponse conf@AppConfig{..} maybeDbStructure jsonDbS pgVer pool AuthResult{..} req = do
postgrestResponse appState conf@AppConfig{..} maybeDbStructure jsonDbS pgVer AuthResult{..} req = do
body <- lift $ Wai.strictRequestBody req

dbStructure <-
Expand All @@ -212,15 +211,15 @@ postgrestResponse conf@AppConfig{..} maybeDbStructure jsonDbS pgVer pool AuthRes
if iAction apiRequest == ActionInfo then
handleInfo (iTarget apiRequest) (ctx apiRequest)
else
runDbHandler pool (txMode apiRequest) (Just authRole /= configDbAnonRole) configDbPreparedStatements .
runDbHandler appState (txMode apiRequest) (Just authRole /= configDbAnonRole) configDbPreparedStatements .
Middleware.optionalRollback conf apiRequest $
Middleware.runPgLocals conf authClaims authRole (handleRequest . ctx) apiRequest jsonDbS pgVer

runDbHandler :: SQL.Pool -> SQL.Mode -> Bool -> Bool -> DbHandler a -> Handler IO a
runDbHandler pool mode authenticated prepared handler = do
runDbHandler :: AppState.AppState -> SQL.Mode -> Bool -> Bool -> DbHandler b -> Handler IO b
runDbHandler appState mode authenticated prepared handler = do
dbResp <-
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction in
lift . SQL.use pool . transaction SQL.ReadCommitted mode $ runExceptT handler
lift . AppState.usePool appState . transaction SQL.ReadCommitted mode $ runExceptT handler

resp <-
liftEither . mapLeft Error.PgErr $
Expand Down
11 changes: 6 additions & 5 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module PostgREST.AppState
, getJsonDbS
, getMainThreadId
, getPgVersion
, getPool
, getTime
, getRetryNextIn
, init
Expand All @@ -24,10 +23,12 @@ module PostgREST.AppState
, putRetryNextIn
, releasePool
, signalListener
, usePool
, waitListener
) where

import qualified Hasql.Pool as SQL
import qualified Hasql.Pool as SQL
import qualified Hasql.Session as SQL

import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
Expand Down Expand Up @@ -93,11 +94,11 @@ initPool :: AppConfig -> IO SQL.Pool
initPool AppConfig{..} =
SQL.acquire (configDbPoolSize, configDbPoolTimeout, toUtf8 configDbUri)

getPool :: AppState -> SQL.Pool
getPool = statePool
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} = SQL.use statePool

releasePool :: AppState -> IO ()
releasePool AppState{..} = SQL.release statePool >> throwTo stateMainThreadId UserInterrupt
releasePool AppState{..} = SQL.release statePool

getPgVersion :: AppState -> IO PgVersion
getPgVersion = readIORef . statePgVersion
Expand Down
23 changes: 8 additions & 15 deletions src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ module PostgREST.CLI
import qualified Data.Aeson as JSON
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Hasql.Pool as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Options.Applicative as O

Expand All @@ -36,32 +35,26 @@ main installSignalHandlers runAppWithSocket CLI{cliCommand, cliPath} = do
conf@AppConfig{..} <-
either panic identity <$> Config.readAppConfig mempty cliPath Nothing
appState <- AppState.init conf

-- Override the config with config options from the db
-- TODO: the same operation is repeated on connectionWorker, ideally this
-- would be done only once, but dump CmdDumpConfig needs it for tests.
when configDbConfig $ reReadConfig True appState

exec cliCommand appState
where
exec :: Command -> AppState -> IO ()
exec CmdDumpConfig appState = putStr . Config.toText =<< AppState.getConfig appState
exec CmdDumpSchema appState = putStrLn =<< dumpSchema appState
exec CmdRun appState = App.run installSignalHandlers runAppWithSocket appState
case cliCommand of
CmdDumpConfig -> do
when configDbConfig $ reReadConfig True appState
putStr . Config.toText =<< AppState.getConfig appState
CmdDumpSchema -> putStrLn =<< dumpSchema appState
CmdRun -> App.run installSignalHandlers runAppWithSocket appState

-- | Dump DbStructure schema to JSON
dumpSchema :: AppState -> IO LBS.ByteString
dumpSchema appState = do
AppConfig{..} <- AppState.getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
SQL.use (AppState.getPool appState) $
AppState.usePool appState $
transaction SQL.ReadCommitted SQL.Read $
queryDbStructure
(toList configDbSchemas)
configDbExtraSearchPath
configDbPreparedStatements
SQL.release $ AppState.getPool appState
AppState.releasePool appState
case result of
Left e -> do
hPutStrLn stderr $ "An error ocurred when loading the schema cache:\n" <> show e
Expand Down
8 changes: 3 additions & 5 deletions src/PostgREST/Config/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import PostgREST.Config.PgVersion (PgVersion (..))

import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Hasql.Pool as SQL
import Hasql.Session (Session, statement)
import qualified Hasql.Statement as SQL
import qualified Hasql.Transaction as SQL
Expand All @@ -29,11 +28,10 @@ pgVersionStatement = SQL.Statement sql HE.noParams versionRow False
sql = "SELECT current_setting('server_version_num')::integer, current_setting('server_version')"
versionRow = HD.singleRow $ PgVersion <$> column HD.int4 <*> column HD.text

queryDbSettings :: SQL.Pool -> Bool -> IO (Either SQL.UsageError [(Text, Text)])
queryDbSettings pool prepared =
queryDbSettings :: Bool -> Session [(Text, Text)]
queryDbSettings prepared =
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction in
SQL.use pool . transaction SQL.ReadCommitted SQL.Read $
SQL.statement mempty dbSettingsStatement
transaction SQL.ReadCommitted SQL.Read $ SQL.statement mempty dbSettingsStatement

-- | Get db settings from the connection role. Global settings will be overridden by database specific settings.
dbSettingsStatement :: SQL.Statement () [(Text, Text)]
Expand Down
7 changes: 5 additions & 2 deletions src/PostgREST/Unix.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ installSignalHandlers :: AppState.AppState -> IO ()
installSignalHandlers appState = do
-- Releases the connection pool whenever the program is terminated,
-- see https://github.com/PostgREST/postgrest/issues/268
install Signals.sigINT $ AppState.releasePool appState
install Signals.sigTERM $ AppState.releasePool appState
let interrupt = do
AppState.releasePool appState
throwTo (AppState.getMainThreadId appState) UserInterrupt
install Signals.sigINT interrupt
install Signals.sigTERM interrupt

-- The SIGUSR1 signal updates the internal 'DbStructure' by running
-- 'connectionWorker' exactly as before.
Expand Down
10 changes: 4 additions & 6 deletions src/PostgREST/Workers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text.Encoding as T
import qualified Hasql.Notifications as SQL
import qualified Hasql.Pool as SQL
import qualified Hasql.Transaction.Sessions as SQL

import Control.Retry (RetryStatus, capDelay, exponentialBackoff,
Expand Down Expand Up @@ -110,16 +109,15 @@ connectionWorker appState = do
connectionStatus :: AppState -> IO ConnectionStatus
connectionStatus appState =
retrying retrySettings shouldRetry $
const $ SQL.release pool >> getConnectionStatus
const $ AppState.releasePool appState >> getConnectionStatus
where
pool = AppState.getPool appState
retrySettings = capDelay delayMicroseconds $ exponentialBackoff backoffMicroseconds
delayMicroseconds = 32000000 -- 32 seconds
backoffMicroseconds = 1000000 -- 1 second

getConnectionStatus :: IO ConnectionStatus
getConnectionStatus = do
pgVersion <- SQL.use pool queryPgVersion
pgVersion <- AppState.usePool appState queryPgVersion
case pgVersion of
Left e -> do
let err = PgError False e
Expand Down Expand Up @@ -155,7 +153,7 @@ loadSchemaCache appState = do
AppConfig{..} <- AppState.getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
SQL.use (AppState.getPool appState) . transaction SQL.ReadCommitted SQL.Read $
AppState.usePool appState . transaction SQL.ReadCommitted SQL.Read $
queryDbStructure (toList configDbSchemas) configDbExtraSearchPath configDbPreparedStatements
case result of
Left e -> do
Expand Down Expand Up @@ -234,7 +232,7 @@ reReadConfig startingUp appState = do
AppConfig{..} <- AppState.getConfig appState
dbSettings <-
if configDbConfig then do
qDbSettings <- queryDbSettings (AppState.getPool appState) configDbPreparedStatements
qDbSettings <- AppState.usePool appState $ queryDbSettings configDbPreparedStatements
case qDbSettings of
Left e -> do
let
Expand Down
9 changes: 4 additions & 5 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,7 @@ def set_statement_timeout(postgrest, role, milliseconds):
use a postgrest instance that doesn't use the affected role."""

response = postgrest.session.post(
"/rpc/set_statement_timeout",
data={"role": role, "milliseconds": milliseconds}
"/rpc/set_statement_timeout", data={"role": role, "milliseconds": milliseconds}
)
assert response.status_code == 204

Expand All @@ -855,7 +854,7 @@ def test_statement_timeout(defaultenv, metapostgrest):
"Statement timeout times out slow statements"

role = "timeout_authenticator"
set_statement_timeout(metapostgrest, role, 1000) # 1 second
set_statement_timeout(metapostgrest, role, 1000) # 1 second

env = {
**defaultenv,
Expand Down Expand Up @@ -890,7 +889,7 @@ def test_change_statement_timeout(defaultenv, metapostgrest):
response = postgrest.session.get("/rpc/sleep?seconds=1")
assert response.status_code == 204

set_statement_timeout(metapostgrest, role, 500) # 0.5s
set_statement_timeout(metapostgrest, role, 500) # 0.5s

# trigger schema refresh
postgrest.process.send_signal(signal.SIGUSR1)
Expand All @@ -901,7 +900,7 @@ def test_change_statement_timeout(defaultenv, metapostgrest):
data = response.json()
assert data["message"] == "canceling statement due to statement timeout"

set_statement_timeout(metapostgrest, role, 2000) # 2s
set_statement_timeout(metapostgrest, role, 2000) # 2s

# trigger role setting refresh
postgrest.process.send_signal(signal.SIGUSR1)
Expand Down