Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prod Release 06/08/24 #970

Merged
merged 10 commits into from
Aug 6, 2024
14 changes: 14 additions & 0 deletions .github/workflows/coordinator-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ jobs:
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.75.0
override: true
profile: minimal
components: rustfmt
- name: Check
working-directory: ./coordinator
run: cargo check
Expand All @@ -33,6 +40,13 @@ jobs:
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.75.0
override: true
profile: minimal
components: rustfmt
- name: Test
working-directory: ./coordinator
run: cargo test
Expand Down
30 changes: 18 additions & 12 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,22 @@ impl BlockStream {
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
fn start_health_monitoring_task(
&self,
redis: Arc<RedisClient>,
start_block_height: near_indexer_primitives::types::BlockHeight,
) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();
let stalled_timeout_seconds = 120;

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

let mut last_processed_block = Some(start_block_height - 1);
loop {
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds))
.await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
Expand Down Expand Up @@ -183,6 +187,11 @@ impl BlockStream {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"No block has been processed for {stalled_timeout_seconds} seconds"
);
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
Expand Down Expand Up @@ -266,7 +275,8 @@ impl BlockStream {

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());
let monitor_handle =
self.start_health_monitoring_task(redis.clone(), start_block_height.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
Expand Down Expand Up @@ -533,11 +543,7 @@ mod tests {
predicate::eq("near-lake-data-mainnet".to_string()),
predicate::eq("000091940840/block.json"),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
"2023-12-09",
))
});
.returning(move |_, _| Ok(crate::test_utils::generate_block_with_date("2023-12-09")));

let mut mock_graphql_client = crate::graphql::client::GraphQLClient::default();

Expand Down Expand Up @@ -663,7 +669,7 @@ mod tests {
predicate::eq("000107503704/block.json"),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
Ok(crate::test_utils::generate_block_with_date(
&chrono::Utc::now().format("%Y-%m-%d").to_string(),
))
});
Expand Down
22 changes: 15 additions & 7 deletions block-streamer/src/receiver_blocks/receiver_blocks_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use async_stream::try_stream;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono::{DateTime, Duration, TimeZone, Timelike, Utc};
use near_lake_framework::near_indexer_primitives;
use regex::Regex;

Expand Down Expand Up @@ -165,7 +165,15 @@ impl ReceiverBlocksProcessor {
try_stream! {
let start_date = self.get_nearest_block_date(start_block_height).await?;
let contract_pattern_type = ContractPatternType::from(contract_pattern.as_str());
let mut current_date = start_date;
let mut current_date = start_date
.with_hour(0)
.unwrap()
.with_minute(0)
.unwrap()
.with_second(0)
.unwrap()
.with_nanosecond(0)
.unwrap();

while current_date <= Utc::now() {
let base_64_bitmaps: Vec<Base64Bitmap> = self.query_base_64_bitmaps(&contract_pattern_type, &current_date).await?;
Expand Down Expand Up @@ -288,13 +296,13 @@ mod tests {
}

#[tokio::test]
async fn collect_block_heights_from_one_day() {
async fn collect_block_heights_from_today() {
let mut mock_s3_client = crate::s3_client::S3Client::default();
mock_s3_client
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&Utc::now().format("%Y-%m-%d").to_string(),
&Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
))
});

Expand Down Expand Up @@ -326,7 +334,7 @@ mod tests {
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&Utc::now().format("%Y-%m-%d").to_string(),
&Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
))
});

Expand Down Expand Up @@ -362,8 +370,8 @@ mod tests {
.expect_get_text_file()
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
&(Utc::now() - Duration::days(2))
.format("%Y-%m-%d")
&(Utc::now() - Duration::days(2) + Duration::minutes(10))
.format("%Y-%m-%dT%H:%M:%S")
.to_string(),
))
});
Expand Down
2 changes: 1 addition & 1 deletion block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ mod tests {
predicate::always(),
)
.returning(move |_, _| {
Ok(crate::test_utils::generate_block_with_timestamp(
Ok(crate::test_utils::generate_block_with_date(
&chrono::Utc::now().format("%Y-%m-%d").to_string(),
))
});
Expand Down
7 changes: 6 additions & 1 deletion block-streamer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,16 @@ pub fn utc_date_time_from_date_string(date: &str) -> chrono::DateTime<chrono::Ut
chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive_date_time)
}

pub fn generate_block_with_timestamp(date: &str) -> String {
pub fn generate_block_with_date(date: &str) -> String {
let naive_date = chrono::NaiveDate::parse_from_str(date, "%Y-%m-%d")
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
return generate_block_with_timestamp(&naive_date.format("%Y-%m-%dT%H:%M:%S").to_string());
}

pub fn generate_block_with_timestamp(date: &str) -> String {
let naive_date = chrono::NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%S").unwrap();

let date_time_utc = chrono::Utc.from_utc_datetime(&naive_date).timestamp() * 1_000_000_000;

Expand Down
Loading
Loading