About Pascal Alma

Pascal is a senior JEE Developer and Architect at 4Synergy in The Netherlands. Pascal has been designing and building J2EE applications since 2001. He is particularly interested in the Open Source tool stack (Mule, Spring Framework, JBoss) and in technologies such as Web Services, SOA and the Cloud. Specialties: JEE, SOA, Mule ESB, Maven, Cloud Technology, Amazon AWS.

Writing a Hadoop MapReduce task in Java

Although the Hadoop Framework itself is written in Java, MapReduce jobs can be written in many different languages. In this post I show how to create a MapReduce job in Java, based on a Maven project just like any other Java project.

    • Prepare the example input

Let's start with a fictional business case. In this case we need a file with English words from a dictionary, each followed by all its translations in other languages, separated by a ‘|’ symbol. I have based this example on this post. The job therefore reads dictionaries of different languages and matches each English word with its translations in the other languages. The input dictionaries for the job are taken from here. I downloaded a few files in different languages and put them together in one file (Hadoop processes one large file more efficiently than many small ones). My example file can be found here.
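Combining the downloaded dictionaries into one file needs nothing Hadoop-specific. As a minimal sketch, assuming the files are UTF-8 text, the hypothetical class `MergeDictionaries` below (not part of the original project, and requiring Java 8 or later) appends the lines of several files to one output file:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the original post): concatenates several
// dictionary files into one input file for the job. Assumes UTF-8 text files.
class MergeDictionaries {

    static void merge(List<Path> inputs, Path output) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(output, StandardCharsets.UTF_8)) {
            for (Path input : inputs) {
                // Copy each line unchanged and terminate it with '\n',
                // which Hadoop's line-based input formats recognize.
                for (String line : Files.readAllLines(input, StandardCharsets.UTF_8)) {
                    out.write(line);
                    out.write('\n');
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java MergeDictionaries <output file> <input file>...
        List<Path> inputs = new ArrayList<>();
        for (int i = 1; i < args.length; i++) {
            inputs.add(Paths.get(args[i]));
        }
        merge(inputs, Paths.get(args[0]));
    }
}
```

For example, `java MergeDictionaries dictionary.txt nl.txt fr.txt` (hypothetical file names) would write the lines of both input files, in order, to `dictionary.txt`.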

    • Create the Java MapReduce project

The next step is to create the Java code for the MapReduce job. As I said before, I use a Maven project for this, so I created a new empty Maven project in my IDE, IntelliJ, and modified the default pom to add the necessary plugins and dependencies.
The dependency I added:

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.0</version>
   <scope>provided</scope>
</dependency>

The Hadoop dependency is necessary to use the Hadoop classes in my MapReduce job. Since I want to run the job on AWS EMR, I make sure I use a matching Hadoop version. Furthermore, the scope can be set to ‘provided’, since the Hadoop framework will be available on the Hadoop cluster.
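With ‘provided’ scope, Maven compiles against hadoop-core but expects the runtime environment to supply the Hadoop classes. If you run the main class somewhere that does not supply them, it fails with a `NoClassDefFoundError` for a class such as `org.apache.hadoop.conf.Configuration`. A quick way to check a classpath is the hypothetical helper below, a sketch that relies only on `Class.forName`:

```java
// Hypothetical diagnostic helper (not part of Hadoop): checks whether a class
// can be loaded from the current classpath without initializing it.
class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only look the class up, don't run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String hadoopClass = "org.apache.hadoop.conf.Configuration";
        System.out.println(hadoopClass + " on classpath: " + isOnClasspath(hadoopClass));
    }
}
```

Running this with the same classpath as the job shows whether the Hadoop classes are actually available there.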

Besides the dependency, I added the following two plugins to the pom.xml:

<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
      <archive>
        <manifest>
          <addClasspath>true</addClasspath>
          <mainClass>net.pascalalma.hadoop.Dictionary</mainClass>
        </manifest>
      </archive>
    </configuration>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
      <source>1.6</source>
      <target>1.6</target>
    </configuration>
  </plugin>
</plugins>

The first plugin is used to create an executable JAR of our project. This makes running the JAR on the Hadoop cluster easier, since we don’t have to specify the main class.
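The `<mainClass>` setting ends up as a `Main-Class` entry in the JAR's `META-INF/MANIFEST.MF`. To verify what the plugin wrote, a small sketch using the JDK's `java.util.jar` API can read the entry back; the class name `ManifestMainClass` and the JAR path in the usage comment are hypothetical:

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Illustrative helper: reads the Main-Class entry that maven-jar-plugin
// writes into META-INF/MANIFEST.MF when <mainClass> is configured.
class ManifestMainClass {

    /** Returns the Main-Class attribute of the given JAR, or null if there is none. */
    static String mainClassOf(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            return manifest == null
                    ? null
                    : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. java ManifestMainClass target/dictionary-1.0.jar (hypothetical JAR name)
        System.out.println(mainClassOf(args[0]));
    }
}
```

For the pom above, this should print `net.pascalalma.hadoop.Dictionary`.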

The second plugin is necessary to make the created JAR compatible with the instances of the AWS EMR cluster. This AWS cluster comes with JDK 1.6. If you omit this plugin, the job fails on the cluster (I got a message like ‘Unsupported major.minor version 51.0’). I will show how to set up this AWS EMR cluster in another post.
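The number in that message is the class file's major version: 51 corresponds to Java 7, while a Java 6 JVM only accepts versions up to 50. The sketch below reads the version straight from a `.class` file header (the magic number `0xCAFEBABE`, then the minor and major version as unsigned 16-bit values); the class `ClassFileVersion` is a hypothetical illustration, not a Maven or Hadoop tool:

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative check: reads the major version from a compiled .class file header.
// Layout per the JVM specification: u4 magic, u2 minor_version, u2 major_version.
class ClassFileVersion {

    static int majorVersion(InputStream classFile) throws IOException {
        DataInputStream in = new DataInputStream(classFile);
        if (in.readInt() != 0xCAFEBABE) {
            throw new IOException("Not a class file");
        }
        in.readUnsignedShort();         // minor_version, not needed here
        return in.readUnsignedShort();  // major_version
    }

    /** Maps a major version to the Java release: 49 = Java 5, 50 = Java 6, 51 = Java 7, ... */
    static int javaRelease(int major) {
        if (major < 49) {
            throw new IllegalArgumentException("Pre-Java 5 class file: " + major);
        }
        return major - 44;
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = new FileInputStream(args[0])) {
            int major = majorVersion(in);
            System.out.println("major " + major + " -> Java " + javaRelease(major));
        }
    }
}
```

Run against a class from `target/classes` built with the compiler settings above, it should report major version 50 (Java 6).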

That is the basic project, just like a regular Java project. Let's implement the MapReduce classes next.

    • Implement the MapReduce classes

I described the functionality we want in the first step. To achieve it, I created three Java classes in my Hadoop project. The first class is the ‘Mapper’:

package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
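public class WordMapper extends Mapper<Text,Text,Text,Text> {

    // Reused for every output record instead of creating a new Text per token
    private Text word = new Text();

    /**
     * The key is the text before the first tab of the input line (the English word)
     * and the value is the rest of the line, as delivered by KeyValueTextInputFormat.
     * Emits one (word, translation) pair per comma-separated token in the value.
     */
    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException
    {
        StringTokenizer itr = new StringTokenizer(value.toString(),",");
        while (itr.hasMoreTokens())
        {
            word.set(itr.nextToken());
            context.write(key, word);
        }
    }
}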

This class isn’t very complicated. It receives a line of the input file as a key/value pair and, for every comma-separated token in the value, writes the key together with that token. Each output pair therefore holds one value, and the same key can occur multiple times at this stage.
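To make the map step concrete, here is a plain-Java simulation that needs no Hadoop on the classpath. It assumes input lines of the form `word<TAB>translation1,translation2`, which is what the default `KeyValueTextInputFormat` (split at the first tab) and the comma tokenizer in `WordMapper` expect; the class name and the sample line are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java simulation of the map stage (no Hadoop needed), for illustration only.
class MapStageSimulation {

    /** Splits a line like KeyValueTextInputFormat does by default: at the first tab. */
    static String[] splitKeyValue(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return new String[] {line, ""};  // no separator: the whole line is the key
        }
        return new String[] {line.substring(0, tab), line.substring(tab + 1)};
    }

    /** Mirrors WordMapper.map: one (key, token) pair per comma-separated token. */
    static List<String[]> map(String line) {
        String[] kv = splitKeyValue(line);
        List<String[]> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(kv[1], ",");
        while (itr.hasMoreTokens()) {
            out.add(new String[] {kv[0], itr.nextToken()});
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input line: English word, a tab, then comma-separated translations
        for (String[] pair : map("house\thuis,woning")) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

For the sample line, the simulation emits the two pairs `house -> huis` and `house -> woning`, one per translation.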

The next class is the ‘Reducer’, which reduces the map output to the desired output:

package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 17-07-13
 * Time: 19:50
 */
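public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    /**
     * Receives all translations collected for one key and writes them
     * as a single value, each translation preceded by a '|' symbol.
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // StringBuilder avoids creating a new String on every iteration
        StringBuilder translations = new StringBuilder();

        for (Text val : values) {
            translations.append("|").append(val.toString());
        }

        result.set(translations.toString());
        context.write(key, result);
    }
}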

This reduce step collects all values for a given key and concatenates them, each value preceded by a ‘|’ symbol.
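Between map and reduce, Hadoop groups the map output by key and sorts the keys. The following plain-Java sketch imitates that grouping with a `TreeMap` and then applies the same concatenation as `AllTranslationsReducer`. It is a simplified model (the real shuffle compares `Text` keys by their bytes and does not guarantee the order of values within a key), and the class name and data are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the shuffle plus AllTranslationsReducer, for illustration only.
class ReduceStageSimulation {

    /** Groups (key, value) pairs by key; the TreeMap keeps the keys sorted. */
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] pair : mapOutput) {
            groups.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return groups;
    }

    /** Mirrors AllTranslationsReducer: every value is preceded by '|'. */
    static String reduce(List<String> values) {
        StringBuilder translations = new StringBuilder();
        for (String val : values) {
            translations.append('|').append(val);
        }
        return translations.toString();
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = new ArrayList<>();
        mapOutput.add(new String[] {"house", "huis"});
        mapOutput.add(new String[] {"cat", "kat"});
        mapOutput.add(new String[] {"house", "maison"});
        for (Map.Entry<String, List<String>> e : shuffle(mapOutput).entrySet()) {
            // TextOutputFormat writes the key, a tab, then the value
            System.out.println(e.getKey() + "\t" + reduce(e.getValue()));
        }
    }
}
```

With the sample data it prints `cat<TAB>|kat` and `house<TAB>|huis|maison`. Note the leading ‘|’: the reducer prepends the separator to every value, including the first.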

The last class puts it all together into a runnable job:

package net.pascalalma.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
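public class Dictionary {

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "dictionary");
        // Lets Hadoop locate the JAR that contains this class
        job.setJarByClass(Dictionary.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);

        // Types of the final (reducer) output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Types of the intermediate (mapper) output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Each input line is split into key and value at the first tab;
        // the output is written as plain text lines: key, tab, value
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // args[0]: input file, args[1]: output directory (must not exist yet)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, print its progress and wait until it finishes
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}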

In this main method we put together a Job and run it. Note that I simply expect args[0] and args[1] to be the input file and the (not yet existing) output directory; I didn’t add any checks for this. Here is my ‘Run Configuration’ in IntelliJ:

[Screenshot: IntelliJ ‘Run Configuration’ for the job]
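The driver uses args[0] and args[1] without checking them, and Hadoop refuses to start a job whose output directory already exists. For local runs, a pre-flight check could look like the sketch below. The class `DictionaryArgs` is hypothetical, and it only checks the local file system, so it does not apply to HDFS or S3 paths on a cluster:

```java
import java.io.File;

// Hypothetical pre-flight check for local runs (not in the original driver).
class DictionaryArgs {

    /** Returns an error message, or null when the arguments look usable. */
    static String validate(String[] args) {
        if (args.length != 2) {
            return "Usage: Dictionary <input file> <output directory>";
        }
        if (!new File(args[0]).exists()) {
            return "Input does not exist: " + args[0];
        }
        if (new File(args[1]).exists()) {
            return "Output directory already exists: " + args[1];
        }
        return null;
    }

    public static void main(String[] args) {
        String error = validate(args);
        if (error != null) {
            System.err.println(error);
            System.exit(2);
        }
        System.out.println("Arguments OK");
    }
}
```

Calling `validate(args)` at the start of `Dictionary.main` would turn a missing argument or a leftover output directory into a clear message instead of an exception.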

Just make sure the output directory does not exist when you run the class. The logging output of the job looks like this:

2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 0% reduce 0%
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 0%
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 17
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Written=423039
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=1464626
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=1537251
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Input Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Read=469941
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output materialized bytes=524414
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map input records=20487
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=43234
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=481174
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Total committed heap usage (bytes)=362676224
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output records=21617
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     SPLIT_RAW_BYTES=108
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=21617

Process finished with exit code 0
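The counters make a useful sanity check. The sketch below simply restates values copied from the log above and compares them: since no combiner ran (Combine input records=0), every map output record should reach the reducer, and since `AllTranslationsReducer` writes one line per key, Reduce input groups should equal Reduce output records. The class `CounterCheck` is hypothetical:

```java
// Illustrative arithmetic on counter values copied from the job log above.
class CounterCheck {

    static final long MAP_INPUT_RECORDS = 20487;
    static final long MAP_OUTPUT_RECORDS = 21617;
    static final long COMBINE_INPUT_RECORDS = 0;
    static final long REDUCE_INPUT_RECORDS = 21617;
    static final long REDUCE_INPUT_GROUPS = 11820;
    static final long REDUCE_OUTPUT_RECORDS = 11820;

    /** Without a combiner, every map output record should arrive at the reducer. */
    static boolean allMapOutputReachedReducer() {
        return COMBINE_INPUT_RECORDS == 0 && MAP_OUTPUT_RECORDS == REDUCE_INPUT_RECORDS;
    }

    /** The reducer writes one line per key, so groups and output records match. */
    static boolean oneOutputLinePerKey() {
        return REDUCE_INPUT_GROUPS == REDUCE_OUTPUT_RECORDS;
    }

    /** Average number of (word, translation) pairs the mapper emitted per input line. */
    static double pairsPerInputLine() {
        return (double) MAP_OUTPUT_RECORDS / MAP_INPUT_RECORDS;
    }

    public static void main(String[] args) {
        System.out.println(allMapOutputReachedReducer());
        System.out.println(oneOutputLinePerKey());
        System.out.printf("%.3f%n", pairsPerInputLine());
    }
}
```

Here 21617 / 20487 is about 1.055, so the mapper emitted on average slightly more than one (word, translation) pair per input line.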

The output file created by this job can be found in the supplied output directory, as the next screenshot shows:

[Screenshot: the output directory containing the file created by the job]

As you have seen, we can run this main method from an IDE (or from the command line), but before going further I would like to have some unit tests for the Mapper and Reducer. I will show how to do that in another post.

2 Responses to "Writing a Hadoop MapReduce task in Java"

  1. Maffy Davison says:

    Tried to run the job as described – in intellij with the config (paths changed, obviously, but otherwise the same).

    The run doesn’t seem to be able to find the configuration class…

    Is there a dependency I’m missing in the pom? I just added the one hadoop dependency per the instructions.

    Here’s what I get:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    at com.maffy.example.mapreduce.Dictionary.main(Dictionary.java:23)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 6 more

    • Silviu BURCEA says:

      I think you are using a different version of Hadoop or you don’t have the hadoop-common in your POM/classpath.

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.