Real-time analytics with Apache Druid

Every now and then there is a new product for Big Data™ that makes working with huge datasets easy: BigQuery, Redshift, Elasticsearch... You name it. Apache Druid, a database focused on high-performance real-time analytics, was the latest one to catch my attention. In this post I am going to look at Druid's key features, set up a small instance, ingest some data, and run a few queries.

Let's start with the key features:

+ **Column storage** - Popular SQL databases such as MySQL and PostgreSQL use rows as their basic blocks of storage. These databases perform best for operations on the whole row: inserts, updates, and deletes are fast, but queries that only need a few columns are less efficient, since the entire row is loaded into memory. Databases with column storage, however, use columns as their basic blocks of storage. Inserts, updates, and deletes are slower, since the operation has to be repeated for every column of the new entity, but aggregations and queries that touch only a few columns are much faster, since the database only has to load the columns it needs into memory.
+ **Indexes for string values** - Druid keeps string values indexed, so queries that search for strings are much faster.
+ **Support for both streaming and batch insertion** - Loading data into Druid can be done in bulk (for example, from files on Hadoop) or by streaming (from message queues such as Kafka).
+ **Flexible schemas with nesting (kind of...)** - Druid does not handle completely unstructured data, but you can change the schema later and re-index all segments. If you don't re-index all segments, the ones that were already ingested keep the old schema. Nesting is *almost* supported - the documentation says it is supported, but I think this is a bit of a stretch: Druid can handle nested schemas (for example, nested JSON objects), but they are flattened at ingestion time, and you need to tell Druid how to flatten each field.
+ **Druid partitions data by time** - In order to distribute the data between nodes, Druid uses a time field (which is mandatory, but can be created at ingestion time) from the "row" you are inserting.
+ **JSON is Druid's native query language, but it also supports SQL** - If you have ever used Elasticsearch, you probably know what I am talking about. If you have some SQL queries that you are using with an existing database, you may be able to migrate them to Druid without a problem, but if you really want to take advantage of all the features, you should probably use its native JSON language.
+ **Horizontal scalability** - A database for huge amounts of data would not be complete without this. Running Druid as a cluster means that we can add more and more machines as our requirements increase.
+ **Druid clusters can be autoscaled** - This is great news if you have access to Kubernetes. There are Helm charts ready for deploying Druid, and you can also make use of the Horizontal Pod Autoscaler to automatically scale Druid clusters!
+ **Automatic rollup during ingestion time** - Instead of storing individual rows, Druid *can* optionally pre-aggregate data during ingestion and store only the result of this pre-aggregation. This saves a lot of storage space and makes aggregations much faster later on.

[Figure: Rollup during ingestion]
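To make the column-storage point from the feature list a bit more concrete, here is a minimal Python sketch. It is not how Druid lays out data internally; the sample rows, the `country` field, and the `to_columns` helper are made up for illustration. It only shows why an aggregation over one field touches less data when values are grouped by column.

```python
# Hypothetical sample data: three events with three fields each.
rows = [
    {"user": "john", "country": "CA", "clicks": 2},
    {"user": "cena", "country": "US", "clicks": 4},
    {"user": "john", "country": "CA", "clicks": 3},
]


def to_columns(rows):
    """Pivot a list of row dicts into one list of values per column."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout: every full row is visited, even though only "clicks" is needed.
total_from_rows = sum(row["clicks"] for row in rows)

# Column layout: only the "clicks" column is visited.
total_from_columns = sum(columns["clicks"])

print(total_from_rows, total_from_columns)
```

Both totals are the same; the difference is how much unrelated data has to be read to compute them.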
Sounds pretty good, right? I wanted to try these features out myself, and here is what I learned.

# BigQuery, Elasticsearch, Redshift...

This comparison may sound strange, since these are very different technologies and not really direct competitors, but why would you choose Druid over any of them? All of these technologies are excellent at handling massive amounts of data - this goes without saying. But what does Druid solve that BigQuery, Redshift, and Elasticsearch do not?

## Elasticsearch

Elasticsearch is a database that focuses primarily on textual search rather than analytics and aggregations, although it does seem to be investing more and more in this area. Still, Elasticsearch is much less efficient and powerful when working with analytics and aggregations. Druid, on the other hand, has some support for textual search, but it is very primitive compared to Elasticsearch; instead, Druid focuses on high-performance aggregation, analytics, and ingestion.

Both have some characteristics in common though: both are (mostly) open-source, both use JSON as their primary query language, both have limited SQL support, and both take a while to set up (although you can get Elasticsearch as SaaS).

In summary: if you need powerful textual search, go with Elasticsearch; if you need powerful analytics, go with Druid.

## Redshift and BigQuery

Now, I am not an expert on Redshift, so forgive me if I say something wrong, but let's talk a bit about it and BigQuery.

As far as I know, Redshift is the only one in this group that does not support streaming inserts. Instead, you have to write data into files on Amazon S3 and copy the data into Redshift. It is possible to stream data into both BigQuery and Druid - the latter includes connectors for sources like Kinesis and Kafka. Druid also has a unique ingestion feature that neither BigQuery nor Redshift has: the "rollup during ingestion" that I mentioned before.
Before inserting your data, however, it is a good idea to consider how these databases partition it. Both BigQuery and Druid offer partitioning by time, meaning that queries that select data based on time are very efficient, since irrelevant data is not scanned. Redshift does not offer this kind of partitioning, and relies on a hashed key to distribute data between data nodes (I think Google BigTable also uses the same method) - this is a problem when you need to add or remove nodes, since the data needs to be re-hashed.

Once we have figured out partitioning and ingestion, next comes querying. Both BigQuery and Redshift use SQL for querying, and naturally, their support for SQL is very good; this is not the case for Druid, which has a much more limited SQL interface. This may be a problem if you are thinking about migrating from one of these two and don't want to rewrite anything.

Lastly, we should think about hosting. Unlike Druid, you do not have to host Redshift and BigQuery yourself: these are proprietary technologies from Amazon and Google respectively, hosted on AWS and GCP. It is very possible that we will see Druid as SaaS very soon (if it doesn't exist already), but it is likely that you will have to host it yourself, at least for now. If you choose to do so, Kubernetes may be a great tool for hosting Druid.

Before we move on, let's talk a little bit more about querying, specifically cost and query time. I haven't used Redshift yet, so I don't know how it performs. However, I do have experience with BigQuery. BigQuery is insanely fast when you consider how much data it can go through - a query aggregating several terabytes of data can easily take less than a minute. Although very impressive, we still have two problems:

+ In many cases, we don't have 30 seconds to spare
+ Query cost goes up in proportion to the amount of data traversed

This is where Druid shines.
Rolling up data during ingestion means that we traverse less data at query time, which speeds up queries dramatically and makes them much cheaper to run. This is a huge selling point if BigQuery's costs and query times are an issue.

Overall, Druid seems to shine if you want a database focused on real-time analytics over time-series data and you don't mind leaving SQL behind.

# Running Druid

Druid is fault-tolerant and highly scalable, so it is meant to be deployed as a cluster (across several machines). According to the documentation, a basic Druid deployment contains three different types of servers:

+ **Data servers** are responsible for ingesting and holding the data of the database. Usually this means that each data server holds a fraction of all the data in the cluster, as well as backups for other data servers in case they fail. This setup ensures that even if a data server goes down, the data is still available on other servers.
+ **Query servers** expose the API for querying and communicate with the data servers to run the queries.
+ **Master servers** are responsible for coordinating tasks (such as ingestion and data availability) between the other servers.

How many of each do you need? The number is not set in stone, but from my knowledge of Elasticsearch (and here I am hoping this knowledge is transferable), a small-but-reliable cluster would have:

+ **3 master servers.** More than one master server means that we always have another one available in case one of them goes down, and an odd number of master servers means that they can always reach a consensus when making decisions via elections (failing to reach consensus is known as the split-brain problem).
+ **2 data servers.** More than one data server is important to ensure that data is always available, even if one of the servers becomes unresponsive.
+ **2 query servers.** Similarly to the data servers, we always want at least one query server to be available.

In a Kubernetes environment, you can deploy each of these servers as a pod. You can find the configuration for a simple clustered deployment here, and here is a tutorial for it.

Although running Druid in a cluster is recommended for production, I will run it as a single server on my machine, since this great article I linked before already shows how to run it on Kubernetes with an HPA. Let's get it running.

1. Download Druid from here.
2. Decompress the contents:

   ```bash
   $ tar -xzf apache-druid-0.16.0-incubating-bin.tar.gz
   ```

3. Navigate to the newly created directory:

   ```bash
   $ cd apache-druid-0.16.0-incubating
   ```

4. Download Zookeeper, which is a dependency for Druid:

   ```bash
   $ curl -o zookeeper-3.4.14.tar.gz
   $ tar -xzf zookeeper-3.4.14.tar.gz
   $ mv zookeeper-3.4.14 zk
   ```

5. Now we can start Druid:

   ```bash
   $ ./bin/start-micro-quickstart
   [Sun Dec 1 13:06:58 2019] Running command[zk], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/zk.log]: bin/run-zk conf
   [Sun Dec 1 13:06:58 2019] Running command[coordinator-overlord], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/coordinator-overlord.log]: bin/run-druid coordinator-overlord conf/druid/single-server/micro-quickstart
   [Sun Dec 1 13:06:58 2019] Running command[broker], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/broker.log]: bin/run-druid broker conf/druid/single-server/micro-quickstart
   [Sun Dec 1 13:06:58 2019] Running command[router], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/router.log]: bin/run-druid router conf/druid/single-server/micro-quickstart
   [Sun Dec 1 13:06:58 2019] Running command[historical], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/historical.log]: bin/run-druid historical conf/druid/single-server/micro-quickstart
   [Sun Dec 1 13:06:58 2019] Running command[middleManager], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/middleManager.log]: bin/run-druid middleManager conf/druid/single-server/micro-quickstart
   ```

To stop Druid, just use CTRL+C, and to restart it, run the command from step 5 again.

Now we should be good to go. Navigate to http://localhost:8888 and you should see Druid's lovely console.

[Figure: Apache Druid Console]
See that **Servers** section? Let's take a closer look.

## Servers

[Figure: Apache Druid Console - Servers]
You may be asking yourself what these types are... Coordinator? Overlord? Router? I was hoping to see *master*, *data*, and *query* instead. Well, it turns out that the functionality of each server is split into what Druid calls **processes**, which live inside the servers:

+ **Data server**
  + **Historical** - Handles historical data (data that has already been consolidated in the database)
  + **Middle Manager** - Handles ingestion of new data into the cluster
  + **Indexer** - An optional alternative to the Middle Manager, which uses threads instead of processes. You should use either the Indexer or the Middle Manager, but not both.
+ **Query server**
  + **Broker** - Receives query requests and forwards them to the data servers
  + **Router** - An optional gateway that sits in front of the brokers
+ **Master server**
  + **Coordinator** - Manages how data is distributed between the Historical processes
  + **Overlord** - Manages how data is ingested by the Middle Manager processes

In our case, the single server we just started instantiated all of these servers with their processes for us.

# Ingesting data

Now let's try adding some data to our Druid instance. Clicking on the `Load Data` link in the menu takes us to this page:

[Figure: Apache Druid Console - Load data screen]
One of the options is to just paste the raw data, but I will take a slightly more difficult route and use Kafka for this. To run Kafka, you can use this compose file I adapted from here (the changed Zookeeper port avoids conflicts with Druid's Zookeeper):

```yaml
version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2182:2182"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2182
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2182"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
```

You can run this file like this:

```bash
$ docker-compose -f up
```

Now let's open a shell inside our Kafka container:

```bash
$ docker exec -it kafka-docker_kafka1_1 bash
```

And now we can create a topic:

```bash
# Inside docker_kafka1_1 container
$ kafka-topics --zookeeper kafka-docker_zoo1_1:2182 --create --topic druid_topic --replication-factor 1 --partitions 1
```

Finally, let's publish some messages:

```bash
# Inside docker_kafka1_1 container
$ kafka-console-producer --broker-list localhost:9092 --topic druid_topic
hello
world
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 4}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 3}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 5}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 1}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:40.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:00.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:10.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:00.000Z", "clicks": 6}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:10.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:20.000Z", "clicks": 9}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:30.000Z", "clicks": 7}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:40.000Z", "clicks": 8}
```

Take a closer look at those JSON messages I sent. There are two users there: John (1) and Cena (2). Both users performed a few clicks in the same minute (John performed 8 clicks and Cena performed 14 clicks at 00:00). After that, John performed 5 more clicks at 00:05 and Cena performed 32 more clicks at 00:07. Let's see how the rollup feature deals with this!

Now let's try connecting Druid to our Kafka topic. From the **Load Data** screen, we select **Apache Kafka** and fill in the "Bootstrap servers" (our host, which is `localhost:9092`) and the topic name (`druid_topic`). After filling in this information, click on the Preview button and you should see the messages we published to the topic.

[Figure: Apache Druid Console - Load data (connecting) screen]
Sweet! Now the next page: Parsing Data.

## Parsing data

Now it is time to tell Druid how to parse our data. We published some random string messages to the topic, but we also published JSON. Let's tell Druid to ignore the string messages and use our JSON data instead. For this, we select the JSON parser.

[Figure: Apache Druid Console - Load data (Parsing with JSON parser) screen]
Since we also have nested fields, I am going to add column flattening for both User ID and User Name:

[Figure: Apache Druid Console - Load data (Parsing with flattening fields) screen]
And now we can see how our data is going to be parsed.

[Figure: Apache Druid Console - Load data (Parsing preview after flattening) screen]
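As a rough mental model of the parsing and flattening configured in the console, here is a minimal Python sketch. It is not Druid's parser: the `FLATTEN_PATHS` mapping and the `parse_message` helper are hypothetical names, and the sketch simply skips anything that is not a JSON object and copies two nested fields into top-level columns.

```python
import json

# Hypothetical mapping from flattened column name to the path inside the
# nested message; it mirrors the two fields flattened in the console.
FLATTEN_PATHS = {
    "user_id": ("user", "id"),
    "user_name": ("user", "name"),
}


def parse_message(line):
    """Return a flat dict for a JSON object message, or None otherwise."""
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        return None  # plain strings such as "hello" are not valid JSON
    if not isinstance(message, dict):
        return None

    # Keep the top-level scalar fields and drop nested objects.
    flat = {k: v for k, v in message.items() if not isinstance(v, dict)}

    # Copy each configured nested value into its own top-level column.
    for column, path in FLATTEN_PATHS.items():
        value = message
        for key in path:
            value = value.get(key) if isinstance(value, dict) else None
        flat[column] = value
    return flat


lines = [
    "hello",
    '{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 4}',
]
parsed = [parse_message(line) for line in lines]
print(parsed)
```

The first entry comes back as `None` because it cannot be parsed, while the second becomes a flat record with `user_id` and `user_name` next to `date` and `clicks`.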
The string messages were not parsed - but that was expected. The important part is that we got our JSON messages working. Now to the next stage: Parsing time.

[Figure: Apache Druid Console - Load data (Parsing time) screen]
As we can see, Druid recognized that we already have a column with a valid date, and it defaulted the time partitioning column to that value. Very helpful!

The next screen gives us some options to transform the data before completely ingesting it. We don't really have a good use case for it, but let's try creating a new column that simply concatenates the user ID with the username.

[Figure: Apache Druid Console - Load data (transform) screen]
The following screen gives us the opportunity to set some filters to remove any unwanted data. In this case, I don't want to set any filters, so let's move on.

[Figure: Apache Druid Console - Load data (filter) screen]
The next screen is where things get interesting: schema definition. At this stage, we can see the columns for `user_name` and `user_id`, but `date` and `clicks` have disappeared, and we see 3 new columns: `count`, `sum_clicks`, and `sum_user_id`. What we are seeing there is a preview of what our data will look like after being rolled up and ingested. **count** is the number of "rows" that were rolled up in that timeframe, **sum\_clicks** is the sum of the **clicks** column for that timeframe, and **sum\_user\_id** is the sum of **user\_id** in that timeframe!

[Figure: Apache Druid Console - Load data (schema definition) screen]
Just a few little problems there:

1. The default roll-up is by hour, but I want it by the minute
1. Rolling up the User ID is completely useless. I don't want this

Let's start by changing the roll-up strategy. Click on the **__time** column and change the granularity to **minute**:

[Figure: Apache Druid Console - Load data (Rollup granularity) screen]
If the page does not reload, click on another stage in the top menu (like **Filter**) and then go back to **Configure Schema**.

Now we can click on the **sum\_user\_id** column and remove it.

[Figure: Apache Druid Console - Load data (Rollup - removing column) screen]
With this, we can move forward. The next three stages are optional. They allow us to change the partitioning strategy, tune how the data is ingested, and tune how errors are handled. I will leave the default values.

Finally, we get a confirmation screen with a JSON configuration for the source. This is my configuration:

```json
{
  "type": "kafka",
  "ioConfig": {
    "type": "kafka",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "topic": "druid_topic"
  },
  "tuningConfig": {
    "type": "kafka"
  },
  "dataSchema": {
    "dataSource": "druid_topic",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "MINUTE",
      "rollup": true
    },
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "date",
          "format": "iso"
        },
        "flattenSpec": {
          "fields": [
            {
              "type": "path",
              "expr": "$",
              "name": "user_name"
            },
            {
              "type": "path",
              "name": "user_id",
              "expr": "$"
            }
          ]
        },
        "dimensionsSpec": {
          "dimensions": [
            "user_name",
            "user_name_id"
          ]
        }
      }
    },
    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "user_name_id",
          "expression": "concat(\"user_name\", \"user_id\")"
        }
      ]
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "sum_clicks",
        "type": "longSum",
        "fieldName": "clicks"
      }
    ]
  }
}
```

With the creation of the source confirmed, the task to ingest data from Kafka should start. If you go to the **Tasks** page, you will see something similar to this:

[Figure: Apache Druid Console - Tasks screen]
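The configuration contains two different granularities, which are easy to mix up: `segmentGranularity` (`HOUR`) controls the time chunks that the data is partitioned into, while `queryGranularity` (`MINUTE`) controls how timestamps are truncated when rows are rolled up. The Python sketch below only illustrates that truncation; the `truncate` helper is a hypothetical name, not a Druid API.

```python
from datetime import datetime, timezone


def truncate(timestamp, granularity):
    """Truncate an ISO-8601 UTC timestamp to the start of its minute or hour."""
    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    parsed = parsed.replace(tzinfo=timezone.utc)
    if granularity == "MINUTE":
        return parsed.replace(second=0, microsecond=0)
    if granularity == "HOUR":
        return parsed.replace(minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported granularity: {granularity}")


event_time = "2019-12-01T00:07:40.000Z"

# Bucket used when rolling rows up (queryGranularity).
rollup_bucket = truncate(event_time, "MINUTE")

# Start of the hour-long time chunk the row belongs to (segmentGranularity).
segment_bucket = truncate(event_time, "HOUR")

print(rollup_bucket.isoformat(), segment_bucket.isoformat())
```

With these settings, all of the sample messages fall into the same hourly chunk, but they are rolled up into separate per-minute rows.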
Our ingestion pipeline is ready. However, you may notice that the main page still indicates "0 datasources" - this is because Druid still hasn't received data from Kafka. The messages we published before were only used for setting up our datasource, so we need to send them again. This time, let's send only the clean data:

```bash
# Inside docker_kafka1_1 container
$ kafka-console-producer --broker-list localhost:9092 --topic druid_topic
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 4}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 3}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 5}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 1}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:40.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:00.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:10.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:00.000Z", "clicks": 6}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:10.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:20.000Z", "clicks": 9}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:30.000Z", "clicks": 7}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:40.000Z", "clicks": 8}
```

And now we can see a datasource running!

[Figure: Apache Druid Console - Main screen datasources]

[Figure: Apache Druid Console - Datasources screen]
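To get a feel for what a per-minute rollup does to these 18 messages, here is a small Python sketch that imitates it outside of Druid. It is an approximation under one assumption: rows are grouped by the minute of their timestamp and by `user_name` only, and just the `count` and `sum_clicks` metrics are kept. The `rollup` function is a hypothetical helper, not part of Druid.

```python
from collections import defaultdict

# (user_name, timestamp, clicks) for the 18 JSON messages published to Kafka.
events = [
    ("cena", "2019-12-01T00:00:00.000Z", 4),
    ("john", "2019-12-01T00:00:00.000Z", 2),
    ("cena", "2019-12-01T00:00:10.000Z", 2),
    ("john", "2019-12-01T00:00:10.000Z", 3),
    ("cena", "2019-12-01T00:00:20.000Z", 5),
    ("john", "2019-12-01T00:00:20.000Z", 1),
    ("john", "2019-12-01T00:00:30.000Z", 2),
    ("cena", "2019-12-01T00:00:30.000Z", 1),
    ("cena", "2019-12-01T00:00:40.000Z", 2),
    ("john", "2019-12-01T00:05:00.000Z", 1),
    ("john", "2019-12-01T00:05:10.000Z", 1),
    ("john", "2019-12-01T00:05:20.000Z", 1),
    ("john", "2019-12-01T00:05:30.000Z", 2),
    ("cena", "2019-12-01T00:07:00.000Z", 6),
    ("cena", "2019-12-01T00:07:10.000Z", 2),
    ("cena", "2019-12-01T00:07:20.000Z", 9),
    ("cena", "2019-12-01T00:07:30.000Z", 7),
    ("cena", "2019-12-01T00:07:40.000Z", 8),
]


def rollup(events):
    """Group events by (minute, user_name), keeping a row count and click sum."""
    buckets = defaultdict(lambda: {"count": 0, "sum_clicks": 0})
    for user_name, timestamp, clicks in events:
        minute = timestamp[:16]  # e.g. "2019-12-01T00:00", seconds dropped
        bucket = buckets[(minute, user_name)]
        bucket["count"] += 1
        bucket["sum_clicks"] += clicks
    return dict(buckets)


rolled_up = rollup(events)
for key in sorted(rolled_up):
    print(key, rolled_up[key])
```

Under that assumption the 18 messages collapse into 4 rows, and the click sums match the totals worked out earlier: 8 and 14 at 00:00, 5 at 00:05, and 32 at 00:07.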
# Querying

Time to look at the data! Similar to Elasticsearch, Druid has an HTTP API to perform queries, but we can also take the easy route this time and use the Console to run them. Navigate to the **Query** page on the top menu, and let's try some SQL.

[Figure: Apache Druid Console - Query screen with SQL]
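For the HTTP route, a SQL query can be sent as a small JSON document to Druid's SQL endpoint. The Python sketch below only builds such a request; it assumes the quickstart router is reachable on `localhost:8888`, and the SQL statement is an example of mine, not necessarily the one from the screenshot. `build_sql_request` is a hypothetical helper name.

```python
import json
import urllib.request

# Assumption: the router from the quickstart is reachable on localhost:8888.
SQL_URL = "http://localhost:8888/druid/v2/sql"


def build_sql_request(sql):
    """Build (but do not send) an HTTP request carrying a Druid SQL query."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        SQL_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_sql_request(
    "SELECT user_name, SUM(sum_clicks) AS clicks "
    "FROM druid_topic GROUP BY user_name"
)

# Sending the request requires a running Druid instance:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Note that the query sums the rolled-up `sum_clicks` metric, since the original `clicks` column no longer exists after ingestion.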
There is our data! Now let's try some native JSON querying. I am going to count how many clicks each user made in a daily bucket (since we only added data for one day, we should see only two rows in the results).

[Figure: Apache Druid Console - Query screen with native JSON query]
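Since the query itself is only visible in the screenshot, here is a sketch of what a native query for "clicks per user per day" could look like, built as a Python dictionary and printed as JSON. It is my reconstruction under the schema defined earlier (the `druid_topic` datasource with `user_name` and `sum_clicks`), not necessarily the exact query I ran; the `clicks_per_user_query` helper is a hypothetical name.

```python
import json


def clicks_per_user_query(datasource, interval):
    """Build a native groupBy query summing rolled-up clicks per user per day."""
    return {
        "queryType": "groupBy",
        "dataSource": datasource,
        "granularity": "day",
        "intervals": [interval],
        "dimensions": ["user_name"],
        "aggregations": [
            # Sum the pre-aggregated metric that was created during rollup.
            {"type": "longSum", "name": "clicks", "fieldName": "sum_clicks"}
        ],
    }


query = clicks_per_user_query("druid_topic", "2019-12-01/2019-12-02")
print(json.dumps(query, indent=2))
```

The printed JSON can be pasted into the console's query view, or posted to the native query endpoint (`/druid/v2`) in the same way as the SQL example.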
There it is! Isn't it awesome?

# Conclusion

I am very impressed by how easy it was to get up and running with Druid. I haven't had time yet to play with many features or try more advanced aggregations, but I am very excited about this new technology and I really want to work more with it. If you work with real-time analytics, Druid is definitely worth checking out.