Monthly Archives: December 2009

Hadoop Tutorial Series, Issue #2: Getting Started With (Customized) Partitioning

In the Issue #1 of this series, we set up the “learning playground” (based on the Cloudera Virtual Machine) in order to enjoy hands-on learning experiences around Hadoop. In this issue, we’ll use our playground to investigate the partitioning features offered by Hadoop.

What is it all about?

As you may know, a map/reduce job will contains most of the time more than  1 reducer.  So basically, when a mapper emits a key value pair, it has to be sent to one of the reducers. Which one? The mechanism sending specific key-value pairs to specific reducers is called partitioning (the key-value pairs space is partitioned among the reducers). A Partitioner is responsible to perform the partitioning.

In Hadoop, the default partitioner is HashPartitioner, which hashes a record’s key to determine which partition (and thus which reducer) the record belongs in.The number of partition is then equal to the number of reduce tasks for the job.

Why is it important?

First, it has a direct impact on the overall performance of your job: a poorly designed partitioning function will not evenly distributes the charge over the reducers, potentially loosing all the interest of the map/reduce distributed infrastructure.

Second, it maybe sometimes necessary to control the key/value pairs partitioning over the reducers. Let’s illustrate it on a simple example. Suppose that your job’s input is a (huge) set of tokens and their number of occurrences (for instance the output of the canonical word count hadoop example) and that you want to sort them by number of occurrences. Let’s also suppose that your job will be handled by 2 reducers. If you run your job without using any customized partitioner, you’ll get something like this:

partialSortOn2Reducers
(Click to enlarge)

As you can see, the tokens are correctly ordered by number of occurrences on each reducer (which is what hadoop guarantees by default) but this is not what you need! You’d rather expect something like:

TotalSortOn2Reducers
(Click to enlarge)

where tokens are totally ordered over the reducers, from 1 to 30 occurrences on the first reducer and from 31 to 14620 on the second. This would happen as a result of a correct partitioning function: all the tokens having a number of occurrences inferior to N (here 30) are sent  to reducer 1 and the others are sent to reducer 2, resulting in two partitions. Since the tokens are sorted on each partition, you get the expected total order on the number of occurrences.

Below, we’ll use our playground to observe the issue happening  on real data and see how we solve it using customized partitioners.

Also, as a second example of use of customized partitioning functions, let’s cite the original map/reduce google paper: “sometimes the output keys are URLs, and we want all entries for a single host to end up in the same output. To support situations like this, the user of the MapReduce library can provide a special partitioning function. For example, using “hash(Hostname(urlkey)) mod R” as the partitioning function causes all URLs from the same host to end up in the same output”.

Feeling the partitions in our playground

If your playground is not yet set up, check the Issue #1 of this series. As an input for our job, we’ll use a tsv file containing the list of tokens and their number of occurrences extracted from (once again) the full moby dick text. Click here to download this input. You’ll notice that the pairs (tokens, #occurrences) are alphanumerically sorted on tokens value.

First, we’ll use a very simple pre-processing job to transform the input data into a more convenient format to use within hadoop: the Sequence File Output Format. Sequence files are a basic file based data structure persisting the key/value pairs in a binary format and allowing you to interact more easily with basic hadoop data types (e.g IntWritable, LongWritable, etc…). Here is the simple pre-processing job:

package com.philippeadjiman.hadooptraining;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class SortDataPreprocessor {

	static class PreprocessorMapper extends MapReduceBase implements Mapper {

		private Text word = new Text();

		public void map(LongWritable key, Text value,
				OutputCollector output, Reporter reporter) throws IOException {
			String line = value.toString();
			String[] tokens = line.split("t");
			if( tokens == null || tokens.length != 2 ){
				System.err.print("Problem with input line: "+line+"n");
				return;
			}
			int nbOccurences = Integer.parseInt(tokens[1]);
			word.set(tokens[0]);
			output.collect(new IntWritable(nbOccurences),word );
		}
	}

	public static void main(String[] args) throws IOException {
		JobConf conf = new JobConf(SortDataPreprocessor.class);

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		conf.setMapperClass(PreprocessorMapper.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);
		conf.setNumReduceTasks(0);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		JobClient.runJob(conf);
	}
}

You’ll notice that:

  • it contains only a mapper (no reducer),
  • a basic error management is performed for potential malformed lines,
  • the output key is the number of occurrences (as an IntWritable) and the output value is the associated token,
  • The sequence file output format is specified using setOutputFormat(SequenceFileOutputFormat.class);

It is sildenafil citrate medicine, which enhances blood flow near reproductive area and makes the organ becoming erect with fuller, viagra soft tabs icks.org thicker and firm erections, so theseare erection-helping medicines. Not just children can be determined to have a dysfunctional http://icks.org/n/bbs/content.php?co_id=INAUGURAL_ISSUE_1997 best generic viagra behavior. The unhealthful eating habits and cialis price in canada discount here lifestyle choices that led to a decreased utilization of higher expense processes. As we age the prenatal life force or chi is drained out from the naval and kidney areas as we move further into the ego, the illusion, and dysfunctional breathing patterns. buy soft cialis
To run it, package the job using maven (see Issue #1), put the input file on hdfs in an input directory (let’s call it input) and execute:

hadoop jar playing-with-partitions.jar com.philippeadjiman.hadooptraining.SortDataPreprocessor /user/training/input /user/training/pre_process

This will create a directory called “pre_process” on hdfs containing a set of pairs (#occurrences,token), respectively of format IntWritable and Text, in a SequenceFileOutputFormat.

Now we can, perform the sort based on this new input. Writing a job for such a task is actually trivial since this is primarily what hadoop is doing by default, so here it is:

package com.philippeadjiman.hadooptraining;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SortExample {
	public static void main(String[] args) throws IOException {
		JobConf conf = new JobConf(SortExample.class);
		conf.setJobName("sortexample");

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		conf.setInputFormat(SequenceFileInputFormat.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(Text.class);

		conf.setNumReduceTasks(2);

		JobClient.runJob(conf);
	}
}

You’ll notice that:

  • There is neither map nor reduce methods! This is because sorting is a default behavior so we don’t have to do anything (we’re just interested here to see how it’ll be partitioned),
  • The input/output formats are specified based on the output of our pre-processing job,
  • We explicitly set the number of reducer to 2, which is the important part here since we want to observe how the output will be partitioned (without specifying it, the output will be generated using only one reducer).

Just run it using:

hadoop jar playing-with-partitions.jar com.philippeadjiman.hadooptraining.SortExample /user/training/pre_process /user/training/output

Once completed, an output directory will be created on hdfs with two files, one for each reducer that were used. You can observe the content of the output using commands like:

hadoop fs -cat output/part-00000 | less
hadoop fs -cat output/part-00001 | less

As you’ll see, the two outputs are sorted but do not represent a total order, as explained above. Let’s fix it.

How to implement your own partitioning function

So how do we create a total order out of those two reducers?

A first solution is to create our own partitionner which is as simple as implementing the Partitioner<K,V> interface:

package com.philippeadjiman.hadooptraining;

package com.philippeadjiman.hadooptraining;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class MyPartitioner implements Partitioner<IntWritable,Text> {
	@Override
	public int getPartition(IntWritable key, Text value, int numPartitions) {
		/* Pretty ugly hard coded partitioning function. Don't do that in practice, it is just for the sake of understanding. */
		int nbOccurences = key.get();

		if( nbOccurences < 3 )
			return 0;
		else
			return 1;
	}

	@Override
	public void configure(JobConf arg0) {

	}

}

This implementation of getPartition specifies to put all the pairs having a key (which is here the number of occurrences) being less than 3 into the first partition and the other into the second one. This is of course a pretty bad practice to hard code like that a partitioning function but this is simply for the sake of understanding.

To use this created partition just add the following line to the main method of the previous SortExample class:

conf.setPartitionerClass(MyPartitioner.class);

Why did I choose 3? Because as a side effect of the Zipf law, the number of tokens having a number of occurrences of 1 and 2 will be as much as big that all the others together! (to see a Zipf Law in action, check this post). So 3 was chosen just for balancing a little bit the partitions.

Re-package the code with the customized partition, remove the old output, run it again and check that our problem is solved: there is a total order over the partitions.

How to automatically find “good” partitioning function using sampling

Now, as I mentioned above, it is a pretty bad practice to hard code how to partition the keys. But on the other hand, how would I know automatically and in advance how to divide the partition in the general case?

Hadoop provide a nice way to approximate a priori a good partitioning function using an InputSampler. For instance, a RandomSampler, will sample the input at random to estimate what is the best way to partition. The sampler will wrote into a file called, by default, _patition.lst describing the partition that the job will automatically use to decide which key/value pairs to send to which reducers. This mechanism has to be used in combination with a TotalOrderPartitioner.

Here is a code sample using such a sampler with a total order partitioner:

InputSampler.Sampler<IntWritable, Text> sampler =
	new InputSampler.RandomSampler<IntWritable, Text>(0.1, 100);
InputSampler.writePartitionFile(conf, sampler);
conf.setPartitionerClass(TotalOrderPartitioner.class);

Sometimes, there are some issues with the file _partition.lst that is not found. It always worked for me when I specified explicitly where to find the file using the TotalOrderPartitioner.setPartitionFile(); method. Also pay attention to invoke this method before the call to writePartitionFile. Also, note that the sampling mechanism is necessary since considering all the input to compute the partition would be inefficient for large files.

Some remarks

  • A customized partitioning would not have been necessary if we had only one reducer since all the key/value pairs would have ended into the same output file. It is easy to understand that such a constraint is a nonsense and that using more than one reducer is most of the time necessary, else the map/reduce concept would not be very useful…
  • Even if we used a small dataset on the semi-distributed infrastructure of the cloudera virtual machine to observe partitioning in action and to learn how to customize it, the same concepts can be applied to a larger infrastructure. To see a very interesting use case of customized partitioning strategy for sorting purpose on a big infrastructure, check the famous TeraByte sort on hadoop.

Conclusion

Partitioning in map/reduce is a fairly simple concept but that is important to get correctly. Most of the time, the default partitioning based on an hash function can be sufficient. But as we illustrated in this Issue, you’ll need sometime to modify the default behavior and to customize your own partitioning suited for your needs.

If you have some questions, or if you have experimented other use cases of customized partitioning in your application, please comment/share. See you soon for another Issue.

Hadoop Tutorial Series, Issue #1: Setting Up Your MapReduce Learning Playground

hadoop-logo

Update: Instructions updated for hadoop 0.20.2.
This is the first post of a series of small hadoop tutorials introducing progressively core hadoop functionnalities. You might be interested in that series if you recognized yourself in one or more of the following points :

  • You’ve heard about the basics of MapReduce (else check the links that I recommend at the end of this post)
  • Even if you’re not working at Google, Yahoo, Facebook (or many others) for now, you know it’s been years that MapReduce/Hadoop has become a must-have skill and that you should practice it but you have very few time
  • You did try to read some tutorials but it was always either not hands-on enough or too much detailed

This first post is dedicated to build what I called a “MapReduce Learning Playground”: for practice or for a real need, you read or wrote  on a sheet of paper the map and reduce functions that might solve a particular problem and you want to see it in action, not necessarily on huge data sets, just check that it computes the correct answer.

A lot of material can be found on the internet to do the same. The steps below are my attempt to present the best part of all the training material that I read on the subject, adapt it, adding it some glue (here with maven) and compile the whole into something that I hope will save you some time.

Step 1: Install the cloudera training virtual machine

Cloudera is really doing a great job at providing training material for hadoop. The most useful one in my opinion is their hadoop training virtual machine. Update: they changed a lot of things since that post was written, here is a better link for a training virtual machine (Thanks Karthick for letting me know that the old link was broken ). It provides a VMWare image of a Linux Ubuntu distribution with a pre-installed hadoop cluster in Pseudo-Distributed Mode.

To install the VM on your computer, just follow their instructions, it is free (except if you’re on Mac) and very easy. The VM comes also with hadoop related tools already installed like hive and pig (it will probably be the subject of other posts).

At the end of the installation, open the VMWare Player, start the cloudera VM (with training/training as user/pass) and you should get something like this:

cloudera-hadoop-vm-training
The cloudera hadoop virtual machine for training (click to enlarge)

Step 2: Creating an “hadoop ready” project with maven

Cloudera does provide some training projects already mounted in the eclipse installed in the VM but those projects contains several small errors (like missing dependencies). Even if those are very easily fixed, I describe here how to build your own project from scratch; it will give you a better basis in case you want to extend them and you’ll always be able to copy-paste the map-reduce functions of some interesting cloudera training projects into your own ones.

First install maven on the VM. Open a terminal from the VM and type:

sudo apt-get install maven2

If for some reasons you run into trouble with the installation of maven, you can always download it directly from here. Assuming you are unzipping it into the /user/local/apache-maven directory, you can add those lines into your /home/training/.bashrc configuration file:

export M2_HOME=/usr/local/apache-maven
export M2=$M2_HOME/bin
export PATH=$M2:$PATH

Then from the same terminal go into the workspace directory (usually located at ~/workspace) and create a java project hierarchy using the following maven command (change the groupId and the artifactId as you like):

mvn archetype:create -DarchetypeGroupId=org.apache.maven.archetypes  -DgroupId=com.philippeadjiman.hadooptraining -DartifactId=hadoop-first-example

Then enter into the hadoop-first-example directory and generate the necessary files for eclipse:

mvn eclipse:eclipse

Then open eclipse from the VM then File -> Import -> Existing Projects into Workspace -> Browse, choose the hadoop-first-example directory, OK -> Finish. Then you should see your project on the left side.

You may have an error on a M2_REPO unresolved variable, that’s OK, it’s because it is the first time that this eclipse use a maven project. To fix it, just right click on your project -> Build Path -> Configure Build Path -> Add Variable -> Configure Variables -> New. In the name type M2_REPO and in the path type /home/training/.m2/repository (just check that this directory exists).

Then you’ll have to add the hadoop jar dependency. To do so, you just have to open you pom.xml file (you’ll see it at the bottom of your project) and the following dependency (add it just before the </dependencies> closing tag ):

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-core</artifactId>

<version>0.20.2</version>

</dependency>

You can check here if there is a newer version. Note also that if you plan to use hadoop to run with a specific framework built on top of it then make sure you’re using the right version. E.g. for mahout, use the version you’ll find here.

Then you can go back to the terminal in your hadoop-first-example directory and type again mvn eclipse:eclipse to regenerate the eclipse files with now the hadoop dependency. You can now refresh your directory. You have now an “hadoop-ready” project.

Step 3: Put your map reduce program into your project and prepare the data on HDFS

The first time you heard about MapReduce, there is a good chance that you also heard about the word count example. The wordCount code on hadoop website is quiet outdated for hadoop v0.20, here is a link to a blog post with a more updated version of the word count code that will work with the 0.20.2 hadoop version used in that tutorial (Thanks Yi!). Be sure to put that code into the src directory of your project into a package called com.philippeadjiman.hadooptraining (or whatever else, as long as it matches to your package declaration).

Then before to deploy the job, you’ll have to count some words. Following a moby dick tradition in this blog,  let’s download the full english raw text of moby dick that you can  find here.  If you want to run the hadoop job on it, you’ll have to put this file on HDFS, the underlying hadoop file system (check the “useful links” section below if you’ve never heard about HDFS).

Navigating, reading from and writing to HDFS is super simple and if you’re already familiar with regular unix file system commands  then you’ll get it instantly: almost all the commands are the same, you just have to pass them to the wrapper command hadoop with ‘fs’ as argument and a ‘-‘ before the command. Examples (to run from a regular terminal):

hadoop fs -help # will print all the command that you can execute on the HDFS
hadoop fs -ls  # will perform an ls from the HDFS home directory (set to /user/training in the VM).
hadoop fs -mkdir input # will create the directory 'input' in the HDFS home directory (check if it does not already exists)
hadoop fs -mkdir output # will create the directory 'input' in the HDFS home directory (check if it does not already exists)
hadoop fs -put mobyDick.txt input # will put your local copy of mobyDick into the directory 'input' on the HDFS

Step 4: package your job, run it, observe the result

To launch your job on the hadoop infrastructure, you’ll have to package it into a jar file. With maven, nothing is more simple. Just go into your hadoop-first-example directory and type:

mvn jar:jar

This will generate a jar into a sub-directory called target. Your jar will have a name like hadoop-first-example-1.0-SNAPSHOT.jar (you can change that generated name by editing the jar section of your pom). You can check that your jar file contains as expected the WordCount.class (and its inner classes) by typing:

jar -tf hadoop-first-example-1.0-SNAPSHOT.jar

You can now launch your hadoop job by executing the following command (adapt it with the correct package name if necessary):

hadoop jar hadoop-first-example-1.0-SNAPSHOT.jar com.philippeadjiman.hadooptraining.WordCount /user/training/input /user/training/output

Cloudera also comes with an handy web interface allowing to, among other things, monitor the jobs running on the cluster. Just open firefox and go to the url http://localhost:8088/. From the menu at the top right side of the page, choose job browser and you should see the status of your job. You can also click on it to see the details (status and number of mapper/reducer of your job):

cloudera-web-interface
Using the cloudera web interface to see your job status (click to enlarge)

You can also see your job output from there but I found it easier to check it directly from HDFS:

hadoop fs -ls output # should now contains something (a file named part-00000 should be your output)
hadoop fs -cat output/part-00000 | less # will let you browse easily your output

Important: if you want to run your job another time, you’ll have to first erase all the current files of the output:

hadoop fs -rmr output # erase all the files contained in the output directory

You’ll notice that the output file contains many words with the ” character and other similar noise. This is of course because a simple StringTokenizer is used in the map function. To parse the text correctly, consider using some standard analyzers from Lucene for instance (you have an example in the code of step 2 of this post).

Step 5: Modify, Customize, Play, Learn. Now start the real fun…

Now that you built and deployed you own project from scratch, you have all you needs to modify certain parts of the code, create new map/reduce programs, test methods from the hadoop api, observe the results.

You can for instance try to see what happens if you use more than one reducer in the word count job (using conf.setNumReduceTasks(2) in the main method). Which lines are sent to which reducer? How to control that? How to sort the output of word count by number of occurrence (highest number first)?

Also I recommend to go over this tutorial that shows how to build an inverted index (see the theory here) using map/reduce (the tutorial contains many broken references w.r.t. the cloudera VM but you don’t care since now you know what you’re doing 😉 )

Future posts of this series will leverage the playground we built here to illustrate and learn about other interesting stuff around hadoop.

Useful links:

hadoop-first-example-1.0-SNAPSHOT

If cialis brand worked miracles for some guys, but for the rest of us it takes a minimum of three years to get a degree in anything – give yourself some time to learn. With your partner, like the other person in viagra from canada pharmacy the relationship above their own. Some of the important tips to keep in mind during dive sessions include: Proper Descent and Ascent It is important that you descent and ascent slowly and carefully every time. http://djpaulkom.tv/crakd-gospel-granny-lyrical-livelys-mixtape-is-fire/ sildenafil for women buy You can buy Shilajit ES capsules from reliable online levitra india online stores using a credit or debit card.