Home » Software Development » Monitoring Real-Time Uber Data Using Spark Machine Learning, Streaming, and the Kafka API (Part 2)

About Carol Mcdonald

Carol Mcdonald

Monitoring Real-Time Uber Data Using Spark Machine Learning, Streaming, and the Kafka API (Part 2)

This post is the second part in a series where we will build a real-time example for analysis and monitoring of Uber car GPS trip data. If you have not already read the first part of this series, you should read that first.

The first post discussed creating a machine learning model using Apache Spark’s K-means algorithm to cluster Uber data based on location. This second post will discuss using the saved K-means model with streaming data to do real-time analysis of where and when Uber cars are clustered.
 
 
 
 
 

Example Use Case: Real-Time Analysis of Geographically Clustered Vehicles/Items

Uber trip data is published to a MapR Streams topic using the Kafka API. A Spark streaming application, subscribed to the topic, enriches the data with the cluster Id corresponding to the location using a k-means model, and publishes the results in JSON format to another topic. A Spark streaming application subscribed to the second topic analyzes the JSON messages in real time.

Example Use Case Data

The example data set is Uber trip data, which you can read more about in part 1 of this series. The incoming Data Records are in CSV format.

An example line is shown below:

​​The enriched Data Records are in JSON format. An example line is shown below:

Spark Kafka Consumer Producer Code

Parsing the Data Set Records

A Scala Uber case class defines the schema corresponding to the CSV records.  The parseUber function parses the comma separated values into the Uber case class.

Loading the K-Means Model

The Spark KMeansModel class is used to load the saved K-means model fitted on the historical Uber trip data.

Output of model clusterCenters:

Below the cluster centers are displayed on a google map:

Spark Streaming Code

These are the basic steps for the Spark Streaming Consumer Producer code:

  1. Configure Kafka Consumer Producer properties.
  2. Initialize a Spark StreamingContext object. Using this context, create a DStream which reads message from a Topic.
  3. Apply transformations (which create new DStreams).
  4. Write messages from the transformed DStream to a Topic.
  5. Start receiving data and processing. Wait for the processing to be stopped.

We will go through each of these steps with the example application code.

  1. Configure Kafka Consumer Producer properties

The first step is to set the KafkaConsumer and KafkaProducer configuration properties, which will be used later to create a DStream for receiving/sending messages to topics. You need to set the following paramters:

  • Key and value deserializers: for deserializing the message.
  • Auto offset reset: to start reading from the earliest or latest message.
  • Bootstrap servers: this can be set to a dummy host:port since the broker address is not actually used by MapR Streams.

For more information on the configuration parameters, see the MapR Streams documentation.

  1. Initialize a Spark StreamingContext object.

ConsumerStrategies.Subscribe, as shown below, is used to set the topics  and Kafka configuration parameters. We use the KafkaUtils createDirectStream method with a StreamingContext, the consumer and location strategies, to create an input stream from a MapR Streams topic. This creates a DStream that represents the stream of incoming data, where each message is a key value pair. We use the DStream map transformation to create a DStream with the message values.

  1. Apply transformations (which create new DStreams)

We use the DStream foreachRDD method to apply processing to each RDD in this DStream. We parse the message values into Uber objects, with the map operation on the DStream. Then we convert the RDD to a DataFrame, which allows you to use DataFrames and SQL operations on streaming data.

Here is example output from the df.show:

A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.

Then the model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster predictions.

The output of categories.show is below:

The DataFrame is then registered as a table so that it can be used in SQL statements. The output of the SQL query is shown below:

  1. Write messages from the transformed DStream to a Topic

The Dataset result of the query is converted to JSON RDD Strings, then the RDD sendToKafka method is used to send the JSON key-value messages to a topic (the key is null in this case).

Example message values (the output for temp.take(2) ) are  shown below:

{“dt”:”2014-08-01 00:00:00″,”lat”:40.729,”lon”:-73.9422,”base”:”B02598″,”cluster”:7}

{“dt”:”2014-08-01 00:00:00″,”lat”:40.7406,”lon”:-73.9902,”base”:”B02598″,”cluster”:7}

  1. Start receiving data and processing it. Wait for the processing to be stopped.

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

Spark Kafka Consumer Code

Next, we will go over some of the Spark streaming code which consumes the JSON-enriched messages.

We specify the schema with a Spark Structype:

Below is the code for:

  • Creating a Direct Kafka Stream
  • Converting the JSON message values to Dataset[Row] using spark.read.json with the schema
  • Creating two temporary views for subsequent SQL queries
  • Using ssc.remember to cache data for queries

Now we can query the streaming data to ask questions like: which hours had the highest number of pickups? (Output is shown in a Zeppelin notebook):

spark.sql(“SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)”)

How many pickups occurred in each cluster?

df.groupBy(“cluster”).count().show()

or

spark.sql(“select cluster, count(cluster) as count from uber group by cluster”)

Which hours of the day and which cluster had the highest number of pickups?

spark.sql(“SELECT hour(uber.dt) as hr,count(cluster) as ct FROM uber group By hour(uber.dt)”)

Display datetime and cluster counts for Uber trips:

%sql select cluster, dt, count(cluster) as count from uber group by dt, cluster  order by dt, cluster

Software

Summary

In this blog post, you learned how to use a Spark machine learning model in a Spark Streaming application, and how to integrate Spark Streaming with MapR Streams to consume and produce messages using the Kafka API.

References and More Information:

Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
Email address:

Leave a Reply

Be the First to Comment!

Notify of
avatar
wpDiscuz