Tag Archives: Big Data

OBIEE and ODI on Hadoop : Next-Generation Initiatives To Improve Hive Performance

The other week I posted a three-part series (part 1, part 2 and part 3) on going beyond MapReduce for Hadoop-based ETL, where I looked at a typical Apache Pig dataflow-style ETL process and showed how Apache Tez and Apache Spark can potentially make these processes run faster and make better use of in-memory processing. I picked Pig as the data processing environment because the multi-step data transformations it creates translate into lots of separate MapReduce jobs in traditional Hadoop ETL environments, but run as a single DAG (directed acyclic graph) under Tez and Spark and can potentially use memory to pass intermediate results between steps, rather than writing all those intermediate datasets to disk.

But tools such as OBIEE and ODI use Apache Hive to interface with the Hadoop world, not Pig, so it’s improvements to Hive that will have the biggest immediate impact on the tools we use today. What’s interesting is the development work that’s going on around Hive in this area, with four different “next-generation Hive” initiatives that could end up making OBIEE and ODI on Hadoop run faster:

  • Hive-on-Tez (or “Stinger”), principally championed by Hortonworks, along with Stinger.next which will enable ACID transactions in HiveQL
  • Hive-on-Spark, a more limited port of Hive to run on Spark and backed by Cloudera amongst others
  • Spark SQL within Apache Spark, which enables SQL queries against Spark RDDs (and Hive tables), and exposes a HiveServer2-compatible Thrift Server for JDBC access
  • Vendor initiatives that build on Hive but are mainly around integration with their RDBMS engines, for example Oracle Big Data SQL

Vendor initiatives like Oracle’s Big Data SQL and Cloudera Impala have the benefit of working now (and are supported), but usually come with some sort of penalty for not working directly within the Hive framework. Oracle’s Big Data SQL, for example, can read data from Hive (very efficiently, using Exadata SmartScan-type technology) but then can’t write back to Hive, and currently pulls all the Hive data into Oracle if you try and join Oracle and Hive data together. Cloudera’s Impala, on the other hand, is lightning-fast and works directly on the Hadoop platform, but doesn’t support the same ecosystem of SerDes and storage handlers that Hive supports, taking away one of the key flexibility benefits of working with Hive.

So what about the attempts to extend and improve Hive, or include Hive interfaces and compatibility in Spark? In most cases an ETL routine written as a series of Hive statements isn’t going to be as fast or resource-efficient as a custom Spark program, but if we can make Hive run faster or have a Spark application masquerade as a Hive database, we could effectively give OBIEE and ODI on Hadoop a “free” platform performance upgrade without having to change the way they access Hadoop data. So what are these initiatives about, and how usable are they now with OBIEE and ODI?

Probably the most ambitious next-generation Hive project is the Stinger initiative, backed by Hortonworks and based on the Apache Tez framework that runs on Hadoop 2.0 and YARN. Stinger aimed first to port Hive to run on Tez (which runs MapReduce jobs but enables them to potentially run as a single DAG), and then to add ACID transaction capabilities so that you can UPDATE and DELETE from a Hive table as well as INSERT and SELECT, using a transaction model that allows you to roll back uncommitted changes (diagram from the Hortonworks Stinger.next page).

[Image: Stinger.next diagram, from the Hortonworks Stinger.next page]

Tez is more of a set of developer APIs than the full data discovery / data analysis platform that Spark aims to provide, but it’s a technology that’s available now as part of the Hortonworks HDP2.2 platform and, as I showed in the blog post a few days ago, an existing Pig script that you run as-is on a Tez environment typically runs twice as fast as when it’s using MapReduce to move data around (with the usual testing caveats applying, YMMV etc). Hive should behave the same way, giving us the ability to take Hive transformation scripts and run them unchanged except for specifying Tez as the execution engine at the start.

Hive on Tez is probably the first of these initiatives we’ll see working with ODI and OBIEE, as ODI has just been certified for Hortonworks HDP2.1, and the new HDP2.2 release is the one that comes with Tez as an option for Pig and Hive query execution. I’m guessing ODI will need to have its Hive KMs updated to add a new option to select Tez or MapReduce as the underlying Hive execution engine, but otherwise I can see this working “out of the box” once ODI support for HDP2.2 is announced.

Going back to the last of the three blog posts I wrote on going beyond MapReduce, many in the Hadoop industry back Spark rather than Tez as the successor to MapReduce, as it’s a more mature implementation that goes beyond the developer-level APIs that Tez aims to provide to make Pig and Hive scripts run faster. As we’ll see in a moment, Spark comes with its own SQL capabilities and a Hive-compatible JDBC interface, but the other “swap-out-the-execution-engine” initiative to improve Hive is Hive on Spark, a port of Hive that allows Spark to be used as Hive’s execution engine instead of just MapReduce.

Hive on Spark is at an earlier stage in development than Hive on Tez with the first demo being given at the recent Strata + Hadoop World New York, and specific builds of Spark and versions of Hive needed to get it running. Interestingly though, a post went on the Cloudera Blog a couple of days ago announcing an Amazon AWS AMI machine image that you could use to test Hive on Spark, which though it doesn’t come with a full CDH or HDP installation or features such as a HiveServer JDBC interface, comes with a small TPC-DS dataset and some sample queries that we can use to get a feeling for how it works. I used the AMI image to create an Amazon AWS m3.large instance and gave it a go.

By default, Hive in this demo environment is configured to use Spark as the underlying execution engine. Running a couple of the TPC-DS queries first using this Spark engine, and then switching back to MapReduce by running the command “set hive.execution.engine=mr” within the Hive CLI, I generally found queries using Spark as the execution engine ran 2-3x faster than the MapReduce ones.

You can’t read too much into this timing as the demo AMI is really only to show off the functional features (Hive using Spark as the execution engine) and no work on performance optimisation has been done, but it’s encouraging even at this point that it’s significantly faster than the MapReduce version.

Long-term the objective is to have both Tez and Spark available as execution engine options under Hive, along with MapReduce, as the diagram below from a presentation by Cloudera’s Szehon Ho shows; the advantage of building on Hive like this rather than creating your own new SQL-on-Hadoop engine is that you can make use of the library of SerDes, storage handlers and so on that you’d otherwise need to recreate for any new tool.

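[Image: diagram of Hive with MapReduce, Tez and Spark as execution engine options, from the Cloudera presentation]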

The third major SQL-on-Hadoop initiative I’ve been looking at is Spark SQL within Apache Spark. Unlike Hive on Spark, which aims to swap out the compiler and execution engine parts of Hive but otherwise leave the rest of the product unchanged, Apache Spark as a whole is a much more freeform, flexible data query and analysis environment that’s aimed more at analysts than business users looking to query their dataset using SQL. That said, Spark has some cool SQL and Hive integration features that make it an interesting platform for doing data analysis and ETL.

In my Spark ETL example the other day, I loaded log data and some reference data into RDDs and then filtered and transformed them using a mix of Scala functions and Spark SQL queries. Running on top of the set of core Spark APIs, Spark SQL allows you to register temporary tables within Spark that map onto RDDs, and gives you the option of querying your data using either familiar SQL relational operators, or the more functional programming-style Scala language.
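To make the “SQL or functional style” choice concrete, here is a minimal sketch in plain Scala collections, with no Spark involved, of what a GROUP BY / ORDER BY / LIMIT query looks like when written functionally. The PageRow type, the PageCounts object and its method names are hypothetical and only for illustration; with Spark you would apply similar operations to an RDD rather than a local Seq.

```scala
// Hypothetical row type, for illustration only (not part of Spark or the original application).
final case class PageRow(host: String, requestPage: String, status: String)

object PageCounts {
  // Functional equivalent of:
  //   SELECT request_page, count(*) AS hits FROM logs_base_page GROUP BY request_page
  def hitsPerPage(rows: Seq[PageRow]): Map[String, Int] =
    rows.groupBy(_.requestPage).map { case (page, group) => page -> group.size }

  // Functional equivalent of ORDER BY hits DESC LIMIT n.
  // Ties are broken by page name so that the result is deterministic.
  def topPages(rows: Seq[PageRow], n: Int): Seq[(String, Int)] =
    hitsPerPage(rows).toSeq.sortBy { case (page, hits) => (-hits, page) }.take(n)
}
```

Which style reads better is largely a matter of taste; the SQL form is likely to be more familiar to ODI and OBIEE developers, while the functional form composes more naturally with custom Scala functions.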

You can also create connections to the Hive metastore though, and create Hive tables within your Spark application for when you want to persist results to a table rather than work with the temporary tables that Spark SQL usually creates against RDDs. In the code example below, I create a HiveContext as opposed to the sqlContext that I used in the example on the previous blog, and then use that to create a table in my Hive database, running on a Hortonworks HDP2.1 VM with Spark 1.0.0 pre-built for Hadoop 2.4.0:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.hql("CREATE TABLE posts_hive (post_id int, title string, postdate string, post_type string, author string, post_name string, generated_url string) row format delimited fields terminated by '|' stored as textfile")
scala> hiveContext.hql("LOAD DATA INPATH '/user/root/posts.psv' INTO TABLE posts_hive")

If I then go into the Hive CLI, I can see this new table listed there alongside the other ones:

hive> show tables;
OK
dummy
posts
posts2
posts_hive
sample_07
sample_08
src
testtable2
Time taken: 0.536 seconds, Fetched: 8 row(s)

What’s even more interesting is that Spark also comes with a HiveServer2-compatible Thrift Server, making it possible for tools such as ODI that connect to Hive via JDBC to run Hive queries through Spark, with the Hive metastore providing the metadata but Spark running as the execution engine.

This is subtly different to Hive-on-Spark as Hive’s metastore, support for SerDes and storage handlers runs under the covers but Spark provides you with a full programmatic environment, making it possible to just expose Hive tables through the Spark layer, or mix and match data from RDDs, Hive tables and other sources before storing and then exposing the results through the Hive SQL interface. For example then, you could use Oracle SQL*Developer 4.1 with the Cloudera Hive JDBC drivers to connect to this Spark SQL Thrift Server and query the tables just like any other Hive source, but crucially the Hive execution is being done by Spark, rather than MapReduce as would normally happen.

Like Hive-on-Spark, Spark SQL and Hive support within Spark SQL are at early stages, with Spark SQL not yet being supported by Cloudera whereas the core Spark API is. From the work I’ve done with it, it’s not yet possible to expose Spark SQL temporary tables through the HiveServer2 Thrift Server interface, and I can’t see a way of creating Hive tables out of RDDs unless you stage the RDD data to a file in-between. But it’s clearly a promising technology and if it becomes possible to seamlessly combine RDD data and Hive data, and expose Spark RDDs registered as tables through the HiveServer2 JDBC interface it could make Spark a very compelling platform for BI and data analyst-type applications. Oracle’s David Allen, for example, blogged about using Spark and the Spark SQL Thrift Server interface to connect ODI to Hive through Spark, and I’d imagine it’d be possible to use the Cloudera HiveServer2 ODBC drivers along with the Windows version of OBIEE 11.1.1.7 to connect to Spark in this way too – if I get some spare time over the Christmas break I’ll try and get an example working.

Rittman Mead BI Forum 2015 Call for Papers Now Open!

I’m very pleased to announce that the Call for Papers for the Rittman Mead BI Forum 2015 is now open, with abstract submissions open to January 18th 2015. As in previous years the BI Forum will run over consecutive weeks in Brighton, UK and Atlanta, GA, with the provisional dates and venues as below:

  • Brighton, UK : Hotel Seattle, Brighton, UK : May 6th – 8th 2015
  • Atlanta, GA : Renaissance Atlanta Midtown Hotel, Atlanta, USA : May 13th-15th 2015

Now in its seventh year, the Rittman Mead BI Forum is the only conference dedicated entirely to Oracle Business Intelligence, Oracle Business Analytics and the technologies and processes that support them – data warehousing, data analysis, data visualisation, big data and OLAP analysis. We’re looking for sessions around tips & techniques, project case-studies and success stories, and sessions where you’ve taken Oracle’s BI products and used them in new and innovative ways. Each year we select around eight-to-ten speakers for each event along with keynote speakers and a masterclass session, with speaker choices driven by attendee votes at the end of January, and editorial input from myself, Jon Mead, Charles Elliott and Jordan Meyer.

Last year we had a big focus on cloud, and a masterclass and several sessions on bringing Hadoop and big data to the world of OBIEE. This year we’re interested in project stories and experiences around cloud and Hadoop, and we’re keen to hear about any Oracle BI Apps 11g implementations or migrations from the earlier 7.9.x releases. Getting back to basics we’re always interested in sessions around OBIEE, Essbase and data warehouse data modelling, and we’d particularly like to encourage session abstracts on data visualization, BI project methodologies and the incorporation of unstructured, semi-structured and external (public) data sources into your BI dashboards. For an idea of the types of presentations that have been selected in the past, check out the BI Forum 2014, 2013 and 2012 homepages, or feel free to get in touch via email at mark.rittman@rittmanmead.com

The Call for Papers entry form is here, and we’re looking for speakers for Brighton, Atlanta, or both venues if you can speak at both. All sessions this year will be 45 minutes long, and we’ll be publishing submissions and inviting potential attendees to vote on their favourite sessions towards the end of January. Other than that – have a think about abstract ideas now, and make sure you get them in by January 18th 2015.

Going Beyond MapReduce for Hadoop ETL Pt.3 : Introducing Apache Spark

In the first two posts in this three-part series on going beyond MapReduce for Hadoop ETL, we looked at why MapReduce and Hadoop 1.0 were only really suitable for batch processing, and how the new Apache Tez framework enabled by Apache YARN on the Hadoop 2.0 platform can be swapped in for MapReduce to improve the performance of existing Pig and Hive scripts. Today though in the final post I want to take a look at Apache Spark, the next-gen compute framework that Cloudera are backing as the long-term successor to MapReduce.

Like Tez, Apache Spark supports DAGs that describe the entire dataflow process, not just individual map and reduce jobs. Like Pig, it has a concept of datasets (Pig’s aliases and relations), but crucially these datasets (RDDs, or “resilient distributed datasets”) can be cached in-memory, fall back gracefully to disk and can be rebuilt using a graph that says how to reconstruct them. With Tez, individual jobs in the DAG can now hand off their output to the next job in-memory rather than having to stage it in HDFS, but Spark uses memory for the actual datasets and is a much better choice for the types of iterative, machine-learning tasks that you tend to do on Hadoop systems. Moreover, Spark arguably has a richer API, particularly when used with Scala, a functional programming-orientated language that uses Java libraries and whose collections framework maps well onto the types of operations you’d want to make use of with dataflow-type applications on a cluster.
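The idea of a dataset that can be cached but also rebuilt from a graph of its parent datasets can be sketched with a toy model in plain Scala. This is not Spark’s RDD implementation; the Lineage object and the Dataset, Source, Mapped, Filtered and Cached names are all hypothetical, and the sketch ignores partitioning and distribution entirely.

```scala
object Lineage {
  // A toy dataset that remembers how it was derived from its parent (its "lineage").
  sealed trait Dataset[A] {
    def compute(): Seq[A]
    def map[B](f: A => B): Dataset[B] = Mapped(this, f)
    def filter(p: A => Boolean): Dataset[A] = Filtered(this, p)
  }

  // Root of the graph: knows how to (re)load the raw data.
  final case class Source[A](load: () => Seq[A]) extends Dataset[A] {
    def compute(): Seq[A] = load()
  }

  final case class Mapped[A, B](parent: Dataset[A], f: A => B) extends Dataset[B] {
    def compute(): Seq[B] = parent.compute().map(f)
  }

  final case class Filtered[A](parent: Dataset[A], p: A => Boolean) extends Dataset[A] {
    def compute(): Seq[A] = parent.compute().filter(p)
  }

  // Keeps the computed result in memory; if the cached copy is lost (evicted),
  // the next compute() rebuilds it by walking back up the lineage.
  final class Cached[A](parent: Dataset[A]) extends Dataset[A] {
    private var cache: Option[Seq[A]] = None
    def compute(): Seq[A] = cache.getOrElse {
      val result = parent.compute()
      cache = Some(result)
      result
    }
    def evict(): Unit = { cache = None }
  }
}
```

In this model a second call to compute() on a Cached dataset is served from memory, and after evict() the data is recomputed from the source, which is the property that lets a cached dataset survive the loss of its in-memory copy.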

Spark can run standalone, on YARN or on other cluster management platforms, and comes with a handy command-line interpreter that you can use to interactively load, filter, analyse and work with RDDs. Cloudera CDH5.2 comes with Spark 1.1.0 and can either be configured standalone or to run on YARN, with Spark as a service added to nodes in the cluster using Cloudera Manager.

So looking back at the Pig example, we create the dataflow using a number of aliases in that case, that we progressively filter, transform, join together and then aggregate to get to the final top ten set of pages from the website logs. Translating that dataflow to Spark we end up with a similar set of RDDs that take our initial set of logs, apply transformations and join the datasets to store the final aggregated output back on HDFS.

Spark supports in-memory sharing of data within a single DAG (i.e. RDD to RDD), but also between DAGs running in the same Spark instance. As such, Spark becomes a great framework for doing iterative and cyclic data analysis, and can make much better use of the memory on cluster servers whilst still using disk for overflow data and persistence.

Moreover, Spark powers a number of higher-level tools built on the core Spark engine to provide features like real-time loading and analysis (Spark Streaming), SQL access and integration with Hive (Spark SQL), machine learning (MLlib) and so forth. In fact, as well as Hive and Pig being reworked to run on Tez there are also projects underway to port them both to Spark, though to be honest they’re both at early stages compared to Tez integration and most probably you’ll be using Scala, Java or Python to work with Spark now.

So taking the Pig script we had earlier and translating that to the same dataflow in Spark and Scala, we end up with something like this:

package com.cloudera.analyzeblog
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
case class accessLogRow(host: String, identity: String, user: String, time: String, request: String, status: String, size: String, referer: String, agent: String)
case class pageRow(host: String, request_page: String, status: String, agent: String)
case class postRow(post_id: String, title: String, post_date: String, post_type: String, author: String, url: String, generated_url: String)
object analyzeBlog {
        def getRequestUrl(s: String): String = {
        try {
                s.split(' ')(1)
        } catch {
                case e: ArrayIndexOutOfBoundsException => { "N/A" }
        }
}
        def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("analyzeBlog"))
val sqlContext = new SQLContext(sc)
import sqlContext._
val raw_logs = "/user/mrittman/rm_logs"
//val rowRegex = """^([0-9.]+)\s([\w.-]+)\s([\w.-]+)\s(\[[^\[\]]+\])\s"((?:[^"]|\")+)"\s(\d{3})\s(\d+|-)\s"((?:[^"]|\")+)"\s"((?:[^"]|\")+)"$""".r
val rowRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

val logs_base = sc.textFile(raw_logs) flatMap {
                        case rowRegex(host, identity, user, time, request, status, size, referer, agent) =>
                                Seq(accessLogRow(host, identity, user, time, request, status, size, referer, agent))
                        case _ => Nil
                                }
// Filter bots on the user agent, matching the Pig script's filter on the browser field
val logs_base_nobots = logs_base.filter( r => ! r.agent.matches(".*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"))

val logs_base_page = logs_base_nobots.map { r =>
  val request = getRequestUrl(r.request)
  // endsWith also copes with an empty request string, where charAt would throw
  val request_formatted = if (request.endsWith("/")) request else request.concat("/")
  (r.host, request_formatted, r.status, r.agent)
}

val logs_base_page_schemaRDD = logs_base_page.map(p => pageRow(p._1, p._2, p._3, p._4))

logs_base_page_schemaRDD.registerAsTable("logs_base_page")

val page_count = sql("SELECT request_page, count(*) as hits FROM logs_base_page GROUP BY request_page").registerAsTable("page_count")

val postsLocation = "/user/mrittman/posts.psv"

val posts = sc.textFile(postsLocation).map{ line =>
        val cols=line.split('|')

        postRow(cols(0),cols(1),cols(2),cols(3),cols(4),cols(5),cols(6).concat("/"))
}

posts.registerAsTable("posts")

val pages_and_posts_details = sql("SELECT p.request_page, p.hits, ps.title, ps.author FROM page_count p JOIN posts ps ON p.request_page = ps.generated_url ORDER BY hits DESC LIMIT 10")

pages_and_posts_details.saveAsTextFile("/user/mrittman/top_10_pages_and_author4")

        }
}
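The flatMap-with-regex step at the start of the listing can be tried out without a cluster. The sketch below applies the same rowRegex pattern to a local Seq of lines using plain Scala collections; the LogParsing object, the parse method and the capitalised AccessLogRow name are hypothetical, and on Spark the same partial function is passed to flatMap on the RDD returned by sc.textFile.

```scala
object LogParsing {
  final case class AccessLogRow(host: String, identity: String, user: String, time: String,
                                request: String, status: String, size: String,
                                referer: String, agent: String)

  // Same pattern as rowRegex in the listing above.
  val rowRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

  // Lines that match the pattern become one row each; anything else is silently dropped,
  // because flatMap flattens the empty list returned by the fallback case.
  def parse(lines: Seq[String]): Seq[AccessLogRow] = lines.flatMap {
    case rowRegex(host, identity, user, time, request, status, size, referer, agent) =>
      Seq(AccessLogRow(host, identity, user, time, request, status, size, referer, agent))
    case _ => Nil
  }
}
```

Using flatMap rather than map followed by filter means malformed lines never need a placeholder row, at the cost of not reporting how many lines were discarded.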

I’ll do a code-walkthrough for this Spark application in a future post, but for now note the map and flatMap Scala collection functions used to transform RDDs, the registerAsTable calls that register RDDs as tables, and the sql(“…”) function that then lets us manipulate their contents using SQL, including joining to other RDDs registered as tables. For now though, let’s run the application on the CDH5.2 cluster using YARN and see how long it takes to process the same set of log files (remember, the Pig script on this CDH5.2 cluster took around 5 minutes to run, and the Pig on Tez version on the Hortonworks cluster took around 2.5 minutes):

[mrittman@bdanode1 analyzeBlog]$ spark-submit --class com.cloudera.analyzeblog.analyzeBlog --master yarn target/analyzeblog-0.0.1-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/06 19:18:25 INFO SecurityManager: Changing view acls to: mrittman
14/12/06 19:18:25 INFO SecurityManager: Changing modify acls to: mrittman
...
14/12/06 19:19:41 INFO DAGScheduler: Stage 0 (takeOrdered at basicOperators.scala:171) finished in 3.585 s
14/12/06 19:19:41 INFO SparkContext: Job finished: takeOrdered at basicOperators.scala:171, took 53.591560036 s
14/12/06 19:19:41 INFO SparkContext: Starting job: saveAsTextFile at analyzeBlog.scala:56
14/12/06 19:19:41 INFO DAGScheduler: Got job 1 (saveAsTextFile at analyzeBlog.scala:56) with 1 output partitions (allowLocal=false)
14/12/06 19:19:41 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at analyzeBlog.scala:56)
14/12/06 19:19:41 INFO DAGScheduler: Parents of final stage: List()
14/12/06 19:19:41 INFO DAGScheduler: Missing parents: List()
14/12/06 19:19:41 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[15] at saveAsTextFile at analyzeBlog.scala:56), which has no missing parents
14/12/06 19:19:42 INFO MemoryStore: ensureFreeSpace(64080) called with curMem=407084, maxMem=278302556
14/12/06 19:19:42 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 62.6 KB, free 265.0 MB)
14/12/06 19:19:42 INFO MemoryStore: ensureFreeSpace(22386) called with curMem=471164, maxMem=278302556
14/12/06 19:19:42 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 21.9 KB, free 264.9 MB)
14/12/06 19:19:42 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on bdanode1.rittmandev.com:44486 (size: 21.9 KB, free: 265.3 MB)
14/12/06 19:19:42 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0
14/12/06 19:19:42 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (MappedRDD[15] at saveAsTextFile at analyzeBlog.scala:56)
14/12/06 19:19:42 INFO YarnClientClusterScheduler: Adding task set 3.0 with 1 tasks
14/12/06 19:19:42 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 215, bdanode5.rittmandev.com, PROCESS_LOCAL, 3331 bytes)
14/12/06 19:19:42 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on bdanode5.rittmandev.com:13962 (size: 21.9 KB, free: 530.2 MB)
14/12/06 19:19:42 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 215) in 311 ms on bdanode5.rittmandev.com (1/1)
14/12/06 19:19:42 INFO YarnClientClusterScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/12/06 19:19:42 INFO DAGScheduler: Stage 3 (saveAsTextFile at analyzeBlog.scala:56) finished in 0.312 s
14/12/06 19:19:42 INFO SparkContext: Job finished: saveAsTextFile at analyzeBlog.scala:56, took 0.373096676 s

It ran in just over a minute in the end, and most of that was around submitting the job to YARN – not bad. We’ll be covering more of Spark on the blog over the next few weeks including streaming and machine learning examples, and connecting it to ODI and OBIEE via Hive on Spark, and Spark SQL’s own Hive-compatible Thrift server. I’ll also be taking a look at Pig on Spark (or “Spork”…) to see how well that works, and most interestingly how Pig and Hive on Spark compare to running them on Tez – watch this space as they say.

Going Beyond MapReduce for Hadoop ETL Pt.2 : Introducing Apache YARN and Apache Tez

In the first post in this three part series on going beyond MapReduce for Hadoop ETL, I looked at how a typical Apache Pig script gets compiled into a series of MapReduce jobs, and those MapReduce jobs pass data between themselves by writing intermediate resultsets to disk (HDFS, the Hadoop cluster file system). As a reminder, here’s the Pig script we’re working with:

register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar
raw_logs = LOAD '/user/mrittman/rm_logs' USING TextLoader AS (line:chararray);
logs_base = FOREACH raw_logs
GENERATE FLATTEN
  (REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
)AS
  (remoteAddr: chararray, remoteLogname: chararray, user: chararray,time: chararray, request: chararray, status: chararray, bytes_string: chararray,referrer:chararray,browser: chararray);
logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*');
logs_base_page = FOREACH logs_base_nobots GENERATE SUBSTRING(time,0,2) as day, SUBSTRING(time,3,6) as month, SUBSTRING(time,7,11) as year, FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray), remoteAddr, status;
logs_base_page_cleaned = FILTER logs_base_page BY NOT (SUBSTRING(request_page,0,3) == '/wp' or request_page == '/' or SUBSTRING(request_page,0,7) == '/files/' or SUBSTRING(request_page,0,12) == '/favicon.ico');
logs_base_page_cleaned_by_page = GROUP logs_base_page_cleaned BY request_page;
page_count = FOREACH logs_base_page_cleaned_by_page GENERATE FLATTEN(group) as request_page, COUNT(logs_base_page_cleaned) as hits;
page_count_sorted = ORDER page_count BY hits DESC;
page_count_top_10 = LIMIT page_count_sorted 10;
posts = LOAD '/user/mrittman/posts.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage() as (post_id:int,title:chararray,post_date:chararray,post_type:chararray,author:chararray,url:chararray,generated_url:chararray);
posts_cleaned = FOREACH posts GENERATE CONCAT(generated_url,'/') as page_url,author as author, title as title;
pages_and_post_details = JOIN page_count by request_page, posts_cleaned by page_url;
pages_and_posts_trim = FOREACH pages_and_post_details GENERATE page_count::request_page as request_page, posts_cleaned::author as author, posts_cleaned::title as title, page_count::hits as hits;
pages_and_posts_sorted = ORDER pages_and_posts_trim BY hits DESC;
pages_and_post_top_10 = LIMIT pages_and_posts_sorted 10;
store pages_and_post_top_10 into 'top_10s/pages';

When you submit the script for execution using the Pig client, it parses the Pig Latin script, logically optimizes it and then compiles it into MapReduce programs; these programs are then sent in turn to the Hadoop 1.0 JobTracker, which sends them for execution on the Hadoop cluster – in the case of our Pig script, there are five MapReduce programs generated in total.

Each one of these MapReduce programs is what’s called a “directed acyclic graph”, or DAG. A directed acyclic graph is a programming style within distributed computing where processing is broken down into functions that can be run independently of one another, as long as one is not an ancestor of another. In MapReduce terms, this means that all mappers can run independently of each other (and therefore on different nodes in a cluster) and it’s only the reducers that have to wait for their ancestors to finish before they can also start their work independently of the other reducers. It’s a great programming model for processing large amounts of data with fault tolerance across a cluster, and it was the key insight that made possible MapReduce and the “big data” systems that we work with today.
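The dependency structure described above can be sketched as a word count in plain Scala collections. This is a single-process illustration of the map, shuffle and reduce phases rather than Hadoop’s actual API; the MiniMapReduce object and its method names are hypothetical.

```scala
object MiniMapReduce {
  // Map phase: each input line is processed independently of every other line,
  // so on a cluster these calls could run on different nodes.
  def mapPhase(lines: Seq[String]): Seq[(String, Int)] =
    lines.flatMap(line => line.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toSeq)

  // Shuffle: group the intermediate pairs by key. This is the point where
  // reducers have to wait for the mappers that feed them to finish.
  def shuffle(pairs: Seq[(String, Int)]): Map[String, Seq[Int]] =
    pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2) }

  // Reduce phase: each key is reduced independently of the other keys.
  def reducePhase(grouped: Map[String, Seq[Int]]): Map[String, Int] =
    grouped.map { case (key, values) => key -> values.sum }

  def wordCount(lines: Seq[String]): Map[String, Int] =
    reducePhase(shuffle(mapPhase(lines)))
}
```

In real MapReduce the output of each phase is persisted to disk before the next phase reads it, which is where both the fault tolerance and the latency come from.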

A Pig script that generates five MapReduce jobs then effectively creates five DAGs, with each one using files (HDFS) to persist and hand-off data between them and JVMs being spun-up for the individual map and reduce jobs. As such, there’s no way for this version of MapReduce and the Hadoop framework it runs on to consider the overall dataflow, and each DAG has to run in isolation.

To address this issue, version 2.0 of Hadoop introduced a new feature called Apache YARN, or “yet another resource negotiator”. YARN took on the resource management and job scheduling/monitoring parts of Hadoop and made it so that YARN then effectively became an “operating system” for Hadoop that then allowed frameworks to run on it; initially MapReduce2 (reworked to run on YARN), but since then other ones like Apache Tez, and Apache Spark.

YARN also crucially supported frameworks that used DAGs that describe the entire dataflow, not just individual MapReduce jobs, and a new framework that came out of this new capability was Apache Tez. Tez is a generalisation of the MapReduce distributed compute framework that supports these dataflow-style DAGs and either runs MapReduce code unaltered, or provides its own API for describing DAGs using vertices (logic and resources) and edges (connections). Both Hive and Pig have been ported to run on Tez, and in Pig’s case this means another type of compiler is added to the existing MapReduce one, so that Pig scripts executed on Tez can now submit a single DAG that encompasses all stages in the dataflow, with those stages linked together via in-memory passing of intermediate results rather than having to write those intermediate results to disk.

In practice, what this means is that if your version of Pig or Hive has been updated to also run on Tez, you can run your code unaltered on Tez and typically see a 2-3x performance improvement without any changes to your code or application logic. Hortonworks have been the main Hadoop vendor backing Tez and their new Hortonworks Data Platform 2.2 comes with Tez support, so I took my Pig script and ran it on a five-node cluster to get an initial timing and it took 4 min 17 secs to run, again generating the same five MapReduce jobs that I saw on the Cloudera CDH5 cluster. Then, running it using Tez as the execution engine (pig -x tez analyzeblog.pig) the same script took 2mins 25secs to run, around twice as fast as when we ran it using regular MapReduce.

And Tez is great for getting existing Pig and Hive scripts to run faster – if your platform supports it, you should use it instead of MapReduce as your execution engine; MapReduce code submitted “as is” will benefit from better YARN container re-use, and Hive and Pig scripts that run on the Tez execution engine can run as a single DAG and use memory, rather than disk, to pass data between jobs in the DAG. For ETL and analytic jobs that you’re creating from new though, Apache Spark is arguably the framework you should look to use instead of Tez, and tomorrow we’ll find out why.

Going Beyond MapReduce for Hadoop ETL Pt.1 : Why MapReduce Is Only for Batch Processing

Over the previous few months I’ve been looking at the various ways you can load data into Hadoop, process it and then report on it using Oracle tools. We’ve looked at Apache Hive and how it provides a SQL layer over Hadoop, making it possible for tools like ODI and OBIEE to use their usual SQL set-based process approach to access Hadoop data; later on, we looked at another Hadoop tool, Apache Pig, which provides a more dataflow-type language over Hadoop for when you want to create step-by-step data pipelines for processing data. Under the covers, both Hive and Pig generate Java MapReduce code to actually move data around, with MapReduce then working hand-in-hand with the Hadoop framework to run your jobs in parallel across the cluster.

But MapReduce can be slow; it’s designed for very large datasets and batch processing, with overall analysis tasks broken-down into individual map and reduce tasks that start by reading data off disk, do their thing and then write the intermediate results back to disk again.
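To make the read-process-write cycle concrete, here is a minimal single-machine sketch in Python. It is not how Hadoop itself is implemented; the function names, file names and sample log lines are all made up for illustration. It simply shows a map step staging its output to disk and a reduce step reading that output back before writing its own result.

```python
import json
import os
import tempfile
from collections import defaultdict


def map_phase(lines, out_path):
    """Map step: emit a (page, 1) pair per log line and persist the pairs to disk."""
    with open(out_path, "w") as out:
        for line in lines:
            page = line.split()[0]
            out.write(json.dumps([page, 1]) + "\n")


def reduce_phase(in_path, out_path):
    """Reduce step: read the staged pairs back from disk, sum per page, write again."""
    totals = defaultdict(int)
    with open(in_path) as staged:
        for row in staged:
            page, count = json.loads(row)
            totals[page] += count
    with open(out_path, "w") as out:
        json.dump(totals, out)
    return dict(totals)


def run_job(lines, workdir):
    """One MapReduce-style job: map -> disk -> reduce -> disk."""
    mapped = os.path.join(workdir, "map_output.jsonl")    # hypothetical file name
    reduced = os.path.join(workdir, "reduce_output.json")  # hypothetical file name
    map_phase(lines, mapped)
    return reduce_phase(mapped, reduced)


# Made-up sample data: "<page> <status>" per line.
sample_hits = ["/blog/a 200", "/blog/b 200", "/blog/a 304"]
with tempfile.TemporaryDirectory() as workdir:
    hit_counts = run_job(sample_hits, workdir)
    staged_files = sorted(os.listdir(workdir))
```

Even in this toy version every record is written out and read back once per job; chain several such jobs together and the disk traffic multiplies accordingly.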

Whilst this approach means the system is extremely fault-tolerant and effectively infinitely-scalable, this writing to disk of each step in the process means that MapReduce jobs typically take a long time to run and don’t really take advantage of the RAM that’s available in today’s commodity servers. Whilst this is a limitation most early adopters of Hadoop were happy to live with (in exchange for being able to cheaply analyse data on a scale previously unheard of), over the past few years as Hadoop adoption has broadened there have been a number of initiatives to move Hadoop past its batch processing roots and into something more real-time that does more of its processing in-memory. Whilst there are a whole bunch of projects and products out there that claim to improve the speed of Hadoop processing and bring in-memory capabilities (Apache Drill, Cloudera Impala and Oracle’s Big Data SQL are just some examples), the two that are probably of most interest to Hadoop customers working in an Oracle environment are Apache Spark and Apache Tez. But before we get into the details of Spark, Tez and how they improve over MapReduce, let’s take a look at why MapReduce can be slow.

MapReduce and Hadoop 1.0 – Scalable, Fault-Tolerant, but Aimed at Batch Processing

Going back to MapReduce and what’s now termed “Hadoop 1.0”, MapReduce works on the principle of breaking larger jobs down into lots of smaller ones, with each one running independently and persisting its results back to disk at the end to ensure data doesn’t get lost if a server node breaks down. To take an example, the Apache Pig script below reads in some webserver log files, parses and filters them, aggregates the data and then joins it to another Hadoop dataset before outputting the results to a directory in the HDFS storage layer:

register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar
raw_logs = LOAD '/user/mrittman/rm_logs' USING TextLoader AS (line:chararray);
logs_base = FOREACH raw_logs
GENERATE FLATTEN
  (REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
)AS
  (remoteAddr: chararray, remoteLogname: chararray, user: chararray,time: chararray, request: chararray, status: chararray, bytes_string: chararray,referrer:chararray,browser: chararray);
logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*');
logs_base_page = FOREACH logs_base_nobots GENERATE SUBSTRING(time,0,2) as day, SUBSTRING(time,3,6) as month, SUBSTRING(time,7,11) as year, FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray), remoteAddr, status;
logs_base_page_cleaned = FILTER logs_base_page BY NOT (SUBSTRING(request_page,0,3) == '/wp' or request_page == '/' or SUBSTRING(request_page,0,7) == '/files/' or SUBSTRING(request_page,0,12) == '/favicon.ico');
logs_base_page_cleaned_by_page = GROUP logs_base_page_cleaned BY request_page;
page_count = FOREACH logs_base_page_cleaned_by_page GENERATE FLATTEN(group) as request_page, COUNT(logs_base_page_cleaned) as hits;
page_count_sorted = ORDER page_count BY hits DESC;
page_count_top_10 = LIMIT page_count_sorted 10;
posts = LOAD '/user/mrittman/posts.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage() as (post_id:int,title:chararray,post_date:chararray,post_type:chararray,author:chararray,url:chararray,generated_url:chararray);
posts_cleaned = FOREACH posts GENERATE CONCAT(generated_url,'/') as page_url,author as author, title as title;
pages_and_post_details = JOIN page_count by request_page, posts_cleaned by page_url;
pages_and_posts_trim = FOREACH pages_and_post_details GENERATE page_count::request_page as request_page, posts_cleaned::author as author, posts_cleaned::title as title, page_count::hits as hits;
pages_and_posts_sorted = ORDER pages_and_posts_trim BY hits DESC;
pages_and_post_top_10 = LIMIT pages_and_posts_sorted 10;
store pages_and_post_top_10 into 'top_10s/pages';

Pig works by defining what are called “relations” or “aliases”, similar to tables in SQL that contain data or pointers to data. You start by loading data into a relation from a file or other source, and then progressively define further relations that take that initial dataset and apply filters, use transformations, re-orientate the data or join it to other relations until you’ve arrived at the final set of data you’re looking for. In this example we start with raw log data, parse it, filter out bot and spider activity, project just the columns we’re interested in and then remove further “noise” from the logs, then join it to reference data and finally return the top ten pages over that period based on total hits.

Pig uses something called “lazy evaluation”, where relations you define don’t necessarily get created when they’re defined in the script; instead they’re used as a pointer to data and instructions on how to produce it if needed, with the Pig interpreter only materializing a dataset when it absolutely has to (for example, when you ask it to store a dataset on disk or output to console). Moreover, all the steps leading up to the final dataset you’ve requested are considered as a whole, giving Pig the ability to merge steps, miss out steps completely if they’re not actually needed to produce the final output, and otherwise optimize the flow of data through the process.
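The idea can be sketched in a few lines of Python. This is only an illustration of lazy evaluation in general, not of Pig’s actual planner; the Relation class, the relation names and the sample rows are all hypothetical. Each relation records how to produce its rows, and nothing is computed until one of them is explicitly requested.

```python
class Relation:
    """A named step that records how to produce its rows, not the rows themselves."""

    evaluated = []  # names of relations that were actually computed, in order

    def __init__(self, name, compute, *parents):
        self.name = name
        self.compute = compute
        self.parents = parents
        self._rows = None

    def materialize(self):
        """Compute this relation (and any parents it needs) the first time it is requested."""
        if self._rows is None:
            inputs = [parent.materialize() for parent in self.parents]
            self._rows = self.compute(*inputs)
            Relation.evaluated.append(self.name)
        return self._rows


# Made-up sample rows: (page, browser).
raw = Relation("raw", lambda: [("/a", "bot"), ("/a", "firefox"), ("/b", "firefox")])
no_bots = Relation("no_bots", lambda rows: [r for r in rows if r[1] != "bot"], raw)
pages = Relation("pages", lambda rows: [r[0] for r in rows], no_bots)
# Defined but never requested, so it is never computed.
unused_sorted = Relation("unused_sorted", lambda rows: sorted(rows), pages)

# Nothing has run yet; only the request below triggers evaluation.
before_store = list(Relation.evaluated)
stored = pages.materialize()
```

After the final line, Relation.evaluated lists only the three relations needed to produce pages; unused_sorted was defined but skipped, which is the same kind of saving described above.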

Running the Pig script and then looking at the console output from the Grunt command-line interpreter, you can see that five separate MapReduce jobs were generated to load in the data, filter, join and transform it, and then produce the output we requested at the end.

JobId                  Maps Reduces Alias                                                                                                               Feature Outputs
job_1417127396023_0145 12   2       logs_base,logs_base_nobots,logs_base_page,logs_base_page_cleaned,logs_base_page_cleaned_by_page,page_count,raw_logs GROUP_BY,COMBINER 
job_1417127396023_0146 2    1       pages_and_post_details,pages_and_posts_trim,posts,posts_cleaned                                                     HASH_JOIN 
job_1417127396023_0147 1    1       pages_and_posts_sorted                                                                                              SAMPLER 
job_1417127396023_0148 1    1       pages_and_posts_sorted                                                                                              ORDER_BY,COMBINER 
job_1417127396023_0149 1    1       pages_and_posts_sorted                                                                                              hdfs://bdanode1....pages2,

Pig generated five separate MapReduce jobs that loaded, parsed, filtered, aggregated and joined the datasets as part of an overall data “pipeline”, with the intermediate results staged to disk before the next MapReduce job took over. On my six-node CDH5.2 VM cluster it took just over five minutes to load, process and aggregate 5m records from our site’s webserver.

Now the advantage of this approach is that it’s more or less infinitely scalable and certainly resilient, but whilst Pig can look at your overall dataflow “graph” and come up with an efficient way to get to your end result, MapReduce treats every step as atomic and separate and insists on writing every intermediate step to disk before moving on.

What this means in practice is that ETL routines that use Pig, Hive and MapReduce, whilst scaling well, never really get to the point where you can run them as micro-batches or in real-time. For that type of scenario we need to look at moving away from MapReduce and breaking the link between Hadoop (the platform, the cluster management and resource handling part) and the processing that runs on it, so that we can run alternative execution engines on the Hadoop platform such as Apache Tez, which we’ll cover in tomorrow’s post.