std/io/buffered/
linewritershim.rs

1use core::slice::memchr;
2
3use crate::io::{self, BufWriter, IoSlice, Write};
4
/// Private helper that layers line-buffering logic on top of a `BufWriter`.
///
/// Instead of re-implementing buffering, the shim borrows an existing
/// `BufWriter` for a while and drives it through its internal methods (such
/// as `write_to_buf` and `flush_buf`). That is more efficient than a wrapper
/// limited to the public `write` and `flush` API, and it avoids duplicating
/// `BufWriter`'s implementation details. Because the shim only borrows the
/// `BufWriter`, any existing `BufWriter` can temporarily take on
/// line-buffering behavior; this is what lets Stdout alternate between
/// line-buffered and block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The borrowed `BufWriter` whose buffer and inner writer the shim drives.
    buffer: &'a mut BufWriter<W>,
}
19
20impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
21    pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
22        Self { buffer }
23    }
24
25    /// Gets a reference to the inner writer (that is, the writer
26    /// wrapped by the BufWriter).
27    fn inner(&self) -> &W {
28        self.buffer.get_ref()
29    }
30
31    /// Gets a mutable reference to the inner writer (that is, the writer
32    /// wrapped by the BufWriter). Be careful with this writer, as writes to
33    /// it will bypass the buffer.
34    fn inner_mut(&mut self) -> &mut W {
35        self.buffer.get_mut()
36    }
37
38    /// Gets the content currently buffered in self.buffer
39    fn buffered(&self) -> &[u8] {
40        self.buffer.buffer()
41    }
42
43    /// Flushes the buffer iff the last byte is a newline (indicating that an
44    /// earlier write only succeeded partially, and we want to retry flushing
45    /// the buffered line before continuing with a subsequent write).
46    fn flush_if_completed_line(&mut self) -> io::Result<()> {
47        match self.buffered().last().copied() {
48            Some(b'\n') => self.buffer.flush_buf(),
49            _ => Ok(()),
50        }
51    }
52}
53
54impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
55    /// Writes some data into this BufReader with line buffering.
56    ///
57    /// This means that, if any newlines are present in the data, the data up to
58    /// the last newline is sent directly to the underlying writer, and data
59    /// after it is buffered. Returns the number of bytes written.
60    ///
61    /// This function operates on a "best effort basis"; in keeping with the
62    /// convention of `Write::write`, it makes at most one attempt to write
63    /// new data to the underlying writer. If that write only reports a partial
64    /// success, the remaining data will be buffered.
65    ///
66    /// Because this function attempts to send completed lines to the underlying
67    /// writer, it will also flush the existing buffer if it ends with a
68    /// newline, even if the incoming data does not contain any newlines.
69    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
70        let newline_idx = match memchr::memrchr(b'\n', buf) {
71            // If there are no new newlines (that is, if this write is less than
72            // one line), just do a regular buffered write (which may flush if
73            // we exceed the inner buffer's size)
74            None => {
75                self.flush_if_completed_line()?;
76                return self.buffer.write(buf);
77            }
78            // Otherwise, arrange for the lines to be written directly to the
79            // inner writer.
80            Some(newline_idx) => newline_idx + 1,
81        };
82
83        // Flush existing content to prepare for our write. We have to do this
84        // before attempting to write `buf` in order to maintain consistency;
85        // if we add `buf` to the buffer then try to flush it all at once,
86        // we're obligated to return Ok(), which would mean suppressing any
87        // errors that occur during flush.
88        self.buffer.flush_buf()?;
89
90        // This is what we're going to try to write directly to the inner
91        // writer. The rest will be buffered, if nothing goes wrong.
92        let lines = &buf[..newline_idx];
93
94        // Write `lines` directly to the inner writer. In keeping with the
95        // `write` convention, make at most one attempt to add new (unbuffered)
96        // data. Because this write doesn't touch the BufWriter state directly,
97        // and the buffer is known to be empty, we don't need to worry about
98        // self.buffer.panicked here.
99        let flushed = self.inner_mut().write(lines)?;
100
101        // If buffer returns Ok(0), propagate that to the caller without
102        // doing additional buffering; otherwise we're just guaranteeing
103        // an "ErrorKind::WriteZero" later.
104        if flushed == 0 {
105            return Ok(0);
106        }
107
108        // Now that the write has succeeded, buffer the rest (or as much of
109        // the rest as possible). If there were any unwritten newlines, we
110        // only buffer out to the last unwritten newline that fits in the
111        // buffer; this helps prevent flushing partial lines on subsequent
112        // calls to LineWriterShim::write.
113
114        // Handle the cases in order of most-common to least-common, under
115        // the presumption that most writes succeed in totality, and that most
116        // writes are smaller than the buffer.
117        // - Is this a partial line (ie, no newlines left in the unwritten tail)
118        // - If not, does the data out to the last unwritten newline fit in
119        //   the buffer?
120        // - If not, scan for the last newline that *does* fit in the buffer
121        let tail = if flushed >= newline_idx {
122            let tail = &buf[flushed..];
123            // Avoid unnecessary short writes by not splitting the remaining
124            // bytes if they're larger than the buffer.
125            // They can be written in full by the next call to write.
126            if tail.len() >= self.buffer.capacity() {
127                return Ok(flushed);
128            }
129            tail
130        } else if newline_idx - flushed <= self.buffer.capacity() {
131            &buf[flushed..newline_idx]
132        } else {
133            let scan_area = &buf[flushed..];
134            let scan_area = &scan_area[..self.buffer.capacity()];
135            match memchr::memrchr(b'\n', scan_area) {
136                Some(newline_idx) => &scan_area[..newline_idx + 1],
137                None => scan_area,
138            }
139        };
140
141        let buffered = self.buffer.write_to_buf(tail);
142        Ok(flushed + buffered)
143    }
144
145    fn flush(&mut self) -> io::Result<()> {
146        self.buffer.flush()
147    }
148
149    /// Writes some vectored data into this BufReader with line buffering.
150    ///
151    /// This means that, if any newlines are present in the data, the data up to
152    /// and including the buffer containing the last newline is sent directly to
153    /// the inner writer, and the data after it is buffered. Returns the number
154    /// of bytes written.
155    ///
156    /// This function operates on a "best effort basis"; in keeping with the
157    /// convention of `Write::write`, it makes at most one attempt to write
158    /// new data to the underlying writer.
159    ///
160    /// Because this function attempts to send completed lines to the underlying
161    /// writer, it will also flush the existing buffer if it contains any
162    /// newlines.
163    ///
164    /// Because sorting through an array of `IoSlice` can be a bit convoluted,
165    /// This method differs from write in the following ways:
166    ///
167    /// - It attempts to write the full content of all the buffers up to and
168    ///   including the one containing the last newline. This means that it
169    ///   may attempt to write a partial line, that buffer has data past the
170    ///   newline.
171    /// - If the write only reports partial success, it does not attempt to
172    ///   find the precise location of the written bytes and buffer the rest.
173    ///
174    /// If the underlying vector doesn't support vectored writing, we instead
175    /// simply write the first non-empty buffer with `write`. This way, we
176    /// get the benefits of more granular partial-line handling without losing
177    /// anything in efficiency
178    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
179        // If there's no specialized behavior for write_vectored, just use
180        // write. This has the benefit of more granular partial-line handling.
181        if !self.is_write_vectored() {
182            return match bufs.iter().find(|buf| !buf.is_empty()) {
183                Some(buf) => self.write(buf),
184                None => Ok(0),
185            };
186        }
187
188        // Find the buffer containing the last newline
189        // FIXME: This is overly slow if there are very many bufs and none contain
190        // newlines. e.g. writev() on Linux only writes up to 1024 slices, so
191        // scanning the rest is wasted effort. This makes write_all_vectored()
192        // quadratic.
193        let last_newline_buf_idx = bufs
194            .iter()
195            .enumerate()
196            .rev()
197            .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
198
199        // If there are no new newlines (that is, if this write is less than
200        // one line), just do a regular buffered write
201        let last_newline_buf_idx = match last_newline_buf_idx {
202            // No newlines; just do a normal buffered write
203            None => {
204                self.flush_if_completed_line()?;
205                return self.buffer.write_vectored(bufs);
206            }
207            Some(i) => i,
208        };
209
210        // Flush existing content to prepare for our write
211        self.buffer.flush_buf()?;
212
213        // This is what we're going to try to write directly to the inner
214        // writer. The rest will be buffered, if nothing goes wrong.
215        let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
216
217        // Write `lines` directly to the inner writer. In keeping with the
218        // `write` convention, make at most one attempt to add new (unbuffered)
219        // data. Because this write doesn't touch the BufWriter state directly,
220        // and the buffer is known to be empty, we don't need to worry about
221        // self.panicked here.
222        let flushed = self.inner_mut().write_vectored(lines)?;
223
224        // If inner returns Ok(0), propagate that to the caller without
225        // doing additional buffering; otherwise we're just guaranteeing
226        // an "ErrorKind::WriteZero" later.
227        if flushed == 0 {
228            return Ok(0);
229        }
230
231        // Don't try to reconstruct the exact amount written; just bail
232        // in the event of a partial write
233        let mut lines_len: usize = 0;
234        for buf in lines {
235            // With overlapping/duplicate slices the total length may in theory
236            // exceed usize::MAX
237            lines_len = lines_len.saturating_add(buf.len());
238            if flushed < lines_len {
239                return Ok(flushed);
240            }
241        }
242
243        // Now that the write has succeeded, buffer the rest (or as much of the
244        // rest as possible)
245        let buffered: usize = tail
246            .iter()
247            .filter(|buf| !buf.is_empty())
248            .map(|buf| self.buffer.write_to_buf(buf))
249            .take_while(|&n| n > 0)
250            .sum();
251
252        Ok(flushed + buffered)
253    }
254
255    fn is_write_vectored(&self) -> bool {
256        self.inner().is_write_vectored()
257    }
258
259    /// Writes some data into this BufReader with line buffering.
260    ///
261    /// This means that, if any newlines are present in the data, the data up to
262    /// the last newline is sent directly to the underlying writer, and data
263    /// after it is buffered.
264    ///
265    /// Because this function attempts to send completed lines to the underlying
266    /// writer, it will also flush the existing buffer if it contains any
267    /// newlines, even if the incoming data does not contain any newlines.
268    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
269        match memchr::memrchr(b'\n', buf) {
270            // If there are no new newlines (that is, if this write is less than
271            // one line), just do a regular buffered write (which may flush if
272            // we exceed the inner buffer's size)
273            None => {
274                self.flush_if_completed_line()?;
275                self.buffer.write_all(buf)
276            }
277            Some(newline_idx) => {
278                let (lines, tail) = buf.split_at(newline_idx + 1);
279
280                if self.buffered().is_empty() {
281                    self.inner_mut().write_all(lines)?;
282                } else {
283                    // If there is any buffered data, we add the incoming lines
284                    // to that buffer before flushing, which saves us at least
285                    // one write call. We can't really do this with `write`,
286                    // since we can't do this *and* not suppress errors *and*
287                    // report a consistent state to the caller in a return
288                    // value, but here in write_all it's fine.
289                    self.buffer.write_all(lines)?;
290                    self.buffer.flush_buf()?;
291                }
292
293                self.buffer.write_all(tail)
294            }
295        }
296    }
297}