Tag Archives: Big Data
Adding Geocoding Capabilities to Pig through Custom UDFs
In the previous two posts in this series, I’ve used Hive and Pig to process and analyse data from log files generated by the Rittman Mead Blog webserver. In the article on Hive, I created a relational table structure over a directory containing the log files, using a regular expression SerDe to split each log line up into its constituent parts (IP address, page requested, status code and so on). I then brought in another table of data, containing IP address ranges and countries they were assigned to, so that I could determine what parts of the world were accessing our site over time.
In the second example, I took the same log files but processed them this time using Apache Pig. I used Pig’s dataflow-style approach to loading and analysing data to progressively filter, pivot and analyse the log file dataset, and then joined it to an export of pages and authors from WordPress, the CMS that we used to run the website, so I could determine who wrote the most popular blog articles over the period covered by the logs.
But the first example, where I joined the log file data to the geocoding table, had a bit of an issue that only came up when I tested it with a larger set of data than I used at the end of that article. In the article example, I limited the number of log file rows to just five to keep the example readable on the blog, but when I tried it later on with the full dataset, the query eventually failed with an out-of-memory error from the Hadoop cluster. Now in practice I could probably have increased the memory (Java heap space) or otherwise got the query through, but geo-tagging my data in this way – as a big table join, using an in-memory database engine (Impala) to do it – probably isn’t the most sensible way to do a single-value lookup as part of a Hadoop transformation; this is something better done through what’s called a “user-defined function”.
Both Hive and Pig support user-defined functions (UDFs), and a quick Google search brought up one for Hive called GeocodeIP, on GitHub, that looks like it might do the job. Sticking with Pig for the moment though, we thought this might be a good opportunity to see how UDFs for Pig are created, and so my colleague, Nelio Guimaraes, put together the following example to walk through how a typical one might be created. Before we start though, a bit of background.
The problem we have is that we need to match an IP address in a webserver log file with an IP address range in a lookup table. For example, the IP address in the log file might be 124.232.100.105, and the lookup database would have an IP address range from, say, 124.232.100.0 to 124.232.100.255 that it allocates to a particular country – Poland, for example. The lookup database itself comes from Maxmind, and there’s a formula they use to convert IP addresses to integers, like this:
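For an IP address of the form w.x.y.z, the standard MaxMind conversion is:

ip_integer = (16777216 * w) + (65536 * x) + (256 * y) + z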
so that you can do a simple BETWEEN in an SQL join to locate the range that matches the incoming IP address.
Except that Pig, like Hive, can’t normally support non-equijoins, which leads us to UDFs and other approaches to getting the country for our IP address. Pig, again like Hive, is however extensible, and it’s relatively easy to add Pig UDFs, either by writing them yourself or through UDF libraries like Pig’s Piggybank. The best language to write UDFs in is Java, as it gives access to the largest amount of Pig native functionality (such as the ability to write custom data loaders and unloaders), but for what we’re doing Python will be fine, so let’s put one together in Python to do our IP address geocoding and show how the process works.
Going back to the Pig scripts we put together yesterday, they started off by declaring a relation that loaded the raw log files in from a directory in HDFS, and then used another relation to parse the first one via a regular expression, so we had each of the log file elements in its own column, like this:
raw_logs = LOAD '/user/root/logs/' USING TextLoader AS (line:chararray);
logs_base = FOREACH raw_logs
GENERATE FLATTEN
  (
    REGEX_EXTRACT_ALL
    (
      line,
      '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
    )
  )
AS
  (
    remoteAddr: chararray, remoteLogname: chararray, user: chararray,
    time: chararray, request: chararray, status: int, bytes_string: chararray,
    referrer: chararray, browser: chararray
  );
At this point we can run off a list of the top 5 browser types based on page accesses, for example:
grunt> by_browser_group = GROUP logs_base BY browser;
grunt> by_browser_group_count = FOREACH by_browser_group
>> GENERATE group, COUNT($1);
grunt> by_browser_group_sorted = ORDER by_browser_group_count BY $1 DESC;
grunt> by_browser_group_top_5 = LIMIT by_browser_group_sorted 5;
grunt> dump by_browser_group_top_5
...
(Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36,675161)
(-,394101)
(Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com),371078)
(Mozilla/5.0 (Windows NT 6.1; WOW64; rv:28.0) Gecko/20100101 Firefox/28.0,201861)
(Mozilla/5.0 (Windows NT 6.1; WOW64; rv:27.0) Gecko/20100101 Firefox/27.0,167851)
See the previous post, and this other blog article, for more background on how you do grouping and aggregating in Pig, if you’re not familiar with the syntax.
But what if we want to group by country? That’s where the geocoding comes in, and the Pig UDF we’re going to create. As we’re going to create this example using Python, we’ll be using the pygeoip Python API that you install through pip, the Python package manager, and the GeoLite Country database (.dat) file from Maxmind, who make this basic version available to download for free. Then follow these steps to set the Python UDF up:
1. On the master node on your Hadoop cluster (I’m using a three-node CDH4.6 cluster) where Pig and Hive run, install pip, and then download the pygeoip API, like this:
wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py
pip install pygeoip
2. Copy the GeoIP.dat file to somewhere on the Hadoop master node, for example /home/nelio/. Make a note of the full path to the GeoIP.dat file, and then copy the file to the same location on all of the worker nodes – there’s probably a way to cache this or otherwise automatically distribute the file (one possible approach is sketched below; suggestions welcome), but for now copying it by hand ensures that each worker node has access to a copy of this file.
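One possible way to avoid the manual copy – offered only as an untested sketch, since the walkthrough here simply copies the file to each node – is to ship the .dat file through the MapReduce distributed cache from the Grunt shell, using the MR1 properties mapred.cache.files and mapred.create.symlink (the hdfs:///user/root/GeoIP.dat location is an assumption; put the file wherever suits you):

grunt> set mapred.cache.files hdfs:///user/root/GeoIP.dat#GeoIP.dat;
grunt> set mapred.create.symlink yes;

With that in place, the Python script in the next step could open './GeoIP.dat' via the local symlink rather than a fixed /home/nelio path; treat the property names and path handling here as assumptions to verify against your own cluster.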
3. Using a text editor, create the python script that will provide the Python UDF, like this, substituting the path to your GeoIP.dat file if it’s somewhere else. Once done, save the file as python_geoip.py to the same directory (/home/nelio, in this example) – note that this file only needs to go on the main, master node in your cluster, and Pig/MapReduce will distribute it to the worker nodes when we register it in our Pig script, later on.
#!/usr/bin/python
import sys
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip

@outputSchema("country:chararray")
def getCountry(ip):
    gi = pygeoip.GeoIP('/home/nelio/GeoIP.dat')
    country = gi.country_name_by_addr(ip)
    return country
Note that the sys.path.append line in the script is there so that Jython knows where to look for the new Python module, pygeoip, when starting up.
4. Let’s start another Pig session now and try and use this UDF. I exit back to the OS command prompt, and change directory to where I’ve stored the python file and the GeoIP.dat file, and start another Grunt shell session:
[root@cdh4-node1 ~]# cd /home/nelio
[root@cdh4-node1 nelio]# pig
Now if I want to use this Python Pig UDF in a Grunt shell session, I need to register it as a scripting UDF, either at the Grunt shell or in a Pig script I run, so let’s start with that, and then bring in the log data as before:
grunt> register 'python_geoip.py' using jython as pythonGeoIP;
...
grunt> raw_logs = LOAD '/user/root/logs/' USING TextLoader AS (line:chararray);
grunt> logs_base = FOREACH raw_logs
>> GENERATE FLATTEN
>> (
>> REGEX_EXTRACT_ALL
>> (
>> line,
>> '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
>> )
>> )
>> AS
>> (
>> remoteAddr: chararray, remoteLogname: chararray, user: chararray,
>> time: chararray, request: chararray, status: int, bytes_string: chararray,
>> referrer: chararray, browser: chararray
>> );
I’ll now define a new relation (alias) projecting just the IP addresses from the combined log files, and then filter that into another alias so we only have valid IP addresses to work with:
grunt> ipaddress = FOREACH logs_base GENERATE remoteAddr;
grunt> clean_ip = FILTER ipaddress BY (remoteAddr matches '^.*?((?:\\d{1,3}\\.){3}\\d{1,3}).*?$');
Now we get to use the Python UDF. We’ll pass these IP addresses to it, and the UDF will return the country that each IP address is located in, based on Maxmind’s IP address ranges.
grunt> country_by_ip = FOREACH clean_ip GENERATE pythonGeoIP.getCountry(remoteAddr);
2014-05-05 05:22:50,141 [MainThread] INFO org.apache.pig.scripting.jython.JythonFunction - Schema 'country:chararray' defined for func getCountry
grunt> describe country_by_ip
country_by_ip: {country: chararray}
So this function will have converted all of the IP addresses in the logs to country names; let’s now group, count, order and select the top five from that list.
grunt> group_by_country = GROUP country_by_ip BY country;
grunt> count_by_country = FOREACH group_by_country GENERATE FLATTEN(group) as country, COUNT(country_by_ip) AS (hits_per_country:long);
grunt> order_by_access = ORDER count_by_country BY hits_per_country DESC;
grunt> top5_country = LIMIT order_by_access 5;
Of course all this has done so far is set up a dataflow in Pig, telling it how to move the data through the pipeline and arrive at the final output I’m interested in; let’s now run the process by using the “dump” command:
grunt> dump top5_country
...
(United States,2012311)
(India,785585)
(United Kingdom,459422)
(Germany,231386)
(France,168319)
So that’s a simple example of a Pig UDF, in this instance written in Python. There are other ways to extend Pig beyond UDFs – Pig Streaming is the obvious alternative, where the entire relation goes through the streaming interface to be processed and then output back into Pig, and hopefully we’ll cover this at some point in the future – or then again, maybe it’s now time to take a proper look at Spark.
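For a flavour of what the streaming approach might look like – a minimal sketch only, assuming a hypothetical geo_stream.py script that reads IP addresses from stdin and writes country names to stdout – the Pig side of it would be something like:

grunt> DEFINE geo_stream `python geo_stream.py` SHIP('/home/nelio/geo_stream.py');
grunt> country_by_ip_streamed = STREAM clean_ip THROUGH geo_stream AS (country:chararray);

SHIP distributes the script to the worker nodes along with the job, so there's no need to copy it around by hand.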
Simple Hadoop Dataflows using Apache Pig and CDH4.6
The other day I took some logs from the Apache webserver that runs the Rittman Mead website, and analysed them using Hadoop CDH5, Apache Hive and Impala to get some basic metrics on the number of hits per month, where the hits came from and so on. Hive and Impala are great for analysing data sitting on HDFS on a Hadoop cluster, but like SQL compared to PL/SQL or C++, everything you do is declarative and set-based, whereas sometimes you want to build up your dataset using a dataflow-type approach, particularly if you’ve come from a programming rather than a data warehousing background.
If you’ve been looking at Hadoop for a while, you’ll probably therefore know there’s another basic high-level-language approach to querying Hadoop data to accompany Hive, and it’s called “Pig”. Pig, like Hive, is an Apache project and provides an engine for creating and executing data flows, in parallel, on Hadoop. Like Hive, the jobs you create in Pig eventually translate into MapReduce jobs (with the advantages and disadvantages that this brings), and it has concepts that are similar – but just that little bit different – to relational operations such as filters, joins and sorts.
It’s often called a “procedural” language (as opposed to Hive’s declarative language), but really it’s not – it’s a “data flow language” that has you specifically set out the data flow as the main part of a Pig program, rather than it being a by-product of the if/then/elses and control structures of a procedural language. For people like me that come from an Oracle data warehousing background, in most cases we’d feel more comfortable using Hive’s set-based transformations to do our data loading and transformation on Hadoop, but in some cases – particularly when you’re querying data interactively, building up a data pipeline and working with nested data sets – Pig can be more appropriate.
Connecting to the Pig Console, and Pig Execution Options
Iteratively examining and analysing data from webserver log files is a great example of where Pig could be useful, as you naturally hone down and pivot the data as you’re looking at it, and in effect you’re looking to create a data pipeline from the raw logs through to whatever summary tables or files you’re looking to create. So let’s go back to the same input log files I used in the previous post on Hive and Impala, and this time bring them into Pig.
Within CDH (Cloudera Distribution including Hadoop) you can run Pig scripts either interactively from the Pig command-line shell, called “Grunt”, or you can submit them as workflow jobs using the Hue web interface and the Oozie workflow scheduler; the advantage of starting with the interactive Grunt shell is that you can run your commands one-by-one and examine the metadata structures you create along the way, so let’s use that approach first and move on to batch scheduling later on.
I’ll start by SSH’ing into one of the CDH4.6 nodes and starting the Grunt shell:
officeimac:~ markrittman$ ssh root@cdh4-node1
root@cdh4-node1's password:
Last login: Sat May 3 06:38:18 2014 from 192.168.2.200
[root@cdh4-node1 ~]# pig
2014-05-03 06:44:39,257 [main] INFO org.apache.pig.Main - Apache Pig version 0.11.0-cdh4.6.0 (rexported) compiled Feb 26 2014, 03:01:22
2014-05-03 06:44:39,258 [main] INFO org.apache.pig.Main - Logging error messages to: /root/pig_1399095879254.log
2014-05-03 06:44:39,301 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2014-05-03 06:44:39,663 [main] WARN org.apache.hadoop.conf.Configuration - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-05-03 06:44:39,663 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://cdh4-node1.rittmandev.com:8020
2014-05-03 06:44:40,392 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: cdh4-node1.rittmandev.com:8021
2014-05-03 06:44:40,394 [main] WARN org.apache.hadoop.conf.Configuration - fs.default.name is deprecated. Instead, use fs.defaultFS
Even from within the Grunt shell, there are two ways I can then run Pig. The default way is to have Grunt run your Pig commands as you’d expect, converting them in the end to MapReduce jobs which then run on your Hadoop cluster. Or, you can run in “local mode”, which again uses MapReduce but only runs on the machine you’re logged in to, single-threaded; this can often be faster when you’re just playing around with a local dataset and want to see results quickly (you can turn on local mode by adding an ‘-x local’ flag when starting Grunt). In my example, I’m going to run Grunt in regular MapReduce mode anyway.
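For reference, the only difference between the two is the startup flag:

pig          # regular MapReduce mode (the default)
pig -x local # local mode, single machine, single-threaded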
Loading and Parsing the Weblog Files
I then define my first Pig relation, analogous to a relational table and technically a named Pig “bag”, like this:
grunt> raw_logs = LOAD '/user/root/logs/' USING TextLoader AS (line:chararray);
Compared to the Hive table DDL script in the previous article, we declare the incoming dataset much more programmatically – this first line creates a relation called “raw_logs”, analogous to a table in Hive, and declares it as having a single column (“line:chararray”) that maps onto a directory of files in HDFS (“/user/root/logs”). You can ask Pig (through the Pig command-line client, which I’m using now) to list out the structure of this relation using the “describe” command:
grunt> describe raw_logs;
raw_logs: {line: chararray}
In this form the logs aren’t too useful though as each row contains all the data we want, as a single field. To take a look at what we’re working with currently, let’s create another relation that limits down the dataset to just five rows, and use the DUMP command to display the relation’s data on the screen:
grunt> raw_logs_limit_5 = LIMIT raw_logs 5;
grunt> DUMP raw_logs_limit_5
2014-05-03 16:15:13,260 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(38.127.201.3 - - [03/Apr/2014:20:56:34 +0000] "GET /wp-content/uploads/2012/01/Partial-Hybrid-Model-e1327470743307.png HTTP/1.1" 200 8432 "http://obiee.nl/?tag=dimensional-modelling&paged=2" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)")
(84.39.213.18 - - [08/Apr/2014:20:13:45 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 - "http://www.rittmanmead.com/2009/05/troubleshooting-obiee-connectivity-and-server-issues/" "Mozilla/5.0 (Windows NT 5.1; rv:26.0) Gecko/20100101 Firefox/26.0")
(137.151.212.38 - - [11/Apr/2014:06:08:10 +0000] "GET /wp-content/plugins/featured-content-gallery/scripts/jd.gallery.js.php HTTP/1.1" 200 6075 "http://www.rittmanmead.com/training/customobiee11gtraining/" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)")
(137.151.212.38 - - [11/Apr/2014:06:08:10 +0000] "GET /wp-content/plugins/featured-content-gallery/scripts/jd.gallery.transitions.js HTTP/1.1" 200 492 "http://www.rittmanmead.com/training/customobiee11gtraining/" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)")
(199.208.239.141 - - [18/Mar/2014:14:11:52 +0000] "GET /wp-content/uploads/2013/08/NewImage14.png HTTP/1.1" 200 217190 "http://www.rittmanmead.com/2013/08/inside-my-home-office-development-lab-vmware-os-x-server/" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)")
What I’ve omitted for clarity in the above output is the MapReduce console output – what you’ll see if you run this in MapReduce mode is the process starting up, and then running, to retrieve 5 rows effectively at random from the whole set of log files, process them through the Map > Shuffle > Reduce process and then return them to the Grunt shell.
What would be really good though, of course, is if we could split these single log row columns into multiple ones, one for each part of the log entry. In the Hive example I posted the other day, I did this through a Hive “SerDe” that used a regular expression to split the file, and I can do something similar in Pig; Pig has a function called REGEX_EXTRACT_ALL that takes a regular expression and creates a column for each part of the expression, and so I can use it in conjunction with another relational operator, GENERATE FLATTEN, to take the first set of data, run it through the regular expression and come out with another set of data that’s been split as I want it:
logs_base = FOREACH raw_logs
GENERATE FLATTEN
  (
    REGEX_EXTRACT_ALL
    (
      line,
      '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
    )
  )
AS
  (
    remoteAddr: chararray, remoteLogname: chararray, user: chararray,
    time: chararray, request: chararray, status: chararray, bytes_string: chararray,
    referrer: chararray, browser: chararray
  );
grunt> describe logs_base
logs_base: {remoteAddr: chararray,remoteLogname: chararray,user: chararray,time: chararray,request: chararray,status: chararray,bytes_string: chararray,referrer: chararray,browser: chararray}
GENERATE in Pig tells it to create (or “project”) some columns out of an incoming dataset; FLATTEN eliminates any nesting in the resulting output (we’ll see more of FLATTEN and nesting in a moment). Notice how the DESCRIBE command afterwards now shows individual columns for the log elements, rather than just one single “line:chararray” column.
Using Pig to Interactively Filter the Dataset
So now we’ve got a more useful set of rows and columns in the Pig relation, and like an Oracle table, unless we do something to order them later, they’re effectively held in random order. Something we can do now is filter the dataset, for example creating another relation containing just those log entries where the request 404’d, and then further filtering that dataset to those 404’d requests that were made by users on IE6:
grunt> logs_404 = FILTER logs_base BY status == '404';
grunt> logs_404_ie6 = FILTER logs_404 BY browser == 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)';
So how many of our website users are on IE6 and getting “page not found” errors? To find out, I create another relation that groups the entries into a single row and then generates a count of the rows that were aggregated:
grunt> logs_404_ie6_count = FOREACH (GROUP logs_404_ie6 ALL) GENERATE COUNT(logs_404_ie6);
grunt> DUMP logs_404_ie6_count
...
(95)
and I can do a similar thing for all of the 404s:
grunt> logs_404_count = FOREACH (GROUP logs_404 ALL) GENERATE COUNT(logs_404);
grunt> dump logs_404_count
...
(31998)
You can see these Pig scripts running in CDH’s Cloudera Manager web application, with the screenshot below showing one of them at the point where 92% of the Mapper parts have completed, waiting to hand off to the Reducers; the console output in Grunt will show you the status too, the output of which I removed from the above two statements for clarity.
Grouping, Subsetting and Aggregating Data using Pig
How we generate counts and other aggregates is interesting in Pig. Pig has a relational operator called GROUP, as we’ve seen before, and when you GROUP a relation by a column, or a group of columns, it creates a new relation that contains two columns: one called “group” that has the same datatype as whatever you grouped on (or a “tuple” made up of multiple columns, if you grouped on more than one column), and a second column that’s named after whatever you grouped, i.e. the original relation. To take an example, if we grouped the logs_base relation on status code, you’d see the following if you then describe the resulting relation:
grunt> by_status = GROUP logs_base BY status;
grunt> describe by_status
by_status: {group: chararray,logs_base: {(remoteAddr: chararray,remoteLogname: chararray,user: chararray,time: chararray,request: chararray,status: chararray,bytes_string: chararray,referrer: chararray,browser: chararray)}}
What’s interesting about a Pig GROUP, and conceptually different to SQL’s (and therefore Hive’s) GROUP BY, is that this second column is actually, in Pig terms, a “bag” – a bag of rows (or “tuples”) that are unaltered compared to the original relation, i.e. they’ve not been aggregated up by the grouping but are still at their original level of detail. So Pig gives you, apart from its step-by-step dataflow method of working with data, the ability to group data whilst still preserving the detail of the individual grouped rows, leaving any summation or other aggregation to a step you do afterwards. So for example, if I wanted to see how many 200s, 404s and so on my log file dataset contained in total, I then tell Pig to iterate through these bags, project out the columns I’m interested in (in this case, just the status) and also perform aggregation over the grouping buckets specified in the GROUP relational operator:
grunt> by_status_count = FOREACH by_status GENERATE FLATTEN (group) as status, COUNT(logs_base);
grunt> dump by_status_count
(302,23680)
(304,512771)
(403,32)
(405,250)
(414,13)
(416,8)
(500,2155)
(,0)
(200,4447087)
(206,8875)
(301,304901)
(400,59)
(404,31998)
(408,9601)
(501,12)
(503,2)
So in that example, we told Pig to list out all of the groupings (i.e. the distinct list of status codes), and then run a count of rows against each of those groupings, giving us the output we’re interested in. We could, however, choose not to aggregate those rows at this point and instead treat each “bucket” formed by the grouping as a sub-selection, allowing us to, for example, investigate in more detail when and why the 301s – “Moved Permanently” – were caused. Let’s use that now to find out what the top 10 requests were that led to HTTP 301 responses, starting by creating another relation that just contains the ‘301’ group:
grunt> by_status_301 = FILTER by_status BY group == '301';
grunt> describe by_status_301
by_status_301: {group: chararray,logs_base: {(remoteAddr: chararray,remoteLogname: chararray,user: chararray,time: chararray,request: chararray,status: chararray,bytes_string: chararray,referrer: chararray,browser: chararray)}}
Looking at the structure of the relation this has created, though, you can see that the rows we’ve grouped are all contained within a single nested bag called “logs_base”, and to do anything interesting with that data we’ll need to flatten it, which un-nests it back into individual rows:
grunt> by_status_301_flattened = FOREACH by_status_301
>> GENERATE $0, FLATTEN($1);
grunt> describe by_status_301_flattened
by_status_301_flattened: {group: chararray,logs_base::remoteAddr: chararray,logs_base::remoteLogname: chararray,logs_base::user: chararray,logs_base::time: chararray,logs_base::request: chararray,logs_base::status: chararray,logs_base::bytes_string: chararray,logs_base::referrer: chararray,logs_base::browser: chararray}
Notice also how I referenced the two columns in the by_status_301 relation by positional notation ($0 and $1)? This is handy when either you’ve not got a proper schema defined for your data (all part of the “pigs eat anything” approach, in that Pig even handles data you don’t yet have a formal schema for), or when it’s just easier to refer to a column by position than work out its formal name.
So now we’ve got our list of log entries that recorded HTTP 301 “Moved Permanently” responses, let’s use another relation to project just the columns we want – the date and the requests – using some Pig string functions to extract the month and year, and also split the request field up into its constituent method, URI and protocol fields:
grunt> by_status_301_date_and_urls = FOREACH by_status_301_flattened
>> GENERATE SUBSTRING(time,3,6) as month,
>> SUBSTRING(time,7,11) as year, FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray);
grunt> describe by_status_301_date_and_urls
by_status_301_date_and_urls: {month: chararray,year: chararray,method: chararray,request_page: chararray,protocol: chararray}
grunt> by_status_date_and_urls_group = GROUP by_status_301_date_and_urls BY (year,month);
grunt> describe by_status_date_and_urls_group
by_status_date_and_urls_group: {group: (year: chararray,month: chararray),by_status_301_date_and_urls: {(month: chararray,year: chararray,method: chararray,request_page: chararray,protocol: chararray)}}
grunt> by_status_date_and_urls_group_count = FOREACH by_status_date_and_urls_group
>> GENERATE FLATTEN(group) as (year,month),
>> COUNT(by_status_301_date_and_urls);
All of these statements just set up the dataflow, and no actual processing takes place until we choose to dump, or store, the results – which again makes Pig great for iteratively building up a dataflow, or in BI terms maybe an ETL flow, before finally pulling the trigger at the end and generating the end result. Let’s do that now using the dump command:
grunt> dump by_status_date_and_urls_group_count
...
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_201404280738_022212296889577by_status,by_status_301,by_status_301_date_and_urls,by_status_301_flattened,logs_base,raw_logsGROUP_BY
job_201404280738_02231166665555by_status_date_and_urls_group,by_status_date_and_urls_group_countGROUP_BY,COMBINERhdfs://cdh4-node1.rittmandev.com:8020/tmp/temp-1401083448/tmp827611833,

Input(s):
Successfully read 5341613 records (1448814961 bytes) from: "/user/root/logs"

Output(s):
Successfully stored 3 records (63 bytes) in: "hdfs://cdh4-node1.rittmandev.com:8020/tmp/temp-1401083448/tmp827611833"

Counters:
Total records written : 3
Total bytes written : 63
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 1
Total records proactively spilled: 221601

Job DAG:
job_201404280738_0222->job_201404280738_0223,
job_201404280738_0223

2014-05-04 00:33:50,193 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 169 time(s).
2014-05-04 00:33:50,193 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2014-05-04 00:33:50,194 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2014-05-04 00:33:50,199 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-05-04 00:33:50,199 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(2014,Apr,85048)
(2014,Feb,47)
(2014,Mar,219806)
So we had around 85k “page permanently moved” errors in April, only a few in February, and a much larger amount in March 2014. So which web page requests in March 2014 were the biggest cause of this error? Let’s focus on just that month and list out the top ten page requests that hit this error:
grunt> mar_2014_urls = FILTER by_status_301_date_and_urls BY (year == '2014' AND month == 'Mar');
grunt> mar_2014_301_url_group = GROUP mar_2014_urls BY request_page;
grunt> describe mar_2014_301_url_group
mar_2014_301_url_group: {group: chararray,mar_2014_urls: {(month: chararray,year: chararray,method: chararray,request_page: chararray,protocol: chararray)}}
grunt> mar_2014_301_url_group_count = FOREACH mar_2014_301_url_group
>> GENERATE FLATTEN(group) as request,
>> COUNT(mar_2014_urls) as num;
grunt> mar_2014_301_url_group_count_sorted = ORDER mar_2014_301_url_group_count by num DESC;
grunt> mar_2014_301_url_group_count_limited = LIMIT mar_2014_301_url_group_count_sorted 10;
grunt> dump mar_2014_301_url_group_count_limited
...
(/wp-login.php,124124)
(/,72374)
(/feed,1623)
(/2013/12/creating-a-custom-analytics-dashboard-from-scratch-the-blue-peter-way/feed/index.php,896)
(/biforum2014/feed/index.php,661)
(/training/trn-205-oracle-bi-ee-11g-create-reports-dashboards-alerts-and-scorecards/feed/index.php,659)
(/2009/01/rittman-mead-bi-forum-call-for-papers/feed/index.php,519)
(/biforum2013/feed/index.php,430)
(/2010/05/oracle-bi-ee-10-1-3-4-1-do-we-need-measures-in-a-fact-table/feed/feed/,376)
(/2010/05/31/oracle-bi-ee-10-1-3-4-1-do-we-need-measures-in-a-fact-table/feed/,376)
Joining Datasets in Pig
So far we’ve worked with just a single set of data – the Apache weblog files that we’ve then filtered, subsetted, parsed and analysed. What would be really interesting, though, would be if we could bring in some additional reference or lookup data to help us make more sense of the log activity on our website. One of the motivators for the people behind Pig, right at the start, was to give Hadoop the ability to join datasets, which up until then was really hard to do with just Java and MapReduce; as we’ll see later on there are still a lot of restrictions on how these joins take place, but Pig gives you the ability to join two or more datasets together, which we’ll do now in another example where we’ll look at the most popular blog posts, and blog authors, over the period covered by our logs.
Let’s start by taking the full set of logs, parsed into the separate elements of the log file entry, and add in additional columns for month and the request elements:
grunt> page_requests = FOREACH logs_base
>> GENERATE SUBSTRING(time,3,6) as month,
>> FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray);
grunt> describe page_requests;
page_requests: {month: chararray,method: chararray,request_page: chararray,protocol: chararray}
One thing you’re taught with Pig is “project early, and often”, so let’s remove the method and protocol columns from that dataset and then filter the remaining page requests to remove those that are blank or aren’t blog post requests:
grunt> page_requests_short = FOREACH page_requests
>> GENERATE $0,$2;
grunt> page_requests_short_filtered = FILTER page_requests_short BY (request_page is not null AND SUBSTRING(request_page,0,3) == '/20');
grunt> page_requests_count = FOREACH (GROUP page_requests_short_filtered ALL) GENERATE COUNT (page_requests_short_filtered);
grunt> dump page_requests_count
...
(310695)
Let’s now reduce that list down to the top ten page requests, the way we did before with pages causing 301 errors:
grunt> page_request_group = GROUP page_requests_short_filtered BY request_page;
grunt> page_request_group_count = FOREACH page_request_group GENERATE $0, COUNT(page_requests_short_filtered) as total_hits;
grunt> page_request_group_count_sorted = ORDER page_request_group_count BY $1 DESC;
grunt> page_request_group_count_limited = LIMIT page_request_group_count_sorted 10;
grunt> dump page_request_group_count_limited
...
(/2014/03/obiee-dashboard-prompt-at-least-one-mandatory/,4610)
(/2012/03/obiee-11g-security-week-connecting-to-active-directory-and-obtaining-group-membership-from-database-tables/,3528)
(/2013/04/upgrading-obiee-to-11-1-1-7/,2963)
(/2014/04/bi-forum-2014-preview-no-silver-bullets-obiee-performance-in-the-real-world/,2605)
(/2014/03/the-secret-life-of-conditional-formatting-in-obiee/,2579)
(/2012/03/obiee-11g-security-week-subject-area-catalog-and-functional-area-security-2/,2410)
(/2014/03/introducing-obi-metrics-agent-an-open-source-obiee-metrics-collector/,2321)
(/2014/03/using-oracle-r-enterprise-to-analyze-large-in-database-datasets/,2309)
(/2014/03/using-sqoop-for-loading-oracle-data-into-hadoop-on-the-bigdatalite-vm/,2240)
(/2012/03/obiee-11-1-1-6-new-dashboard-analysis-and-reporting-features/,2160)
Not bad. What would be even better, though, would be if I could retrieve the full names of these posts in WordPress, on which our website runs, and also the author name. I’ve got a text file export of post names, URLs and authors that was previously exported from our WordPress install, so let’s declare another relation to hold, initially, the raw rows from that file, like this:
grunt> raw_posts = LOAD '/user/root/posts/' USING TextLoader AS (line:chararray);
Then split that file by the semicolon that delimits each of the entries (author, post name etc):
grunt> posts_line = FOREACH raw_posts
>> GENERATE FLATTEN
>> (
>> STRSPLIT(line,';',10)
>> )
>> AS
>> (
>> post_id: chararray, title: chararray, post_date: chararray,
>> type: chararray, author: chararray, post_name: chararray,
>> url_generated: chararray
>> );
grunt> describe posts_line
posts_line: {post_id: chararray,title: chararray,post_date: chararray,type: chararray,author: chararray,post_name: chararray,url_generated: chararray}
I’ll now take that relation and project just the columns I’m interested in:
grunt> posts_and_authors = FOREACH posts_line
>> GENERATE title,author,post_name,CONCAT(REPLACE(url_generated,'"',''),'/') AS (url_generated:chararray);
grunt> describe posts_and_authors
posts_and_authors: {title: chararray,author: chararray,post_name: chararray,url_generated: chararray}
Now I’ll do the join, and then take that join and use it to generate a combined list of pages and who wrote them:
grunt> pages_and_authors_join = JOIN posts_and_authors BY url_generated, page_request_group_count_limited BY group;
grunt> pages_and_authors = FOREACH pages_and_authors_join GENERATE url_generated, post_name, author, total_hits;
grunt> top_pages_and_authors = ORDER pages_and_authors BY total_hits DESC;
and then finally, output the joined set of data to a comma-separated file in HDFS:
grunt> STORE top_pages_and_authors into '/user/root/top-pages-and-authors.csv' USING PigStorage(',');
Once that’s run, I can use Grunt’s “cat” command to output the contents of the file I just created:
grunt> cat /user/root/top-pages-and-authors.csv
/2014/03/obiee-dashboard-prompt-at-least-one-mandatory/,"obiee-dashboard-prompt-at-least-one-mandatory","Gianni Ceresa",4610
/2012/03/obiee-11g-security-week-connecting-to-active-directory-and-obtaining-group-membership-from-database-tables/,"obiee-11g-security-week-connecting-to-active-directory-and-obtaining-group-membership-from-database-tables","Mark Rittman",3528
/2013/04/upgrading-obiee-to-11-1-1-7/,"upgrading-obiee-to-11-1-1-7","Robin Moffatt",2963
/2014/04/bi-forum-2014-preview-no-silver-bullets-obiee-performance-in-the-real-world/,"bi-forum-2014-preview-no-silver-bullets-obiee-performance-in-the-real-world","Robin Moffatt",2605
/2014/03/the-secret-life-of-conditional-formatting-in-obiee/,"the-secret-life-of-conditional-formatting-in-obiee","Gianni Ceresa",2579
/2012/03/obiee-11g-security-week-subject-area-catalog-and-functional-area-security-2/,"obiee-11g-security-week-subject-area-catalog-and-functional-area-security-2","Mark Rittman",2410
/2014/03/introducing-obi-metrics-agent-an-open-source-obiee-metrics-collector/,"introducing-obi-metrics-agent-an-open-source-obiee-metrics-collector","Robin Moffatt",2321
/2014/03/using-oracle-r-enterprise-to-analyze-large-in-database-datasets/,"using-oracle-r-enterprise-to-analyze-large-in-database-datasets","Mark Rittman",2309
/2014/03/using-sqoop-for-loading-oracle-data-into-hadoop-on-the-bigdatalite-vm/,"using-sqoop-for-loading-oracle-data-into-hadoop-on-the-bigdatalite-vm","Mark Rittman",2240
/2012/03/obiee-11-1-1-6-new-dashboard-analysis-and-reporting-features/,"obiee-11-1-1-6-new-dashboard-analysis-and-reporting-features","Mark Rittman",2160
But What About More Complex Joins and Transformations … Enter, Pig Extensibility
This is of course great, but going back to my previous Hive example, I also managed to geo-code the log file entries, converting the IP addresses into country names via a lookup to a geocoding database. What made that example “interesting” though was the need to join the Hive table of log entries to the geocode table via a BETWEEN, or > and < operators, so that I could locate each IP address within the ranges given by the geocoding database – and the reason it got interesting was that Hive can only do equi-joins, not non-equijoins or joins involving greater than, BETWEEN and so on. Impala *could* do it, and on a small set of input rows – five in my example – it worked fine. Try and scale the Impala query up to the full dataset though, and the query fails because it runs out of memory; and that’s potentially the issue with Impala and set-based queries, as Impala does everything in-memory, and most Hadoop systems are designed for fast I/O, not lots of memory.
So can Pig help here? Well, it’s actually got the same limitation – non-equijoins are quite difficult to do in Hadoop because of the way MapReduce works – but where Pig could perhaps help is through its extensibility: you can stream Pig data, for example IP addresses, through Perl and Python scripts to return the relevant country, or you can write Pig UDFs – user-defined functions – to return the information we need, in a similar way to how PL/SQL functions in Oracle let you call out to arbitrary functions to return the results of a more complex look-up. But this is also where things get a bit more complicated, so we’ll save this for the next post in this series, where I’ll also be joined by my colleague Nelio, who’s spent the best part of this last week VPN’d into my VMware-based Hadoop cluster getting this last example working.
More Details on the Lars George Cloudera Hadoop Masterclass – RM BI Forum 2014, Brighton & Atlanta
It’s just over a week until the first of the two Rittman Mead BI Forum 2014 events take place in Brighton and Atlanta, and one of the highlights of the events for me will be Cloudera’s Lars George’s Hadoop Masterclass. Hadoop and Big Data are two technologies that are becoming increasingly important to the worlds of Oracle BI and data warehousing, so this is an excellent opportunity to learn some basics, get some tips from an expert in the field, and then use the rest of the week to relate it all back to core OBIEE, ODI and Oracle Database.
Lars’ masterclass is running on the Wednesday before each event: on May 7th at the Hotel Seattle in Brighton, and then the week after, on Wednesday 14th May, at the Renaissance Atlanta Midtown Hotel. Attendance for the masterclass is just £275 + VAT for the Brighton event, or $500 for the Atlanta event, but you can only book it as part of the overall BI Forum – so book up now while there are still places left! In the meantime, here’s details of what’s in the masterclass:
Session 1 – Introduction to Hadoop
The first session of the day sets the stage for the following ones. We will look into the history of Hadoop, where it comes from, and how it made its way into the open-source world. Part of this overview is the set of basic building blocks in Hadoop: the file system HDFS and the batch processing system MapReduce.
Then we will look into the flourishing ecosystem that is now the larger Hadoop project, with its various components and how they help form a complete data processing stack. We will also briefly look into how Hadoop-based distributions today help tie the various components together in a reliable manner, based on predictable release cycles.
Finally we will pick up the topic of cluster design and planning, talking about the major decision points when provisioning a Hadoop cluster. This includes the hardware considerations depending on specific use-cases as well as how to deploy the framework on the cluster once it is operational.
Session 2 – Ingress and Egress
The second session dives into the use of the platform as part of an Enterprise Data Hub, i.e. the central storage and processing location for all of the data in a company (large, medium, or small). We will discuss how data is acquired into the data hub and provisioned for further access and processing. There are various tools that allow the importing of data from single event-based systems as well as from relational database management systems.
As data is stored, the user has to make decisions about how to store it for further processing, since that can have considerable performance implications. In state-of-the-art data processing pipelines there are usually hybrid approaches that combine lightweight “LT” (no “E” for extract needed), i.e. transformations, with optimised data formats as the final location for fast subsequent processing. Continuous and reliable data collection is vital for productionising the initial proof-of-concept pipelines.
Towards the end we will also look at the lower level APIs available for data consumption, rounding off the set of available tools for a Hadoop practitioner.
Session 3 – NoSQL and Hadoop
For certain use-cases there is an inherent need for something more “database”-like compared to the features offered by the original Hadoop components, i.e. the file system and batch processing. Especially for slowly changing dimensions, and entities in general, there is a need for updating previously stored data as time progresses. This is where HBase, the Hadoop database, comes in and allows for random reads and writes to existing rows of data, or entities in a table.
We will dive into the architecture of HBase to derive the need for proper schema design, one of the key tasks in implementing an HBase-backed solution. Similar to the file formats from session 2, HBase allows you to freely design table layouts, which can lead to suboptimal performance. This session will introduce the major access patterns observed in practice and explain how they can play to HBase’s strengths.
Finally, a set of real-world examples will show how fellow HBase users (e.g. Facebook) have gone through various modifications of their schema design before arriving at their current setup. Available open-source projects show further schema designs that will help in coming to terms with this involved topic.
Session 4 – Analysing Big Data
The final session of the day tackles the processing of data, since so far we have learned mostly about the storage and preparation of data for subsequent handling. We will look into the existing frameworks atop Hadoop and how they offer distinct (but sometimes also overlapping) functionality. There are frameworks that run as separate instances, but also higher-level abstractions on top of those that help developers and data wranglers of all kinds to find their weapon of choice.
Using all of what has been learned, the user will then see how the various tools can be combined to build the promised reliable data processing pipelines, landing data in the Enterprise Data Hub and using automation to start the subsequent processing without any human intervention. The closing information provided in this session will look into the external interfaces, such as JDBC/ODBC, enabling the visualisation of the computed and analysed data in appealing UI-based tools.
Detailed Agenda:
- Session 1 – Introduction to Hadoop
- Introduction to Hadoop
- Explain pedigree, history
- Explain and show HDFS, MapReduce, Cloudera Manager
- The Hadoop Ecosystem
- Show need for other projects within Hadoop
- Ingress, egress, random access, security
- Cluster Design and Planning
- Introduce concepts on how to scope out a cluster
- Typical hardware configuration
- Deployment methods
- Session 2 – Ingress and Egress
- Explain Flume, Sqoop to load data record based or in bulk
- Data formats and serialisation
- SequenceFile, Avro, Parquet
- Continuous data collection methods
- Interfaces for data retrieval (lower level)
- Session 3 – NoSQL and Hadoop
- HBase Introduction
- Schema design
- Access patterns
- Use-cases examples
- Session 4 – Analysing Big Data
- Processing frameworks
- Explain and show MapReduce, YARN, Spark, Solr
- High level abstractions
- Hive, Pig, Crunch, Impala, Search
- Data pipelines in Hadoop
- Explain Oozie, Crunch
- Access to data for existing systems
- ODBC/JDBC
Full details of both BI Forum events can be found at the Rittman Mead BI Forum 2014 web page, and full agenda details for Brighton and Atlanta are on this blog post from last week.
Simple Data Manipulation and Reporting using Hive, Impala and CDH5
Although I’m pretty clued-up on OBIEE, ODI, Oracle Database and so on, I’m relatively new to the worlds of Hadoop and Big Data, so most evenings and weekends I play around with Hadoop clusters on my home VMware ESXi rig and try to get some experience that might then come in useful on customer projects. A few months ago I went through an example of loading up flight delays data into Cloudera CDH4 and then analysing it using Hive and Impala, but realistically it’s unlikely the data you’ll analyse in Hadoop will come in such convenient, tabular form. Something that’s more realistic is analysing log files from web servers or other high-volume, semi-structured sources, so I asked Robin to download the most recent set of Apache log files from our website, and I thought I’d have a go at analysing them using Pig and Hive, and maybe visualise the output using OBIEE (if possible, later on).
As I said, I’m not an expert in Hadoop and the Cloudera platform, so I thought it’d be interesting to describe the journey I went through, and also give some observations of my own on when to use Hive and when to use Pig; when products like Cloudera Impala could be useful; and the general state of play with the Cloudera Hadoop platform. The files I started off with were Apache weblog files, 10 in total, with sizes ranging from around 2MB up to 350MB.
Looking inside one of the log files, they’re in the standard Apache log file format (or “combined log format”), where the visitor’s IP address, the date of access, some other request details and the page (or resource) they requested are recorded:
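A typical combined-format entry looks something like this (an illustrative example rather than a line taken from the actual logs):

192.0.2.10 - - [18/Mar/2014:14:11:52 +0000] "GET /2014/03/some-blog-post/ HTTP/1.1" 200 17439 "http://www.google.com/" "Mozilla/5.0 (Windows NT 6.1; WOW64)"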
What I’m looking to do is count the number of visitors per day, work out which was the most popular page, see what time of day we’re busiest, and so on. I’ve got a Cloudera Hadoop CDH5.0 6-node cluster running on a VMware ESXi server at home, so the first thing to do is log into Hue, the web-based developer admin tool that comes with CDH5, and upload the files to a directory on HDFS (Hadoop Distributed File System), the Unix-like clustered file system that underpins most of Hadoop.
You can, of course, SFTP the files to one of the Hadoop nodes and use the “hadoop fs” command-line tool to copy the files into HDFS, but for relatively small files like these it’s easier to use the web interface to upload them from your workstation. Once I’ve done that, I can then view the log files in the HDFS directory, just as if they were sitting on a regular Unix filesystem.
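If you do take the command-line route, the copy itself is just a couple of commands – the target directory matches the one used in this example, and the access_log* filename pattern is an assumption about how your log files are named:

hadoop fs -mkdir /user/root/logs
hadoop fs -put access_log* /user/root/logs/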
At this point though, the files are still “unstructured” – just a single log entry per line – and I’ll therefore need to do something before I can count things like the number of hits per day, what pages were requested and so on. At this beginner’s level, there are two main options you can use – Hive, a SQL interface over HDFS that lets you select from, and do set-based transformations with, files of data; or Pig, a more procedural language that lets you manipulate file contents as a series of step-by-step tasks. For someone like myself with a relational data warehousing background, Hive is probably easier to work with, but it comes with some quite significant limitations compared to a database like Oracle – we’ll see more on this later.
Whilst Hive tables are, at the simplest level, mapped onto comma- or otherwise-delimited files, another neat feature in Hive is that you can use what’s called a “SerDe”, or “Serializer-Deserializer”, to map more complex file structures into regular table columns. In the Hive DDL script below, I use this SerDe feature to have a regular expression parse the log file into columns, with the data source being an entire directory of files, not just a single one:
CREATE EXTERNAL TABLE apachelog (
  host STRING,
  identity STRING,
  user STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE
LOCATION '/user/root/logs';
Things to note in the above DDL are:
- EXTERNAL table means that the datafile used to populate the Hive table sits somewhere outside Hive’s usual /user/hive/warehouse directory, in this case in the /user/root/logs HDFS directory.
- ROW FORMAT SERDE ‘org.apache.hadoop.hive.contrib.serde2.RegexSerDe’ tells Hive to use the Regular Expressions Serializer-Deserializer to interpret the source file contents, and
- WITH SERDEPROPERTIES … gives the SerDe the regular expression to use, in this case to decode the Apache log format.
Probably the easiest way to run the Hive DDL command to create the table is to use the Hive query editor in Hue, but there’s a couple of things you’ll need to do before this particular command will work:
1. You’ll need to get hold of the JAR file in the Hadoop install that provides this SerDe (hive-contrib-0.12.0-cdh5.0.0.jar) and then copy it to somewhere on your HDFS file system, for example /user/root. In my CDH5 installation, this file was at /opt/cloudera/parcels/CDH/lib/hive/lib/, but it’ll probably be at /usr/lib/hive/lib if you installed CDH5 using the traditional packages (rather than parcels) route; if you’re using a version of CDH prior to 5, the filename will be versioned accordingly. This JAR file then needs to be accessible to Hive, and whilst there are various more permanent ways you can do this, the easiest is to point to the JAR file in an entry in the query editor’s File Resources section as shown below (see also the ADD JAR alternative after these steps).
2. Whilst you’re there, un-check the “Enable Parameterization” checkbox, otherwise the query editor will interpret the SerDe output string as parameter references.
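As an aside, if you’re working from the Hive CLI rather than Hue, an ADD JAR statement pointing at a local copy of the library achieves the same thing – a sketch using the parcel path from my installation; adjust it to wherever the JAR lives on your system:

ADD JAR /opt/cloudera/parcels/CDH/lib/hive/lib/hive-contrib-0.12.0-cdh5.0.0.jar;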
Once the command has completed, you can click over to the Hive Metastore table browser, and see the columns in the new table.
Behind the scenes, Hive maps its table structure onto all the files in the /user/root/logs HDFS directory, and when I run a SELECT statement against it, for example to do a simple row count, MapReduce mappers, shufflers and sorters are spun up to return the count of rows to me.
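That simple row count is just a one-liner:

SELECT COUNT(*) FROM apachelog;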
But in its current form, this table still isn’t all that useful – I’ve just got raw IP addresses for page requesters, and the request date is in a format that’s not easy to work with. So let’s do some further manipulation, creating another table that splits out the request date into year, month, day and time, using Hive’s CREATE TABLE AS SELECT command to transform and load in one step:
CREATE TABLE apachelog_date_split_parquet
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
AS
SELECT host,
       identity,
       user,
       substr(time,9,4)  year,
       substr(time,5,3)  month,
       substr(time,2,2)  day,
       substr(time,14,2) hours,
       substr(time,17,2) mins,
       substr(time,20,2) secs,
       request,
       status,
       size,
       referer,
       agent
FROM apachelog;
Note the ParquetHive SerDe I’m using in this table’s row format definition – Parquet is a compressed, column-store file format developed by Cloudera originally for Impala (more on that in a moment), which from CDH4.6 is also available for Hive and Pig. By using Parquet we potentially get speed and space-saving advantages compared to regular files, so let’s use that feature now and see where it takes us. After creating the new Hive table, I can then run a quick query to count web server hits per month:
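The query itself is straightforward – something along these lines against the table just created:

SELECT year, month, COUNT(*) AS hits
FROM apachelog_date_split_parquet
GROUP BY year, month;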
So – getting more useful, but it’d be even nicer if I could map the IP addresses to actual countries, so I can see how many hits came from the UK, how many from the US, and so on. To do this, I’d need to use a lookup service or table to map my IP addresses to countries or cities, and one commonly-used such service is the free GeoIP database provided by MaxMind, where you turn your IP address into an integer via a formula, and then do a BETWEEN to locate that IP within ranges defined within the database. How best to do this though?
There are several ways that you can enhance and manipulate data in your Hadoop system like this. One way, and something I plan to look at on this blog later in this series, is to use Pig, potentially with a call-out to Perl or Python to do the lookup on a row-by-row (or tuple-by-tuple) basis – this blog article on the Cloudera site goes through a nice example. Another way, and again something I plan to cover in this series on the blog, is to use something called “Hadoop Streaming” – the ability within MapReduce to “subcontract” the map and reduce parts of the operation to external programs or scripts, in this case a Python script that again queries the MaxMind database to do the IP-to-country lookup.
But surely it’d be easiest to just calculate the IP address integer and join my existing Hive table to this GeoIP lookup table, and do it that way? Let’s start by trying to do that, first by modifying my final table design to include the IP address integer calculation defined on the MaxMind website:
CREATE TABLE apachelog_date_ip_split_parquet
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
AS
SELECT host,
       (cast(split(host,'\\.')[0] as bigint) * 16777216) +
       (cast(split(host,'\\.')[1] as bigint) * 65536) +
       (cast(split(host,'\\.')[2] as bigint) * 256) +
       (cast(split(host,'\\.')[3] as bigint)) ip_add_int,
       identity,
       user,
       substr(time,9,4)  year,
       substr(time,5,3)  month,
       substr(time,2,2)  day,
       substr(time,14,2) hours,
       substr(time,17,2) mins,
       substr(time,20,2) secs,
       request,
       status,
       size,
       referer,
       agent
FROM apachelog;
Now I can query this from the Hive query editor, and I can see the IP address integer calculations that I can then use to match to the GeoIP IP address ranges.
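A quick sanity check of the calculated values might look like this:

SELECT host, ip_add_int
FROM apachelog_date_ip_split_parquet
LIMIT 5;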
I then upload the IP Address to Countries CSV file from the MaxMind site to HDFS, and define a Hive table over it like this:
create external table geo_lookup (
  ip_start string,
  ip_end string,
  ip_int_start int,
  ip_int_end int,
  country_code string,
  country_name string
)
row format DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/user/root/lookups/geo_ip';
Then I try some variations on the BETWEEN clause, in a SELECT with a join:
select a.host, l.country_name
from apachelog_date_ip_split a
join geo_lookup l on (a.ip_add_int > l.ip_int_start) and (a.ip_add_int < l.ip_int_end)
group by a.host, l.country_name;
select a.host, l.country_name
from apachelog_date_ip_split_parquet a
join geo_lookup l on a.ip_add_int between l.ip_int_start and l.ip_int_end;
… which all fail, because Hive only supports equi-joins. One option is to use a Hive UDF (user-defined function) such as this one here to implement a GeoIP lookup, but something that’s probably a bit more promising is to switch over to Impala, which can handle non-equality joins through its cross join feature (Hive can in fact also do cross joins, but they’re not very efficient). Impala also has the benefit of being much faster for BI-type queries than Hive, and it’s also designed to work with Parquet, so let’s switch over to the Impala query editor, run the “invalidate metadata” command to re-sync its table view with Hive’s metastore, and then try the join in there:
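The Impala version is sketched below – the cross-join-plus-filter form, which is an assumption about the exact statement used, though it matches the approach described above:

invalidate metadata;

select a.host, l.country_name
from apachelog_date_ip_split_parquet a cross join geo_lookup l
where a.ip_add_int between l.ip_int_start and l.ip_int_end
group by a.host, l.country_name;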
Not bad. Of course this is all fairly simple stuff, and we’re still largely working with relational-style set-based transformations. In the next two posts in the series, though, I want to get a bit deeper into Hadoop-style transformations – first by using a feature called “Hadoop Streaming” to process data on its way into Hadoop, in parallel, by calling out to Python and Perl scripts; and then by taking a look at Pig, the more “procedural” alternative to Hive – with the objective being to enhance this current dataset to bring in details of the pages being requested, filter out the non-page requests, and do some work with authors, tags and clickstream analysis.