## Cascading, TF-IDF, and BufferedSum (Part 2)

## Introduction

The tf-idf weight (term frequency-inverse document frequency) is a weight often used in information retrieval and text mining. This weight is a statistical measure used to evaluate how important a word is to a document in a collection or corpus. [1]

To calculate tf-idf we need the following four values:

- The number of times a term appears in a given document (`n_{i,j}`)
- The total number of terms in a given document (`Σ_k n_{k,j}`)
- The number of documents that contain a given term (`|{d : t_i ∈ d}|`)
- The total number of documents in the corpus (`D`)

## Mathematical Details

We want to score the importance of term `t_i` in document `d_j`.

Term frequency is defined by:

    tf_{i,j} = n_{i,j} / Σ_k n_{k,j}

Where `n_{i,j}` is the number of occurrences of term `t_i` in document `d_j`.

Inverse document frequency is defined by:

    idf_i = log(D / |{d : t_i ∈ d}|)

Where:

`D` is the total number of documents in the corpus and
`|{d : t_i ∈ d}|` is the number of documents in which the term `t_i` appears.

Then:

    tfidf_{i,j} = tf_{i,j} × idf_i

Refer to [2] for more information on tf-idf.
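As a quick sanity check of the formulas, here is a tiny worked example in plain Java (illustration only — no Cascading involved, and the class name and numbers are invented): suppose a term appears 3 times in a 100-term document, and 2 of the 10 documents in the corpus contain it.

```java
public class TfIdfByHand {
    // tf = n_{i,j} / Σ_k n_{k,j}
    static double tf(double termCount, double totalTermsInDocument) {
        return termCount / totalTermsInDocument;
    }

    // idf = log(D / |{d : t_i ∈ d}|), natural log, matching Math.log below
    static double idf(double totalDocuments, double documentsContainingTerm) {
        return Math.log(totalDocuments / documentsContainingTerm);
    }

    public static void main(String[] args) {
        double tf = tf(3, 100);   // 0.03
        double idf = idf(10, 2);  // ln(5) ≈ 1.609
        System.out.println("tfidf = " + tf * idf); // ≈ 0.0483
    }
}
```

Note that the `TfIdfOperation` later in this post divides by `1 + number_of_documents_containing_term`, a common smoothing variant, so its idf will differ slightly from this textbook version.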

## Operation Input

Last time we discussed the technique of taking a group of records, calculating a value from that group and emitting each record with the calculated value attached. We called this operation a BufferedSum. We’re going to build on our previous work and create a reusable component (called a SubAssembly) for calculating tf-idf using Cascading on Hadoop.

To make our tf-idf operation we need to decide what the input arguments will be. Last time, we used an input corpus of the format `(document_id, body)` and emitted `(document_id, term, term_count_in_document)` for all terms in each document. This last tuple will be the input format to our tf-idf operation.

## Creating a SubAssembly

The general format of a SubAssembly in Cascading is as follows:

```java
public class MySubAssembly extends SubAssembly {
  public MySubAssembly(Pipe pipe) {
    // do something with `pipe`
    setTails(pipe); // must register all assembly tails
  }
}
```

In our operation we are assuming that the total number of documents in the corpus is known or could be found with a simple MapReduce job. Given that we have the total number of documents we take that number as the input to our SubAssembly:

```java
public class TfIdfIndexSubAssmbly extends SubAssembly {
  private Integer totalNumberOfDocuments;

  public TfIdfIndexSubAssmbly(Pipe pipe, Integer totalNumberOfDocuments) {
    this.totalNumberOfDocuments = totalNumberOfDocuments;
    // do something with pipe
    setTails(pipe); // must register all assembly tails
  }
}
```

## Gathering Variables

To compute our final tf-idf score, we first need to compute the intermediate variables.

### `total_terms_in_document`

Given that our input is `(document_id, term, term_count_in_document)`, we already have the first variable, `n_{i,j}`. We can now calculate the total number of terms in each document:

```java
// input: (document_id, term, term_count_in_document)
// emits: (document_id, term, term_count_in_document, total_terms_in_document)
pipe = new GroupBy(pipe, new Fields("document_id"));
pipe = new Every(pipe,
    new BufferedSum(new Fields("total_terms_in_document"),
        new Fields("term_count_in_document"),
        new Fields("document_id", "term", "term_count_in_document")),
    Fields.SWAP);
```

Remember that `BufferedSum` takes three arguments:

- The name of the `Field` to output
- The name of the `Field` to sum
- The other `Fields` to “pull through” the operation

So here we are grouping by `document_id`, summing `term_count_in_document` within each group, and placing the result in the field `total_terms_in_document`.
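If the Cascading plumbing obscures what is happening, the `GroupBy` plus `BufferedSum` pair is doing something you could sketch with plain Java collections (illustration only; the class and method names here are invented for the sketch):

```java
import java.util.*;

public class BufferedSumSketch {
    // Each record is (document_id, term, term_count_in_document).
    // Returns the same records with total_terms_in_document appended.
    static List<Object[]> attachTotals(List<Object[]> records) {
        // "GroupBy document_id": sum term_count_in_document per group
        Map<String, Integer> totals = new HashMap<>();
        for (Object[] r : records)
            totals.merge((String) r[0], (Integer) r[2], Integer::sum);
        // re-emit every record with its group's total attached
        List<Object[]> out = new ArrayList<>();
        for (Object[] r : records)
            out.add(new Object[]{r[0], r[1], r[2], totals.get((String) r[0])});
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> in = Arrays.asList(
                new Object[]{"doc1", "cat", 2},
                new Object[]{"doc1", "dog", 3},
                new Object[]{"doc2", "cat", 1});
        for (Object[] r : attachTotals(in))
            System.out.println(Arrays.toString(r));
        // doc1 rows get total_terms_in_document = 5, the doc2 row gets 1
    }
}
```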

### `number_of_documents_containing_term`

Next we need to calculate the number of documents that contain each term. We’ve already grouped by `document_id` and `term`, so we know we have only one record for a given `document_id`/`term` pair.

Rather than counting the number of `document_id`/`term` pairs directly we are simply going to assign a count of 1 to each record and then sum that value. This allows us to reuse the code we’ve written for `BufferedSum`.

```java
// calculate the number of documents containing each term
// input: (document_id, term, term_count_in_document, total_terms_in_document)
// emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term)
pipe = new Each(pipe, new Insert(new Fields("term_in_doc"), 1), Fields.ALL); // we're going to sum these, easier than creating BufferedCount
pipe = new GroupBy(pipe, new Fields("term"));
pipe = new Every(pipe,
    new BufferedSum(new Fields("number_of_documents_containing_term"),
        new Fields("term_in_doc"),
        new Fields("document_id", "term", "term_count_in_document", "total_terms_in_document", "term_in_doc")),
    Fields.SWAP);
```

Here we group on `term`, and for every term group we calculate the number of documents that contain that term. Note that with a very large corpus some groups may become memory-constrained: very common words such as “the” produce groups containing nearly the entire corpus (it would be a good idea to remove stop-words during pre-processing).

After we’ve calculated the value for `number_of_documents_containing_term` we don’t need the `term_in_doc` field any longer. Using Cascading’s `Identity` operation we can reorder and discard fields.

```java
pipe = new Each(pipe, // reorder and rm fields
    new Fields("document_id", "term", "term_count_in_document", "total_terms_in_document", "number_of_documents_containing_term"),
    new Identity());
```

### `total_documents`

Next we do a hard insert of the number of documents. Again, you could calculate this value with a relatively simple MapReduce job (e.g. using a counter), but this is probably not the best place to do it. Since counting requires touching every document in the corpus, it is better to count the documents while you are generating the input documents file.

```java
int D = this.totalNumberOfDocuments;
pipe = new Each(pipe, new Insert(new Fields("total_documents"), D), Fields.ALL);
```

The `Insert` operation simply inserts the number of documents into the tuple stream.

## Calculating Tf-idf with a Custom Operation

Now that we have all four values we can calculate tf-idf. We are going to create a custom operation to do this. We will perform this operation on each `Tuple` in the `Pipe` with the following:

```java
// calculate tf * idf
// input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents)
// emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
pipe = new Each(pipe, new TfIdfOperation(), Fields.ALL);
```

To create an operation in Cascading you simply subclass `BaseOperation`. For example:

```java
private static class MyOperation extends BaseOperation implements Function {
  public MyOperation() {
    super(new Fields("out_field_1", "out_field_2"));
  }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry inputTuple = functionCall.getArguments();
    // take values from inputTuple
    // transform them to make outputTuple
    Tuple outputTuple = new Tuple("a value 1", "a value 2");
    functionCall.getOutputCollector().add(outputTuple);
  }
}
```

Here are the key things you need to do to create a Cascading operation:

- subclass `BaseOperation` and implement `Function` (there are other types of operations)
- call `super` and declare the names of the `Fields` this operation will be emitting. See `BaseOperation` for details
- `functionCall.getArguments()` returns a `TupleEntry` containing the input `Tuple` and input `Fields`
- `functionCall.getOutputCollector()` is the `OutputCollector` you can use to emit `Tuple`s from this operation
- call `outputCollector.add()` to emit a `Tuple`. You can emit 0..n `Tuple`s

For our tf-idf operation we want to emit three Fields: `tf`, `idf`, and `tfidf`. We do this with the following code.

```java
// note that we're using a private nested class. This is not required.
private static class TfIdfOperation extends BaseOperation implements Function {
  public TfIdfOperation() { super(new Fields("tf", "idf", "tfidf")); }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry arguments = functionCall.getArguments();
    // tf
    Double termCount = arguments.getDouble("term_count_in_document");
    Double totalTerms = arguments.getDouble("total_terms_in_document");
    BigDecimal tf = new BigDecimal(termCount / totalTerms);
    // idf
    Double totalDocuments = arguments.getDouble("total_documents");
    Double td = arguments.getDouble("number_of_documents_containing_term");
    BigDecimal idf = new BigDecimal(Math.log(totalDocuments / (1 + td)));
    // tfidf
    BigDecimal tfidf = tf.multiply(idf);
    functionCall.getOutputCollector().add(
        new Tuple(tf.toPlainString(), idf.toPlainString(), tfidf.toPlainString()));
  }
}
```

When we convert, say, a `Double` to a string we often get an exponent (e.g. `2.948E-4`). The exponent can be cumbersome to work with, so we use `BigDecimal` to convert the number into a string without an exponent via `toPlainString()`.
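A quick demonstration of the difference (plain Java; the score value is made up). Note that the operation above constructs `new BigDecimal(double)`, which preserves the double's full binary expansion; `BigDecimal.valueOf(double)`, used here, goes through `Double.toString` first and keeps the shorter decimal form — either way, `toPlainString()` avoids the exponent:

```java
import java.math.BigDecimal;

public class PlainStringDemo {
    public static void main(String[] args) {
        double score = 0.0002948;
        // Double.toString switches to scientific notation for values below 10^-3
        System.out.println(Double.toString(score));                    // 2.948E-4
        // toPlainString never emits an exponent
        System.out.println(BigDecimal.valueOf(score).toPlainString()); // 0.0002948
    }
}
```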

## Building the Index

Now that we have the `term`, `document_id`, and `tfidf` score we can build our index. First we strip out the unnecessary fields and move the `term` to the front.

```java
// input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
// emit: (term, document_id, tfidf)
pipe = new Each(pipe, // reorder and rm some fields
    new Fields("term", "document_id", "tfidf"),
    new Identity());
```

In our last step we want to build a single row that tells us “given a term, what documents are most relevant to it”. We format this list of `(document_id, score)` pairs as a JSON hash (`JSONObject`). (Formatting our records in this way is called using “stripes”.)

```java
pipe = new GroupBy(pipe, new Fields("term"));
// "stripe" our group e.g.:
// input: (term,
//         (document_id_1, tfidf_1),
//         (document_id_2, tfidf_2),
//         ...)
// emit: (term, {document_id_1:tfidf_1, document_id_2:tfidf_2})
pipe = new Every(pipe,
    new Fields("document_id", "tfidf"),
    new JSONTupleAggregator(new Fields("scores"), "JSONObject"),
    new Fields("term", "scores"));
```

`JSONTupleAggregator` is an operation that can be found in the cascading.json project. It takes a group of tuples and emits them as either a `JSONArray` (nested list) or a `JSONObject` (hash).

## Using the Index

To use the index, simply perform a `CoGroup`, joining the `term` field of your own stream to the `term` field of our index. See Cascading’s documentation on `CoGroup` for more information.
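As a sketch of what that join does (plain Java, illustration only — a real `CoGroup` runs as a distributed join, and all names and values here are invented): for each term on your side, you get back the matching stripe of scores from the index.

```java
import java.util.*;

public class IndexLookupSketch {
    // index: term -> JSON stripe of {document_id: tfidf} scores
    static List<String> join(Map<String, String> index, List<String> terms) {
        List<String> out = new ArrayList<>();
        for (String term : terms) {
            String scores = index.get(term);
            if (scores != null)          // inner join: unmatched terms drop out
                out.add(term + " -> " + scores);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> index = new HashMap<>();
        index.put("hadoop", "{\"doc1\":0.12,\"doc7\":0.03}");
        System.out.println(join(index, Arrays.asList("hadoop", "the")));
        // only the "hadoop" row survives the join
    }
}
```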

## Full Code Listing

```java
package com.xcombinator.cascading.pipes;

import com.xcombinator.cascading.operations.*;
import com.xcombinator.cascading.operations.buffers.*;
import cascading.tuple.Fields;
import cascading.pipe.*;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.Identity;
import cascading.operation.text.DateParser;
import cascading.operation.Insert;
import cascading.flow.FlowProcess;
import cascading.tuple.TupleEntry;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.tuple.Tuple;
import cascading.operation.Debug;
import cascading.json.operation.aggregator.*;
import java.math.BigDecimal;

/**
 * The goal of this SubAssembly is to create an index that can be used to find
 * the most relevant document given a term.
 *
 * Required: you need to input the number of total documents.
 *
 * Input: list of (document_id, term, count), e.g.:
 *
 *   (document_id, term, 2)
 *   (document_id, term2, 1)
 *   (document_id2, term2, 3)
 *   # etc
 *
 * where `count` is the number of times that term appears in that document.
 *
 * Emits: list of (term: {document_id_1:score_1, document_id_2:score_2, ...})
 *
 * Note that the tuples emitted are all String representations of the decimal
 * numbers. This is to allow easy (and correct) parsing if you write to a file
 * after this.
 *
 * It is assumed you've already done any normalization of the terms such as
 * stemming etc.
 */
public class TfIdfIndexSubAssmbly extends SubAssembly {

  private static class TfIdfOperation extends BaseOperation implements Function {
    public TfIdfOperation() { super(new Fields("tf", "idf", "tfidf")); }

    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      TupleEntry arguments = functionCall.getArguments();
      // tf
      Double termCount = arguments.getDouble("term_count_in_document");
      Double totalTerms = arguments.getDouble("total_terms_in_document");
      BigDecimal tf = new BigDecimal(termCount / totalTerms);
      // idf
      Double totalDocuments = arguments.getDouble("total_documents");
      Double td = arguments.getDouble("number_of_documents_containing_term");
      BigDecimal idf = new BigDecimal(Math.log(totalDocuments / (1 + td)));
      // tfidf
      BigDecimal tfidf = tf.multiply(idf);
      functionCall.getOutputCollector().add(
          new Tuple(tf.toPlainString(), idf.toPlainString(), tfidf.toPlainString()));
    }
  }

  private Integer totalNumberOfDocuments;

  public TfIdfIndexSubAssmbly(Pipe pipe, Integer totalNumberOfDocuments) {
    this.totalNumberOfDocuments = totalNumberOfDocuments;

    // calculate the total terms in each document. note that the input set is
    // smaller because we've already counted the occurrence of each term in
    // each document
    // input: (document_id, term, term_count_in_document)
    // emits: (document_id, term, term_count_in_document, total_terms_in_document)
    // pipe = new Each(pipe, new Debug(true));
    pipe = new GroupBy(pipe, new Fields("document_id"));
    pipe = new Every(pipe,
        new BufferedSum(new Fields("total_terms_in_document"),
            new Fields("term_count_in_document"),
            new Fields("document_id", "term", "term_count_in_document")),
        Fields.SWAP);

    // calculate the number of documents containing each term
    // input: (document_id, term, term_count_in_document, total_terms_in_document)
    // emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term)
    pipe = new Each(pipe, new Insert(new Fields("term_in_doc"), 1), Fields.ALL); // we're going to sum these, easier than creating BufferedCount
    pipe = new GroupBy(pipe, new Fields("term"));
    pipe = new Every(pipe,
        new BufferedSum(new Fields("number_of_documents_containing_term"),
            new Fields("term_in_doc"),
            new Fields("document_id", "term", "term_count_in_document", "total_terms_in_document", "term_in_doc")),
        Fields.SWAP);
    pipe = new Each(pipe, // reorder and rm some fields
        new Fields("document_id", "term", "term_count_in_document", "total_terms_in_document", "number_of_documents_containing_term"),
        new Identity());

    // here we do a hard-insert of the number of documents. again, you
    // could/should calculate this with MR, but this is not the place. It
    // would be better to calculate the number of documents when you are
    // generating the input documents file.
    int D = this.totalNumberOfDocuments;
    pipe = new Each(pipe, new Insert(new Fields("total_documents"), D), Fields.ALL);

    // now calculate tf * idf
    // input: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents)
    // emit: (document_id, term, term_count_in_document, total_terms_in_document, number_of_documents_containing_term, total_documents, tf, idf, tfidf)
    pipe = new Each(pipe, new TfIdfOperation(), Fields.ALL);
    pipe = new Each(pipe, // reorder and rm some fields
        new Fields("term", "document_id", "tfidf"),
        new Identity());

    pipe = new GroupBy(pipe, new Fields("term"));
    // "stripe" our group e.g.:
    // input: (term,
    //         (document_id_1, tfidf_1),
    //         (document_id_2, tfidf_2),
    //         ...)
    // emit: (term, {document_id_1:tfidf_1, document_id_2:tfidf_2})
    pipe = new Every(pipe,
        new Fields("document_id", "tfidf"),
        new JSONTupleAggregator(new Fields("scores"), "JSONObject"),
        new Fields("term", "scores"));

    // must register all assembly tails
    setTails(pipe);
  }
}
```
