Getting Started With Scalding and Amazon Elastic MapReduce

Here at Sharethrough, we’re big consumers of Twitter’s Scalding - a Scala-based DSL providing a functional programming abstraction on top of Cascading/Hadoop. It’s a terrific way to think through workflows and with Amazon’s Elastic MapReduce service, it couldn’t be easier to get started (well it could be, that’s where we hope this comes in!). Rather than throw another WordCount at you, we’re going to go with a job whose results our team actually used.

The Challenge

Let’s say you’ve got a publisher like IGN, owner of gamespy.com and the vaultnetwork.com sites. Across IGN’s sites (i.e. their network), we’re seeing millions of impressions. We want to know how many impressions occurred on each site in the network. Click-stream data is both vast and unclean, so of course we have exceptions.

  • Sometimes the site is unknown. In those cases, tally that up under www.emptyOrNoLocation.com.
  • Sometimes the site URL contains illegal characters. In those cases, tally that up under www.badLocation.com.
  • Some networks have many sites, others have dozens, and a few even have hundreds. Don’t worry about showing sites with fewer than some parameterized number of impressions. We’ll call that the “impression floor”.

Here’s an example of our source data. For the full list, have a look at the sample data in the accompanying scalding-emr-tutorial repo.

{"body": "73b56afa-1298-11e3-a985-12313d08da2a 10.32.101.252 - [31/Aug/2013:23:52:53 +0000] \"GET /impression?pid=FAKE_PLACEMENT_ID&ploc=http%3A%2F%2Fwww.allaboutbalance.com%2Fand-the-other-months-are%2F&pref= HTTP/1.1\" 200 145 \"INTENTIONALLY_BLANK_HTTP_REFERER\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0\" \"-\" \"76.110.105.162\" \"-\""}

We’re interested in three components:

  • /impression - The path, where impression signifies that our content was displayed. We need to count only the requests that match this path.
  • pid=FAKE_PLACEMENT_ID - Placement FAKE_PLACEMENT_ID (a parameter we’ll specify at runtime) is the placement we’re concerned with. We’ll need to filter out all other placements.
  • ploc=http%3A%2F%2Fwww.refe... - The full URL of the page hosting this impression. Note that this is URI-escaped.

Now that we have a little more clarity on input data and exceptions, let’s rephrase the problem: For a given pid, find all /impression requests and count the number of times those impressions share the same ploc hostname. In the final report, only include hostnames with an impression count of at least I, the impression floor.

Here’s an example of the form of output we expect.

www.allaboutbalance.com 3
www.sharethrough.com 2
www.emptyOrNoLocation.com 3
www.badLocation.com 3

Your First Scalding Job

The tactics of getting started with Scalding are covered in the accompanying scalding-emr-tutorial repo. There you’ll find a completely packaged example that can be run on EMR with no code changes. Let’s step through the logic of the sample Scalding job contained therein.

Job Structure

package com.sharethrough.emr_tutorial

import com.twitter.scalding._
import java.net.{URISyntaxException, URI, URLDecoder}

class LocationCountingJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .filter('line) { ... }
    .map('line -> 'ploc) { ... }
    .map('ploc -> 'hostname) { ... }
    .groupBy('hostname) { _.size }
    .filter('hostname, 'size) { ... }
    .write(Tsv(args("output")))
}

Our job reads raw text lines as its source and writes its output as a TSV. Note the use of the input and output arguments, which we supply to the Jar file upon execution.

Even though Scalding supports JsonLine as a source, due to some dirty input JSON, it bombs out during parsing so I’ve decided to use TextLine. The meaning (and variations) of the various operations are well detailed in the Fields-based API Reference, which we’ll go into next.

Step 1 - Remove All Unnecessary Data

.filter('line) {
  line: String =>
    line.matches(".*GET /impression\\S*pid="+args("placementId")+".*")
}

The 'line field is made available to us by Scalding as the output of the TextLine source. The filter operation does not add any new fields; it outputs only the lines for which the provided function returns true. Here we’re throwing out the majority of our data by including only /impression requests where pid=placementId.
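To see what this predicate accepts, here is a minimal standalone sketch of the same match in plain Scala, outside of Scalding. The names ImpressionFilter and isImpressionFor are hypothetical, and wrapping the placement ID in Pattern.quote is an assumption added here (the job above concatenates the raw argument); it only guards against IDs that happen to contain regex metacharacters.

```scala
import java.util.regex.Pattern

object ImpressionFilter {
  // Same pattern as the job's filter step, except that the placement ID is
  // quoted so it is matched literally (an addition, not in the original job).
  def isImpressionFor(placementId: String)(line: String): Boolean =
    line.matches(".*GET /impression\\S*pid=" + Pattern.quote(placementId) + ".*")
}
```

Because the pattern ends in .*, a placement ID that is a prefix of another one (say ABC and ABCD) matches lines for both. If your IDs can overlap like that, it is probably worth anchoring the end of the value, for example by requiring a following & or whitespace.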

Step 2 - Extract Page Location From Request

.map('line -> 'ploc) {
  line: String =>
    // Capture the still-escaped ploc value; it must be followed by another parameter (&).
    val pattern = """GET /impression\S*ploc=(\S*)&""".r
    pattern.findFirstMatchIn(line) match {
      case Some(matchData) => matchData.subgroups(0)
      case None => "http://www.emptyOrNoLocation.com/"
    }
}

On to the exciting stuff! A couple of cool things happening here:

  • Let’s add a ploc field to our rows by transforming the full line into only the value of the ploc parameter, where ploc contains the page the impression occurred on.
  • If the ploc is missing, we want to keep track of that. Why use a URL though? Later when we rely on new URI(...).getHost(), it will nicely flow through the rest of our data.
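The extraction can be tried on its own, without a Scalding flow. The sketch below lifts the same regular expression into a plain function; PlocExtractor and extract are hypothetical names used only for illustration.

```scala
object PlocExtractor {
  // Same regex as the map step: everything after "ploc=" up to the last "&"
  // in the same run of non-whitespace characters.
  private val pattern = """GET /impression\S*ploc=(\S*)&""".r

  def extract(line: String): String =
    pattern.findFirstMatchIn(line) match {
      case Some(m) => m.group(1) // still URI-escaped at this point
      case None    => "http://www.emptyOrNoLocation.com/"
    }
}
```

Note that the pattern expects an & after the ploc value. In our sample data ploc is always followed by pref=, so that holds, but a request where ploc is the last parameter would fall through to the placeholder URL.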

Step 3 - Extract Hostname From Page Location

.map('ploc -> 'hostname) {
  ploc: String =>
    try {
      // getHost returns null when the URI has no host, hence the Option wrapper.
      val hostname = Option(new URI(URLDecoder.decode(ploc, "UTF-8")).getHost)
      hostname.getOrElse("www.emptyOrNoLocation.com")
    } catch {
      case _: URISyntaxException => "www.badLocation.com"
    }
}

Another map step here, creating a hostname field, this time complete with exceptions, exceptions, exceptions (sung to the tune of…).

  • If the hostname extracts, let’s use it.
  • If it’s blank, it was never there to begin with. Again, let’s track it with a valid hostname so it continues to flow.
  • If it’s busted (re: URISyntaxException), let’s track that as well.
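Here is a small sketch of those three cases as a plain function, so each branch can be exercised by hand. HostnameExtractor and hostnameOf are hypothetical names; the body mirrors the map step above.

```scala
import java.net.{URI, URISyntaxException, URLDecoder}

object HostnameExtractor {
  def hostnameOf(ploc: String): String =
    try {
      // Unescape first, then let java.net.URI pull out the host (null if absent).
      Option(new URI(URLDecoder.decode(ploc, "UTF-8")).getHost)
        .getOrElse("www.emptyOrNoLocation.com")
    } catch {
      // Raised when the decoded string is not a legal URI, e.g. it contains a space.
      case _: URISyntaxException => "www.badLocation.com"
    }
}
```

A real page location yields its hostname, an empty ploc yields www.emptyOrNoLocation.com, and something like http%3A%2F%2Fbad%20host%2F yields www.badLocation.com. One thing worth checking against your own data: URLDecoder.decode throws IllegalArgumentException on a malformed escape such as %zz, and the catch here only handles URISyntaxException, so a ploc like that would not be tallied under www.badLocation.com.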

Step 4 - Grouping by Hostname, Summing

.groupBy('hostname) { _.size }

Next, we’ll group by hostname and count the rows in each group, which lands in a new size field. Each hostname fits one of three cases:

  • The hostname is available and valid - the actual hostname will exist.
  • www.emptyOrNoLocation.com - when the ploc field was missing or contained no information.
  • www.badLocation.com - when the ploc field was not a valid URI.
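If it helps to picture what the grouping produces, this is roughly the same operation on an ordinary in-memory Scala collection. It is only an analogy for the semantics, not how Scalding runs the step on Hadoop, and HostnameCounts is a hypothetical name.

```scala
object HostnameCounts {
  // Group identical hostnames together and keep the size of each group.
  def count(hostnames: Seq[String]): Map[String, Int] =
    hostnames.groupBy(identity).map { case (host, hits) => host -> hits.size }
}
```

Feeding it two rows for www.sharethrough.com and one for www.badLocation.com gives a count of 2 and 1 respectively, which is the (hostname, size) pair the next step filters on.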

Step 5 - Beating the Impression Floor

.filter('hostname, 'size) { fields: (String, Int) =>
  val (hostname, size) = fields
  hostname match {
    case "www.emptyOrNoLocation.com" => true
    case "www.badLocation.com" => true
    case _ => size >= args("impressionFloor").toInt
  }
}

Finally, let’s apply our “impression floor” filtering. This time, we’re including multiple fields and need to extract them from the tuple Scalding provides. The logic here says only to apply our “impression floor” restriction to actual valid hostnames and include the two exceptional hostnames we injected into the flow.
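The same rule can be written as a small pure function, which makes the boundary behaviour easy to check. This is a sketch only; ImpressionFloor, exceptional and keep are hypothetical names, not part of the job.

```scala
object ImpressionFloor {
  // The two placeholder hostnames injected earlier in the flow.
  val exceptional: Set[String] =
    Set("www.emptyOrNoLocation.com", "www.badLocation.com")

  // Placeholders always pass; real hostnames pass only at or above the floor.
  def keep(floor: Int)(hostname: String, size: Int): Boolean =
    exceptional.contains(hostname) || size >= floor
}
```

With a floor of 3, a real hostname with exactly 3 impressions is kept and one with 2 is dropped, while www.badLocation.com is kept even with a single impression.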

Up Next

From here, I would suggest heading over to scalding-emr-tutorial, building and executing this sample job on EMR. It’s good, clean fun and you’ll become addicted to the power and expressiveness of Scalding.

Now that you’re using Scalding, how do you responsibly deploy this technology? First things first - testing. You’re going to want to refactor or otherwise iterate on this job, adding new business logic. You might also want to refactor portions into separate classes so they can be shared between jobs. That’s coming up in Part 2.

Acknowledgements

A huge “Thanks!” to Oscar Boykin and the rest of the Scalding team for their support of Scalding on the Cascading group.