Cascading, TF-IDF, and BufferedSum (Part 1)

Introduction

A common technique in MapReduce is to read a group of records, calculate a value from that group, and emit each record with the new value attached. While this is easy to do in raw MapReduce jobs, the solution in Cascading is not obvious. This tutorial introduces a new Cascading operation called BufferedSum. BufferedSum calculates a sum over a group of tuples and emits that sum with each individual tuple in a scalable way.

BufferedSum is easier to describe in concrete terms, so let’s work through an example.

Example

When dealing with large numbers of documents in Hadoop, it’s common for each input file to contain many documents. Our input file in this case will contain two documents, one per line, with the document id and the body separated by a tab:

a.txt\thello world world
b.txt\tgoodbye goodbye world

Let’s say we want to calculate tf-idf for these documents. One of the first values we need is the number of times a particular term occurs within each document.
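
For reference, one common formulation of tf-idf is sketched below. The exact variant and the symbols t (a term), d (a document), N (the number of documents), and n_t (the number of documents containing t) are assumptions made here for illustration; this part of the tutorial does not fix a formula. The sketch shows why both the per-document term count and the total number of terms in a document are needed.

```latex
\mathrm{tf}(t, d) = \frac{\text{term\_count\_in\_document}(t, d)}{\text{total\_terms\_in\_document}(d)},
\qquad
\mathrm{idf}(t) = \log \frac{N}{n_t},
\qquad
\text{tf-idf}(t, d) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t)
```

In the sample input, "world" occurs twice among the three terms of a.txt, so under this formulation tf("world", a.txt) = 2/3.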

First, we will split each line into (document_id, body) pairs:

  pipe = new Each(pipe, 
      new Fields("line"), 
      new RegexSplitter(new Fields("document_id", "body"), "\t"));

From there we “tokenize” the document and extract each term:

  pipe = new Each(pipe, // tokenize words by space
      new Fields("body"),
      new RegexSplitGenerator(new Fields("term"), "\\s+"), 
      new Fields("document_id", "term"));

Now our tuple stream is the following:

a.txt hello
a.txt world
a.txt world
b.txt goodbye
b.txt goodbye
b.txt world
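
To make the two steps above easier to follow without a Hadoop cluster, here is a minimal plain-Java sketch of the same transformation. It does not use Cascading at all; the class TokenizeSketch and its method tokenize are hypothetical names, and it assumes every line is well formed (exactly one tab, no leading whitespace in the body).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; TokenizeSketch is a hypothetical name, not part of Cascading.
final class TokenizeSketch {
    /** Turns "document_id<TAB>body" lines into [document_id, term] pairs. */
    static List<String[]> tokenize(List<String> lines) {
        List<String[]> pairs = new ArrayList<>();
        for (String line : lines) {
            // Mirrors the RegexSplitter step: document_id, then body.
            String[] parts = line.split("\t", 2);
            // Mirrors the RegexSplitGenerator step: one output pair per term.
            for (String term : parts[1].split("\\s+")) {
                pairs.add(new String[] {parts[0], term});
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "a.txt\thello world world",
            "b.txt\tgoodbye goodbye world");
        for (String[] pair : tokenize(lines)) {
            System.out.println(pair[0] + " " + pair[1]);
        }
    }
}
```

Run against the two sample documents, this prints the same six (document_id, term) pairs listed above.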

Count of term in document_id

We now have (document_id, term) and we want to calculate (document_id, term, term_count_in_document). With Cascading this is easy: group by document_id and term, then apply the Count aggregator:

  // count how many times `term` appears in `document_id`
  pipe = new GroupBy(pipe, new Fields("document_id", "term"));
  pipe = new Every(pipe, 
      new Fields("term"), 
      new Count(new Fields("term_count_in_document")), 
      new Fields("document_id", "term", "term_count_in_document"));
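
As a rough picture of what the GroupBy and Count steps compute, here is a plain-Java sketch that does not use Cascading. The class TermCountSketch and its method countTerms are hypothetical names, and an in-memory map stands in for the distributed grouping.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; TermCountSketch is a hypothetical name, not part of Cascading.
final class TermCountSketch {
    /** Counts how many times each [document_id, term] pair occurs. */
    static Map<List<String>, Integer> countTerms(List<String[]> pairs) {
        Map<List<String>, Integer> counts = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            // The key plays the role of the grouping fields (document_id, term).
            counts.merge(List.of(pair[0], pair[1]), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> pairs = List.of(
            new String[] {"a.txt", "hello"},
            new String[] {"a.txt", "world"},
            new String[] {"a.txt", "world"},
            new String[] {"b.txt", "goodbye"},
            new String[] {"b.txt", "goodbye"},
            new String[] {"b.txt", "world"});
        for (Map.Entry<List<String>, Integer> e : countTerms(pairs).entrySet()) {
            System.out.println(e.getKey().get(0) + " " + e.getKey().get(1) + " " + e.getValue());
        }
    }
}
```

For the sample input the counts are (a.txt, hello, 1), (a.txt, world, 2), (b.txt, goodbye, 2), and (b.txt, world, 1).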

Calculating total_terms_in_document

So far, so good. Up to this point Cascading has provided everything we need. Next, however, we want the total number of terms in each document while keeping the tuples we have calculated so far. Put another way, we have an input of (document_id, term, term_count_in_document) and we want to emit (document_id, term, term_count_in_document, total_terms_in_document).

Our first instinct might be to use GroupBy() and an aggregator like before, this time Sum() instead of Count(). But there is a catch: an Every pipe applying an aggregator emits only the grouping fields together with the aggregator's result (see the Each and Every Pipes section of the Cascading User Guide).

This means that if we group by document_id and Sum() term_count_in_document into total_terms_in_document, we will emit (document_id, total_terms_in_document). The number in total_terms_in_document will be accurate, but we lose term and term_count_in_document.

If we try to save our other fields by grouping on all three of them (document_id, term, term_count_in_document), then we’ve “over-grouped”: every “group” is a single tuple (the input tuple), and we won’t get the count of terms in the document as a whole. BufferedSum was created to solve this problem.
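
The two dead ends described above can be reproduced in a few lines of plain Java. This is only a sketch of the grouping behaviour, not Cascading code; the class GroupingPitfallSketch and both method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; GroupingPitfallSketch is a hypothetical name.
// Each row is [document_id, term, term_count_in_document].
final class GroupingPitfallSketch {
    /** Groups on document_id only: correct totals, but term and count are lost. */
    static Map<String, Integer> sumPerDocument(List<Object[]> rows) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Object[] row : rows) {
            totals.merge((String) row[0], (Integer) row[2], Integer::sum);
        }
        return totals;
    }

    /** Groups on all three fields: every group is one row, so the "sum" is just that row's count. */
    static Map<List<Object>, Integer> sumOverGrouped(List<Object[]> rows) {
        Map<List<Object>, Integer> totals = new LinkedHashMap<>();
        for (Object[] row : rows) {
            totals.merge(List.of(row[0], row[1], row[2]), (Integer) row[2], Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Object[]> rows = List.of(
            new Object[] {"a.txt", "hello", 1},
            new Object[] {"a.txt", "world", 2},
            new Object[] {"b.txt", "goodbye", 2},
            new Object[] {"b.txt", "world", 1});
        System.out.println(sumPerDocument(rows));
        System.out.println(sumOverGrouped(rows));
    }
}
```

For the sample rows, the first grouping yields two entries (a.txt with 3 and b.txt with 3), while the second yields four single-row groups whose sums are 1, 2, 2, and 1. Neither gives the four-field tuples we are after.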

BufferedSum

BufferedSum takes as its input three things:

  • The name of the Field to output
  • The name of the Field to sum
  • The other Fields to “pull through” the operation

Here is how we can use BufferedSum to achieve the desired effect:

  // input: (document_id, term, term_count_in_document)
  // emits: (document_id, term, term_count_in_document, total_terms_in_document)
  pipe = new GroupBy(pipe, new Fields("document_id"));
  pipe = new Every(pipe,
      new BufferedSum(new Fields("total_terms_in_document"),
                      new Fields("term_count_in_document"),
                      new Fields("document_id", "term", "term_count_in_document")),
      Fields.SWAP);

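Note: the output selector Fields.SWAP is critical. It tells Cascading to replace the argument fields with the fields BufferedSum emits. Because BufferedSum re-emits document_id, term, and term_count_in_document itself, appending its results to the incoming fields instead would duplicate those field names.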
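
To show the effect on the sample data, here is a plain-Java sketch of the same buffer-then-emit idea. It is not the Cascading implementation (that appears in The Code below); the class BufferedSumSketch and its method appendGroupSum are hypothetical names, and everything is held in memory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; BufferedSumSketch is a hypothetical name.
// Each input row is [document_id, term, term_count_in_document].
final class BufferedSumSketch {
    /** Returns every input row with its document's total appended as a fourth value. */
    static List<List<Object>> appendGroupSum(List<List<Object>> rows) {
        // Stand-in for GroupBy: collect the rows of each document_id.
        Map<Object, List<List<Object>>> groups = new LinkedHashMap<>();
        for (List<Object> row : rows) {
            groups.computeIfAbsent(row.get(0), k -> new ArrayList<>()).add(row);
        }
        List<List<Object>> out = new ArrayList<>();
        for (List<List<Object>> group : groups.values()) {
            // First pass over the group: total the counts.
            double sum = 0.0;
            for (List<Object> row : group) {
                sum += ((Number) row.get(2)).doubleValue();
            }
            // Second pass: re-emit each buffered row with the total appended.
            for (List<Object> row : group) {
                List<Object> withSum = new ArrayList<>(row);
                withSum.add(sum);
                out.add(withSum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = List.of(
            List.<Object>of("a.txt", "hello", 1),
            List.<Object>of("a.txt", "world", 2),
            List.<Object>of("b.txt", "goodbye", 2),
            List.<Object>of("b.txt", "world", 1));
        for (List<Object> row : appendGroupSum(rows)) {
            System.out.println(row);
        }
    }
}
```

Each of the four sample rows comes back with 3.0 appended, since both a.txt and b.txt contain three terms. The total is a double here to match the Double sum used by BufferedSum.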

Memory considerations

When using BufferedSum, try to keep each group small enough to fit in memory. This is not a hard requirement: BufferedSum buffers each group in Cascading’s SpillableTupleList (constructed here with a threshold of 10,000), which spills to temporary files on disk if the group grows too large. That said, spilling is an expensive operation and should be avoided if possible.

Summary

BufferedSum is a widely useful operation when dealing with sums in Cascading. In Part 2 we will use BufferedSum and Cascading to finish calculating tf-idf.

The Code

package com.xcombinator.cascading.operations.buffers;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.SpillableTupleList;
 
import java.util.Iterator;
 
/**
 * BufferedSum sums a value for every Tuple in a Group and emits every input
 * Tuple with the sum appended.
 * <p/>
 * 
 * 
 * EXAMPLE:
 *
 * {@code 
 *
 * // input: (document_id, term, term_count_in_document)
 * // emits: (document_id, term, term_count_in_document, total_terms_in_document) 
 *
 *     pipe = new GroupBy(pipe, new Fields("document_id"));
 *     pipe = new Every(pipe, 
 *         new BufferedSum(new Fields("total_terms_in_document"), 
 *                        new Fields("term_count_in_document"),
 *                        new Fields("document_id", "term", "term_count_in_document")), 
 *         Fields.SWAP);
 * }
 *
 * @see BufferedSum
 * 
 */
 
public class BufferedSum extends BaseOperation implements Buffer
  {
  private Fields extrasSelector;
  private Fields fieldToSum;
 
  /**
   * Returns a BufferedSum Buffer Operation. 
   *
   * @param emittedSumFieldName a {@link Fields} naming the field to emit the sum value
   * @param fieldToSum          a {@link Fields} naming the field to sum
   * @param extrasSelector      a {@link Fields} naming the other fields to "pull through". These fields *must* be of the same order and size as the input Tuple
   */
  public BufferedSum( Fields emittedSumFieldName, Fields fieldToSum, Fields extrasSelector )
    {
    // declared output: the pulled-through fields followed by the sum field
    super( extrasSelector.append( emittedSumFieldName ) );
    this.extrasSelector = extrasSelector;
    this.fieldToSum = fieldToSum;
    }
 
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    Iterator<TupleEntry> iterator = bufferCall.getArgumentsIterator();
    // per-group state is local, so nothing carries over between groups
    double sum = 0.0D;
    SpillableTupleList list = new SpillableTupleList( 10000 );
 
    // first pass: total the field and buffer every tuple in the group
    while( iterator.hasNext() )
      {
      TupleEntry arguments = iterator.next(); // must be called
      sum += arguments.getDouble( this.fieldToSum.get( 0 ) );
      // copy defensively in case the iterator reuses the underlying Tuple
      list.add( new Tuple( arguments.getTuple() ) );
      }
 
    // second pass: re-emit each buffered tuple with the group sum appended
    for( Tuple tuple : list )
      {
      bufferCall.getOutputCollector().add( tuple.append( new Tuple( sum ) ) );
      }
 
    }
  }