Tag Archives: Oracle Big Data Appliance

End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.4 : Transforming Data using Python & Hive Streaming

This week I’m taking an in-depth look at ETL on the Oracle Big Data Appliance, using Oracle Data Integrator 12c to call the various bits of Hadoop functionality and orchestrate the whole process. So far, I’ve landed web log data into the Hadoop cluster using Flume, created a Hive table over the log data using Hive and the RegEx SerDe, then used further Hive transformations to join this data to other reference data, some of which came from an Oracle database via Sqoop. Here’s a complete listing of the posts so far, and the ones to come:

Our next transformation is a bit trickier though. The data we’re bringing in from our webserver has IP addresses recorded for each HTTP access, and what we want to do is use that IP address to identify the country that the website visitor was located in. Geocoding, as it’s called, is fairly easy to do if your data is stored in a database such as Oracle Database or you’re using a language like Python, with two steps needed to retrieve the IP address’s country:

1. First convert the IP address into a single integer, like this:

address = '174.36.207.186'
( o1, o2, o3, o4 ) = [int(octet) for octet in address.split('.')]
integer_ip =   ( 16777216 * o1 ) \
             + (    65536 * o2 ) \
             + (      256 * o3 ) \
             +              o4
# for 174.36.207.186 this works out to 2921648058

2. Then, using a freely-downloadable database from MaxMind, look to see which range of numbers your converted IP address is in:

SELECT ip_country
FROM geoip
WHERE 2921648058 BETWEEN begin_ip_num AND end_ip_num
LIMIT 1

The only problem is that Hive doesn’t support anything other than equi-joins. In a previous blog post I got around this issue by using Impala rather than Hive, and in another one I used Pig, and a custom Python UDF, to do the geocoding outside of the main Pig data flow. I could use Pig and a Python UDF in this situation too, creating an ODI Procedure to call Pig from the command-line, but instead I’ll use another ODI KM, IKM Hive Transform, to call a Python script, with the whole process controlled from within ODI.

To set this geocoding process up, I first create a new ODI mapping, with my current Hive table as the source and another Hive table, this time with an additional column for the IP address country, as the target. I then map what columns I can into the target, leaving just the country column without a mapping. Notice I’ve put the source table within an ODI12c dataset; this isn’t mandatory, it’s just something I decided to do at the time, and it doesn’t impact the rest of the process.


Now, in a similar way to how I created the Python UDF to do the country lookup for Pig, I’ll create a variation of that script that accepts columns of data as its input, outputs other columns of data, and does the geocoding within the script by calling the Python GeoIP API provided by MaxMind. Most importantly though, I’ll use it in conjunction with IKM Hive Transform’s ability to stream its Hive data through an arbitrary script, making use of the Hive TRANSFORM function and a feature called “Hive streaming”. To keep things simple, let’s first set it up outside of the KM, see how it works, and then configure the KM to use this Python script as its transformation function.

Let’s start by looking at the source Hive table I’ll be starting with:

hive> describe access_per_post_categories;
OK
hostname           string             None                
request_date       string             None                
post_id            string             None                
title              string             None                
author             string             None                
category           string             None                
Time taken: 0.112 seconds, Fetched: 6 row(s)
hive> select * from access_per_post_categories limit 5;
OK
137.254.4.12 [02/Jun/2014:21:20:41 +0000] 2093 Hyperion Planning : Installing the Sample Planning Application in EPM 11.1 Mark Rittman Hyperion
137.254.4.12 [02/Jun/2014:21:20:42 +0000] 2093 Hyperion Planning : Installing the Sample Planning Application in EPM 11.1 Mark Rittman Hyperion
137.254.4.12 [02/Jun/2014:21:20:43 +0000] 2093 Hyperion Planning : Installing the Sample Planning Application in EPM 11.1 Mark Rittman Hyperion
137.254.4.12 [02/Jun/2014:21:20:45 +0000] 2093 Hyperion Planning : Installing the Sample Planning Application in EPM 11.1 Mark Rittman Hyperion
137.254.4.12 [02/Jun/2014:21:21:23 +0000] 2093 Hyperion Planning : Installing the Sample Planning Application in EPM 11.1 Mark Rittman Hyperion
Time taken: 0.976 seconds, Fetched: 5 row(s)

The target table is exactly the same, except for the additional column for country:

hive> describe access_per_post_full;
OK
hostname           string             None                
request_date       string             None                
post_id            string             None                
title              string             None                
author             string             None                
country            string             None                
category           string             None                
Time taken: 0.098 seconds, Fetched: 7 row(s)

The way IKM Hive Transform works is by streaming each incoming row of Hive data to the script registered with it, with each row’s columns tab-separated. In turn, it expects the script to output similar rows of tab-separated values, each line terminated with a newline. My script therefore needs to accept an arbitrary number of incoming rows, parse each row into its constituent columns, call the MaxMind API to do the geocoding, and then output a tab-separated set of columns for each row it processes (a bit like pipelined table functions in Oracle PL/SQL).

So here’s the Python script that does this:

[root@bdanode1 tmp]# cat add_countries.py
#!/usr/bin/python
import sys
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip
gi = pygeoip.GeoIP('/tmp/GeoIP.dat')
for line in sys.stdin:
  line = line.rstrip()
  hostname,request_date,post_id,title,author,category = line.split('\t')
  country = gi.country_name_by_addr(hostname)
  print hostname+'\t'+request_date+'\t'+post_id+'\t'+title+'\t'+author+'\t'+country+'\t'+category

A few things to note about the script:

1. “import sys” brings in the Python module that then allows me to read the input data, via sys.stdin.
2. I’ve previously installed the MaxMind GeoIP API, database and libraries, with the GeoIP.dat database stored alongside the script in /tmp. “import pygeoip” brings in the Python library used to call the API.
3. The bit that does the geocoding is “gi.country_name_by_addr”, which calls the MaxMind API. It’s this bit that replaces the need for a join with a BETWEEN clause in HiveQL.
4. To output data back to the calling Hive transformation, I just “print” it, using “\t” for tab.
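
Before wiring the script into Hive it’s worth checking it standalone; assuming pygeoip and the GeoIP.dat file are in place as described above, you can pipe a sample tab-separated row into it from the shell (the row below is just an illustrative example, not real log data):

echo -e '174.36.207.186\t[02/Jun/2014:21:20:41 +0000]\t2093\tSample Post Title\tMark Rittman\tHyperion' | python add_countries.py

If all is well, the same columns come back with the country name inserted between the author and category fields.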

One thing we’ll need to do though, similar to how we had to copy the JAR file used by Hive for the RegEx SerDe around all the nodes in the cluster, is copy the GeoIP.dat file used by the MaxMind geocoding API to the same place on all the cluster nodes. We’ll also have to set the permissions on this file so it’s readable and writeable by the Hive/YARN process on each Hadoop node.
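
One way to push the file out and set its ownership and permissions is a simple loop run from the first node; this is just a sketch, and the node names beyond bdanode2 are illustrative, so adjust the list to match your own cluster:

for host in bdanode2 bdanode3 bdanode4 bdanode5 bdanode6; do
  scp /tmp/GeoIP.dat root@$host:/tmp/
  ssh root@$host "chown oracle:oracle /tmp/GeoIP.dat; chmod 777 /tmp/GeoIP.dat"
done

Logging onto one of the nodes afterwards confirms the file is in place with the right ownership and permissions: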

officeimac:~ markrittman$ ssh root@bdanode2.rittmandev.com
The authenticity of host 'bdanode2.rittmandev.com (192.168.2.231)' can't be established.
RSA key fingerprint is 1b:e7:ec:01:57:0c:09:4e:6d:19:08:a4:0d:df:00:e0.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'bdanode2.rittmandev.com' (RSA) to the list of known hosts.
root@bdanode2.rittmandev.com's password: 
Last login: Fri Jun  6 16:55:44 2014 from 192.168.2.201
[root@bdanode2 ~]# ls /tmp/Geo* -l
-rwxrwxrwx 1 oracle oracle 687502 Jun  7 12:31 /tmp/GeoIP.dat

I also need to make sure the MaxMind geocoding Python API is installed on each node too:

wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py
pip install pygeoip

Now I can run a HiveQL command and test out the script. I’ll start up a Hive shell session, and the first thing I need to do is register the script with Hive, distributing it across the cluster and adding it to the distributed cache. Then I can select against the source table, using the TRANSFORM ... USING clause:

[oracle@bdanode1 ~]$ hive
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
14/06/09 20:48:01 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
 
Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-0.12.0-cdh5.0.1.jar!/hive-log4j.properties
hive> add file file:///tmp/add_countries.py;
Added resource: file:///tmp/add_countries.py
hive> select transform (hostname,request_date,post_id,title,author,category)
    > using 'add_countries.py'                                              
    > as (hostname,request_date,post_id,title,author,category,country)      
    > from access_per_post_categories; 
...
63.73.199.69 [02/Jun/2014:21:19:28 +0000] 14726 SmartView as the Replacement for BI Office with OBIEE 11.1.1.7 Mark Rittman United States Oracle EPM
77.125.81.239 [02/Jun/2014:21:18:58 +0000] 14476 Upgrading OBIEE to 11.1.1.7 Robin Moffatt Israel Oracle BI Suite EE
77.125.81.239 [02/Jun/2014:21:18:59 +0000] 14476 Upgrading OBIEE to 11.1.1.7 Robin Moffatt Israel Oracle BI Suite EE
Time taken: 27.645 seconds, Fetched: 176 row(s)

So it looks like it’s working. Let’s move back to ODI now and reference the script within the IKM Hive Transform KM settings. Note that I enter the name and filesystem location of the script in the KM settings, but I don’t use the TRANSFORM_SCRIPT option to key the script contents directly into the KM. That approach should work, and if you use it the KM will write the script contents out to the agent’s host filesystem at the start of the process, but the KM in this ODI12c release has an issue parsing keyed-in scripts, so you’re best leaving this setting empty and letting the KM use the file already on the filesystem.


After running the mapping, I then go over to the Operator navigator and see that it’s run successfully. Looking at the code the KM generates, I can see it referencing our script in the HiveQL statement, with the script outputting the extra country column that’s then loaded into the target table along with the other columns.
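
The generated statement isn’t reproduced in full here, but it follows the same pattern as the manual test earlier, wrapped in an INSERT into the target table. A simplified sketch, using the table and column names from the examples above and with the KM’s housekeeping options left out, looks something like this:

ADD FILE /tmp/add_countries.py;

INSERT INTO TABLE access_per_post_full
SELECT TRANSFORM (hostname, request_date, post_id, title, author, category)
USING 'add_countries.py'
AS (hostname, request_date, post_id, title, author, country, category)
FROM access_per_post_categories;

Note that the AS clause lists the columns in the order the script actually emits them, with country ahead of category, which is what lines up with the target table definition.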


Then finally, checking the target table, I can see that each row in my final Hive table has the country name alongside each log file entry, along with the IP address and other details.

So – now we’re at the stage where we’ve finished the processing with the Hadoop cluster, and I’d like to copy the final set of data into an Oracle database, so that I can analyse it using Oracle SQL and other tools. In the final installment in this series we’ll do just that, using IKM File/Hive to Oracle (OLH/ODCH) and Oracle Loader for Hadoop, one of the Oracle Big Data Connectors.

End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.3 : Enhance with Oracle Reference Data via Sqoop, and CKMs

In the first two posts in this series, I used the software on the Oracle Big Data Appliance 3.0 to ingest web log data from the Rittman Mead blog server, parse and load that data into a Hive table, and then join that table to another to add details on post and author. Links to the posts in this series are below, and I’ll complete them as the series is posted this week:

In this next step, I’ve got some data sitting in an Oracle database that I’d like to use to enhance the data in the Hadoop cluster; as a reminder, the diagram below shows how I’m looking to move data through the system, and we’re currently at step number three:


The data in the Oracle tables contains categories that I can join to the page entries that have come through the ETL process, and the join itself is pretty simple: POST_ID = POST_ID. But how do I get access to the Oracle data? I need to register the Oracle source in the ODI topology and create a model to represent the table, but can I then just join that table to the Hive table and load the results into another Hive table? Let’s try.

The screenshot below shows the mapping that would use this approach, with two Oracle tables joining to the Hive table and another Hive table as the target; on the right-hand side of the screen is the join condition, across both sources:


The Execution view shows how ODI intends to run the transformation, with LKM SQL to SQL KMs used to load the Oracle tables and stage their data in Hive, and then IKM Hive Control Append used to load the joined dataset into another Hive table.


If you try to execute the mapping though, it fails, because the LKM SQL to SQL KM can’t yet work with Hive tables.


In fact, the various IKMs that come with the Oracle Data Integrator Application Adapter for Hadoop (IKM File to Hive, IKM Hive Control Append and so on) are a bit of a mix of IKMs and LKMs, in that they contain both extraction and integration code in the same KM, and none of the regular LKMs and IKMs will otherwise work with Hadoop sources. What we need to do with ODI at this point is land the Oracle data in Hive first and then do the join, which raises the second question: how do we do that?

Currently, the only way ODI can get Oracle data into Hadoop out of the box is via IKM File to Hive (LOAD DATA), which involves an unnecessary extra step of exporting the Oracle data to a file and then loading that file into HDFS and Hive. What we can do instead is use Sqoop, a Hadoop tool for extracting data from, and loading data into, relational databases, something I covered on the blog a few weeks ago. Sqoop creates data loading and unloading jobs that run in parallel on the Hadoop cluster, and can use native JDBC drivers or additional plugins such as OraOop to make the process run faster (though Oracle Loader for Hadoop is considered the fastest way to unload to Oracle, if you’ve licensed the Oracle Big Data Connectors).

The only problem is that there’s no official support for Sqoop in ODI, so no KMs that make use of it. What you can do though is create a command-line script to run Sqoop and include that in an ODI procedure, which is what I’ll now do to bring my Oracle data into Hive. To do this, I create an ODI procedure and add a single task, using the Operating System (command-shell) technology type, and use it to tell Sqoop to create a Hive table based on a SQL SELECT statement against my Oracle database.


(Note that in practice I had to format the sqoop command as a single line within the procedure to get it to run; the sketch that follows is laid out across several lines so you can see the shape of the whole command.)
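
The command takes roughly the following form. This is only a sketch: the JDBC connect string, credentials, schema, table and column names below are placeholders rather than the actual ones used in this environment, so substitute your own before trying it.

sqoop import \
  --connect jdbc:oracle:thin:@//oracle-host:1521/orcl \
  --username BLOG_REFDATA --password welcome1 \
  --query 'SELECT p.post_id, c.category_name FROM posts p JOIN post_categories c ON p.post_id = c.post_id WHERE $CONDITIONS' \
  --target-dir /user/oracle/post_categories \
  --hive-import --hive-table post_categories \
  -m 1

Sqoop runs the query against Oracle, lands the result in HDFS under the target directory, and then creates and loads a matching Hive table, which is exactly what this step needs.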

Executing this procedure works OK, and thereafter I’ve got a single Hive table containing the joined dataset from Oracle.


Joining this new Hive table to the previous one containing the distinct set of page views is then fairly straightforward, but something I’d also like to do is stop any entries going into the rest of the ETL process where the calling IP address is a test one we use, “63.73.199.69”. The way I can do this is to use the CKM Hive knowledge module and put a constraint on the hostname column in the table I’ll be loading from the join, so that I can then use ODI’s flow control feature to divert those rows to an error table.
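
The constraint itself is defined as an ODI condition on the target datastore, and the check expression is nothing more elaborate than a filter on that hostname column, along these lines:

hostname <> '63.73.199.69'

Rows that fail the check are the ones the CKM will divert to the error table once flow control is switched on.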


I also need to define a primary key for this table, something that’s mandatory when flow control is used. So let’s put the mapping together, joining the table I just brought in from Sqoop with the latest version of the weblog entries Hive table, loading into the Hive table I’ve just enabled the constraint for:


and then I enable flow control, and the CKM Hive check knowledge module, in the Physical mapping settings.


This is of course one of the benefits of using ODI to do your Hadoop data loading – you’ve got access to the data quality and error handling features that come with the tool. Then, when I execute the mapping and check with the Operator navigator, I can see the error handling process running, and afterwards in Hue I can see the contents of the new error table, which now contains those log entries where my test IP address was used, removing them from the target Hive table where they’d ordinarily have gone.


So that’s the third step in the ODI BDA ETL process complete. The next one’s a bit trickier though – I need to geocode the entries in the log table, assigning country names to each row based on where the IP address is located. More tomorrow.

End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.2 : Hive Table Joins, Aggregation and Loading

In this series of posts, I’m going to be looking at an end-to-end ETL process on Oracle Big Data Appliance, using Hadoop technologies to do the data manipulation and Oracle Data Integrator 12c to orchestrate the process. Over the five posts, I’ll be landing web server log data on the Hadoop cluster using Flume, loading that data into a Hive table, transforming and enhancing the dataset, and then loading the final dataset into an Oracle database using one of the Oracle Big Data Connectors. The first post in the list below has a schematic for the overall process, and over the rest of the week I’ll be adding the links in for the remaining posts in the series.

So in today’s step, I want to take the initial Hive table containing the full set of log data and then do three things: first, extract the page details out of the “request” column in the incoming Hive table; then join that to some reference data, also on the BDA, that’ll allow me to add details of the post author and the title of the post; and finally, aggregate and project just certain columns from the dataset, so I’ve got a Hive table of just page accesses by IP address, with the author and title details.

In fact this is the easiest of the five steps to set up and bring together, as we’re just using basic ODI functionality, albeit with Hive tables rather than regular database tables. In the screenshot below you can see the incoming weblog Hive table, with the individual columns split out in the previous step via the RegEx SerDe; I just join it to the table containing post and author details, apply a distinct on the join output and then load the results into a third Hive table.


The join between the two tables is a bit complex, because I need to extract just the URL details out of the request field (which also contains the HTTP method and other metadata), but it’s no different from joining two regular database tables. Also, I did need to check the Generate ANSI Syntax checkbox, as Hive expects table joins to be in this format rather than the “Oracle” format; a sketch of the sort of expression used to pull the URL out of the request field is below.
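
Assuming the request column holds the usual “GET /page/ HTTP/1.1” format, a standard Hive string function is enough to pull out just the URL part; something along these lines would do it:

regexp_extract(request, '^([^ ]+) ([^ ]+) ([^ ]+)$', 2)

This returns the second space-separated token, i.e. the requested URL; split(request, ' ')[1] would do much the same job.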


Going over to the Physical part of the mapping, I set the KM to IKM Hive Control Append, turn on the feature to enable table truncation prior to load, and I’m ready to go.


Then, when I execute the mapping, I can see its progress in the Operator navigator, and the code view shows me the HiveQL commands that the KM has generated to move data around the BDA.


So far so good, and as I said, this was the easy bit. For the next transformation I want to bring in some additional reference data from an Oracle database, so the question for me is whether I need to land that data into the BDA before doing the transformation, or whether I can create a join between the Oracle table and my Hive table? Check back tomorrow for the next installment…

Rittman Mead at the Oracle Virtual Technology Summit 2014 – “The Architecture of Analytics: Big Time Big Data and Business Intelligence”

I’m pleased to announce that I’ll be presenting at the Oracle Technology Network (OTN) Virtual Technology Summit, an online event running over three days in early July. I’m speaking as part of the Middleware track, entitled “The Architecture of Analytics: Big Time Big Data and Business Intelligence”, along with fellow Oracle ACE Director Kevin McGinley, US BI Forum keynote speaker Richard Tomlinson, and Tom Plunkett, lead author of the Oracle Big Data Handbook. It’s free to attend, it runs in three timezones over the three days, and here are the track details and session abstracts:

The Architecture of Analytics: Big Data and Business Intelligence

“More than just another hyped-up technological buzzword,  Big Data represents a dramatic shift not just in the amount of data to be dealt with, but also in how business intelligence is extracted from that data in order to inform critical business decisions. This OTN Virtual Technology Summit track will present a solution architect’s perspective on how business intelligence products in Oracle’s Fusion Middleware family and beyond fit into an effective big data architecture, and present insight and expertise from Oracle ACE Directors specializing in business Intelligence to help you meet your big data business intelligence challenges. Technologies covered in this track include Oracle Big Data Appliance, Oracle Data Integrator, Oracle Business Intelligence Enterprise Edition, Oracle Endeca, Oracle Data Warehouse, and Oracle Big Data Connectors, along with Hadoop, Hive, NoSQL, and MapReduce.”

Session Abstracts

1. Oracle Big Data Appliance Case Study: Using Big Data to Analyze Cancer-Genome Relationships [30:00]
by Tom Plunkett, Lead Author of the Oracle Big Data Handbook

This presentation takes a deep technical dive into the use of the Oracle Big Data Appliance in a project for the National Cancer Institute’s Frederick National Laboratory for Cancer Research.  The Frederick National Laboratory and the Oracle team won several awards for analyzing relationships between genomes and cancer subtypes with big data, including the 2012 Government Big Data Solutions Award, the 2013 Excellence.Gov Finalist for Innovation, and the 2013 ComputerWorld Honors Laureate for Innovation.

2. Getting Value from Big Data Variety [30:00]
by Richard Tomlinson, Director, Product Management, Oracle

Big data variety implies big data complexity. Performing analytics on diverse data typically involves mashing up structured, semi-structured and unstructured content. So how can we do this effectively to get real value? How do we relate diverse content so we can start to analyze it? This session looks at how we approach this tricky problem using Endeca Information Discovery.

3. How To Leverage Your Investment In Oracle Business Intelligence Enterprise Edition Within A Big Data Architecture [30:00]
By Oracle ACE Director Kevin McGinley

More and more organizations are realizing the value Big Data technologies contribute to the return on investment in Analytics.  But as an increasing variety of data types reside in different data stores, organizations are finding that a unified Analytics layer can help bridge the divide in modern data architectures.  This session will examine how you can enable Oracle Business Intelligence Enterprise Edition (OBIEE) to play a role in a unified Analytics layer and the benefits and use cases for doing so.

4. Oracle Data Integrator 12c As Your Big Data Data Integration Hub [90:00]
by Oracle ACE Director Mark Rittman

Oracle Data Integrator 12c (ODI12c), as well as being able to integrate and transform data from application and database data sources, also has the ability to load, transform and orchestrate data loads to and from Big Data sources. In this session, we’ll look at ODI12c’s ability to load data from Hadoop, Hive, NoSQL and file sources, transform that data using Hive and MapReduce processing across the Hadoop cluster, and then bulk-load that data into an Oracle Data Warehouse using Oracle Big Data Connectors.

We will also look at how ODI12c enables ETL-offloading to a Hadoop cluster, with some tips and techniques on real-time capture into a Hadoop data reservoir and techniques and limitations when performing ETL on big data sources.

Registration is free, and the form for all three events is online here. Look out for a preview of my big data ETL on BDA session over the next five days, and hopefully you’ll be able to make the VTS event and hear me talk about it in-person.

Preview of Maria Colgan, and Andrew Bond/Stewart Bryson Sessions at RM BI Forum 2014

We’ve got a great selection of presentations at the two upcoming Rittman Mead BI Forum 2014 events in Brighton and Atlanta, including sessions on Endeca, TimesTen, OBIEE (of course), ODI, GoldenGate, Essbase and Big Data (full timetable for both events here). Two of the sessions I’m particularly looking forward to, though, are one by Maria Colgan, product manager for the new In-Memory Option for Oracle Database, and another by Andrew Bond and Stewart Bryson, on an update to Oracle’s reference architecture for Data Warehousing and Information Management.

The In-Memory Option for Oracle Database was of course the big news item from last year’s Oracle Openworld, promising to bring in-memory analytics and column-storage to the Oracle Database. Maria is of course well known to the Oracle BI and Data Warehousing community through her work with the Oracle Database Cost-Based Optimizer, so we’re particularly glad to have her at the Atlanta BI Forum 2014 to talk about what’s coming with this new feature. I asked Maria to jot down a few words for the blog on what she’ll be covering, so over to Maria:


“At Oracle Open World last year, Oracle announced the upcoming availability of the Oracle Database In-Memory option, a solution for accelerating database-driven business decision-making to real-time. Unlike specialized In-Memory Database approaches that are restricted to particular workloads or applications, Oracle Database 12c leverages a new in-memory column store format to speed up analytic workloads. Given this announcement and the performance improvements promised by this new functionality, is it still necessary to create a separate access and performance layer in your data warehouse environment, or to run your Oracle data warehouse on an Exadata environment?

This session explains in detail how Oracle Database In-Memory works and will demonstrate just how much of a performance improvement you can expect. We will also discuss how it integrates into the existing Oracle Data Warehousing Architecture and with an Exadata environment.”

The other session I’m particularly looking forward to is one being delivered jointly by our own Stewart Bryson and Andrew Bond, who heads up Enterprise Architecture at Oracle and was responsible, along with Doug Cackett, for the various data warehousing, information management and big data reference architectures we’ve covered on the blog over the past few years, including the first update to include “big data” a year or so ago.


Back towards the start of this year, Stewart, Jon Mead and I met up with Andrew and his team to work together on an update to this reference architecture, and Stewart carried on with the collaboration afterwards, bringing some of our ideas around agile development, big data and data warehouse design into the final architecture. Stewart and Andrew will be previewing the updated reference architecture at the Brighton BI Forum event, and in the meantime, here’s a preview from Andrew:

“I’m very excited to be attending the event and unveiling Oracle’s latest iteration of the Information Management reference architecture. In this version we have focused on a pragmatic approach to “Analytics 3.0” and in particular looked at bringing an agile methodology to break the IT / business barrier. We’ve also examined exploitation of in-memory technologies and the Hadoop ecosystem and guiding the plethora of new technology choices.

We’ve worked very closely with a number of key customers and partners on this version – most notably Rittman Mead and I’m delighted that Stewart and I will be able to co-present the architecture and receive immediate feedback from delegates.”

Full details of the event, running in Brighton on May 7-9th 2014 and Atlanta, May 15th-17th 2014, can be found on the Rittman Mead BI Forum 2014 homepage, and the agendas for the two events are on this blog post from earlier in the week.