Getting Cascading to Read Sequence Files Created Somewhere Else

Sometimes you can’t control where your data comes from or how it’s formatted. For instance, where I work a lot of data is stored in SequenceFiles. Unfortunately, the files do not take advantage of the typing SequenceFiles provide; instead, each record is a single field containing a delimited string.

I like to use Cascading (or Cascalog) for my Hadoop jobs, but out of the box Cascading doesn’t support SequenceFiles that were created outside of Cascading. That is to say, Cascading requires that each SequenceFile value be an instance of Tuple.

The solution is to create your own Scheme that reads a SequenceFile according to your own format. In my case I just want to read each record's value as a string.

The code is simple but may not be obvious for a first-time Cascading user. I hope this will save someone a few minutes.

    package com.xcombinator;
 
    import cascading.scheme.SequenceFile;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
 
    /**
     * A SequenceFileAsText is a type of {@link SequenceFile} for files that
     * were created outside of Cascading. Each record's value is assumed to be
     * a string rather than a Tuple.
     */
    public class SequenceFileAsText extends SequenceFile
      {
      /** Field serialVersionUID */
      private static final long serialVersionUID = 1L;
 
      /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
      protected SequenceFileAsText()
        {
        super( null );
        }
 
      /**
       * Creates a new SequenceFileAsText instance that stores the given field names.
       *
       * @param fields
       */
      public SequenceFileAsText( Fields fields )
        {
        super( fields );
        }
 
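      /**
       * Converts one record into a Tuple. The key is ignored; only the value
       * is used.
       */
      @Override
      public Tuple source( Object key, Object value )
      {
        // Files written by Cascading already hold Tuples, so pass them through.
        if (value instanceof Tuple)
        {
          return (Tuple) value;
        }
        // Comparable values (for example Hadoop's Text) are wrapped as-is
        // in a single-field Tuple.
        else if (value instanceof Comparable)
        {
          return new Tuple((Comparable) value);
        }
        // Any other non-null value is stored as its string form.
        else if (value != null)
        {
          return new Tuple(String.valueOf(value));
        }
        // A null value becomes a single-field Tuple holding null.
        else
        {
          return new Tuple((Comparable)null);
        }
      }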
 
    }
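
Since each record still arrives as one delimited string, a later step has to split it into fields. Here is a minimal plain-Java sketch of that step, independent of Cascading. The class name `DelimitedValueSketch`, the helper names, and the tab delimiter are all assumptions for illustration; substitute whatever delimiter your files actually use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Cascading or Hadoop.
class DelimitedValueSketch {
    // Assumed delimiter; the real files may use something else.
    static final String DELIMITER = "\t";

    /** Same fallback idea as source(): any non-null value becomes its string form. */
    static String asText(Object value) {
        return value == null ? null : String.valueOf(value);
    }

    /** Splits one record's value into fields; a null value yields no fields. */
    static List<String> splitFields(Object value) {
        String text = asText(value);
        if (text == null) {
            return new ArrayList<>();
        }
        // Pattern.quote treats the delimiter literally; the -1 limit keeps
        // trailing empty fields instead of silently dropping them.
        return Arrays.asList(text.split(Pattern.quote(DELIMITER), -1));
    }

    public static void main(String[] args) {
        System.out.println(splitFields("alice\t42\t"));
    }
}
```

Keeping trailing empty fields matters when downstream code expects a fixed number of columns per record.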