Oracle OpenWorld 2015 Roundup Part 2 : Data Integration, and Big Data (in the Cloud…)

In yesterday’s part one of our three-part Oracle Openworld 2015 round-up, we looked at the launch of OBIEE12c just before Openworld itself, and the new Data Visualisation Cloud Service that Thomas Kurian demo’d in his mid-week keynote. In part two we’ll look at what happened around data integration both on-premise and in the cloud, along with big data – and as you’ll see they’re two topics that are very much linked this year.

First off, data integration – and like OBIEE12c, ODI 12.2.1 got released a day or so before Openworld as part of the wider Oracle Fusion Middleware 12c Release 2 platform rollout. Some of what was coming in ODI12.2.1 got back-ported to ODI 12.1 earlier in the year in the form of the ODI Enterprise Edition Big Data Options, and we covered the new capabilities it gave ODI in terms of generating Pig and Spark mappings in a series of posts earlier in the year – adding Pig as an execution language gives ODI an ability to create dataflow-style mappings to go with Hive’s set-based transformations, whilst also opening-up access to the wide range of Pig-specific UDF libraries such as DataFu for log analysis. Spark, in the meantime, can be useful for smaller in-memory data transformation jobs and as we’ll see in a moment, lays the foundation for streaming and real-time ingestion capabilities.

The other key feature that ODI12.2.1 provides though is better integration with external source control systems. ODI already has some element of version control built in, but as it’s based around ODI’s own repository database tables it’s hard to integrate with more commonly-used enterprise source control tools such as Subversion or Git, and there’s no standard way to handle development concepts like branching, merging and so on. ODI 12.2.1 adds these concepts into core ODI and initially focuses on SVN as the external source control tool, with Git support planned in the near future.

Updates to GoldenGate, Enterprise Data Quality and Enterprise Metadata Management were also announced, whilst Oracle Big Data Preparation Cloud Service got its first proper outing since release earlier in the year. Big Data Preparation Cloud Service (BDP for short) to my mind suffers a bit from confusion over what it does and what market it serves – at times it’s been positioned as a tool for the “citizen data scientist” as it enables data domain experts to wrangle and prepare data for loading into Hadoop, whilst at other times it’s labelled a tool for production data transformation jobs under the control of IT. What is misleading is the “big data” label – it runs on Hadoop and Spark but it’s not limited to big data use-cases, and as the slides below show it’s a great option for loading data into BI Cloud Service as an alternative to more IT-centric tools such as ODI.

It was another announcement at Openworld, though, that made Big Data Prep Service suddenly make a lot more sense: a new initiative called Dataflow ML, something Oracle describe as “ETL 2.0”, with an entirely cloud-based architecture and heavy use of machine learning (the “ML” in “Dataflow ML”) to automate much of the profiling and discovery process – the key innovation in Big Data Prep Service.

It’s early days for Dataflow ML but clearly this is the direction Oracle will want to take as applications and platforms move to the cloud. I called out ODI’s unsuitability for running in the cloud a couple of years ago and contrasted its architecture with that of cloud-native tools such as Snaplogic, and Dataflow ML is obviously Oracle’s bid to move data integration into the cloud. Couple that with innovations around Spark as the data processing platform and machine learning to automate routine tasks, and it sounds like it could be a winner – watch this space, as they say.

So the other area I wanted to cover in this second of three update pieces was big data. All of the key big data announcements from Oracle came at last year’s Openworld – Big Data Discovery, Big Data SQL, Big Data Prep Service (or Oracle Data Enrichment Cloud Service as it was called back then) – and this year’s event saw updates to Big Data SQL (Storage Indexes) and Big Data Discovery (general fit-and-finish enhancements). What is probably more significant though is the imminent availability of all this – plus Oracle Big Data Appliance – in Oracle’s Public Cloud.

Most big data PoCs I see outside of Oracle start on Amazon AWS and build-out from there – starting at very low-cost and moving from Amazon Elastic MapReduce to Cloudera CDH (via Cloudera Director), for example, or going from cloud to on-premise as the project moves into production. Oracle’s Big Data Cloud Service takes a different approach – instead of using a shared cloud infrastructure and potentially missing the point of Hadoop (single user access to lots of machines, vs. cloud’s timeshared access to slices of machines) Oracle instead effectively lease you a Big Data Appliance along with a bundle of software; the benefits being around performance but with quite a high startup cost vs. starting small with AWS.

The market will tell which approach gets most traction over time, but where Big Data Cloud Service does help tools like Big Data Discovery is that there are many more opportunities for integration, and customers will be much more open to an Oracle tool solution compared to those building on commodity hardware and community Hadoop distributions. To my mind every Big Data Cloud Service customer ought to buy BDD and most probably Big Data Prep Service, so as customers adopt cloud as a platform option for big data projects I’d expect an uptick in sales of Oracle’s big data tools.

On a related topic and looping back to Oracle Data Integration, the other announcement in this area that was interesting was around Spark Streaming support in Oracle Data Integrator 12c.

ODI12c has got some great batch-style capabilities around Hadoop but, as I talked about earlier in the year in an article on Flume, Morphlines and Cloudera Search, the market is all about real-time data ingestion now; batch is more for one-off historical data loads. Again like Dataflow ML this feature is in beta and probably won’t be out for many months, but when it comes out it’ll complete ODI’s capabilities around big data ingestion – we’re hoping to take part in the beta so keep an eye on the blog for news as it comes out.

So that’s it for part 2 of our Oracle Openworld 2015 update – we’ll complete the series tomorrow with a look at Oracle BI Applications, Oracle Database 12cR2 “sharding” and something very interesting planned for a future Oracle 12c database release – “Analytic Views”.

Forays into Kafka 02 – Enabling Flexible Data Pipelines

One of the defining features of “Big Data” from a technologist’s point of view is the sheer number of tools and permutations at one’s disposal. Do you go Flume or Logstash? Avro or Thrift? Pig or Spark? Foo or Bar? (I made that last one up). This wealth of choice is wonderful because it means we can choose the right tool for the right job each time.

Of course, we need to establish that we have indeed chosen the right tool for the right job. But here’s the paradox. How do we easily work out if a tool is going to do what we want of it and is going to be a good fit, without disturbing what we already have in place? Particularly if it’s something that’s going to be part of an existing Productionised data pipeline, inserting a new tool partway through what’s there already is going to risk disrupting that. We potentially end up with a series of cloned environments, all diverging from each other, and not necessarily comparable (not to mention the overhead of the resource to host it all).

The same issue arises when we want to change the code or configuration of an existing pipeline. Bugs creep in, and ideas for enhancing the processing you’ve currently got present themselves. Wouldn’t it be great if we could test these changes reliably and with no risk to the existing system?

This is where Kafka comes in. Kafka is very useful for two reasons:

  1. You can use it as a buffer for data that can be consumed and re-consumed on demand
  2. Multiple consumers can all pull the data, independently and at their own rate.

So you take your existing pipeline, plumb in Kafka, and then as and when you want to try out additional tools (or configurations of existing ones) you simply take another ‘tap’ off the existing store. This is an idea that Gwen Shapira put forward in May 2015 and really resonated with me.
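To make those two properties concrete, here is a toy in-memory model of the idea. It is only a sketch and not Kafka’s API: the class ToyLog, its methods, and the consumer names are all made up for illustration. It shows that each consumer keeps its own position in a shared append-only log, so one can read at its own pace, and rewind, without affecting the other.

```python
class ToyLog:
    """Hypothetical stand-in for a topic: an append-only list of messages,
    with a separately tracked read position per consumer."""

    def __init__(self):
        self.messages = []
        self.offsets = {}  # consumer name -> next offset to read

    def append(self, message):
        self.messages.append(message)

    def poll(self, consumer, max_messages=10):
        """Return the next batch for this consumer and advance only its offset."""
        start = self.offsets.get(consumer, 0)
        batch = self.messages[start:start + max_messages]
        self.offsets[consumer] = start + len(batch)
        return batch

    def rewind(self, consumer, offset=0):
        """Move one consumer's position back so it can re-consume."""
        self.offsets[consumer] = offset


log = ToyLog()
for line in ["GET /", "GET /blog", "HEAD /blog"]:
    log.append(line)

# Two independent 'taps' off the same stored data, reading at different rates.
hdfs_batch = log.poll("hdfs_loader", max_messages=2)
es_batch = log.poll("elasticsearch_loader")

# Re-consume from the start without touching the other consumer or the source.
log.rewind("hdfs_loader")
replayed = log.poll("hdfs_loader")
```

The messages are never removed when read, which is what lets a new consumer be added later and still see everything that is retained.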

I see Kafka sitting right on that Execution/Innovation demarcation line of the Information Management and Big Data Reference Architecture that Oracle and Rittman Mead produced last year:

Kafka enables us to build a pipeline for our analytics that breaks down into two phases:

  1. Data ingest from source into Kafka, simple and reliable. As few moving parts as possible.
  2. Post-processing. Batch or realtime. Uses Kafka as source. Re-runnable. Multiple parallel consumers:
    • Productionised processing into Event Engine, Data Reservoir and beyond
    • Adhoc/loosely controlled Data Discovery processing and re-processing

These two steps align with the idea of “Obtain” and “Scrub” that Rittman Mead’s Jordan Meyer talked about in his BI Forum 2015 Masterclass about Data Discovery:

So that’s the theory – let’s now look at an example of how Kafka can enable us to build a more flexible and productive data pipeline and environment.

Flume or Logstash? HDFS or Elasticsearch? … All of them!

Mark Rittman wrote back in April 2014 about using Apache Flume to stream logs from the Rittman Mead web server over to HDFS, from where they could be analysed in Hive and Impala. The basic setup looked like this:

Another route for analysing data is through the ELK stack. It does a similar thing – streams logs (with Logstash) in to a data store (Elasticsearch) from where they can be analysed, just with a different set of tools with a different emphasis on purpose. The input is the same – the web server log files. Let’s say I want to evaluate which is the better mechanism for analysing my log files, and compare the two side-by-side. Ultimately I might only want to go forward with one, but for now, I want to try both.

I could run them literally in parallel:

The disadvantage with this is that I have twice the ‘footprint’ on my data source, a Production server. A principle throughout all of this is that we want to remain light-touch on the sources of data. Whether a Production web server, a Production database, or whatever – upsetting the system owners of the data we want is never going to win friends.

An alternative to running in parallel would be to use one of the streaming tools to load data in place of the other, i.e.

or

The issue with this is I want to validate the end-to-end pipeline. Using a single source is better in terms of load/risk to the source system, but less so for validating my design. If I’m going to go with Elasticsearch as my target, Logstash would be the better fit source. Ditto HDFS/Flume. Both support connectors to the other, but using native capabilities always feels to me a safer option (particularly in the open-source world). And what if the particular modification I’m testing doesn’t support this kind of connectivity pattern?

Can you see where this is going? How about this:

The key points here are:

  1. One hit on the source system. In this case it’s Flume, but it could be Logstash, or another tool. This streams each line of the log file into Kafka in the exact order that it’s read.
  2. Kafka holds a copy of the log data, for a configurable time period. This could be days, or months – up to you and depending on purpose (and disk space!)
  3. Kafka is designed to be distributed and fault-tolerant. As with most of the boxes on this logical diagram it would be physically spread over multiple machines for capacity, performance, and resilience.
  4. The eventual targets, HDFS and Elasticsearch, are loaded by their respective tools pulling the web server entries exactly as they were on disk. In terms of validating end-to-end design we’re still doing that – we’re just pulling from a different source.

Another massively important benefit of Kafka is this:

Sooner or later (and if you’re new to the tool and code/configuration required, probably sooner) you’re going to get errors in your data pipeline. These may be fatal and cause it to fall in a heap, or they may be more subtle and you only realise after analysis that some of your data’s missing or not fully enriched. What to do? Obviously you need to re-run your ingest process. But how easy is that? Where is the source data? Maybe you’ll have a folder full of “.processed” source log files, or an HDFS folder of raw source data that you can reprocess. The issue here is the re-processing – you need to point your code at the alternative source, and work out the range of data to reprocess.

This is all eminently do-able of course – but wouldn’t it be easier just to rerun your existing ingest pipeline and just rewind the point at which it’s going to pull data from? Minimising the amount of ‘replumbing’ and reconfiguration to run a re-process job vs. new ingest makes it faster to do, and more reliable. Each additional configuration change is an opportunity to mis-configure. Each ‘shadow’ script clone for re-running vs normal processing is increasing the risk of code diverging and stale copies being run.

The final pipeline in this simple example looks like this:

  • The source server logs are streamed into Kafka, with a permanent copy up onto Amazon’s S3 for those real “uh oh” moments. Kafka, in a sandbox environment with a ham-fisted sysadmin, won’t be bullet-proof. Better to recover a copy from S3 than have to bother the Production server again. This is something I’ve put in for this specific use case, and wouldn’t be applicable in others.
  • From Kafka the web server logs are available to stream, as if natively from the web server disk itself, through Flume and Logstash.

There’s a variation on this theme that looks like this:

Instead of Flume -> Kafka, and then a second Flume -> HDFS, we shortcut this and have the same Flume agent as is pulling from source writing to HDFS. Why have I not put this as the final pipeline? Because of this:

Let’s say that I want to do some kind of light-touch enrichment on the files, such as extracting the log timestamp in order to partition my web server logs in HDFS by the date of the log entry (not the time of processing, because I’m working with historical files too). I’m using a regex_extractor interceptor in Flume to determine the timestamp from the event data (log entry) being processed. That’s great, and it works well – when it works. If I get my regex wrong, or the log file changes date format, the house of cards comes tumbling down. Now I have a mess, because my nice clean ingest pipeline from the source system now needs fixing and re-running. As before, of course it is possible to write this cleanly so that it doesn’t break, etc etc, but from the point of view of decoupling operations for manageability and flexibility it makes sense to keep them separate (remember the Obtain vs Scrub point above?).
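As a rough illustration of what that enrichment step has to do, and how it can fail, here is a small Python sketch. It is not how Flume implements the interceptor; the function name partition_path and its defensive None return are my own assumptions. It looks for an Apache-style timestamp in a log line and turns it into a year/month/day partition path, giving up if the line doesn’t match the expected date format.

```python
import re
from datetime import datetime

# Timestamp as it appears in the access log, e.g. 06/Apr/2014:03:38:07 +0000
TIMESTAMP_RE = re.compile(r"(\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})")


def partition_path(log_line, base="/user/oracle/incoming/rm_logs/apache"):
    """Return base/YYYY/MM/DD for the log entry's own timestamp,
    or None if no parseable timestamp is found (hypothetical helper)."""
    match = TIMESTAMP_RE.search(log_line)
    if match is None:
        return None  # the regex is wrong, or the log's date format changed
    try:
        # %b expects English month abbreviations under the default C locale
        ts = datetime.strptime(match.group(1), "%d/%b/%Y:%H:%M:%S %z")
    except ValueError:
        return None
    return f"{base}/{ts:%Y/%m/%d}"


good_line = '76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281'
changed_format_line = '76.164.194.74 - - 2014-04-06 03:38:07 "GET / HTTP/1.1" 200 38281'

good_path = partition_path(good_line)
bad_path = partition_path(changed_format_line)
```

The second line is the “log file changes date format” case: nothing matches, so there is no partition to write to. Keeping this step downstream of Kafka means a failure like that only affects the enrichment consumer, not the ingest from the source.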

The final note on this is to point out that technically we can implement the pipeline using a Kafka Flume channel, which is a slightly neater way of doing things. The data still ends up in the S3 sink, and available in Kafka for streaming to all the consumers.

Kafka in Action

Let’s take a look at the configuration to put the above theory into practice. I’m running all of this on Oracle’s BigDataLite 4.2.1 VM which includes, amongst many other goodies, CDH 5.4.0. Alongside this I’ve installed into /opt :

  • apache-flume-1.6.0
  • elasticsearch-1.7.3
  • kafka_2.10-0.8.2.1
  • kibana-4.1.2-linux-x64
  • logstash-1.5.4

The Starting Point – Flume -> HDFS

First, we’ve got the initial Logs -> Flume -> HDFS configuration, similar to what Mark wrote about originally:

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources = apache_server  
source_agent.sources.apache_server.type = exec  
source_agent.sources.apache_server.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_server.batchSize = 1  
source_agent.sources.apache_server.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
source_agent.channels = memoryChannel  
source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
source_agent.sinks = hdfs_sink  
source_agent.sinks.hdfs_sink.type = hdfs  
source_agent.sinks.hdfs_sink.channel = memoryChannel  
source_agent.sinks.hdfs_sink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
source_agent.sinks.hdfs_sink.hdfs.fileType = DataStream  
source_agent.sinks.hdfs_sink.hdfs.writeFormat = Text  
source_agent.sinks.hdfs_sink.hdfs.rollSize = 0  
source_agent.sinks.hdfs_sink.hdfs.rollCount = 10000  
source_agent.sinks.hdfs_sink.hdfs.rollInterval = 600

After running this

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_02_tail_source_hdfs_sink.conf

we get the logs appearing in HDFS and can see them easily in Hue:

Adding Kafka to the Pipeline

Let’s now add Kafka to the mix. I’ve already set up and started Kafka (see here for how), and Zookeeper’s already running as part of the default BigDataLite build.

First we need to define a Kafka topic that is going to hold the log files. In this case it’s called apache_logs:

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper bigdatalite:2181 \
--create --topic apache_logs  --replication-factor 1 --partitions 1

Just to prove it’s there and we can send/receive messages on it I’m going to use the Kafka console producer/consumer to test it. Run these in two separate windows:

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh \
--broker-list bigdatalite:9092 --topic apache_logs

$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh \
--zookeeper bigdatalite:2181 --topic apache_logs

With the Consumer running enter some text, any text, in the Producer session and you should see it appear almost immediately in the Consumer window.

Now that we’ve validated the Kafka topic, let’s plumb it in. We’ll switch the existing Flume config to use a Kafka sink, and then add a second Flume agent to do the Kafka -> HDFS bit, giving us this:

The original flume agent configuration now looks like this:

source_agent.sources = apache_log_tail  
source_agent.channels = memoryChannel  
source_agent.sinks = kafka_sink

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to Kafka  
source_agent.sinks.kafka_sink.channel = memoryChannel  
source_agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink  
source_agent.sinks.kafka_sink.batchSize = 5  
source_agent.sinks.kafka_sink.brokerList = bigdatalite:9092  
source_agent.sinks.kafka_sink.topic = apache_logs

Restart the kafka-console-consumer.sh from above so that you can see what’s going into Kafka, and then run the Flume agent. You should see the log entries appearing soon after. Remember that kafka-console-consumer.sh is just one consumer of the logs – when we plug in the Flume consumer to write the logs to HDFS we can opt to pick up all of the entries in Kafka, completely independently of what we have or haven’t consumed in kafka-console-consumer.sh.

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_03_tail_source_kafka_sink.conf

[oracle@bigdatalite ~]$ /opt/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh \
--zookeeper bigdatalite:2181 --topic apache_logs

37.252.227.70 - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)"  
174.121.162.130 - - [06/Sep/2015:08:08:35 +0000] "HEAD /blog HTTP/1.1" 301 - "http://oraerp.com/blog" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"  
177.71.183.71 - - [06/Sep/2015:08:08:35 +0000] "GET /blog/ HTTP/1.0" 200 145999 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service; http://www.monitis.com)"  
174.121.162.130 - - [06/Sep/2015:08:08:36 +0000] "HEAD /blog/ HTTP/1.1" 200 - "http://oraerp.com/blog" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"  
173.192.34.91 - - [06/Sep/2015:08:08:44 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)"  
217.146.9.53 - - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service; http://www.monitis.com)"  
82.47.31.235 - - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.1" 200 36946 "-" "Echoping/6.0.2"

Set up the second Flume agent to use Kafka as a source, and HDFS as the target just as it was before we added Kafka into the pipeline:

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink 

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink  
target_agent.sinks.hdfsSink.type = hdfs  
target_agent.sinks.hdfsSink.channel = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 10000  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

Fire up the agent:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_04_kafka_source_hdfs_sink.conf

and as the website log data streams in to Kafka (from the first Flume agent) you should see the second Flume agent sending it to HDFS and evidence of this in the console output from Flume:

15/10/27 13:53:53 INFO hdfs.BucketWriter: Creating /user/oracle/incoming/rm_logs/apache_log/FlumeData.1445954032932.tmp

and in HDFS itself:

Play it again, Sam?

All we’ve done to this point is add Kafka into the pipeline, ready for subsequent use. We’ve not changed the nett output of the data pipeline. But, we can now benefit from having Kafka there, by re-running some of our HDFS load without having to go back to the source files. Let’s say we want to partition the logs as we store them. But, we don’t want to disrupt the existing processing. How? Easy! Just create another Flume agent with the additional configuration in to do the partitioning.

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel  
target_agent.sources.kafkaSource.groupId = new  
target_agent.sources.kafkaSource.kafka.auto.offset.reset = smallest  
target_agent.sources.kafkaSource.interceptors = i1

# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 1000

# Regex Interceptor to set timestamp so that HDFS can be written to partitioned  
target_agent.sources.kafkaSource.interceptors.i1.type = regex_extractor  
target_agent.sources.kafkaSource.interceptors.i1.serializers = s1  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = timestamp  
#
# Match this format logfile to get timestamp from it:  
# 76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"  
target_agent.sources.kafkaSource.interceptors.i1.regex = (\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z  
#

## Write to HDFS  
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink  
target_agent.sinks.hdfsSink.type = hdfs  
target_agent.sinks.hdfsSink.channel = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache/%Y/%m/%d/access_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 0  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

The important lines of note here (as highlighted above) are:

  • the regex_extractor interceptor which determines the timestamp of the log event, then used in the hdfs.path partitioning structure
  • the groupId and kafka.auto.offset.reset configuration items for the kafkaSource.
    • The groupId ensures that this flume agent’s offset in the consumption of the data in the Kafka topic is maintained separately from that of the original agent that we had. By default it is flume, and here I’m overriding it to new. It’s a good idea to specify this explicitly in all Kafka flume consumer configurations to avoid complications.
    • kafka.auto.offset.reset tells the consumer that if no existing offset is found (which it won’t be, if the groupId is a new one) to start from the beginning of the data rather than the end (which is what it will do by default).
    • Thus if you want to get Flume to replay the contents of a Kafka topic, just set the groupId to an unused one (eg ‘foo01’, ‘foo02’, etc) and make sure the kafka.auto.offset.reset is smallest
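The way these two settings interact can be summarised in a few lines of Python. This is a simplified model of the behaviour described above, not Kafka or Flume code; the function starting_offset and the sample offsets are invented for illustration.

```python
def starting_offset(committed_offsets, group_id, log_length, auto_offset_reset="largest"):
    """Decide where a consumer group starts reading (simplified, hypothetical model).

    committed_offsets: dict of group id -> last committed offset
    log_length:        number of messages currently held in the topic
    """
    if group_id in committed_offsets:
        # An existing group carries on from where it left off;
        # the reset setting is not consulted at all.
        return committed_offsets[group_id]
    if auto_offset_reset == "smallest":
        return 0  # new group: replay everything still retained
    if auto_offset_reset == "largest":
        return log_length  # new group: only see messages arriving from now on
    raise ValueError(f"unknown auto_offset_reset: {auto_offset_reset}")


committed = {"flume": 7}  # the original agent has already consumed 7 messages

existing_group_start = starting_offset(committed, "flume", 10, "smallest")
new_group_replay_start = starting_offset(committed, "new", 10, "smallest")
new_group_default_start = starting_offset(committed, "new", 10)
```

Note that setting smallest on a groupId that has already consumed data changes nothing, which is why a replay needs an unused groupId as well.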

Now run it (concurrently with the existing flume agents if you want):

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_07_kafka_source_partitioned_hdfs_sink.conf

You should see a flurry of activity (or not, depending on how much data you’ve already got in Kafka), and some nicely partitioned apache logs in HDFS:

Crucially, the existing flume agent and non-partitioned HDFS pipeline stays in place and functioning exactly as it was – we’ve not had to touch it. We could then run the two side-by-side until we’re happy the partitioning is working correctly and then decommission the first. Even at this point we have the benefit of Kafka, because we just turn off the original HDFS-writing agent – the new “live” one continues to run, it doesn’t need reconfiguring. We’ve validated the actual configuration we’re going to use for real, we’ve not had to simulate it up with mock data sources that then need re-plumbing prior to real use.

Clouds and Channels

We’re going to evolve the pipeline a bit now. We’ll go back to a single Flume agent writing to HDFS, but add in Amazon’s S3 as the target for the unprocessed log files. The point here is not so much that S3 is the best place to store log files (although it is a good option), but as a way to demonstrate a secondary method of keeping your raw data available without impacting the source system. It also fits nicely with using the Kafka flume channel to tighten the pipeline up a tad:

Hadoop includes an S3 filesystem connector, so Flume’s HDFS sink can use the S3N protocol to write directly to Amazon’s S3 service. You need to have already set up your S3 ‘bucket’, and have the appropriate AWS Access Key ID and Secret Key. To get this to work I added these credentials to /etc/hadoop/conf.bigdatalite/core-site.xml (I tried specifying them inline with the flume configuration but with no success):

<property>  
    <name>fs.s3n.awsAccessKeyId</name>  
    <value>XXXXXXXXXXXXX</value>  
</property>  
<property>  
    <name>fs.s3n.awsSecretAccessKey</name>  
    <value>YYYYYYYYYYYYYYYYYYYY</value>  
</property>

Once you’ve set up the bucket and credentials, the original flume agent (the one pulling the actual web server logs) can be amended:

source_agent.sources = apache_log_tail  
source_agent.channels = kafkaChannel  
source_agent.sinks = s3Sink

# http://flume.apache.org/FlumeUserGuide.html#exec-source  
source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = kafkaChannel


## Write to Kafka Channel  
source_agent.channels.kafkaChannel.channel = kafkaChannel  
source_agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel  
source_agent.channels.kafkaChannel.topic = apache_logs  
source_agent.channels.kafkaChannel.brokerList = bigdatalite:9092  
source_agent.channels.kafkaChannel.zookeeperConnect = bigdatalite:2181

## Write to S3  
source_agent.sinks.s3Sink.channel = kafkaChannel  
source_agent.sinks.s3Sink.type = hdfs  
source_agent.sinks.s3Sink.hdfs.path = s3n://rmoff-test/apache  
source_agent.sinks.s3Sink.hdfs.fileType = DataStream  
source_agent.sinks.s3Sink.hdfs.filePrefix = access_log  
source_agent.sinks.s3Sink.hdfs.writeFormat = Text  
source_agent.sinks.s3Sink.hdfs.rollCount = 10000  
source_agent.sinks.s3Sink.hdfs.rollSize = 0  
source_agent.sinks.s3Sink.hdfs.batchSize = 10000  
source_agent.sinks.s3Sink.hdfs.rollInterval = 600

Here the source is the same as before (server logs), but the channel is now Kafka itself, and the sink S3. Using Kafka as the channel has the nice benefit that the data is now already in Kafka, we don’t need that as an explicit target in its own right.

Restart the source agent using this new configuration:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_09_tail_source_kafka_channel_s3_sink.conf

and you should get the data appearing on both HDFS as before, and now also in the S3 bucket:

Didn’t Someone Say Logstash?

The premise at the beginning of this exercise was that I could extend an existing data pipeline to pull data into a new set of tools, as if from the original source, but without touching that source or the existing configuration in place. So far we’ve got a pipeline that is pretty much as we started with, just with Kafka in there now and an additional feed to S3:

Now we’re going to extend (or maybe “broaden” is a better term) the data pipeline to add Elasticsearch into it:

Whilst Flume can write to Elasticsearch given the appropriate extender, I’d rather use a tool much closer to Elasticsearch in origin and direction – Logstash. Logstash supports Kafka as an input (and an output, if you want), making the configuration ridiculously simple. To smoke-test the configuration just run Logstash with this configuration:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'

        }  
}

output {  
        stdout {codec => rubydebug }  
        }

A few things to point out in the input configuration:

  • You need to specify the plain codec (assuming your input from Kafka is plain text). The default codec for the Kafka plugin is json, and Logstash does NOT like trying to parse plain text as json, as I found out:

    37.252.227.70 - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com)" {:exception=>#<NoMethodError: undefined method `[]' for 37.252:Float>, :backtrace=>["/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/event.rb:73:in `initialize'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-codec-json-1.0.1/lib/logstash/codecs/json.rb:46:in `decode'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}

  • As well as specifying the codec, I needed to specify the charset. Without this I got \u0000\xBA\u0001 at the beginning of each message that Logstash pulled from Kafka

  • Specifying reset_beginning and auto_offset_reset tells Logstash to pull everything in from Kafka, rather than starting at the latest offset.
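
To see why the codec matters, here is a small Python illustration (not Logstash code; the decode helper is hypothetical). A JSON parser handed an Apache log line reads the leading 37.252 as a number and then trips over the rest, which lines up with the Float mentioned in the error above.

```python
import json

line = '37.252.227.70 - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235'


def decode(raw, codec="plain"):
    """Hypothetical decoder: turn one Kafka message into an event dict."""
    if codec == "json":
        return json.loads(raw)      # raises on anything that is not valid JSON
    return {"message": raw}         # plain: keep the whole line untouched


# A JSON parser reads as much as it can: here, just the first two octets.
value, end = json.JSONDecoder().raw_decode(line)
print(value, end)

try:
    decode(line, codec="json")
except json.JSONDecodeError as err:
    print("json codec failed:", err.msg)

print(decode(line, codec="plain")["message"])
```

With the plain codec the line arrives intact in the message field, ready for grok to pick apart later.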

When you run the configuration file above you should see a stream of messages to your console of everything that is in the Kafka topic:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_10_kafka_source_console_output.conf

The output will look like this – note that Logstash has added its own special @version and @timestamp fields:

{  
       "message" => "203.199.118.224 - - [09/Oct/2015:04:13:23 +0000] "GET /wp-content/uploads/2014/10/JFB-View-Selector-LowRes-300x218.png HTTP/1.1" 200 53295 "http://www.rittmanmead.com/2014/10/obiee-how-to-a-view-selector-for-your-dashboard/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"",  
      "@version" => "1",  
    "@timestamp" => "2015-10-27T17:29:06.596Z"  
}

Having proven the Kafka-Logstash integration, let’s do something useful – get all those lovely log entries streaming from source, through Kafka, enriched in Logstash with things like geoip, and finally stored in Elasticsearch:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'  
        }
}


filter {  
        # Parse the message using the pre-defined "COMBINEDAPACHELOG" grok pattern  
        grok { match => ["message","%{COMBINEDAPACHELOG}"] }

        # Ignore anything that's not a blog post hit, characterised by /yyyy/mm/post-slug form  
        if [request] !~ /^\/[0-9]{4}\/[0-9]{2}\/.*$/ { drop{} }

        # From the blog post URL, strip out the year/month and slug  
        #  http://www.rittmanmead.com/2015/02/obiee-monitoring-and-diagnostics-with-influxdb-and-grafana/  
        #     year  => 2015  
        #     month =>   02  
        #     slug  => obiee-monitoring-and-diagnostics-with-influxdb-and-grafana  
        grok { match => [ "request","/%{NUMBER:post-year}/%{NUMBER:post-month}/(%{NUMBER:post-day}/)?%{DATA:post-slug}(/.*)?$"] }

        # Combine year and month into one field  
        mutate { replace => [ "post-year-month" , "%{post-year}-%{post-month}" ] }

        # Use GeoIP lookup to locate the visitor's town/country  
        geoip { source => "clientip" }

        # Store the date of the log entry (rather than now) as the event's timestamp  
        date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]}  
}

output {  
        elasticsearch { host => "bigdatalite"  index => "blog-apache-%{+YYYY.MM.dd}"}  
        }
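
If you want to check the URL handling without spinning up Logstash, the request parsing in the filter block can be approximated in Python. This is only a sketch: the regular expression is my own translation of the grok pattern (NUMBER narrowed to digits, DATA as a lazy match), and parse_request is a made-up helper name.

```python
import re

# Rough Python equivalent of the two request checks in the filter block.
# The leading ^ plays the part of the "drop anything that is not a post" test.
POST_URL = re.compile(
    r"^/(?P<year>[0-9]{4})/(?P<month>[0-9]{2})/"
    r"(?:(?P<day>[0-9]+)/)?(?P<slug>.*?)(?:/.*)?$"
)


def parse_request(request):
    """Return year, month, day, slug and year_month for a post URL, else None."""
    match = POST_URL.match(request)
    if match is None:
        return None  # the Logstash config would drop{} this event
    fields = match.groupdict()
    # Same idea as the mutate step: combine year and month into one field
    fields["year_month"] = f"{fields['year']}-{fields['month']}"
    return fields


print(parse_request("/2015/02/obiee-monitoring-and-diagnostics-with-influxdb-and-grafana/"))
print(parse_request("/wp-content/uploads/2014/10/JFB-View-Selector-LowRes-300x218.png"))
```

The first call returns the year, month and slug; the second returns None because the path does not start with /yyyy/mm/.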

Make sure that Elasticsearch is running and then kick off Logstash:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_01_kafka_source_parsed_to_es.conf

Nothing will appear to happen on the console:

log4j, [2015-10-27T17:36:53.228]  WARN: org.elasticsearch.bootstrap: JNA not found. native methods will be disabled.  
Logstash startup completed

But in the background Elasticsearch will be filling up with lots of enriched log data. You can confirm this through the useful kopf plugin to see that the Elasticsearch indices are being created:

and directly through Elasticsearch’s RESTful API too:

$ curl -s -XGET 'http://bigdatalite:9200/_cat/indices?v' | sort  
health status index                  pri rep docs.count docs.deleted store.size pri.store.size  
yellow open   blog-apache-2015.09.30   5   1      11872            0       11mb           11mb  
yellow open   blog-apache-2015.10.01   5   1      13679            0     12.8mb         12.8mb  
yellow open   blog-apache-2015.10.02   5   1      10042            0      9.6mb          9.6mb  
yellow open   blog-apache-2015.10.03   5   1       8722            0      7.3mb          7.3mb
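
If you'd rather have a number than eyeball the listing, the text output is easy enough to total up. A minimal sketch, assuming the column layout shown above; docs_per_index is a made-up helper and the sample text is simply pasted from the listing.

```python
CAT_OUTPUT = """\
health status index                  pri rep docs.count docs.deleted store.size pri.store.size
yellow open   blog-apache-2015.09.30   5   1      11872            0       11mb           11mb
yellow open   blog-apache-2015.10.01   5   1      13679            0     12.8mb         12.8mb
yellow open   blog-apache-2015.10.02   5   1      10042            0      9.6mb          9.6mb
yellow open   blog-apache-2015.10.03   5   1       8722            0      7.3mb          7.3mb
"""


def docs_per_index(cat_text):
    """Map index name -> document count from `_cat/indices?v` text output."""
    lines = [line for line in cat_text.splitlines() if line.strip()]
    # Piping through sort can move the header, so look it up rather than assume row 0.
    header = next(line for line in lines if line.startswith("health")).split()
    counts = {}
    for line in lines:
        if line.startswith("health"):
            continue
        row = dict(zip(header, line.split()))
        counts[row["index"]] = int(row["docs.count"])
    return counts


counts = docs_per_index(CAT_OUTPUT)
print(sum(counts.values()))  # 44315 for the four days shown above
```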

And of course, the whole point of streaming the data into Elasticsearch in the first place – easy analytics through Kibana:

Conclusion

Kafka is awesome :-D

We’ve seen in this article how Kafka enables the implementation of flexible data pipelines that can evolve organically without requiring system rebuilds to implement or test new methods. It allows the data discovery function to tap in to the same source of data as the more standard analytical reporting one, without risking impacting the source system at all.

Forays into Kafka – 01 : Logstash transport / centralisation

The holy trinity of Elasticsearch, Logstash, and Kibana (ELK) are a powerful trio of tools for data discovery and systems diagnostics. In a nutshell, they enable you to easily search through your log files, slice & dice them visually, drill into problem timeframes, and generally be the boss of knowing where your application’s at.

Getting application logs into ELK in the most basic configuration means doing the processing with Logstash local to the application server, but this has two overheads – the CPU required to do the processing, and (assuming you have more than one application server) the management of multiple configurations and deployments across your servers. A more flexible and maintainable architecture is to ship logs from the application server to a separate ELK server with something like Logstash-forwarder (aka Lumberjack), and do all your heavy ELK-lifting there.

In this article I’m going to demonstrate an alternative way of shipping and centralising your logs for Logstash processing, with Apache Kafka.

Kafka is “publish-subscribe messaging rethought as a distributed commit log”. What does that mean in plainer English? My over-simplified description would be that it is a tool that:

  1. Enables one or more components, local or across many machines, to send messages (of any format) to …
  2. …a centralised store, which may be holding messages from other applications too…
  3. …from where one or more consumers can independently opt to pull these messages in exact order, either as they arrive, batched, or ‘rewinding’ to a previous point in time on demand.
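
The three points above can be sketched as a toy in Python. To be clear, this is an in-memory stand-in I've made up for illustration (TinyLog and its methods are not a Kafka API), and it keeps consumer positions inside the log object purely for simplicity.

```python
class TinyLog:
    """A toy, in-memory stand-in for a single Kafka topic partition."""

    def __init__(self):
        self.messages = []   # append-only; position in the list is the offset
        self.offsets = {}    # consumer name -> next offset to read

    def send(self, message):
        """Producer side: append a message of any format to the end of the log."""
        self.messages.append(message)

    def poll(self, consumer, max_messages=None):
        """Consumer side: return unread messages, in order, and advance the offset."""
        start = self.offsets.get(consumer, 0)
        end = len(self.messages)
        if max_messages is not None:
            end = min(end, start + max_messages)
        self.offsets[consumer] = end
        return self.messages[start:end]

    def rewind(self, consumer, offset=0):
        """Move a consumer back to an earlier point so it can re-read from there."""
        self.offsets[consumer] = offset


log = TinyLog()
for entry in ["a", "b", "c"]:
    log.send(entry)

print(log.poll("logstash"))             # this consumer sees everything so far
print(log.poll("s3", max_messages=2))   # a second consumer reads at its own pace
log.rewind("logstash")
print(log.poll("logstash"))             # same messages again, same order
```

The point to notice is that reading never removes anything: each consumer just has its own bookmark into the same ordered list.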

Kafka has been designed from the outset to be distributed and fault-tolerant, and for very high performance (low latency) too. For an overview of Kafka and its concepts, the introduction section of the documentation is a good place to start, as is Gwen Shapira’s Kafka for DBAs presentation.

If you’re interested in reading more about Kafka, the article that really caught my imagination with its possibilities was by Martin Kleppmann in which he likens (broadly) Kafka to the unix Pipe concept, being the joiner between components that never had to be designed to talk to each other specifically.

Kafka gets a lot of press in the context of “Big Data”, Spark, and the like, but it also makes a lot of sense as a “pipe” between slightly more ‘mundane’ systems such as Logstash…

Overview

In this article we’re using Kafka at its very simplest – one Producer, one Topic, one Consumer. But hey, if it works and it is a good use of technology, who cares if it’s not a gazillion message throughput per second to give us bragging rights on Hacker News?

We’re going to run Logstash twice; once on the application server to simply get the logfiles out and in to Kafka, and then again to pull the data from Kafka and process it at our leisure:

Once Logstash has processed the data we’ll load it into Elasticsearch, from where we can do some nice analysis against it in Kibana.

Build

This article was written based on three servers:

  1. Application server (OBIEE)
  2. Kafka server
  3. ELK server

In practice, Kafka could run on the ELK server if you needed it to and throughput was low. If things got busier, splitting them out would make sense as would scaling out Kafka and ELK across multiple nodes each for capacity and resilience. Both Kafka and Elasticsearch are designed to be run distributed, and it is easy to do so.

The steps below show how to get the required software installed and running.

Networking and Host Names

Make sure that each host has a hostname that is proper (not ‘demo’) and can be resolved from all the other hosts being used. Liberal use of /etc/hosts hardcoding of IP/hostnames and copying to each host is one way around this in a sandbox environment. In the real world, use DNS to resolve each hostname to the host’s static IP.

Make sure that the hostname is accessible from all other machines in use. That is, if you type hostname on one machine:

rmoff@ubuntu-03:~$ hostname  
ubuntu-03

Make sure that you can ping it from another machine:

rmoff@ubuntu-02:/opt$ ping ubuntu-03  
PING ubuntu-03 (192.168.56.203) 56(84) bytes of data.  
64 bytes from ubuntu-03 (192.168.56.203): icmp_seq=1 ttl=64 time=0.530 ms  
64 bytes from ubuntu-03 (192.168.56.203): icmp_seq=2 ttl=64 time=0.287 ms  
[...]

and use netcat to hit a particular port (assuming that something’s listening on that port):

rmoff@ubuntu-02:/opt$ nc -vz ubuntu-03 9200  
Connection to ubuntu-03 9200 port [tcp/*] succeeded!
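
If netcat isn't installed on a box, the same check is a few lines of Python. A minimal sketch using only the standard library; port_open is a name I've picked, and the host and port in the usage note are the ones from this article.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and hostnames that don't resolve
        return False
```

Calling port_open("ubuntu-03", 9200) from ubuntu-02 should return True once Elasticsearch is listening. A False result doesn't say whether the name failed to resolve or the port is closed, so fall back to ping to tell the two apart.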

Application Server – log source (“sampleappv406”)

This is going to be the machine from which we’re collecting logs. In my example it’s OBIEE that’s generating the logs, but it could be any application. All we need to install is Logstash, which is going to ship the logs – unprocessed – over to Kafka. Because we’re working with Kafka, it’s also useful to have the console scripts (that ship with the Kafka distribution) available as well, but strictly speaking, we don’t need to install Kafka on this machine.

  • Download (Kafka is optional, but useful to have the console scripts there for testing)
    wget https://download.elastic.co/logstash/logstash/logstash-1.5.4.zip  
    wget http://apache.mirror.anlx.net/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
  • Install
    unzip logstash*.zip  
    tar -xf kafka*
    
    sudo mv kafka* /opt  
    sudo mv logstash* /opt

Kafka host (“ubuntu-02”)

This is our kafka server, where Zookeeper and Kafka run. Messages are stored here before being passed to the consumer.

  • Download
    wget http://apache.mirror.anlx.net/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
  • Install
    tar -xf kafka*  
    sudo mv kafka* /opt
  • Configure: If there’s any funny business with your networking, such as a hostname on your kafka server that won’t resolve externally, make sure you set the advertised.host.name value in /opt/kafka*/config/server.properties to a hostname/IP for the kafka server that can be connected to externally.
  • Run: Use separate sessions, or even better, screen, to run both these concurrently:
    • Zookeeper
      cd /opt/kafka*  
      bin/zookeeper-server-start.sh config/zookeeper.properties
    • Kafka Server
      cd /opt/kafka*  
      bin/kafka-server-start.sh config/server.properties

ELK host (“ubuntu-03”)

All the logs from the application server (“sampleappv406” in our example) are destined for here. We’ll do post-processing on them in Logstash to extract lots of lovely data fields, store it in Elasticsearch, and produce some funky interactive dashboards with Kibana. If, for some bizarre reason, you didn’t want to use Elasticsearch and Kibana but had some other target for your logs after Logstash had parsed them, you could use one of the many other output plugins for Logstash.

  • Download (Kafka is optional, but useful to have the console scripts there for testing)
    wget https://download.elastic.co/kibana/kibana/kibana-4.1.2-linux-x64.tar.gz  
    wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.2.zip  
    wget https://download.elastic.co/logstash/logstash/logstash-1.5.4.zip  
    wget http://apache.mirror.anlx.net/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
  • Install
    tar -xf kibana*  
    unzip elastic*.zip  
    unzip logstash*.zip  
    tar -xf kafka*
    
    sudo mv kafka* /opt  
    sudo mv kibana* /opt  
    sudo mv elastic* /opt  
    sudo mv logstash* /opt
    
    # Kopf is an optional, but very useful, Elasticsearch admin web GUI  
    /opt/elastic*/bin/plugin --install lmenezes/elasticsearch-kopf
  • Run: Use separate sessions, or even better, screen, to run both these concurrently:
    /opt/elastic*/bin/elasticsearch  
    /opt/kibana*/bin/kibana

Configuring Kafka

Create the topic. This can be run from any machine with kafka console tools available. The important thing is that you specify the --zookeeper correctly so that it knows where to find Kafka.

cd /opt/kafka*  
bin/kafka-topics.sh --create --zookeeper ubuntu-02:2181 --replication-factor 1 --partitions 1 --topic logstash

Smoke test

  1. Having created the topic, check that the other nodes can connect to zookeeper and see it. The point is less about viewing the topic than checking that the connectivity between the machines is working.
    $ cd /opt/kafka*  
    $ ./bin/kafka-topics.sh --list --zookeeper ubuntu-02:2181  
    logstash

    If you get an error then check that the host resolves and the port is accessible:
    $ nc -vz ubuntu-02 2181
    
    found 0 associations  
    found 1 connections:  
          1: flags=82<CONNECTED,PREFERRED>  
         outif vboxnet0  
         src 192.168.56.1 port 63919  
         dst 192.168.56.202 port 2181  
         rank info not available  
         TCP aux info available
    
    Connection to ubuntu-02 port 2181 [tcp/eforward] succeeded!
  2. Set up a simple producer / consumer test
    1. On the application server node, run a script that will be the producer, sending anything you type to the kafka server:
      cd /opt/kafka*  
      ./bin/kafka-console-producer.sh \
      --broker-list ubuntu-02:9092 \
      --topic logstash

      (I always get the warning WARN Property topic is not valid (kafka.utils.VerifiableProperties); it seems to be harmless so ignore it…)

      This will sit waiting for input; you won’t get the command prompt back.

    2. On the ELK node, run a script that will be the consumer:

      cd /opt/kafka*  
      ./bin/kafka-console-consumer.sh \
      --zookeeper ubuntu-02:2181 \
      --topic logstash

    3. Now go back to the application server node and enter some text and press enter. You should see the same appear shortly afterwards on the ELK node. This is demonstrating Producer -> Kafka -> Consumer
    4. Optionally, run kafka-console-consumer.sh on a second machine (either the kafka host itself, or on a Mac where you’ve run brew install kafka). Now when you enter something on the Producer, you see both Consumers receive it

If the two above tests work, then you’re good to go. If not, then you’ve got to sort this out first because the later stuff sure isn’t going to.

Configuring Logstash on the Application Server (Kafka Producer)

Logstash has a very simple role on the application server – to track the log files that we want to collect, and pass new content in the log file straight across to Kafka. We’re not doing any fancy parsing of the files this side – we want to be as light-touch as possible. This means that our Logstash configuration is dead simple:

input {  
    file {  
        path =>  ["/app/oracle/biee/instances/instance1/diagnostics/logs/*/*/*.log"]  
    }
}

output {  
    kafka {  
        broker_list => 'ubuntu-02:9092'  
        topic_id => 'logstash'  
    }
}

Notice the wildcards in the path variable – in this example we’re going to pick up everything related to the OBIEE system components here, so in practice you may want to restrict it down a bit at least during development. You can specify multiple path patterns by comma-separating them within the square brackets, and you can use the exclude parameter to (…drum roll…) exclude specific paths from the wildcard match.
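
Before letting Logstash loose it can be worth checking what a set of patterns will actually pick up. Here's a rough Python approximation; files_to_watch is a made-up helper, and it assumes exclude patterns are matched against the file name rather than the full path, so treat that as an assumption to verify against your Logstash version.

```python
import fnmatch
import glob
import os


def files_to_watch(path_patterns, exclude=()):
    """Expand the glob patterns, then drop files whose name matches an exclude pattern."""
    matched = set()
    for pattern in path_patterns:
        matched.update(glob.glob(pattern))
    return sorted(
        path for path in matched
        # Assumption: excludes apply to the base file name only
        if not any(fnmatch.fnmatch(os.path.basename(path), ex) for ex in exclude)
    )


patterns = ["/app/oracle/biee/instances/instance1/diagnostics/logs/*/*/*.log"]
for path in files_to_watch(patterns, exclude=["debug*"]):
    print(path)
```

On a machine without that directory the loop simply prints nothing, which is itself a useful check that the path is right.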

Now run Logstash with the producer configuration above (assuming it’s saved as logstash-obi-kafka-producer.conf):

/opt/logstash*/bin/logstash -f logstash-obi-kafka-producer.conf

Logstash will now sit and monitor the file paths that you’ve given it. If they don’t exist, it will keep checking. If they did exist, and got deleted and recreated, or truncated – it’ll still pick up the differences. It’s a whole bunch more smart than your average bear^H^H^H^H tail -f.
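
To give a feel for what "smarter than tail -f" involves, here is a minimal polling tailer in Python that copes with a file that doesn't exist yet and with one that has been truncated. It is only a sketch of the idea, not how Logstash implements it, and the Follower class is hypothetical.

```python
import os


class Follower:
    """Minimal polling tailer: remembers how far it has read in one file."""

    def __init__(self, path):
        self.path = path
        self.offset = 0  # byte position of the next unread data

    def read_new(self):
        """Return lines added since the last call; [] if the file is missing."""
        try:
            with open(self.path, "rb") as handle:
                size = os.fstat(handle.fileno()).st_size
                if size < self.offset:
                    # File shrank, so assume it was truncated or recreated
                    self.offset = 0
                handle.seek(self.offset)
                data = handle.read()
                self.offset = handle.tell()
        except FileNotFoundError:
            return []  # not there (yet): try again on the next poll
        return data.decode("utf-8", errors="replace").splitlines()
```

Call read_new() in a loop with a short sleep between polls. A recreated file that is already bigger than the old offset would slip past this check, and a half-written last line would come back in two pieces; real shippers guard against both, for example by comparing inodes.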

If you happen to have left your Kafka console consumer running you might be in for a bit of a shock, depending on how much activity there is on your application server:

Talk about opening the floodgates!

Configuring Logstash on the ELK server (Kafka Consumer)

Let’s give all these lovely log messages somewhere to head. We’re going to use Logstash again, but on the ELK server this time, and with the Kafka input plugin:

input {  
    kafka {  
            zk_connect => 'ubuntu-02:2181'  
            topic_id => 'logstash'  
    }
}

output {  
    stdout { codec => rubydebug }  
}

Save and run it:

/opt/logstash*/bin/logstash -f logstash-obi-kafka-consumer.conf

and assuming the application server is still writing new log content we’ll get it written out here:

So far we’re doing nothing fancy at all – simply dumping to the console whatever messages we receive from kafka. In effect, it’s the same as the kafka-console-consumer.sh script that we ran as part of the smoke test earlier. But now we’ve got the messages coming in to Logstash we can do some serious processing on them with grok and the like (something I discuss and demonstrate in an earlier article) to pull out meaningful data fields from each log message. The console is not the best place to write this all to – Elasticsearch is! So we specify that as the output plugin instead. An extract of our configuration looks something like this now:

input {  
    kafka {  
            zk_connect => 'ubuntu-02:2181'  
            topic_id => 'logstash'  
    }
}

filter {  
    grok {  
        match => ["file", "%{WLSSERVER}"]  
        [...]  
    }

    geoip { source => "saw_http_RemoteIP"}

[...]  
}

output {  
    elasticsearch {  
        host => "ubuntu-03"  
        protocol=> "http"  
    }
}

Note the [...] bit in the filter section – this is all the really cool stuff where we wring every last bit of value from the log data and split it into lots of useful data fields…which is why you should get in touch with us so we can help YOU with your OBIEE and ODI monitoring and diagnostics solution!

Advert break over, back to the blog. We’ve set up the new hyper-cool config file, we’ve primed the blasters, we’ve set the “lasers” to 11 … we hit run … and …

…nothing happens. “Logstash startup completed” is the last sign of visible life we see from this console. Checking our kafka-console-consumer.sh we can still see the messages are flowing through:

But Logstash remains silent? Well, no – it’s doing exactly what we told it to, which is to send all output to Elasticsearch (and nowhere else). Don’t believe me? Add back in to the output stanza of the configuration file the output to stdout (console in this case):

output {  
    elasticsearch {  
        host => "ubuntu-03"  
        protocol=> "http"  
    }
    stdout { codec => rubydebug }  
}

(Did I mention Logstash is mega-powerful yet? You can combine, split, and filter data streams however you want from and to multiple sources. Here we’re sending it to both elasticsearch and stdout, but it could easily be sending it to elasticsearch and then conditionally to email, or pagerduty, or enriched data back to Kafka, or … you get the idea)

Re-run Logstash with the updated configuration and sure enough, it’s mute no longer:

(this snippet gives you an idea of the kind of data fields that can be extracted from a log file, and this is one of the less interesting ones, difficult to imagine, I know).

Analysing OBIEE Log Data in Elasticsearch with Kibana

The kopf plugin provides a nice web frontend to some of the administrative functions of Elasticsearch, including a quick overview of the state of a cluster and number of documents. Using it we can confirm we’ve got some data that’s been loaded from our Logstash -> Kafka -> Logstash pipeline:

and now in Kibana:

You can read a lot more about Kibana, including the (minimal) setup required to get it to show data from Elasticsearch, in other articles that I’ve written here, here, and here.

Using Kibana we can get a very powerful but simple view over the data we extracted from the log files, showing things like response times, errors, hosts, data models used, and so on:

MOAR Application Servers

Let’s scale this thing out a bit, and add a second application server into the mix. All we need to do is replicate the Logstash install and configuration on the second application server – everything else remains the same. Doing this we start to see the benefit of centralising the log processing, and decoupling it from the application server.

Set the Logstash ‘producer’ running on the second application server, and the data starts passing through, straight into Elasticsearch and Kibana at the other end, no changes needed.

Reprocessing data

One of the appealing features of Kafka is that it stores data for a period of time. This means that consumers can stream or batch as they desire, and that they can also reprocess data. By acting as a durable ‘buffer’ for the data it means that recovering from a client crash, such as a Logstash failure like this:

Error: Your application used more memory than the safety cap of 500M.  
Specify -J-Xmx####m to increase it (#### = cap size in MB).  
Specify -w for full OutOfMemoryError stack trace

is really simple – you just restart Logstash and it picks up processing from where it left off. Because Kafka tracks the last message that a consumer (Logstash in this case) read, it can scroll back through its log to pass to the consumer just messages that have accumulated since that point.

Another benefit of the data being available in Kafka is the ability to reprocess data because the processing itself has changed. A pertinent example of this is with Logstash. The processing that Logstash can do on logs is incredibly powerful, but it may be that a bug is there in the processing, or maybe an additional enrichment (such as geoip) has been added. Instead of having to go back and bother the application server for all its logs (which may have since been housekept away) we can just rerun our Logstash processing as the Kafka consumer and re-pull the data from Kafka. All that needs doing is telling the Logstash consumer to reset its position in the Kafka log from which it reads:

input {  
    kafka {  
            zk_connect => 'ubuntu-02:2181'  
            topic_id => 'logstash'  
            # Use the following two if you want to reset processing  
            reset_beginning => 'true'  
            auto_offset_reset => 'smallest'  
    }
}

Kafka will keep data for the length of time, or size of data, as defined in the log.retention.minutes and log.retention.bytes configuration settings respectively. This is set globally by default to 7 days (and no size limit), and can be changed globally or per topic.
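
The two retention limits work together: data goes when it is too old, or when there is too much of it. The sketch below is a deliberately simplified model in Python (apply_retention is made up, and the real broker works on whole log segments with its own boundary rules), but it shows the shape of the decision.

```python
def apply_retention(segments, now, retention_seconds=7 * 24 * 3600, retention_bytes=None):
    """Return the segments that survive a retention pass.

    segments: list of (last_modified_epoch_seconds, size_bytes), oldest first.
    retention_bytes=None means no size limit, as in the default setup.
    """
    # Time limit: drop anything older than the retention window
    kept = [seg for seg in segments if now - seg[0] <= retention_seconds]
    # Size limit (optional): drop the oldest segments until the total fits
    if retention_bytes is not None:
        while kept and sum(size for _, size in kept) > retention_bytes:
            kept.pop(0)
    return kept


DAY = 24 * 3600
now = 10_000_000
segments = [(now - 8 * DAY, 100), (now - 2 * DAY, 100), (now - 3600, 100)]

print(apply_retention(segments, now))                       # the 8-day-old segment goes
print(apply_retention(segments, now, retention_bytes=150))  # size limit trims further
```

The practical consequence for reprocessing is that you can only rewind as far back as retention allows, so size the limits with replay in mind.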

Conclusion

Logstash with Kafka is a powerful and easy way to stream your application log files off the application server with minimal overhead and then process them on a dedicated host. Elasticsearch and Kibana are a great way to visualise, analyse, and diagnose issues within your application’s log files.

Kafka enables you to loosely couple your application server to your monitoring and diagnostics with minimal overhead, whilst adding the benefit of log replay if you want to reprocess them.

Taking a Look at Oracle Big Data Preparation Cloud Service – Spark-Based Data Transformation in the Cloud

One of the sessions I’m delivering at the upcoming Oracle Openworld 2015 in San Francisco is entitled “Oracle Business Intelligence Cloud Service—Moving Your Complete BI Platform to the Cloud [UGF4906]”, and looks at how you can now migrate your entire OBIEE11g platform into Oracle Public Cloud including data warehouse and data integration routines. Using Oracle BI Cloud Services’ new on-premise RPD upload feature you can upload an existing RPD into BICS and run it from there, with the accompanying data warehouse database moving into Oracle’s Database Cloud Service (and with the restriction that you can’t then edit the repository within BICS, you need to do that on-premise and upload again). For ETL and data integration you can carry on using ODI12c which now has the ability to load data into Oracle Storage Cloud (for file sources) and BICS (via a REST API) as well as the full Oracle DBaaS, but another Oracle option for doing purely cloud-based data processing and enrichment has recently become available – Oracle Big Data Preparation Cloud Service. So what is it, how does it work and how is it different to ODI12c?

Oracle Big Data Preparation Cloud Service (“BDP”) is a thin-client application within Oracle Cloud for ingesting, preparing and enriching datasets that don’t have a predefined schema and may well need certain fields obfuscated or cleansed. Being integrated with Oracle Storage Cloud and other infrastructure and platform services within Oracle cloud it’s obviously aimed mainly at data transformation tasks within the Oracle Cloud environment, but you can upload and download datasets from your browser for use with on-premise applications. Unlike the more general-purpose Oracle Data Integrator it’s aimed instead at a particular use-case – non-technical information analysts who need to get data transformed, wrangled and enriched before they can make use of it in an environment like Hadoop. In fact the product name is a bit misleading – it runs on a big data platform within Oracle Cloud and like Oracle Big Data Discovery uses Apache Spark for its data processing – but it could potentially be useful for a business analyst to prepare data for loading into Oracle BI Cloud Service, and I’ll cover this angle when I talk about data loading options in my Oracle Openworld session.

Within a logical architecture for a typical big data DW and BI system, BDP sits alongside ODI within the Data Factory and provides self-service, agile transformation capabilities to more business-orientated users. 

NewImage

Oracle Big Data Preparation Cloud Service shares quite a bit of functionality and underlying technology with Oracle Big Data Discovery – both run on Hadoop, they both use Apache Spark for data processing and transformation, and both offer data transformation and “wrangling” features aimed at non-technical users. Oracle are positioning Big Data Preparation Service as something you’d use in the execution layer of the Oracle Information Management Reference Architecture whereas Big Data Discovery is associated more with the discovery layer – I’d mostly agree but I can see a role for BDD even within the execution layer, as a front-end to the data reservoir that typically now runs alongside relationally-stored data warehouses.

NewImage

The slides from one of the recent Strata conferences, for example, show Oracle positioning BDP as the “operational data preparation” tool for structured and unstructured data – with no defined schema – coming into your information platform, with the enriched output then being used by BI tools, enterprise reporting and data discovery tools.

NewImage

 

Apart from the scalability benefits of running BDP on Apache Spark, the other interesting feature in BDP is how it uses Spark’s machine learning capabilities to try to automate as much of the data preparation process as possible, for example detecting credit card numbers in data fields and recommending you obfuscate that column. Similar to BICS and how Oracle have tried to simplify the process of creating reports and dashboards for a small team, BDP runs in the cloud and tries to automate and simplify as much of the data preparation and enrichment process as possible, with ODI12c still available for ETL developers to develop more complex transformations.

The development lifecycle for BDP (from the Oracle Big Data Preparation Cloud Service e-book on Oracle’s website) is a cycle of ingesting, cleaning, enriching and then publishing data using scripts authored in the tool and run on the Apache Spark platform. The diagram below, from Oracle’s Big Data Preparation Cloud Service Handbook, shows how ingestion, enrichment, publishing and governance go in a cycle around the common foundation of the transformation scripts that you build using BDP’s web interface.

NewImage

So let’s walk through an example data preparation exercise using a file of data stored initially in Oracle Storage Cloud Service. After logging into BDP via Oracle Cloud you’re first presented with the Catalog view, listing out all your previous transformations and showing you when they were last used to process some data.

NewImage

To create a transformation you first give it a name, then select the data source and then the file you’re interested in. In my environment I’ve got Oracle Storage Cloud and HDFS available as my main data sources, or I could upload a file from my desktop and start from there.

NewImage

BDP then ingests the file and then uses its machine learning features to process and classify data in each column, recommending column names such as “gender”, “city” and cc_number based on (presumably) some sort of classification model. In the screenshot below you can see a set of these recommendations on the left-hand side of the screen, with the columns themselves listed centre and a summary of the file profiling on the right.

NewImage

Taking a closer look at the profile results panel you can see two of the columns have alerts raised, in red. Clicking on the alert shows that the two columns have credit card data stored in clear text, with the recommendation being to obfuscate or otherwise secure these fields. Clicking on a field then shows the various transformation options, with the obvious choice here being to automatically obfuscate the data in those fields.

NewImage
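
I don’t know how BDP’s classifier actually spots card numbers, so purely as an illustration of the idea, here is a sketch in Python that swaps in the well-known Luhn checksum as a stand-in for the machine-learning model. All the function names and the 90% threshold are my own inventions, not anything from the product.

```python
def luhn_valid(number):
    """Return True if the string passes the Luhn checksum used by payment cards."""
    if any(not (ch.isdigit() or ch in " -") for ch in number):
        return False
    digits = [int(ch) for ch in number if ch.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    for position, digit in enumerate(reversed(digits)):
        if position % 2 == 1:   # double every second digit from the right
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def looks_like_card_column(values, threshold=0.9):
    """Flag a column when most of its non-empty values pass the checksum."""
    values = [value for value in values if value]
    if not values:
        return False
    hits = sum(luhn_valid(value) for value in values)
    return hits / len(values) >= threshold


def obfuscate(number, keep_last=4):
    """Mask all but the last few digits."""
    digits = "".join(ch for ch in number if ch.isdigit())
    return "*" * (len(digits) - keep_last) + digits[-keep_last:]


column = ["4111111111111111", "5555555555554444"]  # standard test numbers
if looks_like_card_column(column):
    print([obfuscate(value) for value in column])
```

A checksum alone will throw up false positives on other long numeric identifiers, which is presumably part of why a trained classifier that also looks at things like column context is the more interesting approach.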

Once you’ve worked through all the recommendations and added any transformations you choose to add yourself, the final step is to publish your transformation to one of the available targets. In the example below we’ve got Oracle Storage Cloud and HDFS again as potential targets; I’d imagine Oracle will add a connector to BICS soon, for example, so that you can use BDP as a data prep tool for file data that will then be added to your dataset in BICS.

NewImage

So … it’ll be interesting to see where this one goes. It’s interesting that Oracle have split out data preparation and data discovery into two tools whilst others are saying theirs can do both, and you’ll still need ODI for the more complex integration jobs. But I like the innovative use of machine learning to do away with much of the manual work required for classification of incoming data fields, and running the whole thing on Spark certainly gives it the potential to scale. A couple of years ago I was worried Oracle didn’t really have a strategy for data integration and ETL in the cloud, but we’re starting to see something happen now.

There’s a big push from the Oracle field at the moment to move customers into the cloud, and I can see BDP getting bundled in with Big Data Cloud Service and BICS as the accompanying cloud data preparation tool. The danger then of course is that Big Data Discovery starts to look less useful, especially with Visual Analyzer already available within BICS and coming soon on-premise with OBIEE12c. My guess is that what we’re seeing now with these initial releases of BDP and BDD is just the start, with BDP adding more automatic enrichment “smarts” and starting to cover integration use-cases too, whilst BDD will put more focus on data visualization and analytics on the data reservoir.

Oracle Big Data Discovery 1.1 now GA, and Available as part of BigDataLite 4.2.1

The new Oracle Big Data Discovery 1.1 release went GA a couple of weeks ago, and came with a bunch of features that addressed show-stoppers in the original 1.0 release: the ability to refresh and reload datasets from Hive, compatibility with Cloudera CDH and Hortonworks HDP Hadoop platforms, Kerberos integration, and the ability to bring in datasets from remote JDBC datasources. If you’re new to Big Data Discovery, I covered the initial release in a number of blog posts over the past year or so.

So let’s start by loading some data into Big Data Discovery so that we can explore what’s in it, see the range of attributes and their values and do some basic data clean-up and enrichment. As with Big Data Discovery 1.0 you import (or “sample”) data into Big Data Discovery’s DGraph engine either via file upload, or by using a command-line utility. Data in Hadoop has to be registered in the Hive HCatalog metadata layer, and I’ll start by importing a Hive table mapped to some webserver log files via a Hive SERDE:

NewImage

To import or sample this table into BDD’s DGraph engine I use the following command to invoke the Big Data Discovery Data Processing engine, which reads the Hive table metadata, loads the Hive table data into the DGraph engine (either all rows, or a representative sample) and then processes and enriches the data, for example to add geocoding:

[oracle@bigdatalite edp_cli]$ ./data_processing_CLI -t apachelog_parsed

This then runs as an Apache Spark job under YARN, progress of which you can track either from the console or through Cloudera Manager / Hue.

[2015-09-04T14:20:43.792-04:00] [DataProcessing] [INFO] [] [org.apache.spark.Logging$class] [tid:main] [userID:oracle] 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: root.oracle
 start time: 1441390841404
 final status: UNDEFINED
 tracking URL: http://bigdatalite.localdomain:8088/proxy/application_1441385366282_0001/
 user: oracle
[2015-09-04T14:20:45.794-04:00] [DataProcessing] [INFO] [] [org.apache.spark.Logging$class] [tid:main] [userID:oracle] Application report for application_1441385366282_0001 (state: ACCEPTED)
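If you’d rather track progress from a script than by watching the console, the log lines above are regular enough to parse. The sketch below is a rough Python example that pulls the YARN application ID and state out of an “Application report” line, and converts the “start time” field (milliseconds since the epoch) into a readable timestamp. The function names are my own, and the regular expression is based only on the log output shown above, so treat it as an assumption rather than a documented format.

```python
import re
from datetime import datetime, timezone

# Matches lines such as:
#   ... Application report for application_1441385366282_0001 (state: ACCEPTED)
REPORT_RE = re.compile(
    r"Application report for (application_\d+_\d+) \(state: (\w+)\)"
)


def parse_application_report(line):
    """Return (application_id, state) for a report line, else None."""
    match = REPORT_RE.search(line)
    return (match.group(1), match.group(2)) if match else None


def start_time_utc(epoch_ms):
    """Convert the 'start time' field to a UTC datetime (milliseconds dropped)."""
    return datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)


line = (
    "[2015-09-04T14:20:45.794-04:00] [DataProcessing] [INFO] [] "
    "[org.apache.spark.Logging$class] [tid:main] [userID:oracle] "
    "Application report for application_1441385366282_0001 (state: ACCEPTED)"
)
print(parse_application_report(line))
# → ('application_1441385366282_0001', 'ACCEPTED')
print(start_time_utc(1441390841404).isoformat())
# → 2015-09-04T18:20:41+00:00
```

The start time works out at 18:20:41 UTC, or 14:20:41 in the VM’s -04:00 timezone, which lines up with the timestamps on the log entries themselves.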

Going over to Big Data Discovery Studio I can then see the dataset within the catalog, and then load it into a project and start exploring and transforming the dataset. In the screenshots below I’m cleaning up the date and time field to turn it into a timestamp, and arranging the automatically-derived country, city, region and state attributes into a geography hierarchy. BDD1.1 comes with a bunch of other transformation enhancements including new options for enrichment, the ability to tag values via a whitelist and so on – a full list of new features for BDD1.1 can be found in MOS Doc.ID 2044712.1.

NewImage
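For anyone wanting to do the same date and time clean-up outside of BDD Studio, here is a small Python sketch of the conversion. It assumes the field holds the standard Apache access log time format (for example `[04/Sep/2015:14:20:43 -0400]`); the actual raw values in my Hive table aren’t shown in this post, so both the sample value and the function name `parse_apache_time` are illustrative assumptions.

```python
from datetime import datetime

# Standard Apache access log time format: day/month-abbreviation/year:time offset
APACHE_TIME_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def parse_apache_time(raw):
    """Turn an Apache-style log time string into a timezone-aware datetime."""
    return datetime.strptime(raw.strip("[]"), APACHE_TIME_FORMAT)


# Made-up sample value in the assumed format
ts = parse_apache_time("[04/Sep/2015:14:20:43 -0400]")
print(ts.isoformat())
# → 2015-09-04T14:20:43-04:00
```

Note that `%b` relies on English month abbreviations, which is the default locale behaviour in Python unless you’ve changed it.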

Crucially, in BDD1.1 you can now either refresh a dataset with new data from Hive (re-load), or do an incremental update after you’ve selected an attribute as the update (identity) column – in the screenshot below I’m doing this for a dataset uploaded from a file, but you can reload and update datasets from the command-line too, which then opens up the possibility of scripting, scheduling etc.

NewImage

You can also define JDBC data connections in the administration part of BDD Studio, and then type in SQL queries to define data sources that can then be added into your project as a dataset – loading their data directly into the DGraph engine rather than having to stage it in Hadoop beforehand.

NewImage

Then, as with the initial release of Big Data Discovery, you can define joins between the datasets in a project based on common attributes – in this case I’m joining the page URLs in the webserver logs with the page and article data extracted from our WordPress install, sourced from both Hive and Oracle (via JDBC).

NewImage

Other new features in BDD1.1 include the ability to define “applications”: projects and datasets that are considered “production quality” and that include details on how to refresh and incrementally load their datasets (designed presumably to facilitate migrations from Endeca Information Discovery). There are also a number of other new features around usability, data exploration and security. You can download BDD1.1 from Oracle’s eDelivery site, or download it pre-installed and configured as part of the new Big Data Lite 4.2.1 VirtualBox virtual machine.