Skip to content

Commit

Permalink
Fix updateJobs deduplication implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
skykanin committed May 9, 2023
1 parent 95e7312 commit 866ed7d
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
updateJobs :: [(idx, Result)] -> m ()
updateJobs results = runDBT cs ts $ do
now <- currentTime
runSQL_ $ smconcat [
"WITH removed AS ("
runSQL_ $ smconcat
[ "WITH removed AS ("
, " DELETE FROM" <+> raw ccJobsTable
, " WHERE id = ANY(" <?> Array1 deletes <+> ")"
, " WHERE id" <+> operator <+> "ANY (" <?> Array1 deletes <+> ")"
, ")"
, "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by = NULL"
Expand All @@ -361,9 +361,12 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
, " WHEN id = ANY(" <?> Array1 successes <+> ") THEN " <?> now
, " ELSE NULL"
, " END"
, "WHERE id = ANY(" <?> Array1 (map fst updates) <+> ")"
, "WHERE id" <+> operator <+> "ANY (" <?> Array1 (map fst updates) <+> ")"
]
where
operator = case ccMode of
Standard -> "="
Duplicating _field -> "<="
retryToSQL now (Left int) ids =
("WHEN id = ANY(" <?> Array1 ids <+> ") THEN " <?> now <> " +" <?> int :)
retryToSQL _ (Right time) ids =
Expand Down

0 comments on commit 866ed7d

Please sign in to comment.