Data stores have multiplied over the last few years. There are now so many of them that choosing the right tool for the job can be overwhelming. Among all of the Big Data ready replacements for traditional relational databases (think Cassandra or HBase), there are also a few tools that try to solve a slightly different problem: providing a centralized log of data that is both writable by multiple producers and easily readable by multiple consumers.
The original idea of using this kind of log as a central piece of data infrastructure comes from LinkedIn engineers, who developed Kafka and then branched out into a separate company Confluent to continue building (and eventually selling) tools that develop this idea even further. I will not try to rephrase the ideas behind Kafka, but all of you should definitely go and read this excellent article The Log: What every software engineer should know about real-time data’s unifying abstraction to have more context.
The gist of the article is as follows:
The core of data infrastructure should be a distributed highly available log, to which you can write data from any data producer and from where you can copy data into any other data sources.
Of course, it’s not only about forwarding data from this log to another database. As you could guess, this kind of log also lets you run various computations on its data stream (“stream” being a term that can be used instead of “log” in this case). An example would be real-time user behavior analysis.
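To make the idea concrete, here is a minimal in-memory sketch of such a log. It is not how Kafka or Kinesis are implemented; the `Log` class, the consumer names and the event names are all made up for illustration. The point it shows is that records are only ever appended, and each consumer keeps its own read position, so one consumer reading a record never takes it away from another.

```python
class Log:
    """Toy append-only log; every consumer tracks its own read offset."""

    def __init__(self):
        self._records = []
        self._offsets = {}  # consumer name -> index of the next record to read

    def append(self, record):
        """Any producer may append; records are never changed or removed."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, consumer, max_records=100):
        """Return the next batch for this consumer and advance only its offset."""
        start = self._offsets.get(consumer, 0)
        batch = self._records[start:start + max_records]
        self._offsets[consumer] = start + len(batch)
        return batch


log = Log()
log.append({"type": "payment.failed", "user": 1})      # hypothetical events
log.append({"type": "lesson.completed", "user": 2})

archive_batch = log.read("archiver")     # both consumers see the same records
analytics_batch = log.read("analytics")
```

Contrast this with a queue, where a message handled by one consumer is gone for everybody else.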
Kinesis: Amazon’s Kafka
Kafka, while being a great tool and an impressive piece of engineering, has an obvious disadvantage: you have to manage it yourself. Kafka has a hard dependency on Zookeeper (only the new consumer client introduced in Kafka 0.9 no longer talks to it), and taking care of the configuration, scalability and reliability of these components is your responsibility. Amazon Web Services (AWS), seeing increased demand for a tool like Kafka, developed Kinesis – “a fully managed, cloud-based service for real-time data processing over large, distributed data streams”. Or, in simpler words: a managed Kafka-like service by AWS.
As usual, AWS takes care of providing a reliable service and (almost) all you have to do is to throw your data at Kinesis and read it from there. There are some manual steps, though: you have to decide on the number of shards in your stream, and things like choosing a partition key are also up to you. Kinesis also has some limitations. For example, each shard can accept only up to 1,000 records or 1 MB per second for writes, and a single record should not exceed 1 MB. The main limitation, though, is that data is persisted in a stream for only 24 hours. These limits do change for the better, though, so check the docs for their current state.
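Why does the partition key matter? Kinesis hashes the key with MD5 into a 128-bit number and routes the record to the shard that owns that part of the hash range. The sketch below imitates this under the assumption that the shards split the hash range evenly (which is not guaranteed after resharding); `shard_for` and the `user-…` keys are hypothetical names for illustration.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def shard_for(partition_key, shard_count):
    """Index of the shard a key lands on, assuming evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


# Many distinct keys spread over all shards...
spread = Counter(shard_for(f"user-{i}", 4) for i in range(1000))

# ...while a constant key sends every record to one and the same shard.
hot = Counter(shard_for("all-events", 4) for _ in range(1000))
```

A key with few distinct values therefore concentrates traffic on a few shards, and those shards hit the per-shard limits long before the stream as a whole does.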
There is also a conceptual limitation compared to Kafka: Kinesis has no concept of topics. You either read all the records in the stream or you don’t read anything. You can, however, just create multiple streams, so unless you are going to create dozens of streams every minute – and why would you, anyway – you are going to be okay.
There are many great things about Kinesis, like the ability to consume records using AWS Lambda or to stream all changes in DynamoDB tables to… egh… DynamoDB Streams. I have prepared a reading list at the end of the article for you to learn more about Kinesis. But next, let’s focus on how Kinesis is used at Babbel.
Trees of queues aka old data pipeline
Babbel is mostly AWS-based. The current data pipeline is mostly based on Simple Queue Service (SQS) – another AWS product for queues (whoa). SQS is indeed dead simple, works perfectly fine and even allows you to persist messages for up to 14 days. It also provides lots of features you can expect from a queue (see documentation).
One missing feature, perhaps, is being a real queue: due to the distributed nature of SQS it is not a FIFO system. There is no guarantee that a message is delivered only once, nor that messages arrive in the correct order. But these are details.
Because SQS is a queue, you can’t really work with the same message simultaneously in multiple consumers and that’s great, because it allows you to easily implement almost exactly-once semantics: after the consumer has processed the message it can just delete it and, yay, no-one can access it again.
There are also cases when this message gets deleted on _some_ SQS servers and not on the others and, due to the distributed nature of SQS, you can even receive an already deleted message again (see official FAQ), but these are also details.
Now that we know how great SQS is, I can show you how it is used as a core component of a data pipeline at Babbel. Below is a slightly simplified diagram of this pipeline.
Producers (these could be anything from public facing applications to daemons) write everything to the Master Queue, which basically just accumulates all the events. Then a separate application, Master Queue Worker (MQW), pulls messages from the Master Queue and distributes them across multiple other queues. There are three of them on the diagram, but actually Babbel has many more.
MQW doesn’t blindly copy the same message to all destination queues. Some configuration describes what types of events should go to which destination queue. Of course, for this to work you have to have a consistent, predictable and documented naming scheme for your events.
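As a rough illustration of what such a configuration could look like (this is not Babbel’s actual format; the queue names, event names and the `destinations_for` helper are invented), a mapping from destination queue to event-name patterns is enough:

```python
from fnmatch import fnmatchcase

# Hypothetical routing table: destination queue -> event-name patterns.
ROUTES = {
    "s3_queue": ["*"],              # archive everything
    "elasticsearch_queue": ["*"],   # index everything
    "payment_queue": ["payment.*"], # only payment events
}


def destinations_for(event_name, routes=ROUTES):
    """Return the queues that should receive a copy of this event."""
    return sorted(
        queue
        for queue, patterns in routes.items()
        if any(fnmatchcase(event_name, pattern) for pattern in patterns)
    )


payment_targets = destinations_for("payment.failed")
lesson_targets = destinations_for("lesson.completed")
```

Pattern matching like this only works when event names follow a predictable scheme, which is exactly why the naming convention matters.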
Even with this configuration in place, many destination queues in the end receive all the messages the master queue receives. For instance, all the events are stored in multiple data stores. S3 is used for archiving, Elasticsearch for in-house insights and RDS is the primary tool of Babbel’s BI team.
There is nothing wrong with this system. Actually, it works perfectly and allows any amount of branching – you can add more levels of queues beneath the master queue. For example, you could dispatch all payment events to a Payment Queue, and then dispatch “failed payment” events to another queue and work with these in a separate app or set alerts on this queue. There are three issues, though, and all of them became obvious with the appearance of Kinesis.
1. SQS is not that cheap…
…or at least this architecture is not cheap. Even though SQS is indeed a cheap solution when you get started, it becomes more and more costly as you put more data in it. In case of Babbel, there is a need to pay not only for the master queue but also for every destination queue. And, as I said, destination queues are often a duplicate of the master queue, so, roughly, the price for full processing (going through the whole pipeline to all destinations) of one message should be multiplied by the number of queues.
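A back-of-the-envelope sketch of that multiplication, counting billable requests rather than money. It assumes that every hop through a queue costs one send, one receive and one delete, that nothing is batched, and that every destination queue gets a full copy; the function name and these simplifications are mine, not real billing figures.

```python
def requests_per_message(destination_queues, requests_per_hop=3):
    """Unbatched SQS requests needed to push one message through the tree.

    requests_per_hop=3 assumes one send, one receive and one delete per queue.
    """
    queues_traversed = 1 + destination_queues  # master queue + full copies
    return requests_per_hop * queues_traversed


single_queue = requests_per_message(0)   # 3 requests
three_copies = requests_per_message(3)   # 12 requests, four times as many
```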
2. Batch limits of SQS are pretty strict
You can pull at most 10 messages and you can put at most 10 messages at a time. Now, compare it with Kinesis, where you can put batches of 500 messages and pull batches of up to 10,000 messages. You can imagine the performance benefit of switching to Kinesis here.
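To put numbers on it, the sketch below counts the API calls needed to move one million messages with the batch limits quoted above. It deliberately ignores payload size limits and per-shard throughput, so treat it as an upper bound on the benefit rather than a benchmark.

```python
SQS_MAX_BATCH = 10              # send or receive, per call
KINESIS_PUT_MAX_BATCH = 500     # records per put call
KINESIS_GET_MAX_BATCH = 10_000  # records per get call


def calls_needed(message_count, batch_size):
    """Number of API calls to move message_count messages (ceiling division)."""
    return -(-message_count // batch_size)


MESSAGES = 1_000_000
sqs_calls = calls_needed(MESSAGES, SQS_MAX_BATCH)                  # 100,000
kinesis_put_calls = calls_needed(MESSAGES, KINESIS_PUT_MAX_BATCH)  # 2,000
kinesis_get_calls = calls_needed(MESSAGES, KINESIS_GET_MAX_BATCH)  # 100
```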
3. SQS is not the right tool for this job
An attentive reader might have already noticed that the SQS approach, while simple, is basically a custom solution to the problem that Kafka and Kinesis solve: to give a centralized log into which you can plug virtually unlimited producers and consumers. Indeed, creating trees of queues is nothing more than a workaround, because you can’t attach multiple consumers to one queue.
New data pipeline
Naturally, Kinesis is just a perfect fit for solving the same problem, so let’s discuss its introduction in more detail.
– Oh, hold on for a second. Why Kinesis? Kafka is more fancy, it has topics, and it is almost as hot as Docker!
There are a few good reasons for Babbel to choose Kinesis instead of Kafka, the most important one being the cost of managing our own Kafka cluster. In an environment fully based on AWS it makes little sense to roll out a Kafka cluster. Even though feature-wise Kafka is, of course, superior to Kinesis as a data stream, Kinesis is fully integrated into the AWS environment and solves all security, monitoring, scaling etc. issues without any effort.
That said, Kafka might be re-evaluated as a replacement, especially because as of version 0.9 its new consumer no longer depends on Zookeeper. Confluent also now provides Schema Registry, and the concept of topics is still desirable to have. But at the moment of choosing the tool, the downside of supporting our own Kafka cluster outweighed the benefits.
Let’s get visual first and take a look at the (still simplified) diagram of the new data pipeline.
As you can see, SQS is still used as the primary data receiver. Despite all the disadvantages of SQS described above, it still has the benefit of storing data for 14 days, while a Kinesis stream keeps it for only 24 hours. To avoid taking this risk for now, SQS is left in place as a buffer. Most of the destination queues also still exist, but some of them have already been replaced by a Kinesis stream with another intermediate Kinesis Queue.
There are currently two new workers – one for Elasticsearch (ES) and another one for S3. These two data stores were chosen because, first, they process the same amount of data as the master queue and, second, S3 and Elasticsearch are not as important for production operations as some internal workers that actually perform actions instead of just copying data. So replacing S3 Queue and ES Queue both reduced data pipeline costs and allowed us to try out Kinesis with production load without serious consequences.
With the new pipeline, everyone can easily plug new workers into the Kinesis stream. Minor read-operation limits exist in Kinesis, but they can be easily resolved by adding more shards.
Overall, we encountered no serious issues transitioning to Kinesis. This technology has been built to manage terabytes of data per day, which is not yet the case at Babbel, but the new data pipeline is already prepared for any kind of traffic explosions.
What nobody told us about Kinesis
Writing Kinesis consumer applications still requires some tooling around it and involves a lot of components. It was known before that writing consumers would be more complicated than doing it for SQS.
The main reason is that Kinesis by its nature is an append-only store and you can’t remove records from it. It makes total sense, because how else would we be able to attach multiple consumers to it? If one consumer removes a message, then others can’t read it – a problem that existed with SQS.
This means that application developers have to worry about choosing a good partition key, synchronizing multiple workers of the same application (so they don’t read the same message multiple times), taking care of locking shards and persisting the check pointer.
The check pointer, as you might know, tells the consumer up to which record it has already processed the stream, which makes it possible to avoid processing messages twice. It’s nice not to have duplicates, but where the hell should you store this check pointer?
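Here is a minimal sketch of the idea. The `CheckpointStore` is a plain dictionary standing in for some durable storage, sequence numbers are small integers instead of the long numeric strings Kinesis uses, and all names are hypothetical. What it shows is that a restarted worker skips everything at or before the saved position.

```python
class CheckpointStore:
    """Stand-in for durable storage: (app, shard) -> last processed sequence number."""

    def __init__(self):
        self._positions = {}

    def save(self, app, shard_id, sequence_number):
        self._positions[(app, shard_id)] = sequence_number

    def load(self, app, shard_id):
        return self._positions.get((app, shard_id))


def consume(records, store, app, shard_id, handle):
    """Process (sequence_number, data) pairs that lie after the checkpoint."""
    last_done = store.load(app, shard_id)
    for sequence_number, data in records:
        if last_done is not None and sequence_number <= last_done:
            continue  # already handled before the restart
        handle(data)
        store.save(app, shard_id, sequence_number)


store = CheckpointStore()
shard_records = [(1, "a"), (2, "b"), (3, "c")]
processed = []

consume(shard_records[:2], store, "s3-worker", "shard-0", processed.append)
# The worker "restarts" and is handed the shard from the beginning again.
consume(shard_records, store, "s3-worker", "shard-0", processed.append)
```

Saving the position after every single record keeps the example short; a real consumer would more likely checkpoint every so often and accept reprocessing a few records after a crash.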
Luckily, the AWS team solved this problem too. There are two ways you can write Kinesis apps: by talking to the API yourself and by using the Kinesis Client Library (KCL)*. Talking to the API even via SDK is both painful and wrong, because you have to take care of all the problems I’ve mentioned above.
Kinesis Client Library, on the other hand, is an open source tool that takes care of all the pain points (except choosing a good partition key) and allows you to focus on writing the application itself. You can then deploy this application on multiple servers and they will synchronize seamlessly. As a consequence, it practically doesn’t matter if your consumer application (S3 Worker and ES Worker on the diagram below) is deployed on one EC2 instance or multiple ones.
But there are still two issues even if you use Kinesis Client Library. First, KCL takes care of checkpointing by using separate DynamoDB tables for each application. You need to configure correct IAM permissions to make it work. It doesn’t take long to do so, but you have to keep in mind that the actual state of each consumer is stored in DynamoDB and using it increases your monthly AWS bill a bit. Each KCL application is also automatically monitored by CloudWatch, so you also have a few useful metrics out of the box (once again – if you configure the required IAM permissions).
Second, Kinesis Client Library is 100% Java, and at Babbel many services rely on Ruby (except some hidden Go pieces). Ouch. There is a solution though: KCL also implements something called MultiLangDaemon. Let me quote the doc:
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over STDIN and STDOUT using a defined protocol. There will be a one to one correspondence amongst record processors, child processes, and shards. … This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
In theory you can use any programming language as long as it can talk to this MultiLangDaemon; in our case that language is Ruby. It would still be a pain to use, though, if the AWS team hadn’t also implemented aws-kclrb: a wrapper around MultiLangDaemon that allows you to, finally, focus on writing Kinesis apps.
Let’s not think about the ridiculous “rb” in the end of the Ruby gem name and just be happy about it existing!
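To give a feeling for what such a wrapper hides, here is a heavily simplified sketch of a record processor sub-process that reads one JSON message per line and acknowledges each one. The message names and fields (`initialize`, `processRecords`, `shutdown`, `status`, `responseFor`) reflect my understanding of the protocol and should be treated as assumptions, the checkpoint exchange is left out completely, and the sketch is in Python rather than Ruby. Check the KCL documentation before relying on any of it.

```python
import base64
import io
import json


def run_processor(inp, out, handle_record):
    """Read line-delimited JSON actions from inp and acknowledge them on out."""
    for line in inp:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        action = message.get("action")
        if action == "processRecords":
            for record in message.get("records", []):
                # Assumption: record payloads arrive base64-encoded.
                handle_record(base64.b64decode(record["data"]))
        # Tell the daemon that this action has been handled.
        out.write(json.dumps({"action": "status", "responseFor": action}) + "\n")
        out.flush()
        if action == "shutdown":
            break


# Simulate the daemon's side with in-memory streams instead of STDIN/STDOUT.
payload = base64.b64encode(b"hello").decode("ascii")
daemon_input = io.StringIO("\n".join([
    json.dumps({"action": "initialize", "shardId": "shard-0"}),
    json.dumps({"action": "processRecords", "records": [{"data": payload}]}),
    json.dumps({"action": "shutdown", "reason": "TERMINATE"}),
]) + "\n")
daemon_output = io.StringIO()
seen = []

run_processor(daemon_input, daemon_output, seen.append)
replies = [json.loads(reply) for reply in daemon_output.getvalue().splitlines()]
```

In a real deployment the two streams would be `sys.stdin` and `sys.stdout`, with one such sub-process per shard, as the quote above describes.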
Even with these tools in place, you still need to take care of deploying the applications. Configuring the worker server requires at least these steps:
- Installing Java
- Installing KCL with all dependencies
- Configuring Ruby app
- Creating a system service that binds it all together
Currently there is no open source solution for this process. At Babbel we wrote a small custom framework, which abstracts all Java parts away and it might be open sourced one day. We also wrote a few Chef cookbooks to automate the whole deployment part.
Summary and outlook
After all deployment details were figured out it became quite trivial to create new consumers. While it took a few days of exploration and trial-and-error for writing the S3 Worker, it took only a few hours to implement the complete ES Worker.
One apparent disadvantage of using a single Kinesis stream is that every worker is now obliged to read the whole stream to find the events it needs. This issue can be solved by introducing multiple streams which will result in a system similar to the SQS-based one. Or it can also be ignored as long as every worker is still capable of reading through the complete stream of events (which is not really a hard performance problem for the current load).
The other one, already mentioned above, is the retention period of only 24 hours. With Kinesis, you therefore need a solid infrastructure that can handle all the incoming events without significant delays or downtime. But, knowing that LinkedIn sets their Kafka log retention period to just 2 days, smaller-scale startups can feel confident that 24 hours is enough.
Our new data pipeline with Kinesis in place allows us to plug in new consumers without causing any damage to the current system, so it’s possible to rewrite all Queue Workers one by one and replace them with Kinesis Workers. In general, the transition to Kinesis was smooth and there were no particularly tricky parts.
Another outcome was significantly reduced costs – handling almost the same amount of data as SQS, Kinesis turned out to be many times cheaper.
Reading list
- AWS Summit Video on building KCL apps
- Go client for Kinesis
- Kinesis Producer Library, an efficient C++ based library for writing records to Kinesis.
- Amazon Kinesis – Real-Time Processing of Streaming Big Data
- Amazon Kinesis Documentation
Also feel free to ask about it in the comments.
* KCL is not the only way to work with Kinesis streams. There are also libraries for Spark and Storm.