Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File Sink Client Typing #849

Merged
merged 28 commits into from
Aug 26, 2024
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
420648a
make file sink parts declare their type
michaeldjeffrey Jul 19, 2024
dc66e79
ingest: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
48f37f1
boost_manager: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
8d0399a
iot_packet_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
be57669
mobile_packet_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
91418ff
price: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
ea033cf
poc_entropy: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
ada8975
iot_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
edcb89c
mobile_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
8b1294c
Rename trait FileStoreAsBytes -> MsgBytes to follow convention for pr…
michaeldjeffrey Jul 22, 2024
990fd46
create file sinks directly from the message
michaeldjeffrey Jul 24, 2024
2b916a6
rename file sink trait to writer
michaeldjeffrey Jul 30, 2024
4e90746
file sink metric from static str to String
michaeldjeffrey Jul 30, 2024
ae81c48
ingest iot/mobile servers to use convenience file_sink method
michaeldjeffrey Jul 30, 2024
02f422c
iot_verifier to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
da58d2a
boost manager to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
94c5d28
mobile packet verifier use file_sink convenience methods
michaeldjeffrey Jul 30, 2024
61bd7a1
mobile_verifier to use file_sink convenience methods
michaeldjeffrey Jul 30, 2024
a18218c
poc_entropy to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
758bb18
price to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
8fbd2f5
move MsgBytes trait back out of FileSinkWrite
michaeldjeffrey Jul 30, 2024
e17ed62
autocommit=false is already the default
michaeldjeffrey Jul 30, 2024
a59351f
msg is no longer bytes in test
michaeldjeffrey Jul 30, 2024
9457e98
stick with single style of reference
michaeldjeffrey Jul 30, 2024
5e216b6
combine auto_commit and roll_time into 1 argument
michaeldjeffrey Jul 30, 2024
68d9a1c
fixup after rebase
michaeldjeffrey Aug 2, 2024
b5bbad4
fixup after rebase
michaeldjeffrey Aug 19, 2024
a5fe861
uncouple FileSinkWriteExt from FileType enum
michaeldjeffrey Aug 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
boost_manager: type all FileSinkClients
michaeldjeffrey committed Aug 19, 2024

Unverified

This user has not yet uploaded their public signing key.
commit 48f37f152e9517afc5b297b4f6f6b0bdf06bcae7
4 changes: 2 additions & 2 deletions boost_manager/src/watcher.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ const LAST_PROCESSED_TIMESTAMP_KEY: &str = "last_processed_hex_boosting_info";
pub struct Watcher<A> {
pub pool: Pool<Postgres>,
pub hex_boosting_client: A,
pub file_sink: FileSinkClient,
pub file_sink: FileSinkClient<BoostedHexUpdateProto>,
}

impl<A> ManagedTask for Watcher<A>
@@ -45,7 +45,7 @@ where
{
pub async fn new(
pool: Pool<Postgres>,
file_sink: FileSinkClient,
file_sink: FileSinkClient<BoostedHexUpdateProto>,
hex_boosting_client: A,
) -> Result<Self> {
Ok(Self {
6 changes: 3 additions & 3 deletions boost_manager/tests/integrations/common/mod.rs
Original file line number Diff line number Diff line change
@@ -40,15 +40,15 @@ impl HexBoostingInfoResolver for MockHexBoostingClient {
}

pub struct MockFileSinkReceiver {
pub receiver: tokio::sync::mpsc::Receiver<SinkMessage>,
pub receiver: tokio::sync::mpsc::Receiver<SinkMessage<BoostedHexUpdateProto>>,
}

impl MockFileSinkReceiver {
pub async fn receive(&mut self) -> Option<Vec<u8>> {
match timeout(seconds(2), self.receiver.recv()).await {
Ok(Some(SinkMessage::Data(on_write_tx, msg))) => {
let _ = on_write_tx.send(Ok(()));
Some(msg)
Some(msg.encode_to_vec())
}
Ok(None) => None,
Err(e) => panic!("timeout while waiting for message1 {:?}", e),
@@ -81,7 +81,7 @@ impl MockFileSinkReceiver {
}
}

pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) {
pub fn create_file_sink() -> (FileSinkClient<BoostedHexUpdateProto>, MockFileSinkReceiver) {
let (tx, rx) = tokio::sync::mpsc::channel(20);
(
FileSinkClient {