Tag Archives: Big Data
Using HBase and Impala to Add Update and Delete Capability to Hive DW Tables, and Improve Query Response Times
One of our customers is looking to offload part of their data warehouse platform to Hadoop, extracting data out of a source system and loading it into Apache Hive tables for subsequent querying using OBIEE11g. One of the challenges the project faces, though, is how to handle updates to dimensions (and in their case, fact table records) when HDFS is essentially an append-only filesystem and Hive tables are typically insert-only; ideally writes to fact tables should only require INSERTs and filesystem appends, but in this case they wanted to use an accumulating snapshot fact table, whilst the dimension tables all used SCD1-type attributes whose values get overwritten when updates come through from the source system.
The obvious answer, then, was to use Apache HBase as part of the design: a NoSQL database that sits over HDFS but allows updates and deletes to individual rows of data rather than restricting you to appends and inserts. I covered HBase briefly on the blog a few months ago when we used it to store webserver log entries brought into Hadoop via Flume, and in this case it makes an ideal landing point for data coming into our Hadoop system, as we can maintain a current-state record of the data in the source system, updating and overwriting values as we need to. What was also interesting to me was how well we could integrate this HBase data into our mainly SQL-style data processing: how much Java I’d have to use to work with HBase, and whether we could get OBIEE to connect to the HBase tables and query them directly (with a reasonable response time). In particular, could we use the Hive-on-HBase feature to create Hive tables over the HBase ones, and then query those efficiently using OBIEE, so that the data flow looked like this?
To test this idea out, I took the Flight Delays dataset from the OBIEE11g SampleApp & Exalytics demo data [PDF] and created four HBase tables to hold the data, using the BigDataLite 4.1 VM and the HBase Shell. This dataset has four tables:
- FLIGHT_DELAYS – around 220m US flight records listing the origin airport, destination airport, carrier, year and a bunch of metrics (flights, late minutes, distance etc)
- GEOG_ORIGIN – a list of all the airports in the US along with their city, state, name and so on
- GEOG_DEST – a copy of the GEOG_ORIGIN table, used for filtering and aggregating on both origin and destination
- CARRIERS – a list of all the airlines associated with flights in the FLIGHT_DELAYS table
HBase is a NoSQL, key/value-store database where individual rows have a key, and then one or more column families made up of one or more columns. When you define an HBase table you only define the column families; the data load itself creates the columns within them, in a similar way to how the Endeca Server holds “jagged” data. Individual rows might have different columns to each other, and like MongoDB you can define a new column just by loading data into it.
Using the HBase Shell CLI on the BigDataLite VM, I therefore created the HBase tables using just these high-level column family definitions, with the individual columns within the column families to be defined later when I load data into them:
hbase shell

create 'carriers','details'
create 'geog_origin','origin'
create 'geog_dest','dest'
create 'flight_delays','dims','measures'
To get data into HBase tables there’s a variety of methods you can use. Most probably for the full project we’ll write a Java application that uses the HBase client to read, write, update and delete rows that are read in from the source application (see this previous blog post for an example where we use Flume as the source), or to set up some example data we can use the HBase Shell and enter the HBase row/cell values directly, like this for the geog_dest table:
put 'geog_dest','LAX','dest:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_dest','LAX','dest:city','Los Angeles, CA'
put 'geog_dest','LAX','dest:state','California'
put 'geog_dest','LAX','dest:id','12892'
and you can then use the “scan” command from the HBase shell to see those values stored in HBase’s key/value store, keyed on LAX:
hbase(main):015:0> scan 'geog_dest'
ROW                   COLUMN+CELL
 LAX                  column=dest:airport_name, timestamp=1432067861347, value=Los Angeles, CA: Los Angeles
 LAX                  column=dest:city, timestamp=1432067861375, value=Los Angeles, CA
 LAX                  column=dest:id, timestamp=1432067862018, value=12892
 LAX                  column=dest:state, timestamp=1432067861404, value=California
1 row(s) in 0.0240 seconds
For testing purposes though we need a large volume of rows, and entering them all by hand isn’t practical, so this is where we start to use the Hive integration that now comes with HBase. For the BigDataLite 4.1 VM all you need to do to get this working is install the hive-hbase package using yum (after first installing the Cloudera CDH5 repo into /etc/yum.repos.d), load the relevant JAR files when starting your Hive shell session, and then create Hive tables over the HBase ones, mapping Hive columns to the relevant HBase ones, like this:
hive

ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hive/lib/hbase-client.jar;
ADD JAR /usr/lib/hive/lib/hbase-common.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop2-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-protocol.jar;
ADD JAR /usr/lib/hive/lib/hbase-server.jar;
ADD JAR /usr/lib/hive/lib/htrace-core.jar;

CREATE EXTERNAL TABLE hbase_carriers
 (key string,
  carrier_desc string
 )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,details:carrier_desc")
TBLPROPERTIES ("hbase.table.name" = "carriers");

CREATE EXTERNAL TABLE hbase_geog_origin
 (key string,
  origin_airport_name string,
  origin_city string,
  origin_state string,
  origin_id string
 )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,origin:airport_name,origin:city,origin:state,origin:id")
TBLPROPERTIES ("hbase.table.name" = "geog_origin");

CREATE EXTERNAL TABLE hbase_geog_dest
 (key string,
  dest_airport_name string,
  dest_city string,
  dest_state string,
  dest_id string
 )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,dest:airport_name,dest:city,dest:state,dest:id")
TBLPROPERTIES ("hbase.table.name" = "geog_dest");

CREATE EXTERNAL TABLE hbase_flight_delays
 (key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights tinyint,
  late tinyint,
  cancelled bigint,
  distance smallint
 )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "flight_delays");
Bulk loading data into these Hive-on-HBase tables is then just a matter of loading the source data into a regular Hive table, and then running INSERT INTO TABLE … SELECT commands to copy the regular Hive rows into the HBase tables via their Hive metadata overlays:
insert into table hbase_carriers select carrier, carrier_desc from carriers;

insert into table hbase_geog_origin select * from geog_origin;

insert into table hbase_geog_dest select * from geog_dest;

insert into table hbase_flight_delays select row_number() over (), * from flight_delays;
Note that I had to create a synthetic sequence number key for the fact table, as the source data for that table doesn’t have a unique key for each row, something fairly common for data warehouse fact table datasets. In fact, storing fact table data in an HBase table isn’t a great idea for a number of reasons that we’ll see in a moment; bear in mind that HBase is designed for sparse datasets and low-latency inserts and row retrievals, so don’t read too much into this approach yet.
So going back to the original reason for using HBase to store these tables, updating rows within them is pretty straightforward. Taking the geog_origin HBase table as an example, if we get the row for SFO using a Hive query over the HBase table, it initially looks like this:
hive> select * from hbase_geog_origin where key = 'SFO';
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
SFO   San Francisco, CA: San Francisco   San Francisco, CA   California   14771
Time taken: 29.126 seconds, Fetched: 1 row(s)
To update that row and others, I can load a new data file into the Hive table using HiveQL’s LOAD DATA command, or INSERT INTO TABLE … SELECT from another Hive table containing the updates, like this:
insert into table hbase_geog_origin select * from origin_updates;
To check that the value has in fact been updated, I can either run the same SELECT query against the Hive table over the HBase one, or drop into the HBase shell and check it there:
hbase(main):001:0> get 'geog_origin','SFO'
COLUMN                 CELL
 origin:airport_name   timestamp=1432050681685, value=San Francisco, CA: San Francisco International
 origin:city           timestamp=1432050681685, value=San Francisco, CA
 origin:id              timestamp=1432050681685, value=14771
 origin:state           timestamp=1432050681685, value=California
4 row(s) in 0.2740 seconds
In this case the update file/Hive table changed the SFO airport name from “San Francisco” to “San Francisco International”. I can change it back again using the HBase Shell like this, if I want:
put 'geog_origin','SFO','origin:airport_name','San Francisco, CA: San Francisco'
and then checking it again using the HBase Shell’s get command on that key value shows it’s back to the old value. HBase actually stores a configurable number of versions of each cell, each with its own timestamp, but by default it shows you just the current one:
hbase(main):003:0> get 'geog_origin','SFO'
COLUMN                 CELL
 origin:airport_name   timestamp=1432064747843, value=San Francisco, CA: San Francisco
 origin:city           timestamp=1432050681685, value=San Francisco, CA
 origin:id              timestamp=1432050681685, value=14771
 origin:state           timestamp=1432050681685, value=California
4 row(s) in 0.0130 seconds
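If you do want to see the earlier values as well as the latest one, you can ask the shell for several versions of a cell. A minimal sketch, assuming the ‘origin’ column family has been set up to retain more than one version (how many are kept is governed by the column family’s VERSIONS attribute):

# keep three versions of each cell in the 'origin' column family (run once, if the family only keeps one)
alter 'geog_origin', NAME => 'origin', VERSIONS => 3
# then ask for up to three versions of the airport_name cell, newest first
get 'geog_origin','SFO', {COLUMN => 'origin:airport_name', VERSIONS => 3}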
So, so far so good. We’ve got a way of storing data in Hive-type tables on Hadoop and a way of updating and amending records within them by using HBase as the underlying storage, but what are these tables like to query? Hive-on-HBase tables with just a handful of HBase rows return data almost immediately, for example when I create a copy of the geog_dest HBase table and put just a single row entry into it, then query it using a Hive table over it:
hive> select * from hbase_geog_dest2;
OK
LAX   Los Angeles, CA: Los Angeles   Los Angeles, CA   California   12892
Time taken: 0.257 seconds, Fetched: 1 row(s)
A regular Hive query that has to launch a MapReduce job would normally take 30 seconds or more to return even a single row; but when we move up to larger datasets such as the flight delays fact table itself, running a simple row count on the Hive table and then comparing that to the same query running against the Hive-on-HBase version shows a significant time penalty for the HBase version:
hive> select sum(cast(flights as bigint)) as flight_count from flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 7 seconds 670 msec
OK
29483653
Time taken: 37.327 seconds, Fetched: 1 row(s)
compared to the Hive-on-HBase version of the fact table:
hive> select sum(cast(flights as bigint)) as flight_count from hbase_flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 1 minutes 19 seconds 240 msec
OK
21473738
Time taken: 99.154 seconds, Fetched: 1 row(s)
And that’s to be expected; as I said earlier, HBase is aimed at low-latency, single-row operations rather than full table scans and aggregation-type queries, so it’s not surprising that HBase performs badly here, and the response time gets even worse if I try and join the HBase-stored Hive fact table to one or more of the dimension tables also stored in HBase.
In our particular customer example though these HBase tables were only going to be loaded once a day, so what if we copy the current version of each HBase table row into a snapshot Hive table stored in regular HDFS storage, so that our data loading process looks like this:
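In Hive terms that snapshot step can be as simple as dropping and re-creating a copy of the Hive-on-HBase fact table once the daily load has finished. A minimal sketch (the snapshot table name is made up, and in practice you’d pick a more efficient storage format than the default text files):

drop table if exists flight_delays_snapshot;
create table flight_delays_snapshot
as select * from hbase_flight_delays;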
and then OBIEE queries the snapshot of the Hive-on-HBase table joined to the dimension table still stored in HBase, so that the query side looks like this:
Let’s try it out by taking the original Hive table I used earlier on to load the hbase_flight_delays table, and joining that to one of the Hive-on-HBase dimension tables; I’ll start by establishing a baseline response time, joining that source Hive fact table to the source Hive dimension table (also used earlier to load the corresponding Hive-on-HBase table):
select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f
join geog_origin o on f.orig = o.origin
and o.origin_state = 'California'
group by o.origin_airport_name;
...
OK
17638    Arcata/Eureka, CA: Arcata
9146     Bakersfield, CA: Meadows Field
125433   Burbank, CA: Bob Hope
...
1653     Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 43.896 seconds, Fetched: 27 row(s)
So that’s just under 44 seconds to run the query entirely using regular Hive tables. What if I swap out the regular Hive dimension table for the Hive-on-HBase version, how does that affect the response time?
hive> select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f
    > join hbase_geog_origin o on f.orig = o.key
    > and o.origin_state = 'California'
    > group by o.origin_airport_name;
...
OK
17638    Arcata/Eureka, CA: Arcata
9146     Bakersfield, CA: Meadows Field
125433   Burbank, CA: Bob Hope
...
1653     Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 51.757 seconds, Fetched: 27 row(s)
That’s interesting – even though we used the (updatable) Hive-on-HBase dimension table in the query, the response time only went up a few seconds to 51, compared to the 44 when we used just regular Hive tables. Taking it one step further though, what if we used Cloudera Impala as our query engine and copied the Hive-on-HBase fact table into a Parquet-stored Impala table, so that our inward data flow looked like this:
By using the Impala MPP engine, which runs on Hadoop but reads the underlying data files directly rather than going through MapReduce as Hive does, and in addition storing its data in column-store, query-orientated Parquet storage, we can take advantage of OBIEE 11.1.1.9’s new support for Impala and potentially bring the query response time down even further. Let’s go into the Impala Shell on the BigDataLite 4.1 VM, update Impala’s view of the Hive Metastore data dictionary, and then create the corresponding Impala snapshot fact table using a CREATE TABLE … AS SELECT Impala SQL command:
[oracle@bigdatalite ~]$ impala-shell

[bigdatalite.localdomain:21000] > invalidate metadata;
[bigdatalite.localdomain:21000] > create table impala_flight_delays
                                > stored as parquet
                                > as select * from hbase_flight_delays;
Now let’s use the Impala Shell to join the Impala version of the flight delays table, with its data stored in Parquet files, to the Hive-on-HBase dimension table created earlier within our Hive environment:
[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > and o.origin_state = 'California'
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f join hbase_geog_origin o on f.orig = o.key and o.origin_state = 'California' group by o.origin_airport_name
+--------------+------------------------------------------------------------------+
| flight_count | origin_airport_name                                              |
+--------------+------------------------------------------------------------------+
| 31907        | Fresno, CA: Fresno Yosemite International                        |
| 125433       | Burbank, CA: Bob Hope                                            |
...
| 1653         | Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field |
+--------------+------------------------------------------------------------------+
Fetched 27 row(s) in 2.16s
Blimey – 2.16 seconds, compared to the best time of 44 seconds we got earlier when we just used regular Hive tables, let alone joined to the dimension table stored in HBase. Let’s crank it up a bit and join another dimension table in, filtering on both origin and destination values:
[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > join hbase_geog_dest d on f.dest = d.key
                                > and o.origin_state = 'California'
                                > and d.dest_state = 'New York'
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f join hbase_geog_origin o on f.orig = o.key join hbase_geog_dest d on f.dest = d.key and o.origin_state = 'California' and d.dest_state = 'New York' group by o.origin_airport_name
+--------------+-------------------------------------------------------+
| flight_count | origin_airport_name                                   |
+--------------+-------------------------------------------------------+
| 947          | Sacramento, CA: Sacramento International              |
| 3880         | San Diego, CA: San Diego International                |
| 4030         | Burbank, CA: Bob Hope                                 |
| 41909        | San Francisco, CA: San Francisco International        |
| 3489         | Oakland, CA: Metropolitan Oakland International       |
| 937          | San Jose, CA: Norman Y. Mineta San Jose International |
| 41407        | Los Angeles, CA: Los Angeles International            |
| 794          | Ontario, CA: Ontario International                    |
| 4176         | Long Beach, CA: Long Beach Airport                    |
+--------------+-------------------------------------------------------+
Fetched 9 row(s) in 1.48s
Even faster. So that’s what we’ll be going with as our initial approach for the data loading and querying: load data into HBase tables as planned at the start, taking advantage of HBase’s CRUD capabilities, bulk-loading and initially reading the data using Hive tables over the HBase ones; then, before we make the data available for querying by OBIEE, copy the current state of the HBase fact table into a Parquet-stored Impala table, using Impala’s ability to work with Hive tables and metadata and to create joins across both Impala and Hive tables, even when one of the Hive tables uses HBase as its underlying storage.
OBIEE 11.1.1.9 Now Supports HiveServer2 and Cloudera Impala
As you all probably know I’m a big fan of Oracle’s BI and Big Data products, but something I’ve been critical of is OBIEE11g’s lack of support for HiveServer2 connections to Hadoop clusters. OBIEE 11.1.1.7 supported Hive connections using the older HiveServer1 protocol, but recent versions of Cloudera CDH4 and CDH5 use the HiveServer2 protocol by default and OBIEE 11.1.1.7 wouldn’t connect to them; not unless you switched to the Windows version of OBIEE and used the Cloudera ODBC drivers instead, which worked but weren’t supported by Oracle.
OBIEE 11.1.1.9 addresses this issue by shipping more recent DataDirect ODBC drivers for Hive that are compatible with the HiveServer2 protocol used by CDH4 and CDH5 (check out this other article by Robin on general new features in 11.1.1.9). Oracle only really support Hive connectivity for Linux installs of OBIEE, and the Linux version of OBIEE 11.1.1.9 comes with the DataDirect ODBC drivers already installed and configured for use; all you have to do then is set up the ODBC connection in the odbc.ini file on Linux, and install the Cloudera Hive ODBC drivers on your Windows workstation for the Admin tool (the Hive ODBC drivers that Oracle supply on MOS still look like the old HiveServer1 version, though I could be wrong). To check that it all worked on this new 11.1.1.9 version of OBIEE11g I therefore downloaded and installed the Windows Cloudera Hive ODBC drivers and set up the System DSN like this:
and set up a corresponding entry in the Linux OBIEE 11.1.1.9’s odbc.ini file, like this:
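For reference, the sort of entry you end up with looks something like the sketch below. This is illustrative only: the exact key names and the DataDirect driver library path should be copied from the template entries in the odbc.ini that ships with 11.1.1.9, and the DSN name (CDH5_Hive here) is made up but must match whatever you call the Windows System DSN. The DSN also needs listing in the [ODBC Data Sources] section at the top of the file.

[CDH5_Hive]
Driver=<path to the shipped DataDirect Hive ODBC driver .so>
Description=DataDirect HiveServer2 connection to CDH5
HostName=bigdatalite.localdomain
PortNumber=10000
Database=default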
with the key thing being to make sure you have matching DSN names on both the Windows workstation (for the Admin tool initial datasource setup and table metadata import) and the Linux server (for the actual online connection to Hive from the BI Server, and subsequent data retrieval). One thing I did notice was that whilst I could connect to the Hive database server and set up the connection in the Admin tool, I couldn’t view any Hive tables and had to manually create them myself in the RPD Physical Layer – this could just be a quirk on my workstation install though so I wouldn’t read too much into it. Checking connectivity in the Admin tool then showed it connecting properly and retrieving data from Hive on the Hadoop cluster. I didn’t test Kerberos-authentication connections but I’m assuming it’d work, as the previous version of OBIEE 11.1.1.7 on Linux just failed at this point anyway. The docs are here if you’d like to look into any more details, or check the full set of setup steps.
For Cloudera Impala connections, you’re directed in the docs to download the Windows Cloudera Impala ODBC drivers as Oracle don’t even ship them on MOS, but again the Linux install of OBIEE 11.1.1.9 comes with DataDirect Impala drivers that are already set up and ready for use (note that if you upgrade from 11.1.1.7 to 11.1.1.9 rather than do the fresh install that I did for testing purposes, you’ll need to edit the opmn.xml file to register these updated DataDirect drivers). Then it’s a case of setting the Windows System DSN up for the initial metadata import, like this:
then creating a corresponding entry in the Linux server’s odbc.ini file, like this:
Note that the docs do mention an issue with earlier versions of Impala, where the Impala server expects a LIMIT clause whenever ORDER BY is used in Impala SQL queries, and give a couple of workarounds to stop Impala expecting this clause; for more recent (CDH5+) versions of Impala this requirement is in fact lifted, and you can connect to and use Impala without making the configuration change mentioned in the docs (or using the workaround I mentioned in this earlier blog post). Checking connectivity in the Admin tool then shows the connection making its way through OK, from the Windows environment to the Linux server’s ODBC connection:
and creating a quick report shows data returned as expected, and considerably quicker than with Hive.
As I said, I’ve not really tested either of these two connections using Kerberos or any edge-case setups, but connectivity seems to be working and we’re now in a position where OBIEE11g can properly connect to both Hive and Impala on recent CDH installs, and of course the Oracle Big Data Appliance. Good stuff; now what about Spark SQL or ElasticSearch..?
Presentation Slides and Photos from the Rittman Mead BI Forum 2015, Brighton and Atlanta
It’s now the Saturday after the two Rittman Mead BI Forum 2015 events, last week in Atlanta, GA and the week before in Brighton, UK. Both events were a great success and I’d like to say thanks to the speakers, attendees, our friends at Oracle and my colleagues within Rittman Mead for making the two events so much fun. If you’re interested in taking a look at some photos from the two events, I’ve put together two Flickr photosets that you can access using the links below:
- Flickr Photoset from the Brighton Rittman Mead BI Forum 2015
- Flickr Photoset from the Atlanta Rittman Mead BI Forum 2015
We’ve also uploaded the presentation slides from the two events (where we’ve been given permission to share them) to our website, and you can download them, including the Delivering the Oracle Information Management and Big Data Reference Architecture masterclass, using the links below:
Delivering the Oracle Information Management & Big Data Reference Architecture (Mark Rittman & Jordan Meyer, Rittman Mead)
- Part 1 : Delivering the Discovery Lab (Jordan Meyer, Head of R&D at Rittman Mead)
- Part 2 : Delivering the Data Factory, Data Reservoir and a Scalable Oracle Big Data Architecture (Mark Rittman, CTO, Rittman Mead)
Brighton, May 7th and 8th 2015
- Steve Devine, Independent : “The Art and Science of Creating Effective Data Visualisations”
- Chris Royles, Oracle Corporation : “Big Data Discovery”
- Christian Screen, Sierra-Cedar : “10 Tenets for Making Your Oracle BI Applications Project Succeed Like a Boss”
- Emiel van Bockel, CB : “Watch and see 12c on Exalytics”
- Daniel Adams, Rittman Mead : “User Experience First: Guided information and attractive dashboard design”
- Robin Moffatt, Rittman Mead : “Data Discovery and Systems Diagnostics with the ELK stack”
- André Lopes / Roberto Manfredini, Liberty Global : “A Journey into Big Data and Analytics”
- Antony Heljula, Peak Indicators : “Predictive BI – Using the Past to Predict the Future”
- Gerd Aiglstorfer, G.A. itbs GmbH : “Driving OBIEE Join Semantics on Multi Star Queries as User”
- Manuel Martin Marquez, CERN – European Laboratory for Particle Physics, “Governed Information Discovery: Data-driven decisions for more efficient operations at CERN”
Atlanta, May 14th and 15th 2015
- Robin Moffatt, Rittman Mead : “Smarter Regression Testing for OBIEE”
- Mark Rittman : “Oracle Big Data Discovery Tips and Techniques from the Field”
- Hasso Schaap, Qualogy : “Developing strategic analytics applications on OBICS PaaS”
- Tim German / Cameron Lackpour, Qubix / CLSolve : “Hybrid Mode – An Essbase Revolution”
- Stewart Bryson, Red Pill Analytics, “Supercharge BI Delivery with Continuous Integration”
- Andy Rocha & Pete Tamisin, Rittman Mead : “OBIEE Can Help You Achieve Your GOOOOOOOOOALS!”
- Christian Screen, Sierra-Cedar : “10 Tenets for Making Your Oracle BI Applications Project Succeed Like a Boss”
- Sumit Sarkar, Progress Software : “Make sense of NoSQL data using OBIEE”
Congratulations also to Emiel van Bockel and Robin Moffatt, who jointly won the Best Speaker award at the Brighton event, and to Andy Rocha and Pete Tamisin, who won Best Speaker in Atlanta for their joint session. It’s time for a well-earned rest now and then back to work, and hopefully we’ll see some of you at KScope’15, Oracle Openworld 2015 or the UKOUG Tech and Apps 2015 conferences later in 2015.
So What’s the Real Point of ODI12c for Big Data Generating Pig and Spark Mappings?
Oracle ODI12c for Big Data came out the other week, and my colleague Jérôme Françoisse put together an introductory post on the new features shortly after, covering ODI’s new ability to generate Pig and Spark transformations as well as the traditional Hive ones. The way this works is that you can now select Apache Pig or Apache Spark (through pySpark, the Spark API for Python) as the implementation language for an ODI mapping, and ODI will generate code in one of those languages instead of the HiveQL commands used to run the mapping up until now.
How this works is that ODI12c 12.1.3.0.1 adds a bunch of new component-style KMs to the standard 12c ones, providing filter, aggregate, file load and other features that generate pySpark and Pig code rather than the usual HiveQL statement parts. Component KMs have also been added for Hive as well, making it possible now to include non-Hive datastores in a mapping and join them all together, something it was hard to do in earlier versions of ODI12c where the Hive IKM expected to do the table data extraction as well.
But when you first look at this you may well be tempted to think “…so what?”, in that Pig compiles down to MapReduce in the end, just like Hive does, and you probably won’t get the benefits of running Spark for just a single batch mapping doing largely set-based transformations. To my mind, where this new feature gets interesting is in its ability to let you take existing Pig and Spark scripts, which process data in a different, dataflow-type way compared to Hive’s set-based transformations and which potentially also use Pig- and Spark-specific function libraries, and convert them into managed graphical mappings that you can orchestrate and run as part of a wider ODI integration process.
Pig, for example, has the LinkedIn-originated DataFu UDF library that makes it easy to sessionize and further transform log data, and the Piggybank community library that extends Pig’s loading and saving capabilities to additional storage formats, and provides additional basic UDFs for timestamp conversion, log parsing and so forth. We’ve used these libraries in the past to process log files from our blog’s webserver and create classification models to help predict whether a visitor will return, with the Pig script below using the DataFu and Piggybank libraries to perform these tasks easily in Pig.
register /opt/cloudera/parcels/CDH/lib/pig/datafu.jar;
register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar;

DEFINE Sessionize datafu.pig.sessions.Sessionize('60m');
DEFINE Median datafu.pig.stats.StreamingMedian();
DEFINE Quantile datafu.pig.stats.StreamingQuantile('0.9','0.95');
DEFINE VAR datafu.pig.VAR();
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();

--------------------------------------------------------------------------------
-- Import and clean logs
raw_logs = LOAD '/user/flume/rm_logs/apache_access_combined' USING TextLoader AS (line:chararray);

-- Extract individual fields
logs_base = FOREACH raw_logs
  GENERATE FLATTEN (REGEX_EXTRACT_ALL(line,'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'))
  AS (remoteAddr: chararray, remoteLogName: chararray, user: chararray, time: chararray, request: chararray, status: chararray, bytes_string: chararray, referrer:chararray, browser: chararray);

-- Remove Bots and convert timestamp
logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|Bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*');

-- Remove useless columns and convert timestamp
clean_logs = FOREACH logs_base_nobots GENERATE CustomFormatToISO(time,'dd/MMM/yyyy:HH:mm:ss Z') as time, remoteAddr, request, status, bytes_string, referrer, browser;

--------------------------------------------------------------------------------
-- Sessionize the data
clean_logs_sessionized = FOREACH (GROUP clean_logs BY remoteAddr) {
  ordered = ORDER clean_logs BY time;
  GENERATE FLATTEN(Sessionize(ordered)) AS (time, remoteAddr, request, status, bytes_string, referrer, browser, sessionId);
};

-- The following steps will generate a tsv file in your home directory to download and work with in R
store clean_logs_sessionized into '/user/jmeyer/clean_logs' using PigStorage('\t','-schema');
If you know Pig (or have read my previous articles on this theme), you’ll know that Pig has the concept of an “alias”, a dataset you define using filters, aggregations, projections and other operations against other aliases, with a typical Pig script starting with a large data extract and then progressively whittling it down to just the subset of data, and derived data, you’re interested in. When it comes to script execution, Pig only materialises these aliases when you tell it to store the results in permanent storage (a file, a Hive table etc), with the intermediate steps just being instructions on how to progressively arrive at the final result. Spark works in a similar way with its RDDs and transformations, which either create a new dataset based off an existing one, or materialise the results in permanent storage when you run an “action”. So let’s see if ODI12c for Big Data can create a similar dataflow, based as much as possible on the script I’ve used above.
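Just to make that lazy-evaluation point concrete before we look at the ODI version, here’s a tiny hand-written pySpark equivalent of the first and last steps of the Pig script, typed into the pyspark shell where sc is the SparkContext; the output path is made up, and the filter is a transformation that runs nothing until the saveAsTextFile action at the end triggers execution:

# transformation: describes a new RDD, nothing is read or computed yet
raw_logs = sc.textFile('/user/flume/rm_logs/apache_access_combined')
no_bots = raw_logs.filter(lambda line: 'bot' not in line.lower())
# action: this is what actually kicks off the job and materialises the result
no_bots.saveAsTextFile('/user/flume/rm_logs/apache_access_nobots')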
And in fact ODI can reproduce that dataflow. The screenshot below shows the logical mapping that implements the same Pig process, with the data coming into the mapping as a Hive table, an expression operator creating the equivalent of a Pig alias based off a filtered, transformed version of the original source data using the Piggybank CustomFormatToISO UDF, and the results of that then running through an ODI table function that, in the background, transforms the data using Pig’s GENERATE FLATTEN command and a call to the DataFu Sessionize UDF.
And this is the physical mapping to go with the logical mapping. Note that all of the Pig transformations are contained within a separate execution unit, that contains operators for the expression to transform and filter the initial dataset, and another for the table function.
The table function operator runs the input fields through an arbitrary Pig Latin script, in this case defining another alias to match the table function operator name and using the DataFu Sessionize UDF within a FOREACH to first sort, and then GENERATE FLATTEN, the same columns but with a session ID added for user sessions from the same IP address within 60 minutes of each other.
If you’re interested in the detail of how this works and other usages of the new ODI12c for Big Data KMs, then come along to the masterclass I’m running with Jordan Meyer at the Brighton and Atlanta Rittman Mead BI Forums where I’ll go into the full details as part of a live end-to-end demo. Looking at the Pig Latin that comes out of it though, you can see it more or less matches the flow of the hand-written script and implements all of the key steps.
Finally, checking the output of the mapping I can see that the log entries have been sessionized and they’re ready to pass on to the next part of the classification model.
So that to my mind is where the value is in ODI generating Pig and Spark mappings. It’s not so much taking an existing Hive set-based mapping and just running it using a different language, it’s more about being able to implement graphically the sorts of data flows you can create with Pig and Spark, and being able to get access to the rich UDF and data access libraries that these two languages benefit from. As I said, come along to the masterclass Jordan and I are running, and I’ll go into much more detail and show how the mapping is set up, along with other mappings to create an end-to-end Hadoop data integration process.
Setting up Security and Access Control on a Big Data Appliance
As with all Oracle Engineered Systems, once a BDA has been sold to a customer Oracle’s field servicing and Advanced Customer Services (ACS) teams go on-site and do the racking, installation and initial setup. They will usually ask the customer a set of questions such as “do you want to enable Kerberos authentication”, “what’s the range of IP addresses you want to use for each of the network interfaces”, “what password do you want to use” and so on. It’s usually enough to get a customer going, but in practice we’ve found most customers need a number of other things set up and configured before they use the BDA in development and production; for example:
- Integrating Cloudera Manager, Hue and other tools with the corporate LDAP directory
- Setting up HDFS and SSH access for the development and production support team, so they can log in with their usual corporate credentials
- Coming up with a directory layout and file placement strategy for loading data into the BDA, and then moving it around as data gets processed
- Configuring some sort of access control to the Hive tables (and sometimes HDFS directories) that users use to get access to the Hadoop data
- Devising a backup and recovery strategy, and thinking about DR (disaster recovery)
- Linking the BDA to other tools and products in the Oracle Big Data and Engineered Systems family; Exalytics, for example, or setting up ODI and OBIEE to access data in the BDA
The first task we’re usually asked to do is integrate Cloudera Manager, the web-based admin console for the Hadoop parts of the BDA, with the corporate LDAP server. By doing this we can enable users to log into Cloudera Manager with their usual corporate login (and restrict access to just certain LDAP groups, and further segregate users into admin ones and stop/start/restart-services-type ones), and similarly allow users to log into Hue using their regular LDAP credentials. In my experience Cloudera Manager is easier to set up than Hue, but let’s look at a high level at what’s involved.
LDAP Integration for Hue, Cloudera Manager, Hive etc
In our Rittman Mead development lab, we have OpenLDAP running on a dedicated appliance VM and a number of our team set up as LDAP users. We’ve defined four LDAP groups, two for Cloudera Manager and two for Hue, with varying degrees of access for each product.
Setting up Cloudera Manager is pretty straightforward, using the Administration > Settings menu in the Cloudera Manager web UI (note this option is only available for the paid, Cloudera Enterprise version, not the free Cloudera Express version). Hue security integration is configured through the Hue service menu, and again you can configure the LDAP search credentials, any LDAPS or certificate setup, and then within Hue itself you can define groups to determine what Hue features each set of users can use.
Where Hue is a bit more fiddly (last time I looked) is in controlling access to the tool itself; Cloudera Manager lets you explicitly define which LDAP groups can access the tool, with other users then locked out, but Hue either allows all authenticated LDAP users to log into the tool or makes you manually import each authorised user to grant them access (you can then either have Hue check back with the LDAP server for their password at each login, or make a copy of the password and store it within Hue for later use, potentially getting out of sync with their LDAP directory password). In practice what I do is use the manual authorisation method but have Hue link back to the LDAP server to check the users’ passwords, and then map their LDAP groups into Hue groups for further role-based access control. There’s a similar process for Hive and Impala too, where you can configure the services to authenticate against LDAP, and also have Hive use user impersonation so the LDAP username is passed through the ODBC or JDBC connection and queries run as that particular user.
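Under the covers that Hive LDAP and impersonation setup comes down to a handful of HiveServer2 properties; on a CDH cluster you’d normally set these through Cloudera Manager rather than editing hive-site.xml by hand, and the LDAP URL and base DN below are just placeholder values standing in for our lab settings:

<property>
  <name>hive.server2.authentication</name>
  <value>LDAP</value>
</property>
<property>
  <name>hive.server2.authentication.ldap.url</name>
  <value>ldap://ldap.rittmandev.com:389</value>
</property>
<property>
  <name>hive.server2.authentication.ldap.baseDN</name>
  <value>ou=people,dc=rittmandev,dc=com</value>
</property>
<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>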
Configuring SSH and HDFS Access and Setting-up Kerberos Authentication
Most developers working with Hadoop and the BDA will either SSH (Secure Shell) into the cluster and work directly on one of the nodes, or work from their workstation, which has been configured as a Hadoop client for the BDA. If they SSH directly into the cluster they’ll need Linux user accounts there, and if they go in via their workstation, the Hadoop client installed there will grant them access as the user they’re logged into the workstation as. On the BDA you can either set up user accounts on each BDA node separately, or more likely configure user authentication to connect to the corporate LDAP and check credentials there.
One thing you should definitely do, either when your BDA is initially set up by Oracle or later on post-install, is configure your Hadoop cluster as a secure cluster using Kerberos authentication. Hadoop normally trusts that each user accessing Hadoop services via the Hadoop Filesystem API (FS API) is who they say they are, but using the example above I could easily set up an “oracle” user on my workstation and then access all Hadoop services on the main cluster without the Hadoop FS API actually checking that I am who I say I am; in other words, the Hadoop FS shell doesn’t check your password, it merely runs a “whoami” Linux command to determine the username and grants access as that user.
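To make that concrete, here’s the sort of thing that’s possible with the default “simple” authentication, and what changes once Kerberos is enabled (the username and Kerberos realm below are just examples):

# with simple authentication, the client just asserts an identity and the cluster believes it
export HADOOP_USER_NAME=hdfs        # impersonate the hdfs superuser from any client machine
hadoop fs -ls /user/hive/warehouse  # ...and the cluster happily lets you in

# on a Kerberos-secured cluster the same command fails until you hold a valid ticket
kinit mrittman@RITTMANMEAD.COM      # authenticate against the KDC
klist                               # confirm the ticket has been granted
hadoop fs -ls /user/hive/warehouse  # now runs as the authenticated principal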
The way to address this is to configure the cluster for Kerberos authentication, so that users have to have a valid Kerberos ticket before accessing any secured services (Hive, HDFS etc) on the cluster. I covered this as part of an article on configuring OBIEE11g to connect to Kerberos-secured Hadoop clusters last Christmas and you can either do it as part of the BDA install, or later on using a wizard in more recent versions of CDH5, the Cloudera Hadoop distribution that the BDA uses.
The complication with Kerberos authentication is that your organization needs to have a Kerberos KDC (Key Distribution Center) server set up already, which will then link to your corporate LDAP or Active Directory service to check user credentials when they request a Kerberos ticket. The BDA installation routine gives you the option of creating a KDC as part of the BDA setup, but that’s only really useful for securing inter-cluster connections between services as it won’t be checking back to your corporate directory. Ideally you’d set up a connection to an existing, well-tested and well-understood Kerberos KDC server and secure things that way, but beware that not all Oracle and other tools that run on the BDA are set up for Kerberos authentication; OBIEE and ODI are, for example, but the current 1.0 version of Big Data Discovery doesn’t yet support Kerberos-secured clusters.
Coming-up with the HDFS Directory Layout
It’s tempting with Hadoop to just have a free-for-all with the Hadoop HDFS filesystem setup, maybe restricting users to their own home directory but otherwise letting them put files anywhere. HDFS file data for Hive tables typically goes in Hive’s own filesystem area /user/hive/warehouse, but users can of course create Hive tables over external data files stored in their own part of the filesystem.
What we tend to do (inspired by Gwen Shapira’s “Scaling ETL with Hadoop” presentation) is create separate areas for incoming data, ETL processing data and process output data, with developers then told to put shared datasets in these directories rather than their own. I generally create additional Linux users for each of these areas so that they can own the HDFS files and directories rather than individual users, and then I can control access to these directories using HDFS’s POSIX permissions. A typical user setup script might look like this:
[oracle@bigdatalite ~]$ cat create_mclass_users.sh
sudo groupadd bigdatarm
sudo groupadd rm_website_analysis_grp
useradd mrittman -g bigdatarm
useradd ryeardley -g bigdatarm
useradd mpatel -g bigdatarm
useradd bsteingrimsson -g bigdatarm
useradd spoitnis -g bigdatarm
useradd rm_website_analysis -g rm_website_analysis_grp
echo mrittman:welcome1 | chpasswd
echo ryeardley:welcome1 | chpasswd
echo mpatel:welcome1 | chpasswd
echo bsteingrimsson:welcome1 | chpasswd
echo spoitnis:welcome1 | chpasswd
echo rm_website_analysis:welcome1 | chpasswd
whilst a script to set up the directories for these users, and the application user, might look like this:
[oracle@bigdatalite ~]$ cat create_hdfs_directories.sh
set echo on

#setup individual user HDFS directories, and scratchpad areas
sudo -u hdfs hadoop fs -mkdir /user/mrittman
sudo -u hdfs hadoop fs -mkdir /user/mrittman/scratchpad
sudo -u hdfs hadoop fs -mkdir /user/ryeardley
sudo -u hdfs hadoop fs -mkdir /user/ryeardley/scratchpad
sudo -u hdfs hadoop fs -mkdir /user/mpatel
sudo -u hdfs hadoop fs -mkdir /user/mpatel/scratchpad
sudo -u hdfs hadoop fs -mkdir /user/bsteingrimsson
sudo -u hdfs hadoop fs -mkdir /user/bsteingrimsson/scratchpad
sudo -u hdfs hadoop fs -mkdir /user/spoitnis
sudo -u hdfs hadoop fs -mkdir /user/spoitnis/scratchpad

#setup etl directories
sudo -u hdfs hadoop fs -mkdir -p /data/rm_website_analysis/logfiles/incoming
sudo -u hdfs hadoop fs -mkdir /data/rm_website_analysis/logfiles/archive/
sudo -u hdfs hadoop fs -mkdir -p /data/rm_website_analysis/tweets/incoming
sudo -u hdfs hadoop fs -mkdir /data/rm_website_analysis/tweets/archive

#change ownership of user directories
sudo -u hdfs hadoop fs -chown -R mrittman /user/mrittman
sudo -u hdfs hadoop fs -chown -R ryeardley /user/ryeardley
sudo -u hdfs hadoop fs -chown -R mpatel /user/mpatel
sudo -u hdfs hadoop fs -chown -R bsteingrimsson /user/bsteingrimsson
sudo -u hdfs hadoop fs -chown -R spoitnis /user/spoitnis
sudo -u hdfs hadoop fs -chgrp -R bigdatarm /user/mrittman
sudo -u hdfs hadoop fs -chgrp -R bigdatarm /user/ryeardley
sudo -u hdfs hadoop fs -chgrp -R bigdatarm /user/mpatel
sudo -u hdfs hadoop fs -chgrp -R bigdatarm /user/bsteingrimsson
sudo -u hdfs hadoop fs -chgrp -R bigdatarm /user/spoitnis

#change ownership of shared directories
sudo -u hdfs hadoop fs -chown -R rm_website_analysis /data/rm_website_analysis
sudo -u hdfs hadoop fs -chgrp -R rm_website_analysis_grp /data/rm_website_analysis
Giving you a directory structure like this (with the directories for Hive, Impala, HBase etc removed for clarity)
In terms of Hive and Impala data, there are varying opinions on whether to create tables as EXTERNAL and store the data (including sub-directories for table partitions) in the /data/ HDFS area, or to let Hive store them in its own /user/hive/warehouse area; I tend to let Hive store them within its area, as I then use Apache Sentry to control access to those tables’ data.
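The difference between the two approaches is really just where the data files live and which process owns them; a quick sketch of the two options for the incoming log data (the table names and single column here are made up for illustration):

-- external table: Hive just overlays metadata on files the ETL process owns under /data
create external table access_logs_incoming (log_line string)
location '/data/rm_website_analysis/logfiles/incoming';

-- managed table: Hive copies the data into /user/hive/warehouse and owns it from then on,
-- which is the variant I'd then secure with Sentry
create table access_logs as
select * from access_logs_incoming;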
Setting up Access Control for HDFS, Hive and Impala Data
At its simplest level, access control can be setup on the HDFS directory structure by using HDFS’s POSIX security model:
- Each HDFS file or directory has an owner, and a group
- You can add individual Linux users to a group, but an HDFS object can only have one group owning it
What this means in practice, though, is that you have to jump through quite a few hoops to set up finer-grained access control to these HDFS objects. What we tend to do is give RW access to the /data/ directory and subdirectories to the application user account (rm_website_analysis in this case), and RO access to that user’s associated group (rm_website_analysis_grp). If users then want access to that application’s data we add them to the relevant application group, and as a user can belong to more than one group, it’s possible to grant them access to more than one application data area:
[oracle@bigdatalite ~]$ cat ./set_hdfs_directory_permissions.sh
sudo -u hdfs hadoop fs -chmod -R 750 /data/rm_website_analysis
usermod -G rm_website_analysis_grp mrittman
making it possible for the main application owner to write data to the directory, whilst group members only have read access. What you can also now do with more recent versions of Hadoop (CDH5.3 onwards, for example) is define access control lists for individual HDFS objects, although this feature isn’t enabled by default as it consumes more namenode memory than the traditional POSIX approach. What I prefer to do, though, is control access by restricting users to only accessing Hive and Impala tables, and using Apache Sentry, or Oracle Big Data SQL, to provide role-based access control over them.
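Before moving on to Sentry, for reference the HDFS ACL route looks roughly like this once dfs.namenode.acls.enabled has been set to true on the namenode, reusing the users and directories from the earlier scripts:

# give an individual user read access to the application's data area, over and above the POSIX permissions
sudo -u hdfs hdfs dfs -setfacl -R -m user:ryeardley:r-x /data/rm_website_analysis
# and check the resulting ACL entries
sudo -u hdfs hdfs dfs -getfacl /data/rm_website_analysis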
Apache Sentry is a project originally started by Cloudera and then adopted by the Apache Foundation as an incubating project. It aims to provide four main authorisation features over Hive, Impala (and more recently, the underlying HDFS directories and datafiles):
- Secure authorisation, with LDAP integration and Kerberos prerequisites for Sentry enablement
- Fine-grained authorisation down to the column-level, with this feature provided by granting access to views containing subsets of columns at this point
- Role-based authorisation, with different Sentry roles having different permissions on individual Hive and Impala tables
- Multi-tenant administration, with a central point of administration for Sentry permissions
As described in this Cloudera presentation on Sentry on Slideshare, Sentry inserts itself into the query execution process and checks access rights before allowing the rest of the Hive query to execute. Sentry is configured through security policy files, or through the new web-based interface introduced with recent versions of CDH5.
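With the Sentry service in place, the role-based part of that can also be administered with database-style GRANT statements run through beeline or impala-shell as a Sentry admin; a minimal sketch, with the role, database and group names below just examples based on the earlier setup:

create role website_analysis_ro;
grant select on database rm_website_analysis to role website_analysis_ro;
grant role website_analysis_ro to group bigdatarm;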
The other option for customers using Oracle Exadata, Oracle Big Data Appliance and Oracle Big Data SQL is to use the Oracle Database’s access control mechanisms to govern access to Hive (and Oracle) data, and also set up fine-grained access control (VPD), data masking and redaction to create a more “enterprise” access control system.
So these are the tasks we typically perform when on-boarding an Oracle BDA for a customer. If this is of interest to you and you can make it to either Brighton, UK next week or Atlanta, GA the week after, I’ll be covering this topic at the Rittman Mead BI Forum 2015 as part of the one-day masterclass with Jordan Meyer on the Wednesday of each week, along with topics such as creating ETL data flows using Oracle Data Integrator for Big Data, using Oracle Big Data Discovery for faceted search and cataloging of the data reservoir, and reporting on Hadoop and NoSQL data using Oracle Business Intelligence 11g. Spaces are still available, so register now if you’d like to hear more on this topic.