ETL Offload with Spark and Amazon EMR – Part 3 – Running pySpark on EMR
In the previous articles (here, and here) I gave the background to a project we did for a client, exploring the benefits of Spark-based ETL processing running on Amazon's Elastic Map Reduce (EMR) Hadoop platform. The proof of concept we ran was on a very simple requirement: taking inbound files from a third party, joining them to some reference data, and then making the result available for analysis.
I showed here how I built up the prototype PySpark code on my local machine, using Docker to quickly and easily make available the full development environment needed.
Now it's time to get it running on a proper Hadoop platform. Since the client we were working with already have a big presence on Amazon Web Services (AWS), using Amazon's Hadoop platform made sense. Amazon's Elastic Map Reduce, commonly known as EMR, is a fully configured Hadoop cluster. You can specify the size of the cluster and vary it as you want (hence, "Elastic"). One of the very powerful features of it is that being a cloud service, you can provision it on demand, run your workload, and then shut it down. Instead of having a rack of physical servers running your Hadoop platform, you can instead spin up EMR whenever you want to do some processing - to a size appropriate to the processing required - and only pay for the processing time that you need.
Moving my locally-developed PySpark code to run on EMR should be easy, since they're both running Spark. Should be easy, right? Well, this is where it gets - as we say in the trade - "interesting". Part of the challenge was simply the learning curve of being new to this set of technology. Other parts, though, I would point to as examples of where the brave new world of Big Data tooling becomes less an exercise in exciting, endless possibilities and more one of stubbornly Googling errors caused by JAR clashes and software version mismatches...
Provisioning EMR
Whilst it's possible to make the entire execution of the PySpark job automated (including the provisioning of the EMR cluster itself), to start with I wanted to run it manually to check each step along the way.
To create an EMR cluster, simply log in to the EMR console and click Create cluster.
I used Amazon's EMR distribution, configured for Spark. You can also deploy a MapR-based Hadoop platform, and use the Advanced tab to pick and mix the applications to deploy (such as Spark, Presto, etc).
The number and size of the nodes is configured here (I used the default, 3 machines of m3.xlarge spec), as is the SSH key. The latter is very important to get right, otherwise you won't be able to connect to your cluster over SSH.
Once you click Create cluster Amazon automagically provisions the underlying EC2 servers, and deploys and configures the software and Hadoop clustering across them. Anyone who's set up a Hadoop cluster will know that literally a one-click deploy of a cluster is a big deal!
If you're going to be connecting to the EMR cluster from your local machine you'll want to modify the security group assigned to it once provisioned and enable access to the necessary ports (e.g. for SSH) from your local IP.
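For example, you can do this with the AWS CLI from your local machine; the security group ID and IP address below are purely placeholders, and yours will differ:
aws ec2 authorize-security-group-ingress \
    --group-id sg-xxxxxxxx \
    --protocol tcp \
    --port 22 \
    --cidr 203.0.113.10/32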
Deploying the code
I developed the ETL code in Jupyter Notebooks, from where it's possible to export it to a variety of formats - including a .py Python script. All the comment blocks from the Notebook are carried across as inline code comments.
To transfer the Python code to the EMR cluster master node I initially used scp, simply out of habit. But a much more appropriate solution soon presented itself - S3! Not only is this a handy way of moving data around, but it comes into its own when we look at automating the EMR execution later on.
To upload a file to S3 you can use the S3 web interface, or a tool such as Cyberduck. Better, if you like the command line as I do, is the AWS CLI tools. Once installed, you can run this from your local machine:
aws s3 cp Acme.py s3://foobar-bucket/code/Acme.py
You'll see that the syntax is pretty much the same as the Linux cp command, specifying source and then destination. You can do a vast amount of AWS work from this command line tool - including provisioning EMR clusters, as we'll see shortly.
So with the code up on S3, I then SSH'd to the EMR master node (as the hadoop user, not ec2-user), and transferred it locally. One of the nice things about EMR is that it comes with your AWS security automagically configured. Whereas on my local machine I need to configure my AWS credentials in order to use any of the aws commands, on EMR the credentials are there already.
aws s3 cp s3://foobar-bucket/code/Acme.py ~
This copied the Python code down into the home folder of the hadoop user.
Running the code - manually
To invoke the code, simply run:
spark-submit Acme.py
A very useful thing to use, if you aren't already, is GNU screen (or tmux, if that's your thing). GNU screen is installed by default on EMR (as it is on many modern Linux distros nowadays). Screen does lots of cool things, but of particular relevance here is it lets you close your SSH connection whilst keeping your session on the server open and running. You can then reconnect at a later time back to it, and pick up where you left off. Whilst you're disconnected, your session is still running and the work still being processed.
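A rough sketch of the workflow (the session name here is arbitrary):
screen -S spark-etl          # start a named screen session
spark-submit Acme.py         # kick off the job inside it
# detach with Ctrl-a d, close the SSH connection, then later:
screen -r spark-etl          # reattach and pick up where you left off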
From the Spark console you can monitor the execution of the running job, as well as dig into the details of how it undertakes the work. See the EMR cluster home page on AWS for the Spark console URL.
Problems encountered
I've worked in IT for 15 years now (gasp). Never has the phrase "the devil's in the detail" been more applicable than in the fast-moving world of big data tools. It's not surprising really, given the staggering rate at which code is released, that sometimes it's a bit quirky, or lacking what may be thought of as basic functionality (often in areas such as security). Each of these individual points could, I suppose, be explained away with a bit of RTFM - but the net effect is that what on paper sounds simple took the best part of half a day and a LOT of Googling to resolve.
Bear in mind, this is code that ran just fine previously on my local development environment.
When using SigV4, you must specify a 'host' parameter
boto.s3.connection.HostRequiredError: BotoClientError: When using SigV4, you must specify a 'host' parameter.
To fix, switch
conn_s3 = boto.connect_s3()
for
conn_s3 = boto.connect_s3(host='s3.amazonaws.com')
You can see a list of endpoints here.
boto.exception.S3ResponseError: S3ResponseError: 400 Bad Request
Make sure you're specifying the correct hostname (see above) for the bucket's region. Determine the bucket's region from the S3 control panel, and then use the endpoint listed here.
Error: Partition column not found in schema
Strike this one off as bad programming on my part; in the step to write the processed file back to S3, I had partitionBy='' in the save function:
duplicates_df.coalesce(1).write.save(full_uri,
                                     format='com.databricks.spark.csv',
                                     header='false',
                                     partitionBy='',
                                     mode='overwrite')
This, along with the coalesce (which combined all the partitions down to a single one), was wrong, and was fixed by changing to:
duplicates_df.write.save(full_uri,
                         format='com.databricks.spark.csv',
                         header='false',
                         mode='overwrite')
Exception: Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions
To get the code to work on my local Docker/Jupyter development environment, I set an environment variable as part of the Python code to specify the Python executable:
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'
I removed this (along with all the PYSPARK_SUBMIT_ARGS) and the code then ran fine.
Timestamp woes
In my original pySpark code I was letting it infer the schema from the source, which included it determining (correctly) that one of the columns was a timestamp. When it wrote the resulting processed file, it wrote the timestamp in a standard format (YYYY-MM-DD HH24:MI:SS). Redshift (of which more in the next article) was quite happy to process this as a timestamp, because it was one.
Once I moved the pySpark code to EMR, the Spark engine moved from my local 1.6 version to 2.0.0 - and the behaviour of the CSV writer changed. Instead of the format before, it switched to writing the timestamp in epoch form - and not just that, but microseconds since epoch. Whilst Redshift can cope with epoch seconds, or milliseconds, it doesn't support microseconds, and the load job failed with:
Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]
and then
Fails: Epoch time copy out of acceptable range of [-62167219200000, 253402300799999]
Whilst I did RTFM, it turns out that I read the wrong FM, taking the latest (2.0.1) instead of the version that EMR was running (2.0.0). And whilst 2.0.1 includes support for specifying the output timestampFormat, 2.0.0 doesn't.
In the end I changed the Spark job to not infer the schema, and so treat the timestamp as a string, thus writing it out in the same format. This was a successful workaround here, but if I'd needed to do some timestamp-based processing in the Spark job I'd have had to find another option.
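In practice this amounted to dropping the schema inference when reading the source file - along the lines of the read used in the prototype code, but with inferSchema switched off:
acme_df = sqlContext.read.load(full_uri,
                               format='com.databricks.spark.csv',
                               header='true',
                               inferSchema='false')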
Success!
I now had the ETL job running on Spark on EMR, processing multiple files in turn. Timings were approximately five minutes to process five files, half a million rows in total.
One important point to bear in mind through all of this is that I've gone with default settings throughout, and not made any effort to optimise the PySpark code. At this stage, it's simply proving the end-to-end process.
Automating the ETL
Having seen that the Spark job would run successfully manually, I now went to automate it. It's actually very simple to do. When you launch an EMR cluster, or indeed even if it's running, you can add a Step, such as a Spark job. You can also configure EMR to terminate itself once the step is complete.
From the EMR cluster create screen, switch to Advanced. Here you can specify exactly which applications you want deployed - and what steps to run. Remember how we copied the Acme.py code to S3 earlier? Now's when it comes in handy! We simply point EMR at the S3 path and it will run that code for us - no need to do anything else. Once the code's finished executing, the EMR cluster will terminate itself.
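(Jumping ahead slightly: you can also add a step to a cluster that's already running, either from the console or with the AWS CLI - something like the following, where the cluster ID is a placeholder.)
aws emr add-steps \
    --cluster-id j-XXXXXXXXXXXXX \
    --steps Type=Spark,Name=Acme,ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,s3://foobar-bucket/code/Acme.py]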
After testing out this approach successfully, I took it one step further - command line invocation. AWS make this ridiculously easy, because from the home page of any EMR cluster (running or not) there is a button to click which gives you the full command to run to spin up another cluster with the exact same configuration.
This gives us a command like this:
aws emr create-cluster \
--termination-protected \
--applications Name=Hadoop Name=Spark Name=ZooKeeper \
--tags 'owner=Robin Moffatt' \
--ec2-attributes '{"KeyName":"Test-Environment","InstanceProfile":"EMR_EC2_DefaultRole","AvailabilityZone":"us-east-1b","EmrManagedSlaveSecurityGroup":"sg-1eccd074","EmrManagedMasterSecurityGroup":"sg-d7cdd1bd"}' \
--service-role EMR_DefaultRole \
--enable-debugging \
--release-label emr-5.0.0 \
--log-uri 's3n://aws-logs-xxxxxxxxxx-us-east-1/elasticmapreduce/' \
--steps '[{"Args":["spark-submit","--deploy-mode","cluster","s3://foobar-bucket/code/Acme.py"],"Type":"CUSTOM_JAR","ActionOnFailure":"TERMINATE_CLUSTER","Jar":"command-runner.jar","Properties":"","Name":"Acme"}]' \
--name 'Rittman Mead Acme PoC' \
--instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master instance group - 1"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core instance group - 2"}]' \
--region us-east-1 \
--auto-terminate
This spins up an EMR cluster, runs the Spark job and waits for it to complete, and then terminates the cluster. Logs written by the Spark job get copied to S3, so that even once the cluster has been shut down, the logs can still be accessed. Separation of compute from storage - it makes a lot of sense. What's the point in having a bunch of idle CPUs sat around just so that I can view the logs at some point if I want to?
The next logical step for this automation would be the automatic invocation of the above process based on the presence of a defined number of files in the S3 bucket. Tools such as Lambda, Data Pipeline, and Simple Workflow Service can all help with this, and with the broader management of ETL and data processing on AWS.
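As an illustration only (we didn't build this as part of the PoC), a Lambda function triggered by new files landing in the S3 bucket could spin up the same transient cluster using boto3 - roughly along these lines, with the cluster configuration trimmed right down:
import boto3

def lambda_handler(event, context):
    # Triggered by an S3 PUT event; launch a transient EMR cluster that
    # runs the Spark step and terminates itself when it's done.
    emr = boto3.client('emr')
    emr.run_job_flow(
        Name='Rittman Mead Acme PoC',
        ReleaseLabel='emr-5.0.0',
        Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
        Instances={
            'MasterInstanceType': 'm3.xlarge',
            'SlaveInstanceType': 'm3.xlarge',
            'InstanceCount': 3,
            'KeepJobFlowAliveWhenNoSteps': False
        },
        Steps=[{
            'Name': 'Acme',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '--deploy-mode', 'cluster',
                         's3://foobar-bucket/code/Acme.py']
            }
        }],
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole'
    )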
Spot Pricing
You can save money further with AWS by using Spot Pricing for EMR requests. Spot Pricing is used on Amazon's EC2 platform (on which EMR runs) as a way of utilising spare capacity. Instead of paying a fixed (higher) rate for some server time, you instead 'bid' at a (lower) rate and when the demand for capacity drops such that the spot price does too and your bid price is met, you get your turn on the hardware. If the spot price goes up again - your server gets killed.
Why spot pricing makes sense on EMR particularly is that Hadoop is designed to be fault-tolerant across distributed nodes. Whilst pulling the plug on an old-school database may end in tears, dropping a node from a Hadoop cluster may simply mean a delay in the processing whilst the particular piece of (distributed) work is restarted on another node.
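In the create-cluster command above, this would just be a case of marking the relevant instance group(s) as spot and adding a bid price - something like the following, where the bid price is purely illustrative:
--instance-groups '[{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Market":"SPOT","BidPrice":"0.10","Name":"Core instance group - 2"},{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master instance group - 1"}]'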
Summary
We've developed our simple ETL application, and got it running on Amazon's EMR platform. Whilst we used AWS because it's the client's platform of choice, in general there's no reason we couldn't take it and run it on another Hadoop platform. This could be a Hadoop platform such as Oracle's Big Data Cloud Service, Cloudera's CDH running on Oracle's Big Data Appliance, or simply a self-managed Hadoop cluster on commodity hardware.
Processing time was in the region of 30 minutes to process 2M rows across 30 files, and in a separate batch run 3.8 hours to process 283 files of around 25M rows in total.
So far, the data that we've processed is only sat in an S3 bucket up in the cloud.
In the next article we'll look at what the options are for actually analysing the data and running reports against it.
ETL Offload with Spark and Amazon EMR – Part 2 – Code development with Notebooks and Docker
In the previous article I gave the background to a project we did for a client, exploring the benefits of Spark-based ETL processing running on Amazon's Elastic Map Reduce (EMR) Hadoop platform. The proof of concept we ran was on a very simple requirement: taking inbound files from a third party, joining them to some reference data, and then making the result available for analysis. The primary focus was proving the end-to-end concept, with future iterations focussing on performance and design optimisations.
Here we'll see how I went about building up the ETL process.
Processing Overview
The processing needed to iterate over a set of files in S3 and, for each one:
- Load the file from S3
- Determine the region from the filename, and add it as a column to the data
- Deduplicate it
- Write duplicates to a separate file
- Load the sites reference data
- Extract the domain from the URL string
- Join the facts with sites on domain
- Write the resulting file to S3
Once the data is processed and landed back to S3, we can run analytics on it. See subsequent articles for discussion of Redshift vs in-place querying with tools such as Presto.
Ticking All The Cool-Kid Boxes - Spark AND Notebooks AND Docker!
Whilst others in Rittman Mead have done lots of work with Spark, I myself was new to it, and needed a sandpit in which I could flail around without causing any real trouble. Thus I set up a nice self-contained development environment on my local machine, using Docker to provision and host it, and Jupyter Notebooks as the interface.
Notebooks
In a world in which it seems that there are a dozen cool new tools released every day, interactive notebooks are, for me, one of the most significant developments of recent times for any developer. They originate in the world of data science where, taking the 'science' bit at its word, data processing and exploration is written up in a self-documenting manner. This makes it possible to follow how and why code was written, and what the output at each stage was - and to run it yourself too. By breaking code into chunks it makes it much easier to develop as well, since you can rerun and perfect each piece before moving on.
Notebooks are portable, meaning that you can work with them in one system, and then upload them for others to learn from and even import to run on their own systems. I've shared a simplified version of the notebook that I developed for this project on gist here, and you can see an embedded version of it at the end of this article.
The two most common notebook implementations are Apache Zeppelin and Jupyter Notebooks (previously known as IPython Notebooks). Jupyter is the one I've used previously, and I stuck with it again here. To read more about notebooks and see them in action, see my previous blog posts here and here.
Docker
Plenty's been written about Docker. In a nutshell, it is a way to provision and host a set of self-contained software. It takes the idea of a virtual machine (such as VMware, or VirtualBox), but without having to install an OS, and then the software, and then configure it all yourself. You simply take a "Dockerfile" that someone has prepared, and run it. You can create copies, or throw it away and start again, from a single command. I ran Docker on my Mac through Kitematic, and natively on my home server.
There are prebuilt Docker configuration files for lots of software (including Oracle and OBIEE!), and I found one that includes Spark, PySpark, and Jupyter - perfect!
To launch it, you simply enter:
docker run -d -p 18888:8888 jupyter/all-spark-notebook
This downloads all the necessary Docker files etc - you don't need anything local first, except Docker.
I ran it with an additional flag, -v, configuring it to use a folder on my local machine to store the work that I created. By default all files reside within the Docker image itself - and get deleted when you delete the Docker instance.
docker run -d -p 18888:8888 -v /Users/rmoff/all-spark-notebook:/home/jovyan/work jupyter/all-spark-notebook
You can also run the container with an additional flag, GRANT_SUDO, so that the guest user can run sudo commands within it. To do this include -e GRANT_SUDO=yes --user root:
docker run -d -p 18888:8888 -e GRANT_SUDO=yes --user root -v /Users/rmoff/all-spark-notebook:/home/jovyan/work jupyter/all-spark-notebook
With the Docker container running, you can access Jupyter notebooks on the port exposed in the command used to launch it (18888).
Getting Started with Jupyter
From Jupyter's main page you can see the files within the main folder (see above for how you can map this to a local folder on the machine hosting Docker). Using the New menu in the top-right you can create:
- Folders and edit Text files
- A terminal
- A notebook, running under one of several different 'Kernels' (host interpreters and environments)
The ability to run a terminal from Jupyter is very handy - particularly on Docker. Docker by its nature isn't really designed for interaction within the container itself; that's the point of Docker, in a way - it provisions and configures all the software for you. You can use Docker to run a bash shell directly, but it's more fiddly than just using the Jupyter Terminal.
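For comparison, getting a shell directly into the container would be something like this (the container name or ID depends on your environment):
docker exec -it <container_id> bash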
I used a Python 2 notebook for my work; with this Docker image you also have the option of Python 3, Scala, and R.
Developing the Spark job
With my development environment up and running, I set to writing the Spark job. Because I'm already familiar with Python I took advantage of PySpark. Below I describe the steps in the processing and how I achieved them.
Environment Preparation
Define AWS parameters:
access_key='XXXXXXXXXXXXXXXXXX'
secret='YYYYYYYYYYYYYYYYY'
bucket_name='foobar-bucket'
Set up the PySpark environment, including necessary JAR files for accessing S3 from Spark:
import os
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk-pom:1.10.34,com.databricks:spark-csv_2.11:1.3.0 pyspark-shell'
Create a spark context:
import pyspark
sc = pyspark.SparkContext('local[*]')
sqlContext = pyspark.SQLContext(sc)
Import Python libraries
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
import boto
from urlparse import urlsplit
Note that to install python libraries not present on the Docker image (such as boto, which is used for accessing AWS functionality from within Python) you can run from a Jupyter Terminal:
/opt/conda/envs/python2/bin/pip install boto
On other platforms the path to pip will vary, but the install command is the same.
Loading Data from S3
The source data comes from an S3 "bucket", on which there can be multiple "keys". Buckets and keys roughly translate to "disk drive" and "file".
We use the boto library to interact with S3 to find a list of files ('keys') matching the pattern of source files that we want to process.
Connect to the bucket
conn_s3 = boto.connect_s3()
bucket = conn_s3.get_bucket(bucket_name)
Iterate over the bucket contents
This bit would drive iterative processing over multiple input files; for now it just picks the last file on the list (acme_file getting set on each iteration and so remaining set after the loop):
contents=bucket.list(prefix='source_files/')
for f in contents:
    print f.name
    print f.size
    acme_file = f.name
print "\n\n--\nFile to process: %s" % acme_file
Read the CSV from S3 into Spark dataframe
The Docker image I was using was running Spark 1.6, so I was using the Databricks CSV reader; in Spark 2 this is now available natively. The CSV file is loaded into a Spark data frame. Note that Spark is reading the CSV file directly from an S3 path.
full_uri = "s3n://{}/{}".format(bucket_name, acme_file)
print full_uri
s3n://foobar-bucket/source_files/acme_GB_20160803_100000.csv
acme_df = sqlContext.read.load(full_uri,
                               format='com.databricks.spark.csv',
                               header='true',
                               inferSchema='true')
acme_df.printSchema()
root
|-- product: string (nullable = true)
|-- product_desc: string (nullable = true)
|-- product_type: string (nullable = true)
|-- supplier: string (nullable = true)
|-- date_launched: timestamp (nullable = true)
|-- position: string (nullable = true)
|-- url: string (nullable = true)
|-- status: string (nullable = true)
|-- reject_reason: string (nullable = true)
The above shows the schema of the dataframe; Spark has inferred this automagically from the column headers (for the column names), and then from the data within (note that it has correctly detected a timestamp in the date_launched column).
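For reference, the Spark 2 native equivalent of the Databricks CSV read above would be roughly the following, using the Spark 2 SparkSession entry point (spark):
acme_df = spark.read.csv(full_uri, header=True, inferSchema=True)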
Add country column to data frame
The filename of the source data includes a country field as part of it. Here we use this regular expression to extract it:
filename=os.path.split(acme_file)[1]
import re
m=re.search('acme_([^_]+)_.+$', filename)
if m is None:
    country='NA'
else:
    country=m.group(1)
print "Country determined from filename '%s' as : %s" % (filename,country)
Country determined from filename 'acme_GB_20160803_100000.csv' as : GB
With the country stored in a variable, we add it as a column to the data frame:
Note that the withColumn function requires a Column value, which we create here using the PySpark lit function that was imported earlier on.
acme_df=acme_df.withColumn('country',lit(country))
acme_df.printSchema()
root
|-- product: string (nullable = true)
|-- product_desc: string (nullable = true)
|-- product_type: string (nullable = true)
|-- supplier: string (nullable = true)
|-- date_launched: timestamp (nullable = true)
|-- position: string (nullable = true)
|-- url: string (nullable = true)
|-- status: string (nullable = true)
|-- reject_reason: string (nullable = true)
|-- country: string (nullable = false)
Note the new column added to the end of the schema.
Deduplication
Now that we've imported the file, we need to deduplicate it to remove entries with the same value for the url field. Here I create a second dataframe based on a deduplication of the first, using the PySpark native function dropDuplicates:
acme_deduped_df = acme_df.dropDuplicates(['url'])
For informational purposes we can see how many records are in the two dataframes, and determine how many duplicates there were:
orig_count = acme_df.count()
deduped_count = acme_deduped_df.count()
print "Original count: %d\nDeduplicated count: %d\n\n" % (orig_count,deduped_count)
print "Number of removed duplicate records: %d" % (orig_count - deduped_count)
Original count: 97974
Deduplicated count: 96706
Number of removed duplicate records: 1268
Deriving Domain from URL
One of the sets of reference data is information about the site on which the product was viewed. To bring these attributes into the main dataset we join on the domain itself. To perform this join we need to derive the domain from the URL, which we can do using the Python urlsplit function, as seen in this example:
sample_url = 'https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/'
print sample_url
print urlsplit(sample_url).netloc
https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/
www.rittmanmead.com
We saw above that to add a column to the dataframe the withColumn function can be used. However, to add a column that's based on another (rather than a literal, which is what the country column added above was) we need to use the udf function. This generates the necessary Column field based on the urlsplit output for the associated url value.
First we define our own function which simply applies urlsplit to the value passed to it:
def getDomain(value):
    return urlsplit(value).netloc
and then a UDF based on it:
udfgetDomain = udf(getDomain)
Finally, apply this to a third version of the dataframe:
acme_deduped_df_with_netloc = acme_deduped_df.withColumn("netloc", udfgetDomain(acme_deduped_df.url))
Joining to Second Dataset
Having prepared the primary dataset, we'll now join it to the reference data. The source of this is currently an Oracle database. For simplicity we're working with a CSV dump of the data, but PySpark supports the option to connect to sources with JDBC, so we could query it directly if required.
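By way of illustration only, a direct JDBC read from the Oracle source would look something like this - the connection details are placeholders, and the Oracle JDBC driver would need to be available to Spark:
sites_df = sqlContext.read.format('jdbc').options(
    url='jdbc:oracle:thin:@//dbhost:1521/ORCL',
    dbtable='SITES',
    driver='oracle.jdbc.OracleDriver',
    user='scott',
    password='tiger').load()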
First we import the sites reference data CSV:
sites_file = "s3n://{}/{}".format('foobar-bucket', 'sites.csv')
sites_df = sqlContext.read.load(sites_file,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')
Then some light data cleansing with the filter function to remove blank SITE entries and blank SITE_RETAIL_TYPE entries:
sites_pruned_df = sites_df.filter("NOT (SITE ='' OR SITE_RETAIL_TYPE = '')")
Now we can do the join itself. This joins the original dataset (acme_deduped_df_with_netloc) with the sites reference data (sites_pruned_df), using a left outer join.
merged_df = acme_deduped_df_with_netloc.join(sites_pruned_df,acme_deduped_df_with_netloc.netloc == sites_pruned_df.SITE, 'left_outer')
Using the filter function and show we can inspect the dataset for matches and misses:
First 10 matched:
merged_df.filter(merged_df.ID.isNotNull()).select('date_launched','url','netloc','ID','SITE','SITE_RETAIL_TYPE').show(10)
First 10 unmatched:
merged_df.filter(merged_df.ID.isNull()).select('date_launched','url','netloc','ID','SITE','SITE_RETAIL_TYPE').show(10)
Write Back to S3
The finished dataset is written back to S3. As before, we're using the Databricks CSV writer here, but in Spark 2 would be doing it natively:
acme_enriched_filename='acme_enriched/%s' % filename.replace('.csv','')
full_uri = "s3n://{}/{}".format(bucket_name, acme_enriched_filename)
print 'Writing enriched acme data to %s' % full_uri
merged_df.write.save(path=full_uri,
                     format='com.databricks.spark.csv',
                     header='false',
                     nullValue='null',
                     mode='overwrite')
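Again for reference, the Spark 2 native equivalent of this write would be roughly:
merged_df.write.csv(full_uri, header=False, nullValue='null', mode='overwrite')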
Summary
With the above code written, I could process input files in a couple of minutes per 30MB file. Bear in mind two important constraints on this performance:
- I was working with data residing up in the Amazon Cloud, with the associated network delay in transferring to and from it
- The processing was taking place on a single-node Spark deployment (on my laptop, under virtualisation), rather than the multiple-node configuration typically seen.
The next steps, as we'll see in the next article, were to port this code up to Amazon Elastic Map Reduce (EMR). Stay tuned!
Footnote: Notebooks FTW!
Whilst I've taken the code and written it out above more in the form of a blog post, I could have actually just posted the Notebook itself, and it wouldn't have needed much more explanation. Here it is, along with some bonus bits on using S3 from python:
ETL Offload with Spark and Amazon EMR – Part 1
We recently undertook a two-week Proof of Concept exercise for a client, evaluating whether their existing ETL processing could be done faster and more cheaply using Spark. They were also interested in whether something like Redshift would provide a suitable data warehouse platform for them. In this series of blog articles I will look at how we did this, and what we found.
Background
The client has an existing analytics architecture based primarily around Oracle database, Oracle Data Integrator (ODI), Oracle GoldenGate, and Oracle Business Intelligence Enterprise Edition (OBIEE), all running on Amazon EC2. The larger architecture in the organisation is all AWS based too.
Existing ETL processing for the system in question is done using ODI, loading data daily into a partitioned Oracle table, with OBIEE providing the reporting interface.
There were two aspects to the investigation that we did:
Primarily, what would an alternative platform for the ETL look like? With lots of coverage recently of the concepts of "ETL offloading" and "Apache-based ETL", the client was keen to understand how they might take advantage of this.
Within this, key considerations were:
- Cost
- Scalability
- Maintenance
- Fit with existing and future architectures
The second aspect was to investigate whether the performance of the existing reporting could be improved. Despite having data for multiple years in Oracle, queries were too slow to provide information other than within a period of a few days.
Oracle licences were a sensitive point for the client, who were keen to reduce - or at least avoid increasing - costs. ODI for Big Data requires an additional licence, and so was not in scope for the initial investigation.
Data and Processing
The client uses their data to report on the level of requests for different products, including questions such as:
- How many requests were there per day?
- How many requests per product type in a given period?
- For a given product, how many requests were there, from which country?
Data volumes were approximately 50MB, arriving in batch files every hour. Reporting requirements were previous day and before only. Being able to see data intra-day would be a bonus but was not a requirement.
High Level Approach
Since the client already uses Amazon Web Services (AWS) for all its infrastructure, it made sense to remain in the AWS realm for the first round of investigation. We broke the overall requirement down into pieces, so as to understand (a) the most appropriate tool at each point and (b) the toolset with best overall fit. A very useful reference for an Amazon-based big data design is the presentation Big Data Architectural Patterns and Best Practices on AWS. Even if you're not running on AWS, the presentation has some useful pointers for things like where to be storing your data based on volumes, frequency of access, etc.
Data Ingest
The starting point for the data was Amazon's storage service - S3, in which the data files in CSV format are landed by an external process every hour.
Processing (Compute)
Currently the processing is done by loading the external data into a partitioned Oracle table, and resolving dimension joins and de-duplication at query time.
Taking away any assumptions, other than a focus on 'new' technologies (and a bias towards AWS where appropriate), we considered:
- Switch out Oracle for Redshift, and resolve the joins and de-duplication there
  - Loading the data to Redshift would be easy, but would be switching one RDBMS-based solution for another. Part of the aim of the exercise was to review a broader solution landscape than this.
- Use Hadoop-based processing, running on Elastic Map Reduce (EMR):
  - Hive QL to process the data on S3 (or HDFS on EMR)
    - Not investigated, because it provides none of the error handling etc that Spark would, and Spark has SparkSQL for any work that needs doing in SQL.
  - Pig
    - Still used, but 'old' technology, with a somewhat esoteric language, and superseded by Spark
  - Spark
    - Support for several languages, including commonly-used ones such as Python
    - Gaining increasing levels of adoption in the industry
    - Opens up a rich eco-system of processing possibilities with related projects such as Machine Learning, and Graph.
We opted to use Spark to process the files, joining them to the reference data, and carrying out de-duplication. For a great background and discussion on Spark and its current place in data architectures, have a listen to this podcast.
Storage
The output from Spark was written back to S3.
Analytics
With the processed data in S3, we evaluated two options here:
- Load it to Redshift for query
- Query in-place with a SQL-on-Hadoop engine such as Presto or Impala
- With the data at rest on S3, Amazon's Athena is also of interest here, but was released after we carried out this particular investigation.
The presumption was that OBIEE would continue to provide the front-end to the analytics. Oracle's Data Visualization Desktop tool was also of interest.
In the next post we'll see the development environment that we used for prototyping. Stay tuned!
The Visual Plugin Pack for OBIEE
Last week we announced the Rittman Mead Open Source project, and released into open source:
- the excellent Insights project, a javascript API/framework for building a new frontend for OBIEE, written by Minesh Patel
- Enhanced usage tracking for OBIEE, to track click-by-click how your users interact with the application
Today it is the turn of the Visual Plugin Pack...
What is the Visual Plugin Pack for OBIEE?
Visual Plugin Pack (VPP) is a means by which users of OBIEE Answers can use custom JavaScript visualisations without having to write any JavaScript! It is a framework that enables developers to build JavaScript visualisation plugins, which report builders can then utilise and configure through the native OBIEE user interface.
I want to point out from the very start that, despite its name, the Visual Plugin Pack is not a pack of all-singing, all-dancing, super-duper visualisations for OBIEE.
Instead, VPP should be thought of as a framework that allows you to quickly develop and integrate all-singing, all-dancing, super-duper visualisations that will work natively within OBIEE.
Subtle difference, but an important one.
So what does it do?
Essentially, VPP allows you to accelerate the development and deployment of custom, configurable and reusable OBIEE JavaScript visualisations.
Back in 2013 I wrote this post describing how to embed a D3 visualisation within OBIEE. The same method will still work today, but it's a cumbersome process and requires heavy use of the narrative form, which, let's be honest, is a painful experience when it comes to JavaScript development.
Some drawbacks with this method:
- Code editing in the Narrative view is not productive.
- Reusing visualisations in other analyses requires the copying and pasting of code.
- Basic visualisation configuration changes - for example width, height, colours, font etc - require code changes.
- Remapping Column bindings requires code changes.
- JavaScript library dependencies and load order can be tricky to manage.
The Visual Plugin Pack attempts to address these issues by abstracting away the complexities of the Narrative form and allowing developers to focus on visualisation code, not OBIEE integration code.
If you choose to use VPP for your visualisations then you will never have to touch the narrative form, all visualisation development can take place outside of OBIEE in your favourite code editor and deployed to Weblogic when you are done.
VPP also allows you to define design-time controls that affect column bindings and visualisation behaviour. The example visualisation below has been written to accept 5 column bindings and 1 configuration component, which controls the visualisation size. You can create as many column bindings and configuration components as you need.
How do I get started?
You can download or fork the repository from here.
Installation and developer guides can be found on the wiki.
There are several visualisations that come bundled with VPP, some more polished than others, but they should serve as good examples that can be enhanced further.
Summary
If you've got some in-house JavaScript skills and are looking to develop and integrate custom visualisations into OBIEE, then VPP can help alleviate a lot of the frustrations associated with the traditional method. Once you're up and running you'll be able to develop faster, integrate quickly and share your visualisations with all OBIEE report writers.
If you'd like to discuss how Rittman Mead can help with deployment or assist with custom visualisation development feel free to contact us.