Tag Archives: Big Data
Combining Spark Streaming and Data Frames for Near-Real Time Log Analysis & Enrichment
A few months ago I posted an article on the blog about using Apache Spark to analyse activity on our website, joining the site activity to some reference tables for some one-off analysis. In this article I’ll be taking an initial look at Spark Streaming, a component within the overall Spark platform that allows you to ingest and process data in near real-time whilst keeping the same overall code base as your batch-style Spark programs.
Like regular batch-based Spark programs, Spark Streaming builds on the concept of RDDs (Resilient Distributed Datasets). It adds a high-level abstraction called a “discretized stream”, or DStream, which represents a continuous sequence of RDDs, each holding the data received during one batch interval. In the example I’m going to create, I’ll use Spark Streaming’s DStream windowing feature to hold the last 24 hours’ worth of website activity in memory, and use it to refresh a “Top Ten Pages” Impala table once a minute.
To create the example I started with the Log Analyzer example in the set of Databricks Spark Reference Applications. I adapted its Spark Streaming / Spark SQL example to work with our CombinedLogFormat log format, which contains two additional log elements. In addition, I’ll join the incoming data stream with some reference data sitting in an Oracle database, and then output a Parquet-format file to the HDFS filesystem containing the top ten pages over that period.
The parts of the Log Analyzer reference application that we reused comprise two Scala source files that compile into a single JAR file. One creates a Scala object to parse the incoming CombinedLogFormat log files, and the other contains the main program. The log parsing object contains a single function that takes a log line and returns an instance of a Scala case class, which breaks the log entry down into its individual elements (IP address, endpoint (URL), referrer and so on).

Compared to the Databricks reference application, I had to add two extra log file elements to the ApacheAccessLog class (referer and agent). I also added code to deal with “-” values that can appear in the log for the content size. Finally, I added some code to make sure the URLs (endpoints) quoted in the log matched the format used in the data extracted from our WordPress install, which stores all URLs with a trailing forward-slash (“/”).
package com.databricks.apps.logs

case class ApacheAccessLog(ipAddress: String, clientIdentd: String,
                           userId: String, dateTime: String, method: String,
                           endpoint: String, protocol: String,
                           responseCode: Int, contentSize: Long,
                           referer: String, agent: String)

object ApacheAccessLog {
  // CombinedLogFormat: the Common Log Format fields plus quoted referer and user-agent
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)"""".r

  def parseLogLine(log: String): ApacheAccessLog = {
    val res = PATTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      // Unparseable line: return an empty placeholder record
      ApacheAccessLog("", "", "", "", "", "", "", 0, 0, "", "")
    } else {
      val m = res.get
      // "-" means no response body was sent, so treat it as zero bytes
      val contentSizeSafe: Long = if (m.group(9) == "-") 0 else m.group(9).toLong
      // WordPress stores URLs with a trailing slash, so add one if it is missing
      val formattedEndpoint: String =
        if (m.group(6).endsWith("/")) m.group(6) else m.group(6).concat("/")
      ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4),
        m.group(5), formattedEndpoint, m.group(7), m.group(8).toInt,
        contentSizeSafe, m.group(10), m.group(11))
    }
  }
}
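You can check the two parser tweaks (the “-” content size and the trailing slash) outside Spark with a small standalone sketch like the one below. It uses the same regular expression as the ApacheAccessLog object. The object name CombinedLogCheck, its helper methods and the sample log line are all made up for illustration and are not part of the Databricks application.

```scala
// Hypothetical standalone check of the parsing rules above; runs without Spark.
object CombinedLogCheck {
  // Same pattern as ApacheAccessLog.PATTERN: groups 6 = endpoint, 9 = size, 11 = agent
  val LogPattern =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)"""".r

  // "-" in the size field means no body was sent; treat it as 0 bytes.
  def safeContentSize(raw: String): Long = if (raw == "-") 0L else raw.toLong

  // WordPress stores URLs with a trailing slash, so add one when it is missing.
  def normalizeEndpoint(endpoint: String): String =
    if (endpoint.endsWith("/")) endpoint else endpoint + "/"

  // Returns (endpoint, contentSize, agent) for a well-formed line, None otherwise.
  def extract(line: String): Option[(String, Long, String)] =
    LogPattern.findFirstMatchIn(line).map { m =>
      (normalizeEndpoint(m.group(6)), safeContentSize(m.group(9)), m.group(11))
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample line in CombinedLogFormat
    val sample = "203.0.113.5 - - [10/Jun/2015:14:02:11 +0100] " +
      "\"GET /blog/my-post HTTP/1.1\" 200 - \"https://www.google.com/\" \"Mozilla/5.0\""
    println(extract(sample))           // Some((/blog/my-post/,0,Mozilla/5.0))
    println(extract("not a log line")) // None
  }
}
```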
The body of the main application script starts by importing Scala classes for Spark, Spark SQL and Spark Streaming. It then defines two variables that control how much log data the application considers and how often it recalculates:
- WINDOW_LENGTH (86,400,000 milliseconds, or 24 hours) determines the window of log activity that the application will consider.
- SLIDE_INTERVAL (60,000 milliseconds, or one minute) determines how often the statistics are recalculated.

With these values, our Spark Streaming application will recompute the top ten most popular pages over the last 24 hours every minute.
package com.databricks.apps.logs.chapter1

import com.databricks.apps.logs.ApacheAccessLog
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

object LogAnalyzerStreamingSQL {
  // Window of log activity to consider: 24 hours
  val WINDOW_LENGTH = new Duration(86400 * 1000)
  // How often the statistics are recalculated: 1 minute
  val SLIDE_INTERVAL = new Duration(60 * 1000)
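As a quick sanity check on these two durations, the plain-Scala sketch below does the arithmetic. The WindowMath object is hypothetical and not part of the application. It relies on the fact that the StreamingContext is created with SLIDE_INTERVAL, so SLIDE_INTERVAL is also the batch interval. It also relies on Spark Streaming's requirement that both the window length and the slide interval be multiples of that batch interval.

```scala
// Hypothetical back-of-envelope check of the window settings (no Spark needed).
object WindowMath {
  val WindowLengthMs: Long = 86400L * 1000 // WINDOW_LENGTH: 24 hours
  val SlideIntervalMs: Long = 60L * 1000   // SLIDE_INTERVAL: 1 minute
  // The StreamingContext is created with SLIDE_INTERVAL, so it is also the batch interval.
  val BatchIntervalMs: Long = SlideIntervalMs

  // Window length and slide must both be multiples of the batch interval.
  def isValid: Boolean =
    WindowLengthMs % BatchIntervalMs == 0 && SlideIntervalMs % BatchIntervalMs == 0

  // How many one-minute batches each 24-hour window spans.
  def batchesPerWindow: Long = WindowLengthMs / BatchIntervalMs

  def main(args: Array[String]): Unit =
    println(s"valid=$isValid, batchesPerWindow=$batchesPerWindow") // valid=true, batchesPerWindow=1440
}
```

In other words, every minute the job recomputes its statistics over the most recent 1,440 one-minute batches.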
In our Spark Streaming application we’re also going to load reference data from our WordPress site, exported and stored in an Oracle database. This adds post title and post author values to the raw log entries that come in via Spark Streaming. In the next part of the script we define a new Spark context and then a Spark SQL context on top of it. We then create a Spark SQL data frame to hold the Oracle-sourced WordPress data, so that we can later join it to the incoming DStream data. Using Spark’s new DataFrame feature and the Oracle JDBC drivers, which I downloaded separately from the Oracle website, I can pull in reference data from Oracle or other database sources to supplement my raw incoming log data. Alternatively, I could bring it in from a CSV file, as I did in the previous Spark example.
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Load the WordPress post reference data from Oracle over JDBC
    val postsDF = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:oracle:thin:blog_refdata/password@bigdatalite.rittmandev.com:1521:orcl",
      "dbtable" -> "BLOG_REFDATA.POST_DETAILS"))
    postsDF.registerTempTable("posts")
Note also how Spark SQL lets me declare a data frame (or indeed any RDD with an associated schema) as a Spark SQL table, so that I can later run SQL queries against it. I’ll come back to this at the end.
Now comes the first part of the Spark Streaming code. I start by defining a new Spark Streaming context from the same base Spark context that I created the Spark SQL one from. I then use that Spark Streaming context to create a DStream that reads newly-arrived files landed in an HDFS directory. For this example I’ll manually copy the log files into an “incoming” HDFS directory; in real life I’d connect Spark Streaming to Flume using FlumeUtils for a more direct connection to activity on the webserver.
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    // Watch an HDFS directory for newly-arrived log files
    val logLinesDStream = streamingContext.textFileStream("/user/oracle/rm_logs_incoming")
Then I call the DStream “map” transformation to convert the incoming DStream of log lines into a DStream of ApacheAccessLog records, and cache this new DStream in memory. As the final part of this stage, I call the Spark Streaming “window” function. This packages the input data into a window, in this case 24 hours long, and creates a new RDD every SLIDE_INTERVAL, in this case every minute.
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
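The idea behind window(WINDOW_LENGTH, SLIDE_INTERVAL) is simple: on each slide the newest batch comes in and the oldest batch drops out. The toy, in-memory model below illustrates just that idea. It is not how Spark implements windowing, and the Window class and demo method are hypothetical. The demo uses a window of only two batches so that the eviction is visible.

```scala
// Toy model of sliding-window semantics; illustrates the idea only, not Spark's implementation.
object SlidingWindowModel {
  // Holds at most maxBatches batches; pushing a new batch evicts the oldest one.
  final case class Window[A](maxBatches: Int, batches: Vector[Seq[A]] = Vector.empty) {
    def push(batch: Seq[A]): Window[A] =
      copy(batches = (batches :+ batch).takeRight(maxBatches))

    // Everything currently inside the window, oldest batch first.
    def contents: Seq[A] = batches.flatten
  }

  // Three batches pushed through a two-batch window: the first batch ages out.
  def demo: Seq[String] =
    Seq(Seq("a"), Seq("b", "c"), Seq("d")).foldLeft(Window[String](2))(_ push _).contents

  def main(args: Array[String]): Unit =
    println(demo.mkString(",")) // b,c,d
}
```

With the settings above, the real window would hold 1,440 one-minute batches rather than two.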
Spark Streaming is now creating RDDs that represent all the log activity over my 24-hour period. I can use the foreachRDD output operation to turn each of those RDDs into its own data frame, using the schema inferred from the ApacheAccessLog case class defined earlier. I can then filter out bot activity and references to internal WordPress pages, leaving only actual page accesses from which to calculate the top ten list.
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")

        // Filter out bots and internal WordPress URLs, leaving genuine page views
        val accessLogsFilteredDF = accessLogs
          .filter(r => !r.agent.matches(".*(spider|robot|bot|slurp|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"))
          .filter(r => !r.endpoint.matches(".*(wp-content|wp-admin|wp-includes|favicon.ico|xmlrpc.php|wp-comments-post.php).*"))
          .toDF()
        accessLogsFilteredDF.registerTempTable("accessLogsFiltered")
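The plain-Scala sketch below applies the same two filters to a small, made-up sample so you can see which records survive. LogHit is a hypothetical cut-down stand-in for ApacheAccessLog, and the patterns repeat the job's alternations with the dots escaped. Note that String.matches is case-sensitive and must match the whole string, which is why the patterns start and end with “.*”.

```scala
// Hypothetical plain-Scala version of the bot and internal-page filters.
object BotFilterSketch {
  final case class LogHit(endpoint: String, agent: String)

  // Same alternations as the Spark job (dots escaped); matching is case-sensitive.
  val BotPattern: String =
    ".*(spider|robot|bot|slurp|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"
  val InternalPattern: String =
    ".*(wp-content|wp-admin|wp-includes|favicon\\.ico|xmlrpc\\.php|wp-comments-post\\.php).*"

  // A hit counts as a page view only if neither pattern matches.
  def isPageView(h: LogHit): Boolean =
    !h.agent.matches(BotPattern) && !h.endpoint.matches(InternalPattern)

  def pageViews(hits: Seq[LogHit]): Seq[LogHit] = hits.filter(isPageView)

  // Made-up sample: one real visit, one crawler, one admin call, one SEO bot.
  val sample = Seq(
    LogHit("/blog/my-post/", "Mozilla/5.0"),
    LogHit("/blog/my-post/", "Mozilla/5.0 (compatible; Googlebot/2.1)"),
    LogHit("/wp-admin/admin-ajax.php/", "Mozilla/5.0"),
    LogHit("/about/", "AhrefsBot/5.0"))

  def main(args: Array[String]): Unit =
    pageViews(sample).foreach(println) // LogHit(/blog/my-post/,Mozilla/5.0)
}
```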
Then I use Spark SQL to join the tables created from the windowed log data to the Oracle reference data I brought in earlier, and write out a Parquet-formatted file containing the top ten most popular pages over the past 24 hours. Parquet is the default storage format used by Spark SQL and is best suited to BI-style columnar queries. I could, however, use Avro, CSV or another file format if I brought the correct library imports in.
        val topTenPostsLast24Hour = sqlContext.sql(
          "SELECT p.POST_TITLE, p.POST_AUTHOR, COUNT(*) as total " +
          "FROM accessLogsFiltered a JOIN posts p ON a.endpoint = p.POST_SLUG " +
          "GROUP BY p.POST_TITLE, p.POST_AUTHOR " +
          "ORDER BY total DESC LIMIT 10")

        // Persist top ten table for this window to HDFS as parquet file
        topTenPostsLast24Hour.save("/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet",
          "parquet", SaveMode.Overwrite)
      }
    })
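The top-ten query boils down to five steps: an inner join on endpoint = POST_SLUG, a GROUP BY on title and author, a COUNT, a descending sort and a LIMIT. The in-memory Scala sketch below walks through those steps on made-up data. The Post class and the sample values are hypothetical, and the real job runs this as Spark SQL. The sketch also breaks ties by title so that its output is deterministic, which the SQL above does not do.

```scala
// Hypothetical in-memory equivalent of the top-ten Spark SQL query.
object TopTenSketch {
  final case class Post(slug: String, title: String, author: String)

  def topN(endpoints: Seq[String], posts: Seq[Post], n: Int = 10): Seq[(String, String, Int)] = {
    val bySlug: Map[String, Post] = posts.map(p => p.slug -> p).toMap
    endpoints
      .flatMap(e => bySlug.get(e).toList)                                 // JOIN posts ON endpoint = POST_SLUG
      .groupBy(p => (p.title, p.author))                                  // GROUP BY title, author
      .map { case ((title, author), hits) => (title, author, hits.size) } // COUNT(*)
      .toSeq
      .sortBy { case (title, _, total) => (-total, title) }               // ORDER BY total DESC
      .take(n)                                                            // LIMIT n
  }

  // Made-up data: "/c/" has no matching post, so the inner join drops it.
  val samplePosts = Seq(Post("/a/", "Post A", "Mark"), Post("/b/", "Post B", "Mark"))
  val sampleHits = Seq("/a/", "/b/", "/a/", "/c/", "/a/", "/b/")

  def main(args: Array[String]): Unit =
    topN(sampleHits, samplePosts).foreach(println)
}
```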
Finally, the last piece of the code starts the data ingestion process, which then runs until it is interrupted or stopped.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
I can now go over to Hue and move some log files into the HDFS directory that the Spark application is monitoring, like this:
Then, based on the SLIDE_INTERVAL I defined in the main Spark application earlier on (60 seconds, in my case), the Spark Streaming application picks up the new files and processes them, outputting the results as a Parquet file back on the HDFS filesystem (these two screenshots should display as animated GIFs).
So what should we do with the top-ten pages Parquet file that the Spark Streaming application creates? The most obvious thing would be to create an Impala table over it, using the schema metadata embedded in the Parquet file, like this:
CREATE EXTERNAL TABLE rm_logs_24hr_top_ten
LIKE PARQUET '/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet/part-r-00001.parquet'
STORED AS PARQUET
LOCATION '/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet';
Then I can query the table using Hue again, or I can import the Impala table metadata into OBIEE and analyse it using Answers and Dashboards.
So that’s a very basic example of Spark Streaming. Over the next few weeks I’ll build on it to add features such as persistent storage of all processed data, and classifying and clustering the data using Spark MLlib. More importantly, copying files into HDFS for ingestion into Spark Streaming adds quite a lot of latency. It would be better to connect Spark directly to the webserver using Flume or, even better, Kafka, and I’ll add examples showing these features in the next few posts in this series.
Rittman Mead at ODTUG KScope’15, Hollywood Florida
ODTUG KScope’15 is running in Hollywood, Florida next week and Rittman Mead are running a number of sessions during the week on OBIEE, Essbase, ODI and Big Data. I’ve personally been attending ODTUG KScope (or “Kaleidoscope”, as it used to be known) for many years now and it’s the best developer-centric conference we go to, coupled with amazing venues and a great community atmosphere.
Sessions we’re running over the week include:
- Gianni Ceresa : 2-in-1: RPD Magic and Hyperion Planning “Adapter”
- Jerome : Manage Your Oracle Data Integrator Development Lifecycle
- Michael Rainey : Practical Tips for Oracle Business Intelligence Applications 11g Implementations
- Michael Rainey : GoldenGate and Oracle Data Integrator: A Perfect Match
- Mark Rittman : Bringing Oracle Big Data SQL to OBIEE and ODI
- Mark Rittman : End-to-End Hadoop Development Using OBIEE, ODI, and Oracle Big Data
- Mark Rittman : Thursday Deep Dive – Business Intelligence: Bringing Oracle Tools to Big Data
- Andy Rocha & Pete Tamisin : OBIEE Can Help You Achieve Your GOOOOOOOOOALS!
We’ll also be taking part in various “Lunch and Learn” sessions, community and ACE/ACE Director events, and you can also talk to us about our new OBIEE “User Engagement” initiative and how you can get involved as an early adopter. Details and agenda for KScope’15 can be found on the event website, and if you’re coming we’ll look forward to seeing you in sunny Hollywood, Florida!
Replicating Hive Data Into Oracle BI Cloud Service for Visual Analyzer using BICS Data Sync
In yesterday’s post on using Oracle Big Data Discovery with Oracle Visual Analyzer in Oracle BI Cloud Service, I said midway through the article that I had to copy the Hadoop data into BI Cloud Service so that Visual Analyzer could use it. At present Oracle Visual Analyzer (VA) is only available as part of Oracle BI Cloud Service (BICS), so at some point the data prepared by Big Data Discovery had to be moved into BICS for VA to access it. In the future, once Visual Analyzer is available on-premise as part of Oracle Business Intelligence 12c, we’ll be able to connect the on-premise RPD directly to Hadoop via the Cloudera Impala ODBC driver. For now, to get this early access to VA features, we’re going to have to copy the data up to BICS and report on it from there. So how does this work?
With this second release of BICS there are actually a number of ways to get on-premise data up into BICS’s accompanying database service:
- As before, you can export data as CSV or an Excel Spreadsheet, and upload it manually into BICS using the Data Load feature (the approach I took in this recent Oracle Magazine article)
- You can use SQL Developer to SFTP “carts” of Oracle database data up into BICS, where they’ll then be unpacked and used to create Oracle tables in the accompanying database
- You can now also connect BICS to the full Oracle Database-as-a-Service. This is a complete database rather than a single schema, and it also provides a SQL*Net connection that ETL tools such as ODI or Informatica can connect to
- And there’s now a new utility called “Data Sync” that we’ll use in this example, to replicate tables or files up into BICS’s database store with options for incremental refresh, drop-and-reload and so forth
In our case the situation is a bit more complicated, because our data sits in a Hadoop cluster as Hive tables that we’re accessing through the Cloudera Impala MPP engine. OBIEE can actually connect directly to Impala, and if we were just using Answers and Dashboards we wouldn’t have any more work to do. However, as we’re using VA through BICS, and BICS can’t access on-premise data sources, we need some way of copying the data up into BICS so VA can access it. Again, there are many ways you can get data out of Hive on Hadoop and into databases and files, but the approach I took is this:
- First export each of the Hive tables I accessed through the on-premise RPD into CSV files, in my case using the Hue web-based user interface in CDH5
- Then use the Data Sync to upload the contents of those CSV files to BICS’s database store, selecting the correct Oracle datatypes for each of the columns
- Do any modeling on those tables to add any sequences or keys that I’m going to need when working with BICS’s more simplistic RPD modeller
- Then create a replica (or as close to a replica as possible) RPD model in BICS to support the work I’m going to want to do with VA
Again, there are also other ways to do this. Another option is to lift-and-shift the current RPD up into BICS, replicate the Hive/CSV data into Oracle Database-as-a-Service, and then repoint the uploaded RPD to this service. However, I haven’t got a DBaaS instance to hand, and I think it’s more useful to replicate using BICS and recreate the RPD manually, as that’s what most customers using BICS will end up doing. So the first step is to export the Hive data into CSV files using Hue: for each table, I first run a SELECT * FROM … and then use the menu option to export the query results to a CSV file on my workstation.
Then it’s a case of setting up BICS Data Sync to first connect to my BICS cloud instance, and then selecting one-by-one the CSV files that I’ll be uploading into BICS via this tool.
Anyone who’s been around Oracle BI for a while will recognise Data Sync as being built on the DAC. The DAC was the ETL orchestration tool that came with the 7.9.x releases of BI Apps and worked in conjunction with Informatica PowerCenter to load data into the BI Apps data warehouse. It’s actually a pretty flexible tool (disclaimer: I know the development PMs at Redwood Shores and think they’re a pretty cool bunch of people), and more recently it gained the ability to replicate BI Apps DW data into TimesTen for use with Exalytics. Its pluggable architecture and active development team meant it provided a useful platform for delivering something in between BICS’s APEX data uploader and fully-fledged ODI loading into Oracle DBaaS. The downside of using something built on the DAC is that the DAC had some UI “quirks”. The upside is that if you know the DAC, you can pretty much pick up Data Sync and guess how it works.
As part of uploading each CSV file, I also get to sample the file contents and confirm the datatype choices that Data Sync has suggested; these can of course be amended, and if I’m bringing in data from Oracle, for example, I wouldn’t need to go through such an involved process.
Then it’s a case of uploading the data. In my case one of the tables uploaded OK first time, but an issue I hit was where Hive tables had numeric columns containing NULLs that got exported as the text “NULL” and then caused the load to fail when trying to insert them into numeric columns. Again, a bit of knowledge of how the DAC worked came in useful as I went through the log files and then restarted parts of the load – in the end I replaced the word NULL with an empty string and the loads then succeeded.
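The post doesn’t say how the NULL tokens were replaced. Purely as one possible way to script that clean-up, here is a small Scala sketch. It assumes simple, unquoted CSV with no commas inside fields; a real export containing quoted text would need a proper CSV parser. The NullCleanup object and the file names in the usage comment are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.io.Source

// Hypothetical clean-up: blank out fields that are exactly NULL so they load as database NULLs.
// Assumes simple, unquoted CSV (no commas inside fields).
object NullCleanup {
  def blankNulls(line: String): String =
    line.split(",", -1)                          // limit -1 keeps trailing empty fields
      .map(field => if (field == "NULL") "" else field)
      .mkString(",")

  // Usage (hypothetical file names): NullCleanup posts_export.csv posts_clean.csv
  def main(args: Array[String]): Unit = {
    val src = Source.fromFile(args(0), "UTF-8")
    try {
      val cleaned = src.getLines().map(blankNulls).mkString("", "\n", "\n")
      Files.write(Paths.get(args(1)), cleaned.getBytes(StandardCharsets.UTF_8))
    } finally src.close()
  }
}
```

Matching whole fields rather than doing a global text replace avoids mangling values that merely contain the letters NULL.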
Once the data is uploaded to BICS, you can check out the new tables and their contents either from within BICS’s Data Modeller function, or from within the APEX console that comes with BICS’s database service.
One thing I did know I’d have to give some thought to was how to do the types of RPD modelling I’d done in the on-premise RPD, within the more constrained environment of the BICS data modeller. Looking back at the on-premise RPD I’ve made a lot of use of aliases to create fact and dimension versions of key log tables (posts, tweets) and multiple versions of the date dimensions, whereas in BICS you don’t get aliases but you can create database views. What was more worrying was that I’d used columns from the main webserver log table to populate both the main logical fact table and another dimension whilst still keeping a single source table as their physical source, but in BICS I’d have to create these two sources as views and then join them on a common key, which would be tricky as the log table in Hive didn’t have an obvious primary key. In the end I “cheated” a bit and created a derived copy of the incoming log file table with a sequence number added to it, so that I could then join both the derived fact table and dimension table on this synthetic unique key column.
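The “derived copy with a sequence number” trick comes down to two steps: number every log row, then project the same rows into a fact and a dimension that share that synthetic key, so they can be joined one-to-one even when two rows are identical. In the post this was done in the BICS database; the Scala below is only a sketch of the idea. RawHit, HitFact, HitDim and the way columns are split between them are all hypothetical.

```scala
// Hypothetical sketch of adding a synthetic key so one source can feed both a fact and a dimension.
object SyntheticKeySketch {
  final case class RawHit(ipAddress: String, endpoint: String, agent: String)
  final case class HitFact(hitId: Long, endpoint: String)
  final case class HitDim(hitId: Long, ipAddress: String, agent: String)

  // Number every row (1-based, like a database sequence), then project the same rows
  // into a fact and a dimension that share the synthetic hitId key.
  def split(rows: Seq[RawHit]): (Seq[HitFact], Seq[HitDim]) = {
    val keyed = rows.zipWithIndex.map { case (row, i) => (i.toLong + 1, row) }
    val facts = keyed.map { case (id, r) => HitFact(id, r.endpoint) }
    val dims = keyed.map { case (id, r) => HitDim(id, r.ipAddress, r.agent) }
    (facts, dims)
  }

  def main(args: Array[String]): Unit = {
    // Two identical log rows: no natural key, but each still gets its own hitId.
    val (facts, dims) = split(Seq.fill(2)(RawHit("203.0.113.5", "/blog/my-post/", "Mozilla/5.0")))
    facts.foreach(println)
    dims.foreach(println)
  }
}
```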
Now it’s a case of modelling out the various incoming tables uploaded via Data Sync into the facts and dimensions that the BICS data model will use; again something to be aware of is that each of these tables will need to join to its relevant dimensions or facts, so you need to leave the joining keys in the fact table rather than remove them as you’d do when creating logical fact tables in on-premise OBIEE.
Tables that only perform one role, for example the IMP_RM_POSTS_VA table that contains details of all blog posts and web pages on our site, can be brought into the model as they are without creating views. Things are different when I add the time dimension table a second time, this time to create a time dimension role table for the Tweets fact table. In that case I have to create a view over the table, which performs a similar role to an alias table in on-premise OBIEE, and I’m then directed to create a fact or dimension object in the model from that view.
Once this is all done, I end up with a basic BICS data model that’s starting to look like the one I had with the on-premise OBIEE install.
Finally, once I’d amended all the column names and brought in all of the additional columns and lookup tables (to provide, for example, lists of Twitter user handles), I could view the model in BICS’s Visual Analyzer and start producing data visualisation projects from it.
So it’s admittedly a bit convoluted in the first iteration. However, once you’ve set up the BICS data model and the Data Sync upload process, you can use Data Sync to refresh the replicated Hive table data in the BICS database instance and keep the two systems in sync. As I said, OBIEE12c will come with Visual Analyzer as part of the on-premise install, but until then this is how we link VA to Big Data Discovery on Hadoop so that Visual Analyzer can access BDD’s enriched datasets.
Combining Oracle Big Data Discovery and Oracle Visual Analyzer on BICS
So now that Oracle Visual Analyzer is out as part of Oracle BI Cloud Service, and Visual Analyzer (VA) is due to ship on-premise as part of OBIEE12c sometime in the next twelve months, several of our customers have asked us if they need both VA and Oracle Big Data Discovery if they’re looking to analyse Hadoop data as part of a BI project. It’s an interesting question so I thought it’d be useful to go through my thoughts on how the two tools work together, when to use one, and when to use the other.
Our standard “big data” dataset consists of website log activity, Twitter mentions and page details from our WordPress blogging software. Before Visual Analyzer came along, we’d usually want to analyze these datasets in one of two ways: a traditional BI metrics-analysis scenario, or a data discovery/visualization scenario where we’re more interested in the content of the data than in precise metrics. My half of the recent BI Forum 2015 Masterclass goes through these two scenarios in detail (presentation slides in PDF format here). It’s Big Data Discovery that provides the more “Tableau”-type experience, with fast point-and-click access to both datasets joined together on their common website page URL details.
Now we have Visual Analyzer though, things get interesting; in my article on Visual Analyzer within BICS I showed a number of data visualisations that look pretty similar to what you’d get with Big Data Discovery, and when we have VA available on-site as part of OBIEE12c we’ll be able to connect it directly to Hadoop via Cloudera Impala, potentially analyzing the whole dataset rather than the (representative) sample that Big Data Discovery loads into its Endeca Server-based engine.
So if the customer is looking to analyze data held in Hadoop and Visual Analyzer is available, where’s the value in Big Data Discovery (BDD)? To my mind there are three areas where BDD either goes beyond what VA can do, or helps you perform tasks that you’ll need to do before you can work with your data in VA:
- The initial data discovery, preparation and cleansing that you’d otherwise have to do using HiveQL or an ETL tool such as ODI12c
- Providing you with a high-level overview and landscape of your data, when that’s more important to you at the time than precise counts and drill-down analysis
- Understanding how data joins together, and how best to use your datasets in terms of metrics, facts, dimensions and so forth
Taking the data preparation and cleansing part first, I’ve covered in several blogs over the past couple of years how tools such as ODI can be used to create formal, industrialized data pipelines to ingest, prepare and then summarise data coming into your Hadoop system. I’ve also covered how you can drop down to languages such as HiveQL, Pig and Spark to code these data transformations yourself. For my webserver log, Twitter and page details datasets, this work would include:
- standardising URL formats across the three sources,
- geocoding the IP addresses in the access logs to derive the country and city for site visitors,
- turning dates and times in different formats into ones that work as Hive timestamps, and so forth.

Doing all this using ODI and/or HiveQL can be a pretty technical task, and this is where BDD comes in useful, even if VA and an OBIEE RPD are the final destination for the data.
Datasets that you transform and enrich in Big Data Discovery can be saved back to Hive as new Hive tables. Alternatively, they can be exported as files, either to load into Oracle using SQL Developer or to upload into BICS for use in Visual Analyzer. BDD is also useful for giving you a quick, easy-to-use overview of your dataset before you get into the serious business of defining facts, dimensions and aliases against these three Hive tables. The screenshots below show a couple of typical Big Data Discovery Studio data visualisation pages against the webserver logs dataset. You can see how easy it is to create simple charts, tag clouds and maps against the data. The aim is to help you understand the data’s contents and “shape” before you move further down the curation process and apply formal structures to it.
Where things get harder to do within Big Data Discovery is when more-and-more formatting, complex joining and “arranging” of the data is required; for example, BDD gives you a lot of flexibility in how you join datasets, but this flexibility can be confusing for end-users if they’re then presented with every possible variation of a three-table join rather than having the data presented to them as simple facts and dimensions. But this is how we’d really expect it – if you go back to the logical data architecture I went through in the blog post a while ago about the updated Oracle Information Management Reference Architecture, the trade-off in using schema-on-read data reservoirs is that this data, although quick and cheap to store, requires a lot more work to be done each time you access the data to get “value” from it.
OBIEE, in contrast, makes you define your data structures in-full before you present data to end-users, dividing data in the three datasets into measures (for the fact tables) and attributes (for dimensions) and making it possible to add more dimension lookups (for a date dimension, for Twitter users in this case) and separate the overall set of data into more focused subject areas. Working with the dataset on the on-premise version of OBIEE first, the RPD that I created to present this data in a more formal, dimensional and hierarchical way to users looked like this:
I can leave this RPD connected directly to the underlying Hive and Impala tables if I want to use just Answers and Dashboards. For the time being, though, I need to export the underlying Hive tables either into CSV files or into an Oracle Database before uploading them into Visual Analyzer; come OBIEE12c this should all be seamless. What users are then presented with when they go into Visual Analyzer is something like this:
Notice how the various attributes of interest are grouped into fact and dimension table folders, and there’s a simple means to add calculations, change the visualisation type and swap chart settings around. Note also that the count on the screen is the actual count of records in the full dataset, not the sample that BDD takes in order to provide an overview of values and distribution in the full dataset. Whilst it’s relatively easy to create a line chart, for example, to show tweets per user within BDD, using Visual Analyzer it’s just a case of double-clicking on the relevant measures and attributes on one side of the page, selecting and arranging the visualisation and applying any filters using dialog boxes and value-selectors – all much more familiar and obvious to BI users.
Enrichment that I’ve done in Big Data Discovery should in most cases be able to come through to Visual Analyzer. For example, I used Big Data Discovery’s text enrichment features to determine the sentiment of blog post titles, tweets and other commentary data. I could also use the latitude and longitude values derived during the visitor IP address geocoding to plot site visitors on a map. Using the sentiment value derived from the post title, tweet contents and other textual data, I can create a chart of our most popular posts mentioned on Twitter, and colour the bars to show how positive, or negative, the comments about each post were.
The only useful thing that Visual Analyzer can’t yet do is include more than one subject area in a project. To analyze the number of tweets and the number of page views for posts in a scatter chart, for example, I currently have to create a separate subject area that includes both sets of facts and dimensions. I understand, though, that VA on BICS will gain the ability to include multiple subject areas in a forthcoming release.
So in summary, I’d say that Big Data Discovery, and Visual Analyzer as part of BI Cloud Service, are complementary tools rather than one being able to replace the other in a big data context. I find Big Data Discovery a great tool for initially understanding, cataloguing and viewing at a high level the data sources going into VA. It’s also good for the user-driven cleaning-up, enhancement and enrichment of that data before it is committed to the formal dimensional model that Visual Analyzer requires.
In its BICS guise there’s the additional step of having to export the Hadoop data out of your Big Data Appliance or other Hadoop cluster and upload it in the form of files using BICS’s data load or the new Data Sync utility, but when VA comes as part of OBIEE12c in the next twelve months you’ll be able to directly connect to the Hadoop cluster using Impala ODBC and analyse the data directly, in-place.
I’ll be covering more on BICS over the next few weeks, including how I got data from Hadoop into BICS using the new Data Sync utility.
Connecting OBIEE to Hive, HBase and Impala Tables for a DW-Offloading Project
In two previous posts this week I talked about a client request to offload part of their data warehouse to Hadoop, taking data from a source application and loading it into Hive tables on Hadoop for subsequent reporting by OBIEE11g. In the first post I covered three things: hosting the offloaded data warehouse elements on Cloudera Hadoop CDH5.3; using Apache Hive and Apache HBase to support insert/update/delete activity on the fact and dimension tables; and copying the Hive-on-HBase fact table data into optimised Impala tables stored in Parquet files so that reports and dashboards ran fast.
In the second post I got into the detail of how we’d keep the Hive-on-HBase tables up-to-date with new and changed data from the source system, using HiveQL bulk-inserts to load up the initial table data and a Python script to handle subsequent inserts, updates and deletes by working directly with the HBase Client and the HBase Thrift Server. Where this leaves us at the end then is with a set of fact and dimension tables stored as optimised Impala tables and updatable Hive-on-HBase tables, and our final step is to connect OBIEE11g to it and see how it works for reporting.
As I mentioned in another post a week or so ago, the new release of OBIEE11g supports Cloudera Impala connections from Linux servers to Hadoop, with the Linux Impala drivers being shipped by Oracle as part of the Linux download and the Windows ones used for the Admin Tool workstation downloadable directly from Cloudera. Once you’ve got all the drivers and OBIEE software setup, it’s then just a case of setting up the ODBC connections on the Windows and Linux environments, and you should then be in a position to connect it all up.
On the Impala side, I first run the invalidate metadata command to refresh Impala’s view of Hive’s metastore. I then create a copy of the Hive-on-HBase table that I’ve been using to load the fact data from the source system.
[oracle@bigdatalite~]$impala-shell [bigdatalite.localdomain:21000]>invalidate metadata; [bigdatalite.localdomain:21000]>create table impala_flight_delays >stored as parquet >as select *from hbase_flight_delays;
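Because the Parquet table is a snapshot, it needs refreshing whenever the Hive-on-HBase table changes. The sketch below is a hypothetical helper, not part of the original project: it builds the Impala statements for the first load (the CTAS shown above) and, as an assumed approach for later loads, an `INSERT OVERWRITE` that replaces the snapshot’s contents in place, with a `COMPUTE STATS` after each load so the optimizer statistics stay current.

```python
# Hypothetical helper: build the Impala statements that (re)create the
# Parquet snapshot of a Hive-on-HBase table. Table names default to the
# ones used in this post; the INSERT OVERWRITE refresh is an assumption.


def snapshot_statements(source="hbase_flight_delays",
                        target="impala_flight_delays",
                        first_load=True):
    stmts = []
    if first_load:
        # Make the table created through Hive visible to Impala, then copy it.
        stmts.append(f"INVALIDATE METADATA {source}")
        stmts.append(f"CREATE TABLE {target} STORED AS PARQUET "
                     f"AS SELECT * FROM {source}")
    else:
        # Later loads replace the snapshot's contents without recreating it.
        stmts.append(f"INSERT OVERWRITE TABLE {target} SELECT * FROM {source}")
    # Refresh optimizer statistics after every load.
    stmts.append(f"COMPUTE STATS {target}")
    return stmts
```

You could paste the output into impala-shell, or run it from Python with a DB-API client such as impyla (`from impala.dbapi import connect`, then `connect(host=..., port=21050).cursor().execute(stmt)` for each statement).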
Next I import the Hive-on-HBase and Impala tables through the Impala ODBC connection; even though only one of the tables (the main fact table snapshot copy) was created using Impala, I still get the Impala speed benefit for the other three tables created in Hive (against the HBase source, no less). Once the table metadata is imported into the RPD physical layer, I can create a business model and subject area as I normally would to complete the RPD.
Now it’s just a case of saving the repository online and creating some reports. If you’re using an older version of Impala you may need to disable the setting that requires a LIMIT clause on every ORDER BY (see the docs for more details; recent CDH5+ versions work fine without this). Something you’ll also need to do back in Impala is compute statistics for each of the tables, like this:
[bigdatalite.localdomain:21000] > compute stats default.impala_flight_delays;
Query: compute stats default.impala_flight_delays
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 8 column(s). |
+-----------------------------------------+
Fetched 1 row(s) in 2.73s
[bigdatalite.localdomain:21000] > show table stats impala_flight_delays;
Query: show table stats impala_flight_delays
+---------+--------+---------+--------------+---------+-------------------+
| #Rows   | #Files | Size    | Bytes Cached | Format  | Incremental stats |
+---------+--------+---------+--------------+---------+-------------------+
| 2514141 | 1      | 10.60MB | NOT CACHED   | PARQUET | false             |
+---------+--------+---------+--------------+---------+-------------------+
Fetched 1 row(s) in 0.01s
Apart from being generic “good practice” and giving the Impala query optimizer better information to form a query plan with, computing statistics matters because you might otherwise hit an error in OBIEE.
If you do hit this error, go back to the Impala Shell or Hue and compute statistics, and it should go away the next time you run the analysis. Then, finally, you can create some analyses and dashboards, and you should find the queries run fine against the various tables in Hadoop; moreover, the response time is excellent if you use Impala as the main query engine.
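With several tables in the RPD it helps to know which ones still lack statistics. The hypothetical helper below assumes you have already collected a row count per table, for example the “#Rows” value from SHOW TABLE STATS on a Parquet table; Impala reports -1 there until statistics have been computed. How you extract the counts is left to the caller, since HBase-backed tables lay out their SHOW TABLE STATS output differently.

```python
# Hypothetical helper: `row_counts` maps table name -> row count as reported
# by Impala. A value of -1 means no statistics have been computed yet.

STATS_MISSING = -1


def stats_to_compute(row_counts):
    """Return COMPUTE STATS statements for tables that have no statistics."""
    return [
        f"COMPUTE STATS {table}"
        for table, rows in sorted(row_counts.items())
        if rows == STATS_MISSING
    ]
```

Sorting the table names just keeps the output order stable, which makes the generated script easy to diff from one run to the next.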
I did a fair bit of testing of OBIEE running against Cloudera Impala, and found that all of the main analysis features worked (prompts, hierarchies, totals and subtotals etc.) and the response time was comparable with a well-tuned data warehouse, possibly even approaching Exalytics-level speed. If you take a look at the nqquery.log file for the Impala SQL queries OBIEE sends to Impala, you can see they get fairly complex (which is good, as I didn’t hit any errors when running the dashboards), and you can also see where the BI Server takes a simpler approach to creating subtotals, using nested queries combined with UNION ALL rather than the GROUP BY … GROUPING SETS you get when using a full Oracle database:
select D1.c1 as c1, D1.c2 as c2, D1.c3 as c3, D1.c4 as c4, D1.c5 as c5, D1.c6 as c6,
  D1.c7 as c7, D1.c8 as c8, D1.c9 as c9, D1.c10 as c10, D1.c11 as c11, D1.c12 as c12
from (
  select 0 as c1, D1.c3 as c2, substring(cast(NULL as STRING ), 1, 1 ) as c3,
    substring(cast(NULL as STRING ), 1, 1 ) as c4, substring(cast(NULL as STRING ), 1, 1 ) as c5,
    'All USA' as c6, substring(cast(NULL as STRING ), 1, 1 ) as c7, 1 as c8,
    substring(cast(NULL as STRING ), 1, 1 ) as c9, substring(cast(NULL as STRING ), 1, 1 ) as c10,
    D1.c2 as c11, D1.c1 as c12
  from (
    select sum(T44037.late) as c1, sum(T44037.flights) as c2, T43925.carrier_desc as c3
    from hbase_carriers T43925
      inner join impala_flight_delays T44037 On (T43925.key = T44037.carrier)
    where ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.'
      or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines'
      or T43925.carrier_desc = 'Virgin America' )
    group by T43925.carrier_desc
  ) D1
  union all
  select 1 as c1, D1.c3 as c2, substring(cast(NULL as STRING ), 1, 1 ) as c3,
    substring(cast(NULL as STRING ), 1, 1 ) as c4, D1.c4 as c5,
    'All USA' as c6, substring(cast(NULL as STRING ), 1, 1 ) as c7, 1 as c8,
    substring(cast(NULL as STRING ), 1, 1 ) as c9, D1.c4 as c10,
    D1.c2 as c11, D1.c1 as c12
  from (
    select sum(T44037.late) as c1, sum(T44037.flights) as c2, T43925.carrier_desc as c3,
      T43928.dest_state as c4
    from hbase_carriers T43925
      inner join impala_flight_delays T44037 On (T43925.key = T44037.carrier)
      inner join hbase_geog_dest T43928 On (T43928.key = T44037.dest)
    where ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.'
      or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines'
      or T43925.carrier_desc = 'Virgin America' )
    group by T43925.carrier_desc, T43928.dest_state
  ) D1
  union all
  select 2 as c1, D1.c3 as c2, substring(cast(NULL as STRING ), 1, 1 ) as c3,
    D1.c4 as c4, D1.c5 as c5,
    'All USA' as c6, substring(cast(NULL as STRING ), 1, 1 ) as c7, 1 as c8,
    D1.c4 as c9, D1.c5 as c10,
    D1.c2 as c11, D1.c1 as c12
  from (
    select sum(T44037.late) as c1, sum(T44037.flights) as c2, T43925.carrier_desc as c3,
      T43928.dest_city as c4, T43928.dest_state as c5
    from hbase_carriers T43925
      inner join impala_flight_delays T44037 On (T43925.key = T44037.carrier)
      inner join hbase_geog_dest T43928 On (T43928.key = T44037.dest and T43928.dest_state = 'Georgia')
    where ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.'
      or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines'
      or T43925.carrier_desc = 'Virgin America' )
    group by T43925.carrier_desc, T43928.dest_city, T43928.dest_state
  ) D1
  union all
  select 3 as c1, D1.c3 as c2, D1.c4 as c3, D1.c5 as c4, D1.c6 as c5,
    'All USA' as c6, D1.c4 as c7, 1 as c8, D1.c5 as c9, D1.c6 as c10,
    D1.c2 as c11, D1.c1 as c12
  from (
    select sum(T44037.late) as c1, sum(T44037.flights) as c2, T43925.carrier_desc as c3,
      T43928.dest_airport_name as c4, T43928.dest_city as c5, T43928.dest_state as c6
    from hbase_carriers T43925
      inner join impala_flight_delays T44037 On (T43925.key = T44037.carrier)
      inner join hbase_geog_dest T43928 On (T43928.key = T44037.dest and T43928.dest_city = 'Atlanta, GA')
    where ( T43925.carrier_desc = 'American Airlines Inc.' or T43925.carrier_desc = 'Delta Air Lines Inc.'
      or T43925.carrier_desc = 'Southwest Airlines Co.' or T43925.carrier_desc = 'Spirit Air Lines'
      or T43925.carrier_desc = 'Virgin America' )
    group by T43925.carrier_desc, T43928.dest_airport_name, T43928.dest_city, T43928.dest_state
  ) D1
) D1
order by c1, c6, c8, c5, c10, c4, c9, c3, c7, c2
limit 65001
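To make the subtotal pattern in that query concrete, here is a small sketch that uses Python’s standard-library sqlite3 module as a stand-in database (SQLite has no GROUPING SETS either). It builds subtotals the same way the BI Server does above: one GROUP BY per hierarchy level, each tagged with a level number much like OBIEE’s `c1` column, stitched together with UNION ALL. The table, columns and data are invented for illustration; on Oracle the two levels could instead come from a single `GROUP BY GROUPING SETS ((carrier), (carrier, state))`.

```python
import sqlite3

# Invented sample rows: (carrier, state, flights, late).
ROWS = [
    ("Delta Air Lines Inc.", "Georgia", 10, 3),
    ("Delta Air Lines Inc.", "Texas", 5, 1),
    ("Virgin America", "California", 4, 2),
]

# Level 0 = carrier subtotal, level 1 = carrier + state detail.
# Ordering by column position (carrier, level, state) keeps each
# subtotal row directly above its detail rows.
QUERY = """
select 0 as lvl, carrier, NULL as state, sum(flights), sum(late)
from delays group by carrier
union all
select 1 as lvl, carrier, state, sum(flights), sum(late)
from delays group by carrier, state
order by 2, 1, 3
"""


def subtotals_via_union_all(rows):
    conn = sqlite3.connect(":memory:")
    conn.execute("create table delays (carrier text, state text, flights int, late int)")
    conn.executemany("insert into delays values (?, ?, ?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


for row in subtotals_via_union_all(ROWS):
    print(row)
```

The trade-off is that each level re-scans and re-aggregates the joined data, which is one reason the generated SQL above repeats the same joins four times; Impala’s speed is what keeps that affordable.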
Not bad for a data warehouse offloaded entirely to Hadoop: it’s good to see such a system handling full updates and deletes as well as insert appends, and to see OBIEE working against an Impala data source with such good response times. If any of this interests you as a potential customer, feel free to drop me an email at mark.rittman@rittmanmead.com, or check out our Big Data Quickstart page on the website.