Hadoop Tutorial Series, Issue #4: To Use Or Not To Use A Combiner

combinerWelcome to the fourth issue of the Hadoop Tutorial Series. Combiners are another important Hadoop’s feature that every hadoop developer should be aware of. The primary goal of combiners is to optimize/minimize the number of key value pairs that will be shuffled accross the network between mappers and reducers and thus to save as most bandwidth as possible.

Indeed, to give you the intuition of why combiner helps reducing the number of data sent to the reducers, imagine the word count example on a text containing one million times the word “the”. Without combiner the mapper will send one million key/value pairs of the form <the,1>. With combiners, it will potentially send much less key/value pairs of the form <the,N> with N a number potentially much bigger than 1. That’s just the intuition (see the references at the end of the post for more details).

Simply speaking a combiner can be considered as a “mini reducer” that will be applied potentially several times still during the map phase before to send the new (hopefully reduced) set of key/value pairs to the reducer(s). This is why a combiner must implement the Reducer interface (or extend the Reducer class as of hadoop 0.20).

In general you can even use the same reducer method as both your reducer and your combiner. This is the case for the word count example where using a combiner remains to add a single line of code in your main method:

conf.setCombinerClass(Reduce.class);

where conf is your JobConf, or, if you use hadoop 0.20.1:

job.setCombinerClass(Reduce.class);

where job is your Job built with a customized Configuration.

That sounds pretty simple and useful and at first look you would be ready to use combiners all the time by adding this simple line, but there is a small catch. The first kind of reducers that comes naturally as a counter example of using combiner is the “mean reducer” that computes the mean of all the values associated with an given key.

Indeed, suppose 5 key/value pairs emitted from the mapper for a given key k: <k,40>, <k,30>, <k,20>, <k,2>, <k,8>. Without combiner, when the reducer will receive the list <k,{40,30,20,2,8}>, the mean output will be 20, but if a combiner were applied before on the two sets (<k,40>, <k,30>, <k,20>) and (<k,2>, <k,8>) separately, then the reducer would have received the list <k,{30,5}> and the output would have been different (17.5) which is an unexpected behavior.

More generally, combiners can be used when the function you want to apply is both commutative and associative (that’s pretty intuitive to understand why). That’s the case for the addition function, this is why the word count example can benefit from combiners but not for the mean function (which is not associative as shown in the counter example above).

Note that for the mean function you can use a workaround for using combiners by using two separate reduce methods, a first one that would be used as the addition function (and thus that can be set as the combiner) that would emit the intermediate sum as the key and the number of addition involved as the value, and a second reduce function that would compute the mean by taking into account the number of addition involved (see the references for more details on that).

As usual in this series, let’s observe the lesson learned in action using our learning playground. For that you can use the original word count example (or its hadoop 0.20.1 version that we used in the previous issue), add it the single combine line as specified earlier in the post and run it on our moby-dick mascot. Here what we can see at the end of the execution:

combiner

Output of the word count example when using a combiner. Click to enlarge.

Now that you understand what counters are, if you click to enlarge the picture, you’ll see the value of two counters: Combine input records=215137 and Combine output records=33783. That’s a pretty serious reduction of the number of key/value pairs to send to the reducers. You can easily imagine the impact for much larger jobs (see the reference below for real numbers).

Enjoy combiners, whenever you can…

References

  • See the 4th tip of this must read blog post by Todd Lipcon for feeling better the benefit of combiners on a 40GB wordcount job benchmark.
  • For a deeper understanding of when and how combiners are used in the mapReduce data flow, check this section of the (quiet heavy but) excellent Yahoo! hadoop tutorial.
  • To extend the intuition given in the post on why combiners help, you can go over this walk-through.
  • Both Hadoop the definitive guide and Hadoop in Action contains interesting information on combiners (part of both of them inspired this post). In particular the first contains a great section on when exactly the combiners comes into play in the mapReduce data flow. The second contains a full code of the mean function workaround mentioned above.

Hadoop Tutorial Series, Issue #3: Counters In Action

Note: This post has been updated with a code working for hadoop 0.20.1.

In this 3rd issue of the hadoop tutorial series, we’ll speak about a very simple but very useful hadoop’s feature: counters.

Even if you have never defined any counters in hadoop, you can see some of them each time you are running an hadoop job. Indeed, here is what you can see from the client console at the end of the execution of a job (can also be seen from the web interface):

counters

Hadoop internal counters at the end of a job (Click to enlarge).

As you can see, 18 internal counters are presented inside different groups. For instance, you can see a section “Job Counters” with three different counters giving basic information about the job like the number of mappers and reducers. In that example, “Job Counters” is called the group of the counter and “Launched reduce tasks” (for instance) is properly the name of the counter.

It is very handy to define your own counters to track any kind of statistics about the records you are manipulating in the mapper and the reducer. The most natural use of that is to use counters to track the number of malformed records.

If you are executing a job  and you see an abnormally high number of malformed records, it can give a good hint that you perhaps have a bug in your code or some problem with your data (note this is actually a much simpler way to spot issues than tracking error messages in a distributed set of log files). But you can actually use counters for any kind of other statistics on your records.

One easy way to define your own counters from your Java code is:

  • Declaring an enum representing your counters. The enum name is the group of the counter, and each field of the enum is the name of the counter that will be reported in this same group
  • Incrementing the desired counters from your map and reduce methods through the Context of your mapper or reducer (in previous hadoop version it was through the Reporter.incrCounter() method, but the reporter no longer exists in hadoop 0.20)

So let’s see an example. We’ll take the word count example revised for version 0.20.1 to illustrate the use of counters. We will create a counter group called WordsNature that will count how many unique tokens there is in all, how many unique tokens starts with a digit and how many unique tokens starts with a letter.

So our enum declaration will look like that:

 static enum WordsNature { STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL }

We will also need a very basic StringUtils class:

package com.philippeadjiman.hadooptraining;
 
public class StringUtils {
 
	public static boolean startsWithDigit(String s){
		if( s == null || s.length() == 0 )
			return false;
 
		return Character.isDigit(s.charAt(0));
	}
 
	public static boolean startsWithLetter(String s){
		if( s == null || s.length() == 0 )
			return false;
 
		return Character.isLetter(s.charAt(0));
	}
 
}

Since we are interested in unique tokens, we will put the code related with the counter into the reduce method. So here how the reduce method will look like:

public void reduce(Text key, Iterable values, Context context)
	throws IOException, InterruptedException {
 
	int sum = 0;
	String token = key.toString();
	if( StringUtils.startsWithDigit(token) ){
		context.getCounter(WordsNature.STARTS_WITH_DIGIT).increment(1);
	}
	else if( StringUtils.startsWithLetter(token) ){
		context.getCounter(WordsNature.STARTS_WITH_LETTER).increment(1);
	}
	context.getCounter(WordsNature.ALL).increment(1);
	for (IntWritable value : values) {
		sum += value.get();
	}
	context.write(key, new IntWritable(sum));
}

Here is the code of the WordCountWithCounter that include this code.

If you want to run it inside our learning playground you’ll just have to update the pom with hadoop latest version:

<dependency>
<groupId>org.apache.mahout.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.1</version>
</dependency>

So here is the result after running the code with, as input, the whole text of moby dick:

jobResultsWithCounters

We can now see our home made counters. (Click to enlarge)

So we can see now that we have 33783 unique tokens, 32511 starting with a letter and 263 starting with a digit. What about the 1009 others?? Well, because the word count example use a basic StringTokenizer that splits tokens at spaces, a lot of words simply starts with a ‘(‘ or with a ‘[‘ and even with ‘–’. To solve that you can for instance use a lucene StandardAnalyzer.

You should now be able to easily implements your own counters for tracking bad records/missing values, debugging or gathering any kind of statistics around your job.

See you soon for another issue…

Hadoop Tutorial Series, Issue #2: Getting Started With (Customized) Partitioning

In the Issue #1 of this series, we set up the “learning playground” (based on the Cloudera Virtual Machine) in order to enjoy hands-on learning experiences around Hadoop. In this issue, we’ll use our playground to investigate the partitioning features offered by Hadoop.

What is it all about?

As you may know, a map/reduce job will contains most of the time more than  1 reducer.  So basically, when a mapper emits a key value pair, it has to be sent to one of the reducers. Which one? The mechanism sending specific key-value pairs to specific reducers is called partitioning (the key-value pairs space is partitioned among the reducers). A Partitioner is responsible to perform the partitioning.

In Hadoop, the default partitioner is HashPartitioner, which hashes a record’s key to determine which partition (and thus which reducer) the record belongs in.The number of partition is then equal to the number of reduce tasks for the job.

Why is it important?

First, it has a direct impact on the overall performance of your job: a poorly designed partitioning function will not evenly distributes the charge over the reducers, potentially loosing all the interest of the map/reduce distributed infrastructure.

Second, it maybe sometimes necessary to control the key/value pairs partitioning over the reducers. Let’s illustrate it on a simple example. Suppose that your job’s input is a (huge) set of tokens and their number of occurrences (for instance the output of the canonical word count hadoop example) and that you want to sort them by number of occurrences. Let’s also suppose that your job will be handled by 2 reducers. If you run your job without using any customized partitioner, you’ll get something like this:

partialSortOn2Reducers

(Click to enlarge)

As you can see, the tokens are correctly ordered by number of occurrences on each reducer (which is what hadoop guarantees by default) but this is not what you need! You’d rather expect something like:

TotalSortOn2Reducers

(Click to enlarge)

where tokens are totally ordered over the reducers, from 1 to 30 occurrences on the first reducer and from 31 to 14620 on the second. This would happen as a result of a correct partitioning function: all the tokens having a number of occurrences inferior to N (here 30) are sent  to reducer 1 and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted on each partition, you get the expected total order on the number of occurrences.

Below, we’ll use our playground to observe the issue happening  on real data and see how we solve it using customized partitioners.

Also, as a second example of use of customized partitioning functions, let’s cite the original map/reduce google paper: “sometimes the output keys are URLs, and we want all entries for a single host to end up in the same output. To support situations like this, the user of the MapReduce library can provide a special partitioning function. For example, using “hash(Hostname(urlkey)) mod R” as the partitioning function causes all URLs from the same host to end up in the same output”.

Feeling the partitions in our playground

If your playground is not yet set up, check the Issue #1 of this series. As an input for our job, we’ll use a tsv file containing the list of tokens and their number of occurrences extracted from (once again) the full moby dick text. Click here to download this input. You’ll notice that the pairs (tokens, #occurrences) are alphanumerically sorted on tokens value.

First, we’ll use a very simple pre-processing job to transform the input data into a more convenient format to use within hadoop: the Sequence File Output Format. Sequence files are a basic file based data structure persisting the key/value pairs in a binary format and allowing you to interact more easily with basic hadoop data types (e.g IntWritable, LongWritable, etc…). Here is the simple pre-processing job:

package com.philippeadjiman.hadooptraining;
 
import java.io.IOException;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
public class SortDataPreprocessor {
 
	static class PreprocessorMapper extends MapReduceBase implements Mapper {
 
		private Text word = new Text();
 
		public void map(LongWritable key, Text value,
				OutputCollector output, Reporter reporter) throws IOException {
			String line = value.toString();
			String[] tokens = line.split("t");
			if( tokens == null || tokens.length != 2 ){
				System.err.print("Problem with input line: "+line+"n");
				return;
			}
			int nbOccurences = Integer.parseInt(tokens[1]);
			word.set(tokens[0]);
			output.collect(new IntWritable(nbOccurences),word );
		}
	}
 
	public static void main(String[] args) throws IOException {
		JobConf conf = new JobConf(SortDataPreprocessor.class);
 
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
 
		conf.setMapperClass(PreprocessorMapper.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);
		conf.setNumReduceTasks(0);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		JobClient.runJob(conf);
	}
}

You’ll notice that:

  • it contains only a mapper (no reducer),
  • a basic error management is performed for potential malformed lines,
  • the output key is the number of occurrences (as an IntWritable) and the output value is the associated token,
  • The sequence file output format is specified using setOutputFormat(SequenceFileOutputFormat.class);

To run it, package the job using maven (see Issue #1), put the input file on hdfs in an input directory (let’s call it input) and execute:

hadoop jar playing-with-partitions.jar com.philippeadjiman.hadooptraining.SortDataPreprocessor /user/training/input /user/training/pre_process

This will create a directory called “pre_process” on hdfs containing a set of pairs (#occurrences,token), respectively of format IntWritable and Text, in a SequenceFileOutputFormat.

Now we can, perform the sort based on this new input. Writing a job for such a task is actually trivial since this is primarily what hadoop is doing by default, so here it is:

package com.philippeadjiman.hadooptraining;
 
import java.io.IOException;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
public class SortExample {
	public static void main(String[] args) throws IOException {
		JobConf conf = new JobConf(SortExample.class);
		conf.setJobName("sortexample");
 
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
 
		conf.setInputFormat(SequenceFileInputFormat.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);
 
		conf.setNumReduceTasks(2);
 
		JobClient.runJob(conf);
	}
}

You’ll notice that:

  • There is neither map nor reduce methods! This is because sorting is a default behavior so we don’t have to do anything (we’re just interested here to see how it’ll be partitioned),
  • The input/output formats are specified based on the output of our pre-processing job,
  • We explicitly set the number of reducer to 2, which is the important part here since we want to observe how the output will be partitioned (without specifying it, the output will be generated using only one reducer).

Just run it using:

hadoop jar playing-with-partitions.jar com.philippeadjiman.hadooptraining.SortExample /user/training/pre_process /user/training/output

Once completed, an output directory will be created on hdfs with two files, one for each reducer that were used. You can observe the content of the output using commands like:

hadoop fs -cat output/part-00000 | less
hadoop fs -cat output/part-00001 | less

As you’ll see, the two outputs are sorted but do not represent a total order, as explained above. Let’s fix it.

How to implement your own partitioning function

So how do we create a total order out of those two reducers?

A first solution is to create our own partitionner which is as simple as implementing the Partitioner<K,V> interface:

package com.philippeadjiman.hadooptraining;
 
package com.philippeadjiman.hadooptraining;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
 
public class MyPartitioner implements Partitioner<IntWritable,Text> {
	@Override
	public int getPartition(IntWritable key, Text value, int numPartitions) {
		/* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
		int nbOccurences = key.get();
 
		if( nbOccurences < 3 )
			return 0;
		else
			return 1;
	}
 
	@Override
	public void configure(JobConf arg0) {
 
	}
 
}

This implementation of getPartition specifies to put all the pairs having a key (which is here the number of occurrences) being less than 3 into the first partition and the other into the second one. This is of course a pretty bad practice to hard code like that a partitioning function but this is simply for the sake of understanding.

To use this created partition just add the following line to the main method of the previous SortExample class:

conf.setPartitionerClass(MyPartitioner.class);

Why did I choose 3? Because as a side effect of the Zipf law, the number of tokens having a number of occurrences of 1 and 2 will be as much as big that all the others together! (to see a Zipf Law in action, check this post). So 3 was chosen just for balancing a little bit the partitions.

Re-package the code with the customized partition, remove the old output, run it again and check that our problem is solved: there is a total order over the partitions.

How to automatically find “good” partitioning function using sampling

Now, as I mentioned above, it is a pretty bad practice to hard code how to partition the keys. But on the other hand, how would I know automatically and in advance how to divide the partition in the general case?

Hadoop provide a nice way to approximate a priori a good partitioning function using an InputSampler. For instance, a RandomSampler, will sample the input at random to estimate what is the best way to partition. The sampler will wrote into a file called, by default, _patition.lst describing the partition that the job will automatically use to decide which key/value pairs to send to which reducers. This mechanism has to be used in combination with a TotalOrderPartitioner.

Here is a code sample using such a sampler with a total order partitioner:

InputSampler.Sampler<IntWritable, Text> sampler =
	new InputSampler.RandomSampler<IntWritable, Text>(0.1, 100);
InputSampler.writePartitionFile(conf, sampler);
conf.setPartitionerClass(TotalOrderPartitioner.class);

Sometimes, there are some issues with the file _partition.lst that is not found. It always worked for me when I specified explicitly where to find the file using the TotalOrderPartitioner.setPartitionFile(); method. Also pay attention to invoke this method before the call to writePartitionFile. Also, note that the sampling mechanism is necessary since considering all the input to compute the partition would be inefficient for large files.

Some remarks

  • A customized partitioning would not have been necessary if we had only one reducer since all the key/value pairs would have ended into the same output file. It is easy to understand that such a constraint is a nonsense and that using more than one reducer is most of the time necessary, else the map/reduce concept would not be very useful…
  • Even if we used a small dataset on the semi-distributed infrastructure of the cloudera virtual machine to observe partitioning in action and to learn how to customize it, the same concepts can be applied to a larger infrastructure. To see a very interesting use case of customized partitioning strategy for sorting purpose on a big infrastructure, check the famous TeraByte sort on hadoop.

Conclusion

Partitioning in map/reduce is a fairly simple concept but that is important to get correctly. Most of the time, the default partitioning based on an hash function can be sufficient. But as we illustrated in this Issue, you’ll need sometime to modify the default behavior and to customize your own partitioning suited for your needs.

If you have some questions, or if you have experimented other use cases of customized partitioning in your application, please comment/share. See you soon for another Issue.

Hadoop Tutorial Series, Issue #1: Setting Up Your MapReduce Learning Playground

hadoop-logo

Update: Instructions updated for hadoop 0.20.2.
This is the first post of a series of small hadoop tutorials introducing progressively core hadoop functionnalities. You might be interested in that series if you recognized yourself in one or more of the following points :

  • You’ve heard about the basics of MapReduce (else check the links that I recommend at the end of this post)
  • Even if you’re not working at Google, Yahoo, Facebook (or many others) for now, you know it’s been years that MapReduce/Hadoop has become a must-have skill and that you should practice it but you have very few time
  • You did try to read some tutorials but it was always either not hands-on enough or too much detailed

This first post is dedicated to build what I called a “MapReduce Learning Playground”: for practice or for a real need, you read or wrote  on a sheet of paper the map and reduce functions that might solve a particular problem and you want to see it in action, not necessarily on huge data sets, just check that it computes the correct answer.

A lot of material can be found on the internet to do the same. The steps below are my attempt to present the best part of all the training material that I read on the subject, adapt it, adding it some glue (here with maven) and compile the whole into something that I hope will save you some time.

Step 1: Install the cloudera training virtual machine

Cloudera is really doing a great job at providing training material for hadoop. The most useful one in my opinion is their hadoop training virtual machine. Update: they changed a lot of things since that post was written, here is a better link for a training virtual machine (Thanks Karthick for letting me know that the old link was broken ). It provides a VMWare image of a Linux Ubuntu distribution with a pre-installed hadoop cluster in Pseudo-Distributed Mode.

To install the VM on your computer, just follow their instructions, it is free (except if you’re on Mac) and very easy. The VM comes also with hadoop related tools already installed like hive and pig (it will probably be the subject of other posts).

At the end of the installation, open the VMWare Player, start the cloudera VM (with training/training as user/pass) and you should get something like this:

cloudera-hadoop-vm-training

The cloudera hadoop virtual machine for training (click to enlarge)

Step 2: Creating an “hadoop ready” project with maven

Cloudera does provide some training projects already mounted in the eclipse installed in the VM but those projects contains several small errors (like missing dependencies). Even if those are very easily fixed, I describe here how to build your own project from scratch; it will give you a better basis in case you want to extend them and you’ll always be able to copy-paste the map-reduce functions of some interesting cloudera training projects into your own ones.

First install maven on the VM. Open a terminal from the VM and type:

sudo apt-get install maven2

If for some reasons you run into trouble with the installation of maven, you can always download it directly from here. Assuming you are unzipping it into the /user/local/apache-maven directory, you can add those lines into your /home/training/.bashrc configuration file:

export M2_HOME=/usr/local/apache-maven
export M2=$M2_HOME/bin
export PATH=$M2:$PATH

Then from the same terminal go into the workspace directory (usually located at ~/workspace) and create a java project hierarchy using the following maven command (change the groupId and the artifactId as you like):

mvn archetype:create -DarchetypeGroupId=org.apache.maven.archetypes  -DgroupId=com.philippeadjiman.hadooptraining -DartifactId=hadoop-first-example

Then enter into the hadoop-first-example directory and generate the necessary files for eclipse:

mvn eclipse:eclipse

Then open eclipse from the VM then File -> Import -> Existing Projects into Workspace -> Browse, choose the hadoop-first-example directory, OK -> Finish. Then you should see your project on the left side.

You may have an error on a M2_REPO unresolved variable, that’s OK, it’s because it is the first time that this eclipse use a maven project. To fix it, just right click on your project -> Build Path -> Configure Build Path -> Add Variable -> Configure Variables -> New. In the name type M2_REPO and in the path type /home/training/.m2/repository (just check that this directory exists).

Then you’ll have to add the hadoop jar dependency. To do so, you just have to open you pom.xml file (you’ll see it at the bottom of your project) and the following dependency (add it just before the </dependencies> closing tag ):

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-core</artifactId>

<version>0.20.2</version>

</dependency>

You can check here if there is a newer version. Note also that if you plan to use hadoop to run with a specific framework built on top of it then make sure you’re using the right version. E.g. for mahout, use the version you’ll find here.

Then you can go back to the terminal in your hadoop-first-example directory and type again mvn eclipse:eclipse to regenerate the eclipse files with now the hadoop dependency. You can now refresh your directory. You have now an “hadoop-ready” project.

Step 3: Put your map reduce program into your project and prepare the data on HDFS

The first time you heard about MapReduce, there is a good chance that you also heard about the word count example. The wordCount code on hadoop website is quiet outdated for hadoop v0.20, here is a link to a blog post with a more updated version of the word count code that will work with the 0.20.2 hadoop version used in that tutorial (Thanks Yi!). Be sure to put that code into the src directory of your project into a package called com.philippeadjiman.hadooptraining (or whatever else, as long as it matches to your package declaration).

Then before to deploy the job, you’ll have to count some words. Following a moby dick tradition in this blog,  let’s download the full english raw text of moby dick that you can  find here.  If you want to run the hadoop job on it, you’ll have to put this file on HDFS, the underlying hadoop file system (check the “useful links” section below if you’ve never heard about HDFS).

Navigating, reading from and writing to HDFS is super simple and if you’re already familiar with regular unix file system commands  then you’ll get it instantly: almost all the commands are the same, you just have to pass them to the wrapper command hadoop with ‘fs’ as argument and a ‘-’ before the command. Examples (to run from a regular terminal):

hadoop fs -help # will print all the command that you can execute on the HDFS
hadoop fs -ls  # will perform an ls from the HDFS home directory (set to /user/training in the VM).
hadoop fs -mkdir input # will create the directory 'input' in the HDFS home directory (check if it does not already exists)
hadoop fs -mkdir output # will create the directory 'input' in the HDFS home directory (check if it does not already exists)
hadoop fs -put mobyDick.txt input # will put your local copy of mobyDick into the directory 'input' on the HDFS

Step 4: package your job, run it, observe the result

To launch your job on the hadoop infrastructure, you’ll have to package it into a jar file. With maven, nothing is more simple. Just go into your hadoop-first-example directory and type:

mvn jar:jar

This will generate a jar into a sub-directory called target. Your jar will have a name like hadoop-first-example-1.0-SNAPSHOT.jar (you can change that generated name by editing the jar section of your pom). You can check that your jar file contains as expected the WordCount.class (and its inner classes) by typing:

jar -tf hadoop-first-example-1.0-SNAPSHOT.jar

You can now launch your hadoop job by executing the following command (adapt it with the correct package name if necessary):

hadoop jar hadoop-first-example-1.0-SNAPSHOT.jar com.philippeadjiman.hadooptraining.WordCount /user/training/input /user/training/output

Cloudera also comes with an handy web interface allowing to, among other things, monitor the jobs running on the cluster. Just open firefox and go to the url http://localhost:8088/. From the menu at the top right side of the page, choose job browser and you should see the status of your job. You can also click on it to see the details (status and number of mapper/reducer of your job):

cloudera-web-interface

Using the cloudera web interface to see your job status (click to enlarge)

You can also see your job output from there but I found it easier to check it directly from HDFS:

hadoop fs -ls output # should now contains something (a file named part-00000 should be your output)
hadoop fs -cat output/part-00000 | less # will let you browse easily your output

Important: if you want to run your job another time, you’ll have to first erase all the current files of the output:

hadoop fs -rmr output # erase all the files contained in the output directory

You’ll notice that the output file contains many words with the ” character and other similar noise. This is of course because a simple StringTokenizer is used in the map function. To parse the text correctly, consider using some standard analyzers from Lucene for instance (you have an example in the code of step 2 of this post).

Step 5: Modify, Customize, Play, Learn. Now start the real fun…

Now that you built and deployed you own project from scratch, you have all you needs to modify certain parts of the code, create new map/reduce programs, test methods from the hadoop api, observe the results.

You can for instance try to see what happens if you use more than one reducer in the word count job (using conf.setNumReduceTasks(2) in the main method). Which lines are sent to which reducer? How to control that? How to sort the output of word count by number of occurrence (highest number first)?

Also I recommend to go over this tutorial that shows how to build an inverted index (see the theory here) using map/reduce (the tutorial contains many broken references w.r.t. the cloudera VM but you don’t care since now you know what you’re doing ;) )

Future posts of this series will leverage the playground we built here to illustrate and learn about other interesting stuff around hadoop.

Useful links:

hadoop-first-example-1.0-SNAPSHOT

Drawing A Zipf Law Using Gnuplot, Java and Moby-Dick

whaleThere are many tools out there to build more or less quickly any kind of graphs. Depending on your needs a tool may be more suited than another. When it comes to draw graphs from a set of generated coordinates, I love the simplicity of gnuplot.

Let’s see together a simple example that explain how to draw a zipf law observed on a long english text.
If you’re not familiar with zipf law, simply put it states that the product of the rank (R) of a word and its frequency (F) is roughly constant. This law is also know under the name “principle of the least effort” because people tends to use the same words often and rarely use new or different words.

Step 1 : Install gnuplot

For mac, check this.
For linux, depending on your distrib it should be as simple as an apt-get install (for ubuntu you can check this howto).
For windows you can either go the “hard” way with cygwin + X11 (see Part 1,4 and 5 of those instructions) or the easy way by clicking on pgnuplot.exe located in the gpXXXwin32.zip located here (this last solution may be also easier if you want to have copy/paste between the gnuplot terminal and other windows).

Step 2: Generate the Zipf Law data using Java and Moby Dick!

As I told you above, gnuplot is particularly simple for drawing a set of generated coordinates. All you have to do is to generated a file containing on each line a couple of coordinates.

For the sake of the example, I will use the full raw text of Moby Dick to generate the points. The goal is to generate a list of points of the form x y where x represents the rank of the word (the more frequent the word is, the higher its rank) and y represents its number of occurrences.

Find below the java code I used to do that. If you want to execute it, you will need lucene and the google collections (soon to become part of guava) libraries.

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
 
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multiset.Entry;
 
public class ZipfLawOnMobyDick {
	public static void main(String[] args) throws IOException {
 
		//Multiset for storing word occurrences
		Multiset multiset = HashMultiset.create();
 
		//Creating a standard analyzer with no stop words (we need them to observe the zipf law)
		String[] STOP_WORDS = {};
		StandardAnalyzer analyzer = new StandardAnalyzer(STOP_WORDS);
 
		//Initializing the multiset by parsing the whole content of Moby Dick
		TokenStream stream = analyzer.tokenStream("content", new FileReader(new File("C:\moby_dick.txt")));
		Token token = new Token();
		while ((token = stream.next(token)) != null){
			multiset.add(token.term());
		}
 
		//Sorting the multiset by number of occurrences using a comparator on the Entries of the multiset
		List&gt; l = new ArrayList&gt;(multiset.entrySet());
		Comparator&gt; occurence_comparator = new Comparator&gt;() {
			public int compare(Multiset.Entry e1, Multiset.Entry e2) {
				return e2.getCount() - e1.getCount() ;
			}
		};
		Collections.sort(l,occurence_comparator);
 
		int rank = 1;
		for( Multiset.Entry e : l ){
			System.out.println(rank+"t"+e.getCount());
			rank++;
		}
	}
}

This will generate the following output (the set of coordinates) that you can put in a file called moby_dick.gp. If you’re curious about what are the 100 hottest keywords of the whole text you can check them here.

Step 3: Drawing using gnuplot

What you can do first is simply to type the following command in the gnuplot console (you have to be on the same directory as the moby_dick.gp file):

plot [0:500][0:16000] "moby_dick.gp"

It simply draws the points and rescale the range of x and y respectively to [0:500] and [0:16000] so we can see something.
Play with the ranges to see the differences.
If you want the dots to be connected, just type:

plot [0:500][0:16000] "moby_dick.gp" with lines

If you want to add some legends, you can put some labels and arrows.
Here is an example of a gnuplot script that will set some information on the graph (you can simply copy/paste it in the gnuplot console):

set xlabel "word rank"
set ylabel "# of occurrences"
set label 1 "the word ranked #14 occurs 1753 times" at 70,4000
set arrow 1 from 65,3750 to 15,1800
plot [0:500][0:16000] "moby_dick.gp

As you can see it is pretty straightforward. You can play with the coordinates to adjust where to put the labels and arrow.
You will obtain this graph (click to enlarge):

moby_dick

To export it as a png file just type:

set terminal png
set output "moby_dick.png"
plot [0:500][0:16000] "moby_dick.gp"

You also might want to try a log scale on the vertical axis as to not waste the majority of the graph’s scale (thanks Bob for the remark).
To do so, you can simply type in the gnuplot console:

set logscale y

by plotting within the range [1:3000][5:10000], you’ll obtain:

moby_dick_semilog

Finally, you might want to use a log-log scale that are traditionally used to observe such power laws. Just set the logscale for x as you did for y and you’ll obtain:

moby_dick_loglog

You can of course add as much eye candies as you want (the demo page of the gnuplot website gives tons of example).

Also, there are probably dozens of ways to draw the same thing, I just loved the fun and simplicity of that one.

Flexible Java Profiling And Monitoring Using The Netbeans Profiler

cpuProfile I have tested a lot of those open source profiler. My preference goes definitely to the integrated Netbeans profiler. It was simply the easiest and unified solution adapted to all the different settings I ever met, including profiling java applications that (i) were not developed under netbeans (ii) were only in the form of standalone jar (iii) were running on a remote Linux machine for which no X server were running (i.e. no UI), and other cases.

Here I describe how in 3 simple steps you can profile any java application using the wonderful Netbeans profiler.

Step 1: Download and install the latest Netbeans version on your machine(s)

On the netbeans download page choose the version adapted for your environment (Windows,Linux,Solaris,Mac…) and download/install it. All the bundles contain the profiler so I choose the lightest one: the JavaSE. If you want to profile a program running on a remote machine(s), you’ll have to download/install it on each machine.

Step 2: Modify the command line that runs the java application that you want to profile/monitor

You just have to add an argument to the Java VM.
On windows, the argument to add is of the form:

 -agentpath:"C:Program FilesNetBeans 6.7.1profiler3libdeployedjdk16windowsprofilerinterface.dll"="C:Program FilesNetBeans 6.7.1profiler3lib,5140"

Replace the portion “C:Program FilesNetBeans 6.7.1profiler3″ by the correct path (located where you installed Netbeans). Keep 5140, it is the port on which the application will listen for a remote profiler session (that you can also perform locally, as in this tutorial).
On Linux, it is exactly the same, just look for the right path containing the profiler3 folder.
So the java command line of the application to profile should look something like:

java -agentpath:"C:Program FilesNetBeans 6.7.1profiler3libdeployedjdk16windowsprofilerinterface.dll"="C:Program FilesNetBeans 6.7.1profiler3lib,5140" MyApp param1 param2

When launching this command, you should see on your console a message saying:
Profiler Agent: Waiting for connection on port 5140 (Protocol version: 9)
meaning that the application is listening and waiting for a profiler session on port 5140.

Note the flexibility behind this approach: it allows you to add this simple argument to the exsiting command of (i) any java applications running inside eclipse (in that case just open the “Run configuration” windows, in the “Arguments” tab just add the -agentpath option in the “VM arguments” section) or other IDE than Netbeans, (ii) any remote java applications (iii) any standalone jar file, or whatever existing java command that runs any kind of java application you can imagine…

Step 3: Run the Netbeans profiler GUI

Just open Netbeans, profile -> attach profiler. Choose which kind of profiling/monitoring you need, you can also configure it.

attachProfiler

Press Attach. Note that the first time you attach a profiler it may fail since you have to calibrate the profiler (in that case, a simple textbox will tell you how, it takes seconds).

That’s it!! You can now see in real time which part of your application is the heaviest, estimate what its memory footprint, analyze the threads and much more.

memory

If you want even more, note that it also exists specific profilers for collections (HashMap, HashSet, ArrayList, …) like collection spy (not free).

BeanShell Tutorial: Quick Start On Invoking Your Own Or External Java Code From The Shell

bshsplash3BeanShell is a lightweight scripting language that’s compatible with the Java language.
It provides a dynamic environment for executing Java code in its standard syntax but also allow common scripting conveniences such as loose types, commands, and method closures like those in Perl and JavaScript. It is considered so useful that it should became part of the J2SE at some time in the future (the BeanShell Scripting Language JSR-274 , has passed the voting process with flying colors).

Here I simply describe how to call you own code or any external existing code directly from the bean shell. You first have to download the last bean shell jar release. Let’s suppose that you put it in the directory “C:libs” along with the famous Apache commons lang library. So we suppose that “C:libs” contains two jars called bsh-2.0b4.jar and commons-lang-2.4.jar.

Open a command prompt and type:

java -cp C:libsbsh-2.0b4.jar;C:libscommons-lang-2.3.jar bsh.Interpreter

You should see a prompt “bsh %” indicating that the bean shell session has started. So here an example of session using the method getLevenshteinDistance from the StringUtils utility class of the apache commons lang package:

bsh % import  org.apache.commons.lang.StringUtils;
bsh % d = StringUtils.getLevenshteinDistance("Louisville Slugger", "Lousiville Slugger");
bsh % print(d);
2

Note that instead of having to type the precise import, you can type instead:

bsh % import *;

This will trigger a set of “mappings” between the shell and the external jars that you specified in your classpath. By doing this, just remember that you are importing every possible class accessible from the classpath so it may force you to type the full path of classes in the case that two classes exists with the same name in different packages (it happens more often than one may think).

A good intermediary solution is to define a file called .bshrc and to put there all the specific imports that you are usually using. Then, while invoking the interpreter, just set the java system property user.home to the directory containing the .bshrc file. Let’s say for example that it is located in “C:appbshconfig”, you just have to type:

java -Duser.home=C:appbshconfig -cp C:libsbsh-2.0b4.jar;C:libscommons-lang-2.3.jar bsh.Interpreter

Note that you can add to the java command any options that you need (for instance you can use -Xmx if you need to).

For a complete doc of bean shell commands, consult the bean shell documentation page.

For an eclipse plugin allowing you to perform auto-complete from the bean shell and other nice features, take a look at EclipseShell (I didn’t tested it yet but the site contains nice screencasts and documentation).

5 Video Tutorials Of Small To Killer Eclipse Shortcuts

eclipse I believe that when you spend a significant percentage of your time on a specific software, it is an obligation to become “mouse-less” using it. Few years ago when I started to use the powerful eclipse shortcuts, I observed that my productivity was dramatically improving. You’ll be able to find a lot of posts promoting some lists of “Top 10 eclipse shortcuts” (I like this one). I believe that small video tutorials can show more easily (rather than a bunch of screenshots) the power that some shortcuts can unleash.

So here 5 small video tutorials of shortcuts ranging from small ones to killer ones, all of them together making my day on eclipse much more easier and productive. The first two are small ones but still nice and useful. The remaining ones are more advanced and really have impact since you can potentially use them every couple of line of codes.

  1. Ctrl + Alt + Arrow (up or down): duplicating lines.
  2. Impact on productivity: low to medium

  3. Alt + Arrow (up or down): moving lines
  4. Impact on productivity: low to medium

  5. Ctrl +1: How To Directly or Indirectly Use The Power Of Quick Fixes.
  6. Impact on productivity: huge

  7. Alt + Shift + L: Extract Local Variables
  8. Impact on productivity: medium

  9. Ctrl + Space: Beyond Auto Completion, The Template Assistant (+ customization)
  10. Impact on productivity: high if heavily customized

    Except those, I highly recommend to heavily use those five ones (for which I think a video is less useful):

    • Ctrl + Shift + R (open resources)
    • Ctrl + O (quick outline). Pressing Ctrl + O again will show inherited members.
    • Ctrl + E (quick switch editor). Very handy to navigate between files.
    • Alt + Shift + R (rename variable). A very powerful one since it resolves all the possible dependencies on the renamed variable (works also on filenames).
    • Ctrl + T (quick type hierarchy).

    Become as much mouse-less as possible in Eclipse. Don’t try to start using them all in one day, try to integrate one per day, even week. You’ll end up much more productive anyway.