Review of "Kubernetes: Open Source Container Cluster Orchestration"

Kubernetes is a cluster management system built as a successor to Google's Borg.

A pod is the unit of scheduling in Kubernetes. It is a resource envelope in which one or more containers run, and it ensures that all of its containers are scheduled onto the same machine. This allows multiple co-located, cooperating processes to be deployed together without giving up the simplicity of the "one application per container" deployment model.

Kubernetes supports naming and load balancing through the service abstraction. A service has a name and maps to a dynamic set of pods defined by a label selector. Any container in the cluster can connect to the service by its name; under the covers, Kubernetes automatically load-balances connections to the service among the pods that match its label selector. Labels can tag pods with a service name, an instance tier (production, staging, test), or in general any useful subset of the pods, and a label selector picks out the set of pods an operation should apply to. Together with replication controllers, labels allow for very flexible update semantics.

Another feature of Kubernetes is that every pod gets its own IP address, even for pods residing on the same physical machine and sharing the same NIC. This is made possible by software-defined overlay networks such as flannel, or by those built into public clouds.
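
To make the label and selector machinery concrete, here is a minimal Python sketch (not Kubernetes code; the pod names, labels, and helper functions are all made up) of a service resolving its label selector to a set of pods and round-robining connections among them:

    import itertools

    # Hypothetical pods, each tagged with arbitrary key/value labels.
    pods = [
        {"name": "web-1", "ip": "10.0.1.4", "labels": {"app": "web", "tier": "production"}},
        {"name": "web-2", "ip": "10.0.2.7", "labels": {"app": "web", "tier": "production"}},
        {"name": "web-3", "ip": "10.0.3.9", "labels": {"app": "web", "tier": "staging"}},
    ]

    def select(pods, selector):
        """Return the pods whose labels contain every key/value pair in the selector."""
        return [p for p in pods if all(p["labels"].get(k) == v for k, v in selector.items())]

    # A "service" is just a name plus a label selector; connections are
    # round-robined across whatever pods currently match the selector.
    service = {"name": "web-production", "selector": {"app": "web", "tier": "production"}}
    backends = itertools.cycle(select(pods, service["selector"]))

    for _ in range(4):
        print(service["name"], "->", next(backends)["ip"])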

Will this project be influential in 10 years? I think so. With clustering and distributed computing becoming more and more relevant as data becomes more centralized and demands ever more computing power, efficiently managing all of these distributed components will remain an important problem for the industry.

Review of "The Case for RAMClouds: Scalable High-Performance Storage Entirely in DRAM"

People find it more and more difficult to scale disk-based systems to meet the needs of large-scale Web applications. This paper proposes using DRAM in place of disk, keeping disk only in a backup and archival role. That is what RAMCloud is built on: a general-purpose storage system, essentially a distributed hash table, in which all data lives in DRAM. It can scale to 1000+ servers and 100+ TB of data, and because everything resides in memory it offers remote access latencies of only 5-10 µs. Although DRAM itself is volatile, a RAMCloud uses replication and backup techniques to provide data durability and availability.

RAMCloud consists of 1,000-10,000 storage servers, each running a master component and a backup component. The master handles access requests, while the backup handles persistence. The cluster also contains a single coordinator, which could be a single point of failure and a scalability bottleneck; the design keeps the coordinator as lightly loaded as possible, and a standby coordinator stands ready to take over when the live one crashes.

RAMCloud keeps only one copy of each object in DRAM and keeps backup copies on disk or flash, which gives durability almost for free. This approach does, however, add the challenge of keeping memory and disk in sync. RAMCloud handles it as follows: when a write request comes in, there is no disk I/O; instead, the master sends the update to several backups, and once it receives confirmation from the backups it can treat the update as durable. The backups batch up writes and flush them to disk once they have collected an 8 MB "segment". The flush still happens even on a power failure, by relying on techniques such as backup power.
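
Here is how I picture that buffered-logging write path, as a simplified single-process Python sketch; the class and method names are mine, and real RAMCloud does this over the network with far more machinery:

    SEGMENT_SIZE = 8 * 1024 * 1024  # 8 MB segments, as described in the paper

    class Backup:
        """Buffers log entries in memory and flushes full 8 MB segments to disk."""
        def __init__(self, path):
            self.path = path
            self.buffer = bytearray()

        def append(self, entry: bytes) -> bool:
            self.buffer += entry
            if len(self.buffer) >= SEGMENT_SIZE:
                self._flush()
            return True          # ack as soon as the entry is buffered in DRAM

        def _flush(self):
            with open(self.path, "ab") as f:   # batched, sequential disk I/O
                f.write(self.buffer)
            self.buffer.clear()

    class Master:
        """Applies a write to its in-memory copy and replicates it to backups."""
        def __init__(self, backups):
            self.table = {}
            self.backups = backups

        def write(self, key, value):
            self.table[key] = value
            entry = f"{key}={value};".encode()
            acks = [b.append(entry) for b in self.backups]
            return all(acks)      # treated as durable once every backup has buffered it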

RAMCloud can also recover very quickly. A master's data is divided into partitions. When a master crashes, the coordinator picks several recovery masters, and each one reads from the distributed backups and reconstructs its share of the data into its own memory. With this parallel recovery, 64 GB can be recovered in 1-2 seconds.
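
A toy sketch of the parallel-recovery idea, assuming the coordinator simply spreads a crashed master's partitions round-robin across the recovery masters (the real assignment logic is more sophisticated):

    def assign_partitions(partitions, recovery_masters):
        """Spread a crashed master's partitions across recovery masters round-robin,
        so each one replays only a slice of the backed-up data in parallel."""
        assignment = {rm: [] for rm in recovery_masters}
        for i, part in enumerate(partitions):
            rm = recovery_masters[i % len(recovery_masters)]
            assignment[rm].append(part)
        return assignment

    # e.g. 640 partitions from a crashed 64 GB master spread over 10 recovery masters
    print(assign_partitions([f"part-{i}" for i in range(640)],
                            [f"rm-{i}" for i in range(10)])["rm-0"][:3])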

Will this paper be influential in 10 years? Yes, I think so. There has been a trend of loading more and more data into memory, and all the giant Internet services in the world use complex caching mechanisms to hide the speed gap of disks. With RAMCloud you don't need to think about a cache anymore, because everything is already in memory, and it addresses data persistence and recovery with smart partitioning and backup mechanisms.

Review of "FaRM: Fast Remote Memory"

Traditionally, distributed computing platforms use TCP/IP to build distributed main-memory systems. As the price of DRAM drops, people can put all their data in memory for fast access without too much cost, but for distributed systems the network communication remains a bottleneck. FaRM, a platform built on RDMA, improves both latency and throughput by an order of magnitude on the same network hardware.

Through a set of simple APIs, FaRM uses RDMA both to directly access data in a shared address space and for fast messaging. RDMA provides reliable user-level reads and writes of remote memory, with low latency and high throughput, because it bypasses the kernel, avoids the overheads of complex protocol stacks, and can transfer data without involving the remote CPU. Built on top of RDMA, with some optimization of the NIC driver, FaRM can therefore be much more efficient than a traditional TCP/IP-based distributed memory system.

FaRM implements a unidirectional channel with a circular buffer. The buffer is stored on the receiver, and each sender/receiver pair uses its own buffer; unused space is all zeros. The receiver polls the word at the "Head" position to detect a new message; that word encodes the message length, which the receiver uses to locate the trailer position. It then polls the trailer for a non-zero value, and once one appears the entire message has been received. The message is passed to the application layer and, once consumed, that part of the buffer is reset to zeros for reuse.
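
A single-process Python sketch of the receiver-side polling logic as I understand it (ignoring RDMA itself and buffer wrap-around; the exact message layout here is my assumption):

    import struct

    BUF_SIZE = 1 << 16
    buf = bytearray(BUF_SIZE)          # unused space is all zeros
    head = 0                           # receiver's read position

    def sender_put(offset, payload: bytes):
        """Writer side: a length word, the payload, then a non-zero trailer word.
        Real FaRM writes this into the receiver's memory with an RDMA write."""
        struct.pack_into("<I", buf, offset, len(payload))
        buf[offset + 4 : offset + 4 + len(payload)] = payload
        struct.pack_into("<I", buf, offset + 4 + len(payload), 1)

    def receiver_poll():
        """Poll the head word for a length, then poll the trailer; once the trailer
        is non-zero the whole message is present, so deliver it and zero the region."""
        global head
        length = struct.unpack_from("<I", buf, head)[0]
        if length == 0:
            return None                          # nothing new yet
        trailer_at = head + 4 + length
        if struct.unpack_from("<I", buf, trailer_at)[0] == 0:
            return None                          # message still being written
        msg = bytes(buf[head + 4 : trailer_at])
        buf[head : trailer_at + 4] = bytes(trailer_at + 4 - head)  # reset to zeros
        head = trailer_at + 4
        return msg

    sender_put(0, b"hello")
    print(receiver_poll())                       # b'hello'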

FaRM provides a set of event-based APIs, which scale better than a thread-per-request concurrency model. It provides strictly serializable ACID transactions, based on optimistic concurrency control and two-phase commit, to help applications maintain consistency. When performance is more of a concern, it also provides lock-free read-only operations that are serializable with transactions. Replicated logging is used to provide high availability and strict serializability.
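
To make the optimistic part concrete, here is a toy single-machine sketch of the read-validate-commit pattern; none of this is the FaRM API, and real FaRM adds remote locking, two-phase commit across machines, and replicated logs:

    class OptimisticStore:
        """Versioned key-value store with an optimistic commit step."""
        def __init__(self):
            self.data = {}        # key -> (version, value)

        def read(self, key):
            return self.data.get(key, (0, None))

        def commit(self, read_set, write_set):
            # Validate: every object read must still be at the version we saw.
            for key, version in read_set.items():
                if self.read(key)[0] != version:
                    return False                  # conflict -> abort, caller retries
            # Apply buffered writes, bumping versions.
            for key, value in write_set.items():
                self.data[key] = (self.read(key)[0] + 1, value)
            return True

    store = OptimisticStore()
    ver, _ = store.read("x")
    ok = store.commit(read_set={"x": ver}, write_set={"x": 42})
    print(ok, store.read("x"))    # True (1, 42)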

Object locations in the memory cluster are maintained via consistent hashing, which maps a 32-bit region identifier to a physical machine. To access an object, FaRM either issues an RDMA request using the region's offset and the object size, or performs a local memory access if the object is stored locally, which is much faster than RDMA.
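
A minimal sketch of mapping a region identifier to a machine with a consistent-hash ring; the ring construction below is generic consistent hashing, not necessarily FaRM's exact scheme:

    import bisect
    import hashlib

    def h(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    class Ring:
        """Generic consistent-hash ring: each machine owns several points on the ring;
        a region id maps to the first machine point at or after its hash."""
        def __init__(self, machines, vnodes=64):
            self.points = sorted((h(f"{m}#{i}"), m) for m in machines for i in range(vnodes))
            self.keys = [p for p, _ in self.points]

        def lookup(self, region_id: int) -> str:
            idx = bisect.bisect(self.keys, h(str(region_id))) % len(self.points)
            return self.points[idx][1]

    ring = Ring([f"machine-{i}" for i in range(8)])
    addr = (ring.lookup(region_id=0x1234), 0x10)   # (machine, offset) pair for an object
    print(addr)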

Will the technology be influential in 10 years? Maybe. It proposes a more efficient way of building distributed memory systems, achieving an order of magnitude better throughput and latency than a similar system built on top of TCP/IP on the same physical network. But it would be better if it also supported existing applications transparently, without requiring modifications to them.

Review of "IOFlow: A Software-Defined Storage Architecture"

In data centers, the IO path to storage is usually long and complex, which makes it hard to enforce end-to-end policies that dictate performance and routing. Such policies require differentiating IOs along the path and global visibility at the control plane. IOFlow is an architecture that enables this kind of policy. It does so by decoupling the control of IO flows from the data plane, introducing programmable data-plane queues that allow flexible service and routing properties. The design borrows SDN's separation of the control and data planes.

In an IOFlow system, a logically centralized data center controller discovers and interacts with the stages in servers across the data center to maintain a topology graph. Stages implement traffic differentiation through queues. Queuing rules, created by the controller, are used by stages to map IO requests to queues. Each queue has its own configuration, such as a bandwidth limit or a next hop, which is how end-to-end performance and routing policies are enforced.
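
Here is a rough Python sketch of what a queuing rule plus a bandwidth limit might look like inside a stage; the rule format, names, and the token-bucket enforcement are my assumptions, not IOFlow's actual interface:

    import time

    class TokenBucketQueue:
        """Queue with a bandwidth cap enforced by a simple token bucket."""
        def __init__(self, rate_bytes_per_s):
            self.rate = rate_bytes_per_s
            self.tokens = rate_bytes_per_s
            self.last = time.monotonic()

        def admit(self, io_size):
            now = time.monotonic()
            self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= io_size:
                self.tokens -= io_size
                return True        # forward the IO now
            return False           # hold the IO until tokens accumulate

    # Queuing rules installed by the controller: (VM, share) -> queue
    queues = {("VM1", "\\\\server\\share-A"): TokenBucketQueue(100 * 2**20),   # 100 MB/s
              ("VM2", "\\\\server\\share-A"): TokenBucketQueue(10 * 2**20)}    # 10 MB/s

    def stage_enqueue(vm, share, io_size):
        """A stage classifies each IO by the rule's header fields and applies that queue's policy."""
        return queues[(vm, share)].admit(io_size)

    print(stage_enqueue("VM1", "\\\\server\\share-A", 1 * 2**20))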

One interesting aspect of this architecture is the single controller program, which could be a single point of failure. I guess not having to think about fault tolerance here makes the implementation much easier. IOFlow's answer is that, in the face of a controller failure, the control programs fall back to a reasonable default handling of IO requests; for example, a malware-scanning application falls back to scanning all traffic instead of doing it selectively.

IOFlow is implemented on top of the Windows IO stack with two kernel drivers that intercept storage IO traffic, each serving as an IOFlow stage: one storage driver above the SMB client (SMBc) driver and one below the SMB server (SMBs) driver. This allows IOFlow to benefit applications without any modification to application code or VM code.

Will this paper be influential in 10 years? Maybe. The separation of control and data planes borrowed from SDN seems genuinely necessary given the complexity of current data centers, and in an era of cloud computing the demand for end-to-end policy enforcement will certainly grow. But the paper leaves evaluation on large data centers for future work, so let's stay tuned.

Review of "Flat Datacenter Storage"

FDS is a very fast, fault-tolerant blob storage system for data centers, built on a Clos network topology.

When bandwidth is limited (that is, the network is oversubscribed), remote data access is slow. Even with plenty of bandwidth at the edge (within a rack), the share of core-switch bandwidth available to any one edge server is small, so talking to a "remote" host is slow. Because of this, earlier distributed file systems like GFS force the applications built on them to reason about locality, which adds a layer of complexity.

But once affordable Clos topologies arrived (small commodity switches plus equal-cost multi-path routing, which spreads traffic across multiple equal-cost paths and greatly increases usable bandwidth), the assumption that remote communication is slow no longer holds. With a full-bisection-bandwidth topology there is no local/remote disk distinction, and you can have simpler programming models.

In FDS, data is logically stored in blobs. A blob has a 128-bit GUID, which can be application-generated or system-generated. Reads and writes to blobs are done in units called tracts, which are 8 MB each. Tractservers manage raw disks (no filesystem) and serve read and write requests from clients. Tractserver calls are asynchronous, which allows deep pipelining, much like a TCP sliding window.

FDS uses a deterministic data placement mechanism called the tract locator table (TLT). The locator for tract number i of the blob with GUID g is computed as Tract_Locator = (Hash(g) + i) mod len(TLT). Blob metadata is also distributed: it lives at TLT[(Hash(g) - 1) mod len(TLT)], an index specifically reserved for it. One benefit of using a hash instead of keeping state is that it provides a satisfying distribution without the complexity of state bookkeeping.
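
A small Python sketch of the TLT lookup described above; the hash function and the table contents are placeholders:

    import hashlib

    # A toy tract locator table: each entry names the tractserver for that row.
    TLT = [f"tractserver-{i}" for i in range(12)]

    def tract_hash(guid: bytes) -> int:
        return int.from_bytes(hashlib.sha1(guid).digest(), "big")

    def locate_tract(guid: bytes, i: int) -> str:
        """Tract i of blob g lives at TLT[(Hash(g) + i) mod len(TLT)]."""
        return TLT[(tract_hash(guid) + i) % len(TLT)]

    def locate_metadata(guid: bytes) -> str:
        """Blob metadata uses the reserved index 'i = -1'."""
        return TLT[(tract_hash(guid) - 1) % len(TLT)]

    g = bytes(16)                                   # a 128-bit GUID
    print(locate_metadata(g), [locate_tract(g, i) for i in range(4)])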

The TLT is built by taking m permutations of the list of tractservers, so that reads and writes are fully spread across different disks. The TLT is served to clients by a lightweight metadata server (still a single point of failure?). The TLT only needs to be updated when the cluster changes, which the design assumes is infrequent, so it can be cached aggressively.

Replication information is also stored in the tract locator table and is used when a client performs operations: Create, Delete, and Extend go only to the primary, which propagates them to the replicas; Write goes to all replicas; Read goes to a random replica. Recovery is very fast because there is no locality concern: the system simply copies the still-live replicas to other tractservers to replace the lost ones. Since all of this happens in parallel, it finishes quickly; recovering 92 GB of lost data takes only 6.2 s.

Will the work be influential in 10 years? I think so. As ECMP and full-bisection-bandwidth networks become more and more mature, the performance gains presented in this paper are certainly something for storage people to think about.

Review of "The Google File System"

Faced with the needs of Google's infrastructure, Google developed its own distributed file system, the Google File System (GFS). It differs from earlier distributed file systems like NFS (Network File System) and AFS (Andrew File System) because it is designed for 1) inexpensive commodity components that fail often, 2) millions of modestly large files, each typically 100 MB or larger, 3) large streaming reads and small random reads, 4) large, sequential writes that append data to files, 5) concurrent appends, and 6) high sustained throughput rather than low latency.

A typical GFS cluster consists of many clients, a single master, and hundreds of chunkservers. The master manages all the metadata, including namespaces, access control information, the mapping from files to chunks, and the locations of chunks. Metadata is served from memory; everything except chunk locations is also persisted, while chunk locations are polled from the chunkservers. They chose not to persist chunk locations because polling is simpler when chunkservers come and go so often. A very interesting point here is that GFS has only one master, which somewhat constrains scalability and makes high availability harder, but it makes the implementation much easier; to compensate, the chunk size is made relatively large, 64 MB, which keeps the amount of metadata and the load on the master small.
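
To make the read path concrete, here is a toy Python sketch of how a client could translate a byte offset into a chunk lookup at the master followed by a chunkserver read; all names and structures are mine, not GFS's actual protocol:

    CHUNK_SIZE = 64 * 2**20          # 64 MB fixed-size chunks

    # Toy master state: file name -> list of (chunk_handle, replica locations)
    master = {
        "/logs/web.log": [("chunk-001", ["cs-3", "cs-7", "cs-9"]),
                          ("chunk-002", ["cs-1", "cs-4", "cs-8"])],
    }

    def read(filename, offset, length):
        """Translate the byte offset into a chunk index, ask the (single) master for the
        chunk handle and replica locations, then read from one of the chunkservers."""
        chunk_index = offset // CHUNK_SIZE
        chunk_offset = offset % CHUNK_SIZE
        handle, replicas = master[filename][chunk_index]
        return {"chunk": handle, "replica": replicas[0],
                "offset": chunk_offset, "length": length}

    print(read("/logs/web.log", offset=70 * 2**20, length=4096))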

The consistency model defined by GFS works as follows. A file region is consistent if all clients always see the same data, regardless of which replica they read from. A region is defined after a mutation if it is consistent and clients see what the mutation wrote in its entirety. A failed write or append leaves the region inconsistent. Concurrent successful writes leave it consistent but undefined, whereas serial successful writes leave it defined. Successful record appends, whether serial or concurrent, leave the region mostly defined but possibly inconsistent in places.

A very interesting aspect of the client's behavior is that it pushes data to each replica itself. My question here is: why doesn't it write to the primary replica first and let the primary propagate the data to the others? What's the reason behind this?

Will the work be influential in 10 years? Maybe yes, since it provides a detailed design for another flavor of distributed file system, alongside AFS and NFS, with strong support for large, mostly append-only files.

Review of "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing"

None of the existing distributed data processing frameworks can fully cover certain use cases. Batch systems such as MapReduce (and its Hadoop variants, including Pig and Hive), FlumeJava, and Spark suffer from latency problems because they must collect all the input data into a batch before processing it. For many streaming systems it is unclear how they remain fault tolerant at scale, while frameworks like Storm, although scalable and fault tolerant, fall short on the expressiveness or correctness vectors. Others, like Spark Streaming and Trident, are limited in their windowing support. Spark Streaming and MillWheel are both sufficiently scalable, fault-tolerant, and low-latency to act as reasonable substrates, but they lack high-level programming models that make calculating event-time sessions straightforward.

The common shortcoming of all these systems is that they treat input data, whether bounded or unbounded, as something that will at some point be complete. To overcome this, one must provide simple but powerful tools for balancing correctness, latency, and cost as appropriate for the specific use case at hand. The paper contributes 1) a windowing model that supports unaligned event-time windows, 2) a triggering model that binds the output times of results to runtime characteristics of the pipeline, 3) an incremental processing model that integrates retractions and updates into the windowing and triggering models, and 4) a scalable implementation on top of MillWheel and FlumeJava.
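
To make one of these ideas concrete, here is a tiny Python sketch of unaligned event-time session windows with a gap timeout for a single key; this is my own illustration, not the paper's API:

    def session_windows(events, gap):
        """Group (event_time, value) pairs into sessions: a new session starts whenever
        the gap between consecutive event times exceeds `gap`. Windows are per-key in
        the real model; this sketch handles a single key."""
        windows = []
        for t, v in sorted(events):                     # sort by event time, not arrival order
            if windows and t - windows[-1]["end"] <= gap:
                windows[-1]["end"] = max(windows[-1]["end"], t)
                windows[-1]["values"].append(v)
            else:
                windows.append({"start": t, "end": t, "values": [v]})
        return windows

    # Out-of-order arrivals still land in the right sessions once sorted by event time.
    events = [(12, "c"), (1, "a"), (2, "b"), (30, "d")]
    print(session_windows(events, gap=5))
    # -> one session covering times 1-2 with ["a", "b"], one at 12, one at 30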

The windowing API and the underlying model presented in this paper do seem quite powerful and easy to use. Although it says little about fault tolerance or performance, it lays down several fundamental insights for designing the next generation of big-data processing frameworks: discard the notion of completeness, stay flexible for future use cases, encourage clarity of implementation, support robust analysis of data in its domain, and so on. These guidelines distill the experience accumulated over years of designing data processing frameworks, and they will certainly influence later development.

Review of "High-throughput, low- latency, and exactly-once stream processing with Apache Flink"

In streaming system design, micro-batching (discretized streaming) has three inherent problems. 1) It changes the programming model from pure streaming to micro-batching, which means users cannot compute over windows of data in periods other than multiples of the checkpoint interval, and it cannot support the count-based windows required by many applications. 2) It is inherently hard for it to deal with backpressure. 3) It has a hard latency floor set by the mini-batch window. The author therefore points to transactional updates, represented by Google Cloud Dataflow, and distributed snapshots, represented by Apache Flink, as the approaches for next-generation streaming systems.

Google Cloud Dataflow, which employs the transactional-updates paradigm, logs record deliveries together with updates to the state, so that if a failure happens, state and record deliveries can be replayed from the log. Apache Flink, on the other hand, employs the distributed-snapshots paradigm. It draws a snapshot of everything the streaming computation depends on, including in-flight records and operator state, so that when a failure happens it can restore from the latest snapshot in durable storage. The implementation is based on the Chandy-Lamport algorithm and guarantees exactly-once stream processing.
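
Here is a toy Python sketch of the barrier idea behind those snapshots: an operator waits until the barrier for a checkpoint has arrived on every input channel, snapshots its state, then resumes; the real mechanism is Flink's asynchronous barrier snapshotting, with many more details:

    class Operator:
        """Counts records per key; snapshots its state once a barrier has arrived on
        every input channel, buffering records from channels already past the barrier."""
        def __init__(self, num_inputs):
            self.state = {}
            self.num_inputs = num_inputs
            self.blocked = set()        # channels whose barrier for this checkpoint arrived
            self.buffered = []          # records held back from blocked channels
            self.snapshots = []

        def on_record(self, channel, key):
            if channel in self.blocked:
                self.buffered.append((channel, key))   # align: hold until snapshot is taken
            else:
                self.state[key] = self.state.get(key, 0) + 1

        def on_barrier(self, channel, checkpoint_id):
            self.blocked.add(channel)
            if len(self.blocked) == self.num_inputs:   # barrier seen on all inputs
                self.snapshots.append((checkpoint_id, dict(self.state)))
                self.blocked.clear()
                pending, self.buffered = self.buffered, []
                for ch, key in pending:                # replay held-back records
                    self.on_record(ch, key)

    op = Operator(num_inputs=2)
    op.on_record(0, "a"); op.on_barrier(0, 1); op.on_record(0, "b"); op.on_barrier(1, 1)
    print(op.snapshots)      # [(1, {'a': 1})]  -- 'b' is counted only after the snapshot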

Although it claims high throughput, Flink still has to balance throughput against recovery time: the higher the throughput, the larger the "barrier" (checkpoint) interval needs to be, and thus the more state information it has to keep per interval.

Will Flink be influential in 10 years? I don't know yet; I would need to actually use it in a project to see how it works in practice.