std/io/buffered/linewritershim.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
use core::slice::memchr;
use crate::io::{self, BufWriter, IoSlice, Write};
/// Private adapter that layers line-buffering behaviour on top of a borrowed
/// `BufWriter`.
///
/// The shim holds a mutable borrow of a `BufWriter` for a short time and
/// drives it through its internal methods (such as `write_to_buf` and
/// `flush_buf`). Having access to those internals lets the line-buffered
/// writer be more efficient than one restricted to `write` and `flush`,
/// without needlessly duplicating the implementation details of `BufWriter`.
/// Because the wrapping is only temporary, an already existing `BufWriter`
/// can be given line-buffering logic on demand; this is what enables Stdout
/// to be alternately in line-buffered or block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The buffered writer this shim temporarily drives.
    buffer: &'a mut BufWriter<W>,
}
impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
Self { buffer }
}
/// Gets a reference to the inner writer (that is, the writer
/// wrapped by the BufWriter).
fn inner(&self) -> &W {
self.buffer.get_ref()
}
/// Gets a mutable reference to the inner writer (that is, the writer
/// wrapped by the BufWriter). Be careful with this writer, as writes to
/// it will bypass the buffer.
fn inner_mut(&mut self) -> &mut W {
self.buffer.get_mut()
}
/// Gets the content currently buffered in self.buffer
fn buffered(&self) -> &[u8] {
self.buffer.buffer()
}
/// Flushes the buffer iff the last byte is a newline (indicating that an
/// earlier write only succeeded partially, and we want to retry flushing
/// the buffered line before continuing with a subsequent write).
fn flush_if_completed_line(&mut self) -> io::Result<()> {
match self.buffered().last().copied() {
Some(b'\n') => self.buffer.flush_buf(),
_ => Ok(()),
}
}
}
impl<'a, W> Write for LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// Writes some data into this `BufWriter` with line buffering.
    ///
    /// If `buf` contains any newlines, the data up to and including the last
    /// newline is sent directly to the underlying writer, and the data after
    /// it is buffered. Returns the number of bytes of `buf` that were
    /// accepted (written or buffered).
    ///
    /// This function operates on a "best effort basis"; in keeping with the
    /// convention of `Write::write`, it makes at most one attempt to write
    /// new data to the underlying writer. If that write only reports a
    /// partial success, remaining data is buffered where possible.
    ///
    /// Because completed lines are meant to reach the underlying writer, an
    /// existing buffer that ends with a newline is flushed first, even if the
    /// incoming data does not contain any newlines.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Length of the prefix of `buf` made up of complete lines, i.e. one
        // past the position of the last newline.
        let lines_end = match memchr::memrchr(b'\n', buf) {
            // There is at least one newline: arrange for the lines to be
            // written directly to the inner writer.
            Some(pos) => pos + 1,
            // No newline at all (this write is less than one line): do a
            // regular buffered write, which may flush if the inner buffer's
            // size is exceeded.
            None => {
                self.flush_if_completed_line()?;
                return self.buffer.write(buf);
            }
        };

        // Flush existing content before touching `buf`. This has to come
        // first to stay consistent: if `buf` were appended to the buffer and
        // everything flushed at once, we would be obligated to return Ok(),
        // which would mean suppressing any error raised by the flush.
        self.buffer.flush_buf()?;

        // Hand the complete lines directly to the inner writer, making at
        // most one attempt to add new (unbuffered) data, as the `write`
        // convention requires. This write doesn't touch the BufWriter state
        // directly and the buffer is known to be empty, so there is no need
        // to worry about self.buffer.panicked here.
        let written = self.inner_mut().write(&buf[..lines_end])?;

        // If the inner writer returns Ok(0), propagate that to the caller
        // without doing additional buffering; otherwise we would just be
        // guaranteeing an "ErrorKind::WriteZero" later.
        if written == 0 {
            return Ok(0);
        }

        let capacity = self.buffer.capacity();
        // Everything the inner writer has not accepted yet.
        let rest = &buf[written..];

        // Now that the write has succeeded, buffer the rest (or as much of
        // the rest as possible). If there are unwritten newlines, only buffer
        // out to the last unwritten newline that fits in the buffer; this
        // helps prevent flushing partial lines on subsequent calls to
        // LineWriterShim::write.
        //
        // The cases are ordered from most common to least common, presuming
        // that most writes succeed in totality and are smaller than the
        // buffer:
        // - only a partial line is left (no newline in the unwritten tail);
        // - the data out to the last unwritten newline fits in the buffer;
        // - otherwise, scan for the last newline that *does* fit.
        let to_buffer = if written >= lines_end {
            // Avoid unnecessary short writes by not splitting the remaining
            // bytes if they're larger than the buffer. They can be written in
            // full by the next call to write.
            if rest.len() >= capacity {
                return Ok(written);
            }
            rest
        } else {
            let unwritten_lines = lines_end - written;
            if unwritten_lines <= capacity {
                &rest[..unwritten_lines]
            } else {
                let window = &rest[..capacity];
                match memchr::memrchr(b'\n', window) {
                    Some(pos) => &window[..=pos],
                    None => window,
                }
            }
        };

        Ok(written + self.buffer.write_to_buf(to_buffer))
    }

    /// Flushes by delegating to the wrapped `BufWriter`'s `flush`.
    fn flush(&mut self) -> io::Result<()> {
        self.buffer.flush()
    }

    /// Writes some vectored data into this `BufWriter` with line buffering.
    ///
    /// If any newlines are present in the data, the data up to and including
    /// the buffer containing the last newline is sent directly to the inner
    /// writer, and the data after it is buffered. Returns the number of bytes
    /// that were accepted (written or buffered).
    ///
    /// This function operates on a "best effort basis"; in keeping with the
    /// convention of `Write::write`, it makes at most one attempt to write
    /// new data to the underlying writer.
    ///
    /// Because completed lines are meant to reach the underlying writer,
    /// existing buffered content is flushed before new lines are written,
    /// and a buffer that ends with a newline is flushed even if the incoming
    /// data contains no newlines.
    ///
    /// Because sorting through an array of `IoSlice` can be a bit convoluted,
    /// this method differs from `write` in the following ways:
    ///
    /// - It attempts to write the full content of all the buffers up to and
    ///   including the one containing the last newline. This means that it
    ///   may attempt to write a partial line, if that buffer has data past
    ///   the newline.
    /// - If the write only reports partial success, it does not attempt to
    ///   find the precise location of the written bytes and buffer the rest.
    ///
    /// If the underlying writer doesn't support vectored writing, the first
    /// non-empty buffer is instead simply written with `write`. This way, we
    /// get the benefits of more granular partial-line handling without
    /// losing anything in efficiency.
    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        // If there's no specialized behavior for write_vectored, just use
        // write. This has the benefit of more granular partial-line handling.
        if !self.is_write_vectored() {
            if let Some(slice) = bufs.iter().find(|slice| !slice.is_empty()) {
                return self.write(slice);
            }
            return Ok(0);
        }

        // Number of leading buffers to write directly: everything up to and
        // including the last buffer that contains a newline.
        // FIXME: This is overly slow if there are very many bufs and none contain
        // newlines. e.g. writev() on Linux only writes up to 1024 slices, so
        // scanning the rest is wasted effort. This makes write_all_vectored()
        // quadratic.
        let split_point = match bufs
            .iter()
            .rposition(|slice| memchr::memchr(b'\n', slice).is_some())
        {
            Some(idx) => idx + 1,
            // No newlines (this write is less than one line); just do a
            // normal buffered write.
            None => {
                self.flush_if_completed_line()?;
                return self.buffer.write_vectored(bufs);
            }
        };

        // Flush existing content to prepare for our write.
        self.buffer.flush_buf()?;

        // `lines` is what we try to write directly to the inner writer; the
        // `tail` will be buffered, if nothing goes wrong.
        let (lines, tail) = bufs.split_at(split_point);

        // Write `lines` directly to the inner writer, making at most one
        // attempt to add new (unbuffered) data, as the `write` convention
        // requires. This write doesn't touch the BufWriter state directly and
        // the buffer is known to be empty, so there is no need to worry about
        // self.panicked here.
        let written = self.inner_mut().write_vectored(lines)?;

        // If inner returns Ok(0), propagate that to the caller without doing
        // additional buffering; otherwise we would just be guaranteeing an
        // "ErrorKind::WriteZero" later.
        if written == 0 {
            return Ok(0);
        }

        // Don't try to reconstruct the exact amount written; just bail in the
        // event of a partial write. The running total saturates because, with
        // overlapping/duplicate slices, the total length may in theory exceed
        // usize::MAX.
        let mut lines_len: usize = 0;
        let partial_write = lines.iter().any(|slice| {
            lines_len = lines_len.saturating_add(slice.len());
            written < lines_len
        });
        if partial_write {
            return Ok(written);
        }

        // Now that the write has succeeded, buffer the rest (or as much of
        // the rest as possible), stopping at the first non-empty slice of
        // which nothing could be buffered.
        let mut buffered: usize = 0;
        for slice in tail {
            if slice.is_empty() {
                continue;
            }
            let accepted = self.buffer.write_to_buf(slice);
            if accepted == 0 {
                break;
            }
            buffered += accepted;
        }

        Ok(written + buffered)
    }

    /// Reports whether vectored writes are supported, as answered by the
    /// inner writer's `is_write_vectored`.
    fn is_write_vectored(&self) -> bool {
        self.inner().is_write_vectored()
    }

    /// Writes all of `buf` into this `BufWriter` with line buffering.
    ///
    /// If any newlines are present in the data, the data up to and including
    /// the last newline is sent to the underlying writer, and the data after
    /// it is buffered.
    ///
    /// Because completed lines are meant to reach the underlying writer, an
    /// existing buffer that ends with a newline is flushed first, even if the
    /// incoming data does not contain any newlines.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        let lines_end = match memchr::memrchr(b'\n', buf) {
            Some(pos) => pos + 1,
            // No newlines (this write is less than one line): do a regular
            // buffered write, which may flush if the inner buffer's size is
            // exceeded.
            None => {
                self.flush_if_completed_line()?;
                return self.buffer.write_all(buf);
            }
        };

        let (lines, tail) = buf.split_at(lines_end);

        if self.buffered().is_empty() {
            self.inner_mut().write_all(lines)?;
        } else {
            // With data already buffered, append the incoming lines to that
            // buffer before flushing, which saves at least one write call.
            // `write` can't really do this, since it cannot do so *and* not
            // suppress errors *and* report a consistent state to the caller
            // in a return value, but here in write_all it's fine.
            self.buffer.write_all(lines)?;
            self.buffer.flush_buf()?;
        }

        self.buffer.write_all(tail)
    }
}