Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-5696] Have a deduplicating job worker #18

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ jobs:
compilerVersion: 9.4.4
setup-method: ghcup
allow-failure: false
- compiler: ghc-9.2.3
- compiler: ghc-9.2.7
compilerKind: ghc
compilerVersion: 9.2.3
compilerVersion: 9.2.7
setup-method: ghcup
allow-failure: false
- compiler: ghc-9.0.2
Expand Down
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# GHC and Cabal
skykanin marked this conversation as resolved.
Show resolved Hide resolved
/dist/
/dist-newstyle/
/cabal.project.local
Expand All @@ -6,3 +7,13 @@ TAGS
.ghc.environment.*
.cabal-sandbox
cabal.sandbox.config

# direnv
.direnv/
.envrc

# emacs
**/.dir-locals.el

# postgres
_local/
2 changes: 1 addition & 1 deletion consumers.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ library
, hpqtypes >= 1.11 && < 2.0
, lifted-base >= 0.2 && < 0.3
, lifted-threads >= 1.0 && < 1.1
, log-base >= 0.11 && < 0.12
, log-base >= 0.11 && < 0.13
, monad-control >= 1.0 && < 1.1
, monad-time >= 0.4 && < 0.5
, mtl >= 2.2 && < 2.3
Expand Down
1 change: 1 addition & 0 deletions example/Example.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ main = do
, ccMaxRunningJobs = 1
, ccProcessJob = processJob
, ccOnException = handleException
, ccMode = Standard
}

-- Add a job to the consumer's queue.
Expand Down
57 changes: 38 additions & 19 deletions src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal
initialJobs <- liftBase $ readTVarIO runningJobsInfo
(`fix` initialJobs) $ \loop jobsInfo -> do
-- If jobs are still running, display info about them.
when (not $ M.null jobsInfo) $ do
unless (M.null jobsInfo) $ do
logInfo "Waiting for running jobs" $ object [
"job_id" .= showJobsInfo jobsInfo
]
Expand All @@ -108,7 +108,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal
-- If jobs info didn't change, wait for it to change.
-- Otherwise loop so it either displays the new info
-- or exits if there are no jobs running anymore.
if (newJobsInfo == jobsInfo)
if newJobsInfo == jobsInfo
then retry
else return $ loop newJobsInfo
where
Expand Down Expand Up @@ -167,7 +167,7 @@ spawnMonitor ConsumerConfig{..} cs cid = forkP "monitor" . forever $ do
if ok
then logInfo_ "Activity of the consumer updated"
else do
logInfo_ $ "Consumer is not registered"
logInfo_ "Consumer is not registered"
throwM ThreadKilled
-- Freeing jobs locked by inactive consumers needs to happen
-- exactly once, otherwise it's possible to free it twice, after
Expand Down Expand Up @@ -242,7 +242,11 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore

loop :: Int -> m Bool
loop limit = do
(batch, batchSize) <- reserveJobs limit
-- If we're running in 'Deduplicating' mode we only
-- reserve one job at a time.
(batch, batchSize) <- reserveJobs $ case ccMode of
Standard -> limit
Duplicating _ -> 1
when (batchSize > 0) $ do
logInfo "Processing batch" $ object [
"batch_size" .= batchSize
Expand Down Expand Up @@ -286,16 +290,28 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
(, n) . F.toList . fmap ccJobFetcher <$> queryResult
where
reservedJobs :: UTCTime -> SQL
reservedJobs now = smconcat [
"SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
reservedJobs now = case ccMode of
Standard -> smconcat [
"SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
Duplicating field -> smconcat [
"WITH latest_for_id AS"
, " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable
, " ORDER BY run_at," <+> field <> ", id" <+> "DESC LIMIT" <?> limit <+> "FOR UPDATE SKIP LOCKED),"
, " lock_all AS"
, " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable
, " WHERE" <+> field <+> "= (SELECT" <+> field <+> "FROM latest_for_id)"
, " AND id <= (SELECT id FROM latest_for_id)"
, " FOR UPDATE SKIP LOCKED)"
, "SELECT id FROM lock_all"
]

-- | Spawn each job in a separate thread.
startJob :: job -> m (job, m (T.Result Result))
Expand Down Expand Up @@ -329,10 +345,10 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
updateJobs :: [(idx, Result)] -> m ()
updateJobs results = runDBT cs ts $ do
now <- currentTime
runSQL_ $ smconcat [
"WITH removed AS ("
runSQL_ $ smconcat
[ "WITH removed AS ("
, " DELETE FROM" <+> raw ccJobsTable
, " WHERE id = ANY(" <?> Array1 deletes <+> ")"
, " WHERE id" <+> operator <+> "ANY (" <?> Array1 deletes <+> ")"
, ")"
, "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by = NULL"
Expand All @@ -345,15 +361,18 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
, " WHEN id = ANY(" <?> Array1 successes <+> ") THEN " <?> now
, " ELSE NULL"
, " END"
, "WHERE id = ANY(" <?> Array1 (map fst updates) <+> ")"
, "WHERE id" <+> operator <+> "ANY (" <?> Array1 (map fst updates) <+> ")"
]
where
operator = case ccMode of
Standard -> "="
Duplicating _field -> "<="
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to change the type signature of updateJobs so that it takes only a single (idx, Result) in the deduplicating case 🤔 The problem now is that if something goes awry and multiple ids are passed here, the condition id <= ANY (...) will wreak havoc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, how were tests passing with this bug? :) They should be updated accordingly.

retryToSQL now (Left int) ids =
("WHEN id = ANY(" <?> Array1 ids <+> ") THEN " <?> now <> " +" <?> int :)
retryToSQL _ (Right time) ids =
("WHEN id = ANY(" <?> Array1 ids <+> ") THEN" <?> time :)

retries = foldr step M.empty $ map f updates
retries = foldr (step . f) M.empty updates
where
f (idx, result) = case result of
Ok action -> (idx, action)
Expand Down
11 changes: 10 additions & 1 deletion src/Database/PostgreSQL/Consumers/Config.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE ExistentialQuantification #-}
module Database.PostgreSQL.Consumers.Config (
Action(..)
, Mode(..)
, Result(..)
, ConsumerConfig(..)
) where
Expand All @@ -21,12 +22,16 @@ data Action
| RerunAfter Interval
| RerunAt UTCTime
| Remove
deriving (Eq, Ord, Show)
deriving (Eq, Ord, Show)

-- | Result of processing a job.
data Result = Ok Action | Failed Action
deriving (Eq, Ord, Show)

-- | The mode the consumer will run in.
data Mode = Standard | Duplicating SQL
skykanin marked this conversation as resolved.
Show resolved Hide resolved
deriving (Show)

-- | Config of a consumer.
data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig {
-- | Name of the database table where jobs are stored. The table needs to have
Expand Down Expand Up @@ -118,4 +123,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig {
-- Note that if this action throws an exception, the consumer goes
-- down, so it's best to ensure that it doesn't throw.
, ccOnException :: !(SomeException -> job -> m Action)
-- | The mode the consumer will use to reserve jobs.
-- In @'Duplicating'@ mode the SQL expression indicates which field
-- in the jobs table (specified by @'ccJobsTable'@) to select for duplication.
, ccMode :: Mode
}
Loading