From daca73a24a29eace6461664829e4072695c3bc32 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Thu, 12 Dec 2024 13:20:36 +0000 Subject: [PATCH 1/9] Add file size info to file connector state --- src/Ambar/Emulator.hs | 9 +++++---- src/Ambar/Emulator/Config.hs | 6 +++++- src/Ambar/Emulator/Connector/File.hs | 28 ++++++++++++++++++++++------ 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/Ambar/Emulator.hs b/src/Ambar/Emulator.hs index c8fdf58..07d4a83 100644 --- a/src/Ambar/Emulator.hs +++ b/src/Ambar/Emulator.hs @@ -16,6 +16,7 @@ import System.Directory (doesFileExist) import System.FilePath (()) import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder) +import Ambar.Emulator.Connector.File (FileConnectorState) import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState) import Ambar.Emulator.Connector.MySQL (MySQLState) import Ambar.Emulator.Connector.Postgres (PostgreSQLState) @@ -98,7 +99,7 @@ emulate logger_ config env = do SourcePostgreSQL _ -> StatePostgres def SourceMySQL _ -> StateMySQL def SourceSQLServer _ -> StateSQLServer def - SourceFile _ -> StateFile () + SourceFile _ -> StateFile def projectAll queue = forConcurrently_ (c_destinations env) (project queue) @@ -137,7 +138,7 @@ data SavedState = StatePostgres PostgreSQLState | StateMySQL MySQLState | StateSQLServer SQLServerState - | StateFile () + | StateFile FileConnectorState deriving (Generic) deriving anyclass (ToJSON, FromJSON) @@ -170,8 +171,8 @@ toConnectorConfig source sstate = _ -> incompatible SourceFile path -> case sstate of - StateFile () -> - return $ ConnectorConfig source path () StateFile + StateFile state -> + return $ ConnectorConfig source path state StateFile _ -> incompatible where incompatible = throwIO $ ErrorCall $ diff --git a/src/Ambar/Emulator/Config.hs b/src/Ambar/Emulator/Config.hs index 7416672..ca62b9a 100644 --- a/src/Ambar/Emulator/Config.hs +++ b/src/Ambar/Emulator/Config.hs @@ -150,7 +150,11 @@ instance FromJSON DataSource where c_incrementingColumn <- o .: "autoIncrementingColumn" return $ SourceSQLServer SQLServer{..} - parseFile o = SourceFile . FileConnector <$> (o .: "path") + parseFile o = do + c_path <- o .: "path" + c_partitioningField <- o .: "partitioningField" + c_incrementingField <- o .: "incrementingField" + return $ SourceFile FileConnector{..} parseDataDestination :: Map (Id DataSource) DataSource diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 5b622c8..56db16e 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -1,5 +1,6 @@ module Ambar.Emulator.Connector.File ( FileConnector(..) + , FileConnectorState(..) ) where {-| File connector. @@ -7,12 +8,16 @@ Read JSON values from a file. One value per line. -} -import qualified Data.Aeson as Json import Control.Monad (forM_) +import qualified Data.Aeson as Json import qualified Data.ByteString.Lazy.Char8 as Char8 import qualified Data.ByteString.Lazy as LB +import Data.Default (Default) +import Data.Int (Int64) +import Data.Text (Text) import qualified Data.Text.Lazy as Text import qualified Data.Text.Lazy.Encoding as Text +import GHC.Generics (Generic) import qualified Ambar.Emulator.Connector as C import Ambar.Emulator.Queue.Topic (modPartitioner) @@ -20,20 +25,31 @@ import qualified Ambar.Emulator.Queue.Topic as Topic import Utils.Async (withAsyncThrow) import Utils.Logger (fatal, logInfo) -data FileConnector = FileConnector FilePath +data FileConnector = FileConnector + { c_path :: FilePath + , c_partitioningField :: Text + , c_incrementingField :: Text + } + +data FileConnectorState = FileConnectorState + { c_fileSize :: Int64 + , c_currentOffset :: Int64 + } + deriving (Show, Generic) + deriving anyclass (Json.ToJSON, Json.FromJSON, Default) newtype FileRecord = FileRecord Json.Value instance C.Connector FileConnector where - type ConnectorState FileConnector = () + type ConnectorState FileConnector = FileConnectorState type ConnectorRecord FileConnector = FileRecord partitioner = modPartitioner (const 1) encoder (FileRecord value) = LB.toStrict $ Json.encode value - connect (FileConnector path) logger () producer f = - withAsyncThrow worker $ f (return ()) + connect (FileConnector {..}) logger initialState producer f = + withAsyncThrow worker $ f (return initialState) where worker = do - bs <- Char8.readFile path + bs <- Char8.readFile c_path forM_ (Char8.lines bs) $ \line -> do value <- case Json.eitherDecode line of Left e -> fatal logger $ unlines From eb22ed4457e0fb34e1c3af9b28571a3299df953a Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Thu, 12 Dec 2024 15:11:37 +0000 Subject: [PATCH 2/9] Detect added records in file connector --- src/Ambar/Emulator/Connector/File.hs | 143 ++++++++++++++++++++------- 1 file changed, 110 insertions(+), 33 deletions(-) diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 56db16e..0bb8042 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -1,42 +1,57 @@ module Ambar.Emulator.Connector.File - ( FileConnector(..) - , FileConnectorState(..) - ) where + ( FileConnector(..) + , FileConnectorState(..) + ) where {-| File connector. Read JSON values from a file. One value per line. -} -import Control.Monad (forM_) +import Control.Concurrent.STM (STM, newTVarIO, readTVar, readTVarIO, atomically, writeTVar) +import Control.Monad (forever) import qualified Data.Aeson as Json -import qualified Data.ByteString.Lazy.Char8 as Char8 +import qualified Data.Aeson.KeyMap as KeyMap +import qualified Data.ByteString as BS +import qualified Data.ByteString.Char8 as Char8 import qualified Data.ByteString.Lazy as LB import Data.Default (Default) -import Data.Int (Int64) +import Data.Maybe (fromMaybe) +import Data.String (IsString(fromString)) import Data.Text (Text) -import qualified Data.Text.Lazy as Text -import qualified Data.Text.Lazy.Encoding as Text +import qualified Data.Text.Lazy as LText +import qualified Data.Text.Lazy.Encoding as LText +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text import GHC.Generics (Generic) +import System.IO (Handle, withFile, hFileSize, hSeek, IOMode(..), SeekMode(..)) +import Prettyprinter ((<+>)) import qualified Ambar.Emulator.Connector as C import Ambar.Emulator.Queue.Topic (modPartitioner) +import Ambar.Emulator.Queue.Topic (Producer) import qualified Ambar.Emulator.Queue.Topic as Topic +import Ambar.Record (Value(..)) import Utils.Async (withAsyncThrow) -import Utils.Logger (fatal, logInfo) +import Utils.Logger (SimpleLogger, fatal, logInfo) +import Utils.Delay (Duration, delay, millis) +import Utils.Prettyprinter (prettyJSON, renderPretty, commaSeparated) + +_POLLING_INTERVAL :: Duration +_POLLING_INTERVAL = millis 50 data FileConnector = FileConnector - { c_path :: FilePath - , c_partitioningField :: Text - , c_incrementingField :: Text - } + { c_path :: FilePath + , c_partitioningField :: Text + , c_incrementingField :: Text + } data FileConnectorState = FileConnectorState - { c_fileSize :: Int64 - , c_currentOffset :: Int64 - } - deriving (Show, Generic) - deriving anyclass (Json.ToJSON, Json.FromJSON, Default) + { c_fileSize :: Integer + , c_currentOffset :: Integer + } + deriving (Show, Generic) + deriving anyclass (Json.ToJSON, Json.FromJSON, Default) newtype FileRecord = FileRecord Json.Value @@ -45,19 +60,81 @@ instance C.Connector FileConnector where type ConnectorRecord FileConnector = FileRecord partitioner = modPartitioner (const 1) encoder (FileRecord value) = LB.toStrict $ Json.encode value - connect (FileConnector {..}) logger initialState producer f = - withAsyncThrow worker $ f (return initialState) - where - worker = do - bs <- Char8.readFile c_path - forM_ (Char8.lines bs) $ \line -> do - value <- case Json.eitherDecode line of - Left e -> fatal logger $ unlines - [ "Unable to decode value from source:" - , show e - , Text.unpack $ Text.decodeUtf8 bs - ] - Right v -> return v - Topic.write producer (FileRecord value) - logInfo logger $ "ingested. " <> Text.decodeUtf8 line + connect = connect + +connect + :: FileConnector + -> SimpleLogger + -> FileConnectorState + -> Producer (FileRecord) + -> (STM FileConnectorState -> IO a) + -> IO a +connect conn@(FileConnector {..}) logger initState producer f = do + svar <- newTVarIO initState + let initialOffset = c_currentOffset initState + withFile c_path ReadMode $ \h -> do + hSeek h AbsoluteSeek initialOffset + withAsyncThrow (worker h svar) $ f (readTVar svar) + where + worker h svar = forever $ do + FileConnectorState fsize0 offset <- readTVarIO svar + fsize <- waitForData h fsize0 offset + line <- Char8.hGetLine h + value <- case Json.eitherDecode $ LB.fromStrict line of + Left e -> fatal logger $ unlines + [ "Unable to decode value from source:" + , show e + , Text.unpack $ Text.decodeUtf8 line + ] + Right v -> return v + let record = FileRecord value + Topic.write producer record + logResult record + let offset' = offset + fromIntegral (BS.length line) + atomically $ writeTVar svar $ FileConnectorState fsize offset' + + waitForData :: Handle -> Integer -> Integer -> IO Integer + waitForData h fsize offset = + if fsize > offset + then return fsize + else do + fsize' <- hFileSize h + let stillNoData = fsize' == fsize + if stillNoData + then do + delay _POLLING_INTERVAL + waitForData h fsize offset + else return fsize' + + logResult record = + logInfo logger $ renderPretty $ + "ingested." <+> commaSeparated + [ "incrementing_value:" <+> prettyJSON (incrementingValue conn record) + , "partitioning_value:" <+> prettyJSON (partitioningValue conn record) + ] + +partitioningValue :: FileConnector -> FileRecord -> Value +partitioningValue FileConnector{..} r = getField c_partitioningField r + +incrementingValue :: FileConnector -> FileRecord -> Value +incrementingValue FileConnector{..} r = getField c_incrementingField r + +getField :: Text -> FileRecord -> Value +getField field (FileRecord json) = + fromMaybe err $ do + o <- getObject json + let key = fromString $ Text.unpack field + v <- KeyMap.lookup key o + let txt = jsonToTxt v + return $ Json txt v + where + err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json + + jsonToTxt :: Json.Value -> Text + jsonToTxt = LText.toStrict . LText.decodeUtf8 . Json.encode + + getObject :: Json.Value -> Maybe Json.Object + getObject = \case + Json.Object o -> Just o + _ -> Nothing From 4f45300c39e6ae249a5e5560a44a8698d1c6dab8 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 12:21:24 +0000 Subject: [PATCH 3/9] Write file connector tests --- emulator.cabal | 1 + src/Ambar/Emulator/Connector/File.hs | 1 + tests/Test/Connector.hs | 9 ++- tests/Test/Connector/File.hs | 98 ++++++++++++++++++++++++++++ tests/Test/Utils/SQL.hs | 18 +++-- tests/Tests.hs | 2 +- 6 files changed, 118 insertions(+), 11 deletions(-) create mode 100644 tests/Test/Connector/File.hs diff --git a/emulator.cabal b/emulator.cabal index 1a5758f..6ecd2e0 100644 --- a/emulator.cabal +++ b/emulator.cabal @@ -119,6 +119,7 @@ test-suite emulator-tests Test.Config Test.Queue Test.Connector + Test.Connector.File Test.Connector.MySQL Test.Connector.PostgreSQL Test.Connector.MicrosoftSQLServer diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 0bb8042..3b6e166 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -1,6 +1,7 @@ module Ambar.Emulator.Connector.File ( FileConnector(..) , FileConnectorState(..) + , FileRecord ) where {-| File connector. diff --git a/tests/Test/Connector.hs b/tests/Test/Connector.hs index 585f2c3..01ab846 100644 --- a/tests/Test/Connector.hs +++ b/tests/Test/Connector.hs @@ -11,6 +11,7 @@ import Test.Hspec , shouldBe ) import Ambar.Emulator.Connector.Poll (Boundaries(..), mark, boundaries, cleanup) +import Test.Connector.File (FileConnectorInfo, testFileConnector, withFileConnectorInfo) import Test.Connector.PostgreSQL (PostgresCreds, testPostgreSQL, withPostgreSQL) import Test.Connector.MySQL (MySQLCreds, testMySQL, withMySQL) import Test.Connector.MicrosoftSQLServer (MicrosoftSQLServerCreds, testMicrosoftSQLServer, withMicrosoftSQLServer) @@ -22,18 +23,21 @@ data Databases = Databases (OnDemand PostgresCreds) (OnDemand MySQLCreds) (OnDemand MicrosoftSQLServerCreds) + (OnDemand FileConnectorInfo) withDatabases :: (Databases -> IO a) -> IO a withDatabases f = OnDemand.withLazy withPostgreSQL $ \pcreds -> OnDemand.withLazy withMySQL $ \mcreds -> OnDemand.withLazy withMicrosoftSQLServer $ \screds -> - f (Databases pcreds mcreds screds) + OnDemand.withLazy withFileConnectorInfo $ \finfo -> + f (Databases pcreds mcreds screds finfo) testConnectors :: Databases -> Spec -testConnectors (Databases pcreds mcreds screds) = do +testConnectors (Databases pcreds mcreds screds finfo) = do describe "connector" $ do testPollingConnector + testFileConnector finfo testPostgreSQL pcreds testMySQL mcreds testMicrosoftSQLServer screds @@ -74,4 +78,3 @@ testPollingConnector = describe "Poll" $ it "cleanup doesn't remove ranges ending higher than time given" $ do (boundaries . cleanup 2 . bs) [1 ,2, 5, 3] `shouldBe` Boundaries [(1,3), (5,5)] - diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs new file mode 100644 index 0000000..ecfc62c --- /dev/null +++ b/tests/Test/Connector/File.hs @@ -0,0 +1,98 @@ +{-# OPTIONS_GHC -Wno-orphans #-} +module Test.Connector.File + ( testFileConnector + , withFileConnectorInfo + , FileConnectorInfo + ) where + +import Control.Concurrent (MVar, newMVar, modifyMVar) +import Data.ByteString.Lazy.Char8 as Char8 +import qualified Data.Aeson as Json +import Data.Map (Map) +import qualified Data.Map as Map +import Control.Monad (forM_, forM) +import Data.Text (Text) +import System.FilePath (()) +import System.IO (withFile, IOMode(..)) +import System.IO.Temp (withSystemTempFile) +import Test.Hspec (Spec, describe) + +import Ambar.Emulator.Connector.File (FileConnector(..)) + +import Test.Utils.OnDemand (OnDemand) +import Test.Utils.SQL hiding (Connection) +import qualified Test.Utils.SQL as TS + +testFileConnector :: OnDemand FileConnectorInfo -> Spec +testFileConnector od = + describe "FileConnector" $ do + testGenericSQL @(EventsTable FileConnector) od withConnection mkFileConnector () + +type TableName = String + +data FileConnectorInfo = FileConnectorInfo + { c_directory :: FilePath + , c_partitioningField :: Text + , c_incrementingField :: Text + , c_maxIds :: MVar (Map TableName Int) + } + +withFileConnectorInfo :: (ConnectInfo -> IO a) -> IO a +withFileConnectorInfo f = + withSystemTempFile "file-db-xxx" $ \path _ -> do + var <- newMVar mempty + f $ FileConnectorInfo path "aggregate_id" "id" var + +type ConnectInfo = FileConnectorInfo +type Connection = FileConnectorInfo + +withConnection :: ConnectInfo -> (Connection -> IO a) -> IO a +withConnection cinfo f = f cinfo + +mkFileConnector :: Table t => Connection -> t -> FileConnector +mkFileConnector FileConnectorInfo{..} table = + FileConnector + { c_path = c_directory tableName table + , c_partitioningField = c_partitioningField + , c_incrementingField = c_incrementingField + } + +instance Table (EventsTable FileConnector) where + type Entry (EventsTable FileConnector) = Event + type Config (EventsTable FileConnector) = () + type Connection (EventsTable FileConnector) = FileConnectorInfo + tableName (EventsTable name) = name + tableCols _ = ["id", "aggregate_id", "sequence_number"] + withTable () conn f = do + name <- mkTableName + let table = EventsTable name + connector = mkFileConnector conn table + withFile (c_path connector) ReadMode $ \_ -> f table + mocks _ = + [ [ Event err agg_id seq_id | seq_id <- [0..] ] + | agg_id <- [0..] + ] + where err = error "aggregate id is determined by mysql" + insert conn table events = + forM_ events $ \(Event _ agg_id seq_id) -> do + modifyMVar (c_maxIds conn) $ \ids -> do + let nxt = Map.findWithDefault 0 tname ids + encoded = Json.encode $ Event nxt agg_id seq_id + Char8.appendFile (c_path connector) (encoded <> "\n") + return (Map.insert tname (nxt + 1) ids, nxt) + where + tname = tableName table + connector = mkFileConnector conn table + + selectAll conn table = do + bs <- Char8.readFile (c_path connector) + forM (Char8.lines bs) $ \line -> + case Json.eitherDecode' line of + Left err -> error $ "unable to decode file entry: " <> err + Right v -> return v + where + connector = mkFileConnector conn table + + + + diff --git a/tests/Test/Utils/SQL.hs b/tests/Test/Utils/SQL.hs index 9e6a1a4..04b738c 100644 --- a/tests/Test/Utils/SQL.hs +++ b/tests/Test/Utils/SQL.hs @@ -20,7 +20,7 @@ import Control.Concurrent.Async (mapConcurrently_) import Control.Exception (throwIO, ErrorCall(..)) import Control.Monad (replicateM, forM_) import qualified Data.Aeson as Aeson -import Data.Aeson (FromJSON) +import Data.Aeson (FromJSON, ToJSON) import qualified Data.ByteString.Lazy as LB import Data.Default (Default, def) import Data.List (sort, stripPrefix) @@ -64,13 +64,17 @@ data Event = Event } deriving (Eq, Show, Generic) +instance ToJSON Event where + toJSON = Aeson.genericToJSON eventJSONOptions + instance FromJSON Event where - parseJSON = Aeson.genericParseJSON opt - where - opt = Aeson.defaultOptions - { Aeson.fieldLabelModifier = \label -> - fromMaybe label (stripPrefix "e_" label) - } + parseJSON = Aeson.genericParseJSON eventJSONOptions + +eventJSONOptions :: Aeson.Options +eventJSONOptions = Aeson.defaultOptions + { Aeson.fieldLabelModifier = \label -> + fromMaybe label (stripPrefix "e_" label) + } newtype EventsTable a = EventsTable String diff --git a/tests/Tests.hs b/tests/Tests.hs index 7da5cd8..42074a9 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -21,7 +21,7 @@ Example: match on test description -} main :: IO () main = - withDatabases $ \dbs@(Databases pcreds _ _ ) -> + withDatabases $ \dbs@(Databases pcreds _ _ _) -> hspec $ parallel $ do -- unit tests use the projector library testOnDemand From 207214f73b165f22cd0a1b67eef25071013844d0 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 13:26:33 +0000 Subject: [PATCH 4/9] Make testGenericSQL more generic --- tests/Test/Connector.hs | 10 +- tests/Test/Connector/File.hs | 101 ++++++++++----------- tests/Test/Connector/MicrosoftSQLServer.hs | 16 +++- tests/Test/Connector/MySQL.hs | 17 +++- tests/Test/Utils/SQL.hs | 16 ++-- tests/Tests.hs | 2 +- 6 files changed, 84 insertions(+), 78 deletions(-) diff --git a/tests/Test/Connector.hs b/tests/Test/Connector.hs index 01ab846..379fedc 100644 --- a/tests/Test/Connector.hs +++ b/tests/Test/Connector.hs @@ -11,7 +11,7 @@ import Test.Hspec , shouldBe ) import Ambar.Emulator.Connector.Poll (Boundaries(..), mark, boundaries, cleanup) -import Test.Connector.File (FileConnectorInfo, testFileConnector, withFileConnectorInfo) +import Test.Connector.File (testFileConnector) import Test.Connector.PostgreSQL (PostgresCreds, testPostgreSQL, withPostgreSQL) import Test.Connector.MySQL (MySQLCreds, testMySQL, withMySQL) import Test.Connector.MicrosoftSQLServer (MicrosoftSQLServerCreds, testMicrosoftSQLServer, withMicrosoftSQLServer) @@ -23,21 +23,19 @@ data Databases = Databases (OnDemand PostgresCreds) (OnDemand MySQLCreds) (OnDemand MicrosoftSQLServerCreds) - (OnDemand FileConnectorInfo) withDatabases :: (Databases -> IO a) -> IO a withDatabases f = OnDemand.withLazy withPostgreSQL $ \pcreds -> OnDemand.withLazy withMySQL $ \mcreds -> OnDemand.withLazy withMicrosoftSQLServer $ \screds -> - OnDemand.withLazy withFileConnectorInfo $ \finfo -> - f (Databases pcreds mcreds screds finfo) + f (Databases pcreds mcreds screds) testConnectors :: Databases -> Spec -testConnectors (Databases pcreds mcreds screds finfo) = do +testConnectors (Databases pcreds mcreds screds) = do describe "connector" $ do testPollingConnector - testFileConnector finfo + testFileConnector testPostgreSQL pcreds testMySQL mcreds testMicrosoftSQLServer screds diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs index ecfc62c..59e12f1 100644 --- a/tests/Test/Connector/File.hs +++ b/tests/Test/Connector/File.hs @@ -1,97 +1,88 @@ {-# OPTIONS_GHC -Wno-orphans #-} module Test.Connector.File ( testFileConnector - , withFileConnectorInfo - , FileConnectorInfo ) where import Control.Concurrent (MVar, newMVar, modifyMVar) import Data.ByteString.Lazy.Char8 as Char8 import qualified Data.Aeson as Json -import Data.Map (Map) -import qualified Data.Map as Map +import Data.Default (def) import Control.Monad (forM_, forM) -import Data.Text (Text) -import System.FilePath (()) -import System.IO (withFile, IOMode(..)) +import System.IO (withFile, IOMode(..), hClose) import System.IO.Temp (withSystemTempFile) import Test.Hspec (Spec, describe) +import qualified Ambar.Emulator.Queue.Topic as Topic +import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..)) +import Ambar.Emulator.Connector (partitioner, encoder, connect) import Ambar.Emulator.Connector.File (FileConnector(..)) -import Test.Utils.OnDemand (OnDemand) -import Test.Utils.SQL hiding (Connection) -import qualified Test.Utils.SQL as TS -testFileConnector :: OnDemand FileConnectorInfo -> Spec -testFileConnector od = - describe "FileConnector" $ do - testGenericSQL @(EventsTable FileConnector) od withConnection mkFileConnector () - -type TableName = String +import Test.Queue (withFileTopic) +import Utils.Logger (plainLogger, Severity(..)) +import Test.Utils.SQL -data FileConnectorInfo = FileConnectorInfo - { c_directory :: FilePath - , c_partitioningField :: Text - , c_incrementingField :: Text - , c_maxIds :: MVar (Map TableName Int) +testFileConnector :: Spec +testFileConnector = + describe "FileConnector" $ do + testGenericSQL with + +with + :: PartitionCount + -> ( FileConnection + -> EventsTable FileConnector + -> Topic + -> (forall b. IO b -> IO b) + -> IO a + ) + -> IO a +with partitions f = + withSystemTempFile "file-db" $ \path h -> do + hClose h + var <- newMVar 0 + let connector = FileConnector path "aggregate_id" "sequence_number" + conn = FileConnection connector var + withFileTopic partitions $ \topic -> -- create topic + Topic.withProducer topic partitioner encoder $ \producer -> -- create topic producer + withTable () conn $ \table -> do + let logger = plainLogger Warn + connected :: forall a. IO a -> IO a + connected act = connect connector logger def producer (const act) -- setup connector + f conn table topic connected + +data FileConnection = FileConnection + { _connector :: FileConnector + , _maxId :: MVar Int } -withFileConnectorInfo :: (ConnectInfo -> IO a) -> IO a -withFileConnectorInfo f = - withSystemTempFile "file-db-xxx" $ \path _ -> do - var <- newMVar mempty - f $ FileConnectorInfo path "aggregate_id" "id" var - -type ConnectInfo = FileConnectorInfo -type Connection = FileConnectorInfo - -withConnection :: ConnectInfo -> (Connection -> IO a) -> IO a -withConnection cinfo f = f cinfo - -mkFileConnector :: Table t => Connection -> t -> FileConnector -mkFileConnector FileConnectorInfo{..} table = - FileConnector - { c_path = c_directory tableName table - , c_partitioningField = c_partitioningField - , c_incrementingField = c_incrementingField - } - instance Table (EventsTable FileConnector) where type Entry (EventsTable FileConnector) = Event type Config (EventsTable FileConnector) = () - type Connection (EventsTable FileConnector) = FileConnectorInfo + type Connection (EventsTable FileConnector) = FileConnection tableName (EventsTable name) = name tableCols _ = ["id", "aggregate_id", "sequence_number"] - withTable () conn f = do + withTable () (FileConnection connector _) f = do name <- mkTableName let table = EventsTable name - connector = mkFileConnector conn table withFile (c_path connector) ReadMode $ \_ -> f table mocks _ = [ [ Event err agg_id seq_id | seq_id <- [0..] ] | agg_id <- [0..] ] where err = error "aggregate id is determined by mysql" - insert conn table events = + insert (FileConnection connector varMaxId) _ events = forM_ events $ \(Event _ agg_id seq_id) -> do - modifyMVar (c_maxIds conn) $ \ids -> do - let nxt = Map.findWithDefault 0 tname ids - encoded = Json.encode $ Event nxt agg_id seq_id + modifyMVar varMaxId $ \nxt -> do + let encoded = Json.encode $ Event nxt agg_id seq_id Char8.appendFile (c_path connector) (encoded <> "\n") - return (Map.insert tname (nxt + 1) ids, nxt) - where - tname = tableName table - connector = mkFileConnector conn table + return (nxt + 1, nxt) - selectAll conn table = do + selectAll (FileConnection connector _) _ = do bs <- Char8.readFile (c_path connector) forM (Char8.lines bs) $ \line -> case Json.eitherDecode' line of Left err -> error $ "unable to decode file entry: " <> err Right v -> return v - where - connector = mkFileConnector conn table diff --git a/tests/Test/Connector/MicrosoftSQLServer.hs b/tests/Test/Connector/MicrosoftSQLServer.hs index ba2d1ff..c8d318a 100644 --- a/tests/Test/Connector/MicrosoftSQLServer.hs +++ b/tests/Test/Connector/MicrosoftSQLServer.hs @@ -21,7 +21,7 @@ import Test.Hspec ) import qualified Ambar.Emulator.Queue.Topic as Topic -import Ambar.Emulator.Queue.Topic (PartitionCount(..)) +import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..)) import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..)) import Ambar.Record (Bytes(..)) import Database.MicrosoftSQLServer as S @@ -46,7 +46,7 @@ import Utils.Delay (deadline, seconds) testMicrosoftSQLServer :: OnDemand MicrosoftSQLServerCreds -> Spec testMicrosoftSQLServer od = do describe "MicrosoftSQLServer" $ do - testGenericSQL @(EventsTable SQLServer) od withConnection mkConnector () + testGenericSQL with describe "decodes" $ do -- Integers supported "TINYINT" (1 :: Int) @@ -103,9 +103,17 @@ testMicrosoftSQLServer od = do -- unsupported "CLR UDT" _NULL -- unsupported "IMAGE" _NULL -- unsupported "SQL_VARIANT" _NULL - - where + with + :: PartitionCount + -> ( S.Connection + -> EventsTable SQLServer + -> Topic + -> (forall b. IO b -> IO b) + -> IO a ) + -> IO a + with = withConnector od withConnection mkConnector () + _NULL :: Maybe String _NULL = Nothing diff --git a/tests/Test/Connector/MySQL.hs b/tests/Test/Connector/MySQL.hs index bccacd7..c198120 100644 --- a/tests/Test/Connector/MySQL.hs +++ b/tests/Test/Connector/MySQL.hs @@ -41,6 +41,7 @@ import Database.MySQL , withConnection , defaultConnectionInfo ) +import Ambar.Emulator.Queue.Topic (Topic) import qualified Ambar.Emulator.Queue.Topic as Topic import Ambar.Emulator.Queue.Topic (PartitionCount(..)) import Ambar.Record (Bytes(..)) @@ -53,9 +54,9 @@ import qualified Test.Utils.SQL as TS type MySQLCreds = ConnectionInfo testMySQL :: OnDemand MySQLCreds -> Spec -testMySQL c = do +testMySQL od = do describe "MySQL" $ do - testGenericSQL @(EventsTable MySQL) c withConnection mkMySQL () + testGenericSQL with -- Test that column types are supported/unsupported by -- creating database entries with the value and reporting @@ -163,6 +164,16 @@ testMySQL c = do describe "JSON" $ do supported "JSON" ("{\"a\": 1}" :: String) where + with + :: PartitionCount + -> ( Connection + -> EventsTable MySQL + -> Topic + -> (forall b. IO b -> IO b) + -> IO a ) + -> IO a + with = withConnector od withConnection mkMySQL () + _NULL :: Maybe String _NULL = Nothing @@ -181,7 +192,7 @@ testMySQL c = do -- Write a value of a given type to a database table, then read it from the Topic. roundTrip :: forall a. (FromField a, FromJSON a, ToField a, Show a, Eq a) => String -> a -> IO () roundTrip ty val = - withConnector @(TTable MySQL a) c withConnection mkMySQL (MySQLType ty) (PartitionCount 1) $ \conn table topic connected -> do + withConnector @(TTable MySQL a) od withConnection mkMySQL (MySQLType ty) (PartitionCount 1) $ \conn table topic connected -> do let record = TEntry 1 1 1 val insert conn table [record] connected $ deadline (seconds 1) $ do diff --git a/tests/Test/Utils/SQL.hs b/tests/Test/Utils/SQL.hs index 04b738c..ffd6554 100644 --- a/tests/Test/Utils/SQL.hs +++ b/tests/Test/Utils/SQL.hs @@ -79,13 +79,12 @@ eventJSONOptions = Aeson.defaultOptions newtype EventsTable a = EventsTable String testGenericSQL - :: (Table table, Connector connector, Default (ConnectorState connector)) - => OnDemand db - -> (forall x. db -> (Connection table -> IO x) -> IO x) - -> (db -> table -> connector) - -> Config table + :: Table table + => (forall a. PartitionCount + -> (Connection table -> table -> Topic -> (forall b. IO b -> IO b) -> IO a) + -> IO a) -> Spec -testGenericSQL od withConnection mkConfig conf = sequential $ do +testGenericSQL with = sequential $ do -- checks that our tests can connect to postgres it "connects" $ with (PartitionCount 1) $ \conn table _ _ -> do @@ -141,8 +140,6 @@ testGenericSQL od withConnection mkConfig conf = sequential $ do forM_ byAggregateId $ \(a_id, seqs) -> annotate ("ordered (" <> show a_id <> ")") $ sort seqs `shouldBe` seqs - where - with = withConnector od withConnection mkConfig conf -- | A generic consumer group group :: Topic.ConsumerGroupName @@ -167,7 +164,7 @@ withConnector -> (db -> table -> connector) -> Config table -> PartitionCount - -> (Connection table -> table -> Topic -> (IO b -> IO b) -> IO a) + -> (Connection table -> table -> Topic -> (forall b. IO b -> IO b) -> IO a) -> IO a withConnector od withConnection mkConfig conf partitions f = OnDemand.with od $ \db -> -- load db @@ -177,6 +174,7 @@ withConnector od withConnection mkConfig conf partitions f = Topic.withProducer topic partitioner encoder $ \producer -> do -- create topic producer let logger = plainLogger Warn config = mkConfig db table + connected :: forall x. IO x -> IO x connected act = connect config logger def producer (const act) -- setup connector f conn table topic connected diff --git a/tests/Tests.hs b/tests/Tests.hs index 42074a9..7da5cd8 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -21,7 +21,7 @@ Example: match on test description -} main :: IO () main = - withDatabases $ \dbs@(Databases pcreds _ _ _) -> + withDatabases $ \dbs@(Databases pcreds _ _ ) -> hspec $ parallel $ do -- unit tests use the projector library testOnDemand From 6065e749dcdc03b1dfc71dc31c66a75742b7a105 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 14:57:02 +0000 Subject: [PATCH 5/9] Perform generic tests on PostgreSQL --- tests/Test/Connector/File.hs | 48 ++--- tests/Test/Connector/MicrosoftSQLServer.hs | 2 +- tests/Test/Connector/MySQL.hs | 2 +- tests/Test/Connector/PostgreSQL.hs | 203 +++++---------------- tests/Test/Emulator.hs | 19 +- tests/Test/Utils/SQL.hs | 6 +- 6 files changed, 86 insertions(+), 194 deletions(-) diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs index 59e12f1..22f18e4 100644 --- a/tests/Test/Connector/File.hs +++ b/tests/Test/Connector/File.hs @@ -25,30 +25,30 @@ import Test.Utils.SQL testFileConnector :: Spec testFileConnector = describe "FileConnector" $ do - testGenericSQL with - -with - :: PartitionCount - -> ( FileConnection - -> EventsTable FileConnector - -> Topic - -> (forall b. IO b -> IO b) - -> IO a - ) - -> IO a -with partitions f = - withSystemTempFile "file-db" $ \path h -> do - hClose h - var <- newMVar 0 - let connector = FileConnector path "aggregate_id" "sequence_number" - conn = FileConnection connector var - withFileTopic partitions $ \topic -> -- create topic - Topic.withProducer topic partitioner encoder $ \producer -> -- create topic producer - withTable () conn $ \table -> do - let logger = plainLogger Warn - connected :: forall a. IO a -> IO a - connected act = connect connector logger def producer (const act) -- setup connector - f conn table topic connected + testGenericSQL with_ + where + with_ + :: PartitionCount + -> ( FileConnection + -> EventsTable FileConnector + -> Topic + -> (IO b -> IO b) + -> IO a + ) + -> IO a + with_ partitions f = + withSystemTempFile "file-db" $ \path h -> do + hClose h + var <- newMVar 0 + let connector = FileConnector path "aggregate_id" "sequence_number" + conn = FileConnection connector var + withFileTopic partitions $ \topic -> -- create topic + Topic.withProducer topic partitioner encoder $ \producer -> -- create topic producer + withTable () conn $ \table -> do + let logger = plainLogger Warn + connected :: forall a. IO a -> IO a + connected act = connect connector logger def producer (const act) -- setup connector + f conn table topic connected data FileConnection = FileConnection { _connector :: FileConnector diff --git a/tests/Test/Connector/MicrosoftSQLServer.hs b/tests/Test/Connector/MicrosoftSQLServer.hs index c8d318a..cb81a95 100644 --- a/tests/Test/Connector/MicrosoftSQLServer.hs +++ b/tests/Test/Connector/MicrosoftSQLServer.hs @@ -109,7 +109,7 @@ testMicrosoftSQLServer od = do -> ( S.Connection -> EventsTable SQLServer -> Topic - -> (forall b. IO b -> IO b) + -> (IO b -> IO b) -> IO a ) -> IO a with = withConnector od withConnection mkConnector () diff --git a/tests/Test/Connector/MySQL.hs b/tests/Test/Connector/MySQL.hs index c198120..acf01c6 100644 --- a/tests/Test/Connector/MySQL.hs +++ b/tests/Test/Connector/MySQL.hs @@ -169,7 +169,7 @@ testMySQL od = do -> ( Connection -> EventsTable MySQL -> Topic - -> (forall b. IO b -> IO b) + -> (IO b -> IO b) -> IO a ) -> IO a with = withConnector od withConnection mkMySQL () diff --git a/tests/Test/Connector/PostgreSQL.hs b/tests/Test/Connector/PostgreSQL.hs index 0f0d94d..93bb20b 100644 --- a/tests/Test/Connector/PostgreSQL.hs +++ b/tests/Test/Connector/PostgreSQL.hs @@ -1,3 +1,4 @@ +{-# OPTIONS_GHC -Wno-orphans #-} module Test.Connector.PostgreSQL ( testPostgreSQL @@ -12,16 +13,11 @@ module Test.Connector.PostgreSQL where import Control.Concurrent (MVar, newMVar, modifyMVar) -import Control.Concurrent.Async (mapConcurrently) import Control.Exception (bracket, throwIO, ErrorCall(..), fromException) -import Control.Monad (void, replicateM, forM_) +import Control.Monad (void, forM_) import qualified Data.Aeson as Aeson import Data.Aeson (FromJSON) -import qualified Data.ByteString.Lazy as LB -import Data.Default (def) -import Data.List (isInfixOf, sort, stripPrefix) -import qualified Data.Map.Strict as Map -import Data.Maybe (fromMaybe) +import Data.List (isInfixOf) import Data.Scientific (Scientific) import Data.String (fromString) import qualified Data.Text as Text @@ -34,87 +30,29 @@ import Test.Hspec , shouldBe , shouldThrow ) -import Test.Hspec.Expectations.Contrib (annotate) import qualified Database.PostgreSQL.Simple as P import qualified Database.PostgreSQL.Simple.ToField as P -import GHC.Generics +import qualified Database.PostgreSQL.Simple.FromField as P hiding (Binary) import System.IO.Unsafe (unsafePerformIO) import System.Exit (ExitCode(..)) import System.Process (readProcessWithExitCode) import qualified Ambar.Emulator.Connector as Connector -import Ambar.Emulator.Connector (partitioner, encoder) import Ambar.Emulator.Connector.Postgres (PostgreSQL(..)) import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..)) import qualified Ambar.Emulator.Queue.Topic as Topic import Ambar.Record (Bytes(..)) -import Test.Queue (withFileTopic) import Test.Utils.Docker (DockerCommand(..), withDocker) import Test.Utils.OnDemand (OnDemand) +import Test.Utils.SQL import qualified Test.Utils.OnDemand as OnDemand -import Utils.Async (withAsyncThrow) import Utils.Delay (deadline, seconds, delay) -import Utils.Logger (plainLogger, Severity(..)) testPostgreSQL :: OnDemand PostgresCreds -> Spec testPostgreSQL p = do describe "PostgreSQL" $ do - -- checks that our tests can connect to postgres - it "connects" $ - with (PartitionCount 1) $ \conn table _ _ -> do - insert conn table (take 10 $ head mocks) - rs <- P.query_ @Event conn (fromString $ "SELECT * FROM " <> tableName table) - length rs `shouldBe` 10 - - it "retrieves all events already in the db" $ - with (PartitionCount 1) $ \conn table topic connected -> do - let count = 10 - insert conn table (take count $ head mocks) - connected $ - deadline (seconds 2) $ - Topic.withConsumer topic group $ \consumer -> do - es <- replicateM count $ readEntry @Event consumer - length es `shouldBe` count - - it "can retrieve a large number of events" $ - with (PartitionCount 1) $ \conn table topic connected -> do - let count = 10_000 - insert conn table (take count $ head mocks) - connected $ deadline (seconds 2) $ - Topic.withConsumer topic group $ \consumer -> do - es <- replicateM count $ readEntry @Event consumer - length es `shouldBe` count - - it "retrieves events added after initial snapshot" $ - with (PartitionCount 1) $ \conn table topic connected -> do - let count = 10 - write = insert conn table (take count $ head mocks) - connected $ deadline (seconds 1) $ - Topic.withConsumer topic group $ \consumer -> - withAsyncThrow write $ do - es <- replicateM count $ readEntry @Event consumer - length es `shouldBe` count - - it "maintains ordering through parallel writes" $ do - let partitions = 5 - with (PartitionCount partitions) $ \conn table topic connected -> do - let count = 1_000 - write = mapConcurrently id - [ insert conn table (take count $ mocks !! partition) - | partition <- [1..partitions] ] - connected $ deadline (seconds 1) $ - Topic.withConsumer topic group $ \consumer -> do - -- write and consume concurrently - withAsyncThrow write $ do - es <- replicateM (count * partitions) $ readEntry consumer - let byAggregateId = Map.toList $ Map.fromListWith (flip (++)) - [ (e_aggregate_id, [e_sequence_number]) - | (Event{..}, _) <- es - ] - forM_ byAggregateId $ \(a_id, seqs) -> - annotate ("ordered (" <> show a_id <> ")") $ - sort seqs `shouldBe` seqs + testGenericSQL with -- Test that column types are supported/unsupported by -- creating database entries with the value and reporting @@ -267,7 +205,15 @@ testPostgreSQL p = do unsupported "REGTYPE" ("integer" :: String) unsupported "PG_LSN" ("AAA/AAA" :: String) where - with = with_ () + with + :: PartitionCount + -> ( P.Connection + -> EventsTable PostgreSQL + -> Topic + -> (IO b -> IO b) + -> IO a ) + -> IO a + with = withConnector p withConnection mkPostgreSQL () withType ty definition act = OnDemand.with p $ \creds -> @@ -276,7 +222,7 @@ testPostgreSQL p = do destroy _ = P.execute_ conn $ "DROP TYPE " <> ty bracket create destroy (const act) - unsupported :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> Spec + unsupported :: ValidTy a => String -> a -> Spec unsupported ty val = it ("unsupported " <> ty) $ roundTrip ty val `shouldThrow` unsupportedType @@ -285,13 +231,13 @@ testPostgreSQL p = do | Just (Connector.UnsupportedType _) <- fromException e = True | otherwise = False - supported :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> Spec + supported :: ValidTy a => String -> a -> Spec supported ty val = it ty $ roundTrip ty val -- Write a value of a given type to a database table, then read it from the Topic. - roundTrip :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> IO () + roundTrip :: forall a. ValidTy a => String -> a -> IO () roundTrip ty val = - with_ (PostgresType ty) (PartitionCount 1) $ \conn table topic connected -> do + withConnector @(TTable PostgreSQL a) p withConnection mkPostgreSQL (PostgresType ty) (PartitionCount 1) $ \conn table topic connected -> do let record = TEntry 1 1 1 val insert conn table [record] connected $ deadline (seconds 1) $ do @@ -299,31 +245,9 @@ testPostgreSQL p = do (entry, _) <- readEntry consumer entry `shouldBe` record - with_ - :: Table t - => Config t - -> PartitionCount - -> (P.Connection -> t -> Topic -> (IO b -> IO b) -> IO a) - -> IO a - with_ conf partitions f = - OnDemand.with p $ \creds -> -- load db - withConnection creds $ \conn -> - withTable conf conn $ \table -> -- create events table - withFileTopic partitions $ \topic -> -- create topic - let config = mkPostgreSQL creds table in - Topic.withProducer topic partitioner encoder $ \producer -> do -- create topic producer - let logger = plainLogger Warn - connected act = -- setup connector - Connector.connect config logger def producer (const act) - f conn table topic connected - -readEntry :: Aeson.FromJSON a => Topic.Consumer -> IO (a, Topic.Meta) -readEntry consumer = do - result <- Topic.read consumer - (bs, meta) <- either (throwIO . ErrorCall . show) return result - case Aeson.eitherDecode $ LB.fromStrict bs of - Left err -> throwIO $ ErrorCall $ "decoding error: " <> show err - Right val -> return (val, meta) +type ValidTy a = (FromJSON a, P.FromField a, P.ToField a, Show a, Eq a) + + mkPostgreSQL :: Table t => PostgresCreds -> t -> PostgreSQL mkPostgreSQL PostgresCreds{..} table = PostgreSQL @@ -338,9 +262,6 @@ mkPostgreSQL PostgresCreds{..} table = PostgreSQL , c_serialColumn = "id" } -group :: Topic.ConsumerGroupName -group = Topic.ConsumerGroupName "test_group" - data PostgresCreds = PostgresCreds { p_database :: String , p_username :: String @@ -349,66 +270,25 @@ data PostgresCreds = PostgresCreds , p_port :: Word16 } -data Event = Event - { e_id :: Int - , e_aggregate_id :: Int - , e_sequence_number :: Int - } - deriving (Eq, Show, Generic, P.FromRow) - -instance Aeson.FromJSON Event where - parseJSON = Aeson.genericParseJSON opt - where - opt = Aeson.defaultOptions - { Aeson.fieldLabelModifier = \label -> - fromMaybe label (stripPrefix "e_" label) - } - -class Table a where - type Entry a = b | b -> a - type Config a = b | b -> a - withTable :: Config a -> P.Connection -> (a -> IO b) -> IO b - tableCols :: a -> [Text.Text] - tableName :: a -> String - -- Mock events to be added to the database. - -- Each sublist is an infinite list of events for the same aggregate. - mocks :: [[Entry a]] - insert :: P.Connection -> a -> [Entry a] -> IO () - -data TTable a = TTable - { tt_name :: String - , _tt_tyName :: String - } - -data TEntry a = TEntry - { te_id :: Int - , te_aggregate_id :: Int - , te_sequence_number :: Int - , te_value :: a - } - deriving (Eq, Show, Generic, Functor) - -instance FromJSON a => FromJSON (TEntry a) where - parseJSON = Aeson.genericParseJSON opt - where - opt = Aeson.defaultOptions - { Aeson.fieldLabelModifier = \label -> - fromMaybe label (stripPrefix "te_" label) - } - newtype PostgresType a = PostgresType String -instance P.ToField a => Table (TTable a) where - type Entry (TTable a) = TEntry a - type Config (TTable a) = PostgresType a +instance P.FromField a => P.FromRow (TEntry a) +instance P.FromRow Event + +instance (P.FromField a, P.ToField a) => Table (TTable PostgreSQL a) where + type Entry (TTable PostgreSQL a) = TEntry a + type Config (TTable PostgreSQL a) = PostgresType a + type Connection (TTable PostgreSQL a) = P.Connection tableName (TTable name _) = name tableCols _ = ["id", "aggregate_id", "sequence_number", "value"] mocks = error "no mocks for TTable" + selectAll conn table = + P.query_ @(TEntry a) conn (fromString $ "SELECT * FROM " <> tableName table) insert conn t entries = void $ P.executeMany conn query [(agg_id, seq_num, val) | TEntry _ agg_id seq_num val <- entries ] where query = fromString $ unwords - [ "INSERT INTO", tt_name t + [ "INSERT INTO", tableName t ,"(aggregate_id, sequence_number, value)" ,"VALUES (?, ?, ?)" ] @@ -425,20 +305,22 @@ instance P.ToField a => Table (TTable a) where , ")" ] -newtype EventsTable = EventsTable String - -instance Table EventsTable where - type (Entry EventsTable) = Event - type (Config EventsTable) = () +instance Table (EventsTable PostgreSQL) where + type (Entry (EventsTable PostgreSQL)) = Event + type (Config (EventsTable PostgreSQL)) = () + type (Connection (EventsTable PostgreSQL)) = P.Connection tableName (EventsTable name) = name tableCols _ = ["id", "aggregate_id", "sequence_number"] - mocks = + mocks _ = -- the aggregate_id is given when the records are inserted into the database [ [ Event err agg_id seq_id | seq_id <- [0..] ] | agg_id <- [0..] ] where err = error "aggregate id is determined by postgres" + selectAll conn table = + P.query_ @Event conn (fromString $ "SELECT * FROM " <> tableName table) + insert conn (EventsTable table) events = void $ P.executeMany conn query [(agg_id, seq_num) | Event _ agg_id seq_num <- events ] where @@ -460,7 +342,7 @@ instance Table EventsTable where , ")" ] -withEventsTable :: PostgresCreds -> (P.Connection -> EventsTable -> IO a) -> IO a +withEventsTable :: PostgresCreds -> (P.Connection -> EventsTable PostgreSQL -> IO a) -> IO a withEventsTable creds f = withConnection creds $ \conn -> withTable () conn $ \table -> @@ -479,6 +361,9 @@ newtype BytesRow = BytesRow Bytes instance P.ToField BytesRow where toField (BytesRow (Bytes bs)) = P.toField (P.Binary bs) +instance P.FromField BytesRow where + fromField = error "not used" + type Schema = String withConnection :: PostgresCreds -> (P.Connection -> IO a) -> IO a diff --git a/tests/Test/Emulator.hs b/tests/Test/Emulator.hs index 4856b37..697abb8 100644 --- a/tests/Test/Emulator.hs +++ b/tests/Test/Emulator.hs @@ -28,9 +28,11 @@ import Ambar.Emulator.Config , Source(..) ) import Ambar.Emulator.Projector (Message(..), Payload(..)) +import Ambar.Emulator.Connector.Postgres (PostgreSQL) import Test.Connector.PostgreSQL (PostgresCreds, Event(..), mocks) import qualified Test.Connector.PostgreSQL as C +import Test.Utils.SQL (EventsTable) import Test.Utils.OnDemand (OnDemand) import qualified Test.Utils.OnDemand as OnDemand import Utils.Async (withAsyncThrow) @@ -41,10 +43,10 @@ testEmulator :: OnDemand PostgresCreds -> Spec testEmulator p = describe "emulator" $ do it "retrieves data in a PostgreSQL db" $ withConfig $ \config -> - withPostgresSource $ \insert source -> do + withPostgresSource $ \table insert source -> do (out, dest) <- funDestination [source] let env = mkEnv [source] [dest] - events = addIds $ take 10 $ head mocks + events = addIds $ take 10 $ head (mocks table) insert events withAsyncThrow (emulate logger config env) $ deadline (seconds 5) $ do @@ -53,10 +55,10 @@ testEmulator p = describe "emulator" $ do it "resumes from last index" $ withConfig $ \config -> - withPostgresSource $ \insert source -> do + withPostgresSource $ \table insert source -> do (out, dest) <- funDestination [source] let env = mkEnv [source] [dest] - events = addIds $ take 10 $ head mocks + events = addIds $ take 10 $ head (mocks table) (before, after) = splitAt 5 events -- insert and consume 'before' @@ -118,7 +120,12 @@ testEmulator p = describe "emulator" $ do , c_dataPath = path } - withPostgresSource :: (([Event] -> IO ()) -> DataSource -> IO a) -> IO a + withPostgresSource :: + ( EventsTable PostgreSQL + -> ([Event] -> IO ()) + -> DataSource + -> IO a ) + -> IO a withPostgresSource f = OnDemand.with p $ \creds -> C.withEventsTable creds $ \conn table -> do @@ -129,7 +136,7 @@ testEmulator p = describe "emulator" $ do , s_source = SourcePostgreSQL config } insert events = C.insert conn table events - f insert source + f table insert source -- Add ids to the events we chose to use addIds :: [Event] -> [Event] diff --git a/tests/Test/Utils/SQL.hs b/tests/Test/Utils/SQL.hs index ffd6554..6a94bdf 100644 --- a/tests/Test/Utils/SQL.hs +++ b/tests/Test/Utils/SQL.hs @@ -80,8 +80,8 @@ newtype EventsTable a = EventsTable String testGenericSQL :: Table table - => (forall a. PartitionCount - -> (Connection table -> table -> Topic -> (forall b. IO b -> IO b) -> IO a) + => (forall a b. PartitionCount + -> (Connection table -> table -> Topic -> (IO b -> IO b) -> IO a) -> IO a) -> Spec testGenericSQL with = sequential $ do @@ -164,7 +164,7 @@ withConnector -> (db -> table -> connector) -> Config table -> PartitionCount - -> (Connection table -> table -> Topic -> (forall b. IO b -> IO b) -> IO a) + -> (Connection table -> table -> Topic -> (IO b -> IO b) -> IO a) -> IO a withConnector od withConnection mkConfig conf partitions f = OnDemand.with od $ \db -> -- load db From b7e7c3405bc85f177cd16d62ba3e571e5423c13e Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 16:19:40 +0000 Subject: [PATCH 6/9] make FileConnector able to write --- src/Ambar/Emulator.hs | 11 +- src/Ambar/Emulator/Config.hs | 11 +- src/Ambar/Emulator/Connector/File.hs | 154 ++++++++++++++++----- src/Ambar/Emulator/Projector.hs | 2 +- src/Ambar/Emulator/Queue/Partition/File.hs | 4 + tests/Test/Connector/File.hs | 22 +-- 6 files changed, 144 insertions(+), 60 deletions(-) diff --git a/src/Ambar/Emulator.hs b/src/Ambar/Emulator.hs index 07d4a83..7972a1f 100644 --- a/src/Ambar/Emulator.hs +++ b/src/Ambar/Emulator.hs @@ -16,7 +16,7 @@ import System.Directory (doesFileExist) import System.FilePath (()) import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder) -import Ambar.Emulator.Connector.File (FileConnectorState) +import Ambar.Emulator.Connector.File (FileConnectorState, mkFileConnector) import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState) import Ambar.Emulator.Connector.MySQL (MySQLState) import Ambar.Emulator.Connector.Postgres (PostgreSQLState) @@ -99,7 +99,7 @@ emulate logger_ config env = do SourcePostgreSQL _ -> StatePostgres def SourceMySQL _ -> StateMySQL def SourceSQLServer _ -> StateSQLServer def - SourceFile _ -> StateFile def + SourceFile{} -> StateFile def projectAll queue = forConcurrently_ (c_destinations env) (project queue) @@ -169,10 +169,11 @@ toConnectorConfig source sstate = StateSQLServer state -> return $ ConnectorConfig source sqlserver state StateSQLServer _ -> incompatible - SourceFile path -> + SourceFile path partitioningField incrementingField -> case sstate of - StateFile state -> - return $ ConnectorConfig source path state StateFile + StateFile state -> do + fileconn <- mkFileConnector path partitioningField incrementingField + return $ ConnectorConfig source fileconn state StateFile _ -> incompatible where incompatible = throwIO $ ErrorCall $ diff --git a/src/Ambar/Emulator/Config.hs b/src/Ambar/Emulator/Config.hs index ca62b9a..0adeafa 100644 --- a/src/Ambar/Emulator/Config.hs +++ b/src/Ambar/Emulator/Config.hs @@ -30,7 +30,6 @@ import qualified Data.Yaml as Yaml import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..)) import Ambar.Emulator.Connector.Postgres (PostgreSQL(..)) import Ambar.Emulator.Connector.MySQL (MySQL(..)) -import Ambar.Emulator.Connector.File (FileConnector(..)) import Ambar.Transport (SubmissionError) import Ambar.Transport.Http (Endpoint, User, Password) @@ -58,7 +57,7 @@ data DataSource = DataSource } data Source - = SourceFile FileConnector + = SourceFile { sf_path :: FilePath, sf_partitioningField :: Text, sf_incrementingField :: Text } | SourcePostgreSQL PostgreSQL | SourceMySQL MySQL | SourceSQLServer SQLServer @@ -151,10 +150,10 @@ instance FromJSON DataSource where return $ SourceSQLServer SQLServer{..} parseFile o = do - c_path <- o .: "path" - c_partitioningField <- o .: "partitioningField" - c_incrementingField <- o .: "incrementingField" - return $ SourceFile FileConnector{..} + sf_path <- o .: "path" + sf_partitioningField <- o .: "partitioningField" + sf_incrementingField <- o .: "incrementingField" + return $ SourceFile{..} parseDataDestination :: Map (Id DataSource) DataSource diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 3b6e166..9442bff 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -1,7 +1,10 @@ module Ambar.Emulator.Connector.File - ( FileConnector(..) - , FileConnectorState(..) + ( FileConnector + , FileConnectorState , FileRecord + , mkFileConnector + , write + , c_path ) where {-| File connector. @@ -9,8 +12,24 @@ Read JSON values from a file. One value per line. -} -import Control.Concurrent.STM (STM, newTVarIO, readTVar, readTVarIO, atomically, writeTVar) -import Control.Monad (forever) +import Control.Concurrent (MVar, newMVar, withMVar) +import Control.Concurrent.STM + ( STM + , TMVar + , TVar + , newTVarIO + , readTVar + , atomically + , writeTVar + , newTMVarIO + , modifyTVar + , retry + , takeTMVar + , writeTMVar + , putTMVar + ) +import Control.Exception (bracket) +import Control.Monad (forever, when) import qualified Data.Aeson as Json import qualified Data.Aeson.KeyMap as KeyMap import qualified Data.ByteString as BS @@ -25,10 +44,23 @@ import qualified Data.Text.Lazy.Encoding as LText import qualified Data.Text as Text import qualified Data.Text.Encoding as Text import GHC.Generics (Generic) -import System.IO (Handle, withFile, hFileSize, hSeek, IOMode(..), SeekMode(..)) +import GHC.IO.FD (FD) +import System.Directory (getFileSize) +import System.IO + ( Handle + , hSeek + , openFile + , hSeek + , IOMode(..) + , SeekMode(..) + ) import Prettyprinter ((<+>)) import qualified Ambar.Emulator.Connector as C +import Ambar.Emulator.Queue.Partition.File + ( openNonLockingWritableFD + , writeFD + ) import Ambar.Emulator.Queue.Topic (modPartitioner) import Ambar.Emulator.Queue.Topic (Producer) import qualified Ambar.Emulator.Queue.Topic as Topic @@ -45,11 +77,72 @@ data FileConnector = FileConnector { c_path :: FilePath , c_partitioningField :: Text , c_incrementingField :: Text + , c_state :: TVar FileConnectorState + , c_readHandle :: TMVar Handle + , c_writeHandle :: MVar FD + , c_getFileSize :: IO Integer } +-- | We don't close these file descriptors because we consider that +-- this is only used during tests. +mkFileConnector :: FilePath -> Text -> Text -> IO FileConnector +mkFileConnector path partitioningField incrementingField = do + size <- getFileSize path + varState <- newTVarIO (FileConnectorState size 0) + varWriteHandle <- do + fd <- openNonLockingWritableFD path + newMVar fd + varReadHandle <- do + readHandle <- openFile path ReadMode + newTMVarIO readHandle + return $ FileConnector + path + partitioningField + incrementingField + varState + varReadHandle + varWriteHandle + (getFileSize path) + +-- Does not work in the presence of external writers to the same file. +write :: FileConnector -> Json.Value -> IO () +write FileConnector{..} json = do + withMVar c_writeHandle $ \fd -> do + let entry = LB.toStrict (Json.encode json) <> "\n" + entrySize = fromIntegral (BS.length entry) + writeFD fd entry + atomically $ modifyTVar c_state $ \state -> + state { c_fileSize = c_fileSize state + entrySize } + +readNext :: SimpleLogger -> TVar FileConnectorState -> TMVar Handle -> IO Json.Value +readNext logger varState varReadHandle = + withReadLock $ \readHandle -> do + bs <- Char8.hGetLine readHandle + value <- case Json.eitherDecode $ LB.fromStrict bs of + Left e -> fatal logger $ unlines + [ "Unable to decode value from source:" + , show e + , Text.unpack $ Text.decodeUtf8 bs + ] + Right v -> return v + let entrySize = fromIntegral (BS.length bs) + atomically $ modifyTVar varState $ \state -> + state { c_offset = c_offset state + entrySize } + return value + where + withReadLock = bracket acquire release + where + acquire = atomically $ do + FileConnectorState size pos <- readTVar varState + when (pos == size) retry + takeTMVar varReadHandle + + release readHandle = atomically $ + writeTMVar varReadHandle readHandle + data FileConnectorState = FileConnectorState { c_fileSize :: Integer - , c_currentOffset :: Integer + , c_offset :: Integer } deriving (Show, Generic) deriving anyclass (Json.ToJSON, Json.FromJSON, Default) @@ -71,41 +164,28 @@ connect -> (STM FileConnectorState -> IO a) -> IO a connect conn@(FileConnector {..}) logger initState producer f = do - svar <- newTVarIO initState - let initialOffset = c_currentOffset initState - withFile c_path ReadMode $ \h -> do - hSeek h AbsoluteSeek initialOffset - withAsyncThrow (worker h svar) $ f (readTVar svar) + h <- atomically $ do + writeTVar c_state initState + takeTMVar c_readHandle + hSeek h AbsoluteSeek (c_offset initState) + atomically $ putTMVar c_readHandle h + withAsyncThrow updateFileSize $ + withAsyncThrow worker $ + f (readTVar c_state) where - worker h svar = forever $ do - FileConnectorState fsize0 offset <- readTVarIO svar - fsize <- waitForData h fsize0 offset - line <- Char8.hGetLine h - value <- case Json.eitherDecode $ LB.fromStrict line of - Left e -> fatal logger $ unlines - [ "Unable to decode value from source:" - , show e - , Text.unpack $ Text.decodeUtf8 line - ] - Right v -> return v + worker = forever $ do + value <- readNext logger c_state c_readHandle let record = FileRecord value Topic.write producer record logResult record - let offset' = offset + fromIntegral (BS.length line) - atomically $ writeTVar svar $ FileConnectorState fsize offset' - - waitForData :: Handle -> Integer -> Integer -> IO Integer - waitForData h fsize offset = - if fsize > offset - then return fsize - else do - fsize' <- hFileSize h - let stillNoData = fsize' == fsize - if stillNoData - then do - delay _POLLING_INTERVAL - waitForData h fsize offset - else return fsize' + + updateFileSize = forever $ do + delay _POLLING_INTERVAL + newSize <- c_getFileSize + atomically $ do + FileConnectorState fsize offset <- readTVar c_state + when (fsize > newSize) $ + writeTVar c_state $ FileConnectorState newSize offset logResult record = logInfo logger $ renderPretty $ diff --git a/src/Ambar/Emulator/Projector.hs b/src/Ambar/Emulator/Projector.hs index ef6c602..2123f99 100644 --- a/src/Ambar/Emulator/Projector.hs +++ b/src/Ambar/Emulator/Projector.hs @@ -107,7 +107,7 @@ relevantFields :: Source -> Payload -> Text relevantFields source (Payload value) = renderPretty $ withObject $ \o -> case source of - SourceFile _ -> prettyJSON value + SourceFile{} -> prettyJSON value SourceMySQL MySQL{..} -> fillSep $ [ pretty field <> ":" <+> prettyJSON v diff --git a/src/Ambar/Emulator/Queue/Partition/File.hs b/src/Ambar/Emulator/Queue/Partition/File.hs index 57bb7d4..eea36bf 100644 --- a/src/Ambar/Emulator/Queue/Partition/File.hs +++ b/src/Ambar/Emulator/Queue/Partition/File.hs @@ -6,6 +6,10 @@ module Ambar.Emulator.Queue.Partition.File , close , OpenError(..) , WriteError(..) + + , openNonLockingWritableFD + , closeNonLockingWritableFD + , writeFD ) where import Control.Concurrent (MVar, newMVar, withMVarMasked, modifyMVar_) diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs index 22f18e4..5e08061 100644 --- a/tests/Test/Connector/File.hs +++ b/tests/Test/Connector/File.hs @@ -3,19 +3,19 @@ module Test.Connector.File ( testFileConnector ) where -import Control.Concurrent (MVar, newMVar, modifyMVar) +import Control.Concurrent (MVar, newMVar, modifyMVar, withMVar) import Data.ByteString.Lazy.Char8 as Char8 import qualified Data.Aeson as Json import Data.Default (def) import Control.Monad (forM_, forM) -import System.IO (withFile, IOMode(..), hClose) +import System.IO (hClose) import System.IO.Temp (withSystemTempFile) import Test.Hspec (Spec, describe) import qualified Ambar.Emulator.Queue.Topic as Topic import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..)) import Ambar.Emulator.Connector (partitioner, encoder, connect) -import Ambar.Emulator.Connector.File (FileConnector(..)) +import Ambar.Emulator.Connector.File (FileConnector, mkFileConnector, write, c_path) import Test.Queue (withFileTopic) @@ -39,9 +39,9 @@ testFileConnector = with_ partitions f = withSystemTempFile "file-db" $ \path h -> do hClose h + connector <- mkFileConnector path "aggregate_id" "sequence_number" var <- newMVar 0 - let connector = FileConnector path "aggregate_id" "sequence_number" - conn = FileConnection connector var + let conn = FileConnection connector var withFileTopic partitions $ \topic -> -- create topic Topic.withProducer topic partitioner encoder $ \producer -> -- create topic producer withTable () conn $ \table -> do @@ -52,7 +52,7 @@ testFileConnector = data FileConnection = FileConnection { _connector :: FileConnector - , _maxId :: MVar Int + , _maxId :: MVar Int -- max ID and write lock } instance Table (EventsTable FileConnector) where @@ -61,10 +61,10 @@ instance Table (EventsTable FileConnector) where type Connection (EventsTable FileConnector) = FileConnection tableName (EventsTable name) = name tableCols _ = ["id", "aggregate_id", "sequence_number"] - withTable () (FileConnection connector _) f = do + withTable _ _ f = do name <- mkTableName let table = EventsTable name - withFile (c_path connector) ReadMode $ \_ -> f table + f table mocks _ = [ [ Event err agg_id seq_id | seq_id <- [0..] ] | agg_id <- [0..] @@ -73,11 +73,11 @@ instance Table (EventsTable FileConnector) where insert (FileConnection connector varMaxId) _ events = forM_ events $ \(Event _ agg_id seq_id) -> do modifyMVar varMaxId $ \nxt -> do - let encoded = Json.encode $ Event nxt agg_id seq_id - Char8.appendFile (c_path connector) (encoded <> "\n") + write connector (Json.toJSON $ Event nxt agg_id seq_id) return (nxt + 1, nxt) - selectAll (FileConnection connector _) _ = do + selectAll (FileConnection connector var) _ = + withMVar var $ \_ -> do bs <- Char8.readFile (c_path connector) forM (Char8.lines bs) $ \line -> case Json.eitherDecode' line of From b7844f17ce7efef82b6e1374bfc16dc2f4635a6a Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 17:08:06 +0000 Subject: [PATCH 7/9] Fix file connector --- src/Ambar/Emulator/Connector/File.hs | 13 ++++++------- tests/Test/Connector/File.hs | 2 ++ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 9442bff..032f6c1 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -25,7 +25,6 @@ import Control.Concurrent.STM , modifyTVar , retry , takeTMVar - , writeTMVar , putTMVar ) import Control.Exception (bracket) @@ -125,7 +124,7 @@ readNext logger varState varReadHandle = , Text.unpack $ Text.decodeUtf8 bs ] Right v -> return v - let entrySize = fromIntegral (BS.length bs) + let entrySize = fromIntegral $ BS.length bs + BS.length "\n" atomically $ modifyTVar varState $ \state -> state { c_offset = c_offset state + entrySize } return value @@ -133,12 +132,12 @@ readNext logger varState varReadHandle = withReadLock = bracket acquire release where acquire = atomically $ do - FileConnectorState size pos <- readTVar varState - when (pos == size) retry + FileConnectorState fsize pos <- readTVar varState + when (fsize <= pos) retry takeTMVar varReadHandle release readHandle = atomically $ - writeTMVar varReadHandle readHandle + putTMVar varReadHandle readHandle data FileConnectorState = FileConnectorState { c_fileSize :: Integer @@ -180,11 +179,11 @@ connect conn@(FileConnector {..}) logger initState producer f = do logResult record updateFileSize = forever $ do - delay _POLLING_INTERVAL newSize <- c_getFileSize + delay _POLLING_INTERVAL -- also serves to wait until any writing finishes atomically $ do FileConnectorState fsize offset <- readTVar c_state - when (fsize > newSize) $ + when (fsize < newSize) $ writeTVar c_state $ FileConnectorState newSize offset logResult record = diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs index 5e08061..8887da1 100644 --- a/tests/Test/Connector/File.hs +++ b/tests/Test/Connector/File.hs @@ -65,11 +65,13 @@ instance Table (EventsTable FileConnector) where name <- mkTableName let table = EventsTable name f table + mocks _ = [ [ Event err agg_id seq_id | seq_id <- [0..] ] | agg_id <- [0..] ] where err = error "aggregate id is determined by mysql" + insert (FileConnection connector varMaxId) _ events = forM_ events $ \(Event _ agg_id seq_id) -> do modifyMVar varMaxId $ \nxt -> do From e313f4c57772633235d4da601c52c1a5764a4f05 Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 18:10:50 +0000 Subject: [PATCH 8/9] Handle files updating outside of the RTS --- src/Ambar/Emulator/Connector/File.hs | 95 +++++++++++++++++----------- 1 file changed, 57 insertions(+), 38 deletions(-) diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 032f6c1..67a6fb5 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -50,6 +50,8 @@ import System.IO , hSeek , openFile , hSeek + , hIsEOF + , hClose , IOMode(..) , SeekMode(..) ) @@ -63,7 +65,6 @@ import Ambar.Emulator.Queue.Partition.File import Ambar.Emulator.Queue.Topic (modPartitioner) import Ambar.Emulator.Queue.Topic (Producer) import qualified Ambar.Emulator.Queue.Topic as Topic -import Ambar.Record (Value(..)) import Utils.Async (withAsyncThrow) import Utils.Logger (SimpleLogger, fatal, logInfo) import Utils.Delay (Duration, delay, millis) @@ -113,32 +114,6 @@ write FileConnector{..} json = do atomically $ modifyTVar c_state $ \state -> state { c_fileSize = c_fileSize state + entrySize } -readNext :: SimpleLogger -> TVar FileConnectorState -> TMVar Handle -> IO Json.Value -readNext logger varState varReadHandle = - withReadLock $ \readHandle -> do - bs <- Char8.hGetLine readHandle - value <- case Json.eitherDecode $ LB.fromStrict bs of - Left e -> fatal logger $ unlines - [ "Unable to decode value from source:" - , show e - , Text.unpack $ Text.decodeUtf8 bs - ] - Right v -> return v - let entrySize = fromIntegral $ BS.length bs + BS.length "\n" - atomically $ modifyTVar varState $ \state -> - state { c_offset = c_offset state + entrySize } - return value - where - withReadLock = bracket acquire release - where - acquire = atomically $ do - FileConnectorState fsize pos <- readTVar varState - when (fsize <= pos) retry - takeTMVar varReadHandle - - release readHandle = atomically $ - putTMVar varReadHandle readHandle - data FileConnectorState = FileConnectorState { c_fileSize :: Integer , c_offset :: Integer @@ -172,12 +147,6 @@ connect conn@(FileConnector {..}) logger initState producer f = do withAsyncThrow worker $ f (readTVar c_state) where - worker = forever $ do - value <- readNext logger c_state c_readHandle - let record = FileRecord value - Topic.write producer record - logResult record - updateFileSize = forever $ do newSize <- c_getFileSize delay _POLLING_INTERVAL -- also serves to wait until any writing finishes @@ -186,6 +155,12 @@ connect conn@(FileConnector {..}) logger initState producer f = do when (fsize < newSize) $ writeTVar c_state $ FileConnectorState newSize offset + worker = forever $ do + value <- readNext + let record = FileRecord value + Topic.write producer record + logResult record + logResult record = logInfo logger $ renderPretty $ "ingested." <+> commaSeparated @@ -193,20 +168,64 @@ connect conn@(FileConnector {..}) logger initState producer f = do , "partitioning_value:" <+> prettyJSON (partitioningValue conn record) ] -partitioningValue :: FileConnector -> FileRecord -> Value + -- | Blocks until there is something to read. + readNext :: IO Json.Value + readNext = + withReadLock $ \readHandle -> do + bs <- Char8.hGetLine readHandle + value <- case Json.eitherDecode $ LB.fromStrict bs of + Left e -> fatal logger $ unlines + [ "Unable to decode value from source:" + , show e + , Text.unpack $ Text.decodeUtf8 bs + ] + Right v -> return v + let entrySize = fromIntegral $ BS.length bs + BS.length "\n" + atomically $ modifyTVar c_state $ \state -> + state { c_offset = c_offset state + entrySize } + return value + + withReadLock :: (Handle -> IO a) -> IO a + withReadLock = bracket acquire release + where + acquire = do + -- wait till there is data to read and take the lock. + (h, offset) <- atomically $ do + FileConnectorState fsize offset <- readTVar c_state + when (fsize <= offset) retry + h <- takeTMVar c_readHandle + return (h, offset) + + -- For some reason, if the file we are reading is updated by an external + -- program (like the user manually adding an entry) the file reading library + -- don't detect that EOF has moved. In this case we have to close this handle + -- and open a new one. + eof <- hIsEOF h + if not eof + then return h + else do + hClose h + h' <- openFile c_path ReadMode + hSeek h' AbsoluteSeek offset + return h' + + release readHandle = atomically $ + putTMVar c_readHandle readHandle + + +partitioningValue :: FileConnector -> FileRecord -> Json.Value partitioningValue FileConnector{..} r = getField c_partitioningField r -incrementingValue :: FileConnector -> FileRecord -> Value +incrementingValue :: FileConnector -> FileRecord -> Json.Value incrementingValue FileConnector{..} r = getField c_incrementingField r -getField :: Text -> FileRecord -> Value +getField :: Text -> FileRecord -> Json.Value getField field (FileRecord json) = fromMaybe err $ do o <- getObject json let key = fromString $ Text.unpack field v <- KeyMap.lookup key o - let txt = jsonToTxt v - return $ Json txt v + return $ v where err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json From 34c182dcda3b628f0d91a77253d4bccdd1dffa2e Mon Sep 17 00:00:00 2001 From: Marcelo Lazaroni Date: Fri, 13 Dec 2024 18:18:53 +0000 Subject: [PATCH 9/9] Test file source config --- README.md | 12 ++++++++++- examples/config.yml | 12 ++++++++++- tests/Test/Config.hs | 48 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c673baf..60f1578 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ data_sources: partitioningColumn: aggregate_id # Connect to a MySQL database - - id: postgres_source + - id: mysql_source description: Main events store type: mysql host: localhost @@ -71,6 +71,16 @@ data_sources: autoIncrementingColumn: id partitioningColumn: aggregate_id + # Use a plain text file as a data source. + # Each line must be a valid JSON object. + # Values are projected as they are added. + - id: file_source + description: My file JSON event store + type: file + path: ./path/to/source.file + incrementingField: id + partitioningField: aggregate_id + # Connections to your endpoint. # The Emulator will send data read from the databases to these endpoints. data_destinations: diff --git a/examples/config.yml b/examples/config.yml index 2a45380..54eb049 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -21,7 +21,7 @@ data_sources: partitioningColumn: aggregate_id # Connect to a MySQL database - - id: postgres_source + - id: mysql_source description: Main events store type: mysql host: localhost @@ -56,6 +56,16 @@ data_sources: autoIncrementingColumn: id partitioningColumn: aggregate_id + # Use a plain text file as a data source. + # Each line must be a valid JSON object. + # Values are projected as they are added. + - id: file_source + description: My file JSON event store + type: file + path: ./path/to/source.file + incrementingField: id + partitioningField: aggregate_id + # Connections to your endpoint. # The Emulator will send data read from the databases to these endpoints. data_destinations: diff --git a/tests/Test/Config.hs b/tests/Test/Config.hs index a7c4dec..caa6a87 100644 --- a/tests/Test/Config.hs +++ b/tests/Test/Config.hs @@ -45,6 +45,42 @@ testConfig = do description: The file source type: file path: ./source.txt + incrementingField: id + partitioningField: aggregate_id + + - id: mysql_source + description: Main events store + type: mysql + host: localhost + port: 5432 + username: my_user + password: my_pass + database: my_db + table: events_table + columns: + - id + - aggregate_id + - sequence_number + - payload + autoIncrementingColumn: id + partitioningColumn: aggregate_id + + - id: sqlserver_source + description: Main events store + type: sqlserver + host: localhost + port: 1433 + username: my_user + password: my_pass + database: my_db + table: events_table + columns: + - id + - aggregate_id + - sequence_number + - payload + autoIncrementingColumn: id + partitioningColumn: aggregate_id data_destinations: - id: file_destination @@ -55,6 +91,8 @@ testConfig = do sources: - postgres_source - file_source + - mysql_source + - sqlserver_source - id: HTTP_destination description: my projection 2 @@ -66,8 +104,10 @@ testConfig = do sources: - postgres_source - file_source + - mysql_source + - sqlserver_source |] - annotate "source count" $ Map.size (c_sources config) `shouldBe` 2 + annotate "source count" $ Map.size (c_sources config) `shouldBe` 4 annotate "source count" $ Map.size (c_destinations config) `shouldBe` 2 it "detects duplicate sources" $ do @@ -77,11 +117,15 @@ testConfig = do description: The file source type: file path: ./source.txt + incrementingField: id + partitioningField: aggregate_id - id: source_1 description: The file source type: file path: ./source.txt + incrementingField: id + partitioningField: aggregate_id data_destinations: - id: file_destination @@ -100,6 +144,8 @@ testConfig = do description: The file source type: file path: ./source.txt + incrementingField: id + partitioningField: aggregate_id data_destinations: - id: dest_1