Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Try improving relay_datagram_send_channel() #3118

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
task::{Context, Poll},
task::{Context, Poll, Waker},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -1548,7 +1548,7 @@ impl Handle {

let (actor_sender, actor_receiver) = mpsc::channel(256);
let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_sender();
let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel();
let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);

// load the node data
Expand Down Expand Up @@ -1743,27 +1743,17 @@ enum DiscoBoxError {
///
/// These includes the waker coordination required to support [`AsyncUdpSocket::try_send`]
/// and [`quinn::UdpPoller::poll_writable`].
///
/// Note that this implementation has several bugs in them, but they have existed for rather
/// a while:
///
/// - There can be multiple senders, which all have to be woken if they were blocked. But
/// only the last sender to install the waker is unblocked.
///
/// - poll_writable may return blocking when it doesn't need to. Leaving the sender stuck
/// until another recv is called (which hopefully would happen soon given that the channel
/// is probably still rather full, but still).
fn relay_datagram_sender() -> (
fn relay_datagram_send_channel() -> (
RelayDatagramSendChannelSender,
RelayDatagramSendChannelReceiver,
) {
let (sender, receiver) = mpsc::channel(256);
let waker = Arc::new(AtomicWaker::new());
let wakers = Arc::new(std::sync::Mutex::new(Vec::new()));
let tx = RelayDatagramSendChannelSender {
sender,
waker: waker.clone(),
wakers: wakers.clone(),
};
let rx = RelayDatagramSendChannelReceiver { receiver, waker };
let rx = RelayDatagramSendChannelReceiver { receiver, wakers };
(tx, rx)
}

Expand All @@ -1774,7 +1764,7 @@ fn relay_datagram_sender() -> (
#[derive(Debug, Clone)]
struct RelayDatagramSendChannelSender {
sender: mpsc::Sender<RelaySendItem>,
waker: Arc<AtomicWaker>,
wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
}

impl RelayDatagramSendChannelSender {
Expand All @@ -1788,8 +1778,18 @@ impl RelayDatagramSendChannelSender {
fn poll_writable(&self, cx: &mut Context) -> Poll<io::Result<()>> {
match self.sender.capacity() {
0 => {
self.waker.register(cx.waker());
Poll::Pending
let mut wakers = self.wakers.lock().expect("poisoned");
if !wakers.iter().any(|waker| waker.will_wake(cx.waker())) {
wakers.push(cx.waker().clone());
}
drop(wakers);
if self.sender.capacity() != 0 {
// We "risk" a spurious wake-up in this case, but rather that
// than potentially skipping a receive.
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
_ => Poll::Ready(Ok(())),
}
Expand All @@ -1803,13 +1803,14 @@ impl RelayDatagramSendChannelSender {
#[derive(Debug)]
struct RelayDatagramSendChannelReceiver {
receiver: mpsc::Receiver<RelaySendItem>,
waker: Arc<AtomicWaker>,
wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
}

impl RelayDatagramSendChannelReceiver {
async fn recv(&mut self) -> Option<RelaySendItem> {
let item = self.receiver.recv().await;
self.waker.wake();
let mut wakers = self.wakers.lock().expect("poisoned");
wakers.drain(..).for_each(Waker::wake);
item
}
}
Expand Down
Loading