Rust vs Python. Problem statement & Python impl
Hello, dear readers,
This is my first blog post (and hope not the last 😅). In the series of posts I want to share on the findings when I ported existing system from Python to Rust that gave the following benefits:
Reduced the cost from 70K USD per quarter to ~40 USD, that's right, 1400 times cost reduction;
Increased the success rate of the pipeline from 85% to 99.88%;
And reduced the time required for data to become available from 10 hours to less than 15 minutes).
Problem statement
On high level the system takes data in msgpack format and converts it to Apache Parquet format. The legacy system was built long ago and didn't expect to process 100x increase in the data size. Legacy system would read all the data into memory, convert all to JSON in memory, write JSON to file and use PySpark to read that huge JSON file and then write it to Parquet. As the input data increases, it requires more and more RAM, eventually requiring instances with 384 GiB of RAM, quite expensive one!
To reduce required memory we should make two important improvements:
Move away from reading everything into memory first and then processing, to streaming fashion, an approach that does not require all the data to fit into memory;
Get rid of intermediate JSON format. It is text-based and inefficient when dealing with large data. Both msgpack and Parquet are binary formats so one can convert between them directly!
Let's implement that in Python and Rust and see the difference in performance! Here I will focus on only benchmarking the part that reads/parses msgpack file. msgpack will contain 10000 messages generated randomly with provided seed, a single message is a list of 8 elements. Python code that represents that
from typing import Any, Self, final
from dataclasses import dataclass
@dataclass
@final
class Item:
id: int
process_id: int
thread_id: int
timestamp_ns: int
line: int
value: float
filename: str
path: str
@classmethod
def from_list(cls, items: list[Any]) -> Self:
return Item(
items[0],
items[1],
items[2],
items[3],
items[4],
items[5],
items[6],
items[7],
)
def item_to_list(item: Item) -> list:
return [
item.id,
item.process_id,
item.thread_id,
item.timestamp_ns,
item.line,
item.value,
item.filename,
item.path,
]
The data for benchmarking will be read into memory to reduce the effects of IO on the benchmark, e.g. we do not care how fast is IO-subsystem, we rather want to see how fast is language/compiler/interpreter.
Python
I like Python as a language to do quick POC, or as a language to poke the data, plot graphs, quick ML-related experiments (hello, jupyter notebook), however I prefer to avoid it for production systems due to the following limitations:
Interpreted nature. Right, JIT landed Python 3.13 , however it is very basic and will take quite some time to get to the state of art JITs like one in Java (will it even be possible? Python does not want to have slow start-up time)
Dynamic typing. Any large refactoring is a huge pain, requires a lot of unit tests to accidentally not break anything. Right, there is MyPy , but why don't go with statically typed languages in the first place? :). It bites you unexpectedly in places like BUG: Behavior changes when input changes from scalar np.float32 to ndarray of np.float32 #23085
Python C API limitations, it is forward- and backwards-compatible across a minor release (if these are compiled the same way; see Platform Considerations below). So, code compiled for Python 3.10.0 will work on 3.10.8 and vice versa, but will need to be compiled separately for 3.9.x and 3.11.x. The implication in case of multiple Python versions in your org you will need to compile and publish separate packages;
GIL that simplifies programming in multi-threaded environment, however it prevents CPU-bound multi-threading programs from taking full advantage of multiprocessor systems in certain situations. This is not the case for IO-bound multi-threading because it happens outside the GIL. One can use multi-processing, but be prepared for deadlocks if you use process forking instead of spawning (forking a process is faster than spawning). By the way, starting Python 3.14 the default way of creating process will be spawn for POSIX systems. Read more on the topic
Too many different package managers, linters and etc. Which one to choose? :)
I'm going to build Python version using several tools and libraries:
Poetry as the build system, Poetry tries to bring development experience close to cargo, still not the same.
pytest for unit testing
Coverage.py to measure code coverage
mypy for static type checking
ruff for code formatting
pyperf for benchmarking
Full code with project for Python version is located at msgpack-py-vs-rs/python/msgpack-reader-py.
Let's define how we want to use the reader from the caller side:
def identity(n: any) -> any:
return n
msgs: int = 0
with StreamingReader(file_path_or_io, mapper=identity) as rdr:
for _ in rdr:
msgs += 1
print(f"Read {msgs} messages")
It should have a constructor that accepts:
file_path or io.BytesIO;
a callable mapper to be applied on next read data.
It should also implement context manager (with
semantic) and behave like iterator so one can use it in for loop.
import io
import logging
from typing import Callable, TypeVar, Generic, BinaryIO, Any, Self, Optional
from msgpack import Unpacker
logger = logging.getLogger(__name__)
T = TypeVar("T")
class StreamingReader(Generic[T]):
file_path_or_binary: str | BinaryIO
def __init__(
self,
file_path_or_binary: str | BinaryIO,
mapper: Callable[[Any], T],
) -> None:
self.file_path_or_binary = file_path_or_binary
self.mapper = mapper
def __enter__(self) -> Self:
# Open provided `self.file_path_or_binary` as a file if it is string, meaning path to a file
if isinstance(self.file_path_or_binary, str):
self.file: Optional[BinaryIO] = open(self.file_path_or_binary, "rb")
elif isinstance(self.file_path_or_binary, io.BytesIO):
# if it is io.BytesIO, use it as file as well, it follows file's API
self.file = self.file_path_or_binary
else:
raise ValueError("file_path_or_binary must be str or io.BytesIO")
self.unpacker = Unpacker(self.file, raw=False)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
try:
if self.file:
self.file.close()
except Exception as ex:
logger.exception(f"Error when closing {self.file_path_or_binary}", ex)
finally:
self.file = None
def __iter__(self) -> Self:
return self
def __next__(self) -> T:
# Read next
unpacked = self.unpacker.next()
# Apply mapper to convert it to the type T
mapped = self.mapper(unpacked)
return mapped
Let's write a benchmark for that code using pyperf
#!/usr/bin/env python3
import io
import tempfile
import pyperf
from msgpack_reader_py.generator import generate
from msgpack_reader_py.model import Item
from msgpack_reader_py.streaming_reader import StreamingReader
def in_memory_stream_benchmark(msgpack_binary: bytes) -> int:
binary_io = io.BytesIO(msgpack_binary)
msgs = 0
with StreamingReader(binary_io, mapper=Item.from_list) as rdr:
for _ in rdr:
msgs += 1
return msgs
messages = 10000
seed = 42
with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:
# Generate random data and write it to file
generate(path=fp.name, messages=messages, seed=seed)
fp.close()
# Read the data back into memory
with open(fp.name, "rb") as fh:
read_msgpack_bytes = fh.read()
# Create runnner and benchmark `in_memory_stream_benchmark` with read data
runner = pyperf.Runner()
runner.bench_func(
"in_memory_stream_benchmark", in_memory_stream_benchmark, read_msgpack_bytes
)
Benchmark results
MAD is Median absolute deviation
SD is Standard deviation
Interesting observation, Python 3.12.3 is 35% slower on Windows 10 than on Ubuntu 24.04 running on the same Windows 10 under WSL2! Likely memory allocator, don't think MSVC compiler can be so wrong.
In the next part of this series I'll go over ported version in Rust and its performance (hint: it is 35x times faster!). Stay tuned!
Comments and suggestions are welcome! Thank you for your time.