Category Archives: Rittman Mead

Connecting OBIEE 11.1.1.9 to Hive, HBase and Impala Tables for a DW-Offloading Project

In two previous posts this week I talked about a client request to offload part of their data warehouse to Hadoop, taking data from a source application and loading it into Hive tables on Hadoop for subsequent reporting-on by OBIEE11g. In the first post I talked about hosting the offloaded data warehouse elements on Cloudera Hadoop CDH5.3, how I used Apache Hive and Apache HBase to support insert/update/delete activity to the fact and dimension tables, and how we’d copy the Hive-on-HBase fact table data into optimised Impala tables stored in Parquet files to make sure reports and dashboards ran fast.

NewImage

In the second post I got into the detail of how we’d keep the Hive-on-HBase tables up-to-date with new and changed data from the source system, using HiveQL bulk-inserts to load up the initial table data and a Python script to handle subsequent inserts, updates and deletes by working directly with the HBase Client and the HBase Thrift Server. Where this leaves us at the end then is with a set of fact and dimension tables stored as optimised Impala tables and updatable Hive-on-HBase tables, and our final step is to connect OBIEE11g to it and see how it works for reporting.

NewImage

As I mentioned in another post a week or so ago, the new 11.1.1.9 release of OBIEE11g supports Cloudera Impala connections from Linux servers to Hadoop, with the Linux Impala drivers being shipped by Oracle as part of the Linux download and the Windows ones used for the Admin Tool workstation downloadable directly from Cloudera. Once you’ve got all the drivers and OBIEE software set up, it’s then just a case of setting up the ODBC connections on the Windows and Linux environments, and you should then be in a position to connect it all up.

NewImage

On the Impala side, I first need to create a copy of the Hive-on-HBase table I’ve been using to load the fact data into from the source system, after running the invalidate metadata command to refresh Impala’s view of Hive’s metastore.

[oracle@bigdatalite ~]$ impala-shell

[bigdatalite.localdomain:21000] > invalidate metadata;

[bigdatalite.localdomain:21000] > create table impala_flight_delays
                                > stored as parquet
                                > as select * from hbase_flight_delays;

Next I import the Hive-on-HBase and the Impala table through the Impala ODBC connection – even though only one of the tables (the main fact table snapshot copy) was created using Impala, I still get the Impala speed benefit for the other three tables created in Hive (against the HBase source, no less). Once the table metadata is imported into the RPD physical layer, I can then create a business model and subject area as I would do normally, so my final RPD looks like this:

NewImage

Now it’s just a case of saving the repository online and creating some reports. If you’re using an older version of Impala you may need to disable the setting where a LIMIT clause is needed for every GROUP BY (see the docs for more details, but recent (CDH5+) versions will work fine without this). Something you’ll also need to do back in Impala is compute statistics for each of the tables, like this:

[bigdatalite.localdomain:21000] > compute stats default.impala_flight_delays;
Query: compute stats default.impala_flight_delays
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 8 column(s). |
+-----------------------------------------+
Fetched 1 row(s) in 2.73s
[bigdatalite.localdomain:21000] > show table stats impala_flight_delays;
Query: show table stats impala_flight_delays
+---------+--------+---------+--------------+---------+-------------------+
| #Rows   | #Files | Size    | Bytes Cached | Format  | Incremental stats |
+---------+--------+---------+--------------+---------+-------------------+
| 2514141 | 1      | 10.60MB | NOT CACHED   | PARQUET | false             |
+---------+--------+---------+--------------+---------+-------------------+
Fetched 1 row(s) in 0.01s

Apart from being generic “good practice” and giving the Impala query optimizer better information to form a query plan with, you might hit the error below in OBIEE if you don’t do this.

NewImage

If you do hit this error, go back to the Impala Shell or Hue and compute statistics, and it should go away next time. Then, finally, you can go and create some analyses and dashboards and you should find the queries run fine against the various tables in Hadoop, and moreover the response time is excellent if you use Impala as the main query engine.
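Because statistics are needed for each table the RPD touches, the step could be scripted rather than typed into the shell. The sketch below is only an illustration and not part of the original setup: the `compute_stats` helper is a hypothetical name, and it assumes you pass in a DB-API style cursor (for example one obtained from the impyla package, which is an assumption on my part).

```python
def compute_stats(cursor, table_names, database="default"):
    """Run COMPUTE STATS for each table through a DB-API style cursor.

    Returns the statements that were executed, in order.
    """
    executed = []
    for name in table_names:
        statement = "compute stats {0}.{1}".format(database, name)
        cursor.execute(statement)
        executed.append(statement)
    return executed


# Example wiring (assumes the impyla package and an Impala daemon
# listening on its HiveServer2-compatible port, 21050 by default):
#
#   from impala.dbapi import connect
#   cursor = connect(host="bigdatalite", port=21050).cursor()
#   compute_stats(cursor, ["impala_flight_delays", "hbase_carriers",
#                          "hbase_geog_dest"])
```

The table names in the commented usage are the ones that appear in the query log later in this post; any other tables imported into the physical layer would need adding to the list.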

NewImage

I did a fair bit of testing of OBIEE 11.1.1.9 running against Cloudera Impala, and my findings were that all of the main analysis features worked (prompts, hierarchies, totals and subtotals etc) and the response time was comparable with a well-tuned data warehouse, maybe even Exalytics-level of speed. If you take a look at the nqquery.log file for the Impala SQL queries OBIEE is sending to Impala, you can see they get fairly complex (which is good, as I didn’t hit any errors when running the dashboards) and you can also see where the BI Server takes a simpler approach to creating subtotals, nested queries etc compared to the GROUP BY … GROUPING SETS that you get when using a full Oracle database.

select D1.c1 as c1,
     D1.c2 as c2,
     D1.c3 as c3,
     D1.c4 as c4,
     D1.c5 as c5,
     D1.c6 as c6,
     D1.c7 as c7,
     D1.c8 as c8,
     D1.c9 as c9,
     D1.c10 as c10,
     D1.c11 as c11,
     D1.c12 as c12
from 
     (select 0 as c1,
               D1.c3 as c2,
               substring(cast(NULL as  STRING ), 1, 1 ) as c3,
               substring(cast(NULL as  STRING ), 1, 1 ) as c4,
               substring(cast(NULL as  STRING ), 1, 1 ) as c5,
               'All USA' as c6,
               substring(cast(NULL as  STRING ), 1, 1 ) as c7,
               1 as c8,
               substring(cast(NULL as  STRING ), 1, 1 ) as c9,
               substring(cast(NULL as  STRING ), 1, 1 ) as c10,
               D1.c2 as c11,
               D1.c1 as c12
          from 
               (select sum(T44037.late) as c1,
                         sum(T44037.flights) as c2,
                         T43925.carrier_desc as c3
                    from 
                         
                              hbase_carriers T43925 inner join 
                              impala_flight_delays T44037 On (T43925.key = T44037.carrier)
                    where  ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.' or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines' or T43925.carrier_desc = 'Virgin America' ) 
                    group by T43925.carrier_desc
               ) D1
          union all
          select 1 as c1,
               D1.c3 as c2,
               substring(cast(NULL as  STRING ), 1, 1 ) as c3,
               substring(cast(NULL as  STRING ), 1, 1 ) as c4,
               D1.c4 as c5,
               'All USA' as c6,
               substring(cast(NULL as  STRING ), 1, 1 ) as c7,
               1 as c8,
               substring(cast(NULL as  STRING ), 1, 1 ) as c9,
               D1.c4 as c10,
               D1.c2 as c11,
               D1.c1 as c12
          from 
               (select sum(T44037.late) as c1,
                         sum(T44037.flights) as c2,
                         T43925.carrier_desc as c3,
                         T43928.dest_state as c4
                    from 
                         
                              
                                   hbase_carriers T43925 inner join 
                                   impala_flight_delays T44037 On (T43925.key = T44037.carrier) inner join 
                              hbase_geog_dest T43928 On (T43928.key = T44037.dest)
                    where  ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.' or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines' or T43925.carrier_desc = 'Virgin America' ) 
                    group by T43925.carrier_desc, T43928.dest_state
               ) D1
          union all
          select 2 as c1,
               D1.c3 as c2,
               substring(cast(NULL as  STRING ), 1, 1 ) as c3,
               D1.c4 as c4,
               D1.c5 as c5,
               'All USA' as c6,
               substring(cast(NULL as  STRING ), 1, 1 ) as c7,
               1 as c8,
               D1.c4 as c9,
               D1.c5 as c10,
               D1.c2 as c11,
               D1.c1 as c12
          from 
               (select sum(T44037.late) as c1,
                         sum(T44037.flights) as c2,
                         T43925.carrier_desc as c3,
                         T43928.dest_city as c4,
                         T43928.dest_state as c5
                    from 
                         
                              
                                   hbase_carriers T43925 inner join 
                                   impala_flight_delays T44037 On (T43925.key = T44037.carrier) inner join 
                              hbase_geog_dest T43928 On (T43928.key = T44037.dest and T43928.dest_state = 'Georgia')
                    where  ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.' or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines' or T43925.carrier_desc = 'Virgin America' ) 
                    group by T43925.carrier_desc, T43928.dest_city, T43928.dest_state
               ) D1
          union all
          select 3 as c1,
               D1.c3 as c2,
               D1.c4 as c3,
               D1.c5 as c4,
               D1.c6 as c5,
               'All USA' as c6,
               D1.c4 as c7,
               1 as c8,
               D1.c5 as c9,
               D1.c6 as c10,
               D1.c2 as c11,
               D1.c1 as c12
          from 
               (select sum(T44037.late) as c1,
                         sum(T44037.flights) as c2,
                         T43925.carrier_desc as c3,
                         T43928.dest_airport_name as c4,
                         T43928.dest_city as c5,
                         T43928.dest_state as c6
                    from 
                         
                              
                                   hbase_carriers T43925 inner join 
                                   impala_flight_delays T44037 On (T43925.key = T44037.carrier) inner join 
                              hbase_geog_dest T43928 On (T43928.key = T44037.dest and T43928.dest_city = 'Atlanta, GA')
                    where  ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.' or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines' or T43925.carrier_desc = 'Virgin America' ) 
                    group by T43925.carrier_desc, T43928.dest_airport_name, T43928.dest_city, T43928.dest_state
               ) D1
     ) D1
order by c1, c6, c8, c5, c10, c4, c9, c3, c7, c2 limit 65001
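To make the shape of that query easier to see, here is a small pure-Python sketch of the same idea: one aggregation pass per hierarchy level, with the results concatenated in the way UNION ALL does. It is an illustration only. The three sample rows are invented, the function names are hypothetical, and the per-level drill filters in the logged SQL (for example dest_state = 'Georgia') are left out.

```python
from collections import defaultdict


def aggregate(rows, group_cols):
    """Sum 'flights' and 'late' for each distinct combination of group_cols."""
    totals = defaultdict(lambda: {"flights": 0, "late": 0})
    for row in rows:
        key = tuple(row[col] for col in group_cols)
        totals[key]["flights"] += row["flights"]
        totals[key]["late"] += row["late"]
    return totals


def union_all_subtotals(rows, levels):
    """One aggregation pass per level, concatenated like UNION ALL.

    Each output row carries the index of the level that produced it,
    playing the role of the constant c1 column in the logged SQL.
    """
    result = []
    for level_index, group_cols in enumerate(levels):
        totals = aggregate(rows, group_cols)
        for key in sorted(totals):
            measures = totals[key]
            result.append((level_index, key,
                           measures["flights"], measures["late"]))
    return result


# Invented sample data, purely for illustration.
rows = [
    {"carrier": "Delta", "state": "Georgia", "city": "Atlanta",
     "flights": 10, "late": 2},
    {"carrier": "Delta", "state": "Georgia", "city": "Savannah",
     "flights": 4, "late": 1},
    {"carrier": "Delta", "state": "Texas", "city": "Dallas",
     "flights": 6, "late": 3},
]
levels = [("carrier",), ("carrier", "state"), ("carrier", "state", "city")]

for line in union_all_subtotals(rows, levels):
    print(line)
```

A GROUPING SETS query would produce all of these levels from a single scan; the UNION ALL form scans the fact table once per level, which is the trade-off visible in the log above.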

Not bad though for a data warehouse offloaded entirely to Hadoop. It’s good to see such a system handling full updates and deletes to data as well as insert appends, and it’s also good to see OBIEE working against an Impala datasource with such good response times. If any of this interests you as a potential customer, feel free to drop me an email at mark.rittman@rittmanmead.com, or check out our Big Data Quickstart page on the website.

Loading, Updating and Deleting From HBase Tables using HiveQL and Python

Earlier in the week I blogged about a customer looking to offload part of the data warehouse platform to Hadoop, extracting data from a source system and then incrementally loading data into HBase and Hive before analysing it using OBIEE11g. One of the potential complications for this project was that the fact and dimension tables weren’t append-only; Hive and HDFS are generally considered write-once, read-many systems where data is inserted or appended into a file or table but generally then can’t be updated or overwritten without deleting the whole file and writing it again with the updated dataset.

To get around this problem we loaded our incoming data into HBase tables; HBase is a NoSQL key/value-store database that also runs on Hadoop and HDFS but permits update and delete operations on rows as well as selects and inserts. Later on we took the main fact table stored in Hive-on-HBase and copied its contents into Impala to considerably improve the response time of queries against this table and the still-Hive-on-HBase dimension tables. But going back to the insert-update-delete operations on the HBase tables, how exactly does this work and what’s the most efficient way to do it?

Taking a step back for a moment, HBase is a NoSQL, key/value-type database where each row has a key (for example, “SFO” for San Francisco airport) and then a number of columns, grouped into column families. In the Flight Delays dataset that we used in the previous blog post, an HBase table of origin airports might have a few thousand entries with each entry, or row, keyed on a particular airport code like this:

NewImage

(Note that at the start, these key values won’t be there – they’re more for illustrative purposes)

At the time of HBase table definition, you specify one or more “column families”. These are group headers for columns you might add later, and in the case of my origin airport table I might just use the column family name “origin”, so that the HBase table DDL looks like this:

create 'geog_origin','origin'

and the conceptual view of the table would look like this:

NewImage

Now what’s neat about NoSQL-style databases like this (and Endeca Server is the same) is that you can define individual columns just by using them. For example, I could create columns for the airport name, airport city, airport state and airport code just by using their names in a data load, prefixing those column names with the name of a previously-defined column family. Using the HBase Shell, for example, I could issue the following PUT commands to insert the first row of data into this HBase table, like this:

put 'geog_origin','SFO','origin:airport_name','San Francisco, CA: San Francisco'
put 'geog_origin','SFO','origin:city','San Francisco, CA'
put 'geog_origin','SFO','origin:state','California'
put 'geog_origin','SFO','origin:id','14771'

Now my HBase table conceptually looks like this:

NewImage

If I then want to use another column under the “origin” column family for LAX, I can just do so by using it in the next set of PUT commands, like this:

put 'geog_origin','LAX','origin:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_origin','LAX','origin:city','Los Angeles, CA'
put 'geog_origin','LAX','origin:state','California'
put 'geog_origin','LAX','origin:region','West Coast'
put 'geog_origin','LAX','origin:id','12892'

NewImage

Each column within a column family has its values individually set, retrieved and deleted using PUT, GET and DELETE commands, as long as you prefix the column name with one of the previously-defined column-family names and provide the key value for the row you’re interested in. HBase database tables are very flexible and were designed for simple product catalog-type applications running on hundreds of sharded server nodes for companies of the likes of Amazon, Google and Facebook (see this HBase “Powered-by” page for more examples of organizations using HBase).
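If it helps to picture the data model, the toy class below mimics it in plain Python: column families are fixed when the table is created, while individual columns appear the first time a value is put into them. This is just an in-memory illustration under my own assumptions (the class name `TinyHBaseTable` is made up), not how HBase is implemented or accessed.

```python
class TinyHBaseTable(object):
    """In-memory stand-in for one HBase table (illustration only)."""

    def __init__(self, name, *column_families):
        self.name = name
        self.column_families = set(column_families)
        self.rows = {}  # row key -> {"family:qualifier": value}

    def _check(self, column):
        # Only the family part of "family:qualifier" has to be predefined.
        family = column.split(":", 1)[0]
        if family not in self.column_families:
            raise KeyError("unknown column family: " + family)

    def put(self, row_key, column, value):
        """Set one cell; the column comes into existence on first use."""
        self._check(column)
        self.rows.setdefault(row_key, {})[column] = value

    def get(self, row_key):
        """Return a copy of every cell stored for the row key."""
        return dict(self.rows.get(row_key, {}))

    def delete(self, row_key, column):
        """Remove one cell; other cells in the row are untouched."""
        self.rows.get(row_key, {}).pop(column, None)


geog_origin = TinyHBaseTable("geog_origin", "origin")
geog_origin.put("SFO", "origin:city", "San Francisco, CA")
geog_origin.put("LAX", "origin:city", "Los Angeles, CA")
geog_origin.put("LAX", "origin:region", "West Coast")  # only LAX has this column
geog_origin.delete("LAX", "origin:city")

print(geog_origin.get("SFO"))
print(geog_origin.get("LAX"))
```

Trying to put a value into a column such as 'dest:city' raises an error here, because 'dest' was never declared as a column family for this table.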

But what HBase very much isn’t is a relational database like Oracle, Microsoft SQL Server or even Apache Hive, databases that we’re much more likely to store data warehouse-type data in. In the previous post I showed how Hive table structures can in fact be put over HBase tables, mapping HBase columns to Hive columns, and then HiveQL INSERT INTO TABLE … SELECT commands can be used to bulk-load these HBase tables with initial sets of data. So back to the original question – what’s the best way to then incrementally load and refresh these HBase tables, and can I still use HiveQL for this?

In my original post, I defined Hive tables over my HBase ones using the Hive-on-HBase (yum install hive-hbase) package and associated Hive storage handler; for example, the Hive table that provided SQL access over the flight_delays HBase table was defined like this:

ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hive/lib/hbase-client.jar;
ADD JAR /usr/lib/hive/lib/hbase-common.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop2-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-protocol.jar;
ADD JAR /usr/lib/hive/lib/hbase-server.jar;
ADD JAR /usr/lib/hive/lib/htrace-core.jar;

 
CREATE EXTERNAL TABLE hbase_flight_delays
 (key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights string,
  late   string,
  cancelled string,
  distance string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "test1_flight_delays");

The underlying HBase table is defined with a key and two column families, one for dimension columns and one for fact (measure) ones; the key is a sequence number that I added to the source dataset to give each row a unique identifier.

create 'test1_flight_delays','dims','measures'
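The "hbase.columns.mapping" string in the DDL above is positional: each comma-separated entry lines up with the Hive column in the same position, and the special ":key" entry stands for the HBase row key. A quick way to sanity-check a mapping before running the DDL is sketched below; the helper name `map_hive_to_hbase` is my own and is not part of Hive.

```python
def map_hive_to_hbase(hive_columns, columns_mapping):
    """Pair Hive column names with hbase.columns.mapping entries by position."""
    entries = [entry.strip() for entry in columns_mapping.split(",")]
    if len(entries) != len(hive_columns):
        raise ValueError("mapping needs one entry per Hive column")
    return list(zip(hive_columns, entries))


# Column names and mapping string copied from the CREATE EXTERNAL TABLE above.
hive_columns = ["key", "year", "carrier", "orig", "dest",
                "flights", "late", "cancelled", "distance"]
mapping = (":key,dims:year,dims:carrier,dims:orig,dims:dest,"
           "measures:flights,measures:late,measures:cancelled,"
           "measures:distance")

for hive_name, hbase_name in map_hive_to_hbase(hive_columns, mapping):
    print("{0:<10} -> {1}".format(hive_name, hbase_name))
```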

To initially populate the table, I’ve created another Hive table with the initial set of source data in it, and I just insert its values into the Hive-on-HBase table, like this:

insert into table hbase_flight_delays              
select * from flight_delays_initial_load;      
         
Total jobs = 1
...
Total MapReduce CPU Time Spent: 11 seconds 870 msec
OK
Time taken: 40.301 seconds

This initial load of 200,000 rows took 40 seconds; not bad, and certainly acceptable for this particular project. Imagine now that every day after this we typically added another 500 or so flight records; in regular Hive this would be straightforward and we’d use the LOAD DATA or INSERT INTO TABLE … SELECT commands to add new file data to the Hive table’s underlying HDFS directories. We can do this with the Hive-on-HBase table too, with the INSERT INTO TABLE command adding the incoming data to new rows/cells in the HBase table. Checking the row count and min/max ID values in the Hive-on-HBase table at the start, like this:

select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;
 
Total jobs = 1
...
Total MapReduce CPU Time Spent: 14 seconds 660 msec
OK
200000  1  200000
Time taken: 53.076 seconds, Fetched: 1 row(s)

I can see that there are 200,000 rows in the HBase table, starting at key value 1 and ending at key value 200,000. The table containing new data has key values going from 200,001 to 200,500, so let’s insert that new data into the Hive-on-HBase table:

insert into table hbase_flight_delays                                              
select * from flight_delays_daily_update_500_rows;     
                                 
Total jobs = 1
...
Total MapReduce CPU Time Spent: 3 seconds 870 msec
OK
Time taken: 26.368 seconds

Not bad – 26 seconds for the 500 rows; proportionally that’s not as fast as the initial load, but it’s acceptable. Let’s also check that the data went in OK:

select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;  
                                                      
Total jobs = 1
...
Total MapReduce CPU Time Spent: 13 seconds 580 msec
OK
200500   1   200500
Time taken: 44.552 seconds, Fetched: 1 row(s)

As I’d hoped, the number of rows has increased by 500 and the maximum key value is now 200,500. But how do we apply updates to the data in the table? I’ve got another source table that this time contains 1,000 randomly-selected rows from the initial data load dataset, where I’ve set the LATE column value to ‘999’:

hive> select * from flight_delays_daily_changes_1000_rows                                      
    > limit 5;
OK
21307  2008  WN  BDL  BWI  1  999  1  283
136461  2008  OO  ORD  TYS  0  999  1  475
107768  2008  WN  BWI  ORF  0  999  1  159
102393  2008  OO  SLC  ACV  0  999  1  635
110639  2008  WN  BOI  OAK  0  999  1  511
Time taken: 0.139 seconds, Fetched: 5 row(s)

In fact the way you apply these updates is just to INSERT INTO TABLE … SELECT again, and the incoming values create new versions of existing rows/cells if needed. Some versions of HBase automatically keep a number of versions of each cell value (typically 3 versions), however the version of HBase that comes with CDH5.2 and higher only keeps one version by default (you can increase this number per table, or system wide, using the steps in the CDH5.2 release notes). Let’s try this out now, first using the HBase shell to see the values and timestamps currently held for one particular key value I know should be updated by the next dataset:

hbase(main):029:0> get 'test1_flight_delays', '102393'
COLUMN                                     CELL                                                                                                                       
 dims:carrier                              timestamp=1432236609421, value=OO                                                                                          
 dims:dest                                 timestamp=1432236609421, value=ACV                                                                                         
 dims:orig                                 timestamp=1432236609421, value=SLC                                                                                         
 dims:year                                 timestamp=1432236609421, value=2008                                                                                        
 measures:cancelled                        timestamp=1432236609421, value=1                                                                                           
 measures:distance                         timestamp=1432236609421, value=635                                                                                         
 measures:flights                          timestamp=1432236609421, value=0                                                                                           
 measures:late                             timestamp=1432236609421, value=0                                                                                           
8 row(s) in 0.0330 seconds
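The effect of the version limit can be pictured with a tiny model of a single cell, here measures:late for key 102393. This is only a sketch of the retention behaviour described above, not HBase code: the `VersionedCell` class is hypothetical, the first timestamp is the one shown in the shell output above, and the second is the one the update further down ends up writing.

```python
class VersionedCell(object):
    """Keeps the newest max_versions (timestamp, value) pairs for one cell."""

    def __init__(self, max_versions=1):
        self.max_versions = max_versions
        self.versions = []  # newest first

    def put(self, timestamp, value):
        """Add a version, then discard anything beyond the retention limit."""
        self.versions.append((timestamp, value))
        self.versions.sort(reverse=True)
        del self.versions[self.max_versions:]

    def latest(self):
        """Return the value with the highest timestamp."""
        return self.versions[0][1]


late_one = VersionedCell(max_versions=1)      # one version kept, as in CDH5.2+
late_three = VersionedCell(max_versions=3)    # three versions kept
for cell in (late_one, late_three):
    cell.put(1432236609421, "0")      # value written by the initial load
    cell.put(1432236723680, "999")    # value written by the update

print(late_one.versions)
print(late_three.versions)
```

With one version kept, the old value is gone as soon as the new one arrives; with three, both remain and the newest is what a normal read returns.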

I’ll now use Hive to apply the updates, like this:

insert into table hbase_flight_delays                                              
select * from flight_delays_daily_changes_1000_rows;
 
Total jobs = 1
...
Total MapReduce CPU Time Spent: 4 seconds 340 msec
OK
Time taken: 24.805 seconds
 
select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;                                                          
Total jobs = 1
...
Total MapReduce CPU Time Spent: 13 seconds 430 msec
OK
200500 1 200500
Time taken: 47.379 seconds, Fetched: 1 row(s)

Notice how this third INSERT didn’t create any new rows: the max key ID in the follow-up query hasn’t increased since the previous insert of new data. Querying one of the rows that I know was changed by this new table of data updates, I can see that the LATE column value has been changed:

select * from hbase_flight_delays where key = '102393';
Total jobs = 1
...
Total MapReduce CPU Time Spent: 3 seconds 600 msec
OK
102393  2008  OO  SLC  ACV  0  999  1  635

Let’s go into the HBase shell now and take a look at the column cells for that same key ID:

hbase(main):030:0> get 'test1_flight_delays', '102393'
COLUMN                                     CELL                                                                                                                       
 dims:carrier                              timestamp=1432236723680, value=OO                                                                                          
 dims:dest                                 timestamp=1432236723680, value=ACV                                                                                         
 dims:orig                                 timestamp=1432236723680, value=SLC                                                                                         
 dims:year                                 timestamp=1432236723680, value=2008                                                                                        
 measures:cancelled                        timestamp=1432236723680, value=1                                                                                           
 measures:distance                         timestamp=1432236723680, value=635                                                                                         
 measures:flights                          timestamp=1432236723680, value=0                                                                                           
 measures:late                             timestamp=1432236723680, value=999                                                                                         
8 row(s) in 0.0800 seconds

Notice how the timestamp for each of the cells has now updated? If I had more than the default 1 version of each cell enabled, I could query the previous versions to see the old values and timestamps. So this works pretty well, and all I need to do is use HiveQL and INSERT INTO TABLE … SELECT to initially populate, append to and even update values in the table. But what if I want to update HBase more “programmatically”, maybe as part of a process that reads directly from a source application (for example, Salesforce or a web service) and then writes directly into HBase without the intermediate step of landing the incoming data into a file? For this we can use the HBase Client API, for which there are libraries for many languages, the most popular being the Java API. If Java is too much though and you’d rather interact with HBase using a language such as Python, as this Cloudera blog post explains you can use either a REST API interface to HBase or the Thrift interface.

In my case, my preferred way of programmatically working with HBase is to use Python and a developer library called Happybase, where I can also bring in other libraries such as ones to work with Hive and even ones to work with OBIEE and Fusion Middleware and do my work at a much higher level of abstraction. To show how this might work, I’m going to use Python, the HBase Client API and Happybase to programmatically read from my update Hive tables (in real-life I’d probably connect directly to a web service if going down this more complicated route) and write a routine to read rows from the Hive table and load them into HBase.

Again I’m using the Oracle Big Data Lite 4.1 VM which has Python 2.7.6 already installed, and to get ready to install the Happybase library I first need to install pip, the “preferred installer program” for Python. As per the pip installation instructions, first download pip and then install it from the command-line:

python get-pip.py

Then use pip to install Happybase:

pip install happybase

Whilst you’re there you might as well install “pyhs2”, another Python package that in this case lets us easily connect to Hive tables via the HiveServer2 interface found on CDH5+ and the Big Data Lite 4.1 VM.

pip install pyhs2

Now I can put together a Python program such as the one below, that in this case creates a connection to a Hive table, selects all rows from it into a cursor and then PUTs these rows into the HBase table, via a batch process that sends data to HBase via the Thrift interface every 10,000 rows:

import pyhs2
import happybase

# Connect to the HBase Thrift server and open a batch that flushes every 10,000 puts
connection = happybase.Connection('bigdatalite')
flight_delays_hbase_table = connection.table('test1_flight_delays')
b = flight_delays_hbase_table.batch(batch_size=10000)

# Connect to HiveServer2 to read the source rows
with pyhs2.connect(host='bigdatalite',
               port=10000,
               authMechanism="PLAIN",
               user='oracle',
               password='welcome1',
               database='default') as conn:
    with conn.cursor() as cur:

        # Execute query
        cur.execute("select * from flight_delays_initial_load")

        # Column 0 is the row key; the remaining columns map to HBase cells
        for i in cur.fetch():
            b.put(str(i[0]),{'dims:year': i[1],
                             'dims:carrier': i[2],
                             'dims:orig': i[3],
                             'dims:dest': i[4],
                             'measures:flights': i[5],
                             'measures:late': i[6],
                             'measures:cancelled': i[7],
                             'measures:distance': i[8]})

# Send any rows still buffered in the final, partial batch
b.send()

which I can then run from the command-line like this:

[oracle@bigdatalite ~]$ python ./load_update_flight_delays.py

As I said, using this approach I could just as easily connect to a web service or read in data via Flume or Kafka, and I can delete rows as well as insert/update them and add any other logic. From my testing it’s not all that much faster than going via HiveQL and INSERT INTO TABLE … SELECT scripts (most probably because I’m still going into HBase indirectly, via the Thrift interface) but it does offer the possibility of direct inserts into HBase (and therefore Hive) from the source application without the intermediate step of writing files to disk.
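As a sketch of what handling deletes alongside inserts and updates might look like, the function below sends both through a single Happybase batch. It is an illustration under assumptions rather than the script used on the project: `row_to_cells` and `apply_changes` are made-up names, the list of deleted keys is presumed to come from the source system somehow, and values are converted to strings before being written.

```python
def row_to_cells(row):
    """Map one source row (key first, then the eight columns) to HBase cells."""
    columns = ("dims:year", "dims:carrier", "dims:orig", "dims:dest",
               "measures:flights", "measures:late",
               "measures:cancelled", "measures:distance")
    return dict(zip(columns, (str(value) for value in row[1:])))


def apply_changes(table, upserts, deleted_keys, batch_size=10000):
    """Send puts and deletes to HBase through one happybase-style batch."""
    # Used as a context manager, the batch is flushed when the block exits.
    with table.batch(batch_size=batch_size) as batch:
        for row in upserts:
            # A put on an existing key overwrites its cells (an update);
            # a put on a new key creates the row (an insert).
            batch.put(str(row[0]), row_to_cells(row))
        for key in deleted_keys:
            batch.delete(str(key))  # no columns given: removes the whole row


# Example wiring (assumes happybase and a running HBase Thrift server;
# the deleted key below is only an example value):
#
#   import happybase
#   table = happybase.Connection('bigdatalite').table('test1_flight_delays')
#   apply_changes(table,
#                 upserts=[(102393, 2008, 'OO', 'SLC', 'ACV', 0, 999, 1, 635)],
#                 deleted_keys=[200500])
```

Passing the table in as an argument keeps the function easy to try out against a stand-in object before pointing it at a real HBase table.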

So to finish this short series, tomorrow I’ll look at how well these Hive-on-HBase tables, and the Impala table I created in the previous example, work when queried from OBIEE11g. Back tomorrow.

Using HBase and Impala to Add Update and Delete Capability to Hive DW Tables, and Improve Query Response Times

One of our customers is looking to offload part of their data warehouse platform to Hadoop, extracting data out of a source system and loading it into Apache Hive tables for subsequent querying using OBIEE11g. One of the challenges that the project faces though is how to handle updates to dimensions (and in their case, fact table records) when HDFS and Hive are typically append-only filesystems; ideally writes to fact tables should only require INSERTs and filesystem appends but in this case they wanted to use an accumulating fact snapshot table, whilst the dimension tables all used SCD1-type attributes that had their values overwritten when updates to those values came through from the source system.

The obvious answer then was to use Apache HBase as part of the design, a NoSQL database that sits over HDFS but allows updates and deletes to individual rows of data rather than restricting you just to append/inserts. I covered HBase briefly on the blog a few months ago when we used it to store webserver log entries brought into Hadoop via Flume, but in this case it makes an ideal landing point for data coming into our Hadoop system as we can maintain a current-state record of the data brought in from the source system, updating and overwriting values if we need to. What was also interesting to me though was how well we could integrate this HBase data into our mainly SQL-style data processing; how much Java I’d have to use to work with HBase, and whether we could get OBIEE to connect to the HBase tables and query them directly (with a reasonable response time). In particular, could we use the Hive-on-HBase feature to create Hive tables over the HBase ones, and then query those efficiently using OBIEE, so that the data flow looked like this?

NewImage

To test this idea out, I took the Flight Delays dataset from the OBIEE11g SampleApp & Exalytics demo data [PDF] and created four HBase tables to hold its data, using the BigDataLite 4.1 VM and the HBase Shell. This dataset has four tables:

  • FLIGHT_DELAYS – around 220m US flight records listing the origin airport, destination airport, carrier, year and a bunch of metrics (flights, late minutes, distance etc)
  • GEOG_ORIGIN – a list of all the airports in the US along with their city, state, name and so on
  • GEOG_DEST – a copy of the GEOG_ORIGIN table, used for filtering and aggregating on both origin and destination 
  • CARRIERS – a list of all the airlines associated with flights in the FLIGHT_DELAYS table

HBase is a NoSQL, key/value-store database where individual rows have a key, and then one or more column families made up of one or more columns. When you define a HBase table you only define the column families, and the data load itself creates the columns within them in a similar way to how the Endeca Server holds “jagged” data – individual rows might have different columns to each other and like MongoDB you can define a new column just by loading it into the database.
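To make that data model a bit more concrete, here's a minimal Python sketch of it. This is a toy in-memory stand-in and not the HBase client API: the ToyTable class and its methods are made up for illustration, and the JFK row is an invented example. The point is just that column families are fixed when the table is created, whilst the columns inside them come into existence when a value is written.

```python
# Toy in-memory model of an HBase-style table; NOT the HBase client API.
class ToyTable:
    def __init__(self, name, families):
        self.name = name
        self.families = set(families)   # fixed when the table is created
        self.rows = {}                  # row key -> {"family:qualifier": value}

    def put(self, row_key, column, value):
        family, _, qualifier = column.partition(":")
        if family not in self.families:
            raise KeyError(f"unknown column family: {family}")
        if not qualifier:
            raise ValueError("column must be given as family:qualifier")
        # The column needs no prior definition; it exists once it is written.
        self.rows.setdefault(row_key, {})[column] = value

    def get(self, row_key):
        return dict(self.rows.get(row_key, {}))


geog_dest = ToyTable("geog_dest", ["dest"])
geog_dest.put("LAX", "dest:city", "Los Angeles, CA")
geog_dest.put("LAX", "dest:state", "California")
geog_dest.put("JFK", "dest:city", "New York, NY")   # invented row with no dest:state

print(sorted(geog_dest.get("LAX")))   # two columns for LAX
print(sorted(geog_dest.get("JFK")))   # one column for JFK: a "jagged" row
```

Writing to a family that wasn't declared up-front (for example origin:city on this table) raises an error in the sketch, which mirrors the way the families, but not the columns, are part of the table definition.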

Using the HBase Shell CLI on the BigDataLite VM I therefore create the HBase tables using just these high-level column family definitions, with the individual columns within the column families to be defined later when I load data into them.

hbase shell
 
create 'carriers','details'
create 'geog_origin','origin'
create 'geog_dest','dest'
create 'flight_delays','dims','measures'

To get data into HBase tables there’s a variety of methods you can use. Most probably for the full project we’ll write a Java application that uses the HBase client to read, write, update and delete rows that are read in from the source application (see this previous blog post for an example where we use Flume as the source); to set up some example data, though, we can use the HBase Shell and enter the HBase row/cell values directly, like this for the geog_dest table:

put 'geog_dest','LAX','dest:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_dest','LAX','dest:city','Los Angeles, CA'
put 'geog_dest','LAX','dest:state','California'
put 'geog_dest','LAX','dest:id','12892'

and you can then use the “scan” command from the HBase shell to see those values stored in HBase’s key/value store, keyed on LAX as the key.

hbase(main):015:0> scan 'geog_dest'
ROW                                    COLUMN+CELL                                                                                                     
 LAX                                   column=dest:airport_name, timestamp=1432067861347, value=Los Angeles, CA: Los Angeles                           
 LAX                                   column=dest:city, timestamp=1432067861375, value=Los Angeles, CA                                                
 LAX                                   column=dest:id, timestamp=1432067862018, value=12892                                                            
 LAX                                   column=dest:state, timestamp=1432067861404, value=California                                                    
1 row(s) in 0.0240 seconds
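The put commands above follow a regular pattern, so for a handful of test rows you could generate them rather than type them. The sketch below is one way of doing that in Python; the to_put_commands helper is hypothetical (it isn't part of HBase), and it only builds the command text for pasting into the HBase Shell rather than talking to HBase itself.

```python
def to_put_commands(table, row_key, family, values):
    """Build one HBase Shell put command per column in `values` (a dict)."""
    def quote(text):
        # Escape backslashes first, then single quotes, for single-quoted strings.
        return "'" + str(text).replace("\\", "\\\\").replace("'", "\\'") + "'"

    return [
        "put " + ",".join(
            [quote(table), quote(row_key), quote(f"{family}:{column}"), quote(value)]
        )
        for column, value in values.items()
    ]


commands = to_put_commands(
    "geog_dest",
    "LAX",
    "dest",
    {
        "airport_name": "Los Angeles, CA: Los Angeles",
        "city": "Los Angeles, CA",
        "state": "California",
        "id": "12892",
    },
)
print("\n".join(commands))
```

For anything beyond a few rows this isn't the way to go, which is where the bulk-loading approach comes in.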

For testing purposes though we need a large volume of rows and entering them all in by-hand isn’t practical, so this is where we start to use the Hive integration that now comes with HBase. For the BigDataLite 4.1 VM all you need to do to get this working is install the hive-hbase package using yum (after first installing the Cloudera CDH5 repo into /etc/yum.repos.d), load the relevant JAR files when starting your Hive shell session, and then create a Hive table over the HBase table mapping Hive columns to the relevant HBase ones, like this:

hive
 
ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hive/lib/hbase-client.jar;
ADD JAR /usr/lib/hive/lib/hbase-common.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop2-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-protocol.jar;
ADD JAR /usr/lib/hive/lib/hbase-server.jar;
ADD JAR /usr/lib/hive/lib/htrace-core.jar;
 
CREATE EXTERNAL TABLE hbase_carriers
 (key string,
  carrier_desc string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,details:carrier_desc")
TBLPROPERTIES ("hbase.table.name" = "carriers");
 
CREATE EXTERNAL TABLE hbase_geog_origin
 (key string,
  origin_airport_name string,
  origin_city string,
  origin_state string,
  origin_id string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,origin:airport_name,origin:city,origin:state,origin:id")
TBLPROPERTIES ("hbase.table.name" = "geog_origin");
 
CREATE EXTERNAL TABLE hbase_geog_dest
 (key string,
  dest_airport_name string,
  dest_city string,
  dest_state string,
  dest_id string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,dest:airport_name,dest:city,dest:state,dest:id")
TBLPROPERTIES ("hbase.table.name" = "geog_dest");
 
CREATE EXTERNAL TABLE hbase_flight_delays
 (key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights tinyint,
  late   tinyint,
  cancelled bigint,
  distance smallint
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "flight_delays");
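The part of these definitions that's easiest to get wrong is the hbase.columns.mapping string, as its entries are matched to the Hive columns by position. As a rough sketch, this Python snippet builds the mapping from a list of (Hive column, HBase column) pairs so the two can't drift apart; the hbase_columns_mapping function is my own illustration, and it assumes the row key is listed first as in the definitions above.

```python
def hbase_columns_mapping(columns):
    """columns: list of (hive_column, hbase_column) pairs, row key first."""
    # Assumption for this sketch: the row key is the first Hive column,
    # matching the table definitions shown in the post.
    if not columns or columns[0][1] != ":key":
        raise ValueError("this sketch expects the first column to map to :key")
    return ",".join(hbase_column for _, hbase_column in columns)


geog_dest_columns = [
    ("key", ":key"),
    ("dest_airport_name", "dest:airport_name"),
    ("dest_city", "dest:city"),
    ("dest_state", "dest:state"),
    ("dest_id", "dest:id"),
]

mapping = hbase_columns_mapping(geog_dest_columns)
print(mapping)  # :key,dest:airport_name,dest:city,dest:state,dest:id
```

The printed string matches the one used for hbase_geog_dest, with one mapping entry per Hive column in the same order.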

Bulk loading data into these Hive-on-HBase tables is then just a matter of loading the source data into a regular Hive table, and then running INSERT INTO TABLE … SELECT commands to copy the regular Hive rows into the HBase tables via their Hive metadata overlays:

insert into table hbase_carriers                           
select carrier, carrier_desc from carriers;
 
insert into table hbase_geog_origin
select * from geog_origin;
 
insert into table hbase_geog_dest
select * from geog_dest;
 
insert into table hbase_flight_delays
select row_number() over (), * from flight_delays;

Note that I had to create a synthetic sequence number key for the fact table, as the source data for that table doesn’t have a unique key for each row – something fairly common for data warehouse fact table datasets. In fact storing fact table data into a HBase table is not a very good idea for a number of reasons that we’ll see in a moment, and bear-in-mind that HBase is designed for sparse datasets and low-latency inserts and row retrievals so don’t read too much into this approach yet.
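To see why the synthetic key matters, here's a small Python sketch using a plain dictionary as a stand-in for a key/value store. The sample rows and values are invented for illustration; the behaviour being shown is that a write to an existing key replaces the earlier row instead of adding a new one, so two fact rows with the same dimension values would collapse into one if those values were used as the key.

```python
from itertools import count

fact_rows = [
    # (year, carrier, orig, dest, flights): illustrative values only
    ("2008", "AA", "SFO", "LAX", 1),
    ("2008", "AA", "SFO", "LAX", 1),   # second flight, identical dimension values
    ("2008", "UA", "SFO", "JFK", 1),
]


def load(rows, key_fn):
    store = {}
    for row in rows:
        store[key_fn(row)] = row       # same key: the earlier row is overwritten
    return store


# Key built from the dimension columns: duplicates silently collapse.
natural = load(fact_rows, key_fn=lambda r: "|".join(r[:4]))

# Key from a running sequence number: every source row is kept.
seq = count(1)
synthetic = load(fact_rows, key_fn=lambda r: str(next(seq)))

print(len(natural), len(synthetic))  # 2 3
```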

So going back to the original reason for using HBase to store these tables, updating rows within them is pretty straightforward. Taking the geog_origin HBase table as an example, if we first get the row for SFO using a Hive query over the HBase table, it looks like this:

hive> select * from hbase_geog_origin where key = 'SFO'; 
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
SFO   San Francisco, CA: San Francisco   San Francisco, CA   California   14771
Time taken: 29.126 seconds, Fetched: 1 row(s)

To update that row and others, I can load a new data file into the Hive table using HiveQL’s LOAD DATA command, or INSERT INTO TABLE … SELECT from another Hive table containing the updates, like this:

insert into table hbase_geog_origin    
select * from origin_updates;

To check that the value has in-fact updated I can either run the same SELECT query against the Hive table over the HBase one, or drop into the HBase shell and check it there:

hbase(main):001:0> get 'geog_origin','SFO'
COLUMN                                 CELL                                                                                                           
 origin:airport_name                   timestamp=1432050681685, value=San Francisco, CA: San Francisco International                                  
 origin:city                           timestamp=1432050681685, value=San Francisco, CA                                                               
 origin:id                             timestamp=1432050681685, value=14771                                                                           
 origin:state                          timestamp=1432050681685, value=California                                                                      
4 row(s) in 0.2740 seconds

In this case the update file/Hive table changed the SFO airport name from “San Francisco” to “San Francisco International”. I can change it back again using the HBase Shell like this, if I want:

put 'geog_origin','SFO','origin:airport_name','San Francisco, CA: San Francisco'

and then checking it again using the HBase Shell’s GET command on that key value shows it’s back to the old value – HBase actually stores X number of versions of each cell with a timestamp for each version, but by default it shows you the current one:

hbase(main):003:0> get 'geog_origin','SFO'
COLUMN                                 CELL                                                                                                           
 origin:airport_name                   timestamp=1432064747843, value=San Francisco, CA: San Francisco                                                
 origin:city                           timestamp=1432050681685, value=San Francisco, CA                                                               
 origin:id                             timestamp=1432050681685, value=14771                                                                           
 origin:state                          timestamp=1432050681685, value=California                                                                      
4 row(s) in 0.0130 seconds
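The versioning behaviour can be sketched in a few lines of Python. This is a toy model and not HBase code: the VersionedCell class is made up, and the limit of three versions is just a value picked for the sketch, as how many versions are actually retained is a column family setting in HBase. The timestamps and values are the ones from the two shell outputs above.

```python
class VersionedCell:
    """Toy model of one HBase cell keeping several timestamped versions."""

    def __init__(self, max_versions=3):   # 3 is an arbitrary choice for this sketch
        self.max_versions = max_versions
        self.versions = []                # (timestamp, value) pairs, newest first

    def put(self, timestamp, value):
        self.versions.append((timestamp, value))
        self.versions.sort(key=lambda tv: tv[0], reverse=True)
        del self.versions[self.max_versions:]   # drop anything beyond the limit

    def get(self, versions=1):
        return self.versions[:versions]


airport_name = VersionedCell()
airport_name.put(1432050681685, "San Francisco, CA: San Francisco International")
airport_name.put(1432064747843, "San Francisco, CA: San Francisco")

print(airport_name.get())             # newest version only
print(airport_name.get(versions=2))   # both versions, newest first
```

A plain get returns only the newest value, which is why the shell shows the airport name back at its old value even though the earlier write may still be held as an older version.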

So, so far so good. We’ve got a way of storing data in Hive-type tables on Hadoop and a way of updating and amending records within them by using HBase as the underlying storage, but what are these tables like to query? Hive-on-HBase tables with just a handful of HBase rows return data almost immediately, for example when I create a copy of the geog_dest HBase table and put just a single row entry into it, then query it using a Hive table over it:

hive> select * from hbase_geog_dest2;
OK
LAX   Los Angeles, CA: Los Angeles   Los Angeles, CA   California   12892
Time taken: 0.257 seconds, Fetched: 1 row(s)

Hive would normally take 30 seconds or more to return even a single row like this, so that’s a good result; but when we move up to larger datasets such as the flight delays fact table itself, running a simple aggregation on the regular Hive table and then comparing that to the same query running against the Hive-on-HBase version shows a significant time penalty for the HBase version:

hive> select sum(cast(flights as bigint)) as flight_count from flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 7 seconds 670 msec
OK
29483653
Time taken: 37.327 seconds, Fetched: 1 row(s)

compared to the Hive-on-HBase version of the fact table:

hive> select sum(cast(flights as bigint)) as flight_count from hbase_flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 1 minutes 19 seconds 240 msec
OK
21473738
Time taken: 99.154 seconds, Fetched: 1 row(s)

And that’s to be expected; as I said earlier, HBase is aimed at low-latency single-row operations rather than full table scan, aggregation-type queries, so it’s not unexpected that HBase performs badly here, but the response time is even worse if I try and join the HBase-stored Hive fact table to one or more of the dimension tables also stored in HBase.
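Putting the two runs side by side as ratios makes the size of the penalty clearer. This is just arithmetic on the elapsed and CPU times reported in the two query outputs above, from a single run of each on the VM, so treat it as indicative and not as a benchmark.

```python
# Times taken from the two Hive query outputs above (seconds).
hive_elapsed, hive_cpu = 37.327, 7.670
hbase_elapsed, hbase_cpu = 99.154, 60 + 19.240   # "1 minutes 19 seconds 240 msec"

elapsed_ratio = hbase_elapsed / hive_elapsed
cpu_ratio = hbase_cpu / hive_cpu

print(f"elapsed time: {elapsed_ratio:.1f}x slower on Hive-on-HBase")
print(f"MapReduce CPU time: {cpu_ratio:.1f}x higher on Hive-on-HBase")
```

The elapsed time comes out at a bit under three times slower, but the CPU time is around ten times higher, which suggests the job startup overhead common to both queries is hiding some of the real difference.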

In our particular customer example though these HBase tables were only going to be loaded once-a-day, so what if we copy the current version of each HBase table row into a snapshot Hive table stored in regular HDFS storage, so that our data loading process looks like this:

NewImage

and then OBIEE queries the snapshot of the Hive-on-HBase table joined to the dimension table still stored in HBase, so that the query side looks like this:

NewImage

Let’s try it out by taking the original Hive table I used earlier on to load the hbase_flight_delays table, and joining that to one of the Hive-on-HBase dimension tables; I’ll start first by creating a baseline response time by joining that source Hive fact table to the source Hive dimension table (also used earlier to load the corresponding Hive-on-HBase table):

select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f 
join geog_origin o on f.orig = o.origin                                                             
and o.origin_state = 'California'                                                                       
group by o.origin_airport_name; 
...
OK
17638   Arcata/Eureka, CA: Arcata
9146   Bakersfield, CA: Meadows Field
125433   Burbank, CA: Bob Hope
...
1653   Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 43.896 seconds, Fetched: 27 row(s)

So that’s just under 44 seconds to do the query entirely using regular Hive tables. So what if I swap-out the regular Hive dimension table for the Hive-on-HBase version, how does that affect the response time?

hive> select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f       
    > join hbase_geog_origin o on f.orig = o.key                                                        
    > and o.origin_state = 'California'                                                                 
    > group by o.origin_airport_name;
...
OK
17638   Arcata/Eureka, CA: Arcata
9146   Bakersfield, CA: Meadows Field
125433   Burbank, CA: Bob Hope
...
1653   Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 51.757 seconds, Fetched: 27 row(s)

That’s interesting – even though we used the (updatable) Hive-on-HBase dimension table in the query, the response time only went up a few seconds to 51, compared to the 44 when we used just regular Hive tables. Taking it one step further though, what if we used Cloudera Impala as our query engine and copied the Hive-on-HBase fact table into a Parquet-stored Impala table, so that our inward data flow looked like this:

NewImage

By using the Impala MPP engine – running on Hadoop but directly reading the underlying data files, rather than going through MapReduce as Hive does – and in-addition storing its data in column-store query-orientated Parquet storage, we can take advantage of OBIEE 11.1.1.9’s new support for Impala and potentially bring the query response time down even further. Let’s go into the Impala Shell on the BigDataLite 4.1 VM, update Impala’s view of the Hive Metastore table data dictionary, and then create the corresponding Impala snapshot fact table using a CREATE TABLE … AS SELECT Impala SQL command:

[oracle@bigdatalite ~]$ impala-shell
 
[bigdatalite.localdomain:21000] > invalidate metadata;
 
[bigdatalite.localdomain:21000] > create table impala_flight_delays
                                > stored as parquet
                                > as select * from hbase_flight_delays;

Now let’s use the Impala Shell to join the Impala version of the flight delays table with data stored in Parquet files, to the Hive-on-HBase dimension table created earlier within our Hive environment:

[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > and o.origin_state = 'California'  
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
join hbase_geog_origin o on f.orig = o.key
and o.origin_state = 'California'
group by o.origin_airport_name
+--------------+------------------------------------------------------------------+
| flight_count | origin_airport_name                                              |
+--------------+------------------------------------------------------------------+
| 31907        | Fresno, CA: Fresno Yosemite International                        |
| 125433       | Burbank, CA: Bob Hope                                            |
...
| 1653         | Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field |
+--------------+------------------------------------------------------------------+
Fetched 27 row(s) in 2.16s

Blimey – 2.16 seconds, compared to the best time of 44 seconds we got earlier when we just used regular Hive tables, let alone when we joined to the dimension table stored in HBase. Let’s crank-it-up a bit and join another dimension table in, filtering on both origin and destination values:

[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > join hbase_geog_dest d on f.dest = d.key
                                > and o.origin_state = 'California'  
                                > and d.dest_state = 'New York'
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
join hbase_geog_origin o on f.orig = o.key
join hbase_geog_dest d on f.dest = d.key
and o.origin_state = 'California'
and d.dest_state = 'New York'
group by o.origin_airport_name
+--------------+-------------------------------------------------------+
| flight_count | origin_airport_name                                   |
+--------------+-------------------------------------------------------+
| 947          | Sacramento, CA: Sacramento International              |
| 3880         | San Diego, CA: San Diego International                |
| 4030         | Burbank, CA: Bob Hope                                 |
| 41909        | San Francisco, CA: San Francisco International        |
| 3489         | Oakland, CA: Metropolitan Oakland International       |
| 937          | San Jose, CA: Norman Y. Mineta San Jose International |
| 41407        | Los Angeles, CA: Los Angeles International            |
| 794          | Ontario, CA: Ontario International                    |
| 4176         | Long Beach, CA: Long Beach Airport                    |
+--------------+-------------------------------------------------------+
Fetched 9 row(s) in 1.48s

Even faster. So that’s what we’ll be going with as our initial approach for the data loading and querying; load data into HBase tables as planned at the start, taking advantage of HBase’s CRUD capabilities but bulk-loading and initially reading the data using Hive tables over the HBase ones; but then, before we make the data available for querying by OBIEE, we copy the current state of the HBase fact table into a Parquet-stored Impala table, using Impala’s ability to work with Hive tables and metadata and create joins across both Impala and Hive tables, even when one of the Hive tables uses HBase as its underlying storage.
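As a way of picturing that flow end-to-end, here's a toy Python sketch of it. Nothing in it uses HBase, Hive or Impala: dictionaries stand in for the updatable HBase tables, a dict of lists stands in for the column-oriented Parquet snapshot, and all of the function names and sample rows are invented for illustration.

```python
from collections import defaultdict

# 1. Updatable current-state stores keyed on row key (stand-ins for the HBase tables).
fact_store = {}
origin_dim = {
    "SFO": {"state": "California", "airport_name": "San Francisco, CA: San Francisco"},
}


def upsert(store, key, row):
    store[key] = {**store.get(key, {}), **row}   # insert, or overwrite changed columns


def delete(store, key):
    store.pop(key, None)


# Initial load, then an update and a delete arriving later from the source system.
upsert(fact_store, "1", {"orig": "SFO", "dest": "LAX", "flights": 1, "late": 0})
upsert(fact_store, "2", {"orig": "SFO", "dest": "JFK", "flights": 1, "late": 0})
upsert(fact_store, "3", {"orig": "SFO", "dest": "LAX", "flights": 1, "late": 0})
upsert(fact_store, "2", {"late": 1})
delete(fact_store, "3")


# 2. Copy the current state into a column-oriented snapshot for querying.
def snapshot(store):
    columns = defaultdict(list)
    for key in sorted(store):
        for name, value in store[key].items():
            columns[name].append(value)
    return dict(columns)


# 3. Query the snapshot, joining to the dimension that is still held in the updatable store.
def flights_by_origin(snap, dim, state):
    totals = defaultdict(int)
    for orig, flights in zip(snap["orig"], snap["flights"]):
        airport = dim.get(orig)
        if airport and airport["state"] == state:
            totals[airport["airport_name"]] += flights
    return dict(totals)


fact_snapshot = snapshot(fact_store)
result = flights_by_origin(fact_snapshot, origin_dim, "California")
print(result)
```

The snapshot only reflects the state of the fact store at the time it was taken, which fits the once-a-day loading pattern: changes made afterwards show up in reports once the copy is refreshed.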

OBIEE 11.1.1.9 Now Supports HiveServer2 and Cloudera Impala

As you all probably know I’m a big fan of Oracle’s BI and Big Data products, but something I’ve been critical of is OBIEE11g’s lack of support for HiveServer2 connections to Hadoop clusters. OBIEE 11.1.1.7 supported Hive connections using the older HiveServer1 protocol, but recent versions of Cloudera CDH4 and CDH5 use the HiveServer2 protocol by default and OBIEE 11.1.1.7 wouldn’t connect to them; not unless you switched to the Windows version of OBIEE and used the Cloudera ODBC drivers instead, which worked but weren’t supported by Oracle.

OBIEE 11.1.1.9 addresses this issue by shipping more recent DataDirect ODBC drivers for Hive that are compatible with the HiveServer2 protocol used by CDH4 and CDH5 (check out this other article by Robin on general new features in 11.1.1.9). Oracle only really support Hive connectivity for Linux installs of OBIEE, and the Linux version of OBIEE 11.1.1.9 comes with the DataDirect ODBC drivers already installed and configured for use; all you have to do then is set up the ODBC connection in the odbc.ini file on Linux and install the Cloudera Hive ODBC drivers on your Windows workstation for the Admin tool (the Hive ODBC drivers that Oracle supply on MOS still look like the old HiveServer1 version, though I could be wrong). To check that it all worked on this new 11.1.1.9 version of OBIEE11g I therefore downloaded and installed the Windows Cloudera Hive ODBC drivers and set up the System DSN like this:

NewImage

and set up a corresponding entry in the Linux OBIEE 11.1.1.9’s odbc.ini file, like this:

NewImage

with the key thing being to make sure you have matching DSN names on both the Windows workstation (for the Admin tool initial datasource setup and table metadata import) and the Linux server (for the actual online connection to Hive from the BI Server, and subsequent data retrieval). One thing I did notice was that whilst I could connect to the Hive database server and set up the connection in the Admin tool, I couldn’t view any Hive tables and had to manually create them myself in the RPD Physical Layer – this could just be a quirk on my workstation install though so I wouldn’t read too much into it. Checking connectivity in the Admin tool then showed it connecting properly and retrieving data from Hive on the Hadoop cluster. I didn’t test Kerberos-authentication connections but I’m assuming it’d work, as the previous version of OBIEE 11.1.1.7 on Linux just failed at this point anyway. The docs are here if you’d like to look into any more details, or check the full set of setup steps.

NewImage
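Since mismatched DSN names are the easiest thing to get wrong here, a quick check along these lines could help. It's only a sketch: the DSN name bigdatalite_hive and the odbc.ini fragment are placeholders I've made up (a real entry also carries the driver and host settings described in the docs), the names are compared exactly as typed, and the Windows DSN names are entered by hand as they can't be read from the Linux side.

```python
import configparser

# Placeholder odbc.ini content; a real file would have driver and host settings too.
SAMPLE_ODBC_INI = """\
[ODBC Data Sources]
bigdatalite_hive = placeholder description

[bigdatalite_hive]
Description = placeholder entry for this sketch
"""


def dsn_names(odbc_ini_text):
    """Return the DSN section names defined in odbc.ini-style text."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str                     # keep the case of key names
    parser.read_string(odbc_ini_text)
    non_dsn_sections = {"ODBC", "ODBC Data Sources"}
    return {name for name in parser.sections() if name not in non_dsn_sections}


# DSN names as set up on the Windows workstation, typed in by hand.
windows_dsns = {"bigdatalite_hive", "bigdatalite_impala"}

linux_dsns = dsn_names(SAMPLE_ODBC_INI)
missing_on_linux = sorted(windows_dsns - linux_dsns)
print("Missing from the Linux odbc.ini:", missing_on_linux)
```

With the placeholder values above, the Impala DSN shows up as missing on the Linux side, which is the sort of mismatch that would otherwise only surface when the BI Server tries to connect.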

For Cloudera Impala connections, you’re directed in the docs to download the Windows Cloudera Impala ODBC drivers as Oracle don’t even ship them on MOS, but again the Linux install of OBIEE 11.1.1.9 comes with DataDirect Impala drivers that are already set up and ready for use (note that if you upgrade from 11.1.1.7 to 11.1.1.9 rather than do the fresh install that I did for testing purposes, you’ll need to edit the opmn.xml file to register these updated DataDirect drivers). Then it’s a case of setting the Windows System DSN up for the initial metadata import, like this:

NewImage

then creating a corresponding entry in the Linux server’s odbc.ini file, like this:

NewImage

Note that the docs do mention the issue with earlier versions of Impala where the Impala server is expecting LIMIT clauses when using ORDER BY in Impala SQL queries, and gives a couple of workarounds to fix the issue and stop Impala expecting this clause; for more recent (CDH5+) versions of Impala this requirement is in-fact lifted and you can connect-to and use Impala without needing to make the configuration change mentioned in the doc (or use the workaround I mentioned in this earlier blog post). Checking connectivity in the Admin tool then shows the connection is making its way through OK, from the Windows environment to the Linux server’s ODBC connection:

NewImage

and creating a quick report shows data returned as expected, and considerably quicker than with Hive.

NewImage

As I said, I’ve not really tested either of these two connections using Kerberos or any edge-case setups, but connectivity seems to be working and we’re now in a position where OBIEE11g can properly connect to both Hive, and Impala, on recent CDH installs and of course the Oracle Big Data Appliance. Good stuff, now what about Spark SQL or ElasticSearch..?

Presentation Slides and Photos from the Rittman Mead BI Forum 2015, Brighton and Atlanta

It’s now the Saturday after the two Rittman Mead BI Forum 2015 events, last week in Atlanta, GA and the week before in Brighton, UK. Both events were a great success and I’d like to say thanks to the speakers, attendees, our friends at Oracle and my colleagues within Rittman Mead for making the two events so much fun. If you’re interested in taking a look at some photos from the two events, I’ve put together two Flickr photosets that you can access using the links below:

NewImage

We’ve also uploaded the presentation slides from the two events (where we’ve been given permission to share them) to our website, and you can download them including the Delivering the Oracle Information Management and Big Data Reference Architecture masterclass using the links below:

Delivering the Oracle Information Management & Big Data Reference Architecture (Mark Rittman & Jordan Meyer, Rittman Mead)

Brighton, May 7th and 8th 2015

Atlanta, May 14th and 15th 2015

Congratulations also to Emiel van Bockel and Robin Moffatt who jointly won the Best Speaker award at the Brighton event, and to Andy Rocha and Pete Tamsin who won Best Speaker in Atlanta for their joint session. It’s time for a well-earned rest now and then back to work, and hopefully we’ll see some of you at KScope’15, Oracle OpenWorld 2015 or the UKOUG Tech and Apps 2015 conferences later in 2015.