Tag Archives: Big Data

Trickle-Feeding Log Data into the HBase NoSQL Database using Flume

The other day I posted an article on the blog around using Flume to transport Apache web log entries from our website into Hadoop, with the final destination for the entries being an HDFS file – with the HDFS file essentially mirroring the contents of the webserver log file. Once you’ve set this transport mechanism up, you could create a Hive table over the HDFS files, for example, or further transform the data using Pig, Spark or some other mechanism.

When you load data into HDFS files though, there are a couple of things you need to be aware of. HDFS is optimised for large, streaming reads of files stored in very large disk blocks, with the classic use-case being MapReduce transformations that crunch large sets of incoming data and hand-off the results to another process. What it’s not good at is random retrieval of individual records, something you’ll notice if you try to return a single row via a Hive query. Moreover, HDFS files are write-once, with no updates or overwrites, which is why Hive only supports SELECTs and not UPDATEs or DELETEs. Altogether, whilst HDFS is great for landing and then processing large chunks of data, if you’re looking for more granular, database-type storage on Hadoop, you’ll need to think of something else.

And within the context of Cloudera Hadoop, that other thing is HBase, a “NoSQL” database that’s also open-source and runs on the Hadoop framework. Whilst you can work with HBase in similar ways to how you work with relational databases – you can create columns, load data into it, insert and update data and so forth – HBase and NoSQL are in lots of ways the complete opposite of relational databases like Oracle Database, as they trade off things we normally take for granted but that have performance and scalability impacts – ACID transactions, the ability to support complex table relationships, very rich query languages and application support – for extreme scalability and flexibility. If you’re scared of losing your data then HBase is one of the better NoSQL databases, with strong (rather than “eventual”) consistency, automatic sharding and lots of high-availability features, but it’s not designed for running your payroll (yet).

One reason we might want to land data in HBase or another NoSQL database, rather than in regular HDFS files, is if we then want to do fast individual record lookups within the landed data. Another reason would be HBase’s support for complex record types, making it easy to store, for example, nested XML datasets, and its ability – like the Endeca Server – to hold completely different sets of “columns” for each row in the database, and even version those rows, giving us almost a “multi-dimensional” database. Internally, HBase stores data as key-value pairs, giving it the ability to hold completely different data in each database row, and under the covers HBase data is in turn stored in indexed “StoreFiles” within HDFS, giving it HDFS’s scalability and access to the Hadoop framework, but adding fast random access to individual records.

NewImage

Where HBase (and most NoSQL databases) gets complicated though is that there’s no SQL*Developer or TOAD to create tables, and no SQL or PL/SQL to load and manipulate them – it’s all done through Java and custom code. This article by Lars George, who gave the Hadoop Masterclass at last week’s BI Forum, goes into a bit more detail, along with his HBase slides and his book, “HBase: The Definitive Guide”.

So let’s look at a simple example of loading Apache CombinedLogFormat log file entries into HBase, using Flume to transport and ingest the data from our webserver into Hadoop – an example put together again by Nelio Guimaraes from the RM team. We’ll start by defining the HBase table, which like regular relational tables has rows, but which has the concept of column families and column qualifiers rather than just columns. In practice, a column family + qualifier name makes up what we’d normally think of as a column but, crucially, under the covers the columns within a family are stored together on disk, like column-store relational databases, making them fast to query and randomly access. Like a spreadsheet or OLAP database, each combination of row and column family/qualifier is called a “cell”, and moreover only populated cells are stored on disk, with the added bonus of cell entries being timestamped, giving us the ability to retrieve previous versions of cell entries, like the temporal query feature in Oracle Database 12c.

NewImage

For more details on how HBase stores data, and how HBase schemas are defined, the white paper “Introduction to HBase Schema Design” by Cloudera’s Amandeep Khurana is a good reference point and introduction. So let’s go into the HBase shell and create a table to contain our log data; we’ll define it as containing three column families (“common”, “http” and “misc”), with the actual column qualifiers defined at the point we load data into the table. One of the key features of HBase, and most NoSQL databases, is that you can introduce new columns into a store at the point of loading, just by declaring them, with each row potentially containing its own unique selection of columns – which is where Endeca Server gets its ability to store “jagged datasets”, with potentially different attribute sets held for groups of rows.

[root@cdh5-node1 ~]# hbase shell
14/05/21 06:00:07 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell

hbase(main):001:0> list
TABLE                                                                           
0 row(s) in 2.8030 seconds

=> []
hbase(main):002:0> create 'apache_access_log', 
hbase(main):003:0* {NAME => 'common'},
hbase(main):004:0* {NAME => 'http'},
hbase(main):005:0* {NAME => 'misc'}
0 row(s) in 0.5460 seconds
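
As a quick illustration of the schema flexibility mentioned above, as long as the column family already exists you can put a value into a brand-new column qualifier straight from the HBase shell, without declaring it anywhere first (a hypothetical example, not part of the log load itself – the deleteall just tidies the test row up again):

put 'apache_access_log', 'test-row-1', 'misc:comment', 'an ad-hoc column qualifier'
deleteall 'apache_access_log', 'test-row-1'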

In this example, the way we’re going to populate the HBase table is to use Flume; like the Flume and HDFS example the other day, we’ll use a “sink”, in this case an HBase sink, to take the incoming Flume activity off the channel and load it into the HBase table. Flume actually has two HBase sinks: one called HBaseSink, which writes synchronously (more straightforward but slower), and another called AsyncHBaseSink, which writes asynchronously, potentially with higher overall throughput than synchronous writes and with full consistency even if there’s a failure (based on replaying the channel data), but with a slightly more complex serialisation approach. We’ll use the asynchronous sink in this example, and assuming you’ve already got the source configuration file set up (see the previous blog post on Flume and HDFS for an example), the target Flume conf file in our case looked like this:

## TARGET AGENT ##
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent-hbase.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2 mc3

## Channels ##
## Source writes to 3 channels, one for each sink
collector.channels = mc1 mc2 mc3

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 1000

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 1000

collector.channels.mc3.type = memory
collector.channels.mc3.capacity = 1000

## Sinks ##
collector.sinks = LocalOut HadoopOut HbaseOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = /user/root/flume-channel/%{log_type}/%d%m%Y
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600

###############################################################
# HBase sink config 
###############################################################
collector.sinks.HbaseOut.type = org.apache.flume.sink.hbase.AsyncHBaseSink
collector.sinks.HbaseOut.channel = mc3
collector.sinks.HbaseOut.table = apache_access_log
collector.sinks.HbaseOut.columnFamily = common
collector.sinks.HbaseOut.batchSize = 5000
collector.sinks.HbaseOut.serializer = com.hbase.log.util.AsyncHbaseLogEventSerializer
collector.sinks.HbaseOut.serializer.columns = common:rowKey,common:hostname,common:remotehost,common:remoteuser,common:eventtimestamp,http:requestmethod,http:requeststatus,http:responsebytes,misc:referrer,misc:agent

A few points to note:

  • The collector.sinks.HbaseOut.type setting determines the sink type we’ll use, in this case org.apache.flume.sink.hbase.AsyncHBaseSink
  • collector.sinks.HbaseOut.table sets the HBase table name we’ll load, “apache_access_log”
  • collector.sinks.HbaseOut.serializer.columns actually defines the column qualifiers, in this case mapping incoming serialised log file rows into a set of HBase column families and qualifiers
  • collector.sinks.HbaseOut.serializer is the most important bit – it tells the sink how to turn the incoming Flume events into HBase loads, through a Java class called the “serializer”.

And it’s this serializer, the Java program that does the actual loading of the HBase table, that’s the final piece of the jigsaw. There are standard templates to use when writing this piece of code, and in our case the serializer looked like this:

package com.hbase.log.util;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.*;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;

import com.google.common.base.Charsets;
/**
 * A serializer for the AsyncHBaseSink, which splits the event body into
 * multiple columns and inserts them into a row whose key is available in
 * the headers
 *
 * Originally from https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase
 */
public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer 
{
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;
    private byte[][] columnNames;
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();
    // private String delim;

    @Override
    public void initialize(byte[] table, byte[] cf) 
    {
        this.table = table;
        this.colFam = cf;
    }

    @Override
    public void setEvent(Event event) 
    {
        // Set the event and verify that the rowKey is not present
        this.currentEvent = event;
        String rowKeyStr = currentEvent.getHeaders().get("rowKey");
        //if (rowKeyStr == null) {
        //  throw new FlumeException("No row key found in headers!");
        //}
        //currentRowKey = rowKeyStr.getBytes();
    }

    public String[] logTokenize(String event)
    {

        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(event);

        if (!matcher.matches()) 
        {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(event);
            return null;
        }

        String[] columns = new String[matcher.groupCount()+1];
        
        columns[0]= Long.toString(System.currentTimeMillis());
        
        for (int i = 1; i <= matcher.groupCount(); i++) 
        {
            columns[i] = matcher.group(i);
        }

        return columns;

    }

    @Override
    public List<PutRequest> getActions() 
    {
        // Split the event body and get the values for the columns
        String eventStr = new String(currentEvent.getBody());
        long unixTime = System.currentTimeMillis();
        //String[] cols = eventStr.split(",");
        //String[] cols = eventStr.split(regEx);
        //String[] cols = eventStr.split("\\s+");
        //String[] cols = eventStr.split("\\t");
        //String[] cols = eventStr.split(delim);
        String[] cols = logTokenize(eventStr);
        puts.clear();
        String[] columnFamilyName;
        byte[] bCol;
        byte[] bFam;
        for (int i = 0; i < cols.length; i++) 
        {
            //Generate a PutRequest for each column.
            columnFamilyName = new String(columnNames[i]).split(":");
            bFam = columnFamilyName[0].getBytes();
            bCol = columnFamilyName[1].getBytes();

            if (i == 0) 
            {
                currentRowKey = cols[i].getBytes();
            }
            //PutRequest req = new PutRequest(table, currentRowKey, colFam,
            //columnNames[i], cols[i].getBytes());
            PutRequest req = new PutRequest(table, currentRowKey, bFam,
            bCol, cols[i].getBytes());
            puts.add(req);
        }
        return puts;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() 
    {
        incs.clear();
        //Increment the number of events received
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }

    @Override
    public void cleanUp() 
    {
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
    }

    @Override
    public void configure(Context context) 
    {
        //Get the column names from the configuration
        String cols = new String(context.getString("columns"));
        String[] names = cols.split(",");
        columnNames = new byte[names.length][];
        int i = 0;
        
        for(String name : names) 
        {
            columnNames[i++] = name.getBytes();
        }
        
        //delim = new String(context.getString("delimiter"));
    }

    @Override
    public void configure(ComponentConfiguration conf) {}
}

HBase, rather than supporting the regular SELECTs and INSERTs we’re used to with Oracle, instead uses “get” and “put” methods to retrieve, and store, data – along with “delete” and “scan”. The regular synchronous HBase sink uses these methods directly, taking data off the Flume channel and inserting it into the HBase table (or indeed, updating existing rows based on the row key), whilst the asynchronous sink adds a layer in-between the incoming data and the write, allowing data (or “events”) to continue streaming in even if all the downstream data hasn’t yet been committed. It’s this code though that maps each incoming bit of data – in this case, a parsed log file entry – to column families and qualifiers in the HBase table, and you’d need to write new code like this, or amend the existing one, if you wanted to load other HBase tables in your Hadoop cluster – a long way from the point-and-click ETL approach we get with ODI, but a lot more flexible too (if that’s what you want).
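
Just to illustrate what those “get” and “put” methods look like outside of Flume, here’s a minimal sketch using the classic HTable-based HBase Java client API that ships with CDH4/5-era HBase – the “misc:notes” qualifier and its value are made up for the example, and the serializer class above remains the code we actually compile and deploy for the Flume load:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseGetPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "apache_access_log");

        // "put" a single cell: row key, column family, column qualifier, value
        // (misc:notes is a made-up qualifier, just to show the call shape)
        Put put = new Put(Bytes.toBytes("1400628560331"));
        put.add(Bytes.toBytes("misc"), Bytes.toBytes("notes"), Bytes.toBytes("flagged for review"));
        table.put(put);

        // "get" the row back by its row key - a fast, random-access lookup
        Get get = new Get(Bytes.toBytes("1400628560331"));
        Result result = table.get(get);
        System.out.println(Bytes.toString(
            result.getValue(Bytes.toBytes("common"), Bytes.toBytes("hostname"))));

        table.close();
    }
}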

Then it’s a case of compiling the Java code, like this:

mkdir com; mkdir com/hbase; mkdir com/hbase/log; mkdir com/hbase/log/util
vi com/hbase/log/util/AsyncHbaseLogEventSerializer.java
export CLASSPATH=/usr/lib/flume-ng/lib/*
javac com/hbase/log/util/AsyncHbaseLogEventSerializer.java
jar cf LogEventUtil.jar com
jar tf LogEventUtil.jar com
chmod 775 LogEventUtil.jar
cp LogEventUtil.jar /usr/lib/flume-ng/lib

Next, we had to run the following command before starting Flume, because of an issue we found where ZooKeeper stopped Flume from working in this setup:

mv /etc/zookeeper/conf/zoo.cfg /etc/zookeeper/conf/zoo.cfg-unused

and finally, we start up the Flume target server agent, followed by the source one (again see the previous article for setting up the source Flume agent):

flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent-hbase.conf -n collector

Then, after a while, log data starts getting loaded into the HBase table. You can check on it using Hue, and the HBase Browser:

NewImage

Or you can go back into the HBase shell and run the scan command to view the data, with each row representing a cell in the overall table storage:

hbase(main):001:0> scan 'apache_access_log'
ROW                   COLUMN+CELL                                               
 1400628560331        column=common:eventtimestamp, timestamp=1400628560350, val
                      ue=20/May/2014:15:28:06 +0000                             
 1400628560331        column=common:hostname, timestamp=1400628560335, value=89.
                      154.89.101                                                
 1400628560331        column=common:remotehost, timestamp=1400628560336, value=-
 1400628560331        column=common:remoteuser, timestamp=1400628560338, value=-
 1400628560331        column=common:rowKey, timestamp=1400628560333, value=14006
                      28560331                                                  
 1400628560331        column=http:requestmethod, timestamp=1400628560352, value=
                      GET / HTTP/1.1                                            
 1400628560331        column=http:requeststatus, timestamp=1400628560356, value=
                      200                                                       
 1400628560331        column=http:responsebytes, timestamp=1400628560358, value=
                      9054                                                      
 1400628560331        column=misc:agent, timestamp=1400628560377, value=Mozilla/
                      5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.
                      14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14     
 1400628560331        column=misc:referrer, timestamp=1400628560359, value=-    
 1400628560344        column=common:eventtimestamp, timestamp=1400628560383, val
                      ue=20/May/2014:15:28:06 +0000
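
The payoff for storing the data in HBase rather than in plain HDFS files is that we can now also pull back an individual row, or even a single cell, directly by its row key rather than scanning the whole table – for example, using one of the row keys from the scan output above:

get 'apache_access_log', '1400628560331'
get 'apache_access_log', '1400628560331', {COLUMN => 'common:hostname'}

and if a column family had been created to keep more than one version of each cell (for example by adding VERSIONS => 3 to the create statement), a VERSIONS option on the get would bring back earlier values of that cell too.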

This is all great, and a good starting point if you plan to process your data with other Java programs as the next step. But what if you want to view the data in a more convenient way, perhaps as a regular table? To do that you can use Hive again, this time using Hive’s HBase integration features to tell it the data is stored in HBase format, and to let it know how to display the various HBase column families and qualifiers. In our case, the DDL to create the corresponding Hive table looks like this:

DROP TABLE IF EXISTS hive_apache_access_log;
CREATE EXTERNAL TABLE hive_apache_access_log
(
unixtimestamp string,
eventtimestamp string,
hostname string,
remotehost string,
remoteuser string,
requestmethod string,
requeststatus string,
responsebytes string,
agent string,
referrer string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,common:eventtimestamp,common:hostname,common:remotehost,common:remoteuser,http:requestmethod,http:requeststatus,http:responsebytes,misc:agent,misc:referrer')
TBLPROPERTIES ('hbase.table.name' = 'apache_access_log');

giving us the ability, either from the Hive shell like this, or from tools like OBIEE and ODI, to query the NoSQL database and bring its data into more regular, relational data stores.

hive> select * from hive_apache_access_log;
OK
1400628560331   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET / HTTP/1.1  200 9054    Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 -
1400628560344   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET /wp-content/plugins/crayon-syntax-highlighter/css/min/crayon.min.css?ver=2.5.0 HTTP/1.1 304 -   Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 http://www.rittmanmead.com/
1400628560345   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET /wp-content/plugins/jetpack/modules/widgets/widgets.css?ver=20121003 HTTP/1.1 304 -   Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 http://www.rittmanmead.com/
...
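
And because the HBase-backed table behaves like any other Hive table, we can also run aggregate queries over it – for example (a hypothetical query against the table defined above), counting requests by HTTP status code:

select requeststatus, count(*)
from hive_apache_access_log
group by requeststatus;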

We’ll be covering more on HBase, and Oracle’s NoSQL Database, in future articles on the blog.

List of our Recent “Getting Started” Hadoop Articles

We’ve published a number of “getting started with Hadoop” articles over the past few months, but these aren’t always easy to find on the blog. I’ve therefore compiled a list of the more recent ones, which you’ll find below:

Trickle-Feeding Log Files to HDFS using Apache Flume

In some previous articles on the blog I’ve analysed Apache webserver log files sitting on a Hadoop cluster using Hive, Pig and, most recently, Apache Spark. In all cases the log files were already sitting on the Hadoop cluster, having been SFTP’d from the webserver to my local workstation and then uploaded to HDFS, the Hadoop distributed filesystem, using Hue, and the only way to add to them is to repeat that process and manually copy them across again. But what if I want these log files to be copied across automatically, in a kind of “trickle-feed” process similar to how Oracle GoldenGate trickle-feeds database transactions to a data warehouse? Enter Apache Flume, a component within Hadoop and the Cloudera CDH4/5 distribution of Hadoop, which does exactly this.

Flume is an Apache project within the overall Hadoop ecosystem that provides a reliable, distributed mechanism for collecting, aggregating and moving large amounts of log data. Similar to GoldenGate it has transaction collectors, mechanisms to reliably transmit data from source to target, and mechanisms to write those log events to a centralised data store, for example HDFS. It’s free and comes with Cloudera CDH, and coupled with something at the target end to then process and work with the incoming log entries, it’s a pretty powerful and flexible way to transmit log-type entries from (potentially) multiple source providers to a central Hadoop cluster.

To take our example, we’ve got a webserver that’s generating our Apache CombinedLogFormat log entries as users generate activity on the website. We then set up Flume agents on the source webserver and on the Hadoop client node that’s going to receive the log entries, with the latter writing those entries to HDFS just like any other file activity. The Flume agent on the source webserver “tails” the Apache access.log file, copying across entries as they’re made (more or less), so that the target HDFS log file copies are kept up to date with individual log entries, not just whole log files as they’re closed off, with the diagram below showing the overall schematic:

Flume Topology

Down at the Flume component level, Flume consists of agents, Java processes that sit on the source, target and any intermediate servers; channels, intermediate staging points that can persist log entries to disk, database or memory; and sinks, processes that take log transactions out of a channel and write them to disk. Flume is designed to be distributed and resilient, and won’t take the source down if the target Hadoop environment isn’t available; if this type of situation occurs, transactions will slowly fill-up the channel used by the source agent until such time as it runs out of space, and then further log transactions are lost until the target comes back up and the source agent’s channel regains some spare space. The diagram below, from the Cloudera blog about the latest generation of Flume (Flume NG, for “Next Generation”) shows the Flume product topology:

NewImage

whilst the next diagram shows how Flume can collect and aggregate log entries from multiple servers, and then combine them into one log stream sent to a single target.

NewImage

In our example, that’s all there is to it; in more complex examples, perhaps where the source is sending XML log entries, you’d need a downstream processor on the target platform to decode, deserialise or parse the incoming log files – Flume is just a transport mechanism and doesn’t do any transformation itself. You can also choose how the log entries are held by each of the agents’ channels; in the example we’re going to use, channel data is just held in-memory, which is fast to run and set up, but you’d lose any data still sitting in the channel if the server went down. Other, more production-grade setups would persist the channel entries to file, or even a MySQL database.
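
As a rough sketch of what that more durable option might look like – the agent name, channel name and directory paths here are made up for illustration – a Flume file channel that persists events to local disk would be configured with something like this, with the agent’s sources and sinks then pointing at “fileChannel” rather than an in-memory channel:

agent.channels = fileChannel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /var/lib/flume-ng/checkpoint
agent.channels.fileChannel.dataDirs = /var/lib/flume-ng/data
agent.channels.fileChannel.capacity = 100000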

For our setup we need two agents, one on the source and one on the target server, each of which has its own configuration file. The source agent configuration file looks like this, with key entries called-out underneath it:

## SOURCE AGENT ##
## Local instalation: /home/ec2-user/apache-flume
## configuration file location:  /home/ec2-user/apache-flume/conf
## bin file location: /home/ec2-user/apache-flume/bin
## START Agent: bin/flume-ng agent -c conf -f conf/flume-src-agent.conf -n source_agent

# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = apache_server
source_agent.sources.apache_server.type = exec
source_agent.sources.apache_server.command = tail -f /etc/httpd/logs/access_log
source_agent.sources.apache_server.batchSize = 1
source_agent.sources.apache_server.channels = memoryChannel
source_agent.sources.apache_server.interceptors = itime ihost itype

# http://flume.apache.org/FlumeUserGuide.html#timestamp-interceptor
source_agent.sources.apache_server.interceptors.itime.type = timestamp

# http://flume.apache.org/FlumeUserGuide.html#host-interceptor
source_agent.sources.apache_server.interceptors.ihost.type = host
source_agent.sources.apache_server.interceptors.ihost.useIP = false
source_agent.sources.apache_server.interceptors.ihost.hostHeader = host

# http://flume.apache.org/FlumeUserGuide.html#static-interceptor
source_agent.sources.apache_server.interceptors.itype.type = static
source_agent.sources.apache_server.interceptors.itype.key = log_type
source_agent.sources.apache_server.interceptors.itype.value = apache_access_combined

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Hadoop Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 81.155.163.172
source_agent.sinks.avro_sink.port = 4545

  • Source is set to “apache_server”, i.e. an Apache HTTP server
  • The capture mechanism is the Linux “tail” command
  • Log entries are held by the channel mechanism in-memory, rather than to file or database
  • The timestamp interceptor adds a timestamp header to each event, which the target agent’s HDFS sink can then use to organise the files it writes by date
  • The agent then sends the log entries to a corresponding Flume agent on the Hadoop Cluster, in this case an IP address that corresponds to my network’s external IP address, with Flume network traffic then NATted by my router to cdh4-node1.rittmandev.com, the client node in my CDH4.6 Hadoop cluster running on VMWare.

The target server in my Hadoop cluster then has a corresponding configuration file set up, looking like this:

## TARGET AGENT ##
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2

## Channels ##
## Source writes to 2 channels, one for each sink
collector.channels = mc1 mc2

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100

## Sinks ##
collector.sinks = LocalOut HadoopOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = /user/root/flume-channel/%{log_type}/%y%m%d
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600

Key entries in this configuration file are:

  • Apache Avro is the format used to transmit the log data, with the collector’s Avro source listening on port 4545
  • There are two channels defined, each feeding its own sink – “mc1” for writing entries to the local server filesystem, and “mc2” for writing them to HDFS
  • The maximum number of events (log entries) each channel (log entry persistence store) will hold is 100, meaning that if the target platform goes down and more than 100 log transactions back up, further ones will get lost until we can clear the channel down. Of course this limit can be increased, assuming there’s memory or disk to spare.

I then SSH into the target Hadoop node and start the Flume agent, like this:

[root@cdh4-node1 ~]# flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.6.1.jar from class path
...
14/05/18 18:15:29 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
14/05/18 18:15:29 INFO hdfs.BucketWriter: Creating /user/root/flume-channel/apache_access_combined/18052014/FlumeData.1400433329254.tmp

and then repeat the step for the source webserver, like this:

[ec2-user@ip-10-35-143-131 apache-flume]$ sudo bin/flume-ng agent -c conf -f conf/flume-src-agent.conf -n source_agent
Warning: JAVA_HOME is not set!
+ exec /usr/bin/java -Xmx20m -cp '/home/ec2-user/apache-flume/conf:/home/ec2-user/apache-flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume-src-agent.conf -n source_agent

Finally, moving across to Hue I can see new log entries being written to the HDFS file system:

NewImage
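
Alternatively, if you’d rather check from the command line than from Hue, a listing along these lines (run on the Hadoop node) should show the Flume-created files appearing under the path the HDFS sink was configured with:

hadoop fs -ls /user/root/flume-channel/apache_access_combined/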

So there you go – simple transport of webserver log entries from a remote server to my Hadoop cluster, via Apache Flume – thanks again to Nelio Guimaraes from the RM team for setting the example up.

Exploring Apache Spark on the New BigDataLite 3.0 VM

The latest version of Oracle’s BigDataLite VirtualBox VM went up on OTN last week, and amongst other things it includes CDH5.0, the latest version of Cloudera’s Distribution including Apache Hadoop, as featured on Oracle Big Data Appliance. This new version comes with an update to MapReduce, moving it to MapReduce 2.0 (with 1.0 still there for backwards-compatibility) and with YARN as the replacement for the Hadoop JobTracker. If you’re developing on CDH or the BigDataLite VM you shouldn’t notice any differences with the move to YARN, but it’s a more forward-looking, modular way of tracking and allocating resources on these types of compute clusters that also opens them up to processing models other than MapReduce.

The other new Hadoop feature that you’ll notice with the BigDataLite VM and CDH5 is an updated version of Hue, the web-based development environment you use for creating Hive queries, uploading files and so on. As the version of Hue shipped is now Hue 3.5, there’s proper support for quoted CSV files in the Hue / Hive uploader (hooray), along with support for stripping the first (title) line, and an updated Pig editor that prompts you for command syntax (like the Hortonworks Pig editor).

NewImage

This new version of BigDataLite also seems to have had Cloudera Manager removed (or at least, it’s not available as usual at http://bigdatalite:7180), with a utility provided on the desktop instead that allows you to stop and start the various VM services, including the Oracle Database and Oracle NoSQL Database that also come with the VM. Strictly-speaking it’s actually easier to use than Cloudera Manager, but it’s a shame it’s gone as there are lots of monitoring and configuration tools in the product that I’ve found useful in the past.

NewImage

CDH5 also comes with Apache Spark, a cluster processing framework that’s being positioned as the long-term replacement for MapReduce. Spark is technically a programming model that allows developers to create scripts, or programs, that bring together operators such as filters, aggregators, joiners and group-bys using languages such as Scala and Python, but crucially this can all happen in-memory – making Spark potentially much faster than MapReduce for doing both batch, and ad-hoc analysis.

This article on the Cloudera blog goes into more detail on what Apache Spark is and how it improves over MapReduce, and this article takes a look at the Spark architecture and how its approach avoids the multi-stage execution model that MapReduce uses (something you’ll see if you ever do a join in Pig or Hive). But what does some basic Spark code look like, using the default Scala language most people associate Spark with? Let’s take a look at some sample code, using the same Rittman Mead webserver log files I used in the previous Pig and Hive/Impala articles.

You can start up Spark in interactive mode, like you do with Pig and Grunt, by opening a Terminal session and typing in “spark-shell”:

[oracle@bigdatalite ~]$ spark-shell
14/05/12 20:56:50 INFO HttpServer: Starting HTTP Server
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
14/05/12 20:56:56 INFO Slf4jLogger: Slf4jLogger started
14/05/12 20:56:56 INFO Remoting: Starting remoting
14/05/12 20:56:56 INFO Remoting: Remoting started; 
...

14/05/12 20:56:57 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140512205657-0002
14/05/12 20:56:57 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala>

Spark has the concept of RDDs, “Resilient Distributed Datasets”, which can be thought of as similar to relations in Pig, and tables in Hive, but which crucially can be cached in RAM for improved performance when you need to access their dataset repeatedly. Like Pig, Spark features “lazy execution”, only processing the various Spark commands when you actually need to (for example, when outputting the results of a data-flow), so let’s run two more commands to load in one of the log files on HDFS, and then count the log file lines within it.

scala> val logfile = sc.textFile("logs/access_log")
14/05/12 21:18:59 INFO MemoryStore: ensureFreeSpace(77353) called with curMem=234759, maxMem=309225062
14/05/12 21:18:59 INFO MemoryStore: Block broadcast_2 stored as values to memory (estimated size 75.5 KB, free 294.6 MB)
logfile: org.apache.spark.rdd.RDD[String] = MappedRDD[31] at textFile at <console>:15

scala> logfile.count()
14/05/12 21:19:06 INFO FileInputFormat: Total input paths to process : 1
14/05/12 21:19:06 INFO SparkContext: Starting job: count at <console>:1
...
14/05/12 21:19:06 INFO SparkContext: Job finished: count at <console>:18, took 0.192536694 s
res7: Long = 154563

So the file contains 154563 records. Running the logfile.count() command again, though, brings back the count immediately as the RDD has been cached; we can explicitly cache RDDs directly in these commands if we like, by using the “.cache” method:

scala> val logfile = sc.textFile("logs/access_log").cache

So let’s try some filtering, retrieving just those log entries where the user is requesting our BI Apps 11g homepage (“/biapps11g/“):

scala> val biapps11g = logfile.filter(line => line.contains("/biapps11g/"))
biapps11g: org.apache.spark.rdd.RDD[String] = FilteredRDD[34] at filter at <console>:17
scala> biapps11g.count()
...
14/05/12 21:28:28 INFO SparkContext: Job finished: count at <console>:20, took 0.387960876 s
res9: Long = 403

Or I can create a dataset containing just those records that have a 404 error code:

scala> val errors = logfile.filter(_.contains("404"))
errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[36] at filter at <console>:17
scala> errors.count()
...
res11: Long = 1577

Spark, using Scala as the language, has routines for filtering, joining, splitting and otherwise transforming data, but something that’s quite common in Spark is to create Java JAR files, typically from compiled Scala code, to encapsulate certain common data transformations, such as this Apache CombinedLogFormat log file parser available on GitHub from @alvinalexander, author of the Scala Cookbook. Once you’ve compiled this into a JAR file and added it to your SPARK_CLASSPATH (see his blog post for full details, which is also where the Spark examples below were taken from), you can start to work with the individual log file elements, like we did in the Hive and Pig examples where we parsed the log file using regexes.

scala> import com.alvinalexander.accesslogparser._
import com.alvinalexander.accesslogparser._

scala> val p = new AccessLogParser
p: com.alvinalexander.accesslogparser.AccessLogParser = com.alvinalexander.accesslogparser.AccessLogParser@6d32bc14

Then I can access the HTTP Status Code using its own property, like this:

def getStatusCode(line: Option[AccessLogRecord]) = {
  line match {
    case Some(l) => l.httpStatusCode
    case None => "0"
  }
}

logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").count
...
res12: Long = 1233

Then we can use a similar method to retrieve all of the “request” entries in the log file where the user got a 404 error, starting off by defining two methods that will help with the request parsing – note the use of the :paste command which allows you to block-paste a set of commands into the scala-shell:

scala> :paste
// Entering paste mode (ctrl-D to finish)
def getRequest(rawAccessLogString: String): Option[String] = {
  val accessLogRecordOption = p.parseRecord(rawAccessLogString)
  accessLogRecordOption match {
    case Some(rec) => Some(rec.request)
    case None => None
  }
}
def extractUriFromRequest(requestField: String) = requestField.split(" ")(1)
// Exiting paste mode, now interpreting.

getRequest: (rawAccessLogString: String)Option[String]
extractUriFromRequest: (requestField: String)String

Now we can run the code to output the URIs that have been generating 404 errors:

scala> :paste
// Entering paste mode (ctrl-D to finish)
logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").map(getRequest(_)).count
val recs = logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").map(getRequest(_))
val distinctRecs = logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404")
                      .map(getRequest(_))
                      .collect { case Some(requestField) => requestField }
                      .map(extractUriFromRequest(_))
                      .distinct
distinctRecs.count
distinctRecs.foreach(println)
...
/wp2/wp-content/uploads/2009/11/fin5.jpg/
wp2/wp-content/uploads/2009/08/qv10.jpg/
wp2/wp-content/uploads/2010/02/image32.png/2013/08/inside-my-home-office-development-lab-vmware-os-x-server/
wp-content/themes/optimize/thumb.php 
...

So Spark is commonly-considered the successor to MapReduce, and you can start playing around with it on the new BigDataLite VM. Unless you’re a Java (or Scala, or Python) programmer, Spark isn’t quite as easy as Pig or Hive to get into, but the potential benefits over MapReduce are impressive and it’d be worth taking a look. Hopefully we’ll have more on Spark on the blog over the next few months, as we get to grips with it properly.

RM BI Forum 2014 Brighton is a Wrap – Now on to Atlanta!

I’m writing this sitting in my hotel room in Atlanta, having flown over from the UK on Saturday following the end of the Rittman Mead BI Forum 2014 in Brighton. I think it’s probably true to say that this year was our best ever – an excellent masterclass on the Wednesday followed by even-more excellent sessions over the two main days, and now we’re doing it all again this week at the Renaissance Atlanta Midtown Hotel in Atlanta, GA.

Wednesday’s guest masterclass was by Cloudera’s Lars George, and covered the worlds of Hadoop, NoSQL and big data analytics over a frantic six-hour session. Lars was a trooper; despite a mistake over the agenda where I’d listed his sessions as being just an hour each when he’d planned (and been told by me) that they were an hour-and-a-half each, he managed to cover all of  the main topics and take the audience through Hadoop basics, data loading and processing, NoSQL and analytics using Hive, Impala, Pig and Spark. Roughly half the audience had some experience with Hadoop with the others just being vaguely acquainted with it, but Lars was an engaging speaker and stuck around for the rest of the day to answer any follow-up questions.

NewImage

For me, the most valuable parts of the session were Lars’ real-world experiences in setting up Hadoop clusters, and his views on which approaches were best for analysing data in a BI and ETL context – with Spark clearly being in favour now compared to Pig and basic MapReduce. Thanks again Lars, and to Justin Kestelyn from Cloudera for organising it, and I’ll get a second chance to sit through it again at the event in Atlanta this week.

The event proper kicked-off in the early evening with a drinks reception in the Seattle bar, followed by the Oracle keynote and then dinner. Whilst the BI Forum is primarily a community (developer and customer)-driven event, we’re very pleased to have Oracle also take part, and we traditionally give the opening keynote over to Oracle BI Product Management to take us through the latest product roadmap. This year, Matt Bedin from Oracle came over from the States to deliver the Brighton keynote, and whilst the contents aren’t under NDA there’s an understanding that we don’t blog and tweet them in too much detail, which then gives Oracle a bit more leeway to talk about futures and be candid about their direction (much like other user group events such as BIWA and ODTUG).

NewImage

I think it’s safe to say that the current focus for OBIEE over the next few months is the new BI in the Cloud Service (see my presentation from Collaborate’14 for more details on what this contains), but we were also given a preview of upcoming functionality for OBIEE around data visualisation, self-service and mobile – watch this space, as they say. Thanks again to Matt Bedin for coming over from the States to deliver the keynote, and for his other session later in the week where he demo’d BI in the Cloud and several usage scenarios.

We were also really pleased to be joined by some of the top OBIEE, Endeca and ODI developers from around the US and Europe, including Michael Rainey (Rittman Mead), Nick Hurt (IFPI), Truls Bergensen, Emiel van Bockel (CB), Robin Moffatt (Rittman Mead), Andrew Bond (Oracle) and Stewart Bryson (Rittman Mead), and none other than Christian Berg, an independent OBIEE / Essbase developer who’s well-known to the community through his blog and through his Twitter handle, @Nephentur – we’ll have all the slides from the sessions up on the blog once the US event is over, and congratulations to Robin for winning the “Best Speaker” award for Brighton for his presentation “No Silver Bullets: OBIEE Performance in the Real World”.

NewImage

We had a few special overseas guests in Brighton too; Christian Screen from Art of BI Software came across (he’ll be in Atlanta too this week, presenting this time), and we were also joined by Oracle’s Reiner Zimmerman, who some of you from the database/DW side will know from the Oracle DW Global Leaders’ Program. For me though, one of the highlights was the joint session with Oracle’s Andrew Bond and our own Stewart Bryson, where they presented an update to the Oracle Information Management Reference Architecture, something we’ve been developing jointly with Andrew’s team and which now incorporates some of our thoughts around the agile deployment of this type of architecture. More on this on the blog shortly, and look out for the white paper and videos Andrew’s team are producing, which should be out on OTN soon.

NewImage

So that’s it for Brighton this year – and now we’re doing it all again in Atlanta this week at the Renaissance Atlanta Midtown Hotel. We’ve got Lars George again delivering his masterclass, and an excellent – dare I say it, even better than Brighton’s – array of sessions including ones on Endeca, the In-Memory Option for the Oracle Database, TimesTen, OBIEE, BI Apps and Essbase. There’s still a few places left so if you’re interested in coming, you can book here and we’ll see you in Atlanta later this week!