cascading-simhash: a library for clustering by minhashes in Hadoop

simhashing

Say you have a large corpus of web documents and you want to group them together by some notion of “similarity”. For instance, you may want to detect plagiarism or find content that appears on multiple pages of a site.

In this scenario, it’s impractical to compare every pair of documents (the number of comparisons grows quadratically with the size of the corpus). Fortunately, we can use simhashing.

Broadly speaking, simhashing is an algorithm that calculates a “cluster id” (the minimum hash, or minhash) from a document’s content. Because the minhash for an item is calculated independently of the other items in the set, minhashing is an ideal candidate for MapReduce.
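
To make this concrete, here is a minimal sketch in Clojure (my own illustration, not the library’s implementation) of a single minhash: hash every token of a document and keep the smallest value.

;; Minimal minhash sketch: hash each token, keep the smallest hash.
;; Identical token sets always produce the same minhash.
(defn naive-minhash [tokens]
  (apply min (map hash tokens)))

(naive-minhash ["my" "dog" "has" "fleas"])
;; => the same integer every time for this set of tokens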

Ryan Moulton has written a wonderful article on Simhashing. I’m not going to repeat his content here, so if you’re unfamiliar with simhashing I encourage you to go and read his article first.

In his article, Ryan sketches the proof that the probability that any two sets (in this case, documents) share the same minhash is equal to their Jaccard similarity coefficient. This is a really neat result, because we get an estimate of the Jaccard index without ever having to compute the intersection of the two sets directly.
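
As a quick, hypothetical sanity check (not part of the library), here is the Jaccard index computed directly for the token sets of two of the test documents used later in this post:

;; Jaccard index = |A ∩ B| / |A ∪ B|
(require '[clojure.set :as set])

(defn jaccard [a b]
  (/ (count (set/intersection a b))
     (count (set/union a b))))

(jaccard #{"my" "dog" "has" "fleas"}
         #{"my" "dog" "has" "hair"})
;; => 3/5, i.e. these two documents would share a single minhash
;;    with probability 0.6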

cascading-simhash

I’ve created a library for calculating simhashes in Hadoop. It’s written in Clojure and Java and uses Cascalog and Cascading.

To use it, you 1) provide input tuples of the form (document_id, body) and 2) define how to tokenize the body. The job emits tuples of the form (minhash, document_id, body). You can then use the minhash as the key for your next phase. (All records that share a minhash are potential duplicates.)

The library can be called from either Clojure or Java. Additionally, Simhash returns a Flow so you can use it in your Cascade if you want to make it part of a bigger pipeline.

A Java example

Here’s a quick example of how to use the library from Java:

/**
 * Simple Simhash - an example of how to use Simhash
 *
 * To run this example:
 *   lein uberjar
 *   lein classpath > classpath
 *   java -cp `cat classpath`:build/cascading-simhash-1.0.0-SNAPSHOT-standalone.jar simhash.examples.SimpleSimhash "test-resources/test-documents.txt"
 **/
package simhash.examples;

import clojure.lang.AFn;

import org.apache.log4j.Logger;

// Cascading 1.x taps, schemes and flows
import cascading.flow.Flow;
import cascading.scheme.TextDelimited;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

// StdoutTap ships with Cascalog and Simhash with this library;
// adjust these two imports if the package names differ in your build.
import cascalog.StdoutTap;
import simhash.Simhash;

public class SimpleSimhash {
  private static final Logger LOG = Logger.getLogger( SimpleSimhash.class );
 
  /**
   * Create a tokenizer that is a subclass of clojure.lang.AFn and
   * implements invoke(Object body)
   **/
  public static class Tokenizer extends AFn {
 
    /**
     * Your tokenization logic goes here
     *
     * @param body the document body, passed in as a String
     * @return something seq-able (e.g. a String[] of tokens)
     */
    public Object invoke(Object body) throws Exception {
      String b = (String)body;
      return b.split(" ");
    }
  }
 
  public static void main( String[] args ) {
    Tap inputTap = new Hfs( new TextDelimited( 
                                new Fields("docid", "body"), "\t" ),
                            args[0] );
    Tap outputTap = new StdoutTap();
 
    // create the flow
    Flow simhashFlow = Simhash.simhash(inputTap, outputTap, 
                                       2, // n: combine the n lowest hashes (here 2) into one minhash
                                       SimpleSimhash.Tokenizer.class);
    simhashFlow.complete(); // or add to your Cascade, etc
  }
}

Notice a few things here:

  • We’re inputting a tap of two fields: (docid, body)
  • The 2 parameter is the number of hashes to combine. In this case, we combine the 2 lowest hashes to create one minhash. This parameter controls the overlap required for a match: the two sets must share the same 2 lowest hashes in order to match. (A small sketch of this appears after the list.)
  • The Tokenizer is a subclass of clojure.lang.AFn. Override the invoke(Object) method and you will be passed the body of the current record. In this case, we’re tokenizing by doing a simple String split.
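
To illustrate what combining the lowest hashes means, here is a rough sketch (my own, not the library’s code): take the n smallest token hashes and use them together as the cluster key, so two documents only cluster together if all n of those hashes agree.

;; Rough sketch: the n lowest token hashes combined into one key.
;; Documents cluster together only if the whole vector is equal.
(defn combined-minhash [n tokens]
  (vec (take n (sort (map hash tokens)))))

(= (combined-minhash 2 ["my" "dog" "has" "fleas"])
   (combined-minhash 2 ["my" "dog" "has" "hair"]))
;; => true only if the two documents share their 2 smallest hashes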

If you’ve checked out the source you can run it like this:

lein uberjar
lein classpath > classpath
java -cp `cat classpath`:build/cascading-simhash-1.0.0-SNAPSHOT-standalone.jar simhash.examples.SimpleSimhash "test-resources/test-documents.txt"

Given:

# test-resources/test-documents.txt
# docid \t body
DocA	my dog has fleas
DocB	my dog has fleas
DocC	my dog has hair
DocD	see spot run
DocE	We hold these truths

We get:

RESULTS
-----------------------
23fd68296bc65391799c8c441faf4403c729256f	DocE	We hold these truths
402183e1cbc52e7c87eb230c281f35e4b27c2a39	DocD	see spot run
49c31c1459a7603bd5680d11285a5716c4ba3903	DocA	my dog has fleas
49c31c1459a7603bd5680d11285a5716c4ba3903	DocB	my dog has fleas
58e5a2035461323a37102e22273c9b25cbb9df61	DocC	my dog has hair
-----------------------

Notice that DocA and DocB, which have identical content, receive the same minhash and therefore fall into the same cluster, while DocC (“my dog has hair”) does not share their 2 lowest hashes and gets a different one.

A Clojure example

Similarly, here’s how to use the library from Clojure. This time we use bi-grams as the tokens.

(ns simhash.examples.bigrams
  (:use 
   [simhash core util]
   [cascalog api testing])
  (:require 
   [simhash [taps :as t] [ops :as ops]]
   [clojure.contrib.str-utils :as stu])
  (:gen-class))
 
(defn my-source [path]
  (<- [?docid ?body]
      ((hfs-textline path) ?line)
      (ops/re-split-op [#"\t" 2] ?line :> ?docid ?body)
      (:distinct false)))
 
(defn tokenize 
  "tokenize into bi-grams (sliding window)"
  [body]
  (map
   (fn [tokens] (stu/str-join " " tokens))
   (partition 2 1 (stu/re-split #"\s+" body))))
 
(defn -main [& args]
  (?- (stdout) 
      (simhash-q (my-source (first args))
                 2 ;; number of minhashes
                 tokenize)))
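
For reference, here is what the bi-gram tokenizer above produces at the REPL:

(tokenize "my dog has fleas")
;; => ("my dog" "dog has" "has fleas")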

A few things to point out about the Clojure example:

  • simhash-q is just a Cascalog query. Unlike the Java example (which required a Tap as the input), simhash-q can accept any other Cascalog query as its input.
  • You must use gen-class on the namespace that holds your tokenize function. This is because Cascading will serialize your Flow, and it has a hard time with functions generated at run-time. Generally speaking, if the tokenize function isn’t AOT-compiled into a class, you’re going to run into problems.

The project also includes a tokenizer for extracting text from HTML documents. See tokenizers.html_text.clj for an example of how to write a tokenizer in Clojure, and HtmlSimhash.java for an example of how to use it from Java.

Summary

Simhashing in MapReduce is a quick way to find clusters in a huge amount of data. By using Cascading and Cascalog we’re able to work with MapReduce jobs at the level of functions rather than individual map-reduce phases.

Have any data you need clustered? Try cascading-simhash and let me know how it goes!

Learn more about big data by following me on twitter.

You can get the jars via Clojars.
Leiningen:

  [cascading-simhash "1.0.0-SNAPSHOT"]

Maven:

<dependency>
  <groupId>cascading-simhash</groupId>
  <artifactId>cascading-simhash</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

View the source on GitHub.

References

  1. http://knol.google.com/k/simple-simhashing
  2. http://en.wikipedia.org/wiki/Jaccard_index
  3. http://en.wikipedia.org/wiki/MinHash