Saturday, April 2, 2011

MapReduce and Hive by example

In my last post, I got MySQL and the Ruby-on-Rails environment set up. After updating my Rails configuration, I was able to fire up a working version of DataWrangling's TrendingTopics site.


Of course, the data is still two years old, so I need to dig further into how I would update it. Here, Pete has some interesting stuff going on:
- use Hadoop Streaming to power Python scripts that chunk through the fat weblog data
- kick off a HiveQL script to load the final output and create other temporary tables
- from Hive, join tables and prep the latest daily data to ship off to MySQL
- wrap up the status of the whole process in an email

Let's look at MapReduce and Hive in a bit more depth.

MapReduce
A hint from Sujit Pal's blog post made it much easier to see exactly what Pete's mapper and reducer scripts do. These scripts are written in Python and are called by run_daily_timelines.sh and run_daily_trends.sh.

daily_timelines.py
run_daily_timelines.sh kicks off two Hadoop Streaming jobs that run Pete's MapReduce code, which lives in daily_timelines.py. The first mapper outputs this:
[sodo@computer python_streaming]$ zgrep '^en Oldsmobile' ~/Downloads/pagecounts-20090401-000001.gz | python daily_timelines.py mapper1 | sort | head
LongValueSum:Oldsmobile}20090419 65
LongValueSum:Oldsmobile_442}20090419 31
LongValueSum:Oldsmobile_66}20090419 1
LongValueSum:Oldsmobile_88}20090419 26
LongValueSum:Oldsmobile_98}20090419 8
LongValueSum:Oldsmobile_Achieva}20090419 5
LongValueSum:Oldsmobile_achieva_catalytic_converter}20090419 1
LongValueSum:Oldsmobile_achieva_fuel_tanks}20090419 1
LongValueSum:Oldsmobile_Alero_1999_2004_Center}20090419 1
LongValueSum:Oldsmobile_Alero}20090419 20

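Just to make the plumbing concrete, here is a minimal sketch of what a streaming mapper along these lines could look like. This is my own toy approximation, not Pete's actual code: it assumes the raw pagecount lines look like "en Oldsmobile 65 12345" and that the date can be pulled from the input filename that Hadoop Streaming exposes in the map_input_file environment variable (the real script may derive it differently).

#!/usr/bin/env python
# toy streaming mapper: emit "LongValueSum:<title>}<date>\t<count>"
# so Hadoop's aggregate package (or a summing reducer) can add up the counts
import os, re, sys

def mapper1(date):
    for line in sys.stdin:
        fields = line.split()
        if len(fields) != 4:
            continue
        project, title, count, nbytes = fields
        if project != 'en':              # English Wikipedia only
            continue
        print('LongValueSum:%s}%s\t%s' % (title, date, count))

if __name__ == '__main__':
    # fall back to a dummy filename when testing from the shell
    fname = os.environ.get('map_input_file', 'pagecounts-20090419-000000.gz')
    match = re.search(r'pagecounts-(\d{8})', fname)
    mapper1(match.group(1) if match else '00000000')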

In daily_timelines.py, the reducer morphs the output of the mapper into this:
[sodo@computer python_streaming]$ zgrep '^en Oldsmobile' ~/Downloads/pagecounts-20090401-000001.gz | python daily_timelines.py mapper1 | sort | python daily_timelines.py reducer1 | head
Oldsmobile}20090419 65
Oldsmobile_442}20090419 31
Oldsmobile_66}20090419 1
Oldsmobile_88}20090419 26
Oldsmobile_98}20090419 8
Oldsmobile_Achieva}20090419 5
Oldsmobile_achieva_catalytic_converter}20090419 1
Oldsmobile_achieva_fuel_tanks}20090419 1
Oldsmobile_Alero_1999_2004_Center}20090419 1
Oldsmobile_Alero}20090419 20

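For comparison, a rough sketch of a summing reducer that would produce that kind of output (again, my approximation rather than the real reducer1): it assumes the input is already sorted by key, adds up the counts for each key, and strips the LongValueSum: prefix.

#!/usr/bin/env python
# toy summing reducer: input is sorted "LongValueSum:<key> <count>" lines;
# emit "<key>\t<sum>" with the LongValueSum: prefix stripped
import sys

def reducer1():
    last_key, total = None, 0
    for line in sys.stdin:
        parts = line.split()
        if len(parts) != 2:
            continue
        key, count = parts
        key = key.replace('LongValueSum:', '', 1)
        if last_key is not None and key != last_key:
            print('%s\t%d' % (last_key, total))
            total = 0
        last_key = key
        total += int(count)
    if last_key is not None:
        print('%s\t%d' % (last_key, total))

if __name__ == '__main__':
    reducer1()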

A second mapper then takes that output and reshapes it:
[sodo@computer python_streaming]$ zgrep '^en Oldsmobile' ~/Downloads/pagecounts-20090401-000001.gz | python daily_timelines.py mapper1 | sort | head -100 | python daily_timelines.py reducer1 | python daily_timelines.py mapper2
Oldsmobile 20090419 65
Oldsmobile_442 20090419 31
Oldsmobile_66 20090419 1
Oldsmobile_88 20090419 26
Oldsmobile_98 20090419 8
Oldsmobile_Achieva 20090419 5
Oldsmobile_achieva_catalytic_converter 20090419 1
Oldsmobile_achieva_fuel_tanks 20090419 1
Oldsmobile_Alero_1999_2004_Center 20090419 1

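As far as I can tell, the point of this second mapper is to move the date out of the key, so the article title alone becomes the key and the final reducer can group every day's count for a title together. A toy version (mine, not Pete's):

#!/usr/bin/env python
# toy second-pass mapper: turn "<title>}<date> <count>" into
# "<title>\t<date> <count>" so the next reducer can group on title alone
import sys

def mapper2():
    for line in sys.stdin:
        parts = line.split()
        if len(parts) != 2 or '}' not in parts[0]:
            continue
        key, count = parts
        title, date = key.split('}', 1)
        print('%s\t%s %s' % (title, date, count))

if __name__ == '__main__':
    mapper2()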

Finally, the second reducer code formats the timeline data, now ready to import into Hive:
[sodo@computer python_streaming]$ zgrep '^en Oldsmobile' ~/Downloads/pagecounts-20090401-000001.gz | python daily_timelines.py mapper1 | sort | python daily_timelines.py reducer1 | python daily_timelines.py mapper2 | python daily_timelines.py reducer2 1 | head
Oldsmobile [20090419] [65] 65 65.0
Oldsmobile_442 [20090419] [31] 31 31.0
Oldsmobile_66 [20090419] [1] 1 1.0
Oldsmobile_88 [20090419] [26] 26 26.0
Oldsmobile_98 [20090419] [8] 8 8.0
Oldsmobile_Achieva [20090419] [5] 5 5.0
Oldsmobile_achieva_catalytic_converter [20090419] [1] 1 1.0
Oldsmobile_achieva_fuel_tanks [20090419] [1] 1 1.0
Oldsmobile_Alero_1999_2004_Center [20090419] [1] 1 1.0
Oldsmobile_Alero [20090419] [20] 20 20.0


* It was here that I discovered a problem: the reducer script requires a minimum of 45 days of data. For testing purposes, I lowered the minimum to one day by changing this line in the script:
-reducer "daily_timelines.py reducer2 45" \
to:
-reducer "daily_timelines.py reducer2 1" \
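
Conceptually, the final reducer collects all the (date, count) pairs for a title into two bracketed lists, totals the pageviews, and tacks on a trend score. Here is a toy approximation of mine with the minimum-days check included; note the trend column is just a placeholder (a float of the total), since I haven't reproduced the real scoring logic in daily_timelines.py.

#!/usr/bin/env python
# toy final reducer: group "<title>\t<date> <count>" lines (already grouped
# by title) into "<title>\t[dates]\t[counts]\t<total>\t<trend>"
import sys

def reducer2(min_days):
    def emit(title, dates, counts):
        if len(dates) < min_days:          # the 45-day minimum I tripped over
            return
        total = sum(counts)
        print('%s\t[%s]\t[%s]\t%d\t%.1f' % (
            title, ','.join(dates),
            ','.join(str(c) for c in counts),
            total, float(total)))          # placeholder trend value

    last_title, dates, counts = None, [], []
    for line in sys.stdin:
        parts = line.split()
        if len(parts) != 3:
            continue
        title, date, count = parts
        if last_title is not None and title != last_title:
            emit(last_title, dates, counts)
            dates, counts = [], []
        last_title = title
        dates.append(date)
        counts.append(int(count))
    if last_title is not None:
        emit(last_title, dates, counts)

if __name__ == '__main__':
    reducer2(int(sys.argv[1]) if len(sys.argv) > 1 else 45)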

daily_trends.py
In similar fashion, run_daily_trends.sh uses a Hadoop Streaming job to calculate trend data from the Wikipedia weblogs. That MapReduce code lives in daily_trends.py. Here, the mapper does a first pass at the data:
linux-z6tw:/home/sodo/trendingtopics/lib/scripts # zcat /home/sodo/Downloads/pagecounts-20110328-010000.gz | python ../python_streaming/daily_trends.py mapper | grep '^Oldsmobile' | head
Oldsmobile 20090419 64
Oldsmobile_"Rocket_V8"_engine 20090419 2
Oldsmobile_4-4-2 20090419 1
Oldsmobile_442 20090419 40
Oldsmobile_66 20090419 2
Oldsmobile_88 20090419 39
Oldsmobile_98 20090419 25
Oldsmobile_Achieva 20090419 2
Oldsmobile_Aerotech 20090419 1
Oldsmobile_Aerotech_III 20090419 1

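The trends mapper looks a lot like the timelines mapper, minus the LongValueSum wrapper: it just emits title, date, and count for English pages. A toy version (mine, with the same filename-derived date assumption as above):

#!/usr/bin/env python
# toy trends mapper: emit "<title>\t<date>\t<count>" for English pagecount lines
import os, re, sys

def trends_mapper(date):
    for line in sys.stdin:
        fields = line.split()
        if len(fields) != 4:
            continue
        project, title, count, nbytes = fields
        if project != 'en':
            continue
        print('%s\t%s\t%s' % (title, date, count))

if __name__ == '__main__':
    fname = os.environ.get('map_input_file', 'pagecounts-20110328-010000.gz')
    match = re.search(r'pagecounts-(\d{8})', fname)
    trends_mapper(match.group(1) if match else '00000000')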

The reducer takes the mapper feed and creates final trend numbers like so:
linux-z6tw:/home/sodo/trendingtopics/lib/scripts # zcat /home/sodo/Downloads/pagecounts-20110328-010000.gz | python ../python_streaming/daily_trends.py mapper | python ../python_streaming/daily_trends.py reducer 1 | grep '^Oldsmobile' | head
Oldsmobile 331.160785273 0.125
Oldsmobile_"Rocket_V8"_engine 4.19722457734 0.707106781187
Oldsmobile_4-4-2 1.69314718056 1.0
Oldsmobile_442 188.542882668 0.158113883008
Oldsmobile_66 4.19722457734 0.707106781187
Oldsmobile_88 182.86629871 0.160128153805
Oldsmobile_98 106.452413451 0.2
Oldsmobile_Achieva 4.19722457734 0.707106781187
Oldsmobile_Aerotech 1.69314718056 1.0
Oldsmobile_Aerotech_III 1.69314718056 1.0

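I haven't dug into the exact math in daily_trends.py, but the sample output above is consistent with a score of roughly n * (1 + ln(n + 1)) and an error term of 1 / sqrt(n), where n is the total pageview count (for example, Oldsmobile_98 with 25 views gives 25 * (1 + ln(26)) ≈ 106.45 and 1/sqrt(25) = 0.2). Here is a toy reducer along those lines; treat the formulas as my reverse-engineered guess, not Pete's definitive algorithm.

#!/usr/bin/env python
# toy trends reducer: sum the counts per title, then emit a trend score and
# an error estimate (formulas guessed from the sample output above)
import math
import sys
from collections import defaultdict

def trends_reducer():
    totals = defaultdict(int)
    for line in sys.stdin:
        parts = line.split()
        if len(parts) != 3:
            continue
        title, date, count = parts
        totals[title] += int(count)
    for title in sorted(totals):
        n = totals[title]
        score = n * (1 + math.log(n + 1))
        error = 1 / math.sqrt(n)
        print('%s\t%s\t%s' % (title, score, error))

if __name__ == '__main__':
    trends_reducer()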

Later scripts then load this data into Hive and, eventually, MySQL for display in the web app.

HiveQL
To really oversimplify, HiveQL is just like SQL (the basic syntax is the same), but for data that sits atop the Hadoop HDFS filesystem. The benefit is that the computational work of a relational query gets divided into little chunks spread across many servers and managed by MapReduce. This is especially helpful if you have, say, big, fat weblogs you need to munge through in a relational fashion.

Here's sample HiveQL code, courtesy of Peter Skomoroch:
sodo@linux-z6tw:~/trendingtopics/lib/hive> cat hive_daily_timelines.sql
CREATE TABLE raw_daily_stats_table (redirect_title STRING, dates STRING, pageviews STRING, total_pageviews BIGINT, monthly_trend DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

CREATE TABLE redirect_table (redirect_title STRING, true_title STRING, page_id BIGINT, page_latest BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

CREATE TABLE daily_timelines (page_id BIGINT, dates STRING, pageviews STRING, total_pageviews BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

CREATE TABLE pages (page_id BIGINT, url STRING, title STRING, page_latest BIGINT, total_pageviews BIGINT, monthly_trend DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

CREATE TABLE sample_pages (page_id BIGINT, url STRING, title STRING, page_latest BIGINT, total_pageviews BIGINT, monthly_trend DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/mnt/page_lookup_nonredirects.txt' OVERWRITE INTO TABLE redirect_table;
-- columns: redirect_title string, true_title string, page_id bigint, page_latest bigint
-- sample data: Anarchism Anarchism 12 275854375

LOAD DATA INPATH 'finaloutput' INTO TABLE raw_daily_stats_table;
-- columns: redirect_title string, dates string, pageviews string, total_pageviews bigint, monthly_trend double
-- sample data: Anarchism [20110215,20110216,20110217] [127,126,100] 4394 -286.0

INSERT OVERWRITE TABLE pages
SELECT redirect_table.page_id, redirect_table.redirect_title, redirect_table.true_title, redirect_table.page_latest, raw_daily_stats_table.total_pageviews, raw_daily_stats_table.monthly_trend
FROM redirect_table
JOIN raw_daily_stats_table ON (redirect_table.redirect_title = raw_daily_stats_table.redirect_title);
-- columns: page_id bigint, url string, title string, page_latest bigint, total_pageviews bigint, monthly_trend double
-- sample data: 12 Anarchism Anarchism 275854375 4394 -286.0

INSERT OVERWRITE TABLE sample_pages
SELECT * FROM pages SORT BY monthly_trend DESC LIMIT 100;

INSERT OVERWRITE TABLE daily_timelines
SELECT redirect_table.page_id, raw_daily_stats_table.dates, raw_daily_stats_table.pageviews, raw_daily_stats_table.total_pageviews FROM redirect_table JOIN raw_daily_stats_table ON (redirect_table.redirect_title = raw_daily_stats_table.redirect_title);
-- columns: page_id bigint, dates string, pageviews string, total_pageviews bigint
-- sample data: 12 [20110215,20110216,20110217] [127,126,100] 4394


You see a variety of standard SQL features that Pete applies to Hadoop's distributed filesystem model:
1) loading data from a file in a local filesystem
2) loading data from an HDFS/Hadoop filesystem
3) creating tables by SELECTing from other tables
4) creating tables based on JOINs

Nice work, Mr. Skomoroch!

Minutiae and Some Time Wasters
Set environment variables
export MYBUCKET=trendingtopics
export MYSERVER=linux-z6tw
export MAILTO=cacasododom@gmail.com

Commands to be run as non-privileged user
whirr launch-cluster --config=hadoop.properties
./updateHadoopConfig.sh (including launch proxy)

Commands to be run as root
start run_daily_timelines.sh
bash trendingtopics/lib/scripts/run_daily_timelines.sh $MYBUCKET $MYSERVER $MAILTO

Diff between Pete's original run_daily_timelines.sh and mine
1) my s3n input parameter doesn't accept wildcards like this:
-input s3n://$1/wikistats/pagecounts-200*
2) my scripts are in a different directory:
-file '/home/sodo/trendingtopics/lib/python_streaming/daily_timelines.py'
-file '/home/sfrase/trendingtopics/lib/python_streaming/daily_timelines.py'
hive -f /home/sfrase/trendingtopics/lib/hive/hive_daily_timelines.sql

3) the hadoop streaming jar file has changed:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-CDH3B4.jar
4) for testing purposes, I had to change the reducer to look at one day of data
-reducer "daily_timelines.py reducer2 1"

Among the various permissions, credentials, and communication issues, a final gotcha was that at one point the script failed because of incorrect pathing, but only after it had already created some of the data tables in Hive. So when I ran the script again, the HiveQL script would bomb out:
FAILED: Error in metadata: AlreadyExistsException(message:Table raw_daily_stats_table already exists)
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask


Once I dropped those tables, all was right with the world. (Adding DROP TABLE IF EXISTS statements at the top of the HiveQL script would sidestep this on reruns.)

References
http://sujitpal.blogspot.com/2009/10/parallelizing-crawling-with-hadoop.html
Writing a MapReduce Program in Python
Tom White's MapReduce Primer
How To Setup Nutch and Hadoop
How Hadoop MapReduce Works
