M3DB is a time series database that was primarily designed to be horizontally scalable and able to handle high data throughput.
One of M3DB’s biggest strengths as a time series database (as opposed to using a more general-purpose horizontally scalable, distributed database like Cassandra) is its ability to compress time series data resulting in huge memory and disk savings. There are two compression algorithms used in M3DB: M3TSZ and protobuf encoding.
M3TSZ is used when values are floats. A variant of the streaming time series compression algorithm described in Facebook’s Gorilla paper, it achieves a high compression ratio. The compression ratio will vary depending on the workload and configuration, but we found that we were able to achieve a compression ratio of 1.45 bytes/datapoint with Uber’s production workloads. This was a 40% improvement over standard TSZ, which only gave us a compression ratio of 2.42 bytes/datapoint under the same conditions.
For more complex value types, M3DB also supports generic Protobuf messages with a few exceptions. The algorithm takes on a hybrid approach and uses different compression schemes depending on the field types within the Protobuf message.
Details on the encoding, marshaling and unmarshaling methods can be read here.
M3DB is a persistent database with durable storage, but it is best understood via the boundary between its in-memory object layout and on-disk representations.
┌────────────────────────────────────────────────────────────┐ │ Database │ ├────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────────────┐ │ │ │ Namespaces │ │ │ ├────────────────────────────────────────────────────┤ │ │ │ │ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ Shards │ │ │ │ │ ├────────────────────────────────────────────┤ │ │ │ │ │ │ │ │ │ │ │ ┌────────────────────────────────────┐ │ │ │ │ │ │ │ Series │ │ │ │ │ │ │ ├────────────────────────────────────┤ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌────────────────────────────┐ │ │ │ │ │ │ │ │ │ Buffer │ │ │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌────────────────────────────┐ │ │ │ │ │ │ │ │ │ Cached blocks │ │ │ │ │ │ │ │ │ └────────────────────────────┘ │ │ │ │ │ │ │ │ ... │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └────────────────────────────────────┘ │ │ │ │ │ │ ... │ │ │ │ │ │ │ │ │ │ │ └────────────────────────────────────────────┘ │ │ │ │ ... │ │ │ │ │ │ │ └────────────────────────────────────────────────────┘ │ │ ... │ │ │ └────────────────────────────────────────────────────────────┘
The in-memory portion of M3DB is implemented via a hierarchy of objects:
databaseof which there is only one per M3DB process. The
namespaceis similar to a table in other databases. Each
namespacehas a unique name and a set of configuration options, such as data retention and block size (which we will discuss in more detail later). A namespace owns multiple
shardis effectively the same as a “virtual shard” in Cassandra in that it provides an arbitrary distribution of time series data via a simple hash of the series ID. A shard owns multiple
seriesrepresents a sequence of time series datapoints. For example, the CPU utilization for a host could be represented as a series with the ID “host1.system.cpu.utilization” and a vector of (TIMESTAMP, CPU_LEVEL) tuples. Visualizing this example in a graph, there would a single line with time on the x-axis and CPU utilization on the y-axis. A
bufferand any cached
bufferis where all data that has yet to be written to disk gets stored in memory. This includes both new writes to M3DB and data obtained through bootstrapping. More details on the buffer is explained below. Upon flushing, the buffer creates a
blockof its data to be persisted to disk.
blockrepresents a stream of compressed time series data for a pre-configured block size, for example, a block could hold data for 6-8PM (block size of two hours). A
blockcan arrive directly into the series only as a result of getting cached after a read request. Since blocks are in a compressed format, individual datapoints cannot be read from it. In other words, in order to read a single datapoint, the entire block up to that datapoint needs to be decompressed beforehand.
While in-memory databases can be useful (and M3DB supports operating in a memory-only mode), some form of persistence is required for durability. In other words, without a persistence strategy, it would be impossible for M3DB to restart (or recover from a crash) without losing all of its data.
In addition, with large volumes of data, it becomes prohibitively expensive to keep all of the data in memory. This is especially true for monitoring workloads which often follow a “write-once, read-never” pattern where less than a few percent of all the data that’s stored is ever read. With that type of workload, it’s wasteful to keep all of that data in memory when it could be persisted on disk and retrieved when required.
The block size parameter is the most important variable that needs to be tuned for a particular workload. A small block size will mean more frequent flushing and a smaller memory footprint for the data that is being actively compressed, but it will also reduce the compression ratio and data will take up more space on disk.
If the database is stopped for any reason in between flushes, then when the node is started back up those writes will be recovered by reading the commit log or streaming in the data from a peer responsible for the same shard (if the replication factor is larger than one).
While the fileset files are designed to support efficient data retrieval via the series ID, there is still a heavy cost associated with any query that has to retrieve data from disk because going to disk is always much slower than accessing main memory. To compensate for that, M3DB supports various caching policies which can significantly improve the performance of reads by caching data in memory.
We now have enough context of M3DB’s architecture to discuss the lifecycle of a write. A write begins when an M3DB client calls the
writeBatchRaw endpoint on M3DB’s embedded thrift server. The write itself will contain the following information:
M3DB will consult the database object to check if the namespace exists, and if it does, then it will hash the series ID to determine which shard it belongs to. If the node receiving the write owns that shard, then it will lookup the series in the shard object. If the series exists, then an encoder in the buffer will encode the datapoint into the compressed stream. If the encoder doesn’t exist (no writes for this series have occurred yet as part of this block) then a new encoder will be allocated and it will begin a compressed M3TSZ stream with that datapoint. There is also some additional logic for handling multiple encoders and filesets which is discussed in the buffer section.
At the same time, the write will be appended to the commit log, which is periodically compacted via a snapshot process. Details of this is outlined in the commit log page.
Note: Regardless of the success or failure of the write in a single node, the client will return a success or failure to the caller for the write based on the configured consistency level.
M3DB will consult the database object to check if the namespace exists, and if it does, it will hash the series ID to determine which shard it belongs to. If the node receiving the read owns that shard, then M3DB needs to determine two things:
Determining whether the series exists is simple. M3DB looks up the series in the shard object. If it exists, then the series exists. If it doesn’t, then M3DB consults in-memory bloom filters(s) for all shard/block start combinations(s) that overlap the query range to determine if the series exists on disk.
If the series exists, then for every block that the request spans, M3DB needs to consolidate data from the buffer, in-memory cache, and fileset files (disk).
Let’s imagine a read for a given series that requests the last 6 hours worth of data, and an M3DB namespace that is configured with a block size of 2 hours, i.e. we need to find 3 different blocks.
If the current time is 8PM, then the location of the requested blocks might be as follows:
[2PM - 4PM (fileset file)] - Flushed block that isn't cached [4PM - 6PM (in-memory cache)] - Flushed block that is cached [4PM - 6PM (cold write in active buffer)] - Cold write that hasn't been flushed yet [6PM - 8PM (active buffer)] - Hasn't been flushed yet
Then M3DB will need to consolidate:
Retrieving blocks from the buffer and in-memory cache is simple, the data is already present in memory and easily accessible via hashmaps keyed by series ID. Retrieving a block from disk is more complicated. The flow for retrieving a block from disk is as follows:
index_lookup.gofile for implementation details.
Once M3DB has retrieved the three blocks from their respective locations in memory / on-disk, it will transmit all of the data back to the client. Whether or not the client returns a success to the caller for the read is dependent on the configured consistency level.
Note: Since M3DB nodes return compressed blocks (the M3DB client decompresses them), it’s not possible to return “partial results” for a given block. If any portion of a read request spans a given block, then that block in its entirety must be transmitted back to the client. In practice, this ends up being not much of an issue because of the high compression ratio that M3DB is able to achieve.
Each series object contains a buffer, which is in charge of handling all data that has yet to be flushed - new writes and bootstrapped data. To accomplish this, it keeps mutable “buckets” of encoders (for new writes) and immutable blocks (for bootstrapped data). M3TSZ, the database’s encoding scheme, is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For metrics workloads this works very well because every subsequent datapoint is almost always after the previous one. However, out of order writes will occasionally be received, for example due to clock skew. When this happens, M3DB will allocate a new encoder for the out of order datapoints. These encoders are contained in a bucket along with any blocks that got bootstrapped.
Upon a flush (discussed further below), all data within a bucket gets merged and its version gets incremented - the specific version it gets set to depends on the number of times this block has previously been flushed. This bucket versioning allows the buffer to know which data has been flushed so that subsequent flushes will not try to flush it again. It also indicates to the clean up process (also discussed below) that that data can be evicted.
Given this complex, concurrent logic, this has been modeled in TLA.
┌─────────────────────────┐ │ Buffer │ ├─────────────────────────┤ │ │ │ ┌─────────────────┐ │ │ │ 2-4PM buckets │ │ │ └─────────────────┘ │ │ │ │ ┌─────────────────┐ │ ┌────│───│ 4-6PM buckets │ | │ │ └─────────────────┘ │ │ │ │ │ │ ... │ │ └─────────────────────────┘ │ │ v After flush: ┌─────────────────────┐ ┌─────────────────────┐ │ 4-6PM buckets │ │ 4-6PM buckets │ ├─────────────────────┤ ├─────────────────────┤ │ │ │ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ │ Bucket v0 │<--│--writes │ │ Bucket v3 │ │ │ └─────────────┘ │ │ └─────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─────────────────────┘ └─────────────────────┘ More writes after flush: After clean up: ┌─────────────────────┐ ┌─────────────────────┐ │ 4-6PM buckets │ │ 4-6PM buckets │ ├─────────────────────┤ ├─────────────────────┤ │ │ │ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ │ Bucket v3 │ │ │ │ Bucket v0 │<--│--writes │ └─────────────┘ │ │ └─────────────┘ │ │ │ │ │ │ ┌─────────────┐ │ │ │ │ │ Bucket v0 │<--│--writes │ │ │ └─────────────┘ │ │ │ │ │ │ │ └─────────────────────┘ └─────────────────────┘
M3DB has a variety of processes that run in the background during normal operation.
As discussed in the architecture section, writes are actively buffered / compressed in memory and the commit log is continuously being written to, but eventually data needs to be flushed to disk in the form of fileset files to facilitate efficient storage and retrieval.
This is where the configurable “block size” comes into play. The block size is simply a duration of time that dictates how long new writes will be compressed (in a streaming manner) in memory before being flushed to disk. Let’s use a block size of two hours as an example.
If the block size is set to two hours, then all writes for all series for a given shard will be buffered in memory for two hours at a time. At the end of the two hour period all of the fileset files will be generated, written to disk, and then the in-memory objects can be released and replaced with new ones for the new block. The old objects will be removed from memory in the subsequent tick.
If a flush happens for a namespace/shard/series/block for which there is already a fileset, in-memory data will get merged with data on disk from the fileset. The resultant merged data will then be flushed as a separate fileset.
The ticking process runs continously in the background and is responsible for a variety of tasks:
If there are multiple encoders for a block, they need to be merged before flushing the data to disk. To prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background.
Depending on the configured caching policy, the in-memory object layout can end up with references to series or data blocks that are expired (have fallen out of the retention period) or no longer needed to be in memory (due to the data being flushed to disk or no longer needing to be cached). The background tick will identify these structures and release them from memory.
Fileset files can become no longer necessary for two reasons:
During the clean up process, these fileset files will get deleted.