Henry S. Coelho

Real-time analytics with Apache Druid

Every now and then there is a new product for Big Data™ that makes working with huge datasets easy: BigQuery, Redshift, Elasticsearch… You name it. Apache Druid, a database focused on real-time analytics with high performance, was the latest one to catch my attention. In this post I am going to look at Druid's key features, set up a small instance, ingest some data, and make a few queries.

Let's start with the key features:

Rollup during ingestion

Sounds pretty good, right? I wanted to try these features out myself, and here is what I learned.

BigQuery, Elasticsearch, Redshift…

This comparison must sound strange, since these are very different technologies and not really direct competitors, but why would you choose Druid over any of these? All these technologies are excellent at handling massive amounts of data - this goes without saying. But what does Druid solve that BigQuery, Redshift, and Elasticsearch do not?

Elasticsearch

Elasticsearch is a database that focuses primarily on textual search rather than analytics and aggregations, although it does seem to be moving more and more into that space. Still, Elasticsearch is much less efficient and less powerful for analytics and aggregations. Druid, on the other hand, has some support for textual search, but it is very primitive compared to Elasticsearch; instead, Druid focuses on high-performance aggregation, analytics, and ingestion. Both have some characteristics in common, though: both are (mostly) open-source, both use JSON as their primary querying language, both have limited SQL support, and both will take a while to set up (although you can find Elasticsearch SaaS offerings out there).

In summary: if you need powerful textual search, go with Elasticsearch; if you need powerful analytics, go with Druid.

Redshift and BigQuery

Now, I am not a huge expert on Redshift, so forgive me if I say something wrong, but let's talk a bit about it and BigQuery.

As far as I know, Redshift is the only one in this group that does not support streaming-inserts. Instead, you will have to write data into files on Amazon S3 and copy the data into Redshift. It is possible to stream data into both BigQuery and Druid - the latter includes connectors for sources like Kinesis and Kafka. Druid also has a unique feature for ingestion that neither BigQuery nor Redshift have, which is the "rollup during ingestion" that I mentioned before.

Before inserting your data, however, it would be a good idea to consider how these databases partition it. Both BigQuery and Druid offer partitioning by time, meaning that if your queries are selecting data based on time, they will be very efficient, since irrelevant data will not be queried. Redshift does not offer this kind of partitioning, and relies on a hashed key to distribute data between data nodes (I think Google BigTable also uses the same method) - this is a problem when you need to add or remove nodes, since the data needs to be re-hashed.
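To make this concrete, time partitioning in Druid is declared in the ingestion spec's granularitySpec; the fragment below is a minimal sketch (the same structure shows up in the spec we build later in this post). segmentGranularity controls how the data is split into time-partitioned segments, and queryGranularity sets the finest time bucket kept after rollup.

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "MINUTE",
  "rollup": true
}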

Once partitioning and ingestion are figured out, then comes querying. Both BigQuery and Redshift use SQL for querying, and naturally, their support for SQL is very good; this is not the case for Druid, which has a much more limited SQL interface. This may be a problem if you are thinking about migrating from one of these two and don't want to rewrite your queries.

Lastly, we should think about hosting. Unlike Druid, you will not have to host Redshift and BigQuery yourself: these are proprietary technologies from Amazon and Google respectively, which are hosted by AWS and GCP. It is very possible that we will see Druid SaaS very soon (if it doesn't exist already), but it is likely that you will have to host it yourself at least for now. If you choose to do so, Kubernetes may be a great tool for hosting Druid.

Before we move on, let's talk a little more about querying - specifically, cost and query time. I haven't used Redshift yet, so I don't know how it performs. However, I do have experience with BigQuery. BigQuery is insanely fast when you consider how much data it can go through - a query aggregating several terabytes of data can easily take less than a minute. Although very impressive, we still have two problems:

  1. Cost: you pay for the data each query scans, and full scans over terabytes add up quickly.
  2. Query time: "less than a minute" is great for reporting, but it is not really real-time.

This is where Druid shines. Rolling up data before ingestion means that we will traverse less data at query time, which speeds up queries dramatically and makes them much more efficient. This is a huge selling point if BigQuery's costs and query times are an issue.

Overall, Druid seems to shine if you want a database focused on real-time analytics over time-series data and you don't mind leaving full SQL support behind.

Running Druid

Druid is fault-tolerant and highly scalable, so it is meant to be deployed as a cluster (across several machines). According to the documentation, a basic Druid deployment contains three different types of server: a Master server, which coordinates data availability and ingestion; a Query server, which handles queries from external clients; and a Data server, which stores the data and runs the ingestion workloads.

How many of each do you need? The number is not set in stone, but from my knowledge of Elasticsearch (and here I am hoping this knowledge is transferable), a small-but-reliable cluster would have three Master servers (enough to keep a quorum), at least two Data servers, and at least two Query servers.

In a Kubernetes environment, you can deploy each of these servers as a pod.

You can find the configuration for a simple clustered deployment here, and here is a tutorial for it.

Although running Druid in a cluster is recommended for production, I will run it as a single server on my machine, since this great article I linked before already shows how to run it on Kubernetes with an HPA. Let's get it running.

  1. Download Druid from here.
  2. Decompress the contents:
$ tar -xzf apache-druid-0.16.0-incubating-bin.tar.gz
  3. Navigate to the newly created directory:
$ cd apache-druid-0.16.0-incubating
  4. Download Zookeeper, which is a dependency for Druid:
$ curl https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz -o zookeeper-3.4.14.tar.gz
$ tar -xzf zookeeper-3.4.14.tar.gz
$ mv zookeeper-3.4.14 zk
  5. Now we can start Druid:
$ ./bin/start-micro-quickstart
[Sun Dec  1 13:06:58 2019] Running command[zk], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/zk.log]: bin/run-zk conf
[Sun Dec  1 13:06:58 2019] Running command[coordinator-overlord], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/coordinator-overlord.log]: bin/run-druid coordinator-overlord conf/druid/single-server/micro-quickstart
[Sun Dec  1 13:06:58 2019] Running command[broker], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/broker.log]: bin/run-druid broker conf/druid/single-server/micro-quickstart
[Sun Dec  1 13:06:58 2019] Running command[router], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/router.log]: bin/run-druid router conf/druid/single-server/micro-quickstart
[Sun Dec  1 13:06:58 2019] Running command[historical], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/historical.log]: bin/run-druid historical conf/druid/single-server/micro-quickstart
[Sun Dec  1 13:06:58 2019] Running command[middleManager], logging to[/home/hscasn/Downloads/apache-druid-0.16.0-incubating/var/sv/middleManager.log]: bin/run-druid middleManager conf/druid/single-server/micro-quickstart

To stop Druid, just use CTRL+C, and to restart it, run the command from step 5 again.

Now we should be good to go. Navigate to http://localhost:8888 and you should see Druid's lovely console.
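If you prefer the terminal, you can also check that the router is up by hitting its status endpoint, which should return a small JSON document with the Druid version:

$ curl http://localhost:8888/status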

Apache Druid Console

See that Servers section? Let's take a closer look.

Servers

Apache Druid Console - Servers

You may be asking yourself what these types are… Coordinator? Overlord? Router? I was hoping to see master, data, and query instead. Well, it turns out that the functionality is split into what Druid calls processes, which live inside the servers: the Master server runs the Coordinator and Overlord processes, the Query server runs the Broker and Router processes, and the Data server runs the Historical and MiddleManager processes.

In our case, the single-server setup we just started runs all of these processes for us on one machine.

Ingesting data

Now let's try adding some data to our Druid instance. Clicking on the Load Data link in the menu takes us to this page:

Apache Druid Console - Load data screen

One of the options is to just paste the raw data, but I will take a slightly more difficult route and use Kafka for this.

To run Kafka, you can use this compose file, which I adapted from here (it runs Zookeeper on port 2182 to avoid a port conflict with Druid's own Zookeeper):

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2182:2182"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2182
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2182"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

You can run this file like this:

$ docker-compose -f <file.yaml> up

Now let's open a shell inside our Kafka container:

$ docker exec -it kafka-docker_kafka1_1 bash

And now we can create a topic:

# Inside kafka-docker_kafka1_1 container
$ kafka-topics --zookeeper kafka-docker_zoo1_1:2182 --create --topic druid_topic --replication-factor 1 --partitions 1
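To double-check that the topic was created, you can list the topics registered in Zookeeper (still inside the container); druid_topic should show up in the output:

# Inside kafka-docker_kafka1_1 container
$ kafka-topics --zookeeper kafka-docker_zoo1_1:2182 --list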

Finally, let's publish some messages:

# Inside kafka-docker_kafka1_1 container
$ kafka-console-producer --broker-list localhost:9092 --topic druid_topic
hello
world
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 4}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 3}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 5}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 1}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:40.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:00.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:10.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:00.000Z", "clicks": 6}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:10.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:20.000Z", "clicks": 9}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:30.000Z", "clicks": 7}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:40.000Z", "clicks": 8}

Take a closer look at those JSON messages I sent. There are two users there: John (1) and Cena (2). Both users performed a few clicks in the same minute (John performed 8 clicks and Cena performed 14 clicks at 00:00). After that, John performed 5 more clicks at 00:05 and Cena performed 32 more clicks at 00:07. Let's see how the rollup feature deals with this!
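As a quick sanity check, assuming we roll the data up by the minute and keep the username as a dimension (which is exactly what we will configure below), the ingested data should end up looking roughly like this:

__time                    user_name  count  sum_clicks
2019-12-01T00:00:00.000Z  cena       5      14
2019-12-01T00:00:00.000Z  john       4      8
2019-12-01T00:05:00.000Z  john       4      5
2019-12-01T00:07:00.000Z  cena       5      32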

Now let's try connecting Druid to our Kafka topic. From the Load Data screen, we select Apache Kafka and fill in the "Bootstrap servers" field (our host, localhost:9092) and the topic name (druid_topic).

After filling in this information, click on the Preview button and you should see the messages we published to the topic.

Apache Druid Console - Load data (connecting) screen

Sweet! Now the next page: Parsing Data.

Parsing data

Now it is time to tell Druid how to parse our data. We inserted some random string messages there, but we also used JSON. Let's tell Druid to ignore the string messages and use our JSON data instead. For this, we select the JSON parser.

Apache Druid Console - Load data (Parsing with JSON parser) screen

Since we also have nested fields, I am going to add column flattening for both User ID and User Name:

Apache Druid Console - Load data (Parsing with flattening fields) screen

And now we can see how our data is going to be parsed.

Apache Druid Console - Load data (Parsing preview after flattening) screen

The string messages were not parsed - but that was expected. The important part is that we got our JSON messages working.
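For reference, the flattening we just configured corresponds to a flattenSpec in the ingestion spec; this is the fragment the console generates for us (it also appears in the full JSON near the end of this post):

"flattenSpec": {
  "fields": [
    { "type": "path", "name": "user_id", "expr": "$.user.id" },
    { "type": "path", "name": "user_name", "expr": "$.user.name" }
  ]
}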

Now to the next stage: Parsing time.

Apache Druid Console - Load data (Parsing time) screen

As we can see, Druid recognized that we already have a column with a valid date, and it defaulted the time partitioning column to that value. Very helpful!

The next screen gives us some options to transform the data before completely ingesting it. We don't really have a good use case for it, but let's try creating a new column that simply concatenates the user ID with the username.
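Under the hood, this transform ends up in the ingestion spec as a transformSpec with an expression transform (you can see it again in the full JSON near the end of the post); roughly:

"transformSpec": {
  "transforms": [
    {
      "type": "expression",
      "name": "user_name_id",
      "expression": "concat(\"user_name\", \"user_id\")"
    }
  ]
}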

Apache Druid Console - Load data (transform) screen

The next screen gives us the opportunity to set filters to remove unwanted data. In this case, I don't want to set any filters, so let's move on.

Apache Druid Console - Load data (filter) screen

The next screen is where things get interesting: schema definition.

At this stage, we can see columns for user_name and user_id, but date and clicks have disappeared, and there are three new columns: count, sum_clicks, and sum_user_id.

What we are seeing is a preview of what our data will look like after being rolled up and ingested: count is the number of raw rows that were rolled up into that timeframe, sum_clicks is the sum of the clicks column for that timeframe, and sum_user_id is the sum of user_id in that timeframe!

Apache Druid Console - Load data (schema definition) screen

Just a few little problems there:

  1. The default roll-up is by hour, but I want it by the minute
  2. Rolling up the User ID is completely useless. I don't want this

Let's start by changing the roll-up strategy. Click on the __time column and change the granularity to minute:

Apache Druid Console - Load data (Rollup granularity) screen

If the page does not reload, click on another stage in the top menu (like Filter) and then go back to Configure Schema.

Now we can click on the sum_user_id column and remove it.

Apache Druid Console - Load data (Rollup - removing column) screen

With this, we can move forward. The next three stages are optional: they allow us to change the partitioning strategy, tune how the data will be ingested, and tune how errors are handled. I will leave the default values.

Finally, we get a confirmation screen with a JSON configuration for the source. This is my configuration:

{
  "type": "kafka",
  "ioConfig": {
    "type": "kafka",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "topic": "druid_topic"
  },
  "tuningConfig": {
    "type": "kafka"
  },
  "dataSchema": {
    "dataSource": "druid_topic",
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "MINUTE",
      "rollup": true
    },
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "date",
          "format": "iso"
        },
        "flattenSpec": {
          "fields": [
            {
              "type": "path",
              "expr": "$.user.name",
              "name": "user_name"
            },
            {
              "type": "path",
              "name": "user_id",
              "expr": "$.user.id"
            }
          ]
        },
        "dimensionsSpec": {
          "dimensions": [
            "user_name",
            "user_name_id"
          ]
        }
      }
    },
    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "user_name_id",
          "expression": "concat(\"user_name\", \"user_id\")"
        }
      ]
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "sum_clicks",
        "type": "longSum",
        "fieldName": "clicks"
      }
    ]
  }
}
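As a side note, the console is not the only way to create this source: as far as I know, the same JSON can be submitted directly to the Overlord's supervisor API. Assuming you saved the spec above to a file - say, kafka-supervisor.json (a name I am making up here) - it would look something like this (in the micro-quickstart, the coordinator-overlord listens on port 8081):

$ curl -X POST -H 'Content-Type: application/json' \
    -d @kafka-supervisor.json \
    http://localhost:8081/druid/indexer/v1/supervisor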

With the creation of the source confirmed, the task to ingest data from Kafka should start. If you go to the Tasks page, you will see something similar to this:

Apache Druid Console - Tasks screen

Our ingestion pipeline is ready. However, you may notice that the main page still indicates "0 datasources". This is because Druid hasn't ingested anything yet: by default, the Kafka supervisor starts reading from the latest offset, so the messages we published earlier (which we only used to design our dataset) were not picked up. We need to send the data again - this time, only the clean JSON data:

# Inside kafka-docker_kafka1_1 container
$ kafka-console-producer --broker-list localhost:9092 --topic druid_topic
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 4}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:00.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:10.000Z", "clicks": 3}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 5}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:30.000Z", "clicks": 1}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:00:40.000Z", "clicks": 2}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:00.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:10.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:20.000Z", "clicks": 1}
{"user": {"id": 1, "name": "john"}, "date": "2019-12-01T00:05:30.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:00.000Z", "clicks": 6}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:10.000Z", "clicks": 2}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:20.000Z", "clicks": 9}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:30.000Z", "clicks": 7}
{"user": {"id": 2, "name": "cena"}, "date": "2019-12-01T00:07:40.000Z", "clicks": 8}

And now we can see our datasource!

Apache Druid Console - Main screen datasources

Apache Druid Console - Datasources screen
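If you want to confirm this from the terminal as well, the Coordinator API should list the datasource once its first segments are published; the call below returns a JSON array that should contain druid_topic:

$ curl http://localhost:8081/druid/coordinator/v1/datasources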

Querying

Time to look at the data! Similar to Elasticsearch, Druid has an HTTP API to perform queries, but we can also take the easy route this time and use the Console to run them. Navigate to the Query page on the top menu, and let's try some SQL.
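A simple Druid SQL query for our datasource could look something like this sketch (the exact query in the screenshot may differ; the datasource is exposed as a table named after it, and count needs double quotes because it clashes with the SQL keyword):

SELECT __time, user_name, user_name_id, "count", sum_clicks
FROM druid_topic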

Apache Druid Console - Query screen with SQL

There is our data!

Now let's try some native JSON querying. I am going to count how many clicks each user made in a daily bucket (since we only added data for one day, we should see only two rows in the results).
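A native query for this could look something like the following sketch (the exact query in the screenshot may differ): a groupBy over daily buckets that sums the rolled-up sum_clicks metric per user.

{
  "queryType": "groupBy",
  "dataSource": "druid_topic",
  "intervals": ["2019-12-01/2019-12-02"],
  "granularity": "day",
  "dimensions": ["user_name"],
  "aggregations": [
    { "type": "longSum", "name": "total_clicks", "fieldName": "sum_clicks" }
  ]
}

If my arithmetic matches the data above, this should return two rows: 13 total clicks for john and 46 for cena.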

Apache Druid Console - Query screen with native JSON query

There it is! Isn't it awesome?