# Cascading, TF-IDF, and BufferedSum (Part 2)

## Introduction

The tf-idf weight (term frequency-inverse document frequency) is a weight often used in information retrieval and text mining. This weight is a statistical measure used to evaluate how important a word is to a document in a collection or corpus. 

To calculate tf-idf we need the following four values:

• The number of times a term appears in a given document ($n_{i,j}$)
• The total number of terms in a given document ($\sum_k n_{k,j}$)
• The number of documents that contain a given term ($|\{d : t_i \in d\}|$)
• The total number of documents in the corpus ($D$)
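
To make the four values concrete, here is a small plain-Java sketch (no Cascading involved) that combines them using the standard definitions spelled out in the next section: tf is the term count divided by the document length, and idf is the natural log of the corpus size divided by the number of documents containing the term. The class name, method names, and numbers are made up for illustration.

```java
// Plain-Java sketch; class and method names are illustrative, not part of Cascading.
class TfIdfArithmetic {
    /** Term frequency: occurrences of the term divided by all terms in the document. */
    static double tf(long termCountInDocument, long totalTermsInDocument) {
        return (double) termCountInDocument / totalTermsInDocument;
    }

    /** Inverse document frequency: natural log of (corpus size / documents containing the term). */
    static double idf(long totalDocuments, long documentsContainingTerm) {
        return Math.log((double) totalDocuments / documentsContainingTerm);
    }

    /** tf-idf is simply the product of the two. */
    static double tfIdf(long termCount, long totalTerms, long totalDocuments, long documentsContainingTerm) {
        return tf(termCount, totalTerms) * idf(totalDocuments, documentsContainingTerm);
    }

    public static void main(String[] args) {
        // Made-up numbers: the term appears 3 times in a 100-term document,
        // and 10 of the 1000 documents in the corpus contain it.
        System.out.println(tfIdf(3, 100, 1000, 10)); // roughly 0.138
    }
}
```

A term that appears in every document gets an idf of zero under this definition, so its tf-idf is zero regardless of how often it occurs.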

## Mathematical Details

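We want to score the importance of term $t_i$ in document $d_j$.

Term frequency is defined by:

$$\mathrm{tf}_{i,j} = \frac{n_{i,j}}{\sum_k n_{k,j}}$$

where $n_{i,j}$ is the number of occurrences of term $t_i$ in document $d_j$, and the denominator is the total number of terms in $d_j$.

Inverse document frequency is defined by:

$$\mathrm{idf}_i = \log \frac{D}{|\{d : t_i \in d\}|}$$

where:

• $D$ is the total number of documents in the corpus and
• $|\{d : t_i \in d\}|$ is the number of documents in which the term $t_i$ appears.

Then:

$$\mathrm{tfidf}_{i,j} = \mathrm{tf}_{i,j} \times \mathrm{idf}_i$$

Note that the operation implemented later in this article uses $1 + |\{d : t_i \in d\}|$ in the idf denominator.

## Operation Input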

Last time we discussed the technique of taking a group of records, calculating a value from that group and emitting each record with the calculated value attached. We called this operation a BufferedSum. We’re going to build on our previous work and create a reusable component (called a SubAssembly) for calculating tf-idf using Cascading on Hadoop.

To make our tf-idf operation we need to decide what the input arguments will be. Last time, we used an input corpus of the format (document_id, body) and emitted (document_id, term, term_count_in_document) for all terms in each document. This last tuple will be the input format to our tf-idf operation.
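
As a reminder of what that input looks like, the following plain-Java sketch turns one (document_id, body) pair into (document_id, term, term_count_in_document) rows. It is only an illustration of the tuple shape: the whitespace tokenisation and lower-casing are assumptions, and the class and method names are hypothetical rather than taken from Part 1.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: shows the shape of the rows fed into the tf-idf SubAssembly.
class TermCountRows {
    /** Returns rows of [document_id, term, term_count_in_document] for one document. */
    static List<String[]> rowsFor(String documentId, String body) {
        // Count occurrences of each term, keeping first-seen order.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String term : body.toLowerCase().split("\\s+")) {
            if (term.isEmpty()) {
                continue; // skip the empty token produced by leading whitespace
            }
            counts.merge(term, 1, Integer::sum);
        }
        // One row per distinct term in the document.
        List<String[]> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            rows.add(new String[] { documentId, e.getKey(), String.valueOf(e.getValue()) });
        }
        return rows;
    }
}
```

The important property for the rest of this article is that each document_id/term pair appears in exactly one row.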

## Creating a SubAssembly

The general format of a SubAssembly in Cascading is as follows:

```java
public class MySubAssembly extends SubAssembly {
  public MySubAssembly(Pipe pipe) {
    // do something with `pipe`
    setTails(pipe); // must register all assembly tails
  }
}
```

In our operation we are assuming that the total number of documents in the corpus is known or could be found with a simple MapReduce job. Given that we have the total number of documents we take that number as the input to our SubAssembly:

```java
public class TfIdfIndexSubAssmbly extends SubAssembly {
  private Integer totalNumberOfDocuments;

  public TfIdfIndexSubAssmbly(Pipe pipe, Integer totalNumberOfDocuments) {
    this.totalNumberOfDocuments = totalNumberOfDocuments;
    // do something with pipe
    setTails(pipe); // must register all assembly tails
  }
}
```

## Gathering Variables

To compute our final tf-idf score, we first need to compute the intermediate variables.

### total_terms_in_document

Since our input is (document_id, term, term_count_in_document), we already have the first variable, $n_{i,j}$. We can now calculate the total number of terms in each document:

```java
// input: (document_id, term, term_count_in_document)
// emits: (document_id, term, term_count_in_document, total_terms_in_document)
pipe = new GroupBy(pipe, new Fields("document_id"));
pipe = new Every(pipe,
  new BufferedSum(new Fields("total_terms_in_document"),
                  new Fields("term_count_in_document"),
                  new Fields("document_id", "term", "term_count_in_document")),
  Fields.SWAP);
```

Remember that BufferedSum takes three arguments:

• The name of the Field to output
• The name of the Field to sum
• The other Fields to “pull through” the operation

So here we are grouping by document_id, and summing term_count_in_document for each group and placing the value into the field total_terms_in_document.
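
For readers who want to see the group-and-attach behaviour outside of Cascading, here is a minimal in-memory sketch of the same idea: group rows by a key column, sum another column per group, and re-emit every row with its group's total appended. It is not the BufferedSum implementation from Part 1; the class name, method name, and the use of string arrays as rows are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory analogue of "group, sum, and pull the other fields through".
class GroupSumSketch {
    /**
     * Groups rows by the value at keyIndex, sums the integer at sumIndex within
     * each group, and returns every row with that group total appended as a new
     * last column.
     */
    static List<String[]> appendGroupSum(List<String[]> rows, int keyIndex, int sumIndex) {
        // First pass: compute the total for each group.
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String[] row : rows) {
            totals.merge(row[keyIndex], Long.parseLong(row[sumIndex]), Long::sum);
        }
        // Second pass: re-emit each row with its group's total attached.
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            String[] extended = Arrays.copyOf(row, row.length + 1);
            extended[row.length] = String.valueOf(totals.get(row[keyIndex]));
            out.add(extended);
        }
        return out;
    }
}
```

Calling `appendGroupSum(rows, 0, 2)` on (document_id, term, term_count_in_document) rows yields rows with total_terms_in_document as a fourth column, which mirrors the step above.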

### number_of_documents_containing_term

Next we need to calculate the number of documents that contain each term. The input was already grouped by document_id and term when the counts were produced, so we know there is exactly one record for a given document_id/term pair.

Rather than counting the number of document_id/term pairs directly we are simply going to assign a count of 1 to each record and then sum that value. This allows us to reuse the code we’ve written for BufferedSum.

```java
// calculate the number of documents containing each term
// input: (document_id, term, term_count_in_document, total_terms_in_document)
// emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term)

// we're going to sum these, easier than creating BufferedCount
pipe = new Each(pipe, new Insert(new Fields("term_in_doc"), 1), Fields.ALL);
pipe = new GroupBy(pipe, new Fields("term"));
pipe = new Every(pipe,
  new BufferedSum(new Fields("number_of_documents_containing_term"),
                  new Fields("term_in_doc"),
                  new Fields("document_id", "term", "term_count_in_document",
                             "total_terms_in_document", "term_in_doc")),
  Fields.SWAP);
```

Here we group on term and for every term group we calculate the number of documents that contain that term. Note that if you have a very large corpus some groups may become memory constrained as very common words such as “the” have groups containing nearly the entire corpus (it would be a good idea to remove stop-words during pre-processing).
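
The "insert a 1, then sum" trick can be seen in isolation in the following plain-Java sketch. It assumes, as above, that each document_id/term pair occurs in exactly one row, so summing a constant 1 per row grouped by term gives the number of documents containing that term. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Counting by summing a constant: an in-memory analogue of Insert + BufferedSum.
class DocumentFrequencySketch {
    /**
     * Takes rows whose first two columns are [document_id, term] and returns,
     * for each term, the number of rows (and therefore documents) containing it.
     */
    static Map<String, Integer> documentsContainingTerm(List<String[]> rows) {
        Map<String, Integer> documentFrequency = new TreeMap<>();
        for (String[] row : rows) {
            int termInDoc = 1; // the constant that the Insert step attaches to every row
            documentFrequency.merge(row[1], termInDoc, Integer::sum);
        }
        return documentFrequency;
    }
}
```

If the one-row-per-pair assumption did not hold, this sum would count occurrences rather than documents, which is why the earlier counting step matters.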

After we’ve calculated the value for number_of_documents_containing_term we don’t need the term_in_doc field any longer. Using Cascading’s Identity operation we can reorder and discard fields.

```java
pipe = new Each(pipe, // reorder and rm fields
  new Fields("document_id", "term", "term_count_in_document",
             "total_terms_in_document", "number_of_documents_containing_term"),
  new Identity());
```

### total_documents

Next we do a hard insert of the number of documents. Again, you can calculate this value with a relatively simple MapReduce job (e.g. using a counter), but this is probably not the best place to do it. Because you have to count every document in the corpus, it is better to calculate the number of documents when you generate the input documents file.

```java
int D = this.totalNumberOfDocuments;
pipe = new Each(pipe, new Insert(new Fields("total_documents"), D), Fields.ALL);
```

The Insert operation simply inserts the number of documents into the tuple stream.

### Calculating Tf-idf with a Custom Operation

Now that we have all four values we can calculate tf-idf. We are going to create a custom operation to do this. We will perform this operation on each Tuple in the Pipe with the following:

```java
// calculate tf * idf
// input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents)
// emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
pipe = new Each(pipe, new TfIdfOperation(), Fields.ALL);
```

To create an operation in Cascading you simply subclass BaseOperation. For example:

```java
private static class MyOperation extends BaseOperation implements Function {
  public MyOperation() {
    super(new Fields("out_field_1", "out_field_2"));
  }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry inputTuple = functionCall.getArguments();
    // take values from inputTuple
    // transform them to make outputTuple
    Tuple outputTuple = new Tuple("a value 1", "a value 2");
    functionCall.getOutputCollector().add(outputTuple);
  }
}
```

Here are the key things you need to do to create a Cascading operation:

• subclass BaseOperation and implement Function (there are other types of operations)
• call super and declare the names of the Fields this operation will be emitting. See BaseOperation for details
• functionCall.getArguments() returns a TupleEntry containing the input Tuple and input Fields.
• functionCall.getOutputCollector() is the OutputCollector you can use to emit Tuples from this operation.
• call outputCollector.add() to emit a Tuple. You can emit 0..n Tuples.

For our tf-idf operation we want to emit three Fields: tf, idf, and tfidf. We do this with the following code.

```java
// note that we're using a private nested class. This is not required.
private static class TfIdfOperation extends BaseOperation implements Function {
  public TfIdfOperation() {
    super(new Fields("tf", "idf", "tfidf"));
  }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry arguments = functionCall.getArguments();

    // tf
    Double termCount = arguments.getDouble("term_count_in_document");
    Double totalTerms = arguments.getDouble("total_terms_in_document");
    BigDecimal tf = new BigDecimal(termCount / totalTerms);

    // idf
    Double totalDocuments = arguments.getDouble("total_documents");
    Double td = arguments.getDouble("number_of_documents_containing_term");
    BigDecimal idf = new BigDecimal(Math.log(totalDocuments / (1 + td)));

    // tfidf
    BigDecimal tfidf = tf.multiply(idf);

    functionCall.getOutputCollector().add(
      new Tuple(
        tf.toPlainString(),
        idf.toPlainString(),
        tfidf.toPlainString()
      ));
  }
}
```

When we convert, say, a Double to a string, we often get scientific notation with an exponent (e.g. 2.948E-5). The exponent can be cumbersome to work with downstream, so we use BigDecimal to render the number as a string without an exponent via toPlainString().
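
The difference is easy to see in isolation. The sketch below compares `Double.toString` with the `new BigDecimal(double)` route used in the operation, and also shows `BigDecimal.valueOf(double)`, a standard-library alternative that is not what the operation above uses. The class and method names are illustrative.

```java
import java.math.BigDecimal;

// Compares three ways of turning a small double into a string.
class PlainStringSketch {
    /** Default conversion: switches to scientific notation for small magnitudes. */
    static String viaDoubleToString(double value) {
        return Double.toString(value);
    }

    /**
     * The route used in TfIdfOperation. No exponent, but the string is the exact
     * binary value of the double, so it can run to many digits.
     */
    static String viaBigDecimal(double value) {
        return new BigDecimal(value).toPlainString();
    }

    /** Alternative: goes through the double's shortest decimal form, so the output is short. */
    static String viaValueOf(double value) {
        return BigDecimal.valueOf(value).toPlainString();
    }

    public static void main(String[] args) {
        double score = 0.00002948;
        System.out.println(viaDoubleToString(score)); // 2.948E-5
        System.out.println(viaBigDecimal(score));     // long digit string, no exponent
        System.out.println(viaValueOf(score));        // 0.00002948
    }
}
```

Either BigDecimal route avoids the exponent; which one is preferable depends on whether you want the exact binary value or a compact representation in the output file.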

## Building the Index

Now that we have the term, document_id, and tfidf score we can build our index. First we strip out the unnecessary fields and move the term to the front.

```java
// input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
// emit: (term, document_id, tfidf)
pipe = new Each(pipe, // reorder and rm some fields
  new Fields("term", "document_id", "tfidf"),
  new Identity());
```

In our last step we want to build a single row that tells us “given a term, what documents are most relevant to it”. We format this list of (document_id, score) pairs as a JSON hash (JSONObject). (Formatting our records in this way is called using “stripes”.)

```java
pipe = new GroupBy(pipe, new Fields("term"));

// "stripe" our group e.g.:
// input: (term,
//          (document_id_1, tfidf_1),
//          (document_id_2, tfidf_2),
//          ...)
// emit: (term, {document_id_1:tfidf_1, document_id_2:tfidf_2})
pipe = new Every(pipe,
  new Fields("document_id", "tfidf"),
  new JSONTupleAggregator(new Fields("scores"), "JSONObject"),
  new Fields("term", "scores"));
```

JSONTupleAggregator is an operation that can be found in the cascading.json project. It takes a group of tuples and emits them as either a JSONArray (nested list) or JSONObject (hash).
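
To illustrate what a stripe looks like, here is a minimal plain-Java sketch that groups (term, document_id, tfidf) rows by term and renders each group as a JSON-style object. It does not use cascading.json; the hand-rolled `toJson` method is an assumption for illustration and does no escaping, so it only suits ids and scores without quotes or backslashes. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analogue of grouping by term and "striping" the (document_id, tfidf) pairs.
class StripeSketch {
    /** Groups rows of [term, document_id, tfidf] into term -> {document_id -> tfidf}. */
    static Map<String, Map<String, String>> stripe(List<String[]> rows) {
        Map<String, Map<String, String>> index = new TreeMap<>();
        for (String[] row : rows) {
            index.computeIfAbsent(row[0], term -> new LinkedHashMap<>()).put(row[1], row[2]);
        }
        return index;
    }

    /** Renders one stripe as a JSON object with string values (no escaping). */
    static String toJson(Map<String, String> scores) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : scores.entrySet()) {
            if (sb.length() > 1) {
                sb.append(",");
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        return sb.append("}").toString();
    }
}
```

The benefit of the striped layout is that everything known about a term sits in a single row, so a lookup for that term reads one record instead of scanning many.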

## Using the Index

To use the index, perform a CoGroup that joins the term field of your own pipe (the right-hand side) with the term field of our index. See Cascading’s documentation on CoGroup for more information.
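
Conceptually, that CoGroup is a join on term. The following plain-Java sketch shows the equivalent in-memory inner join between a list of (query_id, term) rows and the index; it is not Cascading code, and the row layout, class name, and method name are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// In-memory analogue of joining query terms against the tf-idf index on "term".
class IndexLookupSketch {
    /**
     * Inner join: for each [query_id, term] row whose term exists in the index,
     * emits [query_id, term, scores]. Terms missing from the index are dropped.
     */
    static List<String[]> joinOnTerm(List<String[]> queries, Map<String, String> index) {
        List<String[]> joined = new ArrayList<>();
        for (String[] query : queries) {
            String scores = index.get(query[1]);
            if (scores != null) {
                joined.add(new String[] { query[0], query[1], scores });
            }
        }
        return joined;
    }
}
```

Each joined row carries the full stripe for its term, which can then be parsed to rank documents for that query.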

## Full Code Listing

```java
package com.xcombinator.cascading.pipes;

import com.xcombinator.cascading.operations.*;
import com.xcombinator.cascading.operations.buffers.*;

import cascading.tuple.Fields;
import cascading.pipe.*;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.Identity;
import cascading.operation.text.DateParser;
import cascading.operation.Insert;
import cascading.flow.FlowProcess;
import cascading.tuple.TupleEntry;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.tuple.Tuple;
import cascading.operation.Debug;
import cascading.json.operation.aggregator.*;

import java.math.BigDecimal;

/**
 * The goal of this SubAssembly is to create an index that can be used to find
 * the most relevant document given a term.
 *
 * Required: you need to input the number of total documents.
 *
 * Input: list of (document_id, term, count). e.g.:
 *
 *   (document_id, term, 2)
 *   (document_id, term2, 1)
 *   (document_id2, term2, 3)
 *   # etc
 *
 * where `count` is the number of times that term appears in that document.
 *
 * Emits: list of (term: {document_id_1:score_1, document_id_2:score_2, ...})
 *
 * Note that the emitted values are all String representations of the decimal
 * numbers. This is to allow easy (and correct) parsing if you write to a file
 * after this.
 *
 * It is assumed you've already done any normalization of the terms such as
 * stemming etc.
 */
public class TfIdfIndexSubAssmbly extends SubAssembly {

  private static class TfIdfOperation extends BaseOperation implements Function {
    public TfIdfOperation() {
      super(new Fields("tf", "idf", "tfidf"));
    }

    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      TupleEntry arguments = functionCall.getArguments();

      // tf
      Double termCount = arguments.getDouble("term_count_in_document");
      Double totalTerms = arguments.getDouble("total_terms_in_document");
      BigDecimal tf = new BigDecimal(termCount / totalTerms);

      // idf
      Double totalDocuments = arguments.getDouble("total_documents");
      Double td = arguments.getDouble("number_of_documents_containing_term");
      BigDecimal idf = new BigDecimal(Math.log(totalDocuments / (1 + td)));

      // tfidf
      BigDecimal tfidf = tf.multiply(idf);

      functionCall.getOutputCollector().add(
        new Tuple(
          tf.toPlainString(),
          idf.toPlainString(),
          tfidf.toPlainString()
        ));
    }
  }

  private Integer totalNumberOfDocuments;

  public TfIdfIndexSubAssmbly(Pipe pipe, Integer totalNumberOfDocuments) {
    this.totalNumberOfDocuments = totalNumberOfDocuments;

    // calculate the total terms in each document. note that the input set is
    // smaller because we've already counted the occurrence of each term in
    // each document
    // input: (document_id, term, term_count_in_document)
    // emits: (document_id, term, term_count_in_document, total_terms_in_document)
    // pipe = new Each(pipe, new Debug(true));
    pipe = new GroupBy(pipe, new Fields("document_id"));
    pipe = new Every(pipe,
      new BufferedSum(new Fields("total_terms_in_document"),
                      new Fields("term_count_in_document"),
                      new Fields("document_id", "term", "term_count_in_document")),
      Fields.SWAP);

    // calculate the number of documents containing each term
    // input: (document_id, term, term_count_in_document, total_terms_in_document)
    // emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term)

    // we're going to sum these, easier than creating BufferedCount
    pipe = new Each(pipe, new Insert(new Fields("term_in_doc"), 1), Fields.ALL);
    pipe = new GroupBy(pipe, new Fields("term"));
    pipe = new Every(pipe,
      new BufferedSum(new Fields("number_of_documents_containing_term"),
                      new Fields("term_in_doc"),
                      new Fields("document_id", "term", "term_count_in_document",
                                 "total_terms_in_document", "term_in_doc")),
      Fields.SWAP);

    pipe = new Each(pipe, // reorder and rm some fields
      new Fields("document_id", "term", "term_count_in_document",
                 "total_terms_in_document", "number_of_documents_containing_term"),
      new Identity());

    // here we do a hard-insert of the number of documents. again, you
    // could/should calculate this with MR, but this is not the place. It
    // would be better to calculate the number of documents when you are
    // generating the input documents file.
    int D = this.totalNumberOfDocuments;
    pipe = new Each(pipe, new Insert(new Fields("total_documents"), D), Fields.ALL);

    // now calculate tf * idf
    // input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents)
    // emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
    pipe = new Each(pipe, new TfIdfOperation(), Fields.ALL);

    pipe = new Each(pipe, // reorder and rm some fields
      new Fields("term", "document_id", "tfidf"),
      new Identity());

    pipe = new GroupBy(pipe, new Fields("term"));

    // "stripe" our group e.g.:
    // input: (term,
    //          (document_id_1, tfidf_1),
    //          (document_id_2, tfidf_2),
    //          ...)
    // emit: (term, {document_id_1:tfidf_1, document_id_2:tfidf_2})
    pipe = new Every(pipe,
      new Fields("document_id", "tfidf"),
      new JSONTupleAggregator(new Fields("scores"), "JSONObject"),
      new Fields("term", "scores"));

    // must register all assembly tails
    setTails(pipe);
  }
}
```