diff --git a/sub_crates/data_tree/src/lib.rs b/sub_crates/data_tree/src/lib.rs index 08ecee6..b949abd 100644 --- a/sub_crates/data_tree/src/lib.rs +++ b/sub_crates/data_tree/src/lib.rs @@ -89,6 +89,7 @@ pub struct Parser { buf_consumed_idx: usize, total_bytes_processed: usize, inner_opens: usize, + eof: bool, } impl Parser { @@ -100,6 +101,7 @@ impl Parser { buf_consumed_idx: 0, total_bytes_processed: 0, inner_opens: 0, + eof: false, } } @@ -113,8 +115,13 @@ impl Parser { } loop { - // Read in new data and make a string from the valid prefix. - let (read_count, valid_count) = self.do_read()?; + // Determine how much of the buffer is valid utf8. + let valid_count = match std::str::from_utf8(&self.buffer[..self.buf_fill_idx]) { + Ok(_) => self.buf_fill_idx, + Err(e) => e.valid_up_to(), + }; + + // Make a str slice out of the valid prefix. let buffer_text = std::str::from_utf8(&self.buffer[..valid_count]).unwrap(); // Try to parse an event from the valid prefix. @@ -157,7 +164,7 @@ impl Parser { EventParse::IncompleteData => { // If we're at the end, it's a problem. // Otherwise, wait for more data. - if read_count == 0 { + if self.eof { return Err(Error::UnexpectedEOF( self.total_bytes_processed + valid_count, )); @@ -186,6 +193,15 @@ impl Parser { )); } } + + // If we couldn't parse a complete event, and if there were + // no errors, read in more data and loop back to try again. + if !self.eof { + let (read_count, _valid_count) = self.do_read()?; + if read_count == 0 { + self.eof = true; + } + } } }