Processing real-time data with Storm, Kafka and ElasticSearch – Part 3

This is the third part of the article series: Processing real-time data with Storm, Kafka, and ElasticSearch.

1. Introduction

In the second part, we learned how to perform searches in ElasticSearch. However, we failed to import the .json flight data files to ElasticSearch using its bulk API. In this article, we will do some programming and learn two ways to import the .json flight data files to ElasticSearch:

  1. By transforming the .json flight data files to the format that ElasticSearch’s bulk API likes
  2. By parsing the .json flight data files, extracting their values using a JSON library (like gson) and then importing the data using ElasticSearch’s REST APIs.

If you can’t follow what I’m talking about, you may want to start with the first part of this series of articles, continue with the second part, and then come back here.

Our scenario will be to process historical flight data. Ideally, these data would arrive in real-time from a number of sensors (radars), but since this is a bit difficult to achieve, we shall use batch flight history data that you can download from here. An explanation of the various data fields can be found here.

2. Transform the JSON Flight data file

ElasticSearch’s bulk API expects the .json documents in a specific format, with each document on its own line and preceded by an action line:

{"index":{"_id":4800770}}
{"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...}
...

This means that you have to convert each downloaded .json file to the above format, which consists of the following steps:

  • Add a line that starts with "index" above each actual data document
  • Move the "Id":<value> to {"_id":<value>}

We can write a Java program to format the .json files according to the format that the bulk API needs.

Opening a .json file in an editor that can handle big text files (e.g. UltraEdit if you use Windows, Sublime or TextMate or vim in other Operating Systems), you will notice that before the actual data we need, we get a line like this that we can ignore (the useful data start at acList):

{"src":1,"feeds":[{"id":1,"name":"ADSBexchange.com","polarPlot":false}],"srcFeed":1,"showSil":true,"showFlg":true,"showPic":true,"flgH":20,"flgW":85,"acList":[

and at the end of the file we can ignore everything after the last ]:

],"totalAc":5900,"lastDv":"636029391340296420","shtTrlSec":61,"stm":1467378030008}

Here is our first attempt:

package com.jcg;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
/**
 * Converts a flight data json file to a format that can be imported to 
 * ElasticSearch using its bulk API.
 */
public class JsonFlightFileConverter {
    private static final Path flightDataJsonFile = 
        Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
            list = stream
                    .map(line -> line.split("\\{"))
                    .flatMap(Arrays::stream)
                    .collect(toList());
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(list);
    }
}

We read the .json file as a Java 8 stream in a try-with-resources block, and then we split the single long line that the file contains by the character {. This produces a Stream<String[]>, which is not what we want, so we use a flatMap to transform it to a Stream<String>. Finally, we collect the results to a List<String>.
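To see what the split and flatMap steps produce, here is a minimal sketch that runs them on a short, made-up sample string instead of the real file. The class name SplitFlatMapDemo and the sample content are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SplitFlatMapDemo {
    // Splits every input line on '{' and flattens the resulting arrays into one list.
    static List<String> splitOnBrace(Stream<String> lines) {
        return lines
                .map(line -> line.split("\\{"))   // Stream<String[]>
                .flatMap(Arrays::stream)          // Stream<String>
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample that mimics the structure of the flight data file
        String sample = "{\"src\":1,\"acList\":[{\"Id\":1,\"Rcvr\":1},{\"Id\":2,\"Rcvr\":3}]}";
        splitOnBrace(Stream.of(sample))
                .forEach(piece -> System.out.println("[" + piece + "]"));
    }
}
```

Note that the pieces no longer contain the opening { (it is consumed by the split) and that each flight fragment still carries its closing } and a trailing comma. The sample header is shorter than the real one, so here only the first two pieces belong to the header.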

If we debug the above program by adding a breakpoint in the last line, we easily notice that we don’t need the first 3 elements of the list as they correspond to the first part of the file that we mentioned before.

The next step is to assemble our output. Replace the last statement in the program with the following lines:

final String result = list.stream().skip(3)
                .map(s -> "{" + s + "\n")
                .collect(Collectors.joining());
System.out.println(result);

We see that the output is now very close to what we wish to achieve.

{"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...

We can actually join the last code snippet to the original stream like so:

String result = "";
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
     result = stream
            .map(line -> line.split("\\{"))
            .flatMap(Arrays::stream)
            .skip(3)
            .map(s -> "{" + s + "\n")
            .collect(Collectors.joining());
} catch (IOException e) {
    e.printStackTrace();
}

We now need to insert a new line above each of these lines that contains the index of the document, like so:

{"index":{"_id":4800770}}

We can create a function to make our code easier to read:

private static String insertIndex(String s) {
    final String[] keyValues = s.split(",");
    final String[] idKeyValue = keyValues[0].split(":");
    return "{\"index\":{\"_id\":"+ idKeyValue[1] +"}}\n";
}

The above method receives a fragment like this (the opening { has already been consumed by the split):

"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...

splits it first by comma to get key-value pairs (e.g. "Rcvr":1), then splits the first key-value pair, which is going to be "Id":4800770, by colon (:). The document’s id is the second element of the array, which we use to construct our resulting string:

{"index":{"_id":4800770}}

One more detail that we need to fix. We need to remove the last comma from each document.

private static String removeLastComma(String s) {
    return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;
}

Now our stream processing code becomes like so:

String result = "";
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
     result = stream
            .map(line -> line.split("\\{"))
            .flatMap(Arrays::stream)
            .skip(3)
            .map(s -> insertIndex(s) + "{" + removeLastComma(s) + "\n")
            .collect(Collectors.joining());
} catch (IOException e) {
    e.printStackTrace();
}

But we forgot that we also need to remove the first entry (e.g. "Id":4800770,) from our actual data JSON object. It is better to do this in its own method. The modifications are shown below (we replace the map() call above with the following):

...
.map(s -> createResult(s))
...

/**
 * Create the result required by ElasticSearch's bulk API
 * @param s a flight data JSON object
 * @return the flight data JSON object prepended by {@code {"index":{"_id":"<id>"}}}
 */
private static String createResult(String s) {
    final String[] keyValues = s.split(",");
    final String[] idKeyValue = keyValues[0].split(":");
    return "{\"index\":{\"_id\":"+ idKeyValue[1] +"}}\n" + "{" + removeLastComma(removeIdEntry(s)) + "\n";
}
/**
 * Removes {@code "Id":<id>,}
 * @param s a flight document (JSON object)
 * @return a flight document without its {@code Id} entry
 */
private static String removeIdEntry(String s) {
    return s.substring(s.indexOf(",") + 1);
}

The method createResult() constructs the final string, i.e. the two lines that contain the index’s key and the actual flight data (no checks are done, for readability; validations need to be added to this method). To make the code more readable, we have added the removeIdEntry() method (also without any validations).
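As a rough idea of what such validations could look like, here is a minimal sketch of a checked variant of createResult(). It is an assumption on my part, not the code used in the rest of this article: the class name BulkEntryBuilder, the regular expression and the Optional return type are all hypothetical choices.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BulkEntryBuilder {
    // Accepts only fragments that start with "Id":<digits>, and captures id and remainder
    private static final Pattern ID_FIRST =
            Pattern.compile("^\"Id\":(\\d+),(.*)$", Pattern.DOTALL);

    /**
     * Validated variant of createResult(): returns the two bulk lines,
     * or an empty Optional when the fragment is not a flight document.
     */
    static Optional<String> createResult(String s) {
        if (s == null) {
            return Optional.empty();
        }
        Matcher matcher = ID_FIRST.matcher(s.trim());
        if (!matcher.matches()) {
            return Optional.empty();
        }
        String id = matcher.group(1);
        String rest = matcher.group(2);
        if (rest.endsWith(",")) {
            rest = rest.substring(0, rest.length() - 1); // drop the trailing comma
        }
        return Optional.of("{\"index\":{\"_id\":" + id + "}}\n{" + rest + "\n");
    }

    public static void main(String[] args) {
        System.out.print(createResult("\"Id\":4800770,\"Rcvr\":1,\"HasSig\":false},").orElse("rejected\n"));
        System.out.print(createResult("\"src\":1,\"feeds\":[").orElse("rejected\n"));
    }
}
```

In the stream pipeline this variant could be used with .map(BulkEntryBuilder::createResult).flatMap(Optional::stream), which requires Java 9 or later. Fragments that do not start with an Id entry are then simply dropped instead of causing an ArrayIndexOutOfBoundsException.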

And don’t forget to remove the last part after the last ] and add an empty newline at the end of the file:

     .collect(Collectors.joining());
result = result.substring(0, result.lastIndexOf("]")) + "\n";

Not bad. The last step is to write the result to a file.

private static final Path outFlightDataJsonFile = 
      Paths.get("src/main/resources/flightdata/2016-07-01-1300Z-es.json");
…
    .collect(Collectors.joining());
    Files.writeString(outFlightDataJsonFile, result);
} catch (IOException e) {
    e.printStackTrace();
}

Excellent. We managed to convert a big .json file to the format we want, or more accurately, to the format that ElasticSearch’s bulk API wants, with just a few lines of code. We can also use all of the CPUs/cores of our machine by converting our stream to a parallel stream:

result = Files.lines(inFlightDataJsonFile.toAbsolutePath())
        .parallel()
        .map(line -> line.split("\\{")) ...

Amazing isn’t it? The power of streams and multi-threading in our hands.
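One thing worth checking when switching to a parallel stream is that the output order does not change, because the bulk format depends on it. The following minimal sketch (class name and sample data are made up) compares the sequential and the parallel result of the same kind of pipeline. It relies on the fact that the stream is ordered and that skip() and Collectors.joining() respect the encounter order.

```java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelOrderDemo {
    // Same shape as the article's pipeline, but skipping only the single empty leading piece
    static String convert(Stream<String> lines) {
        return lines
                .map(line -> line.split("\\{"))
                .flatMap(Arrays::stream)
                .skip(1)
                .map(s -> "{" + s + "\n")
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        String sample = "{\"Id\":1},{\"Id\":2},{\"Id\":3}";
        String sequential = convert(Stream.of(sample));
        String parallel = convert(Stream.of(sample).parallel());
        System.out.println("Same output: " + sequential.equals(parallel));
    }
}
```

Keep in mind that the speed-up may be limited for these files: each file consists of a single long line, and the elements that flatMap produces from one source element are processed sequentially.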

We can now make our program more generic, either by receiving the .json file as a command-line parameter or by passing it a directory, in which case it reads all the .json files in that directory and converts them to the new format. I prefer the idea of converting a batch of .json files at once, so let’s do that.

Here is the code:

public class JsonFlightFileConverter {

 public static void main(String[] args) {
  if (args.length == 1) {
    Path inDirectoryPath = Paths.get(args[0]);
    if (inDirectoryPath != null) {
        Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");
        try {
            // Delete a pre-existing out directory (children first), then recreate it
            if (Files.exists(outDirectoryPath)) {
                Files.walk(outDirectoryPath)
                        .sorted(Comparator.reverseOrder())
                        .map(Path::toFile)
                        .forEach(File::delete);
            }
            Files.createDirectory(outDirectoryPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Convert every .json file found in the input directory
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {
            for (Path inFlightDataJsonFile : ds) {
                String result = "";
                try (Stream<String> stream =
                     Files.lines(inFlightDataJsonFile.toAbsolutePath())) {
                    result = stream
                      .parallel()
                      .map(line -> line.split("\\{"))
                      .flatMap(Arrays::stream)
                      .skip(3)
                      .map(s -> createResult(s))
                      .collect(Collectors.joining());
                }
                Path outFlightDataJsonFile =
                     Paths.get(outDirectoryPath.toString(),
                               inFlightDataJsonFile.getFileName().toString());
                Files.createFile(outFlightDataJsonFile);
                Files.writeString(outFlightDataJsonFile, result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 } else {
    System.out.println("Usage: java JsonFlightFileConverter <directory>");
 }
...

I won’t go into much detail as this is outside the scope of this article. In short, you can call the program passing as argument the directory that contains the flight data json files, and it will output the resulting json files inside a subdirectory named out.
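The only part that may look cryptic is the deletion of an existing out directory. Here is a minimal, stand-alone sketch of that step, under the assumption that we want to fail loudly when a file cannot be deleted. The class name OutputDirectory and the method recreate() are hypothetical. Sorting the walked paths in reverse order makes sure that files and subdirectories are deleted before the directories that contain them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

class OutputDirectory {
    // Deletes dir with everything below it (if it exists) and creates it again, empty.
    static Path recreate(Path dir) throws IOException {
        if (Files.exists(dir)) {
            // Files.walk keeps directory handles open, so close it with try-with-resources
            try (Stream<Path> walk = Files.walk(dir)) {
                walk.sorted(Comparator.reverseOrder())   // children before their parents
                    .forEach(path -> {
                        try {
                            Files.delete(path);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
            }
        }
        return Files.createDirectory(dir);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("flightdata");
        Path out = base.resolve("out");
        Files.createDirectories(out.resolve("old"));
        Files.writeString(out.resolve("old").resolve("stale.json"), "{}");
        System.out.println("Recreated: " + recreate(out));
    }
}
```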

2.1 Import data using ElasticSearch’s bulk API

It is important to stress again that the file needs to end with an empty line. If not, then add one (actually the previous program has already added a newline at the end of the file). Inside the directory where you produced the new .json files (the out directory), issue the following command (per .json file):

curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"

Please note that the content type is "application/x-ndjson" and not "application/json". Also note that we send the data as binary (--data-binary) in order to preserve the newlines. The file name is 2016-07-01-1300Z.json. Any pre-existing documents in ElasticSearch with the same Ids will be replaced by the ones in the .json file. You may write a bash or PowerShell script to automate the process of importing all 9 json files 2016-07-01-1300Z.json to 2016-07-01-1309Z.json to ElasticSearch.
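If you prefer to stay in Java instead of writing a shell script, the same request can be sent with the HTTP client that ships with JDK 11 and later. The following is only a sketch under a few assumptions: ElasticSearch listens on http://localhost:9200, the index is named flight, and the class name BulkUploader is made up. It posts every .json file of the directory given as argument to the _bulk endpoint.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class BulkUploader {
    // Builds the equivalent of: curl -H "Content-Type: application/x-ndjson" -XPOST .../_bulk --data-binary @file
    static HttpRequest bulkRequest(String baseUrl, String index, Path ndjsonFile) throws IOException {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index + "/_bulk"))
                .header("Content-Type", "application/x-ndjson")
                .POST(HttpRequest.BodyPublishers.ofFile(ndjsonFile)) // file is sent byte for byte
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length != 1) {
            System.out.println("Usage: java BulkUploader <directory with converted .json files>");
            return;
        }
        HttpClient client = HttpClient.newHttpClient();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(Paths.get(args[0]), "*.json")) {
            for (Path file : files) {
                HttpResponse<String> response = client.send(
                        bulkRequest("http://localhost:9200", "flight", file), // assumed host and index
                        HttpResponse.BodyHandlers.ofString());
                System.out.println(file.getFileName() + " -> HTTP " + response.statusCode());
            }
        }
    }
}
```

Run it against the out directory produced by the converter, since the files must already be in the bulk format.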

In the end you should have 7679 flight documents imported.

"hits" : {
    "total" : {
      "value" : 7679,
      "relation" : "eq"
    },

GET /_cat/shards?v

returns:

index   shard prirep state      docs   store ip        node
flight  0     p      STARTED    7679   71mb 127.0.0.1 MacBook-Pro.local
flight  0     r      UNASSIGNED 

Unfortunately, the replica shard remains unassigned on my machine, as I have only one node started. If you start a second node, the replica shard will be allocated to it and will hold a copy of the documents.

3. Parse the JSON flight data file

Another way to import these documents to ElasticSearch is to parse the JSON flight data file to memory and use ElasticSearch’s REST API to import them to ElasticSearch. There are a number of libraries to parse JSON files in Java.

JEP 198: Light-Weight JSON API is not yet part of JDK, unfortunately.

We will use the Gson library from Google, but any other JSON library could do the job. Let’s write a simple program to parse 2016-07-01-1300Z.json using Gson.

Gson provides many ways to represent JSON data. Which representation we choose depends on the next step, i.e. how we import the data to ElasticSearch. The ElasticSearch APIs require the data to be of the form Map<String, Object>, and this is where we will store our parsed JSON data.

We need to add the following dependency to our pom.xml, using the latest version at the time you are reading this:

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

Then we will parse 2016-07-01-1300Z.json:

package com.jcg;

import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

public class JsonFlightFileReader {
    private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";
    private static final Gson gson = new Gson();
    public static void main(String[] args) {
        parseJsonFile(flightDataJsonFile);
    }
    private static void parseJsonFile(String file) {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
            Map<String, Object> map = gson.fromJson(reader, 
                       new TypeToken<Map<String, Object>>() { }.getType());
            List<Object> acList = (List<Object>) (map.get("acList"));
            for (Object item : acList) {
                LinkedTreeMap<String, Object> flight = 
						(LinkedTreeMap<String, Object>) item;
                for (Map.Entry<String, Object> entry : flight.entrySet()) {
                    String key = entry.getKey();
                    Object value = entry.getValue();
                    String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";
                    System.out.print(outEntry);
                }
                System.out.println("}");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

All the work is done by these two lines:

Map<String, Object> map = gson.fromJson(reader, 
                            new TypeToken<Map<String, Object>>() {}.getType());
List<Object> acList = (List<Object>) (map.get("acList"));

As we saw above, the actual flight data are under the key acList, which is how they are accessed here. Each item of the list is a Map<String, Object>, exactly what we will need next.

Below is another way. Apart from a Map<String, Object>, ElasticSearch’s REST API can also accept a String that is constructed of key-value pairs. The below method produces exactly that:

private static void parseJsonFile2(String file) {
    // Read the file passed as parameter
    try (BufferedReader reader =
                  Files.newBufferedReader(Paths.get(file))) {
        Map<String, JsonElement> map = gson.fromJson(reader,
                new TypeToken<Map<String, JsonElement>>() {}.getType());
        JsonElement acList = (map.get("acList"));
        JsonArray array = acList.getAsJsonArray();
        for (JsonElement item : array) {
            JsonObject itemAsJsonObject = item.getAsJsonObject();
            String id = itemAsJsonObject.get("Id").getAsString();
            StringBuilder recordAsString = new StringBuilder();
            Set<Map.Entry<String, JsonElement>> entries = 
                                 item.getAsJsonObject().entrySet();
            for (Map.Entry<String, JsonElement> entry : entries) {
                if (entry.getKey().equals("Id")) {
                    recordAsString.append("{");
                } else {
                    recordAsString.append(
              entry.getKey()).append(" : ").append(entry.getValue()).append(",");
                }
            }
            recordAsString.setCharAt(recordAsString.length() - 1, '}');
            System.out.println(recordAsString.toString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Here we get the result as a JsonArray, from which we construct a StringBuilder of key-value pairs.

3.1 Import data using ElasticSearch’s REST API

ElasticSearch provides two APIs to import data:

  • The Java Low Level REST Client allows us to communicate with an Elasticsearch cluster through http, leaving the marshalling of requests and the un-marshalling of responses to users. It is compatible with all Elasticsearch versions.
  • The Java High Level REST Client is based on the low-level client; it exposes API-specific methods and takes care of the marshalling of requests and the un-marshalling of responses. It was added in version 6.0.0.

3.1.1 Low level REST API

To use ElasticSearch’s low-level API, we need to add the following dependency to our pom.xml:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.10.0</version>
</dependency>

We can interact with ElasticSearch via a RestClient:

RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"))
    .setDefaultHeaders(new Header[]{
            new BasicHeader("accept", "application/json"),
            new BasicHeader("content-type", "application/json")})
    .setFailureListener(new RestClient.FailureListener() {
        @Override
        public void onFailure(Node node) {
            System.err.println("Low level Rest Client Failure on node " +
                    node.getName());
        }
    }).build();

We build a RestClient with the help of a RestClientBuilder. It is good practice to set the default headers as well as a failure listener in case something goes wrong. The RestClient class is thread-safe and ideally has the same lifecycle as the application that uses it.

The next step is to create a Request:

Request request = new Request("POST", "/flight/_doc/4800770");

Next pass the JSON object to the request:

String jsonDoc = "{\"Rcvr\":1,\"HasSig\":false,\"Icao\":\"494102\",...]}";
request.setJsonEntity(jsonDoc);

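The double quotes are escaped (\") above only because the JSON document is written as a Java string literal; the string that is actually sent to ElasticSearch contains plain double quotes.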

Finally, we send the request. There are two ways, synchronously (i.e. blocking):

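Response response = restClient.performRequest(request);
int statusCode = response.getStatusLine().getStatusCode();
// 201 (Created) is returned for a new document, 200 (OK) for a replaced one
if (statusCode != 200 && statusCode != 201) {
    System.err.println("Could not add document with Id: " + id + " to index /flight");
}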

or asynchronously (non-blocking):

Cancellable cancellable = restClient.performRequestAsync(request,
    new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            System.out.println("Document with Id: " + id + " was successfully added to index /flight");
        }

        @Override
        public void onFailure(Exception exception) {
            System.err.println("Could not add document with Id: " + id + " to index /flight");
        }
});

Finally, don’t forget to close the client connection:

} finally {
    try {
        restClient.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

(RestClient implements the Closeable interface, so you can alternatively declare it in a try-with-resources block instead of closing it explicitly).

If we execute the request asynchronously and close the connection before the request completes, we won’t be able to know whether the request was successful or not. In our case a synchronous request makes sense; however, if you have to process many documents, asynchronous requests have advantages. But how can you know when the requests have finished? One way is to count the number of documents (succeeded and failed) and compare it with the total number of documents that you expect to index (should you know the number in advance, of course). We will see later how you can count the number of indexed documents (see the countDocuments() method below). Alternatively, you can let the program wait for some time before you call restClient.close().

We indexed only one document. But how about the total number of flight documents that exist in our resources/flightdata folder? The strategy is to read the .json files, parse the contents with e.g. Gson, remove the unnecessary parts, construct the jsonDoc like in the example above for each flight JSON document, and post it to ElasticSearch using the RestClient as we saw above. Here is the code for one .json file (it is left as an exercise to write the code for all the .json files in the folder):

public class ElasticSearchLowLevelRestClient {
    private static final Path flightDataJsonFile =
            Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
    private static final Gson gson = new Gson();

    public static void main(String[] args) {
        RestClient restClient = getRestClient();
        try (BufferedReader reader = Files.newBufferedReader(flightDataJsonFile)) {
            Map<String, JsonElement> map = gson.fromJson(reader,
                    new TypeToken<Map<String, JsonElement>>() {}.getType());
            JsonElement acList = (map.get("acList"));
            JsonArray array = acList.getAsJsonArray();
            for (JsonElement item : array) {
                JsonObject itemAsJsonObject = item.getAsJsonObject();
                String id = itemAsJsonObject.remove("Id").getAsString();
                Request request = new Request("POST", "/flight/_doc/" + id);
                String jsonDoc = item.toString();
                request.setJsonEntity(jsonDoc);
                Response response = restClient.performRequest(request);
                int statusCode = response.getStatusLine().getStatusCode();
                // 201 (Created) is returned for a new document, 200 (OK) for a replaced one
                if (statusCode != 200 && statusCode != 201) {
                    System.err.println("Could not add document with Id: " + id + " to index /flight");
                }
            }
            System.out.println("/flight/_count" + countDocuments(restClient, "flight"));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                restClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

For each JSON document in the file, we get and remove its Id, create a request out of it and perform a synchronous request. The string returned by item.toString() is already valid JSON, so the double quotes must not be escaped here.

Issue the following command to Kibana:

GET /flight/_search

to verify how many documents are stored under index flight, or use the low level API again:

private static String countDocuments(RestClient restClient, String index) throws IOException {
    Request request = new Request("GET", "/"+index+"/_count");
    Response response = restClient.performRequest(request);
    String responseBody = EntityUtils.toString(response.getEntity());
    final String[] entries = responseBody.split(","); // {"count":5900,...
    return entries[0].substring(8);
}

which you can call from the main() method like so:

System.out.println("/flight/_count" + countDocuments(restClient, "flight"));
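Note that entries[0].substring(8) returns the count together with the preceding colon (e.g. :5900), which is why the println() above does not add one. If you would rather get the count as a number, a more defensive way to extract it could look like the following sketch. It assumes that the response body contains a "count":<number> entry as in the comment above; the class name CountResponseParser and the sample body are made up.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CountResponseParser {
    // Looks for "count":<digits> anywhere in the response body
    private static final Pattern COUNT = Pattern.compile("\"count\"\\s*:\\s*(\\d+)");

    static OptionalLong parseCount(String responseBody) {
        Matcher matcher = COUNT.matcher(responseBody);
        return matcher.find()
                ? OptionalLong.of(Long.parseLong(matcher.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Sample body for illustration only
        String body = "{\"count\":5900,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0}}";
        System.out.println("/flight/_count: " + parseCount(body).orElse(-1));
    }
}
```

Since Gson is already on the classpath, parsing the body into a JsonObject and reading its count member would be another reasonable option.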

The client is quite happy to execute many actions in parallel, similar to ElasticSearch’s _bulk API:

final CountDownLatch latch = new CountDownLatch(array.size());
for (JsonElement item : array) {
    //...
    request.setJsonEntity(jsonDoc);
    Cancellable cancellable = restClient.performRequestAsync(
            request,
            new ResponseListener() {
                @Override
                public void onSuccess(Response response) {
                    System.out.println("Document with Id: " + id + " was successfully added to index /flight");
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception exception) {
                    System.err.println("Could not add document with Id: " + id + " to index /flight");
                    latch.countDown();
                }
            }
    );
}
latch.await();

Please consult the Javadoc to learn more about ElasticSearch’s low-level REST API.

3.1.2 High level REST API

The Java High Level REST Client works on top of the Java Low Level REST client. Its main goal is to expose API specific methods, that accept request objects as an argument and return response objects, so that request’s marshalling and response’s un-marshalling is handled by the client itself.

It requires JDK 8 or later. We need to add the following coordinates to our maven project:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>

The High Level Java REST Client depends on the following artifacts and their transitive dependencies:

  • org.elasticsearch.client:elasticsearch-rest-client
  • org.elasticsearch:elasticsearch

A RestHighLevelClient instance needs a REST low-level client builder to be built as follows:

RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));

The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections and starts some threads so you should close the high-level client when you are well and truly done with it and it will in turn close the internal low-level client to free those resources. This can be done through:

client.close();

Like the low-level API, it supports both synchronous and asynchronous requests.

The Java High Level REST Client supports the following Document APIs:

Single document APIs

Multi-document APIs

As you can see, you can do almost everything with the High-Level API.

As you might have guessed, we will use the Bulk API. A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request (synchronous or asynchronous):

BulkRequest request = new BulkRequest("flight"); 
request.add(new IndexRequest("flight").id("1")  
        .source(...));
…
client.bulk(request, RequestOptions.DEFAULT); 

The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.

The BulkRequest uses the Index API as you saw in the above code example (IndexRequest). The document source can be provided as a:

  • String, e.g.
String jsonString = "{\"Rcvr\":1,\"HasSig\":false,...}";
  • Map, which gets automatically converted to JSON format, e.g.
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("Rcvr", 1);
jsonMap.put("HasSig", false);
...
  • XContentBuilder object, e.g.
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("Rcvr", 1);
    builder.field("HasSig", false);
}
builder.endObject();
  • Object key-value pairs, which get converted to JSON format, e.g.
IndexRequest indexRequest = new IndexRequest("flight")
    .id("4800770")
    .source("Rcvr", 1,
        "HasSig", false,
        ...);

In the asynchronous request, users need to specify how the response or potential failures will be handled by passing the request and a listener to the asynchronous bulk method:

client.bulkAsync(request, RequestOptions.DEFAULT, listener); 

The asynchronous method does not block and returns immediately. Once it is completed the ActionListener is called back using the onResponse method if the execution successfully completed or using the onFailure method if it failed. Failure scenarios and expected exceptions are the same as in the synchronous execution case.

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

The returned BulkResponse contains information about the executed operations and allows us to iterate over each result as follows:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    }
}

In case of error:

if (bulkResponse.hasFailures()) { 
  for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure = 
                bulkItemResponse.getFailure(); 
    }
  }
}

Let’s see how we can import 2016-07-01-1300Z.json to ElasticSearch:

package com.jcg;

...

public class ElasticSearchHighLevelRestClient {
    private static final Path flightDataJsonFile =
            Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
    private static final RestHighLevelClient restClient = 
            new RestHighLevelClient(
              RestClient.builder(new HttpHost("localhost", 9200, "http")));

    public static void main(String[] args) {
        BulkRequest request = addDocumentsToIndex("flight");
        try {
           restClient.bulk(request, RequestOptions.DEFAULT);
           restClient.close();
        } catch (IOException e) {
           e.printStackTrace();
        }
    }

   …
}

The method addDocumentsToIndex(), which we show below, adds the flight JSON objects to a bulk request, which is then passed to the synchronous bulk() method to be posted to ElasticSearch.

@SuppressWarnings("unchecked")
private static BulkRequest addDocumentsToIndex(final String index) {
    final Gson gson = new Gson();
    BulkRequest request = new BulkRequest(index);
    try (BufferedReader reader = Files.newBufferedReader(flightDataJsonFile)) {
        Map<String, Object> map = gson.fromJson(reader,
                new TypeToken<Map<String, Object>>() {}.getType());
        List<Object> acList = (List<Object>) (map.get("acList"));
        for (Object item : acList) {
            LinkedTreeMap<String, Object> flight = 
                (LinkedTreeMap<String, Object>) item;
            final long id = 
                     Double.valueOf(flight.remove("Id").toString()).longValue();
            request.add(new IndexRequest("flight")
                    .id(String.valueOf(id))
                    .source(flight));
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return request;
}

We have seen most of this code before. The difference here is that we add each LinkedTreeMap<String, Object> to a BulkRequest object. We also make sure to add the Id to the IndexRequest but to remove it from the actual source data. Gson parses the Id as double, so we convert it back to long.
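The conversion of the Id can also be done without the round trip through a String. The sketch below uses a plain map that mimics what Gson produces for a flight (numbers are stored as Double when the target type is Map<String, Object>). The class name FlightIdConverter and the method removeId() are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FlightIdConverter {
    // Removes the "Id" entry from the flight and returns it as a long.
    static long removeId(Map<String, Object> flight) {
        Object raw = flight.remove("Id");
        if (!(raw instanceof Number)) {
            throw new IllegalArgumentException("Missing or non-numeric Id: " + raw);
        }
        return ((Number) raw).longValue();
    }

    public static void main(String[] args) {
        // Mimics a flight parsed by Gson: the Id arrives as a Double
        Map<String, Object> flight = new LinkedHashMap<>();
        flight.put("Id", 4800770.0);
        flight.put("Rcvr", 1.0);
        long id = removeId(flight);
        System.out.println("id=" + id + ", remaining source=" + flight);
    }
}
```

Compared with the code above, this variant also fails with a clear message when a flight has no Id, instead of throwing a NullPointerException.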

Issue the following command to Kibana:

GET /flight/_search

to verify that there are now 5900 documents stored under index flight. Or use the Count API:

private static long countDocuments(String index) {
    long numOfDocuments = 0;
    CountRequest countRequest = new CountRequest(index);
    countRequest.query(QueryBuilders.matchAllQuery());
    try {
        CountResponse countResponse = restClient
                .count(countRequest, RequestOptions.DEFAULT);
        numOfDocuments = countResponse.getCount();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return numOfDocuments;
}

which you can call from the main() method like so:

try {
     restClient.bulk(request, RequestOptions.DEFAULT);
     System.out.println("/flight/_count:" + countDocuments("flight"));
     restClient.close();
} catch (IOException e) {
     e.printStackTrace();
}

It is left as an exercise to modify the above class in order to import all the .json files in the directory.
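
As a possible starting point for this exercise, here is a minimal sketch that only collects the .json files of a directory, using the JDK alone. The class and method names (JsonFileLister, listJsonFiles) are hypothetical; parsing each file and adding its flights to a bulk request is deliberately left out.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class JsonFileLister {

    /** Returns the regular files ending in .json directly inside dir, sorted by path. */
    static List<Path> listJsonFiles(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        // The glob keeps only entries whose file name ends with .json
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.json")) {
            for (Path file : stream) {
                if (Files.isRegularFile(file)) {
                    files.add(file);
                }
            }
        }
        Collections.sort(files);
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path file : listJsonFiles(dir)) {
            // Here each file would be parsed and its flights added to a bulk request
            System.out.println(file);
        }
    }
}
```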

If you use the bulkAsync() method instead of bulk(), you must wait until all the documents have been imported into ElasticSearch; otherwise you end up calling close() before the import finishes. One way to do that is to call countDocuments() periodically until the expected total number of documents has been indexed, and only then call restClient.close(). Alternatively, you can simply wait for some time before you close the connection.
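
The periodic check could look roughly like the following sketch. It is written against a LongSupplier so that it runs without a cluster; in the real program the supplier would wrap countDocuments("flight"). The class name, method name, timeout and poll interval are all assumptions made for illustration.

```java
import java.util.function.LongSupplier;

final class IndexingWaiter {

    /**
     * Polls counter until it reports at least expected documents or the timeout elapses.
     *
     * @return true if the expected count was reached, false if the timeout elapsed first
     */
    static boolean waitForCount(LongSupplier counter, long expected,
                                long timeoutMillis, long pollMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (counter.getAsLong() >= expected) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for countDocuments("flight"): pretends that 1000 more
        // documents have been indexed every time it is asked.
        long[] indexed = {0};
        LongSupplier fakeCount = () -> indexed[0] += 1000;

        boolean done = waitForCount(fakeCount, 5900, 5_000, 10);
        System.out.println("All documents indexed: " + done);
        // Only at this point would restClient.close() be called.
    }
}
```

Returning a boolean instead of blocking forever lets the caller decide what to do when the expected number of documents never shows up.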

3.1.3 BulkProcessor

The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.

In order to execute the requests, the BulkProcessor requires the following components:

  • RestHighLevelClient to execute the BulkRequest and to retrieve the BulkResponse
  • BulkProcessor.Listener which is called before and after every BulkRequest execution or when a BulkRequest failed

Then the BulkProcessor.builder method can be used to build a new BulkProcessor:

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions();
        System.out.printf("Executing bulk [%d] with %d requests\n",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) {
            System.err.printf("Bulk [%d] executed with failures\n", executionId);
        } else {
            System.out.printf("Bulk [%d] completed in %d milliseconds\n",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        System.err.println("Failed to execute bulk process: " + failure + "\n");
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            restClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();

The BulkProcessor.Builder provides methods to configure how the BulkProcessor should handle request execution:

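BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            restClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
// Flush a bulk after 500 actions have been added (default: 1000)
builder.setBulkActions(500);
// Flush a bulk once its size reaches 1 MB (default: 5 MB)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// 0 allows only a single request to be executed at a time (default: 1)
builder.setConcurrentRequests(0);
// Flush any pending requests every 10 seconds (default: not set)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// Wait 1 second between retries and retry up to 3 times
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor bulkProcessor = builder.build();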
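
To make the idea behind a constant backoff policy concrete, here is a minimal JDK-only sketch of "wait a fixed delay, retry a limited number of times". It is an illustration of the concept, not the BulkProcessor's internal implementation; the class and method names (ConstantBackoffRetry, callWithRetry) are hypothetical.

```java
import java.util.concurrent.Callable;

final class ConstantBackoffRetry {

    /**
     * Runs task; if it throws, waits delayMillis and tries again, up to maxRetries retries.
     * The task is therefore attempted at most maxRetries + 1 times.
     */
    static <T> T callWithRetry(Callable<T> task, long delayMillis, int maxRetries)
            throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis);   // constant delay between attempts
                }
            }
        }
        throw last;   // every attempt failed
    }

    public static void main(String[] args) throws Exception {
        // Simulated request that is rejected twice before it succeeds
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("rejected");
            }
            return "succeeded on attempt " + calls[0];
        }, 10, 3);
        System.out.println(result);
    }
}
```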

Once the BulkProcessor is created, requests can be added to it:

for (Object item : acList) {
    LinkedTreeMap<String, Object> flight = (LinkedTreeMap<String, Object>) item;
    final long id = Double.valueOf(flight.remove("Id").toString()).longValue();
    bulkProcessor.add(new IndexRequest("flight")
            .id(String.valueOf(id))
            .source(flight));
}

The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request. Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods, awaitClose() or close(). The close() method closes the BulkProcessor immediately, while the awaitClose() method can be used to wait until all requests have been processed or the specified waiting time elapses:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

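The output shows the 5900 documents being sent in six bulk requests. Note that the batches contain 1000 requests each, which is the default number of actions per bulk when setBulkActions() is not called: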

Executing bulk [1] with 1000 requests
Executing bulk [2] with 1000 requests
Bulk [1] completed in 1053 milliseconds
Executing bulk [3] with 1000 requests
Bulk [2] completed in 653 milliseconds
Executing bulk [4] with 1000 requests
Bulk [3] completed in 740 milliseconds
Executing bulk [5] with 1000 requests
Bulk [4] completed in 686 milliseconds
Executing bulk [6] with 900 requests
Bulk [5] completed in 654 milliseconds
Bulk [6] completed in 526 milliseconds
/flight/_count:5900
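
The batch sizes in the output above follow from simple arithmetic, which the sketch below reproduces. It assumes that only the action-count threshold triggers a flush (size-based and time-based flushes are ignored) and that the remainder is flushed when the processor is closed; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BulkBatchSizes {

    /** Splits totalDocs into consecutive batches of at most bulkActions documents each. */
    static List<Integer> batchSizes(int totalDocs, int bulkActions) {
        if (bulkActions <= 0) {
            throw new IllegalArgumentException("bulkActions must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = totalDocs; remaining > 0; remaining -= bulkActions) {
            sizes.add(Math.min(remaining, bulkActions));
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 5900 documents in batches of 1000: five full batches and one of 900
        System.out.println(batchSizes(5900, 1000));
        // With a threshold of 500 the same documents would need 12 bulk requests
        System.out.println(batchSizes(5900, 500).size());
    }
}
```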

4. Summary

In this article we focused on how to import the .json flight data batch files to ElasticSearch. We saw how to do that in two ways:

  • using ElasticSearch’s bulk API, and
  • using a JSON library to parse the .json files and then inserting the documents into ElasticSearch using its REST APIs:
    • the low-level and
    • the high-level (which is the recommended way).

Which approach you use is largely a matter of taste.

Please keep in mind that the code is deliberately not production-ready, so that the important parts stand out.

You can now revisit the previous article and re-run the various search queries to evaluate the results with the real data.

5. References

  1. Aggarwal S. (2018), “ElasticSearch Tutorial for Beginners”, JavaCodeGeeks.
  2. Toshev M. (2020), “ElasticSearch SQL”, JavaCodeGeeks.
  3. Wong W. T. (2019), Advanced ElasticSearch 7.0, Packt.

6. Download the source code

That was the third part of the article series: Processing real-time data with Storm, Kafka and ElasticSearch.

Download
You can download the full source code here: Processing real-time data with Storm, Kafka and ElasticSearch – Part 3

Ioannis Kostaras

Software architect awarded the 2012 Duke's Choice Community Choice Award and co-organizing the hottest Java conference on earth, JCrete.