Skip to content

Commit

Permalink
refactor: clean up the interface a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand committed Jan 24, 2025
1 parent 9d9e379 commit 3e95909
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 32 deletions.
46 changes: 22 additions & 24 deletions crates/aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,26 @@ pub mod is_impl {
event: Self::NewTaskEvent,
) -> impl Future<Output = TaskInfo> + Send;

/// Processes a task response, returning the response's digest
fn process_task_response(
&self,
event: Self::TaskResponse,
) -> impl Future<Output = B256> + Send;

fn process_aggregated_response(
&self,
response: BlsAggregationServiceResponse,
) -> impl Future<Output = ()>;

// TODO: move these to a separate "TaskResponse" trait
/// Hashes a task response
fn hash_task_response(
&self,
task_response: &Self::TaskResponse,
) -> impl Future<Output = B256>;

fn task_response_get_task_index(task_response: &Self::TaskResponse) -> TaskIndex;
}

// pub trait TaskResponse {
// fn hash(&self) -> B256;
// }

#[derive(Debug)]
/// Task Processor for the Incredible Squaring Task Manager
pub struct ISTaskProcessor {
Expand Down Expand Up @@ -161,12 +166,9 @@ pub mod is_impl {
}
}

async fn hash_task_response(&self, task_response: &Self::TaskResponse) -> B256 {
let hash = alloy::primitives::keccak256(TaskResponse::abi_encode(task_response));
self.task_responses
.lock()
.await
.insert(hash, task_response.clone());
async fn process_task_response(&self, task_response: Self::TaskResponse) -> B256 {
let hash = alloy::primitives::keccak256(TaskResponse::abi_encode(&task_response));
self.task_responses.lock().await.insert(hash, task_response);
hash
}

Expand Down Expand Up @@ -437,22 +439,18 @@ impl Aggregator {
<ISTaskProcessor as TaskProcessor>::TaskResponse,
>,
) -> Result<(), AggregatorError> {
let task_index = <ISTaskProcessor as TaskProcessor>::task_response_get_task_index(
signed_task_response.task_response(),
);
let SignedTaskResponseImpl {
task_response,
signature,
operator_id,
} = signed_task_response;
let task_index =
<ISTaskProcessor as TaskProcessor>::task_response_get_task_index(&task_response);

let task_response_digest = self
.tp
.hash_task_response(&signed_task_response.task_response())
.await;
let task_response_digest = self.tp.process_task_response(task_response).await;

self.bls_aggregation_service
.process_new_signature(
task_index,
task_response_digest,
signed_task_response.signature(),
signed_task_response.operator_id(),
)
.process_new_signature(task_index, task_response_digest, signature, operator_id)
.await?;
info!("processed signature for index {:?}", task_index);
let quorum_reached = {
Expand Down
14 changes: 6 additions & 8 deletions crates/aggregator/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ pub type SignedTaskResponse = SignedTaskResponseImpl<TaskResponse>;
/// Signed Task Response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignedTaskResponseImpl<T> {
task_response: T,
signature: Signature,
operator_id: OperatorId,
/// A response to a task
pub task_response: T,
/// Signature of the task response
pub signature: Signature,
/// ID of the operator corresponding to the signature
pub operator_id: OperatorId,
}

impl<T: Serialize + for<'de> Deserialize<'de>> SignedTaskResponseImpl<T> {
Expand All @@ -34,9 +37,4 @@ impl<T: Serialize + for<'de> Deserialize<'de>> SignedTaskResponseImpl<T> {
pub fn operator_id(&self) -> OperatorId {
self.operator_id
}

/// [`TaskResponse`]
pub fn task_response(&self) -> &T {
&self.task_response
}
}

0 comments on commit 3e95909

Please sign in to comment.