How to use Cascading with Hadoop Streaming

Last time we talked about how to use a raw MapReduce job in Cascading. Now we are going to up the ante by using Hadoop Streaming as a Flow in Cascading. In this example, we hook a Python streaming job into a Cascade.

It’s pretty easy once you know how to do it:

  • Create a JobConf that defines the parameters for the streaming job (sketched just after this list)
  • Send up the hadoop-*-streaming.jar with your cascading job by putting it in your jar
  • Send up the scripts (python, in this case) by using the -file option
  • Send up any other dependencies, corpora, etc. by using the -file, -cacheFile, or -cacheArchive options (See the Hadoop Streaming page for more details)
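
Before diving into the details, here is what that JobConf looks like in sketch form. The paths and script names below are placeholders, and intermediateTap1 and intermediateTap2 are the Hfs taps defined in the full Cascading code later in this post:

// a condensed sketch of the streaming JobConf; placeholder paths and script names
JobConf streamConf = StreamJob.createJob( new String[]{
    "-input",   "hdfs/path/to/streaming-input",
    "-output",  "hdfs/path/to/streaming-output",
    "-mapper",  "python my_streaming_mapper.py mapper",
    "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
    "-file",    "src/main/python/my_streaming_mapper.py",
    "-file",    "resources/nltkandyaml.mod"
    });

// wrap the streaming JobConf in a MapReduceFlow so it can take part in a Cascade
Flow mrFlow = new MapReduceFlow("streaming flow", streamConf,
    intermediateTap1, intermediateTap2, false, true);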

Resources

NLTK

To generate the nltkandyaml.mod zip file do the following:

# download nltk and unzip
cd nltk
zip -r nltkandyaml.zip nltk yaml
mv nltkandyaml.zip nltkandyaml.mod

Note that this technique is taken from Cloudera.

WordNet

The WordNet zip file needs to be flat, i.e. don’t zip the files up inside a subdirectory. You can create the file like so:

# download and unzip the wordnet corpus
cd wordnet
zip -r ../wordnet-flat.zip *

Streaming Script

In Python, we’ll use zipimport.zipimporter to import the nltk libraries from a zip file. In Hadoop 0.20.0, the wordnet-flat.zip file was not decompressed for us automatically (we’ve heard reports that some versions will do this, but we’re not sure which ones). For us, the .zip file was placed in lib relative to the script’s working directory, which let us keep the WordNet corpus as a zip and read it in that format.

wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))

(Note that in this code we’re not actually using the Python reducer; the streaming job hands reduction to the IdentityReducer instead.)

#!/usr/bin/env python 
import os
import re
import sys
import zipimport
 
importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')
punct = re.compile('[^\w\s]+')
 
from nltk.corpus.reader import wordnet
from nltk.corpus.reader import WordNetCorpusReader
 
nltk.data.path += ["."]
wn = WordNetCorpusReader(nltk.data.find('lib/wordnet-flat.zip'))
 
def mapper(args):
  # read one word per line from stdin; emit "word<TAB>synonym1,synonym2,..."
  line = sys.stdin.readline()
  try:
    while line:
      line = line.strip()
      word = line
      all_synonyms = []
 
      string_synsets = wn.synsets(word)
 
      for synset in string_synsets:
        synonyms = [lemma.name for lemma in wn.synset(synset.name).lemmas]
        # skip the first lemma, which is the synset's head word
        synonyms.pop(0)
        for synonym in synonyms:
          synonym = re.sub("_", " ", synonym)
          all_synonyms.append(synonym) 
 
      print "\t".join([word, ','.join(all_synonyms)])
      line = sys.stdin.readline()
  except "end of file":
    return None
 
# we're not using this, but we could
def reducer(args):
  for line in sys.stdin:
    line = line.strip()
    print line
 
if __name__ == "__main__":
  if sys.argv[1] == "mapper":
    mapper(sys.argv[2:])
  elif sys.argv[1] == "reducer":
    reducer(sys.argv[2:])

Cascading Code

Here’s the bulk of the code that achieves the effect we want. Like last time, we’re using two intermediate taps as the input and output of the streaming job, and we’re just using TextLine files for simplicity. If you don’t want the intermediate files hanging around, see the comments toward the bottom of the listing (and the standalone sketch after it) for example code that removes the files once the job has finished running.

package com.xcombinator.hadoopjobs.cascadingstreamingtest;
 
import cascading.cascade.*;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.MapReduceFlow;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.*;
import cascading.pipe.*;
import cascading.scheme.*;
import cascading.tap.*;
import cascading.tuple.Fields;
import cascading.operation.Identity;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import cascading.operation.Debug;
import org.apache.hadoop.streaming.StreamJob;
import java.io.IOException;
 
/**
 * An example file to use a Hadoop Streaming job in cascading
 */
public class Main extends Configured implements Tool
  {
  private static final Logger LOG = Logger.getLogger( Main.class );
 
  public int run(String[] args)
  {
    JobConf conf = new JobConf(getConf(), this.getClass());
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, this.getClass());
 
    CascadeConnector cascadeConnector = new CascadeConnector();
    FlowConnector flowConnector = new FlowConnector(properties);
 
    String inputPath  = args[0];
    String outputPath = args[1];
    String intermediatePath1 = args[1] + "-mr-input";
    String intermediatePath2 = args[1] + "-mr-output";
 
    Scheme textLineScheme = new TextLine();
 
    Tap sourceTap = new Hfs(textLineScheme, inputPath);
    Tap intermediateTap1 = new Hfs(textLineScheme, intermediatePath1);
    Tap intermediateTap2 = new Hfs(textLineScheme, intermediatePath2);
    Tap sinkTap   = new Hfs(textLineScheme, outputPath);
 
    // create our first flow, sink to the intermediateTap
    Pipe wsPipe = new Each("wordsplit", 
        new Fields("line"), 
        new RegexSplitGenerator(new Fields("word"), "\\s+"), 
        new Fields("word"));
 
    Flow parsedLogFlow = flowConnector.connect(sourceTap, intermediateTap1, wsPipe);
 
    // Create a pipe and set our mr job for it 
    Pipe importPipe = new Pipe("mr pipe");
    Flow mrFlow;
 
    try {
      JobConf streamConf = StreamJob.createJob( new String[]{
          "-input", intermediateTap1.getPath().toString(), 
          "-output", intermediateTap2.getPath().toString(),
 
          // straight unix
          // "-mapper", "/bin/cat",
          // "-reducer", "/usr/bin/wc"
 
          // ruby
          // "-mapper", "src/main/ruby/word_count_mapper.rb",
          // "-reducer", "src/main/ruby/word_count_reducer.rb",
          // "-file", "src/main/ruby/word_count_mapper.rb",
          // "-file", "src/main/ruby/word_count_reducer.rb"
 
          // python
          "-mapper", "python synsets.py mapper",
          "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
          "-file", "src/main/python/synsets.py",
          "-file", "resources/nltkandyaml.mod",
          "-file", "resources/lib/wordnet-flat.zip",
          });
      mrFlow = new MapReduceFlow("streaming flow", streamConf, intermediateTap1,
        intermediateTap2, false, true);
    } catch(IOException ioe) {
       ioe.printStackTrace();
       System.exit(1);
       return 1;
    }
 
    // create our third "regular" cascading pipe. this is a bit contrived, but
    // the idea is substitute all 'e's with 'x's. it's just here to show how to
    // take the input of a streaming job back into cascading
    Pipe subPipe = new Pipe("subber");
    subPipe = new Each(subPipe,
        new Fields("line"),
        new RegexReplace(new Fields("linx"), "e", "x", true),
        new Fields("linx"));
    Flow subFlow = flowConnector.connect(intermediateTap2, sinkTap, subPipe);
 
    Cascade cascade = cascadeConnector.connect(parsedLogFlow, mrFlow, subFlow);
    cascade.complete();
 
    // to get rid of the intermediate files you could do this for each
    // intermediate tap (see the standalone sketch after this listing):
    // Path tmp = intermediateTap1.getPath();
    // FileSystem fs = tmp.getFileSystem(conf);
    // fs.delete(tmp, true);
 
    return 0;
  }
 
  public static void main(String[] args) throws Exception 
  {
    int res = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(res);
  }
 
  }
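
If you’d rather have that cleanup as real code instead of comments, here is a minimal sketch. The helper name deleteIntermediate is ours (it isn’t part of Cascading or Hadoop), but the FileSystem calls are the standard Hadoop API:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// hypothetical helper: recursively delete an intermediate directory once the cascade completes
static void deleteIntermediate(Configuration conf, String pathString) throws IOException
  {
  Path tmp = new Path(pathString);
  FileSystem fs = tmp.getFileSystem(conf);
  fs.delete(tmp, true); // true = delete recursively
  }

// in run(), after cascade.complete():
//   deleteIntermediate(conf, intermediatePath1);
//   deleteIntermediate(conf, intermediatePath2);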