miri/concurrency/
sync.rs

1use std::cell::RefCell;
2use std::collections::VecDeque;
3use std::collections::hash_map::Entry;
4use std::default::Default;
5use std::ops::Not;
6use std::rc::Rc;
7use std::time::Duration;
8
9use rustc_abi::Size;
10use rustc_data_structures::fx::FxHashMap;
11use rustc_index::{Idx, IndexVec};
12
13use super::init_once::InitOnce;
14use super::vector_clock::VClock;
15use crate::*;
16
17/// We cannot use the `newtype_index!` macro because we have to use 0 as a
18/// sentinel value meaning that the identifier is not assigned. This is because
19/// the pthreads static initializers initialize memory with zeros (see the
20/// `src/shims/sync.rs` file).
21macro_rules! declare_id {
22    ($name: ident) => {
23        /// 0 is used to indicate that the id was not yet assigned and,
24        /// therefore, is not a valid identifier.
25        #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
26        pub struct $name(std::num::NonZero<u32>);
27
28        impl $crate::VisitProvenance for $name {
29            fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
30        }
31
32        impl Idx for $name {
33            fn new(idx: usize) -> Self {
34                // We use 0 as a sentinel value (see the comment above) and,
35                // therefore, need to shift by one when converting from an index
36                // into a vector.
37                let shifted_idx = u32::try_from(idx).unwrap().strict_add(1);
38                $name(std::num::NonZero::new(shifted_idx).unwrap())
39            }
40            fn index(self) -> usize {
41                // See the comment in `Self::new`.
42                // (This cannot underflow because `self.0` is `NonZero<u32>`.)
43                usize::try_from(self.0.get() - 1).unwrap()
44            }
45        }
46    };
47}
48pub(super) use declare_id;
49
50/// The mutex state.
51#[derive(Default, Debug)]
52struct Mutex {
53    /// The thread that currently owns the lock.
54    owner: Option<ThreadId>,
55    /// How many times the mutex was locked by the owner.
56    lock_count: usize,
57    /// The queue of threads waiting for this mutex.
58    queue: VecDeque<ThreadId>,
59    /// Mutex clock. This tracks the moment of the last unlock.
60    clock: VClock,
61}
62
63#[derive(Default, Clone, Debug)]
64pub struct MutexRef(Rc<RefCell<Mutex>>);
65
66impl MutexRef {
67    fn new() -> Self {
68        MutexRef(Rc::new(RefCell::new(Mutex::default())))
69    }
70
71    /// Get the id of the thread that currently owns this lock, or `None` if it is not locked.
72    pub fn owner(&self) -> Option<ThreadId> {
73        self.0.borrow().owner
74    }
75}
76
77impl VisitProvenance for MutexRef {
78    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
79        // Mutex contains no provenance.
80    }
81}
82
83/// The read-write lock state.
84#[derive(Default, Debug)]
85struct RwLock {
86    /// The writer thread that currently owns the lock.
87    writer: Option<ThreadId>,
88    /// The readers that currently own the lock and how many times they acquired
89    /// the lock.
90    readers: FxHashMap<ThreadId, usize>,
91    /// The queue of writer threads waiting for this lock.
92    writer_queue: VecDeque<ThreadId>,
93    /// The queue of reader threads waiting for this lock.
94    reader_queue: VecDeque<ThreadId>,
95    /// Data race clock for writers. Tracks the happens-before
96    /// ordering between each write access to a rwlock and is updated
97    /// after a sequence of concurrent readers to track the happens-
98    /// before ordering between the set of previous readers and
99    /// the current writer.
100    /// Contains the clock of the last thread to release a writer
101    /// lock or the joined clock of the set of last threads to release
102    /// shared reader locks.
103    clock_unlocked: VClock,
104    /// Data race clock for readers. This is temporary storage
105    /// for the combined happens-before ordering for between all
106    /// concurrent readers and the next writer, and the value
107    /// is stored to the main data_race variable once all
108    /// readers are finished.
109    /// Has to be stored separately since reader lock acquires
110    /// must load the clock of the last write and must not
111    /// add happens-before orderings between shared reader
112    /// locks.
113    /// This is only relevant when there is an active reader.
114    clock_current_readers: VClock,
115}
116
117impl RwLock {
118    #[inline]
119    /// Check if locked.
120    fn is_locked(&self) -> bool {
121        trace!(
122            "rwlock_is_locked: writer is {:?} and there are {} reader threads (some of which could hold multiple read locks)",
123            self.writer,
124            self.readers.len(),
125        );
126        self.writer.is_some() || self.readers.is_empty().not()
127    }
128
129    /// Check if write locked.
130    #[inline]
131    fn is_write_locked(&self) -> bool {
132        trace!("rwlock_is_write_locked: writer is {:?}", self.writer);
133        self.writer.is_some()
134    }
135}
136
137#[derive(Default, Clone, Debug)]
138pub struct RwLockRef(Rc<RefCell<RwLock>>);
139
140impl RwLockRef {
141    fn new() -> Self {
142        RwLockRef(Rc::new(RefCell::new(RwLock::default())))
143    }
144
145    pub fn is_locked(&self) -> bool {
146        self.0.borrow().is_locked()
147    }
148
149    pub fn is_write_locked(&self) -> bool {
150        self.0.borrow().is_write_locked()
151    }
152}
153
154impl VisitProvenance for RwLockRef {
155    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
156        // RwLockRef contains no provenance.
157    }
158}
159
160declare_id!(CondvarId);
161
162/// The conditional variable state.
163#[derive(Default, Debug)]
164struct Condvar {
165    waiters: VecDeque<ThreadId>,
166    /// Tracks the happens-before relationship
167    /// between a cond-var signal and a cond-var
168    /// wait during a non-spurious signal event.
169    /// Contains the clock of the last thread to
170    /// perform a condvar-signal.
171    clock: VClock,
172}
173
174/// The futex state.
175#[derive(Default, Debug)]
176struct Futex {
177    waiters: Vec<FutexWaiter>,
178    /// Tracks the happens-before relationship
179    /// between a futex-wake and a futex-wait
180    /// during a non-spurious wake event.
181    /// Contains the clock of the last thread to
182    /// perform a futex-wake.
183    clock: VClock,
184}
185
186#[derive(Default, Clone)]
187pub struct FutexRef(Rc<RefCell<Futex>>);
188
189impl FutexRef {
190    pub fn waiters(&self) -> usize {
191        self.0.borrow().waiters.len()
192    }
193}
194
195impl VisitProvenance for FutexRef {
196    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
197        // No provenance in `Futex`.
198    }
199}
200
201/// A thread waiting on a futex.
202#[derive(Debug)]
203struct FutexWaiter {
204    /// The thread that is waiting on this futex.
205    thread: ThreadId,
206    /// The bitset used by FUTEX_*_BITSET, or u32::MAX for other operations.
207    bitset: u32,
208}
209
210/// The state of all synchronization objects.
211#[derive(Default, Debug)]
212pub struct SynchronizationObjects {
213    condvars: IndexVec<CondvarId, Condvar>,
214    pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
215}
216
217// Private extension trait for local helper methods
218impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {}
219pub(super) trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
220    fn condvar_reacquire_mutex(
221        &mut self,
222        mutex_ref: MutexRef,
223        retval: Scalar,
224        dest: MPlaceTy<'tcx>,
225    ) -> InterpResult<'tcx> {
226        let this = self.eval_context_mut();
227        if let Some(owner) = mutex_ref.owner() {
228            assert_ne!(owner, this.active_thread());
229            this.mutex_enqueue_and_block(mutex_ref, Some((retval, dest)));
230        } else {
231            // We can have it right now!
232            this.mutex_lock(&mutex_ref);
233            // Don't forget to write the return value.
234            this.write_scalar(retval, &dest)?;
235        }
236        interp_ok(())
237    }
238}
239
240impl SynchronizationObjects {
241    pub fn mutex_create(&mut self) -> MutexRef {
242        MutexRef::new()
243    }
244    pub fn rwlock_create(&mut self) -> RwLockRef {
245        RwLockRef::new()
246    }
247
248    pub fn condvar_create(&mut self) -> CondvarId {
249        self.condvars.push(Default::default())
250    }
251
252    pub fn init_once_create(&mut self) -> InitOnceId {
253        self.init_onces.push(Default::default())
254    }
255}
256
257impl<'tcx> AllocExtra<'tcx> {
258    fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
259        self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
260    }
261}
262
263/// We designate an `init`` field in all primitives.
264/// If `init` is set to this, we consider the primitive initialized.
265pub const LAZY_INIT_COOKIE: u32 = 0xcafe_affe;
266
267// Public interface to synchronization primitives. Please note that in most
268// cases, the function calls are infallible and it is the client's (shim
269// implementation's) responsibility to detect and deal with erroneous
270// situations.
271impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
272pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
273    /// Helper for lazily initialized `alloc_extra.sync` data:
274    /// this forces an immediate init.
275    /// Return a reference to the data in the machine state.
276    fn lazy_sync_init<'a, T: 'static>(
277        &'a mut self,
278        primitive: &MPlaceTy<'tcx>,
279        init_offset: Size,
280        data: T,
281    ) -> InterpResult<'tcx, &'a T>
282    where
283        'tcx: 'a,
284    {
285        let this = self.eval_context_mut();
286
287        let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
288        let (alloc_extra, _machine) = this.get_alloc_extra_mut(alloc)?;
289        alloc_extra.sync.insert(offset, Box::new(data));
290        // Mark this as "initialized".
291        let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
292        this.write_scalar_atomic(
293            Scalar::from_u32(LAZY_INIT_COOKIE),
294            &init_field,
295            AtomicWriteOrd::Relaxed,
296        )?;
297        interp_ok(this.get_alloc_extra(alloc)?.get_sync::<T>(offset).unwrap())
298    }
299
300    /// Helper for lazily initialized `alloc_extra.sync` data:
301    /// Checks if the primitive is initialized:
302    /// - If yes, fetches the data from `alloc_extra.sync`, or calls `missing_data` if that fails
303    ///   and stores that in `alloc_extra.sync`.
304    /// - Otherwise, calls `new_data` to initialize the primitive.
305    ///
306    /// Return a reference to the data in the machine state.
307    fn lazy_sync_get_data<'a, T: 'static>(
308        &'a mut self,
309        primitive: &MPlaceTy<'tcx>,
310        init_offset: Size,
311        missing_data: impl FnOnce() -> InterpResult<'tcx, T>,
312        new_data: impl FnOnce(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, T>,
313    ) -> InterpResult<'tcx, &'a T>
314    where
315        'tcx: 'a,
316    {
317        let this = self.eval_context_mut();
318
319        // Check if this is already initialized. Needs to be atomic because we can race with another
320        // thread initializing. Needs to be an RMW operation to ensure we read the *latest* value.
321        // So we just try to replace MUTEX_INIT_COOKIE with itself.
322        let init_cookie = Scalar::from_u32(LAZY_INIT_COOKIE);
323        let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
324        let (_init, success) = this
325            .atomic_compare_exchange_scalar(
326                &init_field,
327                &ImmTy::from_scalar(init_cookie, this.machine.layouts.u32),
328                init_cookie,
329                AtomicRwOrd::Relaxed,
330                AtomicReadOrd::Relaxed,
331                /* can_fail_spuriously */ false,
332            )?
333            .to_scalar_pair();
334
335        if success.to_bool()? {
336            // If it is initialized, it must be found in the "sync primitive" table,
337            // or else it has been moved illegally.
338            let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
339            let (alloc_extra, _machine) = this.get_alloc_extra_mut(alloc)?;
340            // Due to borrow checker reasons, we have to do the lookup twice.
341            if alloc_extra.get_sync::<T>(offset).is_none() {
342                let data = missing_data()?;
343                alloc_extra.sync.insert(offset, Box::new(data));
344            }
345            interp_ok(alloc_extra.get_sync::<T>(offset).unwrap())
346        } else {
347            let data = new_data(this)?;
348            this.lazy_sync_init(primitive, init_offset, data)
349        }
350    }
351
352    /// Get the synchronization primitive associated with the given pointer,
353    /// or initialize a new one.
354    ///
355    /// Return `None` if this pointer does not point to at least 1 byte of mutable memory.
356    fn get_sync_or_init<'a, T: 'static>(
357        &'a mut self,
358        ptr: Pointer,
359        new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> T,
360    ) -> Option<&'a T>
361    where
362        'tcx: 'a,
363    {
364        let this = self.eval_context_mut();
365        if !this.ptr_try_get_alloc_id(ptr, 0).ok().is_some_and(|(alloc_id, offset, ..)| {
366            let info = this.get_alloc_info(alloc_id);
367            info.kind == AllocKind::LiveData && info.mutbl.is_mut() && offset < info.size
368        }) {
369            return None;
370        }
371        // This cannot fail now.
372        let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0).unwrap();
373        let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc).unwrap();
374        // Due to borrow checker reasons, we have to do the lookup twice.
375        if alloc_extra.get_sync::<T>(offset).is_none() {
376            let new = new(machine);
377            alloc_extra.sync.insert(offset, Box::new(new));
378        }
379        Some(alloc_extra.get_sync::<T>(offset).unwrap())
380    }
381
382    /// Lock by setting the mutex owner and increasing the lock count.
383    fn mutex_lock(&mut self, mutex_ref: &MutexRef) {
384        let this = self.eval_context_mut();
385        let thread = this.active_thread();
386        let mut mutex = mutex_ref.0.borrow_mut();
387        if let Some(current_owner) = mutex.owner {
388            assert_eq!(thread, current_owner, "mutex already locked by another thread");
389            assert!(
390                mutex.lock_count > 0,
391                "invariant violation: lock_count == 0 iff the thread is unlocked"
392            );
393        } else {
394            mutex.owner = Some(thread);
395        }
396        mutex.lock_count = mutex.lock_count.strict_add(1);
397        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
398            data_race.acquire_clock(&mutex.clock, &this.machine.threads);
399        }
400    }
401
402    /// Try unlocking by decreasing the lock count and returning the old lock
403    /// count. If the lock count reaches 0, release the lock and potentially
404    /// give to a new owner. If the lock was not locked by the current thread,
405    /// return `None`.
406    fn mutex_unlock(&mut self, mutex_ref: &MutexRef) -> InterpResult<'tcx, Option<usize>> {
407        let this = self.eval_context_mut();
408        let mut mutex = mutex_ref.0.borrow_mut();
409        interp_ok(if let Some(current_owner) = mutex.owner {
410            // Mutex is locked.
411            if current_owner != this.machine.threads.active_thread() {
412                // Only the owner can unlock the mutex.
413                return interp_ok(None);
414            }
415            let old_lock_count = mutex.lock_count;
416            mutex.lock_count = old_lock_count.strict_sub(1);
417            if mutex.lock_count == 0 {
418                mutex.owner = None;
419                // The mutex is completely unlocked. Try transferring ownership
420                // to another thread.
421
422                if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
423                    data_race.release_clock(&this.machine.threads, |clock| {
424                        mutex.clock.clone_from(clock)
425                    });
426                }
427                let thread_id = mutex.queue.pop_front();
428                // We need to drop our mutex borrow before unblock_thread
429                // because it will be borrowed again in the unblock callback.
430                drop(mutex);
431                if thread_id.is_some() {
432                    this.unblock_thread(thread_id.unwrap(), BlockReason::Mutex)?;
433                }
434            }
435            Some(old_lock_count)
436        } else {
437            // Mutex is not locked.
438            None
439        })
440    }
441
442    /// Put the thread into the queue waiting for the mutex.
443    ///
444    /// Once the Mutex becomes available and if it exists, `retval_dest.0` will
445    /// be written to `retval_dest.1`.
446    #[inline]
447    fn mutex_enqueue_and_block(
448        &mut self,
449        mutex_ref: MutexRef,
450        retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
451    ) {
452        let this = self.eval_context_mut();
453        let thread = this.active_thread();
454        let mut mutex = mutex_ref.0.borrow_mut();
455        mutex.queue.push_back(thread);
456        assert!(mutex.owner.is_some(), "queuing on unlocked mutex");
457        drop(mutex);
458        this.block_thread(
459            BlockReason::Mutex,
460            None,
461            callback!(
462                @capture<'tcx> {
463                    mutex_ref: MutexRef,
464                    retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
465                }
466                |this, unblock: UnblockKind| {
467                    assert_eq!(unblock, UnblockKind::Ready);
468
469                    assert!(mutex_ref.owner().is_none());
470                    this.mutex_lock(&mutex_ref);
471
472                    if let Some((retval, dest)) = retval_dest {
473                        this.write_scalar(retval, &dest)?;
474                    }
475
476                    interp_ok(())
477                }
478            ),
479        );
480    }
481
482    /// Read-lock the lock by adding the `reader` the list of threads that own
483    /// this lock.
484    fn rwlock_reader_lock(&mut self, rwlock_ref: &RwLockRef) {
485        let this = self.eval_context_mut();
486        let thread = this.active_thread();
487        trace!("rwlock_reader_lock: now also held (one more time) by {:?}", thread);
488        let mut rwlock = rwlock_ref.0.borrow_mut();
489        assert!(!rwlock.is_write_locked(), "the lock is write locked");
490        let count = rwlock.readers.entry(thread).or_insert(0);
491        *count = count.strict_add(1);
492        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
493            data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
494        }
495    }
496
497    /// Try read-unlock the lock for the current threads and potentially give the lock to a new owner.
498    /// Returns `true` if succeeded, `false` if this `reader` did not hold the lock.
499    fn rwlock_reader_unlock(&mut self, rwlock_ref: &RwLockRef) -> InterpResult<'tcx, bool> {
500        let this = self.eval_context_mut();
501        let thread = this.active_thread();
502        let mut rwlock = rwlock_ref.0.borrow_mut();
503        match rwlock.readers.entry(thread) {
504            Entry::Occupied(mut entry) => {
505                let count = entry.get_mut();
506                assert!(*count > 0, "rwlock locked with count == 0");
507                *count -= 1;
508                if *count == 0 {
509                    trace!("rwlock_reader_unlock: no longer held by {:?}", thread);
510                    entry.remove();
511                } else {
512                    trace!("rwlock_reader_unlock: held one less time by {:?}", thread);
513                }
514            }
515            Entry::Vacant(_) => return interp_ok(false), // we did not even own this lock
516        }
517        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
518            // Add this to the shared-release clock of all concurrent readers.
519            data_race.release_clock(&this.machine.threads, |clock| {
520                rwlock.clock_current_readers.join(clock)
521            });
522        }
523
524        // The thread was a reader. If the lock is not held any more, give it to a writer.
525        if rwlock.is_locked().not() {
526            // All the readers are finished, so set the writer data-race handle to the value
527            // of the union of all reader data race handles, since the set of readers
528            // happen-before the writers
529            let rwlock_ref = &mut *rwlock;
530            rwlock_ref.clock_unlocked.clone_from(&rwlock_ref.clock_current_readers);
531            // See if there is a thread to unblock.
532            if let Some(writer) = rwlock_ref.writer_queue.pop_front() {
533                drop(rwlock); // make RefCell available for unblock callback
534                this.unblock_thread(writer, BlockReason::RwLock)?;
535            }
536        }
537        interp_ok(true)
538    }
539
540    /// Put the reader in the queue waiting for the lock and block it.
541    /// Once the lock becomes available, `retval` will be written to `dest`.
542    #[inline]
543    fn rwlock_enqueue_and_block_reader(
544        &mut self,
545        rwlock_ref: RwLockRef,
546        retval: Scalar,
547        dest: MPlaceTy<'tcx>,
548    ) {
549        let this = self.eval_context_mut();
550        let thread = this.active_thread();
551        let mut rwlock = rwlock_ref.0.borrow_mut();
552        rwlock.reader_queue.push_back(thread);
553        assert!(rwlock.is_write_locked(), "read-queueing on not write locked rwlock");
554        drop(rwlock);
555        this.block_thread(
556            BlockReason::RwLock,
557            None,
558            callback!(
559                @capture<'tcx> {
560                    rwlock_ref: RwLockRef,
561                    retval: Scalar,
562                    dest: MPlaceTy<'tcx>,
563                }
564                |this, unblock: UnblockKind| {
565                    assert_eq!(unblock, UnblockKind::Ready);
566                    this.rwlock_reader_lock(&rwlock_ref);
567                    this.write_scalar(retval, &dest)?;
568                    interp_ok(())
569                }
570            ),
571        );
572    }
573
574    /// Lock by setting the writer that owns the lock.
575    #[inline]
576    fn rwlock_writer_lock(&mut self, rwlock_ref: &RwLockRef) {
577        let this = self.eval_context_mut();
578        let thread = this.active_thread();
579        trace!("rwlock_writer_lock: now held by {:?}", thread);
580
581        let mut rwlock = rwlock_ref.0.borrow_mut();
582        assert!(!rwlock.is_locked(), "the rwlock is already locked");
583        rwlock.writer = Some(thread);
584        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
585            data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
586        }
587    }
588
589    /// Try to unlock an rwlock held by the current thread.
590    /// Return `false` if it is held by another thread.
591    #[inline]
592    fn rwlock_writer_unlock(&mut self, rwlock_ref: &RwLockRef) -> InterpResult<'tcx, bool> {
593        let this = self.eval_context_mut();
594        let thread = this.active_thread();
595        let mut rwlock = rwlock_ref.0.borrow_mut();
596        interp_ok(if let Some(current_writer) = rwlock.writer {
597            if current_writer != thread {
598                // Only the owner can unlock the rwlock.
599                return interp_ok(false);
600            }
601            rwlock.writer = None;
602            trace!("rwlock_writer_unlock: unlocked by {:?}", thread);
603            // Record release clock for next lock holder.
604            if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
605                data_race.release_clock(&this.machine.threads, |clock| {
606                    rwlock.clock_unlocked.clone_from(clock)
607                });
608            }
609
610            // The thread was a writer.
611            //
612            // We are prioritizing writers here against the readers. As a
613            // result, not only readers can starve writers, but also writers can
614            // starve readers.
615            if let Some(writer) = rwlock.writer_queue.pop_front() {
616                drop(rwlock); // make RefCell available for unblock callback
617                this.unblock_thread(writer, BlockReason::RwLock)?;
618            } else {
619                // Take the entire read queue and wake them all up.
620                let readers = std::mem::take(&mut rwlock.reader_queue);
621                drop(rwlock); // make RefCell available for unblock callback
622                for reader in readers {
623                    this.unblock_thread(reader, BlockReason::RwLock)?;
624                }
625            }
626            true
627        } else {
628            false
629        })
630    }
631
632    /// Put the writer in the queue waiting for the lock.
633    /// Once the lock becomes available, `retval` will be written to `dest`.
634    #[inline]
635    fn rwlock_enqueue_and_block_writer(
636        &mut self,
637        rwlock_ref: RwLockRef,
638        retval: Scalar,
639        dest: MPlaceTy<'tcx>,
640    ) {
641        let this = self.eval_context_mut();
642        let thread = this.active_thread();
643        let mut rwlock = rwlock_ref.0.borrow_mut();
644        rwlock.writer_queue.push_back(thread);
645        assert!(rwlock.is_locked(), "write-queueing on unlocked rwlock");
646        drop(rwlock);
647        this.block_thread(
648            BlockReason::RwLock,
649            None,
650            callback!(
651                @capture<'tcx> {
652                    rwlock_ref: RwLockRef,
653                    retval: Scalar,
654                    dest: MPlaceTy<'tcx>,
655                }
656                |this, unblock: UnblockKind| {
657                    assert_eq!(unblock, UnblockKind::Ready);
658                    this.rwlock_writer_lock(&rwlock_ref);
659                    this.write_scalar(retval, &dest)?;
660                    interp_ok(())
661                }
662            ),
663        );
664    }
665
666    /// Is the conditional variable awaited?
667    #[inline]
668    fn condvar_is_awaited(&mut self, id: CondvarId) -> bool {
669        let this = self.eval_context_mut();
670        !this.machine.sync.condvars[id].waiters.is_empty()
671    }
672
673    /// Release the mutex and let the current thread wait on the given condition variable.
674    /// Once it is signaled, the mutex will be acquired and `retval_succ` will be written to `dest`.
675    /// If the timeout happens first, `retval_timeout` will be written to `dest`.
676    fn condvar_wait(
677        &mut self,
678        condvar: CondvarId,
679        mutex_ref: MutexRef,
680        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
681        retval_succ: Scalar,
682        retval_timeout: Scalar,
683        dest: MPlaceTy<'tcx>,
684    ) -> InterpResult<'tcx> {
685        let this = self.eval_context_mut();
686        if let Some(old_locked_count) = this.mutex_unlock(&mutex_ref)? {
687            if old_locked_count != 1 {
688                throw_unsup_format!(
689                    "awaiting a condvar on a mutex acquired multiple times is not supported"
690                );
691            }
692        } else {
693            throw_ub_format!(
694                "awaiting a condvar on a mutex that is unlocked or owned by a different thread"
695            );
696        }
697        let thread = this.active_thread();
698        let waiters = &mut this.machine.sync.condvars[condvar].waiters;
699        waiters.push_back(thread);
700        this.block_thread(
701            BlockReason::Condvar(condvar),
702            timeout,
703            callback!(
704                @capture<'tcx> {
705                    condvar: CondvarId,
706                    mutex_ref: MutexRef,
707                    retval_succ: Scalar,
708                    retval_timeout: Scalar,
709                    dest: MPlaceTy<'tcx>,
710                }
711                |this, unblock: UnblockKind| {
712                    match unblock {
713                        UnblockKind::Ready => {
714                            // The condvar was signaled. Make sure we get the clock for that.
715                            if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
716                                data_race.acquire_clock(
717                                    &this.machine.sync.condvars[condvar].clock,
718                                    &this.machine.threads,
719                                );
720                            }
721                            // Try to acquire the mutex.
722                            // The timeout only applies to the first wait (until the signal), not for mutex acquisition.
723                            this.condvar_reacquire_mutex(mutex_ref, retval_succ, dest)
724                        }
725                        UnblockKind::TimedOut => {
726                            // We have to remove the waiter from the queue again.
727                            let thread = this.active_thread();
728                            let waiters = &mut this.machine.sync.condvars[condvar].waiters;
729                            waiters.retain(|waiter| *waiter != thread);
730                            // Now get back the lock.
731                            this.condvar_reacquire_mutex(mutex_ref, retval_timeout, dest)
732                        }
733                    }
734                }
735            ),
736        );
737        interp_ok(())
738    }
739
740    /// Wake up some thread (if there is any) sleeping on the conditional
741    /// variable. Returns `true` iff any thread was woken up.
742    fn condvar_signal(&mut self, id: CondvarId) -> InterpResult<'tcx, bool> {
743        let this = self.eval_context_mut();
744        let condvar = &mut this.machine.sync.condvars[id];
745
746        // Each condvar signal happens-before the end of the condvar wake
747        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
748            data_race.release_clock(&this.machine.threads, |clock| condvar.clock.clone_from(clock));
749        }
750        let Some(waiter) = condvar.waiters.pop_front() else {
751            return interp_ok(false);
752        };
753        this.unblock_thread(waiter, BlockReason::Condvar(id))?;
754        interp_ok(true)
755    }
756
757    /// Wait for the futex to be signaled, or a timeout. Once the thread is
758    /// unblocked, `callback` is called with the unblock reason.
759    fn futex_wait(
760        &mut self,
761        futex_ref: FutexRef,
762        bitset: u32,
763        timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
764        callback: DynUnblockCallback<'tcx>,
765    ) {
766        let this = self.eval_context_mut();
767        let thread = this.active_thread();
768        let mut futex = futex_ref.0.borrow_mut();
769        let waiters = &mut futex.waiters;
770        assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
771        waiters.push(FutexWaiter { thread, bitset });
772        drop(futex);
773
774        this.block_thread(
775            BlockReason::Futex,
776            timeout,
777            callback!(
778                @capture<'tcx> {
779                    futex_ref: FutexRef,
780                    callback: DynUnblockCallback<'tcx>,
781                }
782                |this, unblock: UnblockKind| {
783                    match unblock {
784                        UnblockKind::Ready => {
785                            let futex = futex_ref.0.borrow();
786                            // Acquire the clock of the futex.
787                            if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
788                                data_race.acquire_clock(&futex.clock, &this.machine.threads);
789                            }
790                        },
791                        UnblockKind::TimedOut => {
792                            // Remove the waiter from the futex.
793                            let thread = this.active_thread();
794                            let mut futex = futex_ref.0.borrow_mut();
795                            futex.waiters.retain(|waiter| waiter.thread != thread);
796                        },
797                    }
798
799                    callback.call(this, unblock)
800                }
801            ),
802        );
803    }
804
805    /// Wake up `count` of the threads in the queue that match any of the bits
806    /// in the bitset. Returns how many threads were woken.
807    fn futex_wake(
808        &mut self,
809        futex_ref: &FutexRef,
810        bitset: u32,
811        count: usize,
812    ) -> InterpResult<'tcx, usize> {
813        let this = self.eval_context_mut();
814        let mut futex = futex_ref.0.borrow_mut();
815
816        // Each futex-wake happens-before the end of the futex wait
817        if let Some(data_race) = this.machine.data_race.as_vclocks_ref() {
818            data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock));
819        }
820
821        // Remove `count` of the threads in the queue that match any of the bits in the bitset.
822        // We collect all of them before unblocking because the unblock callback may access the
823        // futex state to retrieve the remaining number of waiters on macOS.
824        let waiters: Vec<_> =
825            futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect();
826        drop(futex);
827
828        let woken = waiters.len();
829        for waiter in waiters {
830            this.unblock_thread(waiter.thread, BlockReason::Futex)?;
831        }
832
833        interp_ok(woken)
834    }
835}