From 1b0cd1e82163b357dcba89a55e8a9a01af22127d Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 29 Oct 2024 15:23:30 +0100 Subject: [PATCH] remove update handle as soon as listeners are gone --- crates/corro-agent/src/api/public/update.rs | 94 ++++++--------------- crates/corro-types/src/updates.rs | 14 +-- 2 files changed, 35 insertions(+), 73 deletions(-) diff --git a/crates/corro-agent/src/api/public/update.rs b/crates/corro-agent/src/api/public/update.rs index c34b6cef..51c9deba 100644 --- a/crates/corro-agent/src/api/public/update.rs +++ b/crates/corro-agent/src/api/public/update.rs @@ -21,7 +21,6 @@ use crate::api::public::pubsub::MatcherUpsertError; pub type UpdateBroadcastCache = HashMap>; pub type SharedUpdateBroadcastCache = Arc>; -const MAX_UNSUB_TIME: Duration = Duration::from_secs(120); // this should be a fraction of the MAX_UNSUB_TIME const RECEIVERS_CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -107,88 +106,51 @@ pub async fn process_update_channel( ) { let mut buf = BytesMut::new(); - let mut deadline = if tx.receiver_count() == 0 { - Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME))) - } else { - None - }; - - // even if there are no more subscribers + // interval check for receivers // useful for queries that don't change often so we can cleanup... let mut subs_check = tokio::time::interval(RECEIVERS_CHECK_INTERVAL); loop { - let deadline_check = async { - if let Some(sleep) = deadline.as_mut() { - sleep.await - } else { - futures::future::pending().await - } - }; - - let notify_evt = tokio::select! { + tokio::select! { biased; - Some(query_evt) = evt_rx.recv() => query_evt, - _ = deadline_check => { - if tx.receiver_count() == 0 { - info!(sub_id = %id, "All listeners for subscription are gone and didn't come back within {MAX_UNSUB_TIME:?}"); - break; - } - - // reset the deadline if there are receivers! - deadline = None; - continue; + Some(query_evt) = evt_rx.recv() => { + match make_query_event_bytes(&mut buf, &query_evt) { + Ok(b) => { + if tx.send(b).is_err() { + break; + } + }, + Err(e) => { + match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) { + Ok(b) => { + let _ = tx.send(b); + } + Err(e) => { + warn!(update_id = %id, "failed to send error in update channel: {e}"); + } + } + break; + } + }; }, _ = subs_check.tick() => { if tx.receiver_count() == 0 { - if deadline.is_none() { - deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME))); - } - } else { - deadline = None; + break; }; - continue; }, - else => { - break; - } }; - - let is_still_active = match make_query_event_bytes(&mut buf, ¬ify_evt) { - Ok(b) => tx.send(b).is_ok(), - Err(e) => { - match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) { - Ok(b) => { - let _ = tx.send(b); - } - Err(e) => { - warn!(update_id = %id, "error sending error: {e}"); - } - } - break; - } - }; - - if is_still_active { - deadline = None; - } else { - debug!(sub_id = %id, "no active listeners to receive subscription event: {notify_evt:?}"); - if deadline.is_none() { - deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME))); - } - } } - warn!(sub_id = %id, "subscription query channel done"); + warn!(sub_id = %id, "updates channel done"); // remove and get handle from the agent's "matchers" let handle = match updates.remove(&id) { Some(h) => { - info!(update_id = %id, "Removed update from process_update_channel"); + info!(update_id = %id, "Removed update handle from process_update_channel"); h } None => { - warn!(update_id = %id, "subscription handle was already gone. odd!"); + warn!(update_id = %id, "update handle was already gone. odd!"); return; } }; @@ -234,7 +196,7 @@ async fn forward_update_bytes_to_body_sender( buf.extend_from_slice(&event_buf); if buf.len() >= 64 * 1024 { if let Err(e) = tx.send_data(buf.split().freeze()).await { - warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}"); + warn!(update_id = %update.id(), "could not forward update query event to receiver: {e}"); return; } }; @@ -265,8 +227,8 @@ async fn forward_update_bytes_to_body_sender( } }, _ = update.cancelled() => { - // info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber"); - // return; + info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber"); + return; }, _ = &mut tripwire => { break; diff --git a/crates/corro-types/src/updates.rs b/crates/corro-types/src/updates.rs index 285c52e7..59daca07 100644 --- a/crates/corro-types/src/updates.rs +++ b/crates/corro-types/src/updates.rs @@ -430,9 +430,9 @@ where } // metrics... - for (table, pks) in candidates.clone() { + for (table, pks) in candidates.iter() { handle - .get_counter(&table) + .get_counter(table) .matched_count .increment(pks.len() as u64); } @@ -441,14 +441,14 @@ where if let Err(e) = handle .changes_tx() - .try_send((candidates.clone(), db_version)) + .try_send((candidates, db_version)) { error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}"); match e { mpsc::error::TrySendError::Full(item) => { warn!("channel is full, falling back to async send"); - let changes_tx = handle.changes_tx().clone(); + let changes_tx = handle.changes_tx(); tokio::spawn(async move { _ = changes_tx.send(item).await; }); @@ -521,11 +521,11 @@ where // metrics... for (id, (candidates, handle)) in candidates { let mut match_count = 0; - for (table, pks) in candidates.clone() { + for (table, pks) in candidates.iter() { let count = pks.len(); match_count += count; handle - .get_counter(&table) + .get_counter(table) .matched_count .increment(pks.len() as u64); } @@ -534,7 +534,7 @@ where if let Err(e) = handle .changes_tx() - .try_send((candidates.clone(), db_version)) + .try_send((candidates, db_version)) { error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}"); match e {