The One Billion Row Challenge in Rust: Part 1
Problem statement
Hello, dear reader,
I spent the last couple of days solving the One Billion Row Challenge in the Rust programming language. The challenge is simple:
Your mission, should you choose to accept it, is to write a program that retrieves temperature measurement values from a text file and calculates the min, mean, and max temperature per weather station. There's just one caveat: the file has 1,000,000,000 rows! That's more than 10 GB of data!
The challenge has important constraints that give us flexibility in our implementation:
Station name: a non-null UTF-8 string of min length 1 character and max length 100 bytes (i.e. this could be 100 one-byte characters, or 50 two-byte characters, etc.)
Temperature value: a non-null double between -99.9 (inclusive) and 99.9 (inclusive), always with one fractional digit. A few sample input lines are shown below.
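Each row of the input file is a station name and a temperature separated by a semicolon. For illustration (station names taken from the challenge's sample data):
Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2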
The runtime numbers you will see in this series were measured in the following environment:
48 Intel vCPUs / 96 GB Memory / 600 GB Disk, dedicated CPU-optimized DigitalOcean instance with Premium Intel CPU (c-48-intel)
Ubuntu 24.04 LTS (GNU/Linux 6.8.0-31-generic x86_64)
Rust compiler rustc 1.78.0 (9b00956e5 2024-04-29), x86_64-unknown-linux-gnu, LLVM version: 18.1.2
Java 21.0.3 2024-04-16 LTS, Java HotSpot(TM) 64-Bit Server VM Oracle GraalVM 21.0.3+7.1 (build 21.0.3+7-LTS-jvmci-23.1-b37, mixed mode, sharing)
The input file with 1 billion measurements is stored in tmpfs.
To reproduce, use scripts/run_benchmark.sh, and scripts/run_benchmark_original.sh for the reference implementation.
Reference implementation
The reference implementation of the challenge in Rust can be found at tumdum/1brc. The results are obtained by running scripts/run_benchmark_original.sh, which slightly modifies the original code to allow passing the number of cores and also makes sure the project is built with native CPU support (-C target-cpu=native) to get maximum performance.
It uses:
Memory-mapped file via memmap
memchr for string search
fast-float to parse string to floats
bstr to represent strings
rayon for multi-threading
rustc-hash, a fast non-cryptographic hash used by rustc and Firefox
Can we make a faster implementation with fewer dependencies? Let's try 🚀
Naïve implementation
Let's start with a naïve implementation that keeps parsing the next line until it reaches the end of file (EOF). We can use BufReader to read the file line by line. Here is the code for naive_line_by_line0:
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
/// Reads from provided buffered reader line by line, finds station name and
/// temperature and calls processor with found byte slices.
///
/// This is a naive implementation used by [naive_line_by_line_dummy] and
/// [naive_line_by_line]
fn naive_line_by_line0<R: Read + Seek, F>(
mut rdr: BufReader<R>,
mut processor: F,
start: u64,
end_inclusive: u64,
) where
F: FnMut(&[u8], &[u8]),
{
let mut offset: usize = start as usize;
rdr.seek(SeekFrom::Start(start)).unwrap();
    // We actually need at most 100 + 1 (';') + 5 ("-99.9") + 1 ('\n') = 107 bytes;
    // one extra byte gives us a round 108.
    const MAX_LINE_LENGTH_IN_BYTES: usize = 108;
let mut s: String = String::with_capacity(MAX_LINE_LENGTH_IN_BYTES);
while offset <= end_inclusive as usize {
let read_bytes = rdr.read_line(&mut s).expect("Unable to read line");
// Check whether we reached EOF
if read_bytes == 0 {
break;
}
offset += read_bytes;
let slice = s.as_bytes();
let mut idx: usize = 0;
// Find station name
while idx < s.len() && slice[idx] != b';' {
idx += 1
}
let name = &slice[0..idx];
// The remaining bytes are for temperature
// We need to subtract 1 from read_bytes because `read_line` includes delimiter
// as well
let value = &slice[idx + 1..read_bytes - 1];
// Call processor to handle the temperature for the station
processor(name, value);
// Clear the buffer to make sure next read won't have data from previous read
s.clear();
}
}
Notes on the code above:
It has a generic type R that must implement std::io::Read and std::io::Seek. This way I can pass anything that implements those traits, for example std::fs::File or std::io::Cursor, which simplifies unit testing and benchmarking (see the test sketch after these notes). std::io::Seek is required when we parallelize our solution to use multi-threading.
It has a generic type F for processor, a user-defined closure. It is FnMut so that the provided closure can be called repeatedly and may mutate state.
Providing processor as a closure gives me the flexibility to write different implementations with naive_line_by_line0 at the core. I'll be using a similar approach for all other implementations.
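As an example of the testing flexibility this buys us, here is a minimal unit-test sketch that drives naive_line_by_line0 with an in-memory std::io::Cursor instead of a file (the test name and expected values are mine, not from the repository):
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn finds_name_and_value_in_each_line() {
        let data = b"Hamburg;12.0\nBulawayo;8.9\n";
        let rdr = BufReader::new(Cursor::new(&data[..]));
        let mut seen: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        // Collect every (station, temperature) pair the parser reports.
        naive_line_by_line0(
            rdr,
            |name, t| seen.push((name.to_vec(), t.to_vec())),
            0,
            (data.len() - 1) as u64,
        );
        assert_eq!(
            seen,
            vec![
                (b"Hamburg".to_vec(), b"12.0".to_vec()),
                (b"Bulawayo".to_vec(), b"8.9".to_vec()),
            ]
        );
    }
}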
To understand how fast an implementation can be, I'll create a set of _dummy implementations that perform a trivial operation on the parsed data. Here is the first one, naive_line_by_line_dummy:
/// Reads from provided buffered reader station name and temperature and simply
/// accumulates some dummy value.
///
/// This method helps us to understand what is the maximum possible throughput
/// in case of running very simple operation on parsed data.
pub fn naive_line_by_line_dummy<R: Read + Seek>(
rdr: BufReader<R>,
start: u64,
end_inclusive: u64,
_should_sort: bool,
) -> Vec<(String, StateF)> {
let mut dummy_result: usize = 0;
naive_line_by_line0(
rdr,
|name: &[u8], t: &[u8]| {
dummy_result += name.len() + t.len();
},
start,
end_inclusive,
);
let mut s = StateF::default();
s.count = dummy_result as u32;
vec![("dummy".to_string(), s)]
}
And the actual implementation, naive_line_by_line:
use std::str::FromStr;

const DEFAULT_HASHMAP_CAPACITY: usize = 10000;
/// Converts a slice of bytes to a string slice.
#[inline]
pub fn byte_to_string(bytes: &[u8]) -> &str {
std::str::from_utf8(bytes).unwrap()
}
/// Converts a string in base 10 to a float.
#[inline]
pub fn parse_f64(s: &str) -> f64 {
f64::from_str(s).unwrap()
}
pub fn sort_result(all: &mut Vec<(String, StateF)>) {
all.sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
/// Reads from provided buffered reader station name and temperature and
/// aggregates temperature per station.
///
/// The method uses [`byte_to_string`], [`parse_f64`] and
/// [`std::collections::HashMap`] from standard library.
pub fn naive_line_by_line<R: Read + Seek>(
rdr: BufReader<R>,
start: u64,
end_inclusive: u64,
should_sort: bool,
) -> Vec<(String, StateF)> {
let mut hs = std::collections::HashMap::with_capacity(DEFAULT_HASHMAP_CAPACITY);
naive_line_by_line0(
rdr,
|name: &[u8], t: &[u8]| {
// Convert bytes to str
let station_name: &str = byte_to_string(name);
let measurement: &str = byte_to_string(t);
// Parse measurement as f64
let value = parse_f64(measurement);
// Insert new state or update existing
match hs.get_mut(station_name) {
None => {
let mut s = StateF::default();
s.update(value);
hs.insert(station_name.to_string(), s);
},
Some(prev) => prev.update(value),
}
},
start,
end_inclusive,
);
let mut all: Vec<(String, StateF)> = hs.into_iter().collect();
if should_sort {
sort_result(&mut all);
}
all
}
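To tie it together, a hypothetical driver could look like this (the file name and output format are mine; it assumes the StateF sketch above):
use std::fs::File;
use std::io::BufReader;

fn main() -> std::io::Result<()> {
    let f = File::open("measurements.txt")?;
    let end_inclusive = f.metadata()?.len() - 1;
    let all = naive_line_by_line(BufReader::new(f), 0, end_inclusive, true);
    for (name, s) in all {
        // Mean is derived from the aggregated sum and count.
        let mean = s.sum / s.count as f64;
        println!("{name}: min={:.1} mean={mean:.1} max={:.1}", s.min, s.max);
    }
    Ok(())
}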
Let's run the benchmark to see the runtime:
OK, it is slow: 2.13 times slower than the reference implementation in single-threaded mode. naive_line_by_line_dummy tells us how fast this approach can be in theory. How can we make it faster? Steps we can take to improve performance:
We know from the rules that the station name is a non-null UTF-8 string of min length 1 character and max length 100 bytes. This allows us to skip UTF-8 validation when constructing the station name: we can use str::from_utf8_unchecked;
Another obvious optimization comes from the rule for the temperature value: a non-null double between -99.9 (inclusive) and 99.9 (inclusive), always with one fractional digit. This allows us to use scaled integers in [-999, 999] instead of f64. Parsing and manipulating integers is faster than floats;
Replace HashMap from the standard library with the one from rustc-hash. All we care about here is speed, not the cryptographic properties of the hash function.
An improved version, naive_line_by_line_v2:
/// Converts a slice of bytes to a string slice without checking that the string
/// contains valid UTF-8.
#[inline]
pub const fn byte_to_string_unsafe(bytes: &[u8]) -> &str {
unsafe { std::str::from_utf8_unchecked(bytes) }
}
/// Converts byte to a digit
#[inline]
const fn get_digit(b: u8) -> u32 {
(b as u32).wrapping_sub('0' as u32)
}
/// Converts a float number in the range [-99.9, 99.9] with step 0.1, provided as
/// bytes of a str, to a scaled i16 value in [-999, 999]
///
/// "0.0" -> 0
/// "-99.9" -> -999
/// "99.9" -> 999
#[inline]
pub const fn to_scaled_integer(bytes: &[u8]) -> i16 {
let is_negative = bytes[0] == b'-';
let as_decimal = match (is_negative, bytes.len()) {
(true, 4) => get_digit(bytes[1]) * 10 + get_digit(bytes[3]),
(true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]),
(false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]),
(false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]),
_x => panic!(),
};
if is_negative {
-(as_decimal as i16)
} else {
as_decimal as i16
}
}
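A few quick checks of the mapping (the test values are mine):
#[test]
fn to_scaled_integer_examples() {
    assert_eq!(to_scaled_integer(b"0.0"), 0);
    assert_eq!(to_scaled_integer(b"9.9"), 99);
    assert_eq!(to_scaled_integer(b"-9.9"), -99);
    assert_eq!(to_scaled_integer(b"12.3"), 123);
    assert_eq!(to_scaled_integer(b"99.9"), 999);
    assert_eq!(to_scaled_integer(b"-99.9"), -999);
}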
use rustc_hash::FxHashMap;

/// Reads from provided buffered reader station name and temperature and
/// aggregates temperature per station.
///
/// The method relies on [`naive_line_by_line0`] but uses
/// [`byte_to_string_unsafe`], aggregates data in [`StateI`] and uses
/// [`rustc_hash::FxHashMap`]
pub fn naive_line_by_line_v2<R: Read + Seek>(
rdr: BufReader<R>,
start: u64,
end_inclusive: u64,
should_sort: bool,
) -> Vec<(String, StateF)> {
let mut hs: FxHashMap<String, StateI> =
FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_CAPACITY, Default::default());
naive_line_by_line0(
rdr,
|name: &[u8], t: &[u8]| {
let station_name: &str = byte_to_string_unsafe(name);
let value = to_scaled_integer(t);
match hs.get_mut(station_name) {
None => {
let mut s = StateI::new(value);
s.update(value);
hs.insert(station_name.to_string(), s);
},
Some(prev) => prev.update(value),
}
},
start,
end_inclusive,
);
    let mut all: Vec<(String, StateF)> = hs
        .into_iter()
        .map(|(k, v)| (k, v.to_f64()))
        .collect();
if should_sort {
sort_result(&mut all);
}
all
}
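Like StateF, the scaled-integer aggregate StateI is defined elsewhere in the repository. Here is a minimal sketch consistent with how it is called above; the semantics of new seeding only min/max while update does the counting are my assumption:
/// Per-station aggregate over scaled-integer measurements (a sketch; the
/// real definition is in the repository).
pub struct StateI {
    min: i16,
    max: i16,
    // The sum of up to 10^9 values in [-999, 999] can exceed i32, so use i64.
    sum: i64,
    count: u32,
}

impl StateI {
    pub fn new(v: i16) -> Self {
        StateI { min: v, max: v, sum: 0, count: 0 }
    }

    pub fn update(&mut self, v: i16) {
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.sum += v as i64;
        self.count += 1;
    }

    /// Scales the aggregate back from tenths of a degree to degrees.
    pub fn to_f64(&self) -> StateF {
        StateF {
            min: self.min as f64 / 10.0,
            max: self.max as f64 / 10.0,
            sum: self.sum as f64 / 10.0,
            count: self.count,
        }
    }
}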
Let's run the benchmark for naive_line_by_line_v2
and put all the runtimes into a single table.
It is 1.57 times faster than the first version; however, it is still slower than the reference implementation. It is clear that relying on BufReader to read the file line by line is slow.
In the next part we will further improve the performance of our implementation and make it 2x faster than the reference implementation in single-threaded mode!