22
33use std:: error:: Error ;
44use std:: num:: NonZeroU8 ;
5- use std:: sync:: Arc ;
65use std:: sync:: atomic:: Ordering ;
76use std:: sync:: atomic:: { AtomicBool , AtomicU64 , AtomicUsize } ;
7+ use std:: sync:: { Arc , LazyLock } ;
88use std:: time:: Duration ;
99
1010use ahash:: RandomState ;
1111use chrono:: DateTime ;
1212use chrono:: Utc ;
13+ use relay_base_schema:: project:: ProjectKey ;
1314use relay_config:: { Config , EnvelopeSpoolPartitioning } ;
1415use relay_system:: Receiver ;
1516use relay_system:: ServiceSpawn ;
@@ -26,7 +27,7 @@ use crate::services::outcome::DiscardReason;
2627use crate :: services:: outcome:: Outcome ;
2728use crate :: services:: outcome:: TrackOutcome ;
2829use crate :: services:: processor:: { EnvelopeProcessor , ProcessEnvelope } ;
29- use crate :: services:: projects:: cache:: { ProjectCacheHandle , ProjectChange } ;
30+ use crate :: services:: projects:: cache:: { Project , ProjectCacheHandle , ProjectChange } ;
3031use crate :: statsd:: RelayCounters ;
3132
3233use crate :: MemoryChecker ;
@@ -44,7 +45,7 @@ pub use envelope_stack::EnvelopeStack;
4445// pub for benchmarks
4546pub use envelope_store:: sqlite:: SqliteEnvelopeStore ;
4647
47- use crate :: services:: projects:: project:: ProjectState ;
48+ use crate :: services:: projects:: project:: { ProjectInfo , ProjectState } ;
4849pub use common:: ProjectKeyPair ;
4950
5051mod common;
@@ -536,91 +537,50 @@ impl EnvelopeBufferService {
536537 buffer : & mut PolymorphicEnvelopeBuffer ,
537538 project_key_pair : ProjectKeyPair ,
538539 ) -> Result < ( ) , EnvelopeBufferError > {
539- let own_key = project_key_pair. own_key ;
540- let own_project = services. project_cache_handle . get ( own_key) ;
541- // We try to load the own project state and bail in case it's pending.
542- let own_project_info = match own_project. state ( ) {
543- ProjectState :: Enabled ( info) => Some ( info. clone ( ) ) ,
544- ProjectState :: Disabled => None ,
545- ProjectState :: Pending => {
546- buffer. mark_ready ( & own_key, false ) ;
540+ macro_rules! pop_envelope {
541+ ( ) => { {
542+ relay_log:: trace!( "EnvelopeBufferService: popping envelope" ) ;
543+ // If we arrived here, know that both projects are available, so we pop the envelope.
544+ //
545+ // Available, doesn't necessarily mean enabled/active.
546+ let envelope = buffer. pop( ) . await ?;
547+ let envelope = envelope. expect( "Element disappeared despite exclusive excess" ) ;
548+ Managed :: from_envelope( envelope, services. outcome_aggregator. clone( ) )
549+ } } ;
550+ }
551+
552+ match resolve_project ( & services. project_cache_handle , project_key_pair) {
553+ ResolvedProject :: Enabled {
554+ own_project,
555+ own_project_info,
556+ sampling_project_info,
557+ } => {
558+ let mut envelope = pop_envelope ! ( ) ;
559+ if own_project. check_envelope ( & mut envelope) . await . is_err ( ) || envelope. is_empty ( ) {
560+ // Outcomes are emitted by `check_envelope`.
561+ return Ok ( ( ) ) ;
562+ } ;
563+
564+ services. envelope_processor . send ( ProcessEnvelope {
565+ envelope : envelope. into ( ) ,
566+ project_info : own_project_info,
567+ rate_limits : own_project. rate_limits ( ) . current_limits ( ) ,
568+ sampling_project_info,
569+ } ) ;
570+ }
571+ // If the own project state is disabled, we want to drop the envelope.
572+ ResolvedProject :: Disabled => {
573+ let _ = pop_envelope ! ( ) . reject_err ( Outcome :: Invalid ( DiscardReason :: ProjectId ) ) ;
574+ }
575+ ResolvedProject :: NotReady ( key) => {
576+ buffer. mark_ready ( & key, false ) ;
547577 relay_statsd:: metric!(
548578 counter( RelayCounters :: BufferProjectPending ) += 1 ,
549579 partition_id = partition_tag
550580 ) ;
551-
552- return Ok ( ( ) ) ;
553- }
554- } ;
555-
556- let sampling_key = project_key_pair. sampling_key ;
557- // If the projects are different, we load the project key of the sampling project. On the
558- // other hand, if they are the same, we just reuse the own project.
559- let sampling_project_info = if project_key_pair. has_distinct_sampling_key ( ) {
560- // We try to load the sampling project state and bail in case it's pending.
561- match services. project_cache_handle . get ( sampling_key) . state ( ) {
562- ProjectState :: Enabled ( info) => Some ( info. clone ( ) ) ,
563- ProjectState :: Disabled => None ,
564- ProjectState :: Pending => {
565- buffer. mark_ready ( & sampling_key, false ) ;
566- relay_statsd:: metric!(
567- counter( RelayCounters :: BufferProjectPending ) += 1 ,
568- partition_id = partition_tag
569- ) ;
570-
571- return Ok ( ( ) ) ;
572- }
573581 }
574- } else {
575- own_project_info. clone ( )
576- } ;
577-
578- relay_log:: trace!( "EnvelopeBufferService: popping envelope" ) ;
579-
580- // If we arrived here, know that both projects are available, so we pop the envelope.
581- let envelope = buffer
582- . pop ( )
583- . await ?
584- . expect ( "Element disappeared despite exclusive excess" ) ;
585-
586- // If the own project state is disabled, we want to drop the envelope and early return since
587- // we can't do much about it.
588- let Some ( own_project_info) = own_project_info else {
589- let mut managed_envelope =
590- ManagedEnvelope :: new ( envelope, services. outcome_aggregator . clone ( ) ) ;
591- managed_envelope. reject ( Outcome :: Invalid ( DiscardReason :: ProjectId ) ) ;
592-
593- return Ok ( ( ) ) ;
594- } ;
595-
596- // We only extract the sampling project info if both projects belong to the same org.
597- let sampling_project_info = sampling_project_info
598- . filter ( |info| info. organization_id == own_project_info. organization_id ) ;
599-
600- let mut managed_envelope =
601- Managed :: from_envelope ( envelope, services. outcome_aggregator . clone ( ) ) ;
602-
603- if own_project
604- . check_envelope ( & mut managed_envelope)
605- . await
606- . is_err ( )
607- {
608- // Outcomes are emitted by `check_envelope`.
609- return Ok ( ( ) ) ;
610- } ;
611-
612- if managed_envelope. is_empty ( ) {
613- // Nothing left to process.
614- return Ok ( ( ) ) ;
615582 }
616583
617- services. envelope_processor . send ( ProcessEnvelope {
618- envelope : managed_envelope. into ( ) ,
619- project_info : own_project_info. clone ( ) ,
620- rate_limits : own_project. rate_limits ( ) . current_limits ( ) ,
621- sampling_project_info : sampling_project_info. clone ( ) ,
622- } ) ;
623-
624584 Ok ( ( ) )
625585 }
626586
@@ -747,6 +707,103 @@ impl Service for EnvelopeBufferService {
747707 }
748708}
749709
710+ /// Resolves the project and project information for an envelope about to be popped from the buffer.
711+ ///
712+ /// Resolves the own and sampling project information
713+ fn resolve_project (
714+ project_cache : & ProjectCacheHandle ,
715+ ProjectKeyPair {
716+ own_key,
717+ sampling_key,
718+ } : ProjectKeyPair ,
719+ ) -> ResolvedProject < ' _ > {
720+ static DUMMY_CONFIG : LazyLock < Arc < ProjectInfo > > = LazyLock :: new ( || {
721+ Arc :: new ( ProjectInfo {
722+ project_id : None ,
723+ last_change : None ,
724+ rev : Default :: default ( ) ,
725+ public_keys : Default :: default ( ) ,
726+ slug : None ,
727+ config : Default :: default ( ) ,
728+ organization_id : None ,
729+ upstream : None ,
730+ } )
731+ } ) ;
732+
733+ let own_project = project_cache. get ( own_key) ;
734+ let own_project_info = match own_project. state ( ) {
735+ ProjectState :: Enabled ( info) => info. clone ( ) ,
736+ ProjectState :: Dummy => {
737+ return ResolvedProject :: Enabled {
738+ own_project,
739+ // Since downstream requires a project config, we re-use this dummy config.
740+ //
741+ // This is how Relay historically always handled its proxy mode.
742+ // It would make sense to instead of passing down this dummy, making the project
743+ // config state here optional or similarly typed to the project state.
744+ own_project_info : Arc :: clone ( & DUMMY_CONFIG ) ,
745+ sampling_project_info : None ,
746+ } ;
747+ }
748+ ProjectState :: Disabled => return ResolvedProject :: Disabled ,
749+ ProjectState :: Pending => return ResolvedProject :: NotReady ( own_key) ,
750+ } ;
751+
752+ // If the projects are different, we load the project key of the sampling project. On the
753+ // other hand, if they are the same, we just reuse the own project.
754+ let sampling_project_info = match own_key == sampling_key {
755+ // For matching keys, we can re-use the existing config.
756+ true => Some ( own_project_info. clone ( ) ) ,
757+ // If the sampling project is distinct, we need also fetch that config.
758+ false => {
759+ match project_cache. get ( sampling_key) . state ( ) {
760+ ProjectState :: Enabled ( info) => {
761+ // The sampling project key must belong to the same organization as the own project key.
762+ //
763+ // Dynamic sampling does not work across organizations, we also want to have a clear separation
764+ // of data between organizations.
765+ ( info. organization_id == own_project_info. organization_id )
766+ . then ( || Arc :: clone ( info) )
767+ }
768+ ProjectState :: Dummy => {
769+ // This case should never happen, the own project info would already be dummy allowed.
770+ debug_assert ! ( false ) ;
771+ None
772+ }
773+ ProjectState :: Disabled => None ,
774+ ProjectState :: Pending => return ResolvedProject :: NotReady ( sampling_key) ,
775+ }
776+ }
777+ } ;
778+
779+ ResolvedProject :: Enabled {
780+ own_project,
781+ own_project_info,
782+ sampling_project_info,
783+ }
784+ }
785+
786+ /// State returned from [`resolve_project`].
787+ enum ResolvedProject < ' a > {
788+ /// The project is enabled and data for it can be processed.
789+ Enabled {
790+ /// The own project.
791+ own_project : Project < ' a > ,
792+ /// The own, enabled, project info.
793+ own_project_info : Arc < ProjectInfo > ,
794+ /// The sampling project info.
795+ ///
796+ /// May be `None` when the sampling project is disabled or from a different organization.
797+ sampling_project_info : Option < Arc < ProjectInfo > > ,
798+ } ,
799+ /// The project is disabled.
800+ Disabled ,
801+ /// The project information isn't ready.
802+ ///
803+ /// This may be returned for either the own project or the sampling project.
804+ NotReady ( ProjectKey ) ,
805+ }
806+
750807/// The spooler uses internal time based mechanics and to not make the tests actually wait
751808/// it's good to use `#[tokio::test(start_paused = true)]`. For memory based spooling, this will
752809/// just work.
0 commit comments