Skip to content

Commit

Permalink
Update to validate feed_id as well
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Jun 18, 2024
1 parent 31d37e4 commit 247acb6
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 26 deletions.
62 changes: 37 additions & 25 deletions price/src/price_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use file_store::file_sink;
use futures::{future::LocalBoxFuture, TryFutureExt};
use helium_anchor_gen::anchor_lang::AccountDeserialize;
use helium_proto::{BlockchainTokenTypeV1, PriceReportV1};
use pyth_solana_receiver_sdk::price_update::{PriceFeedMessage, PriceUpdateV2};
use pyth_solana_receiver_sdk::price_update::{FeedId, PriceUpdateV2};
use serde::{Deserialize, Serialize};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey as SolPubkey;
Expand Down Expand Up @@ -36,6 +36,7 @@ pub struct PriceGenerator {
interval_duration: std::time::Duration,
last_price_opt: Option<Price>,
key: Option<SolPubkey>,
feed_id: Option<FeedId>,
default_price: Option<u64>,
stale_price_duration: Duration,
latest_price_file: PathBuf,
Expand Down Expand Up @@ -91,6 +92,7 @@ impl PriceGenerator {
token_type,
client,
key: settings.price_key(token_type)?,
feed_id: settings.price_feed_id(token_type)?,
default_price: settings.default_price(token_type)?,
interval_duration: settings.interval,
stale_price_duration: settings.stale_price_duration,
Expand All @@ -102,9 +104,16 @@ impl PriceGenerator {
}

pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
match (self.key, self.default_price, self.file_sink.clone()) {
(Some(key), _, Some(file_sink)) => self.run_with_key(key, file_sink, &shutdown).await,
(None, Some(defaut_price), Some(file_sink)) => {
match (
self.key,
self.feed_id,
self.default_price,
self.file_sink.clone(),
) {
(Some(key), Some(feed_id), _, Some(file_sink)) => {
self.run_with_key(key, feed_id, file_sink, &shutdown).await
}
(None, None, Some(defaut_price), Some(file_sink)) => {
self.run_with_default(defaut_price, file_sink, &shutdown)
.await
}
Expand Down Expand Up @@ -150,6 +159,7 @@ impl PriceGenerator {
async fn run_with_key(
&mut self,
key: SolPubkey,
feed_id: FeedId,
file_sink: file_sink::FileSinkClient,
shutdown: &triggered::Listener,
) -> Result<()> {
Expand All @@ -161,7 +171,7 @@ impl PriceGenerator {
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = trigger.tick() => self.handle(&key, &file_sink).await?,
_ = trigger.tick() => self.handle(&key, &feed_id, &file_sink).await?,
}
}

Expand All @@ -172,9 +182,10 @@ impl PriceGenerator {
async fn handle(
&mut self,
key: &SolPubkey,
feed_id: &FeedId,
file_sink: &file_sink::FileSinkClient,
) -> Result<()> {
let price_opt = match self.get_pyth_price(key).await {
let price_opt = match self.get_pyth_price(key, feed_id).await {
Ok(new_price) => {
tracing::info!(
"updating price for {:?} to {}",
Expand Down Expand Up @@ -234,46 +245,47 @@ impl PriceGenerator {
Ok(())
}

async fn get_pyth_price(&self, price_key: &SolPubkey) -> Result<Price> {
async fn get_pyth_price(&self, price_key: &SolPubkey, feed_id: &FeedId) -> Result<Price> {
let account = self.client.get_account(price_key).await?;
let PriceUpdateV2 {
price_message:
PriceFeedMessage {
ema_price: price,
ema_conf: confidence,
exponent,
publish_time,
..
},
..
} = PriceUpdateV2::try_deserialize(&mut account.data.as_slice())?;

if publish_time.saturating_add(self.pyth_price_interval.as_secs() as i64)
let PriceUpdateV2 { price_message, .. } =
PriceUpdateV2::try_deserialize(&mut account.data.as_slice())?;

if price_message.feed_id != *feed_id {
bail!("Mismatched feed id");
}

if price_message
.publish_time
.saturating_add(self.pyth_price_interval.as_secs() as i64)
< Utc::now().timestamp()
{
bail!("Price is too old");
}

if price < 0 {
if price_message.ema_price < 0 {
bail!("Price is less than zero");
}

// Remove the confidence interval from the price to get the most optimistic price:
let optimistic_price = price as u64 + confidence * 2;
let optimistic_price = price_message.ema_price as u64 + price_message.ema_conf * 2;

// We want the price to have a resulting exponent of 10^-6
// I don't think it's possible for pyth to give us anything other than -8, but we make
// this robust just in case:
let exp = exponent + 6;
let exp = price_message.exponent + 6;
let adjusted_optimistic_price = match exp.cmp(&0) {
Ordering::Less => optimistic_price / 10_u64.pow(exp.unsigned_abs()),
Ordering::Greater => optimistic_price * 10_u64.pow(exp as u32),
_ => optimistic_price,
};

Ok(Price::new(
DateTime::from_timestamp(publish_time, 0)
.ok_or_else(|| anyhow!("Invalid publish time for price: {}", publish_time))?,
DateTime::from_timestamp(price_message.publish_time, 0).ok_or_else(|| {
anyhow!(
"Invalid publish time for price: {}",
price_message.publish_time
)
})?,
adjusted_optimistic_price,
self.token_type,
))
Expand Down
22 changes: 21 additions & 1 deletion price/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
use config::{Config, Environment, File};
use helium_proto::BlockchainTokenTypeV1;
use humantime_serde::re::humantime;
use pyth_solana_receiver_sdk::price_update::{get_feed_id_from_hex, FeedId};
use serde::Deserialize;
use solana_sdk::pubkey::Pubkey as SolPubkey;
use std::{path::Path, str::FromStr, time::Duration};
Expand All @@ -10,22 +11,28 @@ use std::{path::Path, str::FromStr, time::Duration};
pub struct ClusterConfig {
pub name: String,
pub hnt_price_key: Option<String>,
pub hnt_price_feed_id: Option<String>,
pub hnt_price: Option<u64>,
pub mobile_price_key: Option<String>,
pub mobile_price_feed_id: Option<String>,
pub mobile_price: Option<u64>,
pub iot_price_key: Option<String>,
pub iot_price_feed_id: Option<String>,
pub iot_price: Option<u64>,
}

impl Default for ClusterConfig {
fn default() -> Self {
Self {
name: "devnet".to_string(),
hnt_price_key: Some("6Eg8YdfFJQF2HHonzPUBSCCmyUEhrStg9VBLK957sBe6".to_string()),
hnt_price_key: None,
hnt_price_feed_id: None,
hnt_price: None,
mobile_price_key: None,
mobile_price_feed_id: None,
mobile_price: None,
iot_price_key: None,
iot_price_feed_id: None,
iot_price: None,
}
}
Expand Down Expand Up @@ -117,6 +124,19 @@ impl Settings {
.transpose()
}

pub fn price_feed_id(&self, token_type: BlockchainTokenTypeV1) -> Result<Option<FeedId>> {
let feed_id = match token_type {
BlockchainTokenTypeV1::Hnt => Ok(self.cluster.hnt_price_feed_id.as_deref()),
BlockchainTokenTypeV1::Mobile => Ok(self.cluster.mobile_price_feed_id.as_deref()),
BlockchainTokenTypeV1::Iot => Ok(self.cluster.iot_price_feed_id.as_deref()),
_ => Err(anyhow::anyhow!("token type not supported")),
}?;

feed_id
.map(|f| get_feed_id_from_hex(f).map_err(|_| anyhow::anyhow!("invalid feed id")))
.transpose()
}

pub fn default_price(&self, token_type: BlockchainTokenTypeV1) -> Result<Option<u64>> {
match token_type {
BlockchainTokenTypeV1::Hnt => Ok(self.cluster.hnt_price),
Expand Down

0 comments on commit 247acb6

Please sign in to comment.