How to load Kafka data into Elasticsearch via Logstash



Introducing a simple method to load Kafka data using the ETL module Logstash of the full-text search service Elasticsearch and the CData JDBC driver.

Elasticsearch is a popular distributed full-text search engine. By centrally storing data, you can perform ultra-fast searches, fine-tuning relevance, and powerful analytics with ease. Elasticsearch has a pipeline tool for loading data called "Logstash". You can use CData JDBC Drivers to easily import data from any data source into Elasticsearch for search and analysis.

This article explains how to use the CData JDBC Driver for Kafka to load data from Kafka into Elasticsearch via Logstash.

Using CData JDBC Driver for Kafka with Elasticsearch Logstash

  • Install the CData JDBC Driver for Kafka on the machine where Logstash is running.
  • The JDBC Driver will be installed at the following path (the year part, e.g. 20XX, will vary depending on the product version you are using). You will use this path later. Place this .jar file (and the .lic file if it's a licensed version) in Logstash.
    C:\Program Files\CData\CData JDBC Driver for ApacheKafka 20XX\lib\cdata.jdbc.apachekafka.jar
  • Next, install the JDBC Input Plugin, which connects Logstash to the CData JDBC driver. The JDBC Plugin comes by default with the latest version of Logstash, but depending on the version, you may need to add it.
    https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html
  • Move the CData JDBC Driver’s .jar file and .lic file to Logstash's "/logstash-core/lib/jars/".

Sending Kafka data to Elasticsearch with Logstash

Now, let's create a configuration file for Logstash to transfer Kafka data to Elasticsearch.

  • Write the process to retrieve Kafka data in the logstash.conf file, which defines data processing in Logstash. The input will be JDBC, and the output will be Elasticsearch. The data loading job is set to run at 30-second intervals.
  • Set the CData JDBC Driver's .jar file as the JDBC driver library, configure the class name, and set the connection properties to Kafka in the form of a JDBC URL. The JDBC URL allows detailed configuration, so please refer to the product documentation for more specifics.
  • Set BootstrapServers and the Topic properties to specify the address of your Apache Kafka server, as well as the topic you would like to interact with.

    Authorization Mechanisms

    • SASL Plain: The User and Password properties should be specified. AuthScheme should be set to 'Plain'.
    • SASL SSL: The User and Password properties should be specified. AuthScheme should be set to 'Scram'. UseSSL should be set to true.
    • SSL: The SSLCert and SSLCertPassword properties should be specified. UseSSL should be set to true.
    • Kerberos: The User and Password properties should be specified. AuthScheme should be set to 'Kerberos'.

    You may be required to trust the server certificate. In such cases, specify the TrustStorePath and the TrustStorePassword if necessary.

Executing data movement with Logstash

Now let's run Logstash using the created "logstash.conf" file.

logstash-7.8.0\bin\logstash -f logstash.conf

A log indicating success will appear. This means the Kafka data has been loaded into Elasticsearch.

For example, let's view the data transferred to Elasticsearch in Kibana.

    GET apachekafka_table/_search
    {
        "query": {
            "match_all": {}
        }
    }
Querying the Kafka data loaded into Elasticsearch

We have confirmed that the data is stored in Elasticsearch.

Confirming the Kafka data loaded into Elasticsearch

By using the CData JDBC Driver for Kafka with Logstash, it functions as a Kafka connector, making it easy to load data into Elasticsearch. Please try the 30-day free trial.

Ready to get started?

Download a free trial of the Apache Kafka Driver to get started:

 Download Now

Learn more:

Apache Kafka Icon Apache Kafka JDBC Driver

Rapidly create and deploy powerful Java applications that integrate with Apache Kafka.