Monday, October 24, 2016

It is about time

A couple of months ago, I had visitors from SpectraCom. They gifted me a GPS timeserver. I am now the guy with the most accurate time in the department.

We met with these engineers for a couple of hours discussing time synchronization. It was a very enjoyable discussion and time flew by. We talked about how to achieve time synchronization in a box, how to distribute it, and its applications. I had taken some notes, and thought it would be useful to share them.

Estimated reading time: 6 minutes.
Suggested song to accompany (right-click and open link in new tab): Time by Pink Floyd

Precise clocks in a box

Atomic clocks often use Rubidium, which has a drift of only 1 microsecond a day. OCXO ovenized oscillators are the second best way to have precise clocks in a box. Temperature change has the largest effect on crystal oscillation rate, which results in clock drift from "True Time". Ovenizing the oscillators provides a way to control/compensate for temperature change. OCXO ovenized oscillators have a drift of 25 microseconds a day.

GPS time (box/distribution)

There are 4 big satellite systems. GPS is the biggest and is maintained by the US. Then comes GLONASS (yeah I know) by Russia, Galileo by Europe, and Beidou by China. India has some regional satellites as well. Today all smartphones have support for GPS, and some recent ones support GLONASS as well.

The satellites, which are up there at around 20,000 km altitude, have Rubidium-based atomic clocks and they distribute time sync information. That looks like an awfully long distance, but it doesn't stop the satellites from serving as the most prominent time sync solution. Why? The answer has to do with distribution of time sync. Distribution of time sync over many hops/routers on the Internet degrades the precision of the time sync. When distributing with wires, you have to relay: it is infeasible to have one long physical cable. And thus relaying/switching/routing adds nondeterministic delays to the time sync information. For the satellites, we have wireless distribution. Despite the long distance, distribution from a satellite is still one hop. And the distance delay is deterministic, because it can be calculated precisely by dividing the distance to the satellite by the speed of light. The accuracy of GPS time signals is ±10 ns.
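Just to put that distance into perspective, here is a back-of-the-envelope calculation of the one-way propagation delay (the 20,000 km figure is the approximate orbital altitude; the actual slant distance varies with the satellite's elevation):

```python
# Back-of-the-envelope: one-way propagation delay from a GPS satellite.
SPEED_OF_LIGHT_M_PER_S = 299_792_458
distance_m = 20_000 * 1000  # ~20,000 km; actual slant range depends on elevation angle

delay_s = distance_m / SPEED_OF_LIGHT_M_PER_S
print(f"one-way delay: {delay_s * 1e3:.1f} ms")  # ~66.7 ms: long, but deterministic
```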

GPS is an engineering marvel. Here you can read more about (and admire) GPS. Here are some interesting highlights about GPS synchronization. Constant ground-based corrections are issued to the satellites to account for relativistic and other effects. Ground stations (US Naval Observatory, NIST) transmit to the satellites periodically to update/correct their atomic clocks to Coordinated Universal Time (UTC). GPS is weatherproof. Even big storms would not degrade GPS signals significantly. However, jamming is more of a problem, since GPS is a very low power signal.

Assisted GPS helps smartphones lock on to the low power GPS signals. Celltowers provide smartphones with approximate time and position information, and also include GPS constellation information, so the smartphone knows where/which signals to lock on to. There are also Pseudolites. These are stable on-the-ground satellite beacons. They simulate satellites and are now being considered for indoor localization systems. They spoof GPS; their signal overloads smartphones' GPS chipsets.

Time sync distribution

NTP is by far the most popular time sync distribution protocol on the Internet. However, NTP clock sync errors can amount to tens of milliseconds. The biggest source of problems for NTP distribution is asymmetry in the links. Consider a 100 Mbps link feeding into a 1 Gbps link. One way there is no delay, but coming back the other way there is queuing delay. This asymmetry introduces errors into time sync. NTP is also prone to security attacks. Having your own timeservers is good for increased security against NTP attacks. The finance sector doesn't use NIST's public NTP servers since a man-in-the-middle attack is possible. (Of course, it is also not that difficult to spoof GPS.) All that being said, I have great respect for the engineering effort that went into NTP, and for what NTP has provided for distributed systems over the Internet. David Mills is a hero.
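To see where the asymmetry hurts, here is a minimal sketch of the standard NTP offset/delay computation from the four timestamps; when the forward and return path delays differ, half of the difference shows up directly as offset error. The numbers below are made up for illustration:

```python
# NTP-style offset estimation from four timestamps:
# t0: client sends request, t1: server receives, t2: server replies, t3: client receives.
def ntp_offset_and_delay(t0, t1, t2, t3):
    offset = ((t1 - t0) + (t2 - t3)) / 2   # assumes symmetric path delays
    delay = (t3 - t0) - (t2 - t1)          # total round-trip time on the wire
    return offset, delay

# Hypothetical numbers (in seconds): the clocks actually agree, but the return
# path has 10 ms of queuing delay while the forward path has only 1 ms.
t0 = 100.000
t1 = t0 + 0.001    # forward path: 1 ms
t2 = t1 + 0.0005   # server processing: 0.5 ms
t3 = t2 + 0.010    # return path: 10 ms (queuing at the slower link)
print(ntp_offset_and_delay(t0, t1, t2, t3))  # offset ~ -4.5 ms even though the clocks agree
```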

PTP (IEEE 1588) is another time sync distribution protocol. PTP stands for Precision Time Protocol. PTP comes from industrial networking, where it started as a multicast protocol. PTP enables hardware timestamping and has measures to eliminate link delay asymmetry. The time provider sends MAC-stamped time to the client, so the client can measure the in-flight time between the time server and itself. (In NTP, the time provider does not know the client and is stateless with respect to it: the client asks and gets a response from the NTP server, which is oblivious to the client.) PTP does not have a standard reference implementation.
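For comparison, here is a sketch of the PTP-style two-way exchange (a Sync message followed by Delay_Req/Delay_Resp). Hardware timestamping at the MAC layer removes most of the software-stack jitter from these timestamps, though the path delay computation still assumes symmetric links:

```python
# PTP-style offset computation from a Sync / Delay_Req exchange (sketch):
# t1: master sends Sync, t2: slave receives Sync,
# t3: slave sends Delay_Req, t4: master receives Delay_Req.
def ptp_offset_and_path_delay(t1, t2, t3, t4):
    offset = ((t2 - t1) - (t4 - t3)) / 2            # slave clock minus master clock
    mean_path_delay = ((t2 - t1) + (t4 - t3)) / 2   # still assumes symmetric links
    return offset, mean_path_delay
```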

Applications of time sync

A big customer of time synchronization systems is power grids, which use time synchronization to manage load balancing, distribution, and load shedding. Celltowers are also big customers of time synchronization. Celltowers used to have proprietary on-the-wire synchronization, updated with SyncE or PTP. GPS-based synchronization has been replacing those quickly. As I mentioned earlier, the finance industry is a big client for time synchronization systems.

Time sync also has emerging applications in cloud/datacenter computing. The most prominent is probably Google Spanner, which uses atomic clocks and GPS clocks to support externally-consistent distributed transactions at global scale.

I have been working on better clocks for distributed systems, and hybrid logical clocks and hybrid vector clocks resulted from that work. I am continuing that work to further explore the use of clocks for improving auditability of large scale distributed systems, as part of our project titled "Synchrony-aware Primitives for Building Highly Auditable, Highly Scalable, Highly Available Distributed Systems" (funded by NSF XPS, from 2015-2019, PI: Murat Demirbas and coPI: Sandeep Kulkarni):

"Auditability is a key property for developing highly scalable and highly available distributed systems; auditability enables identifying performance bottlenecks, dependencies among events, and latent concurrency bugs. In turn, for the auditability of a system, time is a key concept. However, there is a gap between the theory and the practice of distributed systems in terms of the use of time. The theory of distributed systems shunned the notion of time and considered asynchronous systems, whose event ordering is captured by logical clocks. The practical distributed systems employed NTP synchronized clocks to capture time but did so in ad hoc undisciplined ways. This project will bridge this gap and provide synchrony-aware system primitives that will support building highly auditable, highly scalable, and highly available distributed systems. The project has applications to cloud computing, distributed NewSQL databases, and globally distributed web services."

Wednesday, October 19, 2016

Book Review: Smarter Faster Better (2016)

I read this book in early September. My review comes after a month's lag, so I am not fresh on the details of the book, and can't totally recall my reactions while reading each chapter. However, sometimes it is better to write a review not afresh, but after some gestation period, so you can provide a more balanced review.

Overall I liked the book. It is a fun read. Charles Duhigg is a great storyteller. Maybe to a fault. In retrospect I think the stories diluted the focus of the book, and the real ideas/tips for becoming "smarter, faster, better" got shortchanged. After I briefly describe each chapter, I will revisit this point at the end of this post.

The first chapter is "Motivation". What motivates us to work hard and get after it? There is a physiological component to motivation regulated by the "striatum" in the subcortical part of the forebrain, a critical component of the reward system. This chapter talks about the case of a very successful businessman who had a minor stroke in the striatum and lost all interest in business and life. This chapter also presents a story from Marines bootcamp, and how the Marines find the motivation to endure the tough challenge exercises. Yet another story in the first chapter is about mundanely subversive/rebellious behavior by the elderly in nursing homes. The common theme across all three stories, and the key to motivation, is the concept of "choice". The elderly who display minor acts of rebellion/subversiveness remain healthy and functional longer, because they are exercising their power to choose, and not blindly obeying. In the case of the businessman who lost his interest in living and succeeding, his wife relentlessly helps him make small choices, and over time he starts to recover. In the case of the Marines, their commanders train them to make choices and display "ownership". (Another recent book, "Extreme Ownership" by Jocko Willink and Leif Babin, explores this issue.) When facing great challenges, the best way to get started is by making some choices. Albeit tiny, the choices we make give us a sense of control, take us out of the frame of mind of learned helplessness and victimhood, and let us start to exercise our internal locus of control. Another point made through the Marines bootcamp story is to keep "a big vision" to motivate you in following your goals. When we are feeling down, we should ask ourselves "why" we choose to endure these hardships, and let the big vision motivate us to inch along in that direction.

The second chapter is "Teams". It presents the story of Julia as she graduates from the Yale School of Management and joins Google's People Analytics group to investigate what makes teams more effective. Yes, there is no surprise: having diverse teams is better. But, surprisingly, the ultimate factor in the effectiveness of teams is psychological safety: do team members feel safe in expressing and defending opinions? The chapter also tells the story of two hospital wards, one with high mistake rates, and the other with low mistake rates. In a twist ending, it turns out that the ward with the lower mistake rates is not better; rather, its members feel unsafe about reporting mistakes, and mistakes are simply not accounted for. The ward with the higher mistake rates turns out to be the better functioning ward, because its members report every mistake; those mistakes are mostly small ones, and measures are taken to correct them before they escalate into critical problems. Finally, the chapter tells colorful stories about the good old days of Saturday Night Live, and how a team of misfits and rebels argue with each other continuously but manage to put on a fantastic show every weekend.

The third chapter is "Focus". It turns out too much focus can be a bad thing, because it can lead to tunnel vision of attention. This point was driven home by the heart-wrenching story of Air France Flight 447. What a nightmare! I got very uncomfortable even reading the account of it. (Charles Duhigg is a great storyteller, and managed to ramp up the tension/tragedy of the story expertly.) The pilots tunnel-vision onto a wrong measure/lead, and things gradually spiral worse and worse. I almost started to yell, "how could you get things so wrong?". This plane could not possibly have gone down, yet there it goes like a slow-motion trainwreck (taking place at an altitude of 30,000 feet). The chapter also tells the story of Qantas Flight 32, the most damaged Airbus A380 ever to land safely. The pilot managed to achieve this by thinking of the plane as a simple Cessna, not getting tunneled into irrelevant metrics/leads, and staying vigilant by keeping this simple but holistic mental model in mind. The pilot and copilots kept their options open, did not delegate thinking, and managed to make good decisions under stress by the power of their mental model of the plane. The chapter also tells the story of nurse Darlene: she had a mental model of what a healthy baby should look like, and was able to recognize problematic babies much earlier and save their lives. The take-home message from this chapter is that you should have narratives about what to focus on and what to ignore, and mental models of the process (which you constantly reevaluate and readjust), so you can stay vigilant and prepared.

The fourth chapter is on "Goal setting". The chapter has two very engaging stories: one about General Electric, and the other about the Yom Kippur War, and makes points about how to set smart goals without getting too carried away. A SMART goal is specific, measurable, achievable, realistic, and has a timeline (see, it is an acronym). The chapter first makes this point through the GE story, and then says SMART goals are not enough, you also need stretch goals, and makes this point in the second half of the GE story. The stretch goals are there to push the limits; you can't grow a system unless you strain it some. This brings the antifragility concept to mind. Through the Yom Kippur War story the chapter drives home the point that you shouldn't get so obsessed with your goals that you get carried away and dismiss obstacles/threats to your strategy as very low-risk.

The fifth chapter is about "Managing others". Through two stories, the General Motors-Toyota partnership and a kidnapping case the FBI successfully solved, it makes points about agile thinking and building a culture of trust. The stories are again engaging, but at this point I was hoping that there would be fewer stories, because I was losing the thread of smarter, faster, better. Again, it takes a special talent to weave two very unrelated stories into a somewhat coherent case, but I got weary of following the stories and trying to figure out how they relate to the themes of the other chapters and the other 10 stories told in those chapters. At this point, I declared maximum story saturation for myself.

The sixth chapter is about "Decision Making". The story is about how Annie Duke won the 2004 Tournament of Champions in poker. Again an engaging story, and there is some connection to how to weigh possible future outcomes when planning. While the connection between poker and weighing possible future outcomes through Bayesian reasoning is direct, I wished that instead of reading a lengthy poker story I could read about more concrete and applicable tactics/strategies for decision making and weighing possible outcomes.

The seventh chapter is about "Innovation". It tells the story of Disney's Frozen, and there is also a story about the creation of "West Side Story" (emphasis mine). I tried to remember what the insight in this chapter was, but couldn't recall a significant one. When I looked up the chapter again, I found that the insight was "creativity is just problem solving". So I was not much off the mark.

The final chapter is about "Absorbing Data". The lesson here is that even if you collect data and use data processing/visualization, if you are not internalizing the results, it is all in vain. This lesson was told through the story of some teachers who analyzed student evaluation data by hand, made up stories/hypotheses about how to interpret it, and put effort into the process, which helped improve the success of Cincinnati's public schools. The problem with relying on automated analysis is that it is easy come, easy go. If you haven't thought about it and internalized it, you don't benefit from it. You can't delegate thinking to computers (at least not yet).

In conclusion, I think Duhigg is a great storyteller, and he overdid that in this book. Yes, the stories were super engaging, but they are just anecdotes, and you can come up with many other stories to make counterpoints. Instead of so many stories, I would have loved to read more tips/ideas/heuristics about how to get smarter, faster, better, and more concrete application use cases.

The subtitle of the book promises us "the secrets of being productive in life and business", and, of course, the book fails to deliver that. Because there is no secret. There is no silver bullet for avoiding failures, and no bulletproof recipe for success. There is no shortcut. To be smarter, faster, and better, you should become smarter, faster, and better. Tips, ideas, even paradigm shifts may give us only some boost in getting there. Becoming smarter, faster, and better is a constant inward battle where we have to continuously struggle to earn our badges.

Sunday, September 4, 2016

Sonification for monitoring and debugging distributed systems

Humans are top-grade pattern recognition machines. In his latest book, Ray Kurzweil, the Transcendent Man, theorizes that the human brain can be modeled as a hierarchy of pattern recognizers. Kurzweil says the neocortex contains 300 million very general pattern recognition circuits and argues that they are responsible for most aspects of human thought. (As a side note, here is an amusing little post I had written about some peculiar pattern recognition bug in our brains.)

I don't have theories or hard data to offer on human pattern recognition skills. All I can offer here are some anecdotes.

Some time ago, I had read a story about rangers decrypting a secure radio transmission just by getting accustomed to it. (This was a fascinating story, and I lost track of where I read it. If you have a pointer, let me know in the comments.) The secure radio transmission system had some peculiarities when encrypting; maybe the encryption was taking a tad bit longer when encrypting certain vowels. Now the rangers, with not much else to do, were listening to that channel continuously, and their ears started to pick up on these peculiarities. And soon they were able to decrypt the secure transmissions in real time. (Here is a more modern and advanced version of that attack, using software doing essentially the same thing.)

Auditory pattern recognition can go a long way in the extreme. Did you know that humans can learn to echolocate? Fricking Daredevil stuff here.

My point with these examples is that humans are skilled at performing advanced pattern recognition feats. These skills are hardwired in our brains, and we rely on them as toddlers to pick up on spoken language, a distinctive human feature.

Using audio to debug

My examples were about audio pattern recognition, which is where I want to lead your attention. Sound is a multidimensional phenomenon, consisting of pitch, loudness, and timbre. Sound is also an ambient technology. These make sound a viable alternative and complement to visual sensing modalities.

Since sound is so fundamental and important for human perception, it has been employed by several professions as a means of debugging and identifying problems. For many decades, doctors and mechanics have been listening for abnormal noises to troubleshoot sickness and problems. Last week, I found myself intrigued by the question of whether we can use sound for software debugging. In other words, can we design a stethoscope-analog tool for software debugging?

In order to do that, we should first figure out a way to transform data to sound. This process would be an analog of data visualization. I didn't know the correct term for this, so my initial Google searches were unproductive. Then I came across the correct term when I searched for "sound analog of visualization". Aha, sonification!

Armed with the correct terminology, all sorts of literature opened up on this process. A primitive and popular example of sonification is the Geiger counter. Radar is another basic example. Recently, with the SETI project, sonification has also become an interesting avenue for exploring space. This TED talk shows how Wanda Diaz, a blind astronomer, listens to the stars using sonification.

In the digital systems domain, there are also good examples of sonification. You must have seen this sonification of sorting algorithms. It is simple but brilliant. There have also been more advanced attempts at sonification. This work employs sonification for overviewing git conflicts.
These works employ sonification for business process monitoring. And these research papers explore how to employ sonification for understanding and debugging software
([1], [2], [3], [4]).

Sonification for distributed systems

What I was thinking was to use sonification for monitoring and debugging of distributed systems. Distributed systems are notoriously difficult to monitor and debug. Can we get any type of additional help or slight advantage through the sonification approach? Each type of message transmission can be assigned to a certain piano tone. Loudness may be used for sonifying the size of the message. The duration of computation or message transmission would naturally show up as the rhythm of the system. (Of course, going at the millisecond scale won't work for human perception. The sonification software should be doing some slicing/sampling and slowing things down to provide the human with something they can cope with.) For datacenter monitoring, you may give each software system/service a different timbre, and assign each one to a different musical instrument. Then you may listen to your datacenter as if listening to an orchestra performing a symphony. Maybe your large MapReduce deployment would go "da da da dum!".
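Here is a rough sketch of the kind of event-to-sound mapping I have in mind; the message types, note assignments, and scaling factors are all made up for illustration:

```python
# Hypothetical mapping from distributed-system events to sound parameters.
# Pitch encodes the message type, loudness encodes the message size, and
# event timestamps (slowed down) give the rhythm.
import math

PITCH_BY_MSG_TYPE = {   # MIDI note numbers, chosen arbitrarily
    "prepare": 60,      # C4
    "accept": 64,       # E4
    "commit": 67,       # G4
    "heartbeat": 48,    # C3
}
SLOWDOWN = 1000         # replay 1 ms of system time as 1 s of audio

def event_to_note(event):
    """event: dict with 'type', 'size_bytes', and 'timestamp_ms' fields."""
    pitch = PITCH_BY_MSG_TYPE.get(event["type"], 72)
    velocity = min(127, int(12 * math.log2(1 + event["size_bytes"])))  # louder for bigger messages
    start_s = event["timestamp_ms"] * SLOWDOWN / 1000.0
    return {"pitch": pitch, "velocity": velocity, "start_s": start_s}

print(event_to_note({"type": "accept", "size_bytes": 512, "timestamp_ms": 3}))
```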

Last night, my PhD students Aleksey and Ailidani surprised me with sonification related to the projects they are working on. I had mentioned this idea to them on Thursday. When we met on Skype to catch up on their projects, the first thing they demonstrated to me was how their projects sounded. It was exciting to listen to the sonification of distributed systems. The Voldemort server operation certainly has a peculiar music/rhythm. And I bet it is different from how Cassandra would sound --I haven't listened to that yet. The classic Paxos algorithm had a bluegrass-like rhythm, at least in Ailidani's sonification. I bet other Paxos flavors, such as ePaxos, Raft, and Mencius, will sound different from classic Paxos. And certainly the faults and failures will sound quite abnormal and would be noticeable as off-tune notes and disruptions in the rhythmic sonification of these systems.

Sonification for monitoring and debugging distributed systems may become a thing in a decade or so. What if you couple sonification with a virtual reality (VR) headset for visualization? Sonification could be a good complement to the VR headset. VR gives you total immersion. Sonification provides an ambient way for you to notice an anomaly (something slightly off-tune) in the first place. With VR you can go deep and investigate. With a good VR interface, maybe a Guitar Hero or Dance Dance Revolution-like interface, it would be possible to look at the different nodes executing in parallel, crossing paths and diverging again. This blog post does a good job of reviewing how VR equipment can help in debugging.

Monday, July 18, 2016

Kafka, Samza, and the Unix Philosophy of Distributed Data

This paper is closely related to the "Realtime Data Processing at Facebook" paper I reviewed in my previous post. As I mentioned there, Kafka does basically the same thing as Facebook's Scribe, and Samza is a stream processing system on top of Kafka.

This paper is very easy to read. It is delightful in its simplicity. It summarizes the design of Apache Kafka and Apache Samza and compares their design principles to the design philosophy of Unix, in particular, Unix pipes.

Who says plumbing can't be sexy? (Seriously, don't Google this.) So without further ado, I present to you Mike Rowe of distributed systems.


I had talked about the motivation and applications of stream processing in the Facebook post. The application domain is basically building web services that adapt to your behaviour and personalize on the fly, including Facebook, Quora, Linkedin, Twitter, Youtube, Amazon, etc. These webservices take in your most recent actions (likes, clicks, tweets), analyze them on the fly, merge them with previous analytics on larger data, and adapt to your recent activity as part of a feedback loop.

In theory you can achieve this personalization goal with a batch workflow system, like MapReduce, which provides system scalability, organizational scalability (that of the engineering/development team's efforts), operational robustness, multi-consumer support, loose coupling, data provenance, and friendliness to experimentation. However, batch processing will add large delays. Stream processing systems preserve all the good scalability features of batch workflow systems, and add a "timeliness" feature as well.

I am using shortened descriptions from the paper for the following sections.

Apache Kafka

Kafka provides a publish-subscribe messaging service. Producer (publisher) clients write messages to a named topic, and consumer (subscriber) clients read messages in a topic. A topic is divided into partitions, and messages within a partition are totally ordered. There is no ordering guarantee across different partitions. The purpose of partitioning is to provide horizontal scalability: different partitions can reside on different machines, and no coordination across partitions is required.
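A minimal sketch of what partitioning buys you (the partitioner here is a stand-in for Kafka's, not a call into the real client library): messages with the same key land in the same partition and stay totally ordered with respect to each other, while different keys can be spread across partitions and machines. The same key-based routing idea shows up again in the word-count example below.

```python
# Sketch: key-based routing to partitions. Messages with the same key go to the
# same partition's append-only log, so they are ordered relative to each other;
# there is no ordering guarantee across partitions.
NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    return hash(key) % NUM_PARTITIONS  # deterministic within one run; stand-in for Kafka's partitioner

log = {p: [] for p in range(NUM_PARTITIONS)}  # one append-only log per partition

for key, value in [("user42", "click"), ("user7", "like"), ("user42", "purchase")]:
    log[partition_for(key)].append((key, value))

# Both "user42" events are in the same partition, in send order.
```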

Each partition is replicated across multiple Kafka broker nodes to tolerate node failures. One of a partition's replicas is chosen as leader, and the leader handles all reads and writes of messages in that partition. Writes are serialized by the leader and synchronously replicated to a configurable number of replicas. On leader failure, one of the in-sync replicas is chosen as the new leader.

The throughput of a single topic-partition is limited by the computing resources of a single broker node --the bottleneck is usually either its NIC bandwidth or the sequential write throughput of the broker's disks. When adding nodes to a Kafka cluster, some partitions can be reassigned to the new nodes, without changing the number of partitions in a topic. This rebalancing technique allows the cluster's computing resources to be increased or decreased without affecting partitioning semantics.

Apache Samza

A Samza job consists of a Kafka consumer, an event loop that calls application code to process incoming messages, and a Kafka producer that sends output messages back to Kafka. Unlike many other stream-processing frameworks, Samza does not implement its own network protocol for transporting messages from one operator to another.

Figure 3 illustrates the use of partitions in the word-count example: by using the word as message key, the SplitWords task ensures that all occurrences of the same word are routed to the same partition of the words topic.

Samza implements durable state through the KeyValueStore abstraction, exemplified in Figure 2. Samza uses the RocksDB embedded key-value store, which provides low-latency, high-throughput access to data on local disk. To make the embedded store durable in the face of disk and node failures, every write to the store (i.e., the changelog) is also sent to a dedicated topic-partition in Kafka, as illustrated in Figure 4. When recovering after a failure, a task can rebuild its store contents by replaying its partition of the changelog from the beginning. Rebuilding a store from the log is only necessary if the RocksDB database is lost or corrupted. While the changelog publishing to Kafka for durability seems wasteful, it can also be a useful feature for applications: other stream processing jobs can consume the changelog topic like any other stream, and use it to perform further computations.
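Here is a toy sketch of the changelog idea: every write to the local store is also appended to a log, and the store can be rebuilt by replaying that log from the beginning. A real Samza task would use RocksDB locally and a Kafka topic-partition as the changelog; the class and method names here are mine.

```python
class ChangelogBackedStore:
    """Toy KeyValueStore: writes go to an in-memory dict (standing in for
    RocksDB) and are also appended to a changelog (standing in for a Kafka
    topic-partition). Recovery = replay the changelog from the beginning."""
    def __init__(self, changelog=None):
        self.store = {}
        self.changelog = changelog if changelog is not None else []

    def put(self, key, value):
        self.changelog.append((key, value))  # durable write to the changelog
        self.store[key] = value              # fast local copy

    def get(self, key):
        return self.store.get(key)

    @classmethod
    def recover(cls, changelog):
        recovered = cls(changelog=list(changelog))
        for key, value in changelog:         # rebuild local state by replay
            recovered.store[key] = value
        return recovered
```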

One characteristic form of stateful processing is a join of two or more input streams, most commonly an equi-join on a key (e.g. user ID). One type of join is a window join, in which messages from input streams A and B are matched if they have the same key and occur within some time interval delta-t of one another. Alternatively, a stream may be joined against tabular data: for example, user clickstream events could be joined with user profile data, producing a stream of clickstream events with embedded information about the user. When joining with a table, the authors recommend making the table data available in the form of a log-compacted stream through Kafka. Processing tasks can consume this stream to build an in-process replica of a database table partition, using the same approach as the recovery of durable local state, and then query it with low latency. It seems wasteful to me, but it looks like the authors do not feel worried about straining Kafka, and are comfortable with using Kafka as a workhorse.
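A minimal sketch of the window-join matching logic described above (a real Samza job would keep these per-key buffers in the durable KeyValueStore; the buffering and expiry here are deliberately simplistic):

```python
from collections import defaultdict

DELTA_T = 5.0  # window size in seconds (arbitrary for this sketch)

buffer_a = defaultdict(list)  # key -> [(timestamp, message)]
buffer_b = defaultdict(list)

def on_message(stream, key, timestamp, msg):
    """Emit (msg, other_msg) pairs with the same key within DELTA_T of each other."""
    mine, other = (buffer_a, buffer_b) if stream == "A" else (buffer_b, buffer_a)
    mine[key].append((timestamp, msg))
    joined = [(msg, other_msg) for (ts, other_msg) in other[key]
              if abs(ts - timestamp) <= DELTA_T]
    # Expire entries in the other buffer that can no longer match anything.
    other[key] = [(ts, m) for (ts, m) in other[key] if ts >= timestamp - DELTA_T]
    return joined
```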

Even though the intermediate state between two Samza stream processing operators is always materialized to disk, Samza is able to provide good performance: a simple stream processing job can process over 1 million messages per second on one machine, and saturate a gigabit Ethernet NIC.


The paper includes a nice discussion section as well.
  • Since the only access methods supported by a log are an appending write and a sequential read from a given offset, Kafka avoids the complexity of implementing random-access indexes. By doing less work, Kafka is able to provide much better performance than systems with richer access methods. Kafka's focus on the log abstraction is reminiscent of the Unix philosophy: "Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new features." 
  • If Kafka is like a streaming version of HDFS, then Samza is like a streaming version of MapReduce. The pipeline is loosely coupled, since a job does not know the identity of the jobs upstream or downstream from it, only the topic names. This principle again evokes a Unix maxim: “Expect the output of every program to become the input to another, as yet unknown, program.”
  • There are some key differences between Kafka topics and Unix pipes: A topic can have any number of consumers that do not interfere with each other, it tolerates failure of producers, consumers or brokers, and a topic is a named entity that can be used for tracing data provenance. Kafka topics deliberately do not provide backpressure: the on-disk log acts as an almost-unbounded buffer of messages.
  • The log-oriented model of Kafka and Samza is fundamentally built on the idea of composing heterogeneous systems through the uniform interface of a replicated, partitioned log. Individual systems for data storage and processing are encouraged to do one thing well, and to use logs as input and output. Even though Kafka's logs are not the same as Unix pipes, they encourage composability, and thus Unix-style thinking.

Related links

Further reading on this is Jay Kreps's excellent blog post on logs.

Apache BookKeeper and Hedwig are good alternatives to Kafka.

These days, there is also DistributedLog.

Saturday, July 9, 2016

Realtime Data Processing at Facebook

Recently there has been a lot of development in realtime data processing systems, including Twitter's Storm and Heron, Google's Millwheel, and LinkedIn's Samza. This paper presents Facebook's realtime data processing system architecture and its Puma, Swift, and Stylus stream processing systems. The paper is titled "Realtime Data Processing at Facebook" and it appeared at Sigmod'16, June 26-July 1.

Motivation and applications

Facebook runs hundreds of realtime data pipelines in production. As a motivating example for the realtime data processing system, the paper gives Chorus. The Chorus data pipeline transforms a stream of individual Facebook posts into aggregated, anonymized, and annotated visual summaries. E.g., what are the top 5 topics being discussed for the election today? What are the demographic breakdowns (age, gender, country) of World Cup fans?

Another big application is the mobile analytics pipelines that provide realtime feedback for Facebook mobile application developers, who use this data to diagnose performance and correctness issues.

The system architecture

Scribe plays a central role in Facebook's realtime processing architecture. The main idea of the architecture is this: by accepting seconds of latency rather than milliseconds, the architecture is able to employ a persistent message bus, i.e., Scribe, for data transport. Scribe provides a persistent, distributed messaging system for collecting, aggregating and delivering high volumes of log data with a few seconds of latency and high throughput. Scribe is the transport mechanism for sending data to both batch and realtime systems at Facebook. Using Scribe to decouple the data transport from the processing allows the system to achieve fault tolerance, scalability, and ease of use, as well as supporting multiple processing systems as options.

While Scribe incurs a few seconds of latency, it still meets Facebook's performance requirements for latency and provides hundreds of gigabytes per second of throughput. On the other hand, Scribe provides a persistent message bus service that enables decoupling and isolation of the data production and data analysis system components. Moreover, with persistent Scribe streams, the system can replay a stream from a recent time period, which makes debugging and iterative development much easier.

The Kafka log blog post by Jay Kreps described these benefits nicely as well. It talked about how practical systems can be simplified with a log-centric design, and how these log streams can enable data integration by making all of an organization's data easily available in all its storage and processing systems. Kafka would have similar advantages to Scribe. Facebook uses Scribe because it was developed in house.

Below I copy snippets of descriptions from the paper for each of these subsystems.

Within Scribe, data is organized into distinct streams by "category". Usually, a streaming application consumes one Scribe category as input. A Scribe category has multiple buckets. A Scribe bucket is the basic processing unit for stream processing systems: applications are parallelized by sending different Scribe buckets to different processes. Scribe provides data durability by storing it in HDFS. Scribe messages are stored and streams can be replayed by the same or different receivers for up to a few days.

The realtime stream processing systems Puma, Stylus, and Swift read data from Scribe and also write to Scribe.  Laser, Scuba, and Hive are data stores that use Scribe for ingestion and serve different types of queries. Laser can also provide data to the products and streaming systems, as shown by the dashed (blue) arrows.

Puma is a stream processing system whose applications (apps) are written in a SQL-like language with UDFs (user-defined functions) written in Java. Puma apps are quick to write: it can take less than an hour to write, test, and deploy a new app. Unlike traditional relational databases, Puma is optimized for compiled queries, not for ad-hoc analysis. Puma provides filtering and processing of Scribe streams (with a few seconds delay). The output of these stateless Puma apps is another Scribe stream, which can then be the input to another Puma app, any other realtime stream processor, or a data store.

Swift is a basic stream processing engine which provides checkpointing functionalities for Scribe. If the app crashes, you can restart from the latest checkpoint; all data is thus read at least once from Scribe. Swift is mostly useful for low throughput, stateless processing.

Stylus is a low-level stream processing framework written in C++. A Stylus processor can be stateless or stateful. Stylus's processing API is similar to that of other procedural stream processing systems.

Laser is a high query throughput, low (millisecond) latency, key-value storage service built on top of RocksDB. Laser can be used to make the result of a complex Hive query or a Scribe stream available to a Puma or Stylus app, usually for a lookup join, such as identifying the topic for a given hashtag.

Scuba is Facebook's fast slice-and-dice analysis data store, most commonly used for trouble-shooting of problems as they happen. Scuba provides ad hoc queries with most response times under 1 second.

Hive is Facebook's exabyte-scale data warehouse. Facebook generates multiple new petabytes of data per day, about half of which is raw event data ingested from Scribe. (The other half of the data is derived from the raw data, e.g., by daily query pipelines.) Most event tables in Hive are partitioned by day. Scribe does not provide infinite retention; instead Facebook stores input and output streams in our data warehouse Hive for longer retention.

Design decisions

Figure 4 summarizes the five design decisions considered for the Facebook realtime processing system components. Figure 5 summarizes which alternatives were chosen by a variety of realtime systems, both at Facebook and in the related literature.

Lessons learned

The paper includes a great lessons learned section. It says: "It is not enough to provide a framework for users to write applications. Ease of use encompasses debugging, deployment, and monitoring, as well. The value of tools that make operation easier is underestimated. In our experience, every time we add a new tool, we are surprised that we managed without it."

The highlights from this section are as follows:

  • There is no single language that fits all use cases. Needing different languages (and the different levels of ease of use and performance they provide) is the main reason why Facebook has three different stream processing systems, Puma, Swift, and Stylus.
  • The ease or hassle of deploying and maintaining the application is equally important. Making Puma deployment self-service let them scale to the hundreds of data pipelines that use Puma. (See Facebook's holistic configuration management for what type of systems Facebook employs to manage/facilitate deployments.)
  • Once an app is deployed, we need to monitor it: Is it using the right amount of parallelism? With Scribe, changing the parallelism is often just changing the number of Scribe buckets and restarting the nodes that output and consume that Scribe category. To find out the right amount of parallelism needed, Facebook uses alerts to detect when an app is processing its Scribe input more slowly than the input is being generated. 
  • Streaming versus batch processing is not an either/or decision. Originally, all data warehouse processing at Facebook was batch processing. Using a mix of streaming and batch processing can speed up long pipelines by hours.

Related posts

Facebook's software architecture 

Holistic Configuration Management at Facebook

Facebook's Mystery Machine: End-to-end Performance Analysis of Large-scale Internet Services 

Measuring and Understanding Consistency at Facebook

Thursday, July 7, 2016

Efficient Replication of Large Data Objects

This paper appeared in DISC 2003, and describes an application of the ABD replicated atomic storage algorithm for the replication of large objects. When the objects being replicated are much larger than the metadata (such as tags or pointers), it is efficient to trade off performing cheaper operations on the metadata in order to avoid expensive operations on the data itself.

The basic idea of the algorithm is to separately store copies of the data objects in replica servers, and information about where the most up-to-date copies are located in directory servers. This Layered Data Replication (LDR) approach adopts the ABD algorithm for atomic fault-tolerant replication of the metadata, and prescribes how the replication of the data objects in the replica servers can accompany replication of the metadata in directory servers in a concurrent and consistent fashion: In order to read the data, a client first reads the directories to find the set of up-to-date replicas, then reads the data from one of the replicas. To write, a client first writes its data to a set of replicas, then informs the directories that these replicas are now up-to-date.

The LDR algorithm replicates a single data object supporting read and write operations, and guarantees that the operations appear to happen atomically. While there exist multiple physical copies of the data, users only see one logical copy, and user operations appear to execute atomically on the logical copy. As such, LDR provides linearizability, a strong type of consistency that guarantees that a read operation returns the most recent version of the data. LDR provides single-copy consistency and is on the CP side of the CAP triangle; availability is sacrificed when a majority of replicas are unreachable.

Client Protocol

When client i does a read, it goes through four phases in order: rdr, rdw, rrr and rok. The phase names describe what happens during the phase: read-directories-read, read-directories-write, read-replicas-read, and read-ok. During rdr, i reads (utd, tag) from a quorum of directories to find the most up-to-date replicas. i sets its own tag and utd to be the (tag, utd) it read with the highest tag, i.e., timestamp. During rdw, i writes (utd, tag) to a write quorum of directories, so that later reads will read i’s tag or higher. During rrr, i reads the value of x from a replica in utd. Since each replica may store several values of x, i tells the replica it wants to read the value of x associated with tag. During rok, i returns the x-value it read in rrr.

When i writes a value v, it also goes through four phases in order: wdr, wrw, wdw and wok, which stand for write-directories-read, write-replicas-write, write-directories-write, and write-ok, respectively. During wdr, i reads (utd, tag) from a quorum of directories, then sets its tag to be higher than the largest tag it read. During wrw, i writes (v, tag) to a set acc of replicas, where |acc| ≥ f + 1. Note that the set acc is arbitrary; it does not have to be a quorum. During wdw, i writes (acc, tag) to a quorum of directories, to indicate that acc is the set of most up-to-date replicas, and tag is the highest tag for x. Then i sends each replica a secure message to tell them that its write is finished, so that the replicas can garbage-collect older values of x. Then i finishes in phase wok.
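Here is a compressed, failure-free sketch of the client-side read and write protocols described above (quorum selection, concurrency, and failure handling are elided, and all the names are mine):

```python
# Toy, failure-free sketch of the LDR client protocol.
class Directory:
    def __init__(self):
        self.utd, self.tag = set(), (0, 0)    # up-to-date replica set, (seq, writer-id) tag
    def read_meta(self):
        return self.utd, self.tag
    def write_meta(self, utd, tag):
        if tag >= self.tag:
            self.utd, self.tag = set(utd), tag

class Replica:
    def __init__(self):
        self.values = {}                      # tag -> value (older tags garbage-collected later)
    def write(self, tag, value):
        self.values[tag] = value
    def read(self, tag):
        return self.values[tag]

def quorum(nodes):                            # failure-free sketch: just use all nodes
    return nodes

def ldr_read(directories, replicas):
    utd, tag = max((d.read_meta() for d in quorum(directories)), key=lambda m: m[1])   # rdr
    for d in quorum(directories):                                                      # rdw
        d.write_meta(utd, tag)
    return replicas[next(iter(utd))].read(tag)                                         # rrr, rok

def ldr_write(directories, replicas, value, client_id, f):
    _, (seq, _) = max((d.read_meta() for d in quorum(directories)), key=lambda m: m[1])  # wdr
    tag = (seq + 1, client_id)
    acc = list(replicas)[: f + 1]                                                        # wrw
    for r in acc:
        replicas[r].write(tag, value)
    for d in quorum(directories):                                                        # wdw, wok
        d.write_meta(acc, tag)

replicas = {i: Replica() for i in range(3)}
directories = [Directory() for _ in range(3)]
ldr_write(directories, replicas, "v1", client_id=1, f=1)
print(ldr_read(directories, replicas))  # -> "v1"
```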

If you have difficulty understanding the need for 2-round directory reads/writes in this protocol, reviewing how the ABD protocol works will help.

Replica and Directory node protocol

The replicas respond to client requests to read and write values of data object x. Replicas also garbage-collect out of date values of x, and gossip among themselves the latest value of x. The latter is an optimization to help spread the latest value of x, so that clients can read from a nearby replica.

The directories' only job is to respond to client requests to read and write utd and tag.

Questions and discussion

Google File System (SOSP 2003) addressed efficient replication of large data objects for datacenter computing in practice. GFS also provides a metadata service layer and a data object replication layer. For the metadata directory service, GFS uses Chubby, a Paxos service of which ZooKeeper is an open-source clone. Today, if you want to build a consistent large-object replication store from scratch, your architecture would most likely use ZooKeeper as the metadata directory coordination service, as GFS prescribed. ZooKeeper provides atomic consistency already, so it eliminates the 2 rounds needed for directory reads and directory writes in LDR.

LDR does not use a separate metadata service; instead, it can scavenge raw dumb storage nodes for the directory service and achieve the same effect by using ABD replication to make the metadata directory atomic/fault-tolerant. In other words, LDR takes a fully-decentralized approach, and can support loosely-connected heterogeneous wimpy devices (maybe even smartphones?). I guess that means more freedom. On the other hand, LDR is bad for performance. It requires 2 rounds of directory writes for each write operation and 2 rounds of directory reads for each read operation. This is a major drawback of LDR. Considering reads are generally 90% of the workload, supporting 1-round directory reads would have alleviated the performance problem somewhat. Probably in normal cases (in the absence of failures), the first directory read (the rdr operation) will show that the up-to-date replica copy is present in a quorum of directory nodes, and the second round of directory access (the rdw operation) can be skipped.

Using ZooKeeper for the metadata directory helps a lot, but a downside is that ZooKeeper is a single centralized location, which means that for some clients, access to ZooKeeper will always incur a high WAN communication penalty. Using ZooKeeper's observers reduces this cost for read operations. And as I will blog about soon, our work on WAN-Keeper reduces this cost for write operations as well. The LDR paper suggests that LDR is suitable for WANs, but LDR still incurs WAN latencies while accessing a quorum of directory nodes (twice!) across the WAN.

Another way to efficiently replicate large data objects is of course key-value stores. In key-value stores, you don't have a metadata directory, as "hashing" takes care of that. On the other hand, most key-value stores sacrifice strong consistency in favor of eventual consistency. Is it true that you can't just get away with using hashes, and need some sort of metadata service if you want to achieve consistency? The consistent key-value stores I can think of (and I can't think of too many) use either a Paxos commit on metadata or at least a chain replication approach, such as in Hyperdex and Replex. The chain replication approach uses a Paxos box only for the directory node replication configuration information; does that still count as a minimal and 1-level-indirect metadata service?

Friday, July 1, 2016

Replex: A Scalable, Highly Available Multi-Index Data Store

This paper received the best paper award at Usenix ATC'16 last week. It considers a timely and important problem. With NoSQL databases, we got scalability, availability, and performance, but we lost secondary keys. How do we put back secondary indices without compromising scalability, availability, and performance?

The paper mentions that previous work on Hyperdex did a good job of re-introducing secondary keys to NoSQL, but with overhead: Hyperdex generates and partitions an additional copy of the datastore for each key. This introduces overhead for both storage and performance: supporting just one secondary key doubles storage requirements and write latencies.

Replex adds secondary keys to NoSQL databases without that overhead. The key insight of Replex is to combine the need to replicate for fault-tolerance and the need to replicate for index availability. After replication, Replex has both replicated and indexed a row, so there is no need for explicit indexing.

How does Replex work?

All replexes store the same data (every row in the table); the only difference across replexes is the way the data is partitioned and sorted, which is by the sorting key of the index associated with the replex. Each replex is associated with a sharding function, h, such that h(r) defines the partition number in the replex that stores row r.

So, that was easy. But there is an additional complication that needs to be dealt with. The difficulty arises because individual replexes can have requirements, such as uniqueness constraints, that cause the same operation to be both valid and invalid depending on the replex. Figure 2 gives an example scenario, a linearizability requirement for a distributed log.

To deal with this problem, datastores with global secondary indexes need to employ a distributed transaction for update operations, because an operation must be atomically replicated as valid or invalid across all the indexes. But to use a distributed transaction for every update operation would cripple system throughput.

To remove the need for a distributed transaction in the replication protocol, they modify chain replication to include a consensus protocol. Figure 3 illustrates this solution. When the consensus phase (going to the right in Figure 3) reaches the last partition in the chain, the last partition aggregates each partition's decision into a final decision, which is simply the logical AND of all decisions. Then comes the replication phase, where the last partition initiates the propagation of this final decision back up the chain. As each partition receives this final decision, if the decision is to abort, then the partition discards that operation. If the decision is to commit, then that partition commits the operation to disk and continues propagating the decision.
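Here is a toy sketch of that modified chain protocol: the operation flows down the chain collecting each partition's validity decision (the consensus phase), the tail ANDs the decisions, and the final decision flows back up the chain (the replication phase). The uniqueness-constraint check below is a made-up stand-in for whatever per-replex validity check applies:

```python
# Toy sketch of Replex's consensus + replication phases over a chain of partitions.
def chain_replicate(chain, op):
    """chain: list of partitions ordered head -> tail."""
    decisions = [p.is_valid(op) for p in chain]   # consensus phase, head -> tail
    commit = all(decisions)                        # tail computes the final decision
    for p in reversed(chain):                      # replication phase, tail -> head
        p.commit(op) if commit else p.discard(op)
    return commit

class Partition:
    def __init__(self, unique_keys=()):
        self.store, self.unique = [], set(unique_keys)
    def is_valid(self, op):
        return op["key"] not in self.unique        # e.g. a uniqueness constraint on this index
    def commit(self, op):
        self.store.append(op); self.unique.add(op["key"])
    def discard(self, op):
        pass

chain = [Partition(), Partition(unique_keys={"bob"})]
print(chain_replicate(chain, {"key": "alice", "value": 1}))  # True: committed everywhere
print(chain_replicate(chain, {"key": "bob", "value": 2}))    # False: aborted everywhere
```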

This has similarities to the CRAQ protocol for chain replication. Linked is an earlier post that contains a summary of chain replication and CRAQ protocol.


There is additional complexity due to failure of the replicas. Failed partitions bring up two concerns: how to reconstruct the failed partition and how to respond to queries that would have been serviced by the failed partition.

If a partition fails, a simple recovery protocol would redirect queries originally destined for the failed partition to the other replex. Then the failure amplification is maximal: the read must now be broadcast to every partition in the other replex, and at each partition, a read becomes a brute-force search that must iterate through the entire local storage of a partition.

On the other hand, to avoid failure amplification within a failure threshold f, one could introduce f replexes with the same sharding function h, as exact replicas. There is no failure amplification within the failure threshold, because sharding is identical across exact replicas. But the cost is storage and network overhead in the steady state.

This is the tradeoff, and the paper dedicates "Section 3: Hybrid Replexes" to explore this tradeoff space.

Concluding remarks

The paper compares Replex to Hyperdex and Cassandra and shows that Replex's steady-state performance is 76% better than Hyperdex and on par with Cassandra for writes. For reads, Replex outperforms Cassandra by as much as 2-9x while maintaining performance equivalent to HyperDex. In addition, the paper shows that Replex can recover from one or two failures 2-3x faster than Hyperdex.

Replex solves an important problem with less overhead than previous solutions. The hybrid replexes method (explained in Section 3) can also be useful in other problems for preventing failure amplification.

Thursday, June 30, 2016

Modular Composition of Coordination Services

This paper appeared in Usenix ATC'16 last week. The paper considers the problem of scaling ZooKeeper to WAN deployments. (Check this earlier post for a brief review of ZooKeeper.)

The paper suggests a client-side only modification to ZooKeeper, and calls the resultant system ZooNet. ZooNet consists of a federation of ZooKeeper clusters, one at each datacenter/region. ZooNet assumes the workload is highly partitionable, so the data is partitioned among the ZooKeeper clusters, each accompanied by learners in the remote datacenters/regions. Each ZooKeeper cluster processes only updates for its own data partition, and if applications in different regions need to access unrelated items they can also do so independently and in parallel at their own site.

However, the problem with such a deployment is that it does not preserve ZooKeeper's sequential execution semantics. Consider the example in Figure 1. (It is not clear to me why the so-called "loosely-coupled" applications in different ZooKeeper partitions need to be sequentially serialized. The paper does not give examples/reasons motivating this.)

Their solution is fairly simple. ZooNet achieves consistency by injecting sync requests. Their algorithm only requires remote-partition reads to be synced to achieve coordination. More accurately, they insert a sync every time a client's read request accesses a different coordination service than the previous request did. Subsequent reads from the same coordination service are naturally ordered after the first, and so no additional syncs are needed.
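A sketch of that client-side rule (the ZooKeeper ensembles are abstracted behind hypothetical sync()/read() methods; the real implementation pipelines requests asynchronously rather than blocking like this):

```python
class ZooNetStyleClient:
    """Client-side wrapper sketch: inject a sync whenever a read targets a
    different coordination service (ZooKeeper ensemble) than the previous request."""
    def __init__(self, ensembles):
        self.ensembles = ensembles   # dict: site name -> ZooKeeper-like client object
        self.last_site = None

    def read(self, site, path):
        zk = self.ensembles[site]
        if site != self.last_site:
            zk.sync(path)            # force the local view of that service to catch up first
        self.last_site = site
        return zk.read(path)         # subsequent reads at the same site need no extra sync
```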

Figure 3 demonstrates the solution. I still have some problems with Figure 3. What if both syncs occur at the same time? I mean, what if both occur after the previous updates complete? Do they then prioritize one site over the other and take a deterministic order to resolve ties?

The paper also mentions that they fix a performance bug in ZooKeeper, but the bug is not relevant to the algorithm/problem. It is about more general ZooKeeper performance improvement by performing proper client isolation in the ZooKeeper commit processor.

ZooNet evaluation is done only with a 2-site ZooKeeper deployment. The evaluation did not look at WAN latency and focused on pushing the limits on throughput. Reads are asynchronously pipelined to compensate for the latency introduced by the sync operation. They benchmark the throughput when the system is saturated, which is not a typical ZooKeeper use case scenario.

ZooNet does not support transactions and watches.

Update 7/1/16: Kfir, the lead author of the ZooNet work, provides clarifications for the questions/concerns in my review. See the top comment after the post.

Our work on WAN coordination

We have also been working on scaling ZooKeeper to WAN deployments. We take a different approach. We have extended ZooKeeper with hierarchical composition of ZooKeeper clusters and added lock-tokens to enable local and consistent writes as well as local and consistent reads from any region in a WAN. Our WANKeeper system supports transactions and watches. We have submitted a paper on this which is still under review, so I can talk about our work only next month. In the meanwhile, we are working on polishing our WANKeeper software for open-sourcing.

Tuesday, June 28, 2016

How to package your ideas using the Winston star

I came across this advice serendipitously by reading a random Hacker News comment. The advice shows up in the last 10 minutes of an Artificial Intelligence lecture by Patrick Winston. Winston, a prominent professor at MIT, tells his class that he will disclose to them important advice that may make or break their careers. It is about how to package and present ideas.

His advice is simple. Follow this 5-tip star to pack your ideas better. All the tips start with "S":

  • Symbol: use a symbol to make your idea visual and memorable
  • Slogan: find a catchy slogan for your idea
  • Surprise: highlight the unconventional/counterintuitive/interesting part of your idea
  • Salient: focus on the essential part of your idea, remember: less is more, fewer ideas is better
  • Story: pack your idea with a story, human brains are hardwired for stories

Here, let me try to apply the Winston Star method to itself, to make this more concrete.

  • Symbol: the star is the symbol of the Winston method
  • Slogan:  with these 5-star tips, you can 5x the impact of your good ideas
  • Surprise: you can achieve fame and impact by packaging your ideas better following these simple presentation tips
  • Salient: devising a good presentation is as important as having good ideas and doing good work
  • Story: these presentation tips are told by a top MIT prof to his undergraduate AI class as secrets to career success

After some googling I found another relevant video from Patrick Winston. This one is titled "How to Speak". (As always, watch at 1.5x speed.)

Related posts

How to present your work
Nobody wants to read your shit

Wednesday, June 22, 2016

Nobody wants to read your shit

Earlier I wrote about how much I liked the "The War of Art" by Steven Pressfield. This newly released book by Pressfield picks up where "The War of Art" left off. While the former focused on the psychological aspects of the writing process, this book focuses on the structural/mechanical aspects of writing. The book was freely distributed as a pdf and ebook for a limited time for promotion purposes. (It looks like the promotion has ended.) I read it in one sitting and found it interesting. This book can benefit anyone who needs to communicate in writing. (Of course, if you are a novice writer, start with The Elements of Style.)

The book gives a very interesting account of what Steven learned from a 50+ year career performing all forms of writing, including ad writing, Hollywood scriptwriting, novels, and nonfiction. The first chapter lays down the most important lesson Steven has learned: "Nobody wants to read your shit", and the rest of the book talks about what you can do about it. In a nutshell, you must streamline your message (staying on theme), and make its expression fun (organizing around an interesting concept).

Steven lists these as the universal principles of storytelling:
  1. Every story must have a concept. It must put a unique and original spin, twist or framing device upon the material.
  2. Every story must be about something. It must have a theme.
  3. Every story must have a beginning (that grabs the listener), a middle (that escalates in tension, suspense, stakes, and excitement), and an end (that brings it all home with a bang). Act One, Act Two, Act Three.
  4. Every story must have a hero.
  5. Every story must have a villain.
  6. Every story must start with an Inciting Incident, embedded within which is the story's climax.
  7. Every story must escalate through Act Two in terms of energy, stakes, complication and significance/meaning as it progresses.
  8. Every story must build to a climax centered around a clash between the hero and the villain that pays off everything that came before and that pays it off on-theme.
He says that these rules for writing apply to writing nonfiction as well. That includes your whitepapers, blog posts, and theses. You should definitely have a theme and an interesting concept. The hero and villain can be abstract; they can be useful for building some tension when motivating your problem.

The book is an easy and fun read. It feels like Steven is having a heart-to-heart conversation with you and coaching you about how you can become a better writer. While there were many gems, I was particularly intrigued by this passage:
I will do between 10 and 15 drafts of every book I write. Most writers do.
This is a positive, not a negative.
If I screw up in Draft #1, I'll attack it again in Draft #2.
"You can't fix everything in one draft."
Thinking in multiple drafts takes the pressure off. 

My writing-related posts:

Tuesday, June 7, 2016

Progress by impossibility proofs

I recently listened to this TED talk by Google X director Astro Teller. The talk is about the Google X moonshot projects, and the process they use for managing them.

Google X moonshot projects aim to address big, important, and challenging problems. Teller tells (yay, pun!) that the process they use to manage a moonshot project is basically to try to find ways to kill the project. The team first tries to identify the bottleneck in a project by focusing on its most important/critical/risky part. Then they try to either solve that hardest part or show that it is unsolvable (in any feasible manner) and kill the project. Teller claims that, at Google X, they actually incentivize people to kill projects by awarding bonuses, and celebrate when a project gets proved impossible and killed. And if a project still survives, it is a successful moonshot project that has potential for transformative change.

Although this approach looks counterintuitive at first, it is actually a good way to pursue transformative impact and fail fast without wasting too much time and resources. This can be a very useful method for academic research as well.
  1. Find a use-inspired grand-challenge problem. (This requires creativity, domain expertise, and hard thinking.)
  2. Try to prove an impossibility result.
  3. If you prove the impossibility result, that is still nice and publishable.
  4. If you can't prove an impossibility result, figure out why, and try to turn that into a solution to an almost "impossibly difficult problem". Bingo!
"Once you eliminate the impossible, whatever remains, no matter how improbable, must be the truth."
-- Sir Arthur Conan Doyle channeling Sherlock Holmes

Thursday, June 2, 2016

TensorFlow: A system for large-scale machine learning

This paper was uploaded on May 27, 2016, so it is the most recent description of Google TensorFlow. (Five months ago, I had commented on an earlier TensorFlow whitepaper, if you want to check that first.) Below I summarize the main points of this paper by using several sentences/paragraphs from the paper with some paraphrasing. I end the post with my wild speculations about TensorFlow. (This speculation thing is getting strangely addictive for me.)

TensorFlow is built leveraging Google's experience with their first generation distributed machine learning system, DistBelief. The core idea of this paper is that TensorFlow's dataflow representation subsumes existing work on parameter server systems (including DistBelief), and offers a uniform programming model that allows users to harness large-scale heterogeneous systems, both for production tasks and for experimenting with new approaches.

TensorFlow versus Parameter Server systems

DistBelief was based on the parameter server architecture, and satisfied most of Google's scalable machine learning requirements. However, the paper argues that this architecture lacked extensibility, because adding a new optimization algorithm, or experimenting with an unconventional model architecture, would require users to modify the parameter server implementation. Not all users are comfortable with making those changes, due to the complexity of the high-performance parameter server implementation. In contrast, TensorFlow provides a high-level uniform programming model that allows users to customize the code that runs in all parts of the system, and experiment with different optimization algorithms, consistency schemes, and parallelization strategies in userspace/unprivileged code.

TensorFlow is based on the dataflow architecture. Dataflow with mutable state enables TensorFlow to mimic the functionality of a parameter server, and even provide additional flexibility. Using TensorFlow, it becomes possible to execute arbitrary dataflow subgraphs on the machines that host the shared model parameters. We say more on this when we discuss the TensorFlow model and the structure of a typical training application below.
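To make this concrete, here is a minimal sketch of how a mutable Variable plus an update op in the same dataflow graph can mimic a parameter-server push. This is my own illustration using the TensorFlow 1.x Python API; the shapes, the stand-in gradient, and the local device placement are assumptions, not taken from the paper.

import tensorflow as tf

# Shared model parameter. In a real cluster this would be pinned to a
# parameter-server task, e.g. tf.device("/job:ps/task:0"); here we use a
# local CPU device so the sketch runs on a single machine.
with tf.device("/cpu:0"):
    weights = tf.Variable(tf.zeros([784, 10]), name="weights")

# A worker-side subgraph computes an update (a stand-in for a real gradient)...
grad = tf.random_normal([784, 10])

# ...and applies it to the shared state with a mutating op. Because the
# update is just another node in the dataflow graph, it can execute on the
# device that owns the variable, mimicking a parameter-server push.
apply_update = weights.assign_sub(0.01 * grad)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(apply_update)                      # one training "step"
    print(sess.run(tf.reduce_sum(weights)))

In a multi-machine setting, many worker replicas would run apply_update concurrently against the same shared variable, which is essentially what a parameter server does, but expressed entirely within the dataflow graph.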

TensorFlow versus dataflow systems

The principal limitation of batch dataflow systems (including Spark) is that they require the input data to be immutable and all of the subcomputations to be deterministic, so that the system can re-execute subcomputations when machines in the cluster fail. This unfortunately makes updating a machine learning model a heavy operation. TensorFlow improves on this by supporting expressive control-flow and stateful constructs.

Naiad is designed for computing on sparse, discrete data, and does not support GPU acceleration. TensorFlow borrows aspects of timely dataflow iteration from Naiad in achieving dynamic control flow.

TensorFlow's programming model is close to Theano's dataflow representation, but Theano is for a single node and does not support distributed execution.

TensorFlow model

TensorFlow uses a unified dataflow graph to represent both the computation in an algorithm and the state on which the algorithm operates. Unlike traditional dataflow systems, in which graph vertices represent functional computation on immutable data, TensorFlow allows vertices to represent computations that own or update mutable state. By unifying the computation and state management in a single programming model, TensorFlow allows programmers to experiment with different parallelization schemes. For example, it is possible to offload computation onto the servers that hold the shared state to reduce the amount of network traffic.

In sum, TensorFlow innovates on these two aspects:

  • Individual vertices may have mutable state that can be shared between different executions of the graph.
  • The model supports multiple concurrent executions on overlapping subgraphs of the overall graph.

Figure 1 shows a typical training application, with multiple subgraphs that execute concurrently, and interact through shared variables and queues. Shared variables and queues are stateful operations that contain mutable state. (A Variable operation owns a mutable buffer that is used to store the shared parameters of a model as it is trained. A Variable has no inputs, and produces a reference handle.)

This Figure provides a concrete explanation of how TensorFlow works. The core training subgraph depends on a set of model parameters, and input batches from a queue. Many concurrent steps of the training subgraph update the model based on different input batches, to implement data-parallel training. To fill the input queue, concurrent preprocessing steps transform individual input records (e.g., decoding images and applying random distortions), and a separate I/O subgraph reads records from a distributed file system. A checkpointing subgraph runs periodically for fault tolerance.
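Here is a rough sketch of that structure, showing an input queue feeding a training subgraph that updates shared variables. This is my own toy example in the TensorFlow 1.x Python API; the queue capacity, tensor shapes, and "model" are made up, and a real pipeline would run the enqueue and training steps concurrently.

import tensorflow as tf

# Input queue decouples preprocessing from the training subgraph, roughly as
# in the paper's Figure 1.
queue = tf.FIFOQueue(capacity=100, dtypes=[tf.float32], shapes=[[4]])

raw_example = tf.random_normal([4])        # stand-in for a decoded/distorted record
enqueue_op = queue.enqueue(raw_example)    # run by concurrent preprocessing steps

batch = queue.dequeue_many(8)              # consumed by the core training subgraph
weights = tf.Variable(tf.zeros([4, 1]))    # shared model parameters
loss = tf.reduce_mean(tf.matmul(batch, weights))
train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for _ in range(16):                    # fill the queue (normally done concurrently)
        sess.run(enqueue_op)
    sess.run(train_op)                     # one data-parallel training step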

The API for executing a graph allows the client to specify the subgraph that should be executed. A subgraph is specified declaratively: the client selects zero or more edges to feed input tensors into the dataflow, and one or more edges to fetch output tensors from the dataflow; the run-time then prunes the graph to contain the necessary set of operations. Each invocation of the API is called a step, and TensorFlow supports multiple concurrent steps on the same graph, where stateful operations enable coordination between the steps. TensorFlow is optimized for executing large subgraphs repeatedly with low latency. Once the graph for a step has been pruned, placed, and partitioned, its subgraphs are cached in their respective devices.
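A minimal sketch of this feed/fetch step API, again in the TensorFlow 1.x Python API with a toy computation of my own choosing:

import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[None, 3])   # an edge the client can "feed"
w = tf.Variable(tf.ones([3, 1]))
y = tf.matmul(x, w)                                # an edge the client can "fetch"

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # Each run() call is a "step": the runtime prunes the graph to just the
    # operations needed to compute the fetched tensors from the fed tensors.
    out = sess.run(y, feed_dict={x: [[1.0, 2.0, 3.0]]})
    print(out)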

Distributed execution

TensorFlow's dataflow architecture simplifies distributed execution, because it makes communication between subcomputations explicit. Each operation resides on a particular device, such as a CPU or GPU in a particular task. A device is responsible for executing a kernel for each operation assigned to it. The TensorFlow runtime places operations on devices, subject to implicit or explicit device constraints in the graph. The user may specify partial device preferences such as “any device in a particular task”, or “a GPU in any Input task”, and the runtime will respect these constraints.

TensorFlow partitions the operations into per-device subgraphs. A per-device subgraph for device d contains all of the operations that were assigned to d, with additional Send and Recv operations that replace edges across device boundaries. Send transmits its single input to a specified device as soon as the tensor is available, using a rendezvous key to name the value. Recv has a single output, and blocks until the value for a specified rendezvous key is available locally, before producing that value. Send and Recv have specialized implementations for several device-type pairs. TensorFlow supports multiple protocols, including gRPC over TCP, and RDMA over Converged Ethernet.
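For illustration, a small sketch of explicit device placement in the TensorFlow 1.x Python API (the device strings and the soft-placement fallback are my assumptions for a single-machine run). The runtime partitions the resulting graph per device and inserts the Send/Recv pair on the edge that crosses the device boundary, so the user never writes those operations directly.

import tensorflow as tf

with tf.device("/cpu:0"):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])

# On a GPU build this edge (a -> b) crosses a device boundary, so the runtime
# replaces it with a Send on the CPU side and a Recv on the GPU side.
with tf.device("/gpu:0"):
    b = tf.matmul(a, a)

# allow_soft_placement lets the sketch fall back to CPU when no GPU is present;
# log_device_placement prints where each operation actually runs.
config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)
with tf.Session(config=config) as sess:
    print(sess.run(b))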

TensorFlow is implemented as an extensible, cross-platform library. Figure 5 illustrates the system architecture: a thin C API separates user-level code in various languages from the core library written in C++.

Current development on TensorFlow

On May 18th,  it was revealed that Google built the Tensor Processing Unit (TPU) specifically for machine learning. The paper mentions that TPUs achieve an order of magnitude improvement in performance-per-watt compared to alternative state-of-the-art technology.

The paper mentions ongoing work on automatic optimization to determine default policies for performance improvement that work well for most users. While power-users can get their way by taking advantage of TensorFlow's flexibility, this automatic optimization feature would make TensorFlow more user-friendly, and can help TensorFlow get adopted more widely (which looks like what Google is pushing for). The paper also mentions that, on the systems level, the Google Brain team is actively developing algorithms for automatic placement, kernel fusion, memory management, and scheduling.

My wild speculations about TensorFlow 

Especially with the addition of mutable state and coordination via queues, TensorFlow is equipped for providing incremental, on-the-fly machine learning. Machine learning applications built with TensorFlow can be long-running applications that keep making progress as new input arrives, and can adapt to new conditions/trends on the fly. Instead of one-shot, huge-batch machine learning, such an incremental but continuous machine learning system has obvious advantages in today's fast-paced environment. This is definitely good for Google's core search and information indexing business. I also speculate this is important for Android phones and self-driving cars.

Previously I had speculated that, with the ease of partitioning of the dataflow graph and its heterogeneous device support, TensorFlow can span over and bridge smartphone and cloud backend machine learning. I still stand by that prediction.
TensorFlow can couple cloud-backend machine learning with the private, device-level machine learning going on in your smartphone. It doesn't make sense for an entire power-hungry TensorFlow program to run on your wimpy smartphone. Your smartphone will be running only certain TensorFlow nodes and operations, while the rest of the TensorFlow graph runs on the Google cloud backend. Such a setup is also great for preserving the privacy of your phone while still enabling machine-learned insights on your Android.
Since TensorFlow supports inference as well as training, it can use hundreds of servers for fast training, and concurrently run the trained models for inference on smartphones. The Android voice assistant (or Google Now) is a good application for this. In any case, it is a good time to be working on smartphone machine learning.

This is a wilder speculation, but a long-running, self-improving machine learning backend in the datacenter can also provide great support for self-driving cars. Every minute, new data and decisions from self-driving cars would flow from the TensorFlow subgraphs running on the cars to the cloud backend TensorFlow program. Using this constant flux of data, the program can adapt to changing road conditions (snowy roads, poor visibility) and new scenarios on the fly, and all self-driving cars would benefit from the new improvements to the models.

Though the paper mentions that reinforcement style learning is future work, for all we know Google might already have reinforcement learning implemented on TensorFlow. It also looks like the TensorFlow model is general enough to tackle some other distributed systems data processing applications, for example large-scale distributed monitoring at the datacenters. I wonder if there are already TensorFlow implementations for such distributed systems services.

In 2011, Steve Yegge ranted about the lack of platform thinking at Google. It seems like Google has been doing well in that department lately. TensorFlow constitutes an extensible and flexible distributed machine learning platform that can be leveraged in several directions.

Sunday, May 15, 2016

Useful podcasts

I love listening to podcasts while commuting. I find that I learn a lot more by listening to my selected list of podcasts than by just listening to the radio. I am blissfully ignorant about elections, US politics, and celebrity gossip.

Here are the podcasts I have been listening to. I will appreciate if you can let me know of your recommendations. Who said podcasting is dead?

(Here is one tip you may find useful. I listen to my podcasts at 1.5x speed. It is actually a more comfortable and natural way to listen to stuff than 1x. At 1x, the pace is too slow; your brain starts losing the stream of conversation and starts wandering. Give 1.5x a try.)

Dan Carlin's Hardcore History

Who knew history could be this interesting and captivating? Dan Carlin, a seasoned radio host and American political commentator, found his true calling in this podcast series about history. The episodes are long, around 4 hours each, but they are so exciting and captivating. I found myself depressed and scared while listening to the World War 1 episodes. The Wrath of the Khans episodes about Genghis Khan were also very interesting.

The Tim Ferriss Show

I don't like Tim Ferriss's writing and his persona for the most part. But I am a big fan of his podcast. His guest selection and his interviewing skills are excellent. The topics are almost always interesting. And I learn a lot from his podcasts.
For example, I learned about Dan Carlin's and Mike Rowe's podcasts from Tim's episodes. Just check this list; it is quite impressive.

Some of my favorite episodes were:
Interview master: Cal Fussman and the Power of Listening
How Seth Godin Manages His Life -- Rules, Principles, and Obsessions
Luis von Ahn on Learning Languages, Building Companies, and Changing the World
The Scariest Navy SEAL Imaginable…And What He Taught Me
Scott Adams: The Man Behind Dilbert
Chris Sacca on Being Different and Making Billions

TEDTalks (audio)

I skip about half or 2/3rds of the talks, and listen to the ones that sound interesting. It is convenient to be able to listen to TED talks on your commute.

The Way I Heard It with Mike Rowe

In each episode, Mike Rowe talks for about 5 minutes, telling a captivating story. And the story always has an amazing twist ending or revelation at the end. Very interesting podcast.

OPTIMIZE with Brian Johnson

This podcast has 5-minute and 15-minute episodes that review recent self-improvement books. It is nice to get the gist of the books summarized for you in a couple of minutes, to keep up to date with books.

Curious Minds: Innovation, Inspiration, Improvement

This podcast has 20-minute interviews about recent self-improvement books. The format of the interviews is a little dry. Topics are hit and miss.

This American Life

This American Life by Ira Glass. I don't think this requires any introduction.

Wednesday, April 13, 2016

Paper review. GraphLab: A New Framework for Parallel Machine Learning

This GraphLab paper is from 2010. It is written elegantly and it does a good job of explaining the GraphLab abstraction and foundations. A later VLDB 2012 paper presents how to extend this basic GraphLab abstraction to a distributed GraphLab implementation.

It seems like GraphLab has since taken off big time. There are workshops and conferences about GraphLab. And GraphLab is now developed by the company Dato (formerly GraphLab Inc.). Dato Inc. raised $6.75M from Madrona and New Enterprise Associates in its A round, and $18.5M in its B round from Vulcan Capital and Opus Capital, as well as Madrona and New Enterprise Associates.


The motivation for GraphLab was to hit the sweetspot in development of machine learning solutions. "(Then in 2010) Existing high-level parallel abstractions like MapReduce are insufficiently expressive, while low-level tools like MPI and Pthreads leave machine learning (ML) experts repeatedly solving the same design challenges."

GraphLab aimed to express asynchronous iterative algorithms with sparse computational dependencies while ensuring data consistency and achieving good parallel performance. This paper provides an efficient multicore parallel (but not distributed) shared-memory implementation of GraphLab. (The C++ reference implementation of GraphLab is at

Related work

MapReduce works well on embarrassingly data-parallel tasks, but is not efficient for graph processing and iterative algorithms. This line from the paper gave me a chuckle: "By coercing efficient sequential ML algorithms to satisfy the restrictions imposed by MapReduce, we often produce inefficient parallel algorithms that require many processors to be competitive with comparable sequential methods." (Frank's laptop also took on GraphLab.)

DAG abstraction represents parallel computation as a directed acyclic graph with data flowing along edges between vertices that correspond to computation/function. Dryad, Storm, Heron fit here. DAG does not naturally express iterative algorithms.

Dataflow abstraction in Naiad is a related work. This being a 2010 paper there is no comparison to Naiad in the paper. This is what Naiad (2013) has to say about GraphLab. "GraphLab and PowerGraph offer a different asynchronous programming model for graph computations, based on a shared memory abstraction. These asynchronous systems are not designed to execute dataflow graphs so the notion of completeness of an epoch or iteration is less important, but the lack of completeness notifications makes it hard to compose asynchronous computations. Although GraphLab and PowerGraph provide a global synchronization mechanism that can be used to write a program that performs one computation after another, they do not achieve task- or pipeline-parallelism between stages of a computation. Naiad allows programs to introduce coordination only where it is required, to support hybrid asynchronous and synchronous computation."

Petuum is also a closely related work. Petuum uses BSP, with SSP relaxation and nonuniform convergence. It looks like GraphLab allows more fine-grained scheduling and supports data-graph computations and dependency modeling better. On the other hand, Petuum was designed clean-slate to support machine learning algorithms better, and introduced the model-parallelism concept. GraphLab, as you will see in the summary below, is principally a graph processing framework, with good applicability for graph-based machine learning tasks. The Petuum paper says "Petuum ML implementations can run faster than other platforms (e.g. Spark, GraphLab), because Petuum can exploit model dependencies, uneven convergence and error tolerance". The paper provides performance comparison experiments with GraphLab.

GraphLab abstraction

The minimalism and generality of the GraphLab framework remind me of LISP. "The data graph G = (V, E) encodes both the problem specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G." (Also the set scheduler that enables users to compose custom update schedules is clever and LISPy.)

Figure 3 illustrates the overall GraphLab framework. A GraphLab program is composed of the following parts:
1. A data graph, which represents the data and computational dependencies.
2. Update functions, which describe local computation.
3. A sync mechanism, for aggregating global state.
4. A data consistency model (i.e., Fully Consistent, Edge Consistent, or Vertex Consistent), which determines the extent to which computation can overlap.
5. Scheduling primitives, which express the order of computation and may depend dynamically on the data.

Data Model

The GraphLab data model consists of two parts: a directed data graph and a shared data table. The data graph G = (V, E) encodes both the problem specific sparse computational structure and directly modifiable program state. The user can associate arbitrary blocks of data (or parameters) with each vertex and directed edge in G. To support globally shared state, GraphLab provides a shared data table (SDT) which is an associative map, T: [Key] --> Value, between keys and arbitrary blocks of data. Here, Key can be a vertex, edge, result of a sync computation as explained below, or a global variable, say model parameter.
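As a mental model, the data model boils down to data blocks attached to vertices and edges, plus a global associative table. The sketch below is a toy Python mirror of the abstraction I wrote for illustration; it is not GraphLab's actual C++ API, and the field names are made up.

# Arbitrary data blocks attached to vertices and directed edges.
vertex_data = {0: {"rank": 1.0}, 1: {"rank": 1.0}, 2: {"rank": 1.0}}
edge_data   = {(0, 1): {"weight": 0.5}, (1, 2): {"weight": 0.5}, (2, 0): {"weight": 1.0}}

# Shared data table (SDT): an associative map from keys to global state,
# e.g. model parameters or the results of sync computations.
shared_data_table = {"damping": 0.85, "num_vertices": len(vertex_data)}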

User defined computation

Computation in GraphLab can be performed either through an update function which defines the local computation, or through the sync mechanism which defines global aggregation. The Update Function is analogous to the Map in MapReduce, and sync mechanism is analogous to the Reduce operation. A GraphLab program may consist of multiple update functions and it is up to the scheduling model to determine which update functions are applied to which vertices and in which parallel order.


The GraphLab update schedule describes the order in which update functions are applied to vertices, and is represented by a parallel data structure called the scheduler. The scheduler abstractly represents a dynamic list of tasks (vertex-function pairs) which are to be executed by the GraphLab engine. This is a minimalist yet flexible and general model. Since writing a scheduler is hard, GraphLab provides a default BSP-style scheduler or a round-robin scheduler.

Since many ML algorithms (e.g., Lasso, CoEM, Residual BP) require more control over the tasks that are created and the order in which they are executed, GraphLab also allows update functions to add and reorder tasks. For this, GraphLab provides 1) FIFO schedulers that only permit task creation but do not permit task reordering, and 2) prioritized schedulers that permit task reordering at the cost of increased overhead.
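To illustrate the abstraction, here is a toy, single-threaded Python mirror of an update function plus a FIFO scheduler with dynamic task creation, using a PageRank-flavored example. This is my own sketch, not GraphLab's API; the real engine runs tasks in parallel while enforcing the chosen consistency model.

from collections import deque

# Toy graph: out_edges[v] lists v's out-neighbors; rank is the per-vertex data.
out_edges = {0: [1], 1: [2], 2: [0]}
in_edges  = {0: [2], 1: [0], 2: [1]}
rank      = {v: 0.0 for v in out_edges}
DAMPING, TOLERANCE = 0.85, 1e-3

def pagerank_update(v):
    # Update function: reads the scope of v (its in-neighbors' data), writes
    # v's own data, and returns neighbors to reschedule if v changed enough.
    new_rank = (1 - DAMPING) + DAMPING * sum(
        rank[u] / len(out_edges[u]) for u in in_edges[v])
    changed = abs(new_rank - rank[v]) > TOLERANCE
    rank[v] = new_rank
    return out_edges[v] if changed else []

# FIFO scheduler: a dynamic list of (vertex, update_fn) tasks. Update functions
# may add new tasks, which gives dynamic, data-driven scheduling.
tasks = deque((v, pagerank_update) for v in out_edges)
while tasks:
    v, fn = tasks.popleft()
    for u in fn(v):
        tasks.append((u, fn))

print(rank)   # ranks approach 1.0 on this 3-cycle

The point of the sketch is the control structure: computation is driven by a dynamic task list rather than by global synchronous rounds, and the update function itself decides which neighbors need further work.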


The paper  demonstrates the expressiveness of the GraphLab framework by designing and implementing parallel versions of belief propagation, Gibbs sampling, Co-EM, Lasso and Compressed Sensing.


GraphLab Wikipedia page

Paper Review. Petuum: A new platform for distributed machine learning on big data