Skip to content

Commit

Permalink
Unsettled transfer sessions (#857)
Browse files Browse the repository at this point in the history
* use proto with settlement status

* add table for pending data transfer burns

Check the status of a data transfer session event and store in the
corresponding table

* add simple test for pending data transfer sessions that are pending

* update to helium-proto with pending data transfer message

* use timestamp columns that let us know when a session occurred for settlement

first and last timestamp makes sense for burning because we try and do
that as often as possible.

But for something that may be sitting around for a long time...

A settlement message will come in and try to pick up all sessions within
a window (`recv_timestamp`).

`inserted_at` is for us to help with debugging.

* break out process_file for readability

cargo fmt really does not like lots of code inside tokio::select! macros

* Write pending data transfer sessions to s3

* allow all the arguments

* Use consistent timestamp granularity within a single message

* go back to proto master
  • Loading branch information
michaeldjeffrey authored Aug 29, 2024
1 parent ca12bb6 commit e6adffc
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 31 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
strategy:
fail-fast: false
matrix:
package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier]
package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-packet-verifier,mobile-verifier]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-tests-postgres-${{ matrix.package }}
cancel-in-progress: true
Expand Down Expand Up @@ -142,7 +142,7 @@ jobs:
strategy:
fail-fast: false
matrix:
package: [coverage-map,coverage-point-calculator,ingest,mobile-packet-verifier,reward-scheduler,task-manager]
package: [coverage-map,coverage-point-calculator,ingest,reward-scheduler,task-manager]
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-tests-${{ matrix.package }}
cancel-in-progress: true
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ing
pub const INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT: &str =
"invalid_data_transfer_session_ingest_report";
pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session";
pub const PENDING_DATA_TRANSFER_SESION: &str = "pending_data_transfer_sesion";
pub const PRICE_REPORT: &str = "price_report";
pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share";
pub const MAPPER_MSG: &str = "mapper_msg";
Expand Down Expand Up @@ -193,6 +194,7 @@ pub enum FileType {
DataTransferSessionIngestReport,
InvalidDataTransferSessionIngestReport,
ValidDataTransferSession,
PendingDataTransferSession,
PriceReport,
MobileRewardShare,
SubscriberLocationReq,
Expand Down Expand Up @@ -270,6 +272,7 @@ impl fmt::Display for FileType {
INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT
}
Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION,
Self::PendingDataTransferSession => PENDING_DATA_TRANSFER_SESION,
Self::PriceReport => PRICE_REPORT,
Self::MobileRewardShare => MOBILE_REWARD_SHARE,
Self::MapperMsg => MAPPER_MSG,
Expand Down Expand Up @@ -344,6 +347,7 @@ impl FileType {
INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT
}
Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION,
Self::PendingDataTransferSession => PENDING_DATA_TRANSFER_SESION,
Self::PriceReport => PRICE_REPORT,
Self::MobileRewardShare => MOBILE_REWARD_SHARE,
Self::MapperMsg => MAPPER_MSG,
Expand Down Expand Up @@ -418,6 +422,7 @@ impl FromStr for FileType {
Self::InvalidDataTransferSessionIngestReport
}
VALID_DATA_TRANSFER_SESSION => Self::ValidDataTransferSession,
PENDING_DATA_TRANSFER_SESION => Self::PendingDataTransferSession,
PRICE_REPORT => Self::PriceReport,
MOBILE_REWARD_SHARE => Self::MobileRewardShare,
MAPPER_MSG => Self::MapperMsg,
Expand Down
30 changes: 28 additions & 2 deletions file_store/src/mobile_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus,
DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1, InvalidDataTransferIngestReportV1,
DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionSettlementStatus, InvalidDataTransferIngestReportV1,
PendingDataTransferSessionV1,
};

use serde::Serialize;
Expand Down Expand Up @@ -181,6 +183,7 @@ pub struct DataTransferSessionReq {
pub rewardable_bytes: u64,
pub pub_key: PublicKeyBinary,
pub signature: Vec<u8>,
pub status: DataTransferSessionSettlementStatus,
}

impl MsgDecode for DataTransferSessionReq {
Expand All @@ -191,6 +194,7 @@ impl TryFrom<DataTransferSessionReqV1> for DataTransferSessionReq {
type Error = Error;

fn try_from(v: DataTransferSessionReqV1) -> Result<Self> {
let status = v.status();
Ok(Self {
rewardable_bytes: v.rewardable_bytes,
signature: v.signature,
Expand All @@ -199,6 +203,7 @@ impl TryFrom<DataTransferSessionReqV1> for DataTransferSessionReq {
.ok_or_else(|| Error::not_found("data transfer usage"))?
.try_into()?,
pub_key: v.pub_key.into(),
status,
})
}
}
Expand All @@ -212,7 +217,28 @@ impl From<DataTransferSessionReq> for DataTransferSessionReqV1 {
rewardable_bytes: v.rewardable_bytes,
pub_key: v.pub_key.into(),
signature: v.signature,
..Default::default()
reward_cancelled: false,
status: v.status as i32,
}
}
}

impl DataTransferSessionReq {
pub fn to_pending_proto(
self,
received_timestamp: DateTime<Utc>,
) -> PendingDataTransferSessionV1 {
let event_timestamp = self.data_transfer_usage.timestamp.encode_timestamp_millis();
let received_timestamp = received_timestamp.encode_timestamp_millis();

PendingDataTransferSessionV1 {
pub_key: self.pub_key.into(),
payer: self.data_transfer_usage.payer.into(),
upload_bytes: self.data_transfer_usage.upload_bytes,
download_bytes: self.data_transfer_usage.download_bytes,
rewardable_bytes: self.rewardable_bytes,
event_timestamp,
received_timestamp,
}
}
}
5 changes: 5 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ impl_file_sink!(
FileType::OracleBoostingReport.to_str(),
"oracle_boosting_report"
);
impl_file_sink!(
poc_mobile::PendingDataTransferSessionV1,
FileType::PendingDataTransferSession.to_str(),
"pending_data_transfer_session"
);
impl_file_sink!(
poc_mobile::RadioThresholdIngestReportV1,
FileType::RadioThresholdIngestReport.to_str(),
Expand Down
3 changes: 3 additions & 0 deletions mobile_packet_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ http-serde = { workspace = true }
sha2 = { workspace = true }
humantime-serde = { workspace = true }
custom-tracing = { path = "../custom_tracing" }

[dev-dependencies]
rand = { workspace = true }
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE pending_data_transfer_sessions (
pub_key TEXT NOT NULL,
payer TEXT NOT NULL,
event_id TEXT NOT NULL,
uploaded_bytes BIGINT NOT NULL,
downloaded_bytes BIGINT NOT NULL,
rewardable_bytes BIGINT NOT NULL,
recv_timestamp TIMESTAMPTZ NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY(pub_key, payer, recv_timestamp)
);
Loading

0 comments on commit e6adffc

Please sign in to comment.