-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Emulate
- Loading branch information
Showing
25 changed files
with
1,240 additions
and
305 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,45 @@ | ||
# Connections to your databases. | ||
# The Emulator will read data from those dbs. | ||
data_sources: | ||
- id: "postgres source" | ||
type: "postgres" | ||
host: "localhost" | ||
username: "temp" | ||
password: "some_pass" | ||
database: "db_name" | ||
table: "table_name" | ||
|
||
- id: "file source" | ||
type: "file" | ||
path: "./source.txt" | ||
- id: postgres_source | ||
description: Main events store | ||
type: postgres | ||
host: localhost | ||
port: 5432 | ||
username: my_user | ||
password: my_pass | ||
database: my_db | ||
table: events_table | ||
columns: | ||
- id | ||
- aggregate_id | ||
- sequence_number | ||
- payload | ||
serialColumn: id | ||
partitioningColumn: aggregate_id | ||
|
||
# Connections to your endpoint. | ||
# The Emulator will send data read from the databases to these endpoints. | ||
data_destinations: | ||
- id: "file destination" | ||
type: "file" | ||
path: "./temp.file" | ||
|
||
# Send data via HTTP | ||
- id: http_destination | ||
description: my projection 2 | ||
type: http-push | ||
endpoint: http://some.url.com:8080/my_projection | ||
username: name-of-user | ||
password: password123 | ||
|
||
sources: | ||
- id: "postgres source" | ||
- id: "file source" | ||
- postgres source | ||
- file source | ||
|
||
- id: "HTTP destination" | ||
type: "http-push" | ||
endpoint: http://some.url.com/one | ||
password: password123 | ||
port: 22 | ||
username: name-of-user | ||
# Send data to a file. One entry per line. | ||
- id: file_destination | ||
description: my projection 1 | ||
type: file | ||
path: ./temp.file | ||
|
||
sources: | ||
- id: "postgres source" | ||
- id: "file source" | ||
- postgres_source | ||
- file_source |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
module Ambar.Emulator where | ||
|
||
import Control.Concurrent.STM (atomically) | ||
import Control.Concurrent.Async (concurrently_, forConcurrently_, withAsync) | ||
import Control.Exception (finally, uninterruptibleMask_, throwIO, ErrorCall(..)) | ||
import Control.Monad (forM) | ||
import Data.Aeson (FromJSON, ToJSON) | ||
import qualified Data.Aeson as Aeson | ||
import Data.Default (def) | ||
import Data.Map (Map) | ||
import qualified Data.Map as Map | ||
import Data.Maybe (fromMaybe) | ||
import Foreign.Marshal.Utils (withMany) | ||
import GHC.Generics (Generic) | ||
import System.Directory (doesFileExist) | ||
import System.FilePath ((</>)) | ||
|
||
import qualified Ambar.Emulator.Connector.Postgres as Postgres | ||
import qualified Ambar.Emulator.Connector.File as FileConnector | ||
|
||
import qualified Ambar.Emulator.Projector as Projector | ||
import Ambar.Emulator.Projector (Projection(..)) | ||
import qualified Ambar.Transport.File as FileTransport | ||
import qualified Ambar.Transport.Http as HttpTransport | ||
import qualified Ambar.Emulator.Queue.Topic as Topic | ||
import Ambar.Emulator.Queue (TopicName(..)) | ||
import qualified Ambar.Emulator.Queue as Queue | ||
import Ambar.Emulator.Config | ||
( EmulatorConfig(..) | ||
, EnvironmentConfig(..) | ||
, Id(..) | ||
, DataSource(..) | ||
, Source(..) | ||
, DataDestination(..) | ||
, Destination(..) | ||
) | ||
import Utils.Logger (SimpleLogger, annotate) | ||
import Utils.Some (Some(..)) | ||
import Utils.Delay (every, seconds) | ||
|
||
data ConnectorState | ||
= StatePostgres Postgres.ConnectorState | ||
| StateFile () | ||
deriving (Generic) | ||
deriving anyclass (ToJSON, FromJSON) | ||
|
||
newtype EmulatorState = EmulatorState | ||
{ connectors :: Map (Id DataSource) ConnectorState | ||
} | ||
deriving (Generic) | ||
deriving anyclass (ToJSON, FromJSON) | ||
|
||
emulate :: SimpleLogger -> EmulatorConfig -> EnvironmentConfig -> IO () | ||
emulate logger config env = do | ||
Queue.withQueue queuePath pcount $ \queue -> | ||
concurrently_ (connectAll queue) (projectAll queue) | ||
where | ||
queuePath = c_dataPath config </> "queues" | ||
statePath = c_dataPath config </> "state.json" | ||
pcount = Topic.PartitionCount $ c_partitionsPerTopic config | ||
|
||
connectAll queue = do | ||
EmulatorState connectorStates <- load | ||
let getState source = | ||
fromMaybe (initialStateFor source) $ | ||
Map.lookup (s_id source) connectorStates | ||
|
||
sources = | ||
[ (source, getState source) | source <- Map.elems $ c_sources env ] | ||
|
||
withMany (connect queue) sources $ \svars -> | ||
every (seconds 30) (save svars) `finally` save svars | ||
|
||
load = do | ||
exists <- doesFileExist statePath | ||
if not exists | ||
then return (EmulatorState def) | ||
else do | ||
r <- Aeson.eitherDecodeFileStrict statePath | ||
case r of | ||
Right v -> return v | ||
Left err -> | ||
throwIO $ ErrorCall $ "Unable to decode emulator state: " <> show err | ||
|
||
save svars = | ||
uninterruptibleMask_ $ do | ||
-- reading is non-blocking so should be fine to run under uninterruptibleMask | ||
states <- forM svars $ \(sid, svar) -> (sid,) <$> atomically svar | ||
Aeson.encodeFile statePath $ EmulatorState (Map.fromList states) | ||
|
||
connect queue (source, sstate) f = do | ||
topic <- Queue.openTopic queue $ topicName $ s_id source | ||
case s_source source of | ||
SourcePostgreSQL pconfig -> do | ||
let logger' = annotate ("source: " <> unId (s_id source)) logger | ||
partitioner = Postgres.partitioner | ||
encoder = Postgres.encoder pconfig | ||
state <- case sstate of | ||
StatePostgres s -> return s | ||
_ -> throwIO $ ErrorCall $ | ||
"Incompatible state for source: " <> show (s_id source) | ||
Topic.withProducer topic partitioner encoder $ \producer -> | ||
Postgres.withConnector logger' state producer pconfig $ \stateVar -> | ||
f (s_id source, StatePostgres <$> stateVar) | ||
|
||
SourceFile path -> | ||
Topic.withProducer topic FileConnector.partitioner FileConnector.encoder $ \producer -> | ||
withAsync (FileConnector.connect logger producer path) $ \_ -> do | ||
f (s_id source, return $ StateFile ()) | ||
|
||
initialStateFor source = | ||
case s_source source of | ||
SourcePostgreSQL _ -> StatePostgres def | ||
SourceFile _ -> StateFile () | ||
|
||
projectAll queue = forConcurrently_ (c_destinations env) (project queue) | ||
|
||
project queue dest = | ||
withDestination dest $ \transport -> do | ||
sourceTopics <- forM (d_sources dest) $ \sid -> do | ||
topic <- Queue.openTopic queue (topicName sid) | ||
return (sid, topic) | ||
Projector.project logger Projection | ||
{ p_id = projectionId (d_id dest) | ||
, p_destination = d_id dest | ||
, p_sources = sourceTopics | ||
, p_transport = transport | ||
} | ||
|
||
withDestination dest act = | ||
case d_destination dest of | ||
DestinationFile path -> | ||
FileTransport.withFileTransport path (act . Some) | ||
DestinationHttp{..} -> do | ||
transport <- HttpTransport.new d_endpoint d_username d_password | ||
act (Some transport) | ||
DestinationFun f -> do | ||
act (Some f) | ||
|
||
topicName :: Id DataSource -> TopicName | ||
topicName sid = TopicName $ "t-" <> unId sid | ||
|
||
projectionId :: Id DataDestination -> Id Projection | ||
projectionId (Id dst) = Id ("p-" <> dst) |
Oops, something went wrong.