//! An implementation of IPC locks, guaranteed to be released if a process dies
//!
//! This module implements a locking server/client where the main `cargo fix`
//! process will start up a server and then all the client processes will
//! connect to it. The main purpose of this file is to ensure that each crate
//! (aka file entry point) is only fixed by one process at a time; currently
//! concurrent fixes of the same crate can't happen.
//!
//! The basic design here is to use a TCP server, which is pretty portable
//! across platforms. For simplicity it just uses threads as well. Clients
//! connect to the main server, inform the server of their name, and then wait
//! for the server to give them the lock (aka write a byte).
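//!
//! A minimal usage sketch (hypothetical caller, not part of this module):
//! the parent process starts the server, then each child acquires the lock
//! named after the crate it wants to fix.
//!
//! ```ignore
//! // Parent: bind the server and export __CARGO_FIX_SERVER for children.
//! let _server = Server::new()?.start()?;
//!
//! // Child (inherits the env var): blocks until the server grants the lock.
//! let _guard = Client::lock("path/to/entry.rs")?;
//! // ... fix the crate; dropping `_guard` closes the socket and releases it.
//! ```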
use std::collections::HashMap;
use std::env;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

use failure::{format_err, Error, ResultExt};

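/// The lock server: accepts client connections and hands out one lock per
/// client-supplied name at a time.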
pub struct Server {
    listener: TcpListener,
    threads: HashMap<String, ServerClient>,
    done: Arc<AtomicBool>,
}

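/// Handle to a running `Server`; dropping it signals the accept loop to shut
/// down and joins its thread.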
pub struct StartedServer {
    done: Arc<AtomicBool>,
    addr: SocketAddr,
    thread: Option<JoinHandle<()>>,
}

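/// A held lock. The socket is kept open for the lifetime of this value;
/// dropping it closes the connection, which the server reads as EOF and
/// treats as a release.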
pub struct Client {
    _socket: TcpStream,
}

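/// Server-side state for one "named mutex": the worker thread granting the
/// lock plus a queue of waiting clients. The tuple is (worker still running,
/// clients waiting their turn).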
struct ServerClient {
    thread: Option<JoinHandle<()>>,
    lock: Arc<Mutex<(bool, Vec<TcpStream>)>>,
}

impl Server {
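    /// Binds a listener on an ephemeral local port and advertises its
    /// address to child processes via the `__CARGO_FIX_SERVER` environment
    /// variable.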
    pub fn new() -> Result<Server, Error> {
        let listener = TcpListener::bind("127.0.0.1:0")
            .with_context(|_| "failed to bind TCP listener to manage locking")?;
        env::set_var("__CARGO_FIX_SERVER", listener.local_addr()?.to_string());
        Ok(Server {
            listener,
            threads: HashMap::new(),
            done: Arc::new(AtomicBool::new(false)),
        })
    }

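    /// Spawns the accept loop on a background thread, returning a handle
    /// that shuts the loop down when dropped.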
    pub fn start(self) -> Result<StartedServer, Error> {
        let addr = self.listener.local_addr()?;
        let done = self.done.clone();
        let thread = thread::spawn(|| {
            self.run();
        });
        Ok(StartedServer {
            addr,
            thread: Some(thread),
            done,
        })
    }

    fn run(mut self) {
        while let Ok((client, _)) = self.listener.accept() {
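            // `accept` has no timeout, so `StartedServer::drop` wakes us up
            // with a dummy connection to make sure this flag gets checked.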
            if self.done.load(Ordering::SeqCst) {
                break
            }

            // Learn the name of our connected client to figure out if it
            // needs to wait for another process to release the lock.
            let mut client = BufReader::new(client);
            let mut name = String::new();
            if client.read_line(&mut name).is_err() {
                continue
            }
            let client = client.into_inner();

            // If this "named mutex" is already registered and the thread is
            // still going, put it on the queue. Otherwise wait on the previous
            // thread and we'll replace it just below.
            if let Some(t) = self.threads.get_mut(&name) {
                let mut state = t.lock.lock().unwrap();
                if state.0 {
                    state.1.push(client);
                    continue
                }
                drop(t.thread.take().unwrap().join());
            }

            let lock = Arc::new(Mutex::new((true, vec![client])));
            let lock2 = lock.clone();
            let thread = thread::spawn(move || {
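                // Grant the lock to queued clients in FIFO order; once the
                // queue drains, mark this worker as finished and exit.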
                loop {
                    let mut client = {
                        let mut state = lock2.lock().unwrap();
                        if state.1.is_empty() {
                            state.0 = false;
                            break
                        } else {
                            state.1.remove(0)
                        }
                    };
                    // Inform this client that it now has the lock and wait
                    // for it to disconnect by waiting for EOF.
                    if client.write_all(&[1]).is_err() {
                        continue
                    }
                    let mut dst = Vec::new();
                    drop(client.read_to_end(&mut dst));
                }
            });

            self.threads.insert(name, ServerClient {
                thread: Some(thread),
                lock,
            });
        }
    }
}

impl Drop for Server {
    fn drop(&mut self) {
        for (_, mut client) in self.threads.drain() {
            if let Some(thread) = client.thread.take() {
                drop(thread.join());
            }
        }
    }
}

impl Drop for StartedServer {
    fn drop(&mut self) {
        self.done.store(true, Ordering::SeqCst);
        // Wake up the accept loop with a dummy connection so it can see the
        // `done` flag; ignore errors here as this is largely best-effort.
        if TcpStream::connect(&self.addr).is_err() {
            return
        }
        drop(self.thread.take().unwrap().join());
    }
}

impl Client {
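    /// Connects to the parent lock server named by `__CARGO_FIX_SERVER` and
    /// blocks until the lock for `name` is granted. The lock is held until
    /// the returned `Client` is dropped.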
    pub fn lock(name: &str) -> Result<Client, Error> {
        let addr = env::var("__CARGO_FIX_SERVER")
            .map_err(|_| format_err!("locking strategy misconfigured"))?;
        let mut client = TcpStream::connect(&addr)
            .with_context(|_| "failed to connect to parent lock server")?;
        client.write_all(name.as_bytes())
            .and_then(|_| client.write_all(b"\n"))
            .with_context(|_| "failed to write to lock server")?;
        let mut buf = [0];
        client.read_exact(&mut buf)
            .with_context(|_| "failed to acquire lock")?;
        Ok(Client { _socket: client })
    }
}