Tag Archives: Big Data

Introduction to Hadoop HDFS (and writing to it with node.js)

The core Hadoop project solves two problems with big data – fast, reliable storage and batch processing.
For this first post on Hadoop we are going to focus on the default storage engine and how to integrate with it using its REST API. Hadoop is actually quite easy to install so let’s see what we can do in 15 minutes. I’ve assumed some knowledge of the Unix shell but hopefully it’s not too difficult to follow – the software a versions are listed in the previous post.

If you’re completely new to Hadoop three things worth knowing are…

  • The default storage engine is HDFS – a distributed file system with directories and files (ls, mkdir, rm, etc)
  • Data written to HDFS is immutable – although there is some support for appends
  • HDFS is suited for large files – avoid lots of small files

If you think about batch processing billions of records, large and immutable files make sense. You don’t want the disk spending time doing random access and dealing with fragmented data if you can stream the whole lot from beginning to end.
Files are split in to blocks so that nodes can process files in parallel using map-reduce. By default a Hadoop cluster will replicate each file block to 3 nodes and each file block can take up to the configured block size (~64M).

Starting up a local Hadoop instance for development is pretty simple and even easier as we’re only going to start half of it. The only setting that’s needed is the host and port where the HDFS master ‘namenode’ will exist but we’ll add a property for the location of the filesystem too.

After downloading and unpacking Hadoop add the following under the <configuration> tags in core-site.xml…

conf/core-site.xml:

<property>
  <name>fs.default.name</name>   <value>hdfs://localhost:9000</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>   <value>/home/${user.name}/hdfs-filesystem</value>
</property>

Add your Hadoop bin directory to the PATH

export PATH=$PWD/hadoop-1.0.4/bin:$PATH

The only other step before starting Hadoop is to format the filesystem…

hadoop namenode -format

Hadoop normally runs with a master and many slaves. The master ‘namenode’ tracks the location of file blocks and the files they represent and the slave ‘datanodes’ just store file blocks. To start with we’ll run both a master and a slave on the same machine…

# start the master namenode
hadoop-daemon.sh start namenode
# start a slave datanode
hadoop-daemon.sh start datanode

At this point we can write some data to the filesystem…

hadoop dfs -put /etc/hosts /test/hosts-file
hadoop dfs -ls /test

You can check that the hadoop daemons are running correctly by running jps (Java ps). Shutting down the daemons can be done quickly with a ctrl-c or killall java – do this now.

To add data we’ll be using the WebHDFS REST api with node.js as both are simple but still fast.

First we need to enable the WebHDFS and Append features in HDFS. Append has some issues and has been disabled in 1.1.x so make sure you are using 1.0.4. It should be back in 2.x and should be fine for our use – this is what Hadoop development is like! Add the following properties…

conf/hdfs-site.xml

<property>
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>
<property>
  <name>dfs.support.append</name>
  <value>true</value>
</property>

Restart HDFS…

killall java
hadoop-daemon.sh start namenode && hadoop-daemon.sh start datanode

Before loading data we need to create the file that will store the JSON. We’ll append all incoming data to this file…

hadoop dfs -touchz /test/feed.data
hadoop dfs -ls /test

If you see the message “Name node is in safe mode” then just wait for a minute as the namenode is still starting up.

Next download node.js (http://nodejs.org/download/) – if you’re using Unix you can ‘export PATH’ in the same way we did for hadoop.

export PATH=$PWD/node-v0.10.0-linux-x64/bin/:$PATH

Scripting in node.js is very quick thanks to the large number of packages developed by users. Obviously the quality can vary but for quick prototypes there always seems to be a package for anything. All we need to start is an empty directory where the packages and our script will be installed. I’ve picked three packages that will help us…

mkdir hdfs-example
cd hdfs-example
npm install node-webhdfs
npm install ntwitter
npm install syncqueue

The webhdfs and twitter packages are obvious but I’ve also used the syncqueue package so that only one append command is sent at a time – Javascript is asynchronous. To use these create and edit a file named twitter.js and add….

var hdfs = new (require("node-webhdfs")).WebHDFSClient({ user: process.env.USER, namenode_host: "localhost", namenode_port: 50070 });
var twitter = require("ntwitter");
var SyncQueue = require("syncqueue");
var hdfsFile = "/test/feed.data";

// make appending synchronous
var queue = new SyncQueue();

// get your developer keys from: https://dev.twitter.com/apps/new
var twit = new twitter({
  consumer_key: "keykeykeykeykeykey",
  consumer_secret: "secretsecretsecretsecret",
  access_token_key: "keykeykeykeykeykey",
  access_token_secret: "secretsecretsecretsecret"
});

twit.stream("statuses/filter", {"track":"hadoop,big data"}, function(stream) {
  stream.on("data", function (data) {
    queue.push(function(done) {
      console.log(data.text);
      hdfs.append(hdfsFile, JSON.stringify(data), function (err, success) {
        if (err instanceof Error) { console.log(err); }
         done();
      });
    });
  });
});

And run node twitter.js

Now sit back and watch the data flow – here we’re filtering on “hadoop,big data” but you might want to choose a different query or even a different source – eg. tail a local log file, call a web service, run a webserver.

Introducing some Short Tutorials for Hadoop

HadoopMarpreduceServerClusterI’ve recently joined Rittman Mead and part of my job will be looking at ‘Big Data’ technologies. This includes looking at how we can apply technologies to manage big data sets whether it be lightweight (but large) key-value stores, capturing and moving data or running batch jobs. My background is primarily in Java development and I’ve spent a lot of time working with many open source tools and open standards that make development easier.

The open source tools that are symbolic of the term ‘Big Data’ are constantly evolving, providing better features and performance. They are unstable, not in terms of quality but in how their APIs and general best practices change so quickly. Fortunately the dust is starting to settle on the core projects so if you’ve not had the time to work with the tools it’s only been getting easier and you’re in a good position.

Over the next few posts I’ll introduce Hadoop along with a few other open source tools that can be used together to quickly develop applications. Hadoop appears to have a steep learning curve and even installing it can look tricky. It’s actually quite easy to start a development environment and with packages like those from Cloudera it’s becoming much easier and quicker to set up production clusters.

Our example project will stream data from twitter using their API that will then be stored in raw form in Hadoop. After we can look a the ways we can transform and process that data using other tools like Hive, Pig and HBase and Oracle NoSQL. We’ll also be using some other open source tools to help so even if you never use Hadoop I hope they might be interesting.

In the first post we’ll only start up half of Hadoop – this will include two daemons that will provide the distributed file store. Another two daemons are needed for the Map-Reduce framework which is used for running batch processes and we’ll look at these in a later post.

To add data to the file store we’ll use the popular server side javascript engine node.js. This isn’t related to Hadoop but it demonstrates how we can move data between two web services with a dozen lines of code.

I’ll be writing the steps with the following configuration listed below. Of course most of the examples I create will work with most versions of Hadoop and the other big data tools, but for reference, the versions I’ll be using will be as follows:

That’s it for now – back soon with the first example.

Big Data and the Oracle Reference Architecture.

Last week I travelled from Europe to present at the RMOUG Training Days event In Denver, Colorado. As I blogged a couple of weeks back, this is one of my favourite user group conferences and it never fails to impress me. I expect to be wowed even more next year as they clock up their quarter century!

This year I presented twice: “Realtime Data Warehouse Tuning” and “Extending the Oracle Reference Data Architecture. More on tuning in another blog posting, but for now some thoughts on Big Data and Architecture. If you are interested in seeing my paper it is available on the Rittman Mead website here. The slides originally published on the RMOUG conference website were revised to incorporate some new graphics and information from Oracle’s white paper on Information Management and Big Data A Reference Architecture which was published just as I flew out to the USA. Fortunately, most of my original paper matched the thinking behind the new white paper. One change of note is the new description for the foundation layer – it is now “Immutable Enterprise Data with Full History” – see the image clipped from Oracle’s new white paper.

NewImage

I think the new definition is a far more clear description of the Foundation Layer than using terms such as 3NF. After all architecture is more about reasons to do rather than how to do.

One of my early slides in the Big Data section of my RMOUG talk covers what I think Big Data means. Ask a dozen people what is meant by “Big Data” and you will probably hear at least two dozen answers. What is that stops data warehouses being “big data”, since they are both big (they can be really big) and contain data?

Some people argue that big data data is unstructured data. However, to my mind for data to be useful information it must have some degree of discernible structure, that is, it must be capable of being analysed or else it is just noise.  Obviously, text has structure and meaning both from the ordering of characters to form words and the order of words to make coherent blocks of text. Likewise, digital data from from telemetry also has meaning. Harder to analyse are audio and video feeds, but even this is becoming commonplace both in the consumer marketplace and business; I have apps on my Macbook that tag photos based on who the software believes is in the picture and apps on my iPhone that identify and download music from an audio clip recorded on the phone. Business and government users do the similar things be it identify people in a crowd or to transcribe voice.

People often speak of the 4 or 5 “V”s of big data:

  • Volume – large amounts of data
  • Velocity – fast arriving data
  • Variety – it comes in all types of structure (including “no” structure)
  • Value - there is “worth” in processing the information

Add to the mix that optional “V”, number 5, Veracity - that the data is trustworthy. However, in reality, Value, Volume, Velocity, and Veracity are all normal requirements in realtime data warehousing and enterprise scale ERP implementations so are not unique to big data. So perhaps Variety might be the key differentiator. In truth there will be little variety in the data being handled; if you develop a process to do sentiment analysis on a Twitter feed you are extremely unlikely to come across the odd digitized X-ray image or smart meter reading lurking in that feed. This is very similar to the “variety” we come across in developing any other ETL process in data warehousing.

As you see, I struggle with some of the usual definitions of big data as opposed to large data sets. For me, the key to Big Data is what we intend to do with it. If it is important to know the exact value of a single item (for example billing from smart metering) then it is not big data, it is instead large volume transactional data. If the exact value of a single data item is less important than deriving a statistical picture of the whole data set we are in the realms of big data; if we lose a single record it is probably not crucial to our analysis, if we fail to send a short validity coupon to someone’s smartphone as they walk past our store (or better yet as they pass the competitor’s store at the other end of the same shopping mall) then it probably does not matter.

ODI11g in the Enterprise Part 2 : Data Integration using Essbase, Messaging, and Big Data Sources and Targets

In this five part series over the Christmas break, I’m taking a look at some of the lesser-known features in Oracle Data Integrator 11g, focusing on “enterprise” features such as build automation, scripting, data quality and middleware integration. If you’ve arrived at this post via a Google search, here’s the links to the other posts in this short series:

So the first post in this series was titled “Beyond Data Warehouse Table Loading”, and that’s where we’ll start this week’s look into ODI 11g in the Enterprise. If you’ve had any exposure to ODI you’ll probably notice that it’s primarily a relational database data integration tool, using concepts such as joins, filters and expressions to transform data together using table-like data objects called “data stores”. But ODI can extract from, transform and load into much more than relational databases, supporting sources and targets such as flat file and XML files, web services, JMS queues and pretty-much anything that provides a JDBC driver. In the context of Oracle projects though, there are three types of non-relational sources and targets that are particularly relevant; Oracle Essbase and Hyperion Planning multidimensional databases; message and service bus integration in a Service-Orientated Architecture (SOA) environment; and big data sources and targets such as Hadoop and NoSQL database types.

Prior to the Oracle acquisition, there were a number of different routes available to Essbase and Hyperion Planning customers looking to load one of their databases or applications. At the most basic level, “rules files” can still be used to define SQL queries to load data into Essbase outlines and databases, either written manually or auto-generated using tools such as Essbase Studio. But rules files aren’t the best place to embed ETL logic and transformations (see this presentation by Cameron Lackpour for some good background on the topic), and over the past few years Oracle have transitioned from an Informatica-based solution for Essbase ETL towards Oracle Data Integrator, which is now the “strategic” ETL option for loading Essbase databases and planning applications. As shown in the diagram below, ODI 11g can be used to load Essbase databases, Hyperion Planning applications along with HFM applications through their own individual APIs, with ODI providing a set of code template “knowledge modules” that allows developers to write to, and read from, abstracted data objects and with ODI under the covers issuing the correct API calls to load the various applications.

ODI and Hyperion

Loading an Essbase database using ODI isn’t the most intuitive process for Essbase developers as ODI represents the Essbase database as a set of table/column views, but some good groundwork has been done in the developer community to establish best practices and standard workflows, and it’ll be interesting to see how Essbase support evolves over time within ODI. In the screenshots below, you can see and Essbase database outline translated into a generic ODI “Model”, and an Essbase account dimension transformed into a generic datastore and columns. At the bottom is a set of target loading properties setting out how rules files and logs are used, whilst on the right you can see a typical Hyperion Planning interface “flow” moving data from a staging data into the Planning Essbase database and metadata tables.

Essbase and Planning within ODI

Some good sources of information on Essbase and Planning ETL using ODI are, for example:

Another area we’re seeing lots of uptake of ODI within is Fusion Middleware projects, particularly ones that use messaging and other SOA-type features to move data around the enterprise. Most SOA projects only have a requirement to move small, message-size bits of information around from service to service, and would typically use products such as BPEL and Oracle Enterprise Service Bus to provide communication between services, but when large sets of data need to be moved then Oracle Data Integrator comes in handy.

To take a typical example, in the diagram below data from an Orders database (1) can take two routes through a BPEL process depending on the size of the message payload. For regular, small messages the standard approach of handling the message through BPEL steps is used (2), whereas when the payload is large, it’s passed by reference to ODI (3), which then loads it (4), transforms (5) and sends it where it’s required (6), notifying the calling BPEL/ESB routine when the job is complete (7), allowing the wider BPEL routine to complete (8). ODI has a full web service interface that allows packages and other ETL processes to be called from BPEL invoke activities, and can read from queues, web services and other sources as part of a wider SOA/Fusion Middleware deployment.

ODI for Transforming Large Payloads

There’s actually a couple of other data integration products within Fusion Middleware 11g that you might come across when looking at SOA environments; ODI 11g is mainly for bulk-data movements as we’ve seen already, whereas another product called “Oracle Data Service Integrator”, brought across from the BEA acquisition, provides a means to perform data service federation within a SOA project. For anyone familiar with OBIEE, data federation is a familiar concept and is all about integrating data “in-place”, in real-time, in the case of OSDI providing calling applications with a federated, transformed and combined view of data without actually staging it into a database en-route. OSDI is quite an interesting product and is due for an update shortly, and is typically called from products like BPEL or ESB (or as in the diagram below, potentially from a BI tool like OBIEE) to provide a current, real-time, updatable view of the enterprise’s data, typically based on object-orientated data sources that don’t readily translate into the table-and-column structured favoured by ODI.

OSDI and SOA Architectures

What this then leads to in more complex Fusion Middleware 11g / SOA deployments is ODI being used for bulk-data movement, called and communicated with using web service calls, and Oracle Data Service Integrator then used to create abstracted, updatable services over live data for more general messaging and data integration. Note also the use of Oracle Enterprise Data Quality, which we’ll cover in tomorrow’s post in this series.

OSDI and ODI in a SOA Environment

The other area that has seen a lot of innovation around ODI is around big data; specifically, the ability to work with Hadoop clusters to leverage their ability to process huge unstructured datasets in parallel, as a kind of supercharged “staging area” for a more conventional Oracle-based data warehouse. I’ll cover Oracle and big data in more detail early in 2013, but at a high-level Oracle has had a lot of activity in the big data space over the past couple of years, partnering with Cloudera to resell their Cloudera Hadoop distribution and management tools, creating a new engineered system called Oracle Big Data Appliance, and creating new tools and products such as their own NoSQL database, connectors from Hadoop and HDFS to the Oracle database, extending open-source R to create Oracle R Enterprise, and investing in new products such as Oracle Endeca Information Discovery that major on unstructured, textual sources.

The role that ODI plays in this big data architecture is as a means to get data and insights out of Hadoop and NoSQL databases into a regular Oracle data warehouse, and as a way to load Hadoop clusters with additional data from an Oracle system to complement big data sources such as log files, meter readings, tracking data and other high-volume, low-granularity non-traditional data sets. The graphic below from Oracle sets out where ODI sits in this type of arrangement, with ODI typically using a set of Hadoop-specific knowledge modules along with the Oracle Connector for Hadoop to gain access to these new sources and targets.

ODI and Big Data

So when most people talk about “big data”, what they are generally referring to is Hadoop, an open-source project for co-ordinating data processing jobs across large clusters of commodity servers, and MapReduce, a programming approach that provides a framework and API for creating massively-parallel data analysis routines. We’ll cover Hadoop and MapReduce in a lot more detail in some postings early in 2013, but suffice to say that whilst Hadoop and MapReduce have greatly simplified the process of creating parallel-processing Java applications, creating MapReduce routines is still typically beyond most end-users and developers and so over the past few years, sub-projects within Apache Hadoop have come about to provide a SQL-style interface over Hadoop/MapReduce (Apache Hive), data loading routines (Apache Sqoop), scripting languages to create MapReduce programs (Apache Pig) and so forth. Oracle also partner with Cloudera to provide the Hadoop elements of their Big Data Applicance, and there are other vendors such as Hortonworks and MapR who are trying to make Hadoop more accessible to end-users, and more suited to deployment in the enterprise – see this blog post by Curt Monash on the different “enterprise Hadoop” vendors in the market earlier in 2012 for more details on who’s who and what products you should look at.

The way that ODI 11g accesses Hadoop clusters and MapReduce routines then, is to use the Apache Hive SQL (HiveQL) and metadata layer to provide a relational-like view over data in the Hadoop cluster, with Hive in turn generating MapReduce routines to transform its data and return data back to ODI for further processing and integration. Together with a set of Hadoop/Hive-specific knowledge modules, a JDBC driver for Hive and Oracle Loader for Hadoop, a utility for getting data out of Hadoop and into and Oracle database, ODI can generate SQL-like data extraction and transformation code, and have Hive do the work of creating and running the MapReduce routines used to filter, aggregate and transform data in the Hadoop cluster, as shown in the diagram below.

ODI and Hive/MapReduceTo take the canonical example of web log analysis (the first major Hadoop use case, developed by Yahoo and Google to analyze thousands of web search logs in parallel to determine most popular sites and search terms at a particular point in time), ODI11g along with the Oracle Data Integration Adaptor for Hadoop can work with Hadoop sources in four main ways. After creating a connection to the Hadoop cluster and the Hive server using the Hive JDBC driver and reverse-engineering the Hive table data model into the ODI repository, you can use the IKM File to Hive knowledge module to load data into a previously-defined Hive table, the first step that you need to perform before analyzing data using Hive’s SQL-like interface.

NewImage

Similar features exist for transforming data within Hive using either regular SQL expressions, or custom transformation code, with ODI generating the HiveQL query language understood by Hive, and Hive then turning this into MapReduce routines that sort, sift, aggregate and process data across servers in the Hadoop cluster, typically prior to loading the results into the main part of your data warehouse.

ODI Hive transformation

ODI also makes use of the Oracle Loader for Hadoop, which uses Hadoop and MapReduce to do the “heavy lifting” and then loads the results into an Oracle database, using direct path loads and other optimisations. Other connectors and integrators also exist including Oracle Direct Connector for Hadoop Distributed File System (HDFS), typically used to provide file sources for Oracle external tables, and Oracle R Connector for Hadoop, for linking R statistical and modelling routines to data stored in Hadoop, to allow larger sets of data to be process and modelled within R routines. We’ll cover all of these new products early in 2013, so keep an eye on the blog for more details and use cases, and in the meantime Oracle’s David Allan has been putting together a series of articles on Hive and ODI over on the Oracle Data Integration blog:

So there’s definitely more to ODI than loading and transforming relational data. But what if there’s issues with the quality of the data you’re working with? We’ll look at how ODI integrates with Oracle’s newest data integration product, Oracle Enterprise Data Quality, in the next post in this series.

ODI11g in the Enterprise Part 1: Beyond Data Warehouse Table Loading

Most developers who’ve used Oracle Data Integrator in the past have used it to load data warehouse tables, typically from an Oracle or other relational source into a set of dimensional Oracle target tables. Some source data might come from flat files or XML files, and in some cases the source databases might be SQL Server, IBM DB2, Sybase or mySQL. Most projects of this type load data in batch, in some cases once or twice a day or more recently, in near real-time using micro-batches or push technology such as JMS Queues. Compared to Oracle Warehouse Builder, ODI 11g is a fairly easy-to-understand, extensible tool with a clear product roadmap, few hidden surprises and a solid set of features for manipulating relational sources and targets.

Oracle Data Integrator 11g

Over past few years though, ODI has been extended in various ways to support loading and extracting from applications such as Hyperion Planning and OLAP servers such as Oracle Essbase, through to more recent innovations such as loaders for Hadoop and Oracle R Enterprise. ODI can play a full part in service-orientated architectures providing bulk data-movement functionality to complement SOA messaging, and through various APIs, SDKs and scripting languages, it can take part in DevOps-style software development methods to support techniques such as continuous integration, build automation agile development.

In addition, as ODI has moved away from pure data warehouse ETL-style development through to being a key part of a wider Fusion Middleware deployment, the code it produces becomes mission critical and runs 24 hours-a-day, providing vital data movement and integration within the enterprise. New features such as JEE agents, load plans and OPMN provide resilience and fault tolerance for ODI data integration routines. but these are new features introduced since the Oracle acquisition and many developers may not be aware of their existence. ODI also is part of a suite of data integration tools including products such as Oracle Goldengate (for heterogenous data replication and changed data capture) and Oracle Enterprise Data Quality (the ex-Datanomic toolset used for profiling and cleansing enterprise datasets), as shown in this Oracle graphic from 2012′s Oracle Openworld:

Oracle's Data Integration Toolset

So, over the next few days I’ll be looking at where ODI is now, and how new features introduced over the 11g timeline make it a first-class development tool within the wider Fusion Middleware toolset.

I’ll add the links as I publish each of the articles, but over the next week here’s the ODI 11g topics that I’ll be covering:

I’m also planning on presenting a session on this topic at the upcoming BIWA Summit 2013 in San Francisco, January 2013, so if you’ve got any thoughts or observations that would be worth me incorporating into my final session, feel free to add them to the comments. For now though, check back tomorrow for the first installment, where we’ll look at what other sources and targets ODI 11g can work with, with a particular look at how ODI11g’s new “big data” features, integrating with technologies such as Hadoop, Hive and MapReduce, works.