Skip to content

Commit

Permalink
ref(processor): Remove project id state field (#4388)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Dec 13, 2024
1 parent 06f956a commit 3423d8b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 85 deletions.
137 changes: 60 additions & 77 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,13 +776,6 @@ struct ProcessEnvelopeState<Group> {
/// This is the config used for trace-based dynamic sampling.
sampling_project_info: Option<Arc<ProjectInfo>>,

/// The id of the project that this envelope is ingested into.
///
/// This identifier can differ from the one stated in the Envelope's DSN if the key was moved to
/// a new project or on the legacy endpoint. In that case, normalization will update the project
/// ID.
project_id: ProjectId,

/// The managed envelope before processing.
managed_envelope: TypedEnvelope<Group>,
}
Expand Down Expand Up @@ -1230,13 +1223,17 @@ impl EnvelopeProcessorService {

/// Normalize monitor check-ins and remove invalid ones.
#[cfg(feature = "processing")]
fn process_check_ins(&self, state: &mut ProcessEnvelopeState<CheckInGroup>) {
fn process_check_ins(
&self,
state: &mut ProcessEnvelopeState<CheckInGroup>,
project_id: ProjectId,
) {
state.managed_envelope.retain_items(|item| {
if item.ty() != &ItemType::CheckIn {
return ItemAction::Keep;
}

match relay_monitors::process_check_in(&item.payload(), state.project_id) {
match relay_monitors::process_check_in(&item.payload(), project_id) {
Ok(result) => {
item.set_routing_hint(result.routing_hint);
item.set_payload(ContentType::Json, result.payload);
Expand All @@ -1254,50 +1251,6 @@ impl EnvelopeProcessorService {
})
}

/// Creates and initializes the processing state.
///
/// This applies defaults to the envelope and initializes empty rate limits.
#[allow(clippy::too_many_arguments)]
fn prepare_state<G>(
&self,
config: Arc<Config>,
mut managed_envelope: TypedEnvelope<G>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
rate_limits: Arc<RateLimits>,
sampling_project_info: Option<Arc<ProjectInfo>>,
) -> ProcessEnvelopeState<G> {
let envelope = managed_envelope.envelope_mut();

// Set the event retention. Effectively, this value will only be available in processing
// mode when the full project config is queried from the upstream.
if let Some(retention) = project_info.config.event_retention {
envelope.set_retention(retention);
}

// Ensure the project ID is updated to the stored instance for this project cache. This can
// differ in two cases:
// 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
// 2. The DSN was moved and the envelope sent to the old project ID.
envelope.meta_mut().set_project_id(project_id);

let extracted_metrics = ProcessingExtractedMetrics::new();

ProcessEnvelopeState {
event: Annotated::empty(),
event_metrics_extracted: false,
spans_extracted: false,
metrics: Metrics::default(),
extracted_metrics,
project_info,
rate_limits,
config,
sampling_project_info,
project_id,
managed_envelope,
}
}

#[cfg(feature = "processing")]
fn enforce_quotas<G>(
&self,
Expand Down Expand Up @@ -1332,6 +1285,7 @@ impl EnvelopeProcessorService {
fn extract_transaction_metrics(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
project_id: ProjectId,
sampling_decision: SamplingDecision,
) -> Result<(), ProcessingError> {
if state.event_metrics_extracted {
Expand Down Expand Up @@ -1406,7 +1360,7 @@ impl EnvelopeProcessorService {
event,
combined_config,
sampling_decision,
state.project_id,
project_id,
self.inner
.config
.aggregator_config_for(MetricNamespace::Spans)
Expand All @@ -1431,7 +1385,7 @@ impl EnvelopeProcessorService {
generic_config: Some(combined_config),
transaction_from_dsc,
sampling_decision,
target_project_id: state.project_id,
target_project_id: project_id,
};

state
Expand All @@ -1447,6 +1401,7 @@ impl EnvelopeProcessorService {
fn normalize_event<G: EventProcessing>(
&self,
state: &mut ProcessEnvelopeState<G>,
project_id: ProjectId,
mut event_fully_normalized: EventFullyNormalized,
) -> Result<Option<EventFullyNormalized>, ProcessingError> {
if !state.has_event() {
Expand Down Expand Up @@ -1510,7 +1465,7 @@ impl EnvelopeProcessorService {
}

let normalization_config = NormalizationConfig {
project_id: Some(state.project_id.value()),
project_id: Some(project_id.value()),
client: request_meta.client().map(str::to_owned),
key_id,
protocol_version: Some(request_meta.version().to_string()),
Expand Down Expand Up @@ -1593,6 +1548,7 @@ impl EnvelopeProcessorService {
fn process_errors(
&self,
state: &mut ProcessEnvelopeState<ErrorGroup>,
project_id: ProjectId,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());

Expand All @@ -1616,7 +1572,7 @@ impl EnvelopeProcessorService {

event::finalize(state, &self.inner.config)?;
if let Some(inner_event_fully_normalized) =
self.normalize_event(state, event_fully_normalized)?
self.normalize_event(state, project_id, event_fully_normalized)?
{
event_fully_normalized = inner_event_fully_normalized;
};
Expand All @@ -1640,7 +1596,7 @@ impl EnvelopeProcessorService {

if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
relay_log::error!(
tags.project = %state.project_id,
tags.project = %project_id,
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
"ingested event without normalizing"
);
Expand All @@ -1653,6 +1609,7 @@ impl EnvelopeProcessorService {
fn process_transactions(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
project_id: ProjectId,
reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());
Expand All @@ -1661,12 +1618,12 @@ impl EnvelopeProcessorService {

event::extract(state, event_fully_normalized, &self.inner.config)?;

let profile_id = profile::filter(state);
let profile_id = profile::filter(state, project_id);
profile::transfer_id(state, profile_id);

event::finalize(state, &self.inner.config)?;
if let Some(inner_event_fully_normalized) =
self.normalize_event(state, event_fully_normalized)?
self.normalize_event(state, project_id, event_fully_normalized)?
{
event_fully_normalized = inner_event_fully_normalized;
}
Expand Down Expand Up @@ -1701,7 +1658,7 @@ impl EnvelopeProcessorService {
// Before metric extraction to make sure the profile count is reflected correctly.
profile::process(state, &global_config);
// Extract metrics here, we're about to drop the event/transaction.
self.extract_transaction_metrics(state, SamplingDecision::Drop)?;
self.extract_transaction_metrics(state, project_id, SamplingDecision::Drop)?;

dynamic_sampling::drop_unsampled_items(state, outcome);

Expand All @@ -1728,7 +1685,7 @@ impl EnvelopeProcessorService {
profile::transfer_id(state, profile_id);

// Always extract metrics in processing Relays for sampled items.
self.extract_transaction_metrics(state, SamplingDecision::Keep)?;
self.extract_transaction_metrics(state, project_id, SamplingDecision::Keep)?;

if state
.project_info
Expand All @@ -1749,7 +1706,7 @@ impl EnvelopeProcessorService {

if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
relay_log::error!(
tags.project = %state.project_id,
tags.project = %project_id,
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
"ingested event without normalizing"
);
Expand Down Expand Up @@ -1777,8 +1734,9 @@ impl EnvelopeProcessorService {
fn process_standalone(
&self,
state: &mut ProcessEnvelopeState<StandaloneGroup>,
project_id: ProjectId,
) -> Result<(), ProcessingError> {
profile::filter(state);
profile::filter(state, project_id);

if_processing!(self.inner.config, {
self.enforce_quotas(state)?;
Expand Down Expand Up @@ -1834,11 +1792,12 @@ impl EnvelopeProcessorService {
/// Processes cron check-ins.
fn process_checkins(
&self,
_state: &mut ProcessEnvelopeState<CheckInGroup>,
#[allow(unused_variables)] state: &mut ProcessEnvelopeState<CheckInGroup>,
#[allow(unused_variables)] project_id: ProjectId,
) -> Result<(), ProcessingError> {
if_processing!(self.inner.config, {
self.enforce_quotas(_state)?;
self.process_check_ins(_state);
self.enforce_quotas(state)?;
self.process_check_ins(state, project_id);
});
Ok(())
}
Expand All @@ -1849,6 +1808,7 @@ impl EnvelopeProcessorService {
fn process_standalone_spans(
&self,
state: &mut ProcessEnvelopeState<SpanGroup>,
#[allow(unused_variables)] project_id: ProjectId,
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
) -> Result<(), ProcessingError> {
span::filter(state);
Expand All @@ -1863,6 +1823,7 @@ impl EnvelopeProcessorService {

span::process(
state,
project_id,
&global_config,
self.inner.geoip_lookup.as_ref(),
&reservoir,
Expand Down Expand Up @@ -1895,17 +1856,37 @@ impl EnvelopeProcessorService {
.parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
}

// Set the event retention. Effectively, this value will only be available in processing
// mode when the full project config is queried from the upstream.
if let Some(retention) = project_info.config.event_retention {
managed_envelope.envelope_mut().set_retention(retention);
}

// Ensure the project ID is updated to the stored instance for this project cache. This can
// differ in two cases:
// 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
// 2. The DSN was moved and the envelope sent to the old project ID.
managed_envelope
.envelope_mut()
.meta_mut()
.set_project_id(project_id);

macro_rules! run {
($fn_name:ident $(, $args:expr)*) => {{
let managed_envelope = managed_envelope.try_into()?;
let mut state = self.prepare_state(
self.inner.config.clone(),
managed_envelope,
project_id,
let mut state = ProcessEnvelopeState {
event: Annotated::empty(),
event_metrics_extracted: false,
spans_extracted: false,
metrics: Metrics::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: self.inner.config.clone(),
project_info,
rate_limits,
sampling_project_info,
);
managed_envelope,
};

// The state is temporarily supplied, until it will be removed.
match self.$fn_name(&mut state, $($args),*) {
Ok(()) => Ok(ProcessingStateResult {
Expand All @@ -1926,14 +1907,16 @@ impl EnvelopeProcessorService {
relay_log::trace!("Processing {group} group", group = group.variant());

match group {
ProcessingGroup::Error => run!(process_errors),
ProcessingGroup::Transaction => run!(process_transactions, reservoir_counters),
ProcessingGroup::Error => run!(process_errors, project_id),
ProcessingGroup::Transaction => {
run!(process_transactions, project_id, reservoir_counters)
}
ProcessingGroup::Session => run!(process_sessions),
ProcessingGroup::Standalone => run!(process_standalone),
ProcessingGroup::Standalone => run!(process_standalone, project_id),
ProcessingGroup::ClientReport => run!(process_client_reports),
ProcessingGroup::Replay => run!(process_replays),
ProcessingGroup::CheckIn => run!(process_checkins),
ProcessingGroup::Span => run!(process_standalone_spans, reservoir_counters),
ProcessingGroup::CheckIn => run!(process_checkins, project_id),
ProcessingGroup::Span => run!(process_standalone_spans, project_id, reservoir_counters),
ProcessingGroup::ProfileChunk => run!(process_profile_chunks),
// Currently is not used.
ProcessingGroup::Metrics => {
Expand Down
4 changes: 1 addition & 3 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {

use bytes::Bytes;
use relay_base_schema::events::EventType;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_base_schema::project::ProjectKey;
use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
use relay_event_schema::protocol::{EventId, LenientString};
use relay_protocol::RuleCondition;
Expand Down Expand Up @@ -441,7 +441,6 @@ mod tests {
project_info,
rate_limits: Default::default(),
sampling_project_info: None,
project_id: ProjectId::new(42),
managed_envelope: ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
Expand Down Expand Up @@ -745,7 +744,6 @@ mod tests {
}));
Some(Arc::new(state))
},
project_id: ProjectId::new(1),
managed_envelope: ManagedEnvelope::new(
envelope,
Addr::dummy(),
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/services/processor/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::IpAddr;
use relay_dynamic_config::{Feature, GlobalConfig};

use relay_base_schema::events::EventType;
use relay_base_schema::project::ProjectId;
use relay_config::Config;
use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
use relay_filter::ProjectFiltersConfig;
Expand All @@ -18,7 +19,7 @@ use crate::utils::ItemAction;
/// Filters out invalid and duplicate profiles.
///
/// Returns the profile id of the single remaining profile, if there is one.
pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) -> Option<ProfileId> {
pub fn filter<G>(state: &mut ProcessEnvelopeState<G>, project_id: ProjectId) -> Option<ProfileId> {
let profiling_disabled = state.should_filter(Feature::Profiling);
let has_transaction = state.event_type() == Some(EventType::Transaction);
let keep_unsampled_profiles = true;
Expand All @@ -38,7 +39,7 @@ pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) -> Option<ProfileId> {
return ItemAction::DropSilently;
}

match relay_profiling::parse_metadata(&item.payload(), state.project_id) {
match relay_profiling::parse_metadata(&item.payload(), project_id) {
Ok(id) => {
profile_id = Some(id);
ItemAction::Keep
Expand Down
Loading

0 comments on commit 3423d8b

Please sign in to comment.