1//! An implementation of IPC locks, guaranteed to be released if a process dies
2//!
3//! This module implements a locking server/client where the main `cargo fix`
4//! process will start up a server and then all the client processes will
5//! connect to it. The main purpose of this file is to ensure that each crate
6//! (aka file entry point) is only fixed by one process at a time, currently
7//! concurrent fixes can't happen.
8//!
9//! The basic design here is to use a TCP server which is pretty portable across
10//! platforms. For simplicity it just uses threads as well. Clients connect to
11//! the main server, inform the server what its name is, and then wait for the
12//! server to give it the lock (aka write a byte).
1314use std::collections::HashMap;
15use std::io::{BufRead, BufReader, Read, Write};
16use std::net::{SocketAddr, TcpListener, TcpStream};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread::{self, JoinHandle};
2021use anyhow::{Context, Error};
2223use crate::util::network::LOCALHOST;
2425pub struct LockServer {
26 listener: TcpListener,
27 addr: SocketAddr,
28 threads: HashMap<String, ServerClient>,
29 done: Arc<AtomicBool>,
30}
3132pub struct LockServerStarted {
33 done: Arc<AtomicBool>,
34 addr: SocketAddr,
35 thread: Option<JoinHandle<()>>,
36}
3738pub struct LockServerClient {
39 _socket: TcpStream,
40}
4142struct ServerClient {
43 thread: Option<JoinHandle<()>>,
44 lock: Arc<Mutex<(bool, Vec<TcpStream>)>>,
45}
4647impl LockServer {
48pub fn new() -> Result<LockServer, Error> {
49let listener = TcpListener::bind(&LOCALHOST[..])
50 .context("failed to bind TCP listener to manage locking")?;
51let addr = listener.local_addr()?;
52Ok(LockServer {
53 listener,
54 addr,
55 threads: HashMap::new(),
56 done: Arc::new(AtomicBool::new(false)),
57 })
58 }
5960pub fn addr(&self) -> &SocketAddr {
61&self.addr
62 }
6364pub fn start(self) -> Result<LockServerStarted, Error> {
65let addr = self.addr;
66let done = self.done.clone();
67let thread = thread::spawn(|| {
68self.run();
69 });
70Ok(LockServerStarted {
71 addr,
72 thread: Some(thread),
73 done,
74 })
75 }
7677fn run(mut self) {
78while let Ok((client, _)) = self.listener.accept() {
79if self.done.load(Ordering::SeqCst) {
80break;
81 }
8283// Learn the name of our connected client to figure out if it needs
84 // to wait for another process to release the lock.
85let mut client = BufReader::new(client);
86let mut name = String::new();
87if client.read_line(&mut name).is_err() {
88continue;
89 }
90let client = client.into_inner();
9192// If this "named mutex" is already registered and the thread is
93 // still going, put it on the queue. Otherwise wait on the previous
94 // thread and we'll replace it just below.
95if let Some(t) = self.threads.get_mut(&name) {
96let mut state = t.lock.lock().unwrap();
97if state.0 {
98 state.1.push(client);
99continue;
100 }
101 drop(t.thread.take().unwrap().join());
102 }
103104let lock = Arc::new(Mutex::new((true, vec![client])));
105let lock2 = lock.clone();
106let thread = thread::spawn(move || {
107loop {
108let mut client = {
109let mut state = lock2.lock().unwrap();
110if state.1.is_empty() {
111 state.0 = false;
112break;
113 } else {
114 state.1.remove(0)
115 }
116 };
117// Inform this client that it now has the lock and wait for
118 // it to disconnect by waiting for EOF.
119if client.write_all(&[1]).is_err() {
120continue;
121 }
122let mut dst = Vec::new();
123 drop(client.read_to_end(&mut dst));
124 }
125 });
126127self.threads.insert(
128 name,
129 ServerClient {
130 thread: Some(thread),
131 lock,
132 },
133 );
134 }
135 }
136}
137138impl Drop for LockServer {
139fn drop(&mut self) {
140for (_, mut client) in self.threads.drain() {
141if let Some(thread) = client.thread.take() {
142 drop(thread.join());
143 }
144 }
145 }
146}
147148impl Drop for LockServerStarted {
149fn drop(&mut self) {
150self.done.store(true, Ordering::SeqCst);
151// Ignore errors here as this is largely best-effort
152if TcpStream::connect(&self.addr).is_err() {
153return;
154 }
155 drop(self.thread.take().unwrap().join());
156 }
157}
158159impl LockServerClient {
160pub fn lock(addr: &SocketAddr, name: impl AsRef<[u8]>) -> Result<LockServerClient, Error> {
161let mut client =
162 TcpStream::connect(&addr).context("failed to connect to parent lock server")?;
163 client
164 .write_all(name.as_ref())
165 .and_then(|_| client.write_all(b"\n"))
166 .context("failed to write to lock server")?;
167let mut buf = [0];
168 client
169 .read_exact(&mut buf)
170 .context("failed to acquire lock")?;
171Ok(LockServerClient { _socket: client })
172 }
173}