How to use a raw MapReduce job in Cascading

Cascading is a great abstraction over MapReduce.

However, sometimes you already have code for an existing MapReduce job, or you want to drop down to raw Hadoop for efficiency. Even then, Cascading is still useful for planning the overall data pipeline.

The code below is an example of how to use a raw MapReduce job in a Cascade. The main thing to take away is that we are creating intermediate sinks and sources and relying on Cascading to schedule the flows in the correct order.

NOTE: the code below depends on commit f0dd84cd, a patch to MapReduceFlow.java that lets you explicitly set the Taps for a MapReduceFlow. I’ve contacted Chris about integrating it into the trunk.

Also note that this patch applies to the wip-1.1 branch and later.
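
Here is the pattern in miniature before the full listing (a condensed sketch, not runnable on its own; the names match the listing below, and it uses the patched MapReduceFlow constructor from the note above):

Flow first = flowConnector.connect(sourceTap, intermediateTap1, wsPipe);
Flow mrJob = new MapReduceFlow("mrflow", mrconf, intermediateTap1, intermediateTap2, false, true);
Flow last  = flowConnector.connect(intermediateTap2, sinkTap, countPipe);
cascadeConnector.connect(first, mrJob, last).complete();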

package com.xcombinator.hadoopjobs.mapreducetest;
 
import cascading.cascade.*;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MapReduceFlow;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.*;
import cascading.pipe.*;
import cascading.scheme.*;
import cascading.tap.*;
import cascading.tuple.Fields;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
 
/**
 * An example of driving a raw MapReduce job from within a Cascading cascade
 */
public class Main extends Configured implements Tool
  {
  private static final Logger LOG = Logger.getLogger( Main.class );
 
  public int run(String[] args)
  {
    JobConf conf = new JobConf(getConf(), this.getClass());
    Properties properties = new Properties();
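    // tell Cascading which jar to ship to the cluster for this application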
    FlowConnector.setApplicationJarClass(properties, this.getClass());
 
    CascadeConnector cascadeConnector = new CascadeConnector();
    FlowConnector flowConnector = new FlowConnector(properties);
 
    String inputPath  = args[0];
    String outputPath = args[1];
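    // stage the raw mr job's input and output alongside the final output path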
    String intermediatePath1 = args[1] + "-mr-input";
    String intermediatePath2 = args[1] + "-mr-output";
 
    Scheme textLineScheme = new TextLine();
 
    Tap sourceTap = new Hfs(textLineScheme, inputPath);
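    // the intermediate taps are the handoff points: each is the sink of one
    // flow and the source of the next, which is what lets Cascading schedule
    // the flows in the correct order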
    Tap intermediateTap1 = new Hfs(new TextLine(new Fields("line")), intermediatePath1);
    Tap intermediateTap2 = new Hfs(textLineScheme, intermediatePath2);
    Tap sinkTap   = new Hfs(textLineScheme, outputPath);
 
    // create our first flow: split each line into words, sinking to intermediateTap1
    Pipe wsPipe = new Each("wordsplit", 
        new Fields("line"), 
        new RegexSplitGenerator(new Fields("word"), "\\s+"), 
        new Fields("word"));
 
    Flow parsedLogFlow = flowConnector.connect(sourceTap, intermediateTap1, wsPipe);
 
    // configure the raw mr job; note that no Pipe is needed here, since
    // MapReduceFlow wraps the JobConf directly
    JobConf mrconf = new JobConf();
    mrconf.setJobName("custom mr");
 
    mrconf.setOutputKeyClass(LongWritable.class);
    mrconf.setOutputValueClass(Text.class);
 
    // the IdentityMapper, in this case, will actually pass the key through,
    // which is the line's byte offset as a LongWritable. Not what we'd usually
    // want, but we'll leave it in for now.
    mrconf.setMapperClass(IdentityMapper.class);
    mrconf.setReducerClass(IdentityReducer.class);
 
    // note that the input is straight text lines. This means in a real mr job
    // you'd most likely need to split each line by some convention
    mrconf.setInputFormat(TextInputFormat.class);
    mrconf.setOutputFormat(TextOutputFormat.class);
 
    // NOTE: these paths must line up with the source and sink taps handed to
    // the MapReduceFlow below
    FileInputFormat.setInputPaths(mrconf, intermediateTap1.getPath());
    FileOutputFormat.setOutputPath(mrconf, intermediateTap2.getPath());
 
    // create our second flow, this one is for the mrjob. Notice source and sink taps
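    // (the two booleans are deleteSinkOnInit and stopJobsOnExit; this assumes
    // the patched constructor keeps the same trailing flags as the stock one)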
    Flow mrFlow = new MapReduceFlow("mrflow", 
      mrconf, intermediateTap1, intermediateTap2, false, true);
 
    // create our third "regular" cascading pipe
    Pipe countPipe = new Pipe("count");
 
    // because the IdentityMapper passed the byte-offset key through, each line
    // now looks like "<offset>\t<word>", so strip the offset back out. You
    // wouldn't have to do this if you had a smarter Mapper class.
    countPipe = new Each(countPipe, 
        new Fields("line"), 
        new RegexParser(new Fields("word"), ".*?\\t(.*)"), 
        new Fields("word"));
 
    // group on "word" and count each group; Fields("count", "word") selects
    // which fields (and in what order) appear in the output tuples
    countPipe = new GroupBy(countPipe, new Fields("word"));
    countPipe = new Every(countPipe, new Count(), new Fields("count", "word"));
 
    // create the flow for the last count pipe
    Flow countFlow = flowConnector.connect(intermediateTap2, sinkTap, countPipe);
 
    // the CascadeConnector works out the execution order from the shared taps,
    // so the order the flows are listed in doesn't matter
    Cascade cascade = cascadeConnector.connect(parsedLogFlow, mrFlow, countFlow);
    cascade.complete();
 
    // if you want to get rid of the intermediate files you could do something
    // like the following here (using org.apache.hadoop.fs.FileSystem and Path):
    // Path tmp = intermediateTap1.getPath();
    // FileSystem fs = tmp.getFileSystem(conf);
    // fs.delete(tmp, true);
 
    return 0;
  }
 
  public static void main(String[] args) throws Exception 
  {
    int res = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(res);
  }
 
  }
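
Since Main is a standard Hadoop Tool, you run it like any other job, something along these lines (the jar name here is hypothetical):

hadoop jar mapreducetest.jar com.xcombinator.hadoopjobs.mapreducetest.Main <input-path> <output-path>

The two intermediate directories will show up next to the output path with the -mr-input and -mr-output suffixes, unless you add the cleanup code sketched above.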