cargo/util/
lockserver.rs
1use std::collections::HashMap;
15use std::io::{BufRead, BufReader, Read, Write};
16use std::net::{SocketAddr, TcpListener, TcpStream};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread::{self, JoinHandle};
20
21use anyhow::{Context, Error};
22
23use crate::util::network::LOCALHOST;
24
25pub struct LockServer {
26 listener: TcpListener,
27 addr: SocketAddr,
28 threads: HashMap<String, ServerClient>,
29 done: Arc<AtomicBool>,
30}
31
32pub struct LockServerStarted {
33 done: Arc<AtomicBool>,
34 addr: SocketAddr,
35 thread: Option<JoinHandle<()>>,
36}
37
38pub struct LockServerClient {
39 _socket: TcpStream,
40}
41
42struct ServerClient {
43 thread: Option<JoinHandle<()>>,
44 lock: Arc<Mutex<(bool, Vec<TcpStream>)>>,
45}
46
47impl LockServer {
48 pub fn new() -> Result<LockServer, Error> {
49 let listener = TcpListener::bind(&LOCALHOST[..])
50 .context("failed to bind TCP listener to manage locking")?;
51 let addr = listener.local_addr()?;
52 Ok(LockServer {
53 listener,
54 addr,
55 threads: HashMap::new(),
56 done: Arc::new(AtomicBool::new(false)),
57 })
58 }
59
60 pub fn addr(&self) -> &SocketAddr {
61 &self.addr
62 }
63
64 pub fn start(self) -> Result<LockServerStarted, Error> {
65 let addr = self.addr;
66 let done = self.done.clone();
67 let thread = thread::spawn(|| {
68 self.run();
69 });
70 Ok(LockServerStarted {
71 addr,
72 thread: Some(thread),
73 done,
74 })
75 }
76
77 fn run(mut self) {
78 while let Ok((client, _)) = self.listener.accept() {
79 if self.done.load(Ordering::SeqCst) {
80 break;
81 }
82
83 let mut client = BufReader::new(client);
86 let mut name = String::new();
87 if client.read_line(&mut name).is_err() {
88 continue;
89 }
90 let client = client.into_inner();
91
92 if let Some(t) = self.threads.get_mut(&name) {
96 let mut state = t.lock.lock().unwrap();
97 if state.0 {
98 state.1.push(client);
99 continue;
100 }
101 drop(t.thread.take().unwrap().join());
102 }
103
104 let lock = Arc::new(Mutex::new((true, vec![client])));
105 let lock2 = lock.clone();
106 let thread = thread::spawn(move || {
107 loop {
108 let mut client = {
109 let mut state = lock2.lock().unwrap();
110 if state.1.is_empty() {
111 state.0 = false;
112 break;
113 } else {
114 state.1.remove(0)
115 }
116 };
117 if client.write_all(&[1]).is_err() {
120 continue;
121 }
122 let mut dst = Vec::new();
123 drop(client.read_to_end(&mut dst));
124 }
125 });
126
127 self.threads.insert(
128 name,
129 ServerClient {
130 thread: Some(thread),
131 lock,
132 },
133 );
134 }
135 }
136}
137
138impl Drop for LockServer {
139 fn drop(&mut self) {
140 for (_, mut client) in self.threads.drain() {
141 if let Some(thread) = client.thread.take() {
142 drop(thread.join());
143 }
144 }
145 }
146}
147
148impl Drop for LockServerStarted {
149 fn drop(&mut self) {
150 self.done.store(true, Ordering::SeqCst);
151 if TcpStream::connect(&self.addr).is_err() {
153 return;
154 }
155 drop(self.thread.take().unwrap().join());
156 }
157}
158
159impl LockServerClient {
160 pub fn lock(addr: &SocketAddr, name: impl AsRef<[u8]>) -> Result<LockServerClient, Error> {
161 let mut client =
162 TcpStream::connect(&addr).context("failed to connect to parent lock server")?;
163 client
164 .write_all(name.as_ref())
165 .and_then(|_| client.write_all(b"\n"))
166 .context("failed to write to lock server")?;
167 let mut buf = [0];
168 client
169 .read_exact(&mut buf)
170 .context("failed to acquire lock")?;
171 Ok(LockServerClient { _socket: client })
172 }
173}