Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mq pallet] Custom next queue selectors #6059

Merged
merged 25 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
39d653d
wip
ggwpez Oct 14, 2024
7764f54
Add QueueNextSelector
ggwpez Oct 14, 2024
d24bac9
test
ggwpez Oct 17, 2024
f6b1817
Merge remote-tracking branch 'origin/master' into oty-queue-priority
ggwpez Oct 17, 2024
05d0442
benchmark
ggwpez Oct 17, 2024
22bca9a
cleanup
ggwpez Oct 17, 2024
1a70335
fix
ggwpez Oct 17, 2024
a0530a1
Merge remote-tracking branch 'origin/master' into oty-queue-priority
ggwpez Feb 12, 2025
551350f
add mocked weights
ggwpez Feb 12, 2025
b5f4865
Update from ggwpez running command 'prdoc --bump minor --audience run…
github-actions[bot] Feb 12, 2025
604a4ec
major prdoc, since weight trait was modified
ggwpez Feb 12, 2025
7eeefb7
Update from ggwpez running command 'bench --pallet pallet_message_queue'
github-actions[bot] Feb 12, 2025
99f67a5
more weight
ggwpez Feb 12, 2025
298bad0
more weights
ggwpez Feb 12, 2025
8b7cbf1
fml
ggwpez Feb 12, 2025
b106307
Merge branch 'master' into oty-queue-priority
ggwpez Feb 12, 2025
ae94788
Update from ggwpez running command 'bench --pallet pallet_message_queue'
github-actions[bot] Feb 12, 2025
cbb3082
Update from ggwpez running command 'bench --pallet pallet_message_queue'
github-actions[bot] Feb 12, 2025
610fc25
clippy
ggwpez Feb 12, 2025
4350733
Update from ggwpez running command 'prdoc --audience runtime_dev --bu…
github-actions[bot] Feb 12, 2025
47a059c
Apply suggestions from code review
ggwpez Feb 12, 2025
2e6a4eb
Apply suggestions from code review
ggwpez Feb 13, 2025
744e830
prdoc
ggwpez Feb 13, 2025
3cc0f8c
Merge branch 'master' into oty-queue-priority
ggwpez Feb 13, 2025
e86b1e7
Merge branch 'master' into oty-queue-priority
muharem Feb 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions substrate/frame/message-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ frame-support = { workspace = true }
frame-system = { workspace = true }

[dev-dependencies]
frame-support = { workspace = true, features = ["experimental"] }
sp-crypto-hashing = { workspace = true, default-features = true }
sp-tracing = { workspace = true, default-features = true }
rand = { workspace = true, default-features = true }
Expand Down
16 changes: 16 additions & 0 deletions substrate/frame/message-queue/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,22 @@ mod benchmarks {
assert_eq!(weight.consumed(), T::WeightInfo::bump_service_head());
}

	// Worst case for calling `set_service_head` (exposed to other pallets via `force_set_head`).
	#[benchmark]
	fn force_set_head() {
		// Create two ready queues (origins 0 and 1); the service head starts at queue 0.
		setup_bump_service_head::<T>(0.into(), 1.into());
		let mut weight = WeightMeter::new();
		assert_eq!(ServiceHead::<T>::get().unwrap(), 0u32.into());

		#[block]
		{
			// `Ok(true)` since queue 1 is in the ready ring and the head was moved.
			assert!(MessageQueue::<T>::set_service_head(&mut weight, &1u32.into()).unwrap());
		}

		// The head now points at queue 1 and exactly the benchmarked weight was charged.
		assert_eq!(ServiceHead::<T>::get().unwrap(), 1u32.into());
		assert_eq!(weight.consumed(), T::WeightInfo::set_service_head());
	}

#[benchmark]
fn reap_page() {
// Mock the storage to get a *cullable* but not *reapable* page.
Expand Down
215 changes: 212 additions & 3 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@

use crate::{
mock::{
build_and_execute, gen_seed, Callback, CountingMessageProcessor, IntoWeight,
build_and_execute, gen_seed, set_weight, Callback, CountingMessageProcessor, IntoWeight,
MessagesProcessed, MockedWeightInfo, NumMessagesProcessed, YieldingQueues,
},
mock_helpers::MessageOrigin,
mock_helpers::{MessageOrigin, MessageOrigin::Everywhere},
*,
};

Expand Down Expand Up @@ -68,14 +68,91 @@ impl Config for Test {
type WeightInfo = MockedWeightInfo;
type MessageProcessor = CountingMessageProcessor;
type Size = u32;
type QueueChangeHandler = ();
type QueueChangeHandler = AhmPrioritizer;
type QueuePausedQuery = ();
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
type IdleMaxServiceWeight = ();
}

/// The object that does the AHM message prioritization for us.
#[derive(Debug, Default, codec::Encode, codec::Decode)]
pub struct AhmPrioritizer {
	/// Block number until which the prioritized queue is forced to the service head.
	streak_until: Option<u64>,
	/// The queue that should be serviced preferentially, if any.
	prioritized_queue: Option<MessageOriginOf<Test>>,
	/// Last observed message count of the prioritized queue; used by `on_queue_changed` to
	/// distinguish enqueues (count grows) from dequeues (count shrinks).
	favorite_queue_num_messages: Option<u64>,
}

// The whole `AhmPrioritizer` could be part of the AHM controller pallet.
parameter_types! {
	// Persistent state of the prioritizer; loaded copies are written back on `Drop`.
	pub storage AhmPrioritizerStorage: AhmPrioritizer = AhmPrioritizer::default();
}

/// Instead of giving our prioritized queue only one block, we give it a streak of blocks.
const STREAK_LEN: u64 = 3;

impl OnQueueChanged<MessageOrigin> for AhmPrioritizer {
	/// Track dequeue progress on the prioritized queue and refresh the streak when needed.
	///
	/// The loaded state is written back to `AhmPrioritizerStorage` by its `Drop` impl when it
	/// goes out of scope - including on the early returns.
	fn on_queue_changed(origin: MessageOrigin, footprint: QueueFootprint) {
		let mut prioritizer = AhmPrioritizerStorage::get();

		// Changes on any queue other than the prioritized one are not our business.
		if prioritizer.prioritized_queue != Some(origin) {
			return;
		}

		// An unchanged or grown message count means this was an enqueue; ignore it.
		let new_count = footprint.storage.count;
		if prioritizer.favorite_queue_num_messages.map_or(false, |old| old <= new_count) {
			return;
		}
		prioritizer.favorite_queue_num_messages = Some(new_count);

		// Only schedule a fresh streak once the current one has expired.
		let now = System::block_number();
		if prioritizer.streak_until.map_or(false, |until| until < now) {
			prioritizer.streak_until = Some(now.saturating_add(STREAK_LEN));
		}
	}
}

impl AhmPrioritizer {
	// This will need to be called by the migration controller.
	//
	// Returns the weight consumed. The loaded state is written back to storage by the `Drop`
	// impl of `AhmPrioritizer` when `this` goes out of scope, also on the early returns.
	fn on_initialize(now: u64) -> Weight {
		let mut meter = WeightMeter::new();
		let mut this = AhmPrioritizerStorage::get();

		// Nothing to do unless a queue is being prioritized.
		let Some(q) = this.prioritized_queue else {
			return meter.consumed();
		};
		// init: lazily set both trackers on first use.
		if this.streak_until.is_none() {
			this.streak_until = Some(0);
		}
		if this.favorite_queue_num_messages.is_none() {
			this.favorite_queue_num_messages = Some(0);
		}

		// An empty queue needs no prioritization.
		if Pallet::<Test>::footprint(q).pages == 0 {
			return meter.consumed();
		}
		// Our queue has messages but did not get a streak for 10 blocks: it is being starved,
		// so force a new streak.
		if this.streak_until.map_or(false, |until| until < now.saturating_sub(10)) {
			log::warn!("Queue is being starved, scheduling streak of {} blocks", STREAK_LEN);
			this.streak_until = Some(now.saturating_add(STREAK_LEN));
		}

		// While a streak is active, force the service head onto our queue.
		if this.streak_until.map_or(false, |until| until > now) {
			let _ = Pallet::<Test>::force_set_head(&mut meter, &q).defensive();
		}

		meter.consumed()
	}
}

// Persist any modifications when a loaded copy goes out of scope. This lets callers mutate the
// value returned by `AhmPrioritizerStorage::get()` and have the change written back
// automatically (see the `drop(prio)` calls in the tests below).
impl Drop for AhmPrioritizer {
	fn drop(&mut self) {
		AhmPrioritizerStorage::set(self);
	}
}

/// Simulates heavy usage by enqueueing and processing large amounts of messages.
///
/// # Example output
Expand Down Expand Up @@ -122,6 +199,87 @@ fn stress_test_enqueue_and_service() {
});
}

/// Simulate heavy usage while calling `force_set_head` on random queues.
#[test]
#[ignore] // Only run in the CI, otherwise it's too slow.
fn stress_test_force_set_head() {
	let blocks = 20;
	let max_queues = 10_000;
	let max_messages_per_queue = 10_000;
	let max_msg_len = MaxMessageLenOf::<Test>::get();
	let mut rng = StdRng::seed_from_u64(gen_seed());

	build_and_execute::<Test>(|| {
		let mut outstanding = 0;
		for _ in 0..blocks {
			// Flood the queues with fresh messages.
			let enqueued =
				enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
			outstanding += enqueued;

			// Move the service head onto ten randomly chosen queues in a row.
			for _ in 0..10 {
				let origin = Everywhere(rng.gen_range(0..=max_queues));
				MessageQueue::force_set_head(&mut WeightMeter::new(), &origin).unwrap();
			}

			// Drain a random fraction of all outstanding messages.
			let processed = rng.gen_range(1..=outstanding);
			log::info!("Processing {} of all messages {}", processed, outstanding);
			process_some_messages(processed); // This also advances the block.
			outstanding -= processed;
		}
		log::info!("Processing all remaining {} messages", outstanding);
		process_all_messages(outstanding);
		post_conditions();
	});
}

/// Check that our AHM prioritization does not affect liveness. This does not really check that
/// the AHM prioritization works itself, but rather that it does not break things. The actual
/// prioritization behaviour is exercised by `stress_test_ahm_despair_mode_works` below.
#[test]
#[ignore] // Only run in the CI, otherwise it's too slow.
fn stress_test_prioritize_queue() {
	let blocks = 20;
	let max_queues = 10_000;
	let favorite_queue = Everywhere(9000);
	let max_messages_per_queue = 1_000;
	let max_msg_len = MaxMessageLenOf::<Test>::get();
	let mut rng = StdRng::seed_from_u64(gen_seed());

	build_and_execute::<Test>(|| {
		// Mark our favorite queue as prioritized; persisted via the `Drop` impl.
		let mut prio = AhmPrioritizerStorage::get();
		prio.prioritized_queue = Some(favorite_queue);
		drop(prio);

		let mut msgs_remaining = 0;
		for _ in 0..blocks {
			// Start by enqueuing a large number of messages.
			let enqueued =
				enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
			msgs_remaining += enqueued;
			// ensure that our favorite queue always has some more messages
			for _ in 0..200 {
				MessageQueue::enqueue_message(
					BoundedSlice::defensive_truncate_from("favorite".as_bytes()),
					favorite_queue.clone(),
				);
				msgs_remaining += 1;
			}

			// Pick a fraction of all messages currently in queue and process them.
			let processed = rng.gen_range(1..=100);
			log::info!("Processing {} of all messages {}", processed, msgs_remaining);
			process_some_messages(processed); // This also advances the block.
			msgs_remaining -= processed;
		}
		log::info!("Processing all remaining {} messages", msgs_remaining);
		process_all_messages(msgs_remaining);
		post_conditions();
	});
}

/// Very similar to `stress_test_enqueue_and_service`, but enqueues messages while processing them.
#[test]
#[ignore] // Only run in the CI, otherwise its too slow.
Expand Down Expand Up @@ -275,6 +433,55 @@ fn stress_test_queue_suspension() {
});
}

/// Test that our AHM prioritizer will ensure that our favorite queue always gets some dedicated
/// weight.
///
/// Fills 200 queues with 100 messages each, prioritizes the last queue and services blocks with
/// a small weight budget. The prioritized queue must end up with the fewest remaining messages,
/// i.e. it processed the most.
#[test]
#[ignore]
fn stress_test_ahm_despair_mode_works() {
	build_and_execute::<Test>(|| {
		let blocks = 200;
		let queues = 200;

		// Fill every queue with 100 distinct messages.
		for o in 0..queues {
			for i in 0..100 {
				MessageQueue::enqueue_message(
					BoundedSlice::defensive_truncate_from(format!("{}:{}", o, i).as_bytes()),
					Everywhere(o),
				);
			}
		}
		// Give `bump_head` a small non-zero mocked weight.
		set_weight("bump_head", Weight::from_parts(1, 1));

		// Prioritize the last queue; persisted via the `Drop` impl.
		let mut prio = AhmPrioritizerStorage::get();
		prio.prioritized_queue = Some(Everywhere(199));
		drop(prio);

		// Service many blocks under a small weight budget.
		ServiceWeight::set(Some(Weight::from_parts(10, 10)));
		for _ in 0..blocks {
			next_block();
		}

		// Check that our favorite queue has processed the most messages - it must be the unique
		// queue with the minimal number of remaining messages.
		let mut min = u64::MAX;
		let mut min_origin = 0;

		for o in 0..queues {
			let fp = MessageQueue::footprint(Everywhere(o));
			if fp.storage.count < min {
				min = fp.storage.count;
				min_origin = o;
			}
		}
		assert_eq!(min_origin, 199);

		// Process all remaining messages.
		ServiceWeight::set(Some(Weight::MAX));
		next_block();
		post_conditions();
	});
}

/// How many messages are in each queue.
fn msgs_per_queue() -> BTreeMap<u32, u32> {
let mut per_queue = BTreeMap::new();
Expand Down Expand Up @@ -353,10 +560,12 @@ fn process_all_messages(expected: u32) {

/// Returns the weight consumed by `MessageQueue::on_initialize()`.
///
/// Finalizes the current block, advances the block number and then runs the `on_initialize`
/// hooks of `System`, the `AhmPrioritizer` and finally the message queue - in that order, so
/// the prioritizer can move the service head before the queue is serviced.
fn next_block() -> Weight {
	log::info!("Next block: {}", System::block_number() + 1);
	MessageQueue::on_finalize(System::block_number());
	System::on_finalize(System::block_number());
	System::set_block_number(System::block_number() + 1);
	System::on_initialize(System::block_number());
	AhmPrioritizer::on_initialize(System::block_number());
	MessageQueue::on_initialize(System::block_number())
}

Expand Down
38 changes: 35 additions & 3 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,17 @@ impl<Id> OnQueueChanged<Id> for () {
fn on_queue_changed(_: Id, _: QueueFootprint) {}
}

/// Allows to force the processing head to a specific queue.
pub trait ForceSetHead<O> {
	/// Set the `ServiceHead` to `origin`.
	///
	/// Returns:
	/// - `Err`: Queue did not exist, not enough weight or other error.
	/// - `Ok(true)`: The service head was updated.
	/// - `Ok(false)`: The service head was not updated since the queue is empty.
	fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}

#[frame_support::pallet]
pub mod pallet {
use super::*;
Expand Down Expand Up @@ -626,16 +637,16 @@ pub mod pallet {

/// The index of the first and last (non-empty) pages.
#[pallet::storage]
pub(super) type BookStateFor<T: Config> =
pub type BookStateFor<T: Config> =
StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

/// The origin at which we should begin servicing.
#[pallet::storage]
pub(super) type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;
pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

/// The map of page indices to pages.
#[pallet::storage]
pub(super) type Pages<T: Config> = StorageDoubleMap<
pub type Pages<T: Config> = StorageDoubleMap<
_,
Twox64Concat,
MessageOriginOf<T>,
Expand Down Expand Up @@ -861,13 +872,28 @@ impl<T: Config> Pallet<T> {
ServiceHead::<T>::put(&head_neighbours.next);
Some(head)
} else {
defensive!("The head must point to a queue in the ready ring");
None
}
} else {
None
}
}

fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
return Err(())
}

// Ensure that we never set the head to an un-ready queue.
if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
ServiceHead::<T>::put(queue);
Ok(true)
} else {
Ok(false)
}
}

/// The maximal weight that a single message can consume.
///
/// Any message using more than this will be marked as permanently overweight and not
Expand Down Expand Up @@ -1575,6 +1601,12 @@ impl<T: Config> Pallet<T> {
}
}

impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
	// Thin wrapper: delegates to the internal `set_service_head`, which charges
	// `T::WeightInfo::set_service_head()` and only moves the head onto ready queues.
	fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
		Pallet::<T>::set_service_head(weight, origin)
	}
}

/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
// Holds the singleton token instance.
Expand Down
Loading
Loading