Skip to content

Commit

Permalink
Added triples and presigs back to storage; update redis (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoticTempest authored Jan 9, 2025
1 parent b263fc5 commit 5a628ea
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
run: |
docker pull ghcr.io/near/near-lake-indexer:node-2.3.0
docker pull localstack/localstack:3.5.0
docker pull redis:7.0.15
docker pull redis:7.4.2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ jobs:

- name: Pull Relayer & Sandbox Docker Images
run: |
docker pull ghcr.io/near/os-relayer:12ba6e35690df3979fce0b36a41d0ca0db9c0ab4
docker pull ghcr.io/near/near-lake-indexer:node-2.3.0
docker pull localstack/localstack:3.5.0
docker pull redis:7.4.2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
Expand Down
17 changes: 11 additions & 6 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,14 @@ impl PresignatureManager {
}
}

pub async fn insert(&mut self, presignature: Presignature, mine: bool) {
pub async fn insert(&mut self, presignature: Presignature, mine: bool, back: bool) {
let id = presignature.id;
tracing::debug!(id, mine, "inserting presignature");
if let Err(store_err) = self.presignature_storage.insert(presignature, mine).await {
if let Err(store_err) = self
.presignature_storage
.insert(presignature, mine, back)
.await
{
tracing::error!(?store_err, mine, "failed to insert presignature");
} else {
// Remove from taken list if it was there
Expand Down Expand Up @@ -437,9 +441,10 @@ impl PresignatureManager {
participants = ?presig_participants.keys_vec(),
"running: the intersection of participants is less than the threshold"
);
// We are not inserting triples back to
// - prevent triple reusage
// - prevent a situation where we have a full stockpile of triples that can not be used
// TODO: do not insert back triples when we have a clear model for data consistency
// between nodes and utilizing only triples that meet threshold requirements.
triple_manager.insert(triple0, true, true).await;
triple_manager.insert(triple1, true, true).await;
} else {
self.generate(
&presig_participants,
Expand Down Expand Up @@ -640,7 +645,7 @@ impl PresignatureManager {
});

for (presignature, mine) in presignatures {
self.insert(presignature, mine).await;
self.insert(presignature, mine, false).await;
}

if !errors.is_empty() {
Expand Down
3 changes: 3 additions & 0 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ impl SignatureManager {
participants = ?sig_participants.keys_vec(),
"intersection of stable participants and presignature participants is less than threshold, trashing presignature"
);
// TODO: do not insert back presignature when we have a clear model for data consistency
// between nodes and utilizing only presignatures that meet threshold requirements.
presignature_manager.insert(presignature, true, true).await;
continue;
}

Expand Down
6 changes: 3 additions & 3 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,10 @@ impl TripleManager {
}
}

pub async fn insert(&self, triple: Triple, mine: bool) {
pub async fn insert(&self, triple: Triple, mine: bool, back: bool) {
let id = triple.id;
tracing::debug!(id, mine, "inserting triple");
if let Err(e) = self.triple_storage.insert(triple, mine).await {
if let Err(e) = self.triple_storage.insert(triple, mine, back).await {
tracing::warn!(?e, mine, "failed to insert triple");
} else {
self.gc.write().await.remove(&id);
Expand Down Expand Up @@ -753,7 +753,7 @@ impl TripleManager {
}

for (triple, mine) in triples {
self.insert(triple, mine).await;
self.insert(triple, mine, false).await;
}

messages
Expand Down
41 changes: 27 additions & 14 deletions chain-signatures/node/src/storage/presignature_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ impl PresignatureStorage {
.map_err(StoreError::Connect)
}

pub async fn insert(&self, presignature: Presignature, mine: bool) -> StoreResult<()> {
/// Insert a presignature into the storage. If `mine` is true, the presignature will be
/// owned by the current node. If `back` is true, the presignature will be marked as unused.
pub async fn insert(
&self,
presignature: Presignature,
mine: bool,
back: bool,
) -> StoreResult<()> {
let mut conn = self.connect().await?;

let script = r#"
Expand All @@ -59,9 +66,12 @@ impl PresignatureStorage {
local presig_id = ARGV[1]
local presig_value = ARGV[2]
local mine = ARGV[3]
local back = ARGV[4]
if redis.call("SISMEMBER", used_key, presig_id) == 1 then
return {err = "Presignature " .. presig_id .. " has already been used"}
if back == "true" then
redis.call("HDEL", used_key, presig_id)
elseif redis.call('HEXISTS', used_key, presig_id) == 1 then
return {err = 'Presignature ' .. presig_id .. ' is already used'}
end
if mine == "true" then
Expand All @@ -80,6 +90,7 @@ impl PresignatureStorage {
.arg(presignature.id)
.arg(presignature)
.arg(mine.to_string())
.arg(back.to_string())
.invoke_async(&mut conn)
.await?;

Expand All @@ -100,7 +111,7 @@ impl PresignatureStorage {

pub async fn contains_used(&self, id: &PresignatureId) -> StoreResult<bool> {
let mut conn = self.connect().await?;
let result: bool = conn.sismember(&self.used_key, id).await?;
let result: bool = conn.hexists(&self.used_key, id).await?;
Ok(result)
}

Expand All @@ -112,20 +123,21 @@ impl PresignatureStorage {
local presig_key = KEYS[2]
local used_key = KEYS[3]
local presig_id = ARGV[1]
if redis.call('SISMEMBER', mine_key, presig_id) == 1 then
return {err = 'Cannot take mine presignature as foreign owned'}
end
local presig_value = redis.call("HGET", presig_key, presig_id)
if not presig_value then
return {err = "Presignature " .. presig_id .. " is missing"}
end
redis.call("HDEL", presig_key, presig_id)
redis.call("SADD", used_key, presig_id)
redis.call("EXPIRE", used_key, ARGV[2])
redis.call("HSET", used_key, presig_id, "1")
redis.call("HEXPIRE", used_key, ARGV[2], "FIELDS", "1", presig_id)
return presig_value
"#;

Expand All @@ -148,20 +160,21 @@ impl PresignatureStorage {
local mine_key = KEYS[1]
local presig_key = KEYS[2]
local used_key = KEYS[3]
local presig_id = redis.call("SPOP", mine_key)
if not presig_id then
return {err = "Mine presignature stockpile does not have enough presignatures"}
end
local presig_value = redis.call("HGET", presig_key, presig_id)
if not presig_value then
return {err = "Unexpected behavior. Presignature " .. presig_id .. " is missing"}
end
redis.call("HDEL", presig_key, presig_id)
redis.call("SADD", used_key, presig_id)
redis.call("EXPIRE", used_key, ARGV[1])
redis.call("HSET", used_key, presig_id, "1")
redis.call("HEXPIRE", used_key, ARGV[1], "FIELDS", "1", presig_id)
return presig_value
"#;

Expand Down
46 changes: 26 additions & 20 deletions chain-signatures/node/src/storage/triple_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl TripleStorage {
.map_err(StoreError::Connect)
}

pub async fn insert(&self, triple: Triple, mine: bool) -> StoreResult<()> {
pub async fn insert(&self, triple: Triple, mine: bool, back: bool) -> StoreResult<()> {
let mut conn = self.connect().await?;

let script = r#"
Expand All @@ -51,8 +51,11 @@ impl TripleStorage {
local triple_id = ARGV[1]
local triple_value = ARGV[2]
local mine = ARGV[3]
local back = ARGV[4]
if redis.call("SISMEMBER", used_key, triple_id) == 1 then
if back == "true" then
redis.call("HDEL", used_key, triple_id)
elseif redis.call("HEXISTS", used_key, triple_id) == 1 then
return {err = "Triple " .. triple_id .. " has already been used"}
end
Expand All @@ -72,6 +75,7 @@ impl TripleStorage {
.arg(triple.id)
.arg(triple)
.arg(mine.to_string())
.arg(back.to_string())
.invoke_async(&mut conn)
.await?;

Expand All @@ -92,7 +96,7 @@ impl TripleStorage {

pub async fn contains_used(&self, id: TripleId) -> StoreResult<bool> {
let mut conn = self.connect().await?;
let result: bool = conn.sismember(&self.used_key, id).await?;
let result: bool = conn.hexists(&self.used_key, id).await?;
Ok(result)
}

Expand All @@ -104,29 +108,30 @@ impl TripleStorage {
if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 1 then
return {err = "Triple " .. ARGV[1] .. " cannot be taken as foreign"}
end
if redis.call("SISMEMBER", KEYS[2], ARGV[2]) == 1 then
return {err = "Triple " .. ARGV[2] .. " cannot be taken as foreign"}
end
-- Fetch the triples
local v1 = redis.call("HGET", KEYS[1], ARGV[1])
if not v1 then
return {err = "Triple " .. ARGV[1] .. " is missing"}
end
local v2 = redis.call("HGET", KEYS[1], ARGV[2])
if not v2 then
return {err = "Triple " .. ARGV[2] .. " is missing"}
end
-- Delete the triples from the hash map
redis.call("HDEL", KEYS[1], ARGV[1], ARGV[2])
-- Add the triples to the used set and set expiration time
redis.call("SADD", KEYS[3], ARGV[1], ARGV[2])
redis.call("EXPIRE", KEYS[3], ARGV[3])
-- Add the triples to the used set and set expiration time. Note, HSET is used so
-- we can expire on each field instead of the whole hash set.
redis.call("HSET", KEYS[3], ARGV[1], "1", ARGV[2], "1")
redis.call("HEXPIRE", KEYS[3], ARGV[3], "FIELDS", 2, ARGV[1], ARGV[2])
-- Return the triples
return {v1, v2}
"#;
Expand All @@ -150,33 +155,34 @@ impl TripleStorage {
let lua_script = r#"
-- Check the number of triples in the set
local count = redis.call("SCARD", KEYS[1])
if count < 2 then
return {err = "Mine triple stockpile does not have enough triples"}
end
-- Pop two IDs atomically
local id1 = redis.call("SPOP", KEYS[1])
local id2 = redis.call("SPOP", KEYS[1])
-- Retrieve the corresponding triples
local v1 = redis.call("HGET", KEYS[2], id1)
if not v1 then
return {err = "Unexpected behavior. Triple " .. id1 .. " is missing"}
end
local v2 = redis.call("HGET", KEYS[2], id2)
if not v2 then
return {err = "Unexpected behavior. Triple " .. id2 .. " is missing"}
end
-- Delete the triples from the hash map
redis.call("HDEL", KEYS[2], id1, id2)
-- Add the triples to the used set and set expiration time
redis.call("SADD", KEYS[3], id1, id2)
redis.call("EXPIRE", KEYS[3], ARGV[1])
-- Add the triples to the used set and set expiration time. Note, HSET is used so
-- we can expire on each field instead of the whole hash set.
redis.call("HSET", KEYS[3], id1, "1", id2, "1")
redis.call("HEXPIRE", KEYS[3], ARGV[1], "FIELDS", 2, id1, id2)
-- Return the triples as a response
return {v1, v2}
"#;
Expand Down
9 changes: 1 addition & 8 deletions integration-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@
Running integration tests requires you to have redis and sandbox docker images present on your machine:

```BASH
docker pull ghcr.io/near/sandbox
docker pull redis:7.0.15
```

For M1 you may want to pull the following image instead:

```BASH
docker pull ghcr.io/near/sandbox:latest-aarch64
docker pull redis:7.4.2
```

In case of authorization issues make sure you have logged into docker using your [access token](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry#authenticating-with-a-personal-access-token-classic).
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl Redis {

pub async fn run(docker_client: &DockerClient, network: &str) -> Self {
tracing::info!("Running Redis container...");
let container = GenericImage::new("redis", "7.0.15")
let container = GenericImage::new("redis", "7.4.2")
.with_exposed_port(Self::DEFAULT_REDIS_PORT.tcp())
.with_wait_for(WaitFor::message_on_stdout("Ready to accept connections"))
.with_network(network)
Expand Down
Loading

0 comments on commit 5a628ea

Please sign in to comment.