Custom Hive UDFs in Clojure

Introduction

We process all of our web-crawl data in Hadoop. If I’m writing jobs that will only be run by my team, Cascalog is my tool of choice. But unfortunately, not everyone is going to learn Cascalog (much less Cascading or Clojure). Many people do know a little SQL, however, and for them the best way to work with data in Hadoop is Hive.

Hive is great for straightforward, ad-hoc queries and it makes Hadoop accessible for SQL-minded folks who may not be programmers.

Hive’s functionality can be extended by writing User Defined Functions (UDFs). By writing custom UDFs you can create little mappers and reducers that can be easily stuffed into queries.

This article was written with Hive 0.5.0. YMMV.

UDFs – 1 to 1

Let’s begin with the simplest case, lower-casing a string:

  (ns smoker.udf.MyLowerCase
    (:import [org.apache.hadoop.hive.ql.exec UDF])
    (:import [org.apache.hadoop.io Text])
    (:require [clojure.contrib.str-utils2 :as su])
    (:gen-class
     :name smoker.udf.MyLowerCase
     :extends org.apache.hadoop.hive.ql.exec.UDF
     :methods [[evaluate [org.apache.hadoop.io.Text] org.apache.hadoop.io.Text]]))
 
  (defn #^Text -evaluate 
    "Lower-case the text"
    [this #^Text s]
    (when s
      (Text. (su/lower-case (.toString s)))))

Here we use gen-class to subclass exec.UDF, generating a .class file that Hive can call from Java.
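Note that gen-class only emits a .class file when the namespace is AOT-compiled. In a Leiningen project of that era you would list the namespace in project.clj, roughly like this (project name and versions are illustrative):

```clojure
(defproject smoker "0.1.0"
  :dependencies [[org.clojure/clojure "1.1.0"]
                 [org.clojure/clojure-contrib "1.1.0"]]
  ;; AOT-compile the UDF namespace so gen-class emits a .class file
  ;; (newer Leiningen versions spell this :aot instead of :namespaces)
  :namespaces [smoker.udf.MyLowerCase])
```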

We can run this query like so:

  # make the jar
  lein compile
  lein uberjar   # include dependencies for Hive/Hadoop
 
  # tell hive about your jars
  hive --auxpath ./build
  add jar /home/nmurray/hive-jars/smoker-standalone.jar;
  list jars; # verify it is there
 
  # create your operations
  create temporary function my_lower as 'smoker.udf.MyLowerCase';
 
  # given:  a_table
  # format: id,sentence
  1,My dog has fleas
  2,My cat Mr. Mittens has fleas
 
  SELECT my_lower(sentence) from a_table;
 
  # returns:
  my dog has fleas
  my cat mr. mittens has fleas

Easy!

UDTFs – 1 to Many

One problem with an exec.UDF is that you can only return one record. Often we want to take one record and transform it into multiple records. For this we can use a GenericUDTF.

A Generic User-defined Table Generating Function (GenericUDTF) generates a variable number of output rows for a single input row.

For instance, say we want to take our sentences and generate a count for each word:

 
  # given:  a_table
  # format: id,sentence
  1,My dog has fleas
  2,My cat Mr. Mittens has fleas
 
  SELECT tokenize(sentence) AS (word, count) FROM a_table;
 
  # returns:
  my 1
  dog 1
  has 1
  fleas 1
  my 1
  cat 1
  mr. 1
  mittens 1
  has 1
  fleas 1

(From there you could easily add a GROUP BY clause and find the global count for each word.)
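For instance, the global count could be computed by wrapping the UDTF in a subquery. A sketch, assuming tokenize has been registered as a temporary function the same way my_lower was:

```sql
-- Sketch: global count per word; tokenize is assumed to be
-- registered as a temporary function (like my_lower above).
SELECT word, SUM(count)
FROM (
  SELECT tokenize(sentence) AS (word, count) FROM a_table
) tokenized
GROUP BY word;
```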

To do this we subclass a UDTF. Subclassing a UDTF in Clojure is more involved than with a UDF. However, smoker provides some functions that make creating custom UDTFs much easier.

  (ns smoker.udf.MyTokenize
    (:require [smoker.udtf.gen :as gen])
    (:import [org.apache.hadoop.hive.serde2.objectinspector.primitive 
              PrimitiveObjectInspectorFactory])
    (:require [clojure.contrib.str-utils2 :as su]))
 
  (gen/gen-udtf)
  (gen/gen-wrapper-methods 
   [PrimitiveObjectInspectorFactory/javaStringObjectInspector
    PrimitiveObjectInspectorFactory/javaIntObjectInspector])
 
  (defn -operate [this line]
    (map 
     (fn [token] [token (Integer/valueOf 1)]) 
     (su/split line #"\s+")))

The function gen-udtf creates the gen-class directive needed to compile this package into a java .class file. gen-wrapper-methods lets you specify what types your function will be emitting per tuple.

Use the PrimitiveObjectInspectorFactory to specify the types you plan to return. For example:

(gen/gen-wrapper-methods 
 [PrimitiveObjectInspectorFactory/javaStringObjectInspector
  PrimitiveObjectInspectorFactory/javaIntObjectInspector])

This will allow you to return a tuple of (String, int).

Now we write an -operate method that accepts [this & args] and returns a seq of tuples that match the specified types.

UDTFs + LATERAL VIEW

One problem with UDTFs (in Hive 0.5.0) is that you can only have one UDTF per query and the UDTF is the only thing you can SELECT in that query. This is a problem. Let’s say that we want to get the count of each word within a particular document.

This simple approach doesn’t work:

  # wrong
  SELECT id, tokenize(sentence) AS (word, count) FROM a_table;
  # Hive ERROR...

Instead, we must use the LATERAL VIEW syntax [1]:

  SELECT id, word, count
  FROM a_table
  LATERAL VIEW tokenize(sentence) tokenizedTable AS word, count;
 
  # returns:
  # id word count
  1 my 1
  1 dog 1
  1 has 1
  1 fleas 1
  2 my 1
  2 cat 1
  2 mr. 1
  2 mittens 1
  2 has 1
  2 fleas 1

Now you can easily add a GROUP BY id,word clause and SUM the count to get a count of the words within each document.
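That combined query might look like this (a sketch following the column names above):

```sql
-- Sketch: per-document word counts via LATERAL VIEW + GROUP BY
SELECT id, word, SUM(count)
FROM a_table
LATERAL VIEW tokenize(sentence) tokenizedTable AS word, count
GROUP BY id, word;
```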

Summary

Writing Hive UDFs is a powerful way to extend the functionality of Hive, and writing them in Clojure using smoker is fast and easy.

If you’re interested in how the Hive boilerplate code is generated, check out the code in gen.clj or the smoker project on GitHub.


Footnotes:

1 In Hive 0.5.0 if you have a WHERE clause with your LATERAL VIEW you may get a FAILED: Unknown exception: null error. The temporary fix is to put set hive.optimize.ppd=false; before your query. See: LateralView in HiveQL and HIVE-1056.
