Skip to content

Commit d48994a

Browse files
slptylerfanelli
authored andcommitted
Replace mpsc with crossbeam-channel
On Apple Silicon, we were hitting a concurrency error that manifest with recv() panicking like this: thread '<unnamed>' panicked at 'internal error: entered unreachable code' Seems to be a known issue rust-lang/rust#39364 Follow the recommendation on the thread, and switch from mpsc (which seems its going to be retired) to crossbeam-channel. Signed-off-by: Sergio Lopez <[email protected]>
1 parent 2e4a8a4 commit d48994a

File tree

10 files changed

+49
-24
lines changed

10 files changed

+49
-24
lines changed

Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/devices/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ amd-sev = []
99

1010
[dependencies]
1111
bitflags = "1.2.0"
12+
crossbeam-channel = "0.5"
1213
env_logger = "0.9.0"
1314
libc = ">=0.2.39"
1415
log = "0.4.0"

src/devices/src/legacy/gic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
// Copyright 2021 Red Hat, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crossbeam_channel::Sender;
45
use std::collections::btree_map::Entry;
56
use std::collections::{BTreeMap, VecDeque};
67
use std::convert::TryInto;
7-
use std::sync::mpsc::Sender;
88

99
use arch::aarch64::gicv2::GICv2;
1010
use arch::aarch64::layout::GTIMER_VIRT;

src/devices/src/virtio/vsock/muxer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::collections::HashMap;
22
use std::os::unix::io::RawFd;
33
use std::sync::atomic::{AtomicUsize, Ordering};
4-
use std::sync::mpsc::{channel, Sender};
54
use std::sync::{Arc, Mutex, RwLock};
65

76
use super::super::super::legacy::Gic;
@@ -17,6 +16,7 @@ use super::reaper::ReaperThread;
1716
use super::tcp::TcpProxy;
1817
use super::udp::UdpProxy;
1918
use super::VsockError;
19+
use crossbeam_channel::{unbounded, Sender};
2020
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
2121
use utils::eventfd::EventFd;
2222
use vm_memory::GuestMemoryMmap;
@@ -146,7 +146,7 @@ impl VsockMuxer {
146146
self.intc = intc.clone();
147147
self.irq_line = irq_line;
148148

149-
let (sender, receiver) = channel();
149+
let (sender, receiver) = unbounded();
150150

151151
let thread = MuxerThread::new(
152152
self.cid,

src/devices/src/virtio/vsock/muxer_thread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::os::unix::io::RawFd;
22
use std::sync::atomic::{AtomicUsize, Ordering};
3-
use std::sync::mpsc::Sender;
43
use std::sync::{Arc, Mutex};
54
use std::thread;
65

@@ -12,6 +11,7 @@ use super::muxer_rxq::MuxerRxQ;
1211
use super::proxy::{ProxyRemoval, ProxyUpdate};
1312
use super::tcp::TcpProxy;
1413

14+
use crossbeam_channel::Sender;
1515
use rand::{rngs::ThreadRng, thread_rng, Rng};
1616
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
1717
use utils::eventfd::EventFd;

src/devices/src/virtio/vsock/reaper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::collections::HashMap;
2-
use std::sync::mpsc::Receiver;
32
use std::sync::{Arc, Mutex, RwLock};
43
use std::thread;
54
use std::time::{Duration, Instant};
65

76
use super::proxy::Proxy;
7+
use crossbeam_channel::Receiver;
88

99
pub type ProxyMap = Arc<RwLock<HashMap<u64, Mutex<Box<dyn Proxy>>>>>;
1010
const TIMEOUT: Duration = Duration::new(5, 0);

src/vmm/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = "2021"
88
amd-sev = [ "codicon", "kbs-types", "procfs", "serde", "serde_json", "sev", "curl" ]
99

1010
[dependencies]
11+
crossbeam-channel = "0.5"
1112
env_logger = "0.9.0"
1213
libc = ">=0.2.39"
1314
log = "0.4.0"

src/vmm/src/builder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33

44
//! Enables pre-boot setup, instantiation and booting of a Firecracker VMM.
55
6+
#[cfg(target_os = "macos")]
7+
use crossbeam_channel::unbounded;
68
use std::fmt::{Display, Formatter};
79
use std::io;
810
use std::os::unix::io::{AsRawFd, RawFd};
9-
#[cfg(target_os = "macos")]
10-
use std::sync::mpsc::channel;
1111
use std::sync::{Arc, Mutex};
1212

1313
use super::{Error, Vmm};
@@ -930,7 +930,7 @@ fn create_vcpus_aarch64(
930930

931931
for cpu_index in 0..vcpu_config.vcpu_count {
932932
let boot_receiver = if cpu_index != 0 {
933-
let (boot_sender, boot_receiver) = channel();
933+
let (boot_sender, boot_receiver) = unbounded();
934934
boot_senders.push(boot_sender);
935935
Some(boot_receiver)
936936
} else {

src/vmm/src/linux/vstate.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
// Use of this source code is governed by a BSD-style license that can be
66
// found in the THIRD-PARTY file.
77

8+
use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
89
use libc::{c_int, c_void, siginfo_t};
910
use std::cell::Cell;
1011
use std::fmt::{Display, Formatter};
1112
use std::io;
1213
use std::result;
1314
use std::sync::atomic::{fence, Ordering};
14-
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
1515
#[cfg(not(test))]
1616
use std::sync::Barrier;
1717
use std::thread;
@@ -815,8 +815,8 @@ impl Vcpu {
815815
create_ts: TimestampUs,
816816
) -> Result<Self> {
817817
let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?;
818-
let (event_sender, event_receiver) = channel();
819-
let (response_sender, response_receiver) = channel();
818+
let (event_sender, event_receiver) = unbounded();
819+
let (response_sender, response_receiver) = unbounded();
820820

821821
// Initially the cpuid per vCPU is the one supported by this VM.
822822
Ok(Vcpu {
@@ -851,8 +851,8 @@ impl Vcpu {
851851
create_ts: TimestampUs,
852852
) -> Result<Self> {
853853
let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?;
854-
let (event_sender, event_receiver) = channel();
855-
let (response_sender, response_receiver) = channel();
854+
let (event_sender, event_receiver) = unbounded();
855+
let (response_sender, response_receiver) = unbounded();
856856

857857
Ok(Vcpu {
858858
fd: kvm_vcpu,
@@ -973,7 +973,7 @@ impl Vcpu {
973973
pub fn start_threaded(mut self) -> Result<VcpuHandle> {
974974
let event_sender = self.event_sender.take().unwrap();
975975
let response_receiver = self.response_receiver.take().unwrap();
976-
let (init_tls_sender, init_tls_receiver) = channel();
976+
let (init_tls_sender, init_tls_receiver) = unbounded();
977977
let vcpu_thread = thread::Builder::new()
978978
.name(format!("fc_vcpu {}", self.cpu_index()))
979979
.spawn(move || {
@@ -1420,6 +1420,7 @@ enum VcpuEmulation {
14201420

14211421
#[cfg(test)]
14221422
mod tests {
1423+
use crossbeam_channel::unbounded;
14231424
use std::fs::File;
14241425
use std::sync::{Arc, Barrier};
14251426

@@ -1434,7 +1435,7 @@ mod tests {
14341435
// Make sure the Vcpu is out of KVM_RUN.
14351436
self.send_event(VcpuEvent::Pause).unwrap();
14361437
// Close the original channel so that the Vcpu thread errors and goes to exit state.
1437-
let (event_sender, _event_receiver) = channel();
1438+
let (event_sender, _event_receiver) = unbounded();
14381439
self.event_sender = event_sender;
14391440
// Wait for the Vcpu thread to finish execution
14401441
self.vcpu_thread.take().unwrap().join().unwrap();

src/vmm/src/macos/vstate.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::cell::Cell;
99
use std::fmt::{Display, Formatter};
1010
use std::io;
1111
use std::result;
12-
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
1312
#[cfg(not(test))]
1413
use std::sync::{Arc, Mutex};
1514
use std::thread;
@@ -21,6 +20,7 @@ use crate::vmm_config::machine_config::CpuFeaturesTemplate;
2120

2221
use arch;
2322
use arch::aarch64::gic::GICDevice;
23+
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
2424
use devices::legacy::Gic;
2525
use hvf::{HvfVcpu, HvfVm, VcpuExit};
2626
use utils::eventfd::EventFd;
@@ -267,8 +267,8 @@ impl Vcpu {
267267
_create_ts: TimestampUs,
268268
intc: Arc<Mutex<Gic>>,
269269
) -> Result<Self> {
270-
let (event_sender, event_receiver) = channel();
271-
let (response_sender, response_receiver) = channel();
270+
let (event_sender, event_receiver) = unbounded();
271+
let (response_sender, response_receiver) = unbounded();
272272

273273
Ok(Vcpu {
274274
id,
@@ -325,7 +325,7 @@ impl Vcpu {
325325
pub fn start_threaded(mut self) -> Result<VcpuHandle> {
326326
let event_sender = self.event_sender.take().unwrap();
327327
let response_receiver = self.response_receiver.take().unwrap();
328-
let (init_tls_sender, init_tls_receiver) = channel();
328+
let (init_tls_sender, init_tls_receiver) = unbounded();
329329

330330
let vcpu_thread = thread::Builder::new()
331331
.name(format!("fc_vcpu {}", self.cpu_index()))
@@ -441,7 +441,7 @@ impl Vcpu {
441441
let mut hvf_vcpu = HvfVcpu::new().expect("Can't create HVF vCPU");
442442
let hvf_vcpuid = hvf_vcpu.id();
443443

444-
let (wfe_sender, wfe_receiver) = channel();
444+
let (wfe_sender, wfe_receiver) = unbounded();
445445
self.intc
446446
.lock()
447447
.unwrap()
@@ -599,11 +599,11 @@ enum VcpuEmulation {
599599

600600
#[cfg(test)]
601601
mod tests {
602+
#[cfg(target_arch = "x86_64")]
603+
use crossbeam_channel::{unbounded, RecvTimeoutError};
602604
use std::fs::File;
603605
#[cfg(target_arch = "x86_64")]
604606
use std::os::unix::io::AsRawFd;
605-
#[cfg(target_arch = "x86_64")]
606-
use std::sync::mpsc;
607607
use std::sync::{Arc, Barrier};
608608
#[cfg(target_arch = "x86_64")]
609609
use std::time::Duration;
@@ -619,7 +619,7 @@ mod tests {
619619
// Make sure the Vcpu is out of KVM_RUN.
620620
self.send_event(VcpuEvent::Pause).unwrap();
621621
// Close the original channel so that the Vcpu thread errors and goes to exit state.
622-
let (event_sender, _event_receiver) = channel();
622+
let (event_sender, _event_receiver) = unbounded();
623623
self.event_sender = event_sender;
624624
// Wait for the Vcpu thread to finish execution
625625
self.vcpu_thread.take().unwrap().join().unwrap();
@@ -933,7 +933,7 @@ mod tests {
933933
handle
934934
.response_receiver()
935935
.recv_timeout(Duration::from_millis(100)),
936-
Err(mpsc::RecvTimeoutError::Timeout)
936+
Err(RecvTimeoutError::Timeout)
937937
);
938938
}
939939

0 commit comments

Comments
 (0)