cargo/util/lockserver.rs

//! An implementation of IPC locks, guaranteed to be released if a process dies
//!
//! This module implements a locking server/client where the main `cargo fix`
//! process will start up a server and then all the client processes will
//! connect to it. The main purpose of this file is to ensure that each crate
//! (aka file entry point) is only fixed by one process at a time; concurrent
//! fixes of the same crate are currently not supported.
//!
//! The basic design here is to use a TCP server which is pretty portable across
//! platforms. For simplicity it just uses threads as well. Clients connect to
//! the main server, inform the server of their name, and then wait for the
//! server to give them the lock (aka write a byte).
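//!
//! As a sketch of the intended flow (the name `"foo"` below is illustrative;
//! any string uniquely identifying a crate will do):
//!
//! ```ignore
//! // Parent process: bind a localhost listener and start accepting clients.
//! let server = LockServer::new()?;
//! let addr = *server.addr();
//! let _server = server.start()?;
//!
//! // Client processes: block until the lock for the name is granted. The
//! // lock is released when the returned client (and its socket) is dropped.
//! let _guard = LockServerClient::lock(&addr, "foo")?;
//! ```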

use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

use anyhow::{Context, Error};

use crate::util::network::LOCALHOST;

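/// The server side: manages a set of named locks, granting each to at most
/// one connected client at a time.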
pub struct LockServer {
    listener: TcpListener,
    addr: SocketAddr,
    /// One entry (and one granting thread) per named lock requested so far.
    threads: HashMap<String, ServerClient>,
    /// Set when the server is shutting down; checked on each new connection.
    done: Arc<AtomicBool>,
}

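/// Handle to a running lock server; dropping it shuts the server down.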
pub struct LockServerStarted {
    done: Arc<AtomicBool>,
    addr: SocketAddr,
    thread: Option<JoinHandle<()>>,
}

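/// The client side: a successfully acquired lock, released when dropped.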
pub struct LockServerClient {
    _socket: TcpStream,
}

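/// Per-lock server state: the thread currently granting the lock, plus a
/// `(thread_running, queued_clients)` pair behind a mutex.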
struct ServerClient {
    thread: Option<JoinHandle<()>>,
    lock: Arc<Mutex<(bool, Vec<TcpStream>)>>,
}

impl LockServer {
    /// Creates a new lock server, bound to a localhost port but not yet
    /// accepting connections.
    pub fn new() -> Result<LockServer, Error> {
        let listener = TcpListener::bind(&LOCALHOST[..])
            .context("failed to bind TCP listener to manage locking")?;
        let addr = listener.local_addr()?;
        Ok(LockServer {
            listener,
            addr,
            threads: HashMap::new(),
            done: Arc::new(AtomicBool::new(false)),
        })
    }

    /// The address this server is bound to.
    pub fn addr(&self) -> &SocketAddr {
        &self.addr
    }

    /// Consumes the server and spawns a thread to run the accept loop,
    /// returning a handle which shuts the server down when dropped.
    pub fn start(self) -> Result<LockServerStarted, Error> {
        let addr = self.addr;
        let done = self.done.clone();
        let thread = thread::spawn(|| {
            self.run();
        });
        Ok(LockServerStarted {
            addr,
            thread: Some(thread),
            done,
        })
    }

    /// The main accept loop: learn each connecting client's name, then either
    /// queue it behind the current holder of that name or hand it the lock.
    fn run(mut self) {
        while let Ok((client, _)) = self.listener.accept() {
            if self.done.load(Ordering::SeqCst) {
                break;
            }

            // Learn the name of our connected client to figure out if it needs
            // to wait for another process to release the lock.
            let mut client = BufReader::new(client);
            let mut name = String::new();
            if client.read_line(&mut name).is_err() {
                continue;
            }
            let client = client.into_inner();

            // If this "named mutex" is already registered and its thread is
            // still running, put the client on the queue. Otherwise join the
            // previous thread, and we'll replace it just below.
            if let Some(t) = self.threads.get_mut(&name) {
                let mut state = t.lock.lock().unwrap();
                if state.0 {
                    state.1.push(client);
                    continue;
                }
                drop(t.thread.take().unwrap().join());
            }

            // Spawn a thread to grant this lock to each queued client in
            // turn, starting with the one that just connected.
            let lock = Arc::new(Mutex::new((true, vec![client])));
            let lock2 = lock.clone();
            let thread = thread::spawn(move || {
                loop {
                    // Pop the next waiter, or mark this thread as finished if
                    // the queue is empty.
                    let mut client = {
                        let mut state = lock2.lock().unwrap();
                        if state.1.is_empty() {
                            state.0 = false;
                            break;
                        } else {
                            state.1.remove(0)
                        }
                    };
                    // Inform this client that it now has the lock and wait for
                    // it to disconnect by waiting for EOF.
                    if client.write_all(&[1]).is_err() {
                        continue;
                    }
                    let mut dst = Vec::new();
                    drop(client.read_to_end(&mut dst));
                }
            });

            self.threads.insert(
                name,
                ServerClient {
                    thread: Some(thread),
                    lock,
                },
            );
        }
    }
}

impl Drop for LockServer {
    fn drop(&mut self) {
        // Wait for any outstanding per-lock threads to exit.
        for (_, mut client) in self.threads.drain() {
            if let Some(thread) = client.thread.take() {
                drop(thread.join());
            }
        }
    }
}

impl Drop for LockServerStarted {
    fn drop(&mut self) {
        self.done.store(true, Ordering::SeqCst);
        // Ignore errors here as this is largely best-effort. The connection
        // only serves to wake up the accept loop so it notices the `done`
        // flag and breaks out.
        if TcpStream::connect(&self.addr).is_err() {
            return;
        }
        drop(self.thread.take().unwrap().join());
    }
}

impl LockServerClient {
    /// Connects to the server at `addr` and blocks until the lock for `name`
    /// is granted. Note that `name` must not contain a newline, as it is
    /// transmitted line-delimited. The lock is held until the returned client
    /// is dropped.
    pub fn lock(addr: &SocketAddr, name: impl AsRef<[u8]>) -> Result<LockServerClient, Error> {
        let mut client =
            TcpStream::connect(&addr).context("failed to connect to parent lock server")?;
        client
            .write_all(name.as_ref())
            .and_then(|_| client.write_all(b"\n"))
            .context("failed to write to lock server")?;
        let mut buf = [0];
        client
            .read_exact(&mut buf)
            .context("failed to acquire lock")?;
        Ok(LockServerClient { _socket: client })
    }
}