std/io/buffered/linewritershim.rs
1use core::slice::memchr;
2
3use crate::io::{self, BufWriter, IoSlice, Write};
4
/// Private helper that layers line-buffered writing on top of a borrowed
/// `BufWriter`.
///
/// Rather than owning a buffer of its own, the shim borrows an existing
/// `BufWriter` and drives it through its internal methods (such as
/// `write_to_buf` and `flush_buf`). Having access to those internals allows a
/// more efficient implementation than one built solely on `write` and
/// `flush`, and avoids duplicating the buffering logic already present in
/// `BufWriter`. Because the borrow is temporary, any existing `BufWriter` can
/// be given line-buffering behavior on demand; this is what lets Stdout switch
/// between line-buffered and block-buffered modes.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    // The `BufWriter` whose buffer and inner writer this shim operates on.
    buffer: &'a mut BufWriter<W>,
}

20impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
21 pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
22 Self { buffer }
23 }
24
25 /// Gets a reference to the inner writer (that is, the writer
26 /// wrapped by the BufWriter).
27 fn inner(&self) -> &W {
28 self.buffer.get_ref()
29 }
30
31 /// Gets a mutable reference to the inner writer (that is, the writer
32 /// wrapped by the BufWriter). Be careful with this writer, as writes to
33 /// it will bypass the buffer.
34 fn inner_mut(&mut self) -> &mut W {
35 self.buffer.get_mut()
36 }
37
38 /// Gets the content currently buffered in self.buffer
39 fn buffered(&self) -> &[u8] {
40 self.buffer.buffer()
41 }
42
43 /// Flushes the buffer iff the last byte is a newline (indicating that an
44 /// earlier write only succeeded partially, and we want to retry flushing
45 /// the buffered line before continuing with a subsequent write).
46 fn flush_if_completed_line(&mut self) -> io::Result<()> {
47 match self.buffered().last().copied() {
48 Some(b'\n') => self.buffer.flush_buf(),
49 _ => Ok(()),
50 }
51 }
52}
53
54impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
55 /// Writes some data into this BufReader with line buffering.
56 ///
57 /// This means that, if any newlines are present in the data, the data up to
58 /// the last newline is sent directly to the underlying writer, and data
59 /// after it is buffered. Returns the number of bytes written.
60 ///
61 /// This function operates on a "best effort basis"; in keeping with the
62 /// convention of `Write::write`, it makes at most one attempt to write
63 /// new data to the underlying writer. If that write only reports a partial
64 /// success, the remaining data will be buffered.
65 ///
66 /// Because this function attempts to send completed lines to the underlying
67 /// writer, it will also flush the existing buffer if it ends with a
68 /// newline, even if the incoming data does not contain any newlines.
69 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
70 let newline_idx = match memchr::memrchr(b'\n', buf) {
71 // If there are no new newlines (that is, if this write is less than
72 // one line), just do a regular buffered write (which may flush if
73 // we exceed the inner buffer's size)
74 None => {
75 self.flush_if_completed_line()?;
76 return self.buffer.write(buf);
77 }
78 // Otherwise, arrange for the lines to be written directly to the
79 // inner writer.
80 Some(newline_idx) => newline_idx + 1,
81 };
82
83 // Flush existing content to prepare for our write. We have to do this
84 // before attempting to write `buf` in order to maintain consistency;
85 // if we add `buf` to the buffer then try to flush it all at once,
86 // we're obligated to return Ok(), which would mean suppressing any
87 // errors that occur during flush.
88 self.buffer.flush_buf()?;
89
90 // This is what we're going to try to write directly to the inner
91 // writer. The rest will be buffered, if nothing goes wrong.
92 let lines = &buf[..newline_idx];
93
94 // Write `lines` directly to the inner writer. In keeping with the
95 // `write` convention, make at most one attempt to add new (unbuffered)
96 // data. Because this write doesn't touch the BufWriter state directly,
97 // and the buffer is known to be empty, we don't need to worry about
98 // self.buffer.panicked here.
99 let flushed = self.inner_mut().write(lines)?;
100
101 // If buffer returns Ok(0), propagate that to the caller without
102 // doing additional buffering; otherwise we're just guaranteeing
103 // an "ErrorKind::WriteZero" later.
104 if flushed == 0 {
105 return Ok(0);
106 }
107
108 // Now that the write has succeeded, buffer the rest (or as much of
109 // the rest as possible). If there were any unwritten newlines, we
110 // only buffer out to the last unwritten newline that fits in the
111 // buffer; this helps prevent flushing partial lines on subsequent
112 // calls to LineWriterShim::write.
113
114 // Handle the cases in order of most-common to least-common, under
115 // the presumption that most writes succeed in totality, and that most
116 // writes are smaller than the buffer.
117 // - Is this a partial line (ie, no newlines left in the unwritten tail)
118 // - If not, does the data out to the last unwritten newline fit in
119 // the buffer?
120 // - If not, scan for the last newline that *does* fit in the buffer
121 let tail = if flushed >= newline_idx {
122 let tail = &buf[flushed..];
123 // Avoid unnecessary short writes by not splitting the remaining
124 // bytes if they're larger than the buffer.
125 // They can be written in full by the next call to write.
126 if tail.len() >= self.buffer.capacity() {
127 return Ok(flushed);
128 }
129 tail
130 } else if newline_idx - flushed <= self.buffer.capacity() {
131 &buf[flushed..newline_idx]
132 } else {
133 let scan_area = &buf[flushed..];
134 let scan_area = &scan_area[..self.buffer.capacity()];
135 match memchr::memrchr(b'\n', scan_area) {
136 Some(newline_idx) => &scan_area[..newline_idx + 1],
137 None => scan_area,
138 }
139 };
140
141 let buffered = self.buffer.write_to_buf(tail);
142 Ok(flushed + buffered)
143 }
144
145 fn flush(&mut self) -> io::Result<()> {
146 self.buffer.flush()
147 }
148
149 /// Writes some vectored data into this BufReader with line buffering.
150 ///
151 /// This means that, if any newlines are present in the data, the data up to
152 /// and including the buffer containing the last newline is sent directly to
153 /// the inner writer, and the data after it is buffered. Returns the number
154 /// of bytes written.
155 ///
156 /// This function operates on a "best effort basis"; in keeping with the
157 /// convention of `Write::write`, it makes at most one attempt to write
158 /// new data to the underlying writer.
159 ///
160 /// Because this function attempts to send completed lines to the underlying
161 /// writer, it will also flush the existing buffer if it contains any
162 /// newlines.
163 ///
164 /// Because sorting through an array of `IoSlice` can be a bit convoluted,
165 /// This method differs from write in the following ways:
166 ///
167 /// - It attempts to write the full content of all the buffers up to and
168 /// including the one containing the last newline. This means that it
169 /// may attempt to write a partial line, that buffer has data past the
170 /// newline.
171 /// - If the write only reports partial success, it does not attempt to
172 /// find the precise location of the written bytes and buffer the rest.
173 ///
174 /// If the underlying vector doesn't support vectored writing, we instead
175 /// simply write the first non-empty buffer with `write`. This way, we
176 /// get the benefits of more granular partial-line handling without losing
177 /// anything in efficiency
178 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
179 // If there's no specialized behavior for write_vectored, just use
180 // write. This has the benefit of more granular partial-line handling.
181 if !self.is_write_vectored() {
182 return match bufs.iter().find(|buf| !buf.is_empty()) {
183 Some(buf) => self.write(buf),
184 None => Ok(0),
185 };
186 }
187
188 // Find the buffer containing the last newline
189 // FIXME: This is overly slow if there are very many bufs and none contain
190 // newlines. e.g. writev() on Linux only writes up to 1024 slices, so
191 // scanning the rest is wasted effort. This makes write_all_vectored()
192 // quadratic.
193 let last_newline_buf_idx = bufs
194 .iter()
195 .enumerate()
196 .rev()
197 .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
198
199 // If there are no new newlines (that is, if this write is less than
200 // one line), just do a regular buffered write
201 let last_newline_buf_idx = match last_newline_buf_idx {
202 // No newlines; just do a normal buffered write
203 None => {
204 self.flush_if_completed_line()?;
205 return self.buffer.write_vectored(bufs);
206 }
207 Some(i) => i,
208 };
209
210 // Flush existing content to prepare for our write
211 self.buffer.flush_buf()?;
212
213 // This is what we're going to try to write directly to the inner
214 // writer. The rest will be buffered, if nothing goes wrong.
215 let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
216
217 // Write `lines` directly to the inner writer. In keeping with the
218 // `write` convention, make at most one attempt to add new (unbuffered)
219 // data. Because this write doesn't touch the BufWriter state directly,
220 // and the buffer is known to be empty, we don't need to worry about
221 // self.panicked here.
222 let flushed = self.inner_mut().write_vectored(lines)?;
223
224 // If inner returns Ok(0), propagate that to the caller without
225 // doing additional buffering; otherwise we're just guaranteeing
226 // an "ErrorKind::WriteZero" later.
227 if flushed == 0 {
228 return Ok(0);
229 }
230
231 // Don't try to reconstruct the exact amount written; just bail
232 // in the event of a partial write
233 let mut lines_len: usize = 0;
234 for buf in lines {
235 // With overlapping/duplicate slices the total length may in theory
236 // exceed usize::MAX
237 lines_len = lines_len.saturating_add(buf.len());
238 if flushed < lines_len {
239 return Ok(flushed);
240 }
241 }
242
243 // Now that the write has succeeded, buffer the rest (or as much of the
244 // rest as possible)
245 let buffered: usize = tail
246 .iter()
247 .filter(|buf| !buf.is_empty())
248 .map(|buf| self.buffer.write_to_buf(buf))
249 .take_while(|&n| n > 0)
250 .sum();
251
252 Ok(flushed + buffered)
253 }
254
255 fn is_write_vectored(&self) -> bool {
256 self.inner().is_write_vectored()
257 }
258
259 /// Writes some data into this BufReader with line buffering.
260 ///
261 /// This means that, if any newlines are present in the data, the data up to
262 /// the last newline is sent directly to the underlying writer, and data
263 /// after it is buffered.
264 ///
265 /// Because this function attempts to send completed lines to the underlying
266 /// writer, it will also flush the existing buffer if it contains any
267 /// newlines, even if the incoming data does not contain any newlines.
268 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
269 match memchr::memrchr(b'\n', buf) {
270 // If there are no new newlines (that is, if this write is less than
271 // one line), just do a regular buffered write (which may flush if
272 // we exceed the inner buffer's size)
273 None => {
274 self.flush_if_completed_line()?;
275 self.buffer.write_all(buf)
276 }
277 Some(newline_idx) => {
278 let (lines, tail) = buf.split_at(newline_idx + 1);
279
280 if self.buffered().is_empty() {
281 self.inner_mut().write_all(lines)?;
282 } else {
283 // If there is any buffered data, we add the incoming lines
284 // to that buffer before flushing, which saves us at least
285 // one write call. We can't really do this with `write`,
286 // since we can't do this *and* not suppress errors *and*
287 // report a consistent state to the caller in a return
288 // value, but here in write_all it's fine.
289 self.buffer.write_all(lines)?;
290 self.buffer.flush_buf()?;
291 }
292
293 self.buffer.write_all(tail)
294 }
295 }
296 }
297}