diff --git a/README.md b/README.md
index c673baf..60f1578 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ data_sources:
     partitioningColumn: aggregate_id
 
   # Connect to a MySQL database
-  - id: postgres_source
+  - id: mysql_source
     description: Main events store
     type: mysql
     host: localhost
@@ -71,6 +71,16 @@ data_sources:
     autoIncrementingColumn: id
     partitioningColumn: aggregate_id
 
+  # Use a plain text file as a data source.
+  # Each line must be a valid JSON object.
+  # New values are projected as they are appended to the file.
+  - id: file_source
+    description: My JSON file event store
+    type: file
+    path: ./path/to/source.file
+    incrementingField: id
+    partitioningField: aggregate_id
+
 # Connections to your endpoint.
 # The Emulator will send data read from the databases to these endpoints.
 data_destinations:
diff --git a/emulator.cabal b/emulator.cabal
index 1a5758f..6ecd2e0 100644
--- a/emulator.cabal
+++ b/emulator.cabal
@@ -119,6 +119,7 @@ test-suite emulator-tests
       Test.Config
       Test.Queue
       Test.Connector
+      Test.Connector.File
      Test.Connector.MySQL
      Test.Connector.PostgreSQL
      Test.Connector.MicrosoftSQLServer
diff --git a/examples/config.yml b/examples/config.yml
index 2a45380..54eb049 100644
--- a/examples/config.yml
+++ b/examples/config.yml
@@ -21,7 +21,7 @@ data_sources:
     partitioningColumn: aggregate_id
 
   # Connect to a MySQL database
-  - id: postgres_source
+  - id: mysql_source
     description: Main events store
     type: mysql
     host: localhost
@@ -56,6 +56,16 @@ data_sources:
     autoIncrementingColumn: id
     partitioningColumn: aggregate_id
 
+  # Use a plain text file as a data source.
+  # Each line must be a valid JSON object.
+  # New values are projected as they are appended to the file.
+  - id: file_source
+    description: My JSON file event store
+    type: file
+    path: ./path/to/source.file
+    incrementingField: id
+    partitioningField: aggregate_id
+
 # Connections to your endpoint.
 # The Emulator will send data read from the databases to these endpoints.
 data_destinations:
diff --git a/src/Ambar/Emulator.hs b/src/Ambar/Emulator.hs
index c8fdf58..7972a1f 100644
--- a/src/Ambar/Emulator.hs
+++ b/src/Ambar/Emulator.hs
@@ -16,6 +16,7 @@ import System.Directory (doesFileExist)
 import System.FilePath ((</>))
 
 import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder)
+import Ambar.Emulator.Connector.File (FileConnectorState, mkFileConnector)
 import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState)
 import Ambar.Emulator.Connector.MySQL (MySQLState)
 import Ambar.Emulator.Connector.Postgres (PostgreSQLState)
@@ -98,7 +99,7 @@ emulate logger_ config env = do
           SourcePostgreSQL _ -> StatePostgres def
           SourceMySQL _ -> StateMySQL def
           SourceSQLServer _ -> StateSQLServer def
-          SourceFile _ -> StateFile ()
+          SourceFile{} -> StateFile def
 
     projectAll queue = forConcurrently_ (c_destinations env) (project queue)
 
@@ -137,7 +138,7 @@ data SavedState
   | StatePostgres PostgreSQLState
   | StateMySQL MySQLState
   | StateSQLServer SQLServerState
-  | StateFile ()
+  | StateFile FileConnectorState
   deriving (Generic)
   deriving anyclass (ToJSON, FromJSON)
 
@@ -168,10 +169,11 @@ toConnectorConfig source sstate =
         StateSQLServer state ->
           return $ ConnectorConfig source sqlserver state StateSQLServer
         _ -> incompatible
-    SourceFile path ->
+    SourceFile path partitioningField incrementingField ->
       case sstate of
-        StateFile () ->
-          return $ ConnectorConfig source path () StateFile
+        StateFile state -> do
+          fileconn <- mkFileConnector path partitioningField incrementingField
+          return $ ConnectorConfig source fileconn state StateFile
         _ -> incompatible
   where
   incompatible = throwIO $ ErrorCall $
diff --git a/src/Ambar/Emulator/Config.hs b/src/Ambar/Emulator/Config.hs
index 7416672..0adeafa 100644
--- a/src/Ambar/Emulator/Config.hs
+++ b/src/Ambar/Emulator/Config.hs
@@ -30,7 +30,6 @@ import qualified Data.Yaml as Yaml
 import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
 import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
 import Ambar.Emulator.Connector.MySQL (MySQL(..))
-import Ambar.Emulator.Connector.File (FileConnector(..))
 import Ambar.Transport (SubmissionError)
 import Ambar.Transport.Http (Endpoint, User, Password)
 
@@ -58,7 +57,7 @@ data DataSource = DataSource
   }
 
 data Source
-  = SourceFile FileConnector
+  = SourceFile { sf_path :: FilePath, sf_partitioningField :: Text, sf_incrementingField :: Text }
   | SourcePostgreSQL PostgreSQL
   | SourceMySQL MySQL
   | SourceSQLServer SQLServer
@@ -150,7 +149,11 @@ instance FromJSON DataSource where
       c_incrementingColumn <- o .: "autoIncrementingColumn"
       return $ SourceSQLServer SQLServer{..}
 
-    parseFile o = SourceFile . FileConnector <$> (o .: "path")
+    parseFile o = do
+      sf_path <- o .: "path"
+      sf_partitioningField <- o .: "partitioningField"
+      sf_incrementingField <- o .: "incrementingField"
+      return $ SourceFile{..}
 
 parseDataDestination
   :: Map (Id DataSource) DataSource
diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs
index 5b622c8..67a6fb5 100644
--- a/src/Ambar/Emulator/Connector/File.hs
+++ b/src/Ambar/Emulator/Connector/File.hs
@@ -1,47 +1,239 @@
 module Ambar.Emulator.Connector.File
-  ( FileConnector(..)
-  ) where
+  ( FileConnector
+  , FileConnectorState
+  , FileRecord
+  , mkFileConnector
+  , write
+  , c_path
+  ) where
 
 {-| File connector.
    Read JSON values from a file. One value per line.
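+
+   For example, with incrementingField "id" and partitioningField
+   "aggregate_id" (as in examples/config.yml), a source file could contain:
+
+   > {"id": 0, "aggregate_id": 1, "sequence_number": 0}
+   > {"id": 1, "aggregate_id": 1, "sequence_number": 1}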
 -}
 
+import Control.Concurrent (MVar, newMVar, withMVar)
+import Control.Concurrent.STM
+  ( STM
+  , TMVar
+  , TVar
+  , newTVarIO
+  , readTVar
+  , atomically
+  , writeTVar
+  , newTMVarIO
+  , modifyTVar
+  , retry
+  , takeTMVar
+  , putTMVar
+  )
+import Control.Exception (bracket)
+import Control.Monad (forever, when)
 import qualified Data.Aeson as Json
-import Control.Monad (forM_)
-import qualified Data.ByteString.Lazy.Char8 as Char8
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Char8 as Char8
 import qualified Data.ByteString.Lazy as LB
-import qualified Data.Text.Lazy as Text
-import qualified Data.Text.Lazy.Encoding as Text
+import Data.Default (Default)
+import Data.Maybe (fromMaybe)
+import Data.String (IsString(fromString))
+import Data.Text (Text)
+import qualified Data.Text.Lazy as LText
+import qualified Data.Text.Lazy.Encoding as LText
+import qualified Data.Text as Text
+import qualified Data.Text.Encoding as Text
+import GHC.Generics (Generic)
+import GHC.IO.FD (FD)
+import System.Directory (getFileSize)
+import System.IO
+  ( Handle
+  , hSeek
+  , openFile
+  , hIsEOF
+  , hClose
+  , IOMode(..)
+  , SeekMode(..)
+  )
+import Prettyprinter ((<+>))
 
 import qualified Ambar.Emulator.Connector as C
+import Ambar.Emulator.Queue.Partition.File
+  ( openNonLockingWritableFD
+  , writeFD
+  )
 import Ambar.Emulator.Queue.Topic (modPartitioner)
+import Ambar.Emulator.Queue.Topic (Producer)
 import qualified Ambar.Emulator.Queue.Topic as Topic
 import Utils.Async (withAsyncThrow)
-import Utils.Logger (fatal, logInfo)
+import Utils.Logger (SimpleLogger, fatal, logInfo)
+import Utils.Delay (Duration, delay, millis)
+import Utils.Prettyprinter (prettyJSON, renderPretty, commaSeparated)
 
-data FileConnector = FileConnector FilePath
+_POLLING_INTERVAL :: Duration
+_POLLING_INTERVAL = millis 50
+
+data FileConnector = FileConnector
+  { c_path :: FilePath
+  , c_partitioningField :: Text
+  , c_incrementingField :: Text
+  , c_state :: TVar FileConnectorState
+  , c_readHandle :: TMVar Handle
+  , c_writeHandle :: MVar FD
+  , c_getFileSize :: IO Integer
+  }
+
+-- | We never close these file descriptors; that is acceptable
+-- because this connector is only used in tests.
+mkFileConnector :: FilePath -> Text -> Text -> IO FileConnector
+mkFileConnector path partitioningField incrementingField = do
+  size <- getFileSize path
+  varState <- newTVarIO (FileConnectorState size 0)
+  varWriteHandle <- do
+    fd <- openNonLockingWritableFD path
+    newMVar fd
+  varReadHandle <- do
+    readHandle <- openFile path ReadMode
+    newTMVarIO readHandle
+  return $ FileConnector
+    path
+    partitioningField
+    incrementingField
+    varState
+    varReadHandle
+    varWriteHandle
+    (getFileSize path)
+
+-- Does not work in the presence of external writers to the same file.
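+--
+-- A minimal usage sketch (illustrative only; the path is hypothetical and the
+-- field names mirror examples/config.yml):
+--
+-- > conn <- mkFileConnector "./events.file" "aggregate_id" "id"
+-- > write conn $ Json.object ["id" Json..= (0 :: Int), "aggregate_id" Json..= (1 :: Int)]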
+write :: FileConnector -> Json.Value -> IO ()
+write FileConnector{..} json = do
+  withMVar c_writeHandle $ \fd -> do
+    let entry = LB.toStrict (Json.encode json) <> "\n"
+        entrySize = fromIntegral (BS.length entry)
+    writeFD fd entry
+    atomically $ modifyTVar c_state $ \state ->
+      state { c_fileSize = c_fileSize state + entrySize }
+
+data FileConnectorState = FileConnectorState
+  { c_fileSize :: Integer
+  , c_offset :: Integer
+  }
+  deriving (Show, Generic)
+  deriving anyclass (Json.ToJSON, Json.FromJSON, Default)
 
 newtype FileRecord = FileRecord Json.Value
 
 instance C.Connector FileConnector where
-  type ConnectorState FileConnector = ()
+  type ConnectorState FileConnector = FileConnectorState
   type ConnectorRecord FileConnector = FileRecord
   partitioner = modPartitioner (const 1)
   encoder (FileRecord value) = LB.toStrict $ Json.encode value
-  connect (FileConnector path) logger () producer f =
-    withAsyncThrow worker $ f (return ())
+  connect = connect
+
+connect
+  :: FileConnector
+  -> SimpleLogger
+  -> FileConnectorState
+  -> Producer FileRecord
+  -> (STM FileConnectorState -> IO a)
+  -> IO a
+connect conn@(FileConnector {..}) logger initState producer f = do
+  h <- atomically $ do
+    writeTVar c_state initState
+    takeTMVar c_readHandle
+  hSeek h AbsoluteSeek (c_offset initState)
+  atomically $ putTMVar c_readHandle h
+  withAsyncThrow updateFileSize $
+    withAsyncThrow worker $
+      f (readTVar c_state)
+  where
+  updateFileSize = forever $ do
+    newSize <- c_getFileSize
+    delay _POLLING_INTERVAL -- also serves to wait until any writing finishes
+    atomically $ do
+      FileConnectorState fsize offset <- readTVar c_state
+      when (fsize < newSize) $
+        writeTVar c_state $ FileConnectorState newSize offset
+
+  worker = forever $ do
+    value <- readNext
+    let record = FileRecord value
+    Topic.write producer record
+    logResult record
+
+  logResult record =
+    logInfo logger $ renderPretty $
+      "ingested." <+> commaSeparated
+        [ "incrementing_value:" <+> prettyJSON (incrementingValue conn record)
+        , "partitioning_value:" <+> prettyJSON (partitioningValue conn record)
+        ]
+
+  -- | Blocks until there is something to read.
+  readNext :: IO Json.Value
+  readNext =
+    withReadLock $ \readHandle -> do
+      bs <- Char8.hGetLine readHandle
+      value <- case Json.eitherDecode $ LB.fromStrict bs of
+        Left e -> fatal logger $ unlines
+          [ "Unable to decode value from source:"
+          , show e
+          , Text.unpack $ Text.decodeUtf8 bs
+          ]
+        Right v -> return v
+      let entrySize = fromIntegral $ BS.length bs + BS.length "\n"
+      atomically $ modifyTVar c_state $ \state ->
+        state { c_offset = c_offset state + entrySize }
+      return value
+
+  withReadLock :: (Handle -> IO a) -> IO a
+  withReadLock = bracket acquire release
     where
-      worker = do
-        bs <- Char8.readFile path
-        forM_ (Char8.lines bs) $ \line -> do
-          value <- case Json.eitherDecode line of
-            Left e -> fatal logger $ unlines
-              [ "Unable to decode value from source:"
-              , show e
-              , Text.unpack $ Text.decodeUtf8 bs
-              ]
-            Right v -> return v
-          Topic.write producer (FileRecord value)
-          logInfo logger $ "ingested. " <> Text.decodeUtf8 line
+    acquire = do
+      -- wait until there is data to read, and take the lock.
+      (h, offset) <- atomically $ do
+        FileConnectorState fsize offset <- readTVar c_state
+        when (fsize <= offset) retry
+        h <- takeTMVar c_readHandle
+        return (h, offset)
+
+      -- For some reason, if the file we are reading is updated by an external
+      -- program (like the user manually adding an entry) the file-reading
+      -- machinery doesn't detect that EOF has moved. In that case we have to
+      -- close this handle and open a new one.
+      eof <- hIsEOF h
+      if not eof
+        then return h
+        else do
+          hClose h
+          h' <- openFile c_path ReadMode
+          hSeek h' AbsoluteSeek offset
+          return h'
+
+    release readHandle = atomically $
+      putTMVar c_readHandle readHandle
+
+
+partitioningValue :: FileConnector -> FileRecord -> Json.Value
+partitioningValue FileConnector{..} r = getField c_partitioningField r
+
+incrementingValue :: FileConnector -> FileRecord -> Json.Value
+incrementingValue FileConnector{..} r = getField c_incrementingField r
+
+getField :: Text -> FileRecord -> Json.Value
+getField field (FileRecord json) =
+  fromMaybe err $ do
+    o <- getObject json
+    let key = fromString $ Text.unpack field
+    v <- KeyMap.lookup key o
+    return v
+  where
+  err = error $ Text.unpack $ "missing field '" <> field <> "' in: " <> jsonToTxt json
+
+  jsonToTxt :: Json.Value -> Text
+  jsonToTxt = LText.toStrict . LText.decodeUtf8 . Json.encode
+
+  getObject :: Json.Value -> Maybe Json.Object
+  getObject = \case
+    Json.Object o -> Just o
+    _ -> Nothing
diff --git a/src/Ambar/Emulator/Projector.hs b/src/Ambar/Emulator/Projector.hs
index ef6c602..2123f99 100644
--- a/src/Ambar/Emulator/Projector.hs
+++ b/src/Ambar/Emulator/Projector.hs
@@ -107,7 +107,7 @@ relevantFields :: Source -> Payload -> Text
 relevantFields source (Payload value) =
   renderPretty $ withObject $ \o ->
   case source of
-    SourceFile _ -> prettyJSON value
+    SourceFile{} -> prettyJSON value
     SourceMySQL MySQL{..} ->
       fillSep $
       [ pretty field <> ":" <+> prettyJSON v
diff --git a/src/Ambar/Emulator/Queue/Partition/File.hs b/src/Ambar/Emulator/Queue/Partition/File.hs
index 57bb7d4..eea36bf 100644
--- a/src/Ambar/Emulator/Queue/Partition/File.hs
+++ b/src/Ambar/Emulator/Queue/Partition/File.hs
@@ -6,6 +6,10 @@ module Ambar.Emulator.Queue.Partition.File
   , close
   , OpenError(..)
   , WriteError(..)
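+
+  -- Low-level, non-locking file-descriptor helpers, exported for the File
+  -- connector, which appends to a file that its own reader keeps open.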
+  , openNonLockingWritableFD
+  , closeNonLockingWritableFD
+  , writeFD
   ) where
 
 import Control.Concurrent (MVar, newMVar, withMVarMasked, modifyMVar_)
diff --git a/tests/Test/Config.hs b/tests/Test/Config.hs
index a7c4dec..caa6a87 100644
--- a/tests/Test/Config.hs
+++ b/tests/Test/Config.hs
@@ -45,6 +45,42 @@ testConfig = do
           description: The file source
           type: file
          path: ./source.txt
+          incrementingField: id
+          partitioningField: aggregate_id
+
+        - id: mysql_source
+          description: Main events store
+          type: mysql
+          host: localhost
+          port: 3306
+          username: my_user
+          password: my_pass
+          database: my_db
+          table: events_table
+          columns:
+            - id
+            - aggregate_id
+            - sequence_number
+            - payload
+          autoIncrementingColumn: id
+          partitioningColumn: aggregate_id
+
+        - id: sqlserver_source
+          description: Main events store
+          type: sqlserver
+          host: localhost
+          port: 1433
+          username: my_user
+          password: my_pass
+          database: my_db
+          table: events_table
+          columns:
+            - id
+            - aggregate_id
+            - sequence_number
+            - payload
+          autoIncrementingColumn: id
+          partitioningColumn: aggregate_id
 
       data_destinations:
         - id: file_destination
@@ -55,6 +91,8 @@ testConfig = do
           sources:
             - postgres_source
             - file_source
+            - mysql_source
+            - sqlserver_source
 
         - id: HTTP_destination
          description: my projection 2
@@ -66,8 +104,10 @@ testConfig = do
           sources:
             - postgres_source
             - file_source
+            - mysql_source
+            - sqlserver_source
       |]
-    annotate "source count" $ Map.size (c_sources config) `shouldBe` 2
+    annotate "source count" $ Map.size (c_sources config) `shouldBe` 4
     annotate "source count" $ Map.size (c_destinations config) `shouldBe` 2
 
   it "detects duplicate sources" $ do
@@ -77,11 +117,15 @@ testConfig = do
           description: The file source
           type: file
           path: ./source.txt
+          incrementingField: id
+          partitioningField: aggregate_id
 
         - id: source_1
           description: The file source
           type: file
           path: ./source.txt
+          incrementingField: id
+          partitioningField: aggregate_id
 
       data_destinations:
         - id: file_destination
@@ -100,6 +144,8 @@ testConfig = do
           description: The file source
           type: file
           path: ./source.txt
+          incrementingField: id
+          partitioningField: aggregate_id
 
       data_destinations:
         - id: dest_1
diff --git a/tests/Test/Connector.hs b/tests/Test/Connector.hs
index 585f2c3..379fedc 100644
--- a/tests/Test/Connector.hs
+++ b/tests/Test/Connector.hs
@@ -11,6 +11,7 @@ import Test.Hspec
   , shouldBe
   )
 import Ambar.Emulator.Connector.Poll (Boundaries(..), mark, boundaries, cleanup)
+import Test.Connector.File (testFileConnector)
 import Test.Connector.PostgreSQL (PostgresCreds, testPostgreSQL, withPostgreSQL)
 import Test.Connector.MySQL (MySQLCreds, testMySQL, withMySQL)
 import Test.Connector.MicrosoftSQLServer (MicrosoftSQLServerCreds, testMicrosoftSQLServer, withMicrosoftSQLServer)
@@ -34,6 +35,7 @@ testConnectors :: Databases -> Spec
 testConnectors (Databases pcreds mcreds screds) = do
   describe "connector" $ do
     testPollingConnector
+    testFileConnector
     testPostgreSQL pcreds
     testMySQL mcreds
     testMicrosoftSQLServer screds
@@ -74,4 +76,3 @@ testPollingConnector = describe "Poll" $
   it "cleanup doesn't remove ranges ending higher than time given" $ do
     (boundaries . cleanup 2 . bs) [1 ,2, 5, 3] `shouldBe` Boundaries [(1,3), (5,5)]
-
diff --git a/tests/Test/Connector/File.hs b/tests/Test/Connector/File.hs
new file mode 100644
index 0000000..8887da1
--- /dev/null
+++ b/tests/Test/Connector/File.hs
@@ -0,0 +1,91 @@
+{-# OPTIONS_GHC -Wno-orphans #-}
+module Test.Connector.File
+  ( testFileConnector
+  ) where
+
+import Control.Concurrent (MVar, newMVar, modifyMVar, withMVar)
+import qualified Data.ByteString.Lazy.Char8 as Char8
+import qualified Data.Aeson as Json
+import Data.Default (def)
+import Control.Monad (forM_, forM)
+import System.IO (hClose)
+import System.IO.Temp (withSystemTempFile)
+import Test.Hspec (Spec, describe)
+
+import qualified Ambar.Emulator.Queue.Topic as Topic
+import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..))
+import Ambar.Emulator.Connector (partitioner, encoder, connect)
+import Ambar.Emulator.Connector.File (FileConnector, mkFileConnector, write, c_path)
+
+import Test.Queue (withFileTopic)
+import Utils.Logger (plainLogger, Severity(..))
+import Test.Utils.SQL
+
+testFileConnector :: Spec
+testFileConnector =
+  describe "FileConnector" $ do
+    testGenericSQL with_
+  where
+  with_
+    :: PartitionCount
+    -> ( FileConnection
+      -> EventsTable FileConnector
+      -> Topic
+      -> (IO b -> IO b)
+      -> IO a
+      )
+    -> IO a
+  with_ partitions f =
+    withSystemTempFile "file-db" $ \path h -> do
+      hClose h
+      connector <- mkFileConnector path "aggregate_id" "sequence_number"
+      var <- newMVar 0
+      let conn = FileConnection connector var
+      withFileTopic partitions $ \topic ->                          -- create topic
+        Topic.withProducer topic partitioner encoder $ \producer -> -- create topic producer
+        withTable () conn $ \table -> do
+          let logger = plainLogger Warn
+              connected :: forall a. IO a -> IO a
+              connected act = connect connector logger def producer (const act) -- setup connector
+          f conn table topic connected
+
+data FileConnection = FileConnection
+  { _connector :: FileConnector
+  , _maxId :: MVar Int -- max ID and write lock
+  }
+
+instance Table (EventsTable FileConnector) where
+  type Entry (EventsTable FileConnector) = Event
+  type Config (EventsTable FileConnector) = ()
+  type Connection (EventsTable FileConnector) = FileConnection
+  tableName (EventsTable name) = name
+  tableCols _ = ["id", "aggregate_id", "sequence_number"]
+  withTable _ _ f = do
+    name <- mkTableName
+    let table = EventsTable name
+    f table
+
+  mocks _ =
+    [ [ Event err agg_id seq_id | seq_id <- [0..] ]
+    | agg_id <- [0..]
+    ]
+    where err = error "ids are assigned by the file connector at insertion"
+
+  insert (FileConnection connector varMaxId) _ events =
+    forM_ events $ \(Event _ agg_id seq_id) -> do
+      modifyMVar varMaxId $ \nxt -> do
+        write connector (Json.toJSON $ Event nxt agg_id seq_id)
+        return (nxt + 1, nxt)
+
+  selectAll (FileConnection connector var) _ =
+    withMVar var $ \_ -> do
+      bs <- Char8.readFile (c_path connector)
+      forM (Char8.lines bs) $ \line ->
+        case Json.eitherDecode' line of
+          Left err -> error $ "unable to decode file entry: " <> err
+          Right v -> return v
diff --git a/tests/Test/Connector/MicrosoftSQLServer.hs b/tests/Test/Connector/MicrosoftSQLServer.hs
index ba2d1ff..cb81a95 100644
--- a/tests/Test/Connector/MicrosoftSQLServer.hs
+++ b/tests/Test/Connector/MicrosoftSQLServer.hs
@@ -21,7 +21,7 @@ import Test.Hspec
   )
 
 import qualified Ambar.Emulator.Queue.Topic as Topic
-import Ambar.Emulator.Queue.Topic (PartitionCount(..))
+import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..))
 import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
 import Ambar.Record (Bytes(..))
 import Database.MicrosoftSQLServer as S
@@ -46,7 +46,7 @@ import Utils.Delay (deadline, seconds)
 testMicrosoftSQLServer :: OnDemand MicrosoftSQLServerCreds -> Spec
 testMicrosoftSQLServer od = do
   describe "MicrosoftSQLServer" $ do
-    testGenericSQL @(EventsTable SQLServer) od withConnection mkConnector ()
+    testGenericSQL with
 
     describe "decodes" $ do
       -- Integers
       supported "TINYINT" (1 :: Int)
@@ -103,9 +103,17 @@ testMicrosoftSQLServer od = do
       -- unsupported "CLR UDT" _NULL
       -- unsupported "IMAGE" _NULL
       -- unsupported "SQL_VARIANT" _NULL
-
-
   where
+  with
+    :: PartitionCount
+    -> ( S.Connection
+      -> EventsTable SQLServer
+      -> Topic
+      -> (IO b -> IO b)
+      -> IO a )
+    -> IO a
+  with = withConnector od withConnection mkConnector ()
+
   _NULL :: Maybe String
   _NULL = Nothing
diff --git a/tests/Test/Connector/MySQL.hs b/tests/Test/Connector/MySQL.hs
index bccacd7..acf01c6 100644
--- a/tests/Test/Connector/MySQL.hs
+++ b/tests/Test/Connector/MySQL.hs
@@ -41,6 +41,7 @@ import Database.MySQL
   , withConnection
   , defaultConnectionInfo
   )
+import Ambar.Emulator.Queue.Topic (Topic)
 import qualified Ambar.Emulator.Queue.Topic as Topic
 import Ambar.Emulator.Queue.Topic (PartitionCount(..))
 import Ambar.Record (Bytes(..))
@@ -53,9 +54,9 @@ import qualified Test.Utils.SQL as TS
 type MySQLCreds = ConnectionInfo
 
 testMySQL :: OnDemand MySQLCreds -> Spec
-testMySQL c = do
+testMySQL od = do
   describe "MySQL" $ do
-    testGenericSQL @(EventsTable MySQL) c withConnection mkMySQL ()
+    testGenericSQL with
 
     -- Test that column types are supported/unsupported by
     -- creating database entries with the value and reporting
@@ -163,6 +164,16 @@ testMySQL c = do
     describe "JSON" $ do
       supported "JSON" ("{\"a\": 1}" :: String)
   where
+  with
+    :: PartitionCount
+    -> ( Connection
+      -> EventsTable MySQL
+      -> Topic
+      -> (IO b -> IO b)
+      -> IO a )
+    -> IO a
+  with = withConnector od withConnection mkMySQL ()
+
   _NULL :: Maybe String
   _NULL = Nothing
@@ -181,7 +192,7 @@ testMySQL c = do
 
   -- Write a value of a given type to a database table, then read it from the Topic.
   roundTrip :: forall a. (FromField a, FromJSON a, ToField a, Show a, Eq a) => String -> a -> IO ()
   roundTrip ty val =
-    withConnector @(TTable MySQL a) c withConnection mkMySQL (MySQLType ty) (PartitionCount 1) $ \conn table topic connected -> do
+    withConnector @(TTable MySQL a) od withConnection mkMySQL (MySQLType ty) (PartitionCount 1) $ \conn table topic connected -> do
       let record = TEntry 1 1 1 val
       insert conn table [record]
       connected $ deadline (seconds 1) $ do
diff --git a/tests/Test/Connector/PostgreSQL.hs b/tests/Test/Connector/PostgreSQL.hs
index 0f0d94d..93bb20b 100644
--- a/tests/Test/Connector/PostgreSQL.hs
+++ b/tests/Test/Connector/PostgreSQL.hs
@@ -1,3 +1,4 @@
+{-# OPTIONS_GHC -Wno-orphans #-}
 
 module Test.Connector.PostgreSQL
   ( testPostgreSQL
@@ -12,16 +13,11 @@ module Test.Connector.PostgreSQL
   where
 
 import Control.Concurrent (MVar, newMVar, modifyMVar)
-import Control.Concurrent.Async (mapConcurrently)
 import Control.Exception (bracket, throwIO, ErrorCall(..), fromException)
-import Control.Monad (void, replicateM, forM_)
+import Control.Monad (void, forM_)
 import qualified Data.Aeson as Aeson
 import Data.Aeson (FromJSON)
-import qualified Data.ByteString.Lazy as LB
-import Data.Default (def)
-import Data.List (isInfixOf, sort, stripPrefix)
-import qualified Data.Map.Strict as Map
-import Data.Maybe (fromMaybe)
+import Data.List (isInfixOf)
 import Data.Scientific (Scientific)
 import Data.String (fromString)
 import qualified Data.Text as Text
@@ -34,87 +30,29 @@ import Test.Hspec
   , shouldBe
   , shouldThrow
   )
-import Test.Hspec.Expectations.Contrib (annotate)
 import qualified Database.PostgreSQL.Simple as P
 import qualified Database.PostgreSQL.Simple.ToField as P
-import GHC.Generics
+import qualified Database.PostgreSQL.Simple.FromField as P hiding (Binary)
 import System.IO.Unsafe (unsafePerformIO)
 import System.Exit (ExitCode(..))
 import System.Process (readProcessWithExitCode)
 
 import qualified Ambar.Emulator.Connector as Connector
-import Ambar.Emulator.Connector (partitioner, encoder)
 import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
 import Ambar.Emulator.Queue.Topic (Topic, PartitionCount(..))
 import qualified Ambar.Emulator.Queue.Topic as Topic
 import Ambar.Record (Bytes(..))
-import Test.Queue (withFileTopic)
 import Test.Utils.Docker (DockerCommand(..), withDocker)
 import Test.Utils.OnDemand (OnDemand)
+import Test.Utils.SQL
 import qualified Test.Utils.OnDemand as OnDemand
-import Utils.Async (withAsyncThrow)
 import Utils.Delay (deadline, seconds, delay)
-import Utils.Logger (plainLogger, Severity(..))
 
 testPostgreSQL :: OnDemand PostgresCreds -> Spec
 testPostgreSQL p = do
   describe "PostgreSQL" $ do
-    -- checks that our tests can connect to postgres
-    it "connects" $
-      with (PartitionCount 1) $ \conn table _ _ -> do
-        insert conn table (take 10 $ head mocks)
-        rs <- P.query_ @Event conn (fromString $ "SELECT * FROM " <> tableName table)
-        length rs `shouldBe` 10
-
-    it "retrieves all events already in the db" $
-      with (PartitionCount 1) $ \conn table topic connected -> do
-        let count = 10
-        insert conn table (take count $ head mocks)
-        connected $
-          deadline (seconds 2) $
-          Topic.withConsumer topic group $ \consumer -> do
-            es <- replicateM count $ readEntry @Event consumer
-            length es `shouldBe` count
-
-    it "can retrieve a large number of events" $
-      with (PartitionCount 1) $ \conn table topic connected -> do
-        let count = 10_000
-        insert conn table (take count $ head mocks)
-        connected $ deadline (seconds 2) $
-          Topic.withConsumer topic group $ \consumer -> do
-            es <- replicateM count $ readEntry @Event consumer
-            length es `shouldBe` count
-
-    it "retrieves events added after initial snapshot" $
-      with (PartitionCount 1) $ \conn table topic connected -> do
-        let count = 10
-            write = insert conn table (take count $ head mocks)
-        connected $ deadline (seconds 1) $
-          Topic.withConsumer topic group $ \consumer ->
-          withAsyncThrow write $ do
-            es <- replicateM count $ readEntry @Event consumer
-            length es `shouldBe` count
-
-    it "maintains ordering through parallel writes" $ do
-      let partitions = 5
-      with (PartitionCount partitions) $ \conn table topic connected -> do
-        let count = 1_000
-            write = mapConcurrently id
-              [ insert conn table (take count $ mocks !! partition)
-              | partition <- [1..partitions] ]
-        connected $ deadline (seconds 1) $
-          Topic.withConsumer topic group $ \consumer -> do
-            -- write and consume concurrently
-            withAsyncThrow write $ do
-              es <- replicateM (count * partitions) $ readEntry consumer
-              let byAggregateId = Map.toList $ Map.fromListWith (flip (++))
-                    [ (e_aggregate_id, [e_sequence_number])
-                    | (Event{..}, _) <- es
-                    ]
-              forM_ byAggregateId $ \(a_id, seqs) ->
-                annotate ("ordered (" <> show a_id <> ")") $
-                  sort seqs `shouldBe` seqs
+    testGenericSQL with
 
     -- Test that column types are supported/unsupported by
     -- creating database entries with the value and reporting
@@ -267,7 +205,15 @@ testPostgreSQL p = do
       unsupported "REGTYPE" ("integer" :: String)
       unsupported "PG_LSN" ("AAA/AAA" :: String)
   where
-    with = with_ ()
+    with
+      :: PartitionCount
+      -> ( P.Connection
+        -> EventsTable PostgreSQL
+        -> Topic
+        -> (IO b -> IO b)
+        -> IO a )
+      -> IO a
+    with = withConnector p withConnection mkPostgreSQL ()
 
     withType ty definition act =
       OnDemand.with p $ \creds ->
@@ -276,7 +222,7 @@ testPostgreSQL p = do
         destroy _ = P.execute_ conn $ "DROP TYPE " <> ty
       bracket create destroy (const act)
 
-    unsupported :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> Spec
+    unsupported :: ValidTy a => String -> a -> Spec
     unsupported ty val =
       it ("unsupported " <> ty) $
         roundTrip ty val `shouldThrow` unsupportedType
@@ -285,13 +231,13 @@ testPostgreSQL p = do
      | Just (Connector.UnsupportedType _) <- fromException e = True
      | otherwise = False
 
-    supported :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> Spec
+    supported :: ValidTy a => String -> a -> Spec
     supported ty val = it ty $ roundTrip ty val
 
     -- Write a value of a given type to a database table, then read it from the Topic.
-    roundTrip :: (FromJSON a, P.ToField a, Show a, Eq a) => String -> a -> IO ()
+    roundTrip :: forall a. ValidTy a => String -> a -> IO ()
     roundTrip ty val =
-      with_ (PostgresType ty) (PartitionCount 1) $ \conn table topic connected -> do
+      withConnector @(TTable PostgreSQL a) p withConnection mkPostgreSQL (PostgresType ty) (PartitionCount 1) $ \conn table topic connected -> do
         let record = TEntry 1 1 1 val
         insert conn table [record]
         connected $ deadline (seconds 1) $ do
@@ -299,31 +245,9 @@ testPostgreSQL p = do
           Topic.withConsumer topic group $ \consumer -> do
             (entry, _) <- readEntry consumer
             entry `shouldBe` record
 
-    with_
-      :: Table t
-      => Config t
-      -> PartitionCount
-      -> (P.Connection -> t -> Topic -> (IO b -> IO b) -> IO a)
-      -> IO a
-    with_ conf partitions f =
-      OnDemand.with p $ \creds ->                                      -- load db
-      withConnection creds $ \conn ->
-      withTable conf conn $ \table ->                                  -- create events table
-      withFileTopic partitions $ \topic ->                             -- create topic
-      let config = mkPostgreSQL creds table in
-      Topic.withProducer topic partitioner encoder $ \producer -> do   -- create topic producer
-      let logger = plainLogger Warn
-          connected act =                                              -- setup connector
-            Connector.connect config logger def producer (const act)
-      f conn table topic connected
-
-readEntry :: Aeson.FromJSON a => Topic.Consumer -> IO (a, Topic.Meta)
-readEntry consumer = do
-  result <- Topic.read consumer
-  (bs, meta) <- either (throwIO . ErrorCall . show) return result
-  case Aeson.eitherDecode $ LB.fromStrict bs of
-    Left err -> throwIO $ ErrorCall $ "decoding error: " <> show err
-    Right val -> return (val, meta)
+type ValidTy a = (FromJSON a, P.FromField a, P.ToField a, Show a, Eq a)
+
+
 mkPostgreSQL :: Table t => PostgresCreds -> t -> PostgreSQL
 mkPostgreSQL PostgresCreds{..} table = PostgreSQL
@@ -338,9 +262,6 @@ mkPostgreSQL PostgresCreds{..} table = PostgreSQL
   , c_serialColumn = "id"
   }
 
-group :: Topic.ConsumerGroupName
-group = Topic.ConsumerGroupName "test_group"
-
 data PostgresCreds = PostgresCreds
   { p_database :: String
   , p_username :: String
   , p_password :: String
   , p_host :: String
   , p_port :: Word16
   }
 
-data Event = Event
-  { e_id :: Int
-  , e_aggregate_id :: Int
-  , e_sequence_number :: Int
-  }
-  deriving (Eq, Show, Generic, P.FromRow)
-
-instance Aeson.FromJSON Event where
-  parseJSON = Aeson.genericParseJSON opt
-    where
-      opt = Aeson.defaultOptions
-        { Aeson.fieldLabelModifier = \label ->
-            fromMaybe label (stripPrefix "e_" label)
-        }
-
-class Table a where
-  type Entry a = b | b -> a
-  type Config a = b | b -> a
-  withTable :: Config a -> P.Connection -> (a -> IO b) -> IO b
-  tableCols :: a -> [Text.Text]
-  tableName :: a -> String
-  -- Mock events to be added to the database.
-  -- Each sublist is an infinite list of events for the same aggregate.
-  mocks :: [[Entry a]]
-  insert :: P.Connection -> a -> [Entry a] -> IO ()
-
-data TTable a = TTable
-  { tt_name :: String
-  , _tt_tyName :: String
-  }
-
-data TEntry a = TEntry
-  { te_id :: Int
-  , te_aggregate_id :: Int
-  , te_sequence_number :: Int
-  , te_value :: a
-  }
-  deriving (Eq, Show, Generic, Functor)
-
-instance FromJSON a => FromJSON (TEntry a) where
-  parseJSON = Aeson.genericParseJSON opt
-    where
-      opt = Aeson.defaultOptions
-        { Aeson.fieldLabelModifier = \label ->
-            fromMaybe label (stripPrefix "te_" label)
-        }
-
 newtype PostgresType a = PostgresType String
 
-instance P.ToField a => Table (TTable a) where
-  type Entry (TTable a) = TEntry a
-  type Config (TTable a) = PostgresType a
+instance P.FromField a => P.FromRow (TEntry a)
+instance P.FromRow Event
+
+instance (P.FromField a, P.ToField a) => Table (TTable PostgreSQL a) where
+  type Entry (TTable PostgreSQL a) = TEntry a
+  type Config (TTable PostgreSQL a) = PostgresType a
+  type Connection (TTable PostgreSQL a) = P.Connection
   tableName (TTable name _) = name
   tableCols _ = ["id", "aggregate_id", "sequence_number", "value"]
   mocks = error "no mocks for TTable"
+  selectAll conn table =
+    P.query_ @(TEntry a) conn (fromString $ "SELECT * FROM " <> tableName table)
   insert conn t entries =
     void $ P.executeMany conn query [(agg_id, seq_num, val) | TEntry _ agg_id seq_num val <- entries ]
     where
     query = fromString $ unwords
-      [ "INSERT INTO", tt_name t
+      [ "INSERT INTO", tableName t
      ,"(aggregate_id, sequence_number, value)"
      ,"VALUES (?, ?, ?)"
      ]
@@ -425,20 +305,22 @@ instance P.ToField a => Table (TTable a) where
      , ")"
      ]
 
-newtype EventsTable = EventsTable String
-
-instance Table EventsTable where
-  type (Entry EventsTable) = Event
-  type (Config EventsTable) = ()
+instance Table (EventsTable PostgreSQL) where
+  type (Entry (EventsTable PostgreSQL)) = Event
+  type (Config (EventsTable PostgreSQL)) = ()
+  type (Connection (EventsTable PostgreSQL)) = P.Connection
   tableName (EventsTable name) = name
   tableCols _ = ["id", "aggregate_id", "sequence_number"]
-  mocks =
+  mocks _ =
     -- the aggregate_id is given when the records are inserted into the database
    [ [ Event err agg_id seq_id | seq_id <- [0..] ]
    | agg_id <- [0..]
    ]
    where err = error "aggregate id is determined by postgres"
 
+  selectAll conn table =
+    P.query_ @Event conn (fromString $ "SELECT * FROM " <> tableName table)
+
   insert conn (EventsTable table) events =
    void $ P.executeMany conn query [(agg_id, seq_num) | Event _ agg_id seq_num <- events ]
    where
@@ -460,7 +342,7 @@ instance Table (EventsTable PostgreSQL) where
      , ")"
      ]
 
-withEventsTable :: PostgresCreds -> (P.Connection -> EventsTable -> IO a) -> IO a
+withEventsTable :: PostgresCreds -> (P.Connection -> EventsTable PostgreSQL -> IO a) -> IO a
 withEventsTable creds f =
   withConnection creds $ \conn ->
     withTable () conn $ \table ->
@@ -479,6 +361,9 @@ newtype BytesRow = BytesRow Bytes
 instance P.ToField BytesRow where
   toField (BytesRow (Bytes bs)) = P.toField (P.Binary bs)
 
+instance P.FromField BytesRow where
+  fromField = error "not used"
+
 type Schema = String
 
 withConnection :: PostgresCreds -> (P.Connection -> IO a) -> IO a
diff --git a/tests/Test/Emulator.hs b/tests/Test/Emulator.hs
index 4856b37..697abb8 100644
--- a/tests/Test/Emulator.hs
+++ b/tests/Test/Emulator.hs
@@ -28,9 +28,11 @@ import Ambar.Emulator.Config
   , Source(..)
   )
 import Ambar.Emulator.Projector (Message(..), Payload(..))
+import Ambar.Emulator.Connector.Postgres (PostgreSQL)
 import Test.Connector.PostgreSQL (PostgresCreds, Event(..), mocks)
 import qualified Test.Connector.PostgreSQL as C
+import Test.Utils.SQL (EventsTable)
 import Test.Utils.OnDemand (OnDemand)
 import qualified Test.Utils.OnDemand as OnDemand
 import Utils.Async (withAsyncThrow)
@@ -41,10 +43,10 @@ testEmulator :: OnDemand PostgresCreds -> Spec
 testEmulator p = describe "emulator" $ do
  it "retrieves data in a PostgreSQL db" $
    withConfig $ \config ->
-    withPostgresSource $ \insert source -> do
+    withPostgresSource $ \table insert source -> do
      (out, dest) <- funDestination [source]
      let env = mkEnv [source] [dest]
-          events = addIds $ take 10 $ head mocks
+          events = addIds $ take 10 $ head (mocks table)
      insert events
      withAsyncThrow (emulate logger config env) $
        deadline (seconds 5) $ do
@@ -53,10 +55,10 @@ testEmulator p = describe "emulator" $ do
  it "resumes from last index" $
    withConfig $ \config ->
-    withPostgresSource $ \insert source -> do
+    withPostgresSource $ \table insert source -> do
      (out, dest) <- funDestination [source]
      let env = mkEnv [source] [dest]
-          events = addIds $ take 10 $ head mocks
+          events = addIds $ take 10 $ head (mocks table)
          (before, after) = splitAt 5 events
 
      -- insert and consume 'before'
@@ -118,7 +120,12 @@ testEmulator p = describe "emulator" $ do
      , c_dataPath = path
      }
 
-  withPostgresSource :: (([Event] -> IO ()) -> DataSource -> IO a) -> IO a
+  withPostgresSource ::
+    ( EventsTable PostgreSQL
+      -> ([Event] -> IO ())
+      -> DataSource
+      -> IO a )
+    -> IO a
  withPostgresSource f =
    OnDemand.with p $ \creds ->
    C.withEventsTable creds $ \conn table -> do
@@ -129,7 +136,7 @@ testEmulator p = describe "emulator" $ do
          , s_source = SourcePostgreSQL config
          }
          insert events = C.insert conn table events
-      f insert source
+      f table insert source
 
  -- Add ids to the events we chose to use
  addIds :: [Event] -> [Event]
diff --git a/tests/Test/Utils/SQL.hs b/tests/Test/Utils/SQL.hs
index 9e6a1a4..6a94bdf 100644
--- a/tests/Test/Utils/SQL.hs
+++ b/tests/Test/Utils/SQL.hs
@@ -20,7 +20,7 @@ import Control.Concurrent.Async (mapConcurrently_)
 import Control.Exception (throwIO, ErrorCall(..))
 import Control.Monad (replicateM, forM_)
 import qualified Data.Aeson as Aeson
-import Data.Aeson (FromJSON)
+import Data.Aeson (FromJSON, ToJSON)
 import qualified Data.ByteString.Lazy as LB
 import Data.Default (Default, def)
 import Data.List (sort, stripPrefix)
@@ -64,24 +64,27 @@ data Event = Event
   }
   deriving (Eq, Show, Generic)
 
+instance ToJSON Event where
+  toJSON = Aeson.genericToJSON eventJSONOptions
+
 instance FromJSON Event where
-  parseJSON = Aeson.genericParseJSON opt
-    where
-      opt = Aeson.defaultOptions
-        { Aeson.fieldLabelModifier = \label ->
-            fromMaybe label (stripPrefix "e_" label)
-        }
+  parseJSON = Aeson.genericParseJSON eventJSONOptions
+
+eventJSONOptions :: Aeson.Options
+eventJSONOptions = Aeson.defaultOptions
+  { Aeson.fieldLabelModifier = \label ->
+      fromMaybe label (stripPrefix "e_" label)
+  }
 
 newtype EventsTable a = EventsTable String
 
 testGenericSQL
-  :: (Table table, Connector connector, Default (ConnectorState connector))
-  => OnDemand db
-  -> (forall x. db -> (Connection table -> IO x) -> IO x)
-  -> (db -> table -> connector)
-  -> Config table
+  :: Table table
+  => (forall a b.
+       PartitionCount
+       -> (Connection table -> table -> Topic -> (IO b -> IO b) -> IO a)
+       -> IO a)
   -> Spec
-testGenericSQL od withConnection mkConfig conf = sequential $ do
+testGenericSQL with = sequential $ do
   -- checks that our tests can connect to postgres
   it "connects" $
     with (PartitionCount 1) $ \conn table _ _ -> do
@@ -137,8 +140,6 @@ testGenericSQL od withConnection mkConfig conf = sequential $ do
     forM_ byAggregateId $ \(a_id, seqs) ->
       annotate ("ordered (" <> show a_id <> ")") $
         sort seqs `shouldBe` seqs
-  where
-  with = withConnector od withConnection mkConfig conf
 
 -- | A generic consumer group
 group :: Topic.ConsumerGroupName
@@ -173,6 +174,7 @@ withConnector od withConnection mkConfig conf partitions f =
     Topic.withProducer topic partitioner encoder $ \producer -> do  -- create topic producer
       let logger = plainLogger Warn
          config = mkConfig db table
+          connected :: forall x. IO x -> IO x
          connected act = connect config logger def producer (const act)  -- setup connector
      f conn table topic connected
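For reference, a minimal end-to-end sketch of the file-source API added above, mirroring what tests/Test/Connector/File.hs exercises. This is illustrative only: the module name, path, and field values are hypothetical, and mkFileConnector takes the partitioning field before the incrementing field, as in examples/config.yml.

{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Data.Aeson as Json
import Data.Aeson ((.=))

import Ambar.Emulator.Connector.File (mkFileConnector, write)

main :: IO ()
main = do
  let path = "./events.file"  -- hypothetical location
  appendFile path ""          -- mkFileConnector expects the file to exist
  conn <- mkFileConnector path "aggregate_id" "id"
  write conn $ Json.object
    [ "id" .= (0 :: Int)
    , "aggregate_id" .= (1 :: Int)
    , "sequence_number" .= (0 :: Int)
    ]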