Skip to content

Commit 92d17f2

Browse files
committed
all async function returned futures should now be Send, despite rust-lang/rust#104883
1 parent eaaf458 commit 92d17f2

File tree

2 files changed

+90
-86
lines changed

2 files changed

+90
-86
lines changed

src/mapping.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use core::{
6969
use std::{
7070
cell::RefCell,
7171
collections::{HashMap, BTreeSet},
72-
sync::{Arc, Weak, Mutex, MutexGuard, RwLock, RwLockWriteGuard},
72+
sync::{Arc, Weak, Mutex, RwLock, RwLockWriteGuard},
7373
};
7474
use bilge::prelude::*;
7575

src/rawmaster.rs

+89-85
Original file line numberDiff line numberDiff line change
@@ -249,101 +249,105 @@ impl RawMaster {
249249
SlaveAddress::Logical => memory,
250250
};
251251

252-
let token;
253-
let (ready, _finisher) = {
252+
let (token, ready, _finisher);
253+
loop {
254254
// buffering the pdu sending
255-
let mut state = self.pdu_state.lock().unwrap();
256-
257-
while state.free.is_empty() {
258-
let notification = self.received.notified();
259-
drop(state);
260-
notification.await;
261-
state = self.pdu_state.lock().unwrap();
262-
}
263-
264-
// sending the buffer if necessary
265-
while self.socket.max_frame() < state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size() {
266-
assert!(self.socket.max_frame() >
267-
EthercatHeader::packed_size()
268-
+ data.len()
269-
+ PduHeader::packed_size()
270-
+ PduFooter::packed_size(), "data too big for an ethercat frame");
271-
state.ready = true;
272-
self.sendable.notify_one();
273-
let notification = self.sent.notified();
274-
drop(state);
275-
notification.await;
276-
state = self.pdu_state.lock().unwrap();
277-
}
278-
279-
// reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data
280-
token = state.free.pop().unwrap();
281-
state.receive[token] = Some(PduStorage {
282-
// cast lifetime as static
283-
// memory safety: this slice is pinned by the caller and its access is managed by field `ready`
284-
data: unsafe {std::slice::from_raw_parts_mut(
285-
data.as_mut_ptr(),
286-
data.len(),
287-
)},
288-
ready: false,
289-
answers: 0,
290-
});
291-
292-
// change last value's PduHeader.next
293-
if state.last_start <= state.last_end {
294-
let range = state.last_start .. state.last_end;
295-
let place = &mut state.send[range];
296-
let mut header = PduHeader::unpack(place).unwrap();
297-
header.set_next(true);
298-
header.pack(place).unwrap();
299-
}
300-
else {
301-
state.last_end = state.last_start;
302-
}
303255

304-
// stacking the PDU in self.pdu_receive
305-
let advance = {
306-
let range = state.last_end ..;
307-
let mut cursor = Cursor::new(&mut state.send[range]);
308-
cursor.pack(&PduHeader::new(
309-
u8::from(command),
310-
token as u8,
311-
address,
312-
u11::new(data.len().try_into().unwrap()),
313-
false,
314-
false,
315-
u16::new(0),
316-
)).unwrap();
317-
cursor.write(data).unwrap();
318-
cursor.pack(&PduFooter::new(0)).unwrap();
319-
cursor.position()
320-
};
321-
state.last_start = state.last_end;
322-
state.last_end = state.last_start + advance;
323-
324-
self.sendable.notify_one();
325-
326-
// memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function
327-
// we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine
328-
let ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)};
329-
// clean up the receive table at function end, or in case the async runtime cancels this task
330-
let finisher = Finisher::new(|| {
256+
// this weird scope is here to prevent the rust thread checker to set this async future `!Send` just because there is remaining freed variables with `MutexGuard` type
257+
// TODO: restore the previous code (more readable and flexible) once https://github.com/rust-lang/rust/issues/104883 is fixed
258+
{
331259
let mut state = self.pdu_state.lock().unwrap();
332-
state.receive[token] = None;
333-
state.free.push(token).unwrap();
334-
});
335-
(ready, finisher)
336-
};
337-
260+
let space_available = || self.socket.max_frame() > state.last_end + data.len() + PduHeader::packed_size() + PduFooter::packed_size();
261+
let token_available = || ! state.free.is_empty();
262+
if ! token_available() {
263+
// there is nothing to do except waiting
264+
}
265+
else if ! space_available() {
266+
// sending the current buffer
267+
assert!(self.socket.max_frame() >
268+
EthercatHeader::packed_size()
269+
+ data.len()
270+
+ PduHeader::packed_size()
271+
+ PduFooter::packed_size(), "data too big for an ethercat frame");
272+
state.ready = true;
273+
self.sendable.notify_one();
274+
}
275+
else {
276+
// reserving a token number to ensure no other task will exchange a PDU with the same token and receive our data
277+
token = state.free.pop().unwrap();
278+
state.receive[token] = Some(PduStorage {
279+
// cast lifetime as static
280+
// memory safety: this slice is pinned by the caller and its access is managed by field `ready`
281+
data: unsafe {std::slice::from_raw_parts_mut(
282+
data.as_mut_ptr(),
283+
data.len(),
284+
)},
285+
ready: false,
286+
answers: 0,
287+
});
288+
289+
// change last value's PduHeader.next
290+
if state.last_start <= state.last_end {
291+
let range = state.last_start .. state.last_end;
292+
let place = &mut state.send[range];
293+
let mut header = PduHeader::unpack(place).unwrap();
294+
header.set_next(true);
295+
header.pack(place).unwrap();
296+
}
297+
else {
298+
state.last_end = state.last_start;
299+
}
300+
301+
// stacking the PDU in self.pdu_receive
302+
let advance = {
303+
let range = state.last_end ..;
304+
let mut cursor = Cursor::new(&mut state.send[range]);
305+
cursor.pack(&PduHeader::new(
306+
u8::from(command),
307+
token as u8,
308+
address,
309+
u11::new(data.len().try_into().unwrap()),
310+
false,
311+
false,
312+
u16::new(0),
313+
)).unwrap();
314+
cursor.write(data).unwrap();
315+
cursor.pack(&PduFooter::new(0)).unwrap();
316+
cursor.position()
317+
};
318+
state.last_start = state.last_end;
319+
state.last_end = state.last_start + advance;
320+
321+
self.sendable.notify_one();
322+
323+
// memory safety: this item in the array cannot be moved since self is borrowed, and will only be removed later by the current function
324+
// we will access it potentially concurrently, but since we only want to detect a change in the value, that's fine
325+
ready = unsafe {&*(&state.receive[token].as_ref().unwrap().ready as *const bool)};
326+
// clean up the receive table at function end, or in case the async runtime cancels this task
327+
_finisher = Finisher::new(|| {
328+
let mut state = self.pdu_state.lock().unwrap();
329+
state.receive[token] = None;
330+
state.free.push(token).unwrap();
331+
});
332+
333+
break
334+
}
335+
}
336+
self.received.notified().await;
337+
}
338+
338339
// waiting for the answer
339340
loop {
340341
let notification = self.received.notified();
341342
if *ready {break}
342343
notification.await;
343344
}
344345

345-
let state = self.pdu_state.lock().unwrap();
346-
state.receive[token].as_ref().unwrap().answers
346+
{
347+
// free the token
348+
let state = self.pdu_state.lock().unwrap();
349+
state.receive[token].as_ref().unwrap().answers
350+
}
347351
}
348352

349353
/// trigger sending the buffered PDUs, they will be sent as soon as possible by [Self::send] instead of waiting for the frame to be full or for the timeout

0 commit comments

Comments
 (0)