Splunking Kafka with Kafka Connect

For this post I’m going to walk through how to use Kafka Connect (and the kafka-connect-splunk connector) along with a Splunk Heavy Forwarder to stream data from a Kafka cluster to one or many Splunk instances.  If you’re facing the prospect of building a data pipeline to consume data from a Kafka instance and forward it on to Splunk, your list of options is fairly short.

1. Splunk Add-On for Kafka

Ask Splunk and they will point you to their official, Splunk-built Kafka Technology Add-on (TA).  This TA works OK at small scale – say, one or two heavy forwarders responsible for consuming four or five Kafka topics.  Scaling it to a web-scale Kafka data pipeline quickly becomes overly cumbersome.  Even other devs at Splunk admit the TA has serious shortcomings.  Its Achilles’ heel is that it acts as a simple consumer.  If you run multiple instances, each instance is unaware of the others and there’s no mechanism for them to coordinate with each other.  If you fire up the TA and point it at a Kafka topic it will automatically take ownership of all partitions in the topic.  In order to spread topic consumption across multiple TA instances you’ll need to manually configure which partitions each instance consumes from.  The TA also adds three more conf files to the forest of Splunk configuration files – kafka.conf, which handles overall configuration for the TA; kafka_forwarder_credentials.conf, whose use the documentation doesn’t make very clear; and kafka_credentials.conf, which doesn’t actually deal with credentials of any sort and should really be inputs.conf.  I could go on, but I won’t.

All that being said, the TA is actually a good start and is being heavily and actively developed.  I’ve spoken with the devs who work on it and I’m excited to see the releases coming out this year.

2. Roll Your Own

Given the lack of options in the space (see option 1 above), most folks have decided to develop their own solution.  And why not?  The Kafka consumer API is robust, stable, and well documented.  Just grab one of your Java (or Python) devs, give them your requirements, throw some documentation at them, and have them whip something up.  If you have the resources and the developer cycles to do this, great – it may be your best option.  There is a third option, though.

3. Kafka Connect

Kafka Connect is a framework for easily streaming data to and from Kafka, and has been included as part of the Kafka package since 0.9.  It basically plays the role of producer or consumer (or both), doing all the heavy lifting of those processes for you.  Out of the box, Kafka Connect coordinates pools of workers across multiple instances, handles task distribution, and tracks and manages offsets.  No need to write your own producer or consumer – you just need to focus on what’s called the connector.  The connector tells Connect where to copy data from and to, and performs the serialization / deserialization.  You can write your own connector, but there’s a slew of bundled and community-supported connectors out there already.  HDFS, JDBC, Elasticsearch, Splunk – wherever you need to get data, there’s probably a connector already out there.  Plus, Connect is written by the same people who develop Kafka itself.  Who better to create a data transport framework for Kafka?

A deep dive into Connect and how it works would be fun, but that is for another time and another post.  For now I’m going to assume you’ve already gone through the documentation – either Apache’s (good) or Confluent’s (excellent) – and have at least a familiarity with the concepts and common terms.  I’m also going to assume you have at least the same familiarity with the Splunk concepts here – namely the HTTP Event Collector (HEC), types of forwarders, indexers, search heads, etc.

Architecture

For the purposes of this walkthrough, the architecture of the logical data flow will be as follows:

[Diagram: logical data flow]

Physically (for a reason I’ll explain later) the Kafka Connect instance will be running on the same host as the heavy forwarder.  Within that Kafka Connect / Heavy Forwarder host, the logical flow of data will look like so:

[Diagram: logical data flow within the Kafka Connect / heavy forwarder host]

You might be asking – if we’re going to use HEC, why have a heavy forwarder at all?  Why not just send directly to HEC endpoints on the Splunk instances themselves?  Several reasons:

  • The kafka-connect-splunk connector config only allows (for now) output to a single HEC endpoint per connector, so it cannot load-balance across a number of HEC endpoints.  If you have a large HEC environment you’ll likely already have some sort of load-balanced infrastructure in place that will multiplex across all of your HEC endpoints in a given cluster.  If so, you might not need a heavy forwarder in this scenario.  If not, Splunk forwarders multiplex across indexers by design.
  • Speed – tcpout between forwarders and indexers is simply faster than HEC.  YMMV, but in my testing a single tcpout thread from a heavy forwarder to an indexer will run around 10.5MB/sec, whereas a HEC thread in the same config will run around 6.5MB/sec.  If you’re dealing with terabytes of data going to many Splunk clusters, each with dozens of indexers, this difference can have a significant impact.
  • Having a heavy forwarder in the picture gives you the option to index-and-forward.  This is a powerful option that lets you decide where to spend your CPU cycles.  If your data volume spikes or gets out of hand, you can move that processing off your indexers and onto these heavy forwarders.  This frees up your indexers to do what they’re supposed to do – index data and serve it up to search heads.

So, let’s get started

1. Install Kafka Connect

Since they’re both part of the same package, installing Kafka Connect is just a matter of installing Kafka.  Download the version of Kafka that matches the brokers you’ll be connecting to, untar the tarball into the location where you want to run it, and you’re done.
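
For example, on a 0.10.1.x cluster that would look something like this – the version, Scala build, and install path here are all placeholders, so match them to your environment:

cd /opt
# grab the release that matches your brokers
wget https://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
tar -xzf kafka_2.11-0.10.1.1.tgz
cd kafka_2.11-0.10.1.1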

2. Configure Kafka Connect

Since this architecture is designed for scalability, we’re going to be running Connect in distributed mode.  Really, any production implementation of Connect will involve multiple hosts / containers and will be running in distributed mode.

The main configuration file for Connect is config/connect-distributed.properties.  This config file is brief and basic.  Here’s the configuration I’ll be using, which is only slightly different from the default:

rest.port=8083

bootstrap.servers=kafka-host:9092
group.id=connect

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

offset.flush.interval.ms=60000

The bootstrap.servers parameter is straightforward – just set it to one (or more) of the brokers in your Kafka cluster.  group.id is the group name the Connect cluster will use when consuming topics.  If you plan on having multiple Connect clusters consuming from the same Kafka cluster, making sure each has a unique id here will make troubleshooting easier.  Otherwise, just pick an arbitrary name.

The next section, specifying key and value converters, deals with what format Connect will deserialize the data to after consuming it from Kafka.  Your two built-in options are JSON and Avro, but others are available.  Personally, all the data produced to my Kafka cluster is in JSON objects, and I want it sent off to Splunk in JSON format, so I stick with the default converter here – org.apache.kafka.connect.json.JsonConverter.  The last two lines in this block control whether each record carries an embedded schema along with its payload.  If you’re going to be serializing in multiple formats, or using a wide variety of plugins with a wide variety of destinations, you’ll want proper schema management – that’s what Schema Registry is for, and if you’re not familiar with it, read up on it here – it’s worth a read even if you don’t plan on using it.  For our purposes we’re working with fairly simple JSON objects and nothing else, so we’ll disable embedded schemas by setting key.converter.schemas.enable and value.converter.schemas.enable to false.

The next block of key and value converter settings is basically the same thing, but for the internal topics Connect uses for connector management.  These don’t need to be changed – I’m not sure why anyone would want to change them, as they are completely internal to Connect.  I wouldn’t be surprised if these settings went away in a future release.  Speaking of internal topics…

All configuration, current status, and offset information for connectors is stored in Kafka topics, which is one of the primary strengths of Kafka Connect – it makes Connect not just fault tolerant but highly performant and easily scalable.  If a worker crashes, Connect will quickly detect it, know what that worker was doing when it stopped, and re-assign its tasks to another worker, which will pick up right where it left off.  If you want to add more worker instances, just spin them up and point them at these topics – Connect will detect the new workers and start assigning them tasks.

Before you can start up Kafka Connect you need to create three topics (example creation commands follow the descriptions below):

offset.storage.topic – The topic where all offset information is kept.  If a worker goes offline or is stopped, this is the topic that will let it know where to pick up from when it starts back up.  The official recommendation here is 50 partitions with a replication factor of at least 3.  My recommendation would be a replication factor that ensures 60% coverage of all brokers – e.g. if you have a 9-broker cluster, set the RF to 6.

config.storage.topic – This topic is where all connector-specific configurations are kept.  Connect consumes this topic, and if you add / remove a connector or update an existing connector’s config, Connect will pick up the changes here and roll them out appropriately.  Not a large topic, but a critical one.  The official recommendation here is 1 partition with an RF of 3, though I recommend the same 60% rule here as well.

status.storage.topic – This is where the current status of all connectors and tasks is stored.  The official recommendation here is 10 partitions with an RF of 3, but again I’d recommend an RF to cover 60% of your brokers.
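
As a sketch, on a 5-broker cluster (so an RF of 3 per the 60% rule) topic creation would look something like this – zk-host is a placeholder for one of your ZooKeeper nodes, and Connect expects these topics to be compacted rather than time-expired, hence the cleanup.policy setting:

bin/kafka-topics.sh --create --zookeeper zk-host:2181 --topic connect-offsets --partitions 50 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper zk-host:2181 --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper zk-host:2181 --topic connect-status --partitions 10 --replication-factor 3 --config cleanup.policy=compact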

Once you have your internal topics created and your configuration file set, start up Connect (on each node):

bin/connect-distributed.sh -daemon config/connect-distributed.properties

This will start up Connect in daemon mode.  Your main log file for Connect is logs/connectDistributed.out – check there to verify Connect started up without error.
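
A quick sanity check – scan the log for errors, and hit the REST API, which returns the Connect version when everything is up:

# any startup problems will show up here
grep -i error logs/connectDistributed.out
# the root resource returns the Connect version and commit
curl localhost:8083/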

3. Install kafka-connect-splunk Connector

The kafka-connect-splunk connector isn’t a bundled connector, so it will require a bit of building before it can be used.  First, clone the repository:

git clone https://github.com/jcustenborder/kafka-connect-splunk.git

Then cd to the directory and build with Maven:

cd kafka-connect-splunk ; mvn clean package

Once it is done, the built Java classes will be in target/kafka-connect-target/usr/share/java/kafka-connect-splunk – copy the jars from this directory to a location in your CLASSPATH.  For simplicity (and since I don’t have access to /usr/share/java) I place all my connectors in the same location as Kafka Connect, under share/ – then update CLASSPATH to include this location.  The build also creates a tarball at target/kafka-connect-splunk-X.X.X.tar.gz with all the jars, for easy distribution.
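
For example, assuming Kafka lives in /opt/kafka (adjust the paths to your layout), and remembering that CLASSPATH needs to be set in the environment of the user that starts Connect, on every Connect node:

mkdir -p /opt/kafka/share/kafka-connect-splunk
cp target/kafka-connect-target/usr/share/java/kafka-connect-splunk/*.jar /opt/kafka/share/kafka-connect-splunk/
export CLASSPATH="$CLASSPATH:/opt/kafka/share/kafka-connect-splunk/*"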

4. Configure a kafka-connect-splunk Connector

All connectors are configured using key / value settings in JSON blocks passed to Connect via Connect’s HTTP API.  In fact, all of your interaction with Connect will be via the API.  Check here for the full reference of all API calls.  Now that you have the kafka-connect-splunk connector jars installed and in your CLASSPATH, you can instantiate a connector:

curl -X POST -H "Content-Type: application/json" -d '{"name":"test-splunk-sink","config":{"connector.class":"io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector","splunk.ssl.enabled":"true","splunk.remote.host":"localhost","tasks.max":"5","topics":"topic_to_consume","splunk.remote.port":"8088","splunk.ssl.validate.certs":"false","splunk.auth.token":"1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"}}' localhost:8083/connectors

Most of the time curl can be used for this purpose with a bit of care – the lines tend to get long and error-prone if you have fat fingers.  I recommend drafting the commands in an editor first so you can get a better view of them and validate the format before you execute them.  I use Atom along with the Pretty JSON plug-in, which makes the above a lot more readable:

{
   "name": "test-splunk-sink",
   "config": {
       "connector.class": "io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector",
       "splunk.ssl.enabled": "true",
       "splunk.remote.host": "localhost",
       "tasks.max": "5",
       "topics": "topic_to_consume",
       "splunk.remote.port": "8088",
       "splunk.ssl.validate.certs": "false",
       "splunk.auth.token": "1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"
   }
}

Everything here is documented on the kafka-connect-splunk GitHub repo, and all these settings are straightforward.  Just a few to note:

  • splunk.remote.host – In this scenario we’re setting it to localhost, since the heavy forwarder will be on the same host.
  • splunk.auth.token – This is the HEC token we will configure on the heavy forwarder in the next few steps.  Even if you haven’t configured the heavy forwarder with this token yet, you can create a new one and use it here (I use guidgenerator.com), and just use the same one when configuring the forwarder.
  • tasks.max – My recommendation here is to set this to the number of partitions for the topic you are consuming.  This ensures that data is flowing from each partition as it comes in, and no partition is sitting around waiting for a task to get to it.  Setting the number of tasks higher than the number of partitions doesn’t increase consumption performance, as each partition can only be consumed by one task at a time.  (You can adjust this later via the API – see the example below.)
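
If you later need to change tasks.max (or any other setting, say bumping it to 10 after adding partitions), push an updated config with a PUT – note that the body of a PUT to /config is just the config object itself, without the name wrapper:

curl -X PUT -H "Content-Type: application/json" -d '{"connector.class":"io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector","splunk.ssl.enabled":"true","splunk.remote.host":"localhost","tasks.max":"10","topics":"topic_to_consume","splunk.remote.port":"8088","splunk.ssl.validate.certs":"false","splunk.auth.token":"1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"}' localhost:8083/connectors/test-splunk-sink/config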

Now that you’ve created an instance of the kafka-connect-splunk connector, you can verify it.  Access the /connectors context to get a list of connectors:

# curl localhost:8083/connectors
["test-splunk-sink"]
#

/connectors/test-splunk-sink/status will get you the current status of the connector and all of its tasks:

# curl localhost:8083/connectors/test-splunk-sink/status | jq
{
  "name": "test-splunk-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "thishost:8083"
  },
  "tasks": [
    "state": "RUNNING",
    "id": 0,
    "worker_id": "anotherhost:8083",
    ...
    ...
  ]
}
#

If /status shows something other than RUNNING for your tasks, verify the configuration using the /connectors/test-splunk-sink/config context:

# curl localhost:8083/connectors/test-splunk-sink/config | jq
{
  "connector.class": "io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector",
  "splunk.ssl.enabled": "true",
  "splunk.remote.host": "localhost",
  "tasks.max": "5",
  "topics": "topic_to_consume",
  "splunk.remote.port": "8088",
  "splunk.ssl.validate.certs": "false",
  "splunk.auth.token": "1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"
}
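
If a task shows FAILED, the /status output will also include a stack trace pointing at the cause.  Once you’ve addressed it, recent Connect versions let you restart the failed task (or the whole connector) via the API without disturbing the other tasks:

# restart a single task by id
curl -X POST localhost:8083/connectors/test-splunk-sink/tasks/0/restart
# or restart the whole connector
curl -X POST localhost:8083/connectors/test-splunk-sink/restart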

5. Install the Splunk Heavy Forwarder

Installing a heavy forwarder is really just a matter of installing an instance of Splunk Enterprise and configuring it for forwarding.  Go ahead and install an Enterprise instance per Splunk’s instructions here.
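
On Linux the short version looks something like this – grab the current tarball from splunk.com first (the filename below is a placeholder):

tar -xzf splunk-<version>-Linux-x86_64.tgz -C /opt
/opt/splunk/bin/splunk start --accept-license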

6. Configure the Splunk Heavy Forwarder

I don’t enable the web UI on heavy forwarders since all of our management is done either through deployment servers or other automation tools.  You can do all the following config via the UI if you like.

I’m a big fan of keeping things as simple and self-contained as possible.  To keep it self-contained I always deploy HEC as a new, separate app under etc/apps – usually something named like (environment)_(destination cluster)_(instance type)_httpinput, e.g. test_salescluster_hf_httpinput.  You’re welcome to use the built-in splunk_httpinput app for this purpose; I just find that creating a new one with a more descriptive name makes keeping track of things easier.  Having it set up as a separate app also makes it much easier to manage via a deployment server or another tool such as Puppet or Ansible.  For our purposes here I’ll call the app test_connect_httpinput.

In the vein of keeping things as simple as possible, we’ll need just four basic files under etc/apps/test_connect_httpinput/local – inputs.conf, outputs.conf, props.conf and transforms.conf.
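
Creating the app skeleton is nothing more than:

cd $SPLUNK_HOME
mkdir -p etc/apps/test_connect_httpinput/local
touch etc/apps/test_connect_httpinput/local/{inputs,outputs,props,transforms}.conf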

Starting at the front of the pipeline, configure inputs.conf for data coming from the kafka-connect-splunk connector:

local/inputs.conf

First, the main http input configuration:

[http]
dedicatedIoThreads = 2
disabled = 0
enableSSL = 1
sslVersions = tls1.2
cipherSuite = TLSv1.2:!aNULL:!eNULL:!LOW:!MEDIUM:!RC4:!DES-CBC-SHA:+HIGH

As usual, since default/inputs.conf takes care of most of the settings, I only specify the settings that need changing.  The default for dedicatedIoThreads is 1 – I’ve been in the habit of starting at 2 and upping it from there if I find disk IO is low.  It’s one parameter I have found can provide a significant performance improvement when dealing with HEC inputs.  I enable SSL on heavy forwarders, though in this configuration it’s not really necessary since traffic between Connect and the heavy forwarder is all local.  Still, I enable it here to keep my heavy forwarder config consistent across the board.

Now the input:

[http://connect-test-input]
disabled = 0
token = "1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"

Pretty straightforward – set a name for the input, enable it, and set the token (making sure to use the same token from when you configured the connector).  You can also change the sourcetype here, which I like to do.  By default events will show up in Splunk as http:connect-test-input, but I set the sourcetype to kafka_(topic name), which is more descriptive in this case and makes for easier searching later.
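
For example, adding a sourcetype line to the stanza above – kafka_topic_to_consume here is just my naming convention for a topic named topic_to_consume, not anything the connector requires:

[http://connect-test-input]
disabled = 0
token = "1E080C37-6D1D-42C2-81F6-07C89E5DA7F21"
sourcetype = kafka_topic_to_consume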

Next, configure outputs.conf with your tcpout destinations.  Again, this is just a very standard Splunk style outputs config:

local/outputs.conf

[tcpout:sales_cluster]
server=splunk-sales1.company.com:9998,splunk-sales2.company.com:9998,splunk-sales3.company.com:9998

This setup is designed to route Kafka data to multiple Splunk clusters, so we’ll need to configure routing via props / transforms:

local/props.conf

[source::http://connect-test-input]
TRANSFORMS-routing = to_sales_cluster

Here again is standard props.conf config, just telling the heavy forwarder “for anything that comes from source X, apply this transform to it”.

local/transforms.conf

[to_sales_cluster]
REGEX = .
DEST_KEY = _TCP_ROUTING
FORMAT = sales_cluster

And the transform – take anything that matches this regex (in this case, everything) and set this key (in this case _TCP_ROUTING) to this FORMAT (in this case sales_cluster, the stanza for the destination cluster in outputs.conf).
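
To route a second data flow to a second cluster, you just repeat the pattern.  A hypothetical marketing cluster, fed by a second HEC input, would look something like this:

# outputs.conf
[tcpout:marketing_cluster]
server=splunk-mktg1.company.com:9998,splunk-mktg2.company.com:9998

# props.conf
[source::http://connect-marketing-input]
TRANSFORMS-routing = to_marketing_cluster

# transforms.conf
[to_marketing_cluster]
REGEX = .
DEST_KEY = _TCP_ROUTING
FORMAT = marketing_cluster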

If you’re new to Splunk or new to routing with Splunk, this method of routing with props / transforms will seem a little opaque.  It is, and I’m not a huge fan of it.  It’s unintuitive, it’s easy to mis-configure, and once you start routing multiple data flows to multiple Splunk clusters it’s a bit too much to keep in your head at once.  Transforms.conf is great for what it was designed to do – transform data.  Using it to route data feels like a bit of a hack.  Hopefully Splunk will give us better options in the future.

Now just restart your heavy forwarder to load this new app with its new config, and check your splunkd.log and connectDistributed.out to verify everything went according to plan.
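
Assuming a standard install location:

$SPLUNK_HOME/bin/splunk restart
# then watch for the HEC input coming up and tcpout connecting
tail -f $SPLUNK_HOME/var/log/splunk/splunkd.log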

If Connect is able to connect to your heavy forwarder you’ll see offset commit messages in connectDistributed.out similar to:

[2017-02-10 18:35:19,600] INFO WorkerSinkTask{id=test-splunk-sink-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

This lets you know that Connect has consumed records from the topic, finished delivering them to their destination, and committed the offsets.

Done!

You should now have a single data pipeline set up which will consume a Kafka topic and forward it to a Splunk Enterprise cluster of your choice.  Simply repeat steps 4 and 6 to add more pipelines.

If you have any questions or if there’s an error in any of these steps, please let me know in the comments below.

