
Rittman Mead’s Bar Optimizer


At Rittman Mead R&D, we have the privilege of solving some of our clients’ most challenging data problems. We recently built a set of customized data products that leverage the power of Oracle and Cloudera platforms and wanted to share some of the fun we’ve had in creating unique user experiences. We’ve been thinking about how we can lean on our efforts to help make the holidays even more special for the extended Rittman Mead family. With that inspiration, we had several questions on our minds:

  • How can we throw an amazing holiday party?
  • What gifts can we give that we can be sure our coworkers, friends, and family will enjoy?
  • What gifts would we want for ourselves?

After a discussion over drinks, the answers became clear. We decided to create a tool that uses data analytics to help you create exceptional cocktails for the holidays.

Here is how we did it. First, we analyzed the cocktail recipes of three world-renowned cocktail bars: PDT, Employees Only, and Death & Co. We then turned their drink recipes into data and got to work on the Bar Optimizer, which uses analytics on top of that data to help you make the holiday season tastier than ever before.

To use the Bar Optimizer, enter the liquors and other ingredients that you have on hand to see what drinks you can make. It then recommends additional ingredients that let you create the largest variety of new drinks. You can also use this feature to give great gifts based on others’ liquor cabinets. Finally, try using one of our optimized starter kits to stock your bar for a big holiday party. We’ve crunched the numbers to find the fewest bottles that can make the largest variety of cocktails.

Click the annotated screenshot above for details, and contact us if you would like more information about how we build products that take your data beyond dashboards.

Linux cluster sysadmin — OS metric monitoring with colmux

In this mini-series of blog posts I’m taking a look at a few very useful tools that can make your life as the sysadmin of a cluster of Linux machines easier. This may be a Hadoop cluster, or just a plain simple set of ‘normal’ machines on which you want to run the same commands and monitoring.

First we looked at using SSH keys for intra-machine authorisation, which is a pre-requisite for executing the same command across multiple machines using PDSH, as well as for what we look at in this article – monitoring OS metrics across a cluster with colmux.

Colmux is written by Mark Seger, the same person who wrote collectl. It makes use of collectl on each target machine to report back OS metrics across a cluster to a single node.


Install collectl across the cluster

Using pdsh we can easily install collectl on each node (if it’s not already), which is a pre-requisite for colmux:

pdsh -w root@rnmcluster02-node0[1-4] "yum install -y collectl && service collectl start && chkconfig collectl on"

NB by enabling the collectl service on each node it will capture performance data to file locally, which colmux can replay centrally.

Then install colmux itself, which you can download from Sourceforge. It only needs to be actually installed on a single host, but obviously we could push it out across the cluster with pdsh if we wanted to be able to invoke it on any node at will. Note that here I’m running it on a separate linux box (outside of the cluster) rather than on my Mac:

cd /tmp
# Make sure you get the latest version of collectl-utils, from https://sourceforge.net/projects/collectl-utils/files/
# This example is hardcoded to a version and particular sourceforge mirror
curl -O http://garr.dl.sourceforge.net/project/collectl-utils/collectl-utils-4.8.2/collectl-utils-4.8.2.src.tar.gz
tar xf collectl-utils-4.8.2.src.tar.gz
cd collectl-utils-4.8.2
sudo ./INSTALL
# collectl-utils also includes colplot, so if you might want to use it restart
# apache (assuming it's installed)
sudo service httpd restart

Colmux and networking

A couple of important notes:

  1. The machine you run colmux from needs to have port 2655 open in order for each node’s collectl to send back the data to it.

  2. You also may encounter an issue if you have any odd networking (eg NAT on virtual machines) that causes colmux to not work because it picks the ‘wrong’ network interface of the host to tell collectl on each node to send its data to. Details and workaround here.

colmux in action – real-time view, point in time

Command

colmux -addr 'rnmcluster02-node0[1-4]' -username root

Output

# Mon Dec  1 22:20:40 2014  Connected: 4 of 4
#                    <--------CPU--------><----------Disks-----------><----------Network---------->
#Host                cpu sys inter  ctxsw KBRead  Reads KBWrit Writes   KBIn  PktIn  KBOut  PktOut
rnmcluster02-node01    1   1    28     36      0      0      0      0      0      2      0       2
rnmcluster02-node04    0   0    33     28      0      0     36      8      0      1      0       1
rnmcluster02-node03    0   0    15     17      0      0      0      0      0      1      0       1
rnmcluster02-node02    0   0    18     18      0      0      0      0      0      1      0       1


Real-time view, persisted

-cols puts the hosts across the top and time as rows. Specify one or more column numbers, as they appear in the output without -cols. This example shows cpu along with disk read and write volume (KBRead and KBWrit), which are columns 1, 5 and 7 of the metrics seen above:

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -cols 1,5,7

Output

cpu                            KBRead                         KBWrit
 node01 node02 node03 node04 |  node01 node02 node03 node04 |  node01 node02 node03 node04
      0      0      0      0 |       0      0      0      0 |      12     28      0      0
      0      0      0      0 |       0      0      0      0 |      12     28      0      0
      1      0      1      0 |       0      0      0      0 |       0      0      0      0
      0      0      0      0 |       0      0      0      0 |       0      0      0      0
      0      0      0      0 |       0      0      0      0 |       0      0      0      0
      0      0      0      0 |       0      0      0      0 |       0     20      0      0
      0      0      0      0 |       0      0      0      0 |      52      4      0      0
      0      0      0      2 |       0      0      0      0 |       0      0      0      0
      1      0      0      0 |       0      0      0      0 |       0      0      0      0
     15     16     15     15 |       0      4      4      4 |      20     40     32     48
      0      0      1      1 |       0      0      0      0 |       0      0      4      0
      1      0      0      0 |       0      0      0      0 |       0      0      0      0


To check the numbers of the columns that you want to reference, run the command with the --test argument:

colmux -addr 'rnmcluster02-node0[1-4]' -user root --test

>>> Headers <<<
#                    <--------CPU--------><----------Disks-----------><----------Network---------->
#Host                cpu sys inter  ctxsw KBRead  Reads KBWrit Writes   KBIn  PktIn  KBOut  PktOut

>>> Column Numbering <<<
 0 #Host   1 cpu     2 sys     3 inter   4 ctxsw   5 KBRead  6 Reads   7 KBWrit
 8 Writes  9 KBIn   10 PktIn  11 KBOut  12 PktOut

And from there you get the numbers of the columns to reference in the -cols argument.
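If you only have some captured output to hand rather than a live cluster, the same numbering can be worked out from the header line itself. This is a minimal sketch, not part of colmux: number_columns is a hypothetical helper name, and the header is simply copied from the output above.

```shell
# Header line copied from the colmux output shown above
header='#Host                cpu sys inter  ctxsw KBRead  Reads KBWrit Writes   KBIn  PktIn  KBOut  PktOut'

# number_columns is a hypothetical helper, not part of colmux: it prints each
# whitespace-separated field of a header line with its zero-based position
number_columns() {
  printf '%s\n' "$1" | awk '{ for (i = 1; i <= NF; i++) printf "%2d %s\n", i - 1, $i }'
}

number_columns "$header"
```

The positions it prints agree with the Column Numbering section above (for example, 5 for KBRead and 7 for KBWrit).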

To include the timestamp, use -oT in the -command and offset the column numbers by 1:

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -cols 2,6,8 -command '-oT'

Output

sys                            Reads                          Writes
#Time    node01 node02 node03 node04 |  node01 node02 node03 node04 |  node01 node02 node03 node04
22:24:50      0      0      0      0 |       0      0      0      0 |       0      0      0      0
22:24:51      1      0      0      0 |       0      0      0      0 |       0      0      0      0
22:24:52      0      0      0      0 |       0      0      0      0 |       0     16      0     16
22:24:53      1      0      0      0 |       0      0      0      0 |      36      0     16      0
22:24:54      0      0      0      1 |       0      0      0      0 |       0      0      0      0
22:24:55      0      0      0      0 |       0      0      0      0 |       0     20     32     20

NB There’s a bug with colmux 4.8.2 that prevents you accessing the first metric with -cols when you also enable the timestamp with -oT; details here.

Specifying additional metrics

Collectl (which is what colmux calls to get the data) can fetch metrics from multiple subsystems on a node. You can access all of these through colmux too. By default when you run colmux you get cpu, disk and network but you can specify others using the -s argument followed by the subsystem identifier.

To examine the available subsystems run collectl on one of the target nodes:

[root@rnmcluster02-node01 ~]# collectl --showsubsys
The following subsystems can be specified in any combinations with -s or
--subsys in both record and playbackmode.  [default=bcdfijmnstx]

These generate summary, which is the total of ALL data for a particular type
  b - buddy info (memory fragmentation)
  c - cpu
  d - disk
  f - nfs
  i - inodes
  j - interrupts by CPU
  l - lustre
  m - memory
  n - network
  s - sockets
  t - tcp
  x - interconnect (currently supported: OFED/Infiniband)
  y - slabs

From the above list we can see that if we want to also show memory detail alongside CPU we need to include m and c in the subsystem list:

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '-scm'

Output

# Tue Dec  2 08:02:38 2014  Connected: 4 of 4
#                    <--------CPU--------><-----------Memory----------->
#Host                cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map
rnmcluster02-node02    1   0    19     18  33M  15M 345M 167M  30M  56M
rnmcluster02-node04    0   0    30     24  32M  15M 345M 167M  30M  56M
rnmcluster02-node03    0   0    30     36  32M  15M 345M 165M  30M  56M
rnmcluster02-node01    0   0    16     16  29M  15M 326M 167M  27M  81M

Changing the sample frequency

To change the sample frequency use the -i syntax in -command:

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '-scm -i10 -oT' -cols 2,4

Samples every 10 seconds:

sys                            ctxsw
#Time    node01 node02 node03 node04 |  node01 node02 node03 node04
08:06:29     -1     -1     -1     -1 |      -1     -1     -1     -1
08:06:39     -1     -1     -1     -1 |      -1     -1     -1     -1
08:06:49      0      0      0      0 |      14     13     15     19
08:06:59      0      0      0      0 |      13     13     17     21
08:07:09      0      0      0      0 |      19     18     15     24
08:07:19      0      0      0      0 |      13     13     15     19
08:07:29      0      0      0      0 |      13     13     14     19
08:07:39      0      0      0      0 |      12     13     13     19

Column width

To make the columns wider (here, wide enough to show the full hostnames), add the -colwidth argument:

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '-scm' -cols 1 -colwidth 20

Output

cpu
  rnmcluster02-node01  rnmcluster02-node02  rnmcluster02-node03  rnmcluster02-node04
                   -1                   -1                   -1                   -1
                   -1                   -1                   -1                   -1
                    1                    0                    0                    0
                    0                    0                    0                    0
                    0                    1                    0                    0
                    0                    0                    1                    0
                    1                    0                    1                    0
                    0                    1                    0                    0

Playback

As well as running interactively, collectl can run as a service and record metric samples to disk. Using colmux you can replay these from across the cluster.

Within the -command, include -p and the path to the collectl log files (this assumes that the path is the same on each host). As with real-time mode, change the flags after -s to select different subsystems.

Command

colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '-p /var/log/collectl/*20141201* -scmd -oD'

Output

[...]
# 21:48:50  Reporting: 4 of 4
#                                      <--------CPU--------><-----------Memory-----------><----------Disks----------->
#Host               Date     Time      cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map KBRead  Reads KBWrit Writes
rnmcluster02-node04 20141201 21:48:50    0   0    17     15  58M  10M 340M 162M  30M  39M      0      0      1      0
rnmcluster02-node03 20141201 21:48:50    0   0    11     13  58M  10M 340M 160M  30M  39M      0      0      0      0
rnmcluster02-node02 20141201 21:48:50    0   0    11     15  58M  10M 340M 163M  29M  39M      0      0      1      0
rnmcluster02-node01 20141201 21:48:50    0   0    12     14  33M  12M 342M 157M  27M  63M      0      0      1      0

# 21:49:00  Reporting: 4 of 4
#                                      <--------CPU--------><-----------Memory-----------><----------Disks----------->
#Host               Date     Time      cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map KBRead  Reads KBWrit Writes
rnmcluster02-node04 20141201 21:49:00    0   0    17     15  58M  10M 340M 162M  30M  39M      0      0      4      0
rnmcluster02-node03 20141201 21:49:00    0   0    13     14  58M  10M 340M 160M  30M  39M      0      0      5      0
rnmcluster02-node02 20141201 21:49:00    0   0    12     14  58M  10M 340M 163M  29M  39M      0      0      1      0
rnmcluster02-node01 20141201 21:49:00    0   0    12     15  33M  12M 342M 157M  27M  63M      0      0      6      0

# 21:49:10  Reporting: 4 of 4
#                                      <--------CPU--------><-----------Memory-----------><----------Disks----------->
#Host               Date     Time      cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map KBRead  Reads KBWrit Writes
rnmcluster02-node04 20141201 21:49:10    0   0    23     23  58M  10M 340M 162M  30M  39M      0      0      1      0
rnmcluster02-node03 20141201 21:49:10    0   0    19     24  58M  10M 340M 160M  30M  39M      0      0      2      0
rnmcluster02-node02 20141201 21:49:10    0   0    18     23  58M  10M 340M 163M  29M  39M      0      0      2      1
rnmcluster02-node01 20141201 21:49:10    0   0    18     24  33M  12M 342M 157M  27M  63M      0      0      1      0
[...]

Restrict the time frame by adding the arguments --from and/or --thru to -command:

[oracle@rnm-ol6-2 ~]$ colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '-p /var/log/collectl/*20141201* -scmd -oD --from 21:40:00 --thru 21:40:10'
# 21:40:00  Reporting: 4 of 4
#                                      <--------CPU--------><-----------Memory-----------><----------Disks----------->
#Host               Date     Time      cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map KBRead  Reads KBWrit Writes
rnmcluster02-node04 20141201 21:40:00    0   0    16     14  59M  10M 340M 162M  30M  39M      0      0      0      0
rnmcluster02-node03 20141201 21:40:00    0   0    12     14  58M  10M 340M 160M  30M  39M      0      0      8      1
rnmcluster02-node02 20141201 21:40:00    0   0    12     15  59M  10M 340M 162M  30M  39M      0      0      6      1
rnmcluster02-node01 20141201 21:40:00    0   0    13     16  56M  11M 341M 156M  27M  42M      0      0      7      1

# 21:40:10  Reporting: 4 of 4
#                                      <--------CPU--------><-----------Memory-----------><----------Disks----------->
#Host               Date     Time      cpu sys inter  ctxsw Free Buff Cach Inac Slab  Map KBRead  Reads KBWrit Writes
rnmcluster02-node04 20141201 21:40:10    0   0    26     33  59M  10M 340M 162M  30M  39M      1      0     10      2
rnmcluster02-node03 20141201 21:40:10    0   0    20     31  58M  10M 340M 160M  30M  39M      0      0      4      1
rnmcluster02-node02 20141201 21:40:10    0   0    23     35  59M  10M 340M 162M  30M  39M      3      0      9      2
rnmcluster02-node01 20141201 21:40:10    0   0    23     37  56M  11M 341M 156M  27M  42M      4      1      4      1


[oracle@rnm-ol6-2 ~]$
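If you replay different days or time windows often, it can help to assemble the -command string in one place. The following is only a sketch under assumptions: playback_args is a hypothetical helper name, and the log directory, subsystem flags and host list are the ones used in the examples above. It prints the colmux command as a dry run rather than executing it.

```shell
# playback_args is a hypothetical helper: it assembles the string passed to
# colmux's -command for one day's logs and a time window. The log directory
# and subsystem flags are the ones used in this article.
playback_args() {
  day=$1    # e.g. 20141201
  from=$2   # e.g. 21:40:00
  thru=$3   # e.g. 21:40:10
  printf '%s' "-p /var/log/collectl/*${day}* -scmd -oD --from ${from} --thru ${thru}"
}

# Dry run: print the full colmux command instead of running it
printf "colmux -addr 'rnmcluster02-node0[1-4]' -user root -command '%s'\n" \
  "$(playback_args 20141201 21:40:00 21:40:10)"
```

With these arguments the printed line is the same command as in the time-restricted example above.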

colmux reference

You can find out more about colmux from its website, as well as from the built-in man page (man colmux).

Visualising collectl data with colplot

As a little bonus to the above, colmux is part of the collectl-utils package, which also includes colplot, a gnuplot-based web tool that renders collectl data into graphs. It’s pretty easy to set up, running under Apache just fine and just needing gnuplot installed if you haven’t already. It can report metrics across a cluster if you make sure that you first make each node’s collectl data available locally to colplot.

Navigating to the web page shows the interface from which you can trigger graph plots based on the collectl data available:

colplot’s utilitarian graphs are a refreshing contrast to every webapp that is built nowadays promising “beautiful” visualisations (which no doubt the authors are “passionate” about making “awesome”):

The graphs are functional and can be scaled as needed, but each change is a trip back to the front page to tweak options and re-render:

colplot

For me, colplot is an excellent tool for point-in-time analysis and diagnostics, but for more generalised monitoring with drilldown into detail, it is too manual to be viable and I’ll be sticking with collectl -> graphite -> grafana with its interactive and flexible graph rendering:

grafana

Do note however that colplot specifically does not drop data points, so if there is a spike in your data you will see it. Other tools (possibly including graphite, but I’ve not validated this) will, for larger timespans, average out data series so as to provide a smoother picture of a metric (eg instead of a point every second, maybe every ten seconds). If you are doing close analysis of a system’s behaviour in a particular situation this may be a problem. If you want a more generalised overview of a system’s health, with the option to drill into historical data as needed, it will be less of an issue.
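To see why averaging can matter, here is a toy illustration with made-up numbers (these are not measurements from the cluster): ten one-second samples containing a single spike, rolled up into one ten-second point. The summarise function is a hypothetical name used just for this sketch.

```shell
# Ten hypothetical one-second samples of a metric, with a single spike
samples='0 0 0 0 95 0 0 0 0 5'

# summarise prints the mean and the maximum of a space-separated series
summarise() {
  printf '%s\n' "$1" | awk '{
    sum = 0; max = $1
    for (i = 1; i <= NF; i++) { sum += $i; if ($i > max) max = $i }
    printf "avg=%d max=%d\n", sum / NF, max
  }'
}

summarise "$samples"
```

The averaged point comes out at 10 while the actual peak was 95, which is the kind of detail a tool that keeps every data point will still show.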

Summary

When working with multiple Linux machines I would first and foremost make sure SSH keys are set up in order to ease management through password-less logins.

After SSH keys, I would recommend pdsh for parallel execution of the same SSH command across the cluster. It’s a big time saver particularly when initially setting up the cluster given the installation and configuration changes that are inevitably needed.

To monitor a cluster I would always recommend collectl as the base metric collector. colmux works excellently for viewing these metrics from across the cluster in a single place from the commandline. For viewing the metrics over the longer term you can store them in (or replay them into) Graphite/Carbon and render them in Grafana. You have the option of colplot too, since it is installed alongside colmux as part of collectl-utils.

So now your turn – what particular tools or tips do you have for working with a cluster of Linux machines? Leave your answers in the comments below, or tweet them to me at @rmoff.

Linux cluster sysadmin — Parallel command execution with PDSH

In this series of blog posts I’m taking a look at a few very useful tools that can make your life as the sysadmin of a cluster of Linux machines easier. This may be a Hadoop cluster, or just a plain simple set of ‘normal’ machines on which you want to run the same commands and monitoring.

Previously we looked at using SSH keys for intra-machine authorisation, which is a pre-requisite for what we’ll look at here: executing the same command across multiple machines using PDSH. In the next post of the series we’ll see how we can monitor OS metrics across a cluster with colmux.

PDSH is a very smart little tool that enables you to issue the same command on multiple hosts at once, and see the output. You need to have set up ssh key authentication from the client to each of the hosts, so if you followed the steps in the first post of this series you’ll be good to go.

The syntax for using it is nice and simple:

  • -w specifies the addresses. You can use numerical ranges [1-4] and/or comma-separated lists of hosts. If you want to connect as a user other than the current user on the calling machine, you can specify it here (or as a separate -l argument)
  • After that is the command to run.

For example run against a small cluster of four machines that I have:

robin@RNMMBP $ pdsh -w root@rnmcluster02-node0[1-4] date

rnmcluster02-node01: Fri Nov 28 17:26:17 GMT 2014
rnmcluster02-node02: Fri Nov 28 17:26:18 GMT 2014
rnmcluster02-node03: Fri Nov 28 17:26:18 GMT 2014
rnmcluster02-node04: Fri Nov 28 17:26:18 GMT 2014
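To make it concrete what a numerical range in -w stands for, here is a rough local illustration. It is only a sketch: expand_range is a hypothetical helper, and pdsh does its own (more capable) host-list expansion internally, so nothing here is pdsh’s actual implementation.

```shell
# expand_range is a hypothetical helper for illustration only; pdsh performs
# its own host-list expansion internally
expand_range() {
  prefix=$1
  i=$2
  last=$3
  while [ "$i" -le "$last" ]; do
    printf '%s%s\n' "$prefix" "$i"
    i=$((i + 1))
  done
}

# One hostname per line, matching the four hosts in the output above
expand_range rnmcluster02-node0 1 4

# The same hosts as a comma-separated list, the other form that -w accepts
expand_range rnmcluster02-node0 1 4 | paste -sd, -
```

Either form names the same four machines, so the range is just a shorthand for the comma-separated list.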

PDSH can be installed on the Mac under Homebrew (did I mention that Rittman Mead laptops are Macs, so I can do all of this straight from my work machine… :-) )

brew install pdsh

And if you want to run it on Linux, install it from the EPEL yum repository (RHEL-compatible, but packages for other distros are available):

yum install pdsh

You can run it from a cluster node, or from your client machine (assuming your client machine is mac/linux).

Example – install and start collectl on all nodes

I started looking into pdsh when it came to setting up a cluster of machines from scratch. One of the must-have tools I like to have on any machine that I work with is the excellent collectl. This is an OS resource monitoring tool that I initially learnt of through Kevin Closson and Greg Rahn, and provides the kind of information you’d get from top etc – and then some! It can run interactively, log to disk, run as a service – and it also happens to integrate very nicely with graphite, making it a no-brainer choice for any server.

So, instead of logging into each box individually I could instead run this:

pdsh -w root@rnmcluster02-node0[1-4] yum install -y collectl
pdsh -w root@rnmcluster02-node0[1-4] service collectl start
pdsh -w root@rnmcluster02-node0[1-4] chkconfig collectl on

Yes, I know there are tools out there like puppet and chef that are designed for doing this kind of templated build of multiple servers, but the point I want to illustrate here is that pdsh enables you to do ad-hoc changes to a set of servers at once. Sure, once I have my cluster built and want to create an image/template for future builds, then it would be daft if I were building the whole lot through pdsh-distributed yum commands.

Example – setting up the date/timezone/NTPD

Often the accuracy of the clock on each server in a cluster is crucial, and we can easily set up time synchronisation with pdsh:

Install packages

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] yum install -y ntp ntpdate

Set the timezone:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] ln -sf /usr/share/zoneinfo/Europe/London /etc/localtime

Force a time refresh:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] ntpdate pool.ntp.org
rnmcluster02-node03: 30 Nov 20:46:22 ntpdate[27610]: step time server 176.58.109.199 offset -2.928585 sec
rnmcluster02-node02: 30 Nov 20:46:22 ntpdate[28527]: step time server 176.58.109.199 offset -2.946021 sec
rnmcluster02-node04: 30 Nov 20:46:22 ntpdate[27615]: step time server 129.250.35.250 offset -2.915713 sec
rnmcluster02-node01: 30 Nov 20:46:25 ntpdate[29316]: 178.79.160.57 rate limit response from server.
rnmcluster02-node01: 30 Nov 20:46:22 ntpdate[29316]: step time server 176.58.109.199 offset -2.925016 sec

Set NTPD to start automatically at boot:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] chkconfig ntpd on

Start NTPD:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] service ntpd start

Example – using a HEREDOC (here-document) and sending quotation marks in a command with PDSH

Here documents (heredocs) are a nice way to embed multi-line content in a single command, enabling the scripting of a file creation rather than the clumsy instruction to “open an editor and paste the following lines into it and save the file as /foo/bar”.

Fortunately heredocs work just fine with pdsh, so long as you remember to enclose the whole command in quotation marks. And speaking of which, if you need to include quotation marks in your actual command, you need to escape them with a backslash. Here’s an example of both, setting up the configuration file for my ever-favourite gnu screen on all the nodes of the cluster:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] "cat > ~/.screenrc <<EOF
hardstatus alwayslastline \"%{= RY}%H %{kG}%{G} Screen(s): %{c}%w %=%{kG}%c  %D, %M %d %Y  LD:%l\"
startup_message off
msgwait 1
defscrollback 100000
nethack on
EOF
"

Now when I log in to each individual node and run screen, I get a nice toolbar at the bottom:

Combining commands

To combine commands together that you send to each host you can use the standard bash operator semicolon ;

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] "date;sleep 5;date"
rnmcluster02-node01: Sun Nov 30 20:57:06 GMT 2014
rnmcluster02-node03: Sun Nov 30 20:57:06 GMT 2014
rnmcluster02-node04: Sun Nov 30 20:57:06 GMT 2014
rnmcluster02-node02: Sun Nov 30 20:57:06 GMT 2014
rnmcluster02-node01: Sun Nov 30 20:57:11 GMT 2014
rnmcluster02-node03: Sun Nov 30 20:57:11 GMT 2014
rnmcluster02-node04: Sun Nov 30 20:57:11 GMT 2014
rnmcluster02-node02: Sun Nov 30 20:57:11 GMT 2014

Note the use of the quotation marks to enclose the entire command string. Without them the bash interpreter will take the ; as the delimiter of the local commands, and try to run the subsequent commands locally:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node0[1-4] date;sleep 5;date
rnmcluster02-node03: Sun Nov 30 20:57:53 GMT 2014
rnmcluster02-node04: Sun Nov 30 20:57:53 GMT 2014
rnmcluster02-node02: Sun Nov 30 20:57:53 GMT 2014
rnmcluster02-node01: Sun Nov 30 20:57:53 GMT 2014
Sun 30 Nov 2014 20:58:00 GMT
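You can reproduce this behaviour without a cluster. In the sketch below, run_remote is a hypothetical stand-in for pdsh/ssh: it joins its arguments with spaces and hands them to a second shell, and the WHERE variable (also made up for this example) shows which shell ran each command. Single quotes are used so that $WHERE is expanded by the second shell rather than locally.

```shell
# run_remote is a hypothetical stand-in for pdsh/ssh: it joins its arguments
# with spaces and hands the result to another shell to interpret.
# WHERE is set to "remote" only inside that second shell.
run_remote() {
  WHERE=remote sh -c "$*"
}
WHERE=local

# Semicolon inside the quotes: both echo commands reach the stand-in shell
run_remote 'echo first from $WHERE; echo second from $WHERE'

# Semicolon outside the quotes: the local shell splits the line there, so only
# the first echo is passed on and the second one runs locally
run_remote 'echo first from $WHERE'; echo "second from $WHERE"
```

The first call reports "remote" twice; in the second, the trailing echo reports "local", just as the lone local date did in the output above.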

You can also use && and || to run subsequent commands conditionally if the previous one succeeds or fails respectively:

robin@RNMMBP $ pdsh -w root@rnmcluster02-node[01-4] "chkconfig collectl on && service collectl start"

rnmcluster02-node03: Starting collectl: [  OK  ]
rnmcluster02-node02: Starting collectl: [  OK  ]
rnmcluster02-node04: Starting collectl: [  OK  ]
rnmcluster02-node01: Starting collectl: [  OK  ]

Piping and file redirects

Similar to combining commands above, you can pipe the output of commands, and you need to use quotation marks to enclose the whole command string.

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] "chkconfig|grep collectl"
rnmcluster02-node03: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node01: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node04: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node02: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off

However, you can pipe the output from pdsh to a local process if you want:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] chkconfig|grep collectl
rnmcluster02-node02: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node04: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node03: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off
rnmcluster02-node01: collectl           0:off   1:off   2:on    3:on    4:on    5:on    6:off

The difference is that you’ll be shifting the whole of the pipe across the network in order to process it locally, so if you’re just grepping etc this doesn’t make any sense. For use of utilities held locally and not on the remote server though, this might make sense.

File redirects work the same way. Within quotation marks the redirect will be to a file on the remote server; outside of them it’ll be to a local file:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] "chkconfig>/tmp/pdsh.out"
robin@RNMMBP ~ $ ls -l /tmp/pdsh.out
ls: /tmp/pdsh.out: No such file or directory

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] chkconfig>/tmp/pdsh.out
robin@RNMMBP ~ $ ls -l /tmp/pdsh.out
-rw-r--r--  1 robin  wheel  7608 30 Nov 19:23 /tmp/pdsh.out

Cancelling PDSH operations

As you can see from above, the precise syntax of pdsh calls can be hugely important. If you run a command and it appears ‘stuck’, or if you have that heartstopping realisation that the shutdown -h now you meant to run locally you ran across the cluster, you can press Ctrl-C once to see the status of your commands:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] sleep 30
^Cpdsh@RNMMBP: interrupt (one more within 1 sec to abort)
pdsh@RNMMBP:  (^Z within 1 sec to cancel pending threads)
pdsh@RNMMBP: rnmcluster02-node01: command in progress
pdsh@RNMMBP: rnmcluster02-node02: command in progress
pdsh@RNMMBP: rnmcluster02-node03: command in progress
pdsh@RNMMBP: rnmcluster02-node04: command in progress

and press it again within a second of the first to cancel:

robin@RNMMBP ~ $ pdsh -w root@rnmcluster02-node[01-4] sleep 30
^Cpdsh@RNMMBP: interrupt (one more within 1 sec to abort)
pdsh@RNMMBP:  (^Z within 1 sec to cancel pending threads)
pdsh@RNMMBP: rnmcluster02-node01: command in progress
pdsh@RNMMBP: rnmcluster02-node02: command in progress
pdsh@RNMMBP: rnmcluster02-node03: command in progress
pdsh@RNMMBP: rnmcluster02-node04: command in progress
^Csending SIGTERM to ssh rnmcluster02-node01
sending signal 15 to rnmcluster02-node01 [ssh] pid 26534
sending SIGTERM to ssh rnmcluster02-node02
sending signal 15 to rnmcluster02-node02 [ssh] pid 26535
sending SIGTERM to ssh rnmcluster02-node03
sending signal 15 to rnmcluster02-node03 [ssh] pid 26533
sending SIGTERM to ssh rnmcluster02-node04
sending signal 15 to rnmcluster02-node04 [ssh] pid 26532
pdsh@RNMMBP: interrupt, aborting.

If you’ve got threads yet to run on the remote hosts, but want to keep running whatever has already started, you can use Ctrl-C, Ctrl-Z:

robin@RNMMBP ~ $ pdsh -f 2 -w root@rnmcluster02-node[01-4] "sleep 5;date"
^Cpdsh@RNMMBP: interrupt (one more within 1 sec to abort)
pdsh@RNMMBP:  (^Z within 1 sec to cancel pending threads)
pdsh@RNMMBP: rnmcluster02-node01: command in progress
pdsh@RNMMBP: rnmcluster02-node02: command in progress
^Zpdsh@RNMMBP: Canceled 2 pending threads.
rnmcluster02-node01: Mon Dec  1 21:46:35 GMT 2014
rnmcluster02-node02: Mon Dec  1 21:46:35 GMT 2014

NB the above example illustrates the use of the -f argument to limit how many threads are run against remote hosts at once. We can see the command is left running on the first two nodes and returns the date, whilst the Ctrl-C – Ctrl-Z stops it from being executed on the remaining nodes.

PDSH_SSH_ARGS_APPEND

By default, when you ssh to a new host for the first time you’ll be prompted to validate the remote host’s SSH key fingerprint.

The authenticity of host 'rnmcluster02-node02 (172.28.128.9)' can't be established.
RSA key fingerprint is 00:c0:75:a8:bc:30:cb:8e:b3:8e:e4:29:42:6a:27:1c.
Are you sure you want to continue connecting (yes/no)?

This is one of those prompts that the majority of us just hit enter at and ignore; if that includes you then you will want to make sure that your PDSH call doesn’t fall in a heap because you’re connecting to a bunch of new servers all at once. PDSH is not an interactive tool, so if it requires input from the hosts it’s connecting to it’ll just fail. To avoid this SSH prompt, you can set up the environment variable PDSH_SSH_ARGS_APPEND as follows:

export PDSH_SSH_ARGS_APPEND="-q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"

The -q makes failures less verbose, and the -o passes in a couple of options, StrictHostKeyChecking to disable the above check, and UserKnownHostsFile to stop SSH keeping a list of host IP/hostnames and corresponding SSH fingerprints (by pointing it at /dev/null). You’ll want this if you’re working with VMs that are sharing a pool of IPs and get re-used, otherwise you get this scary failure:

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@    WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!     @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY!
Someone could be eavesdropping on you right now (man-in-the-middle attack)!
It is also possible that a host key has just been changed.
The fingerprint for the RSA key sent by the remote host is
00:c0:75:a8:bc:30:cb:8e:b3:8e:e4:29:42:6a:27:1c.
Please contact your system administrator.

For both of the above options, make sure you’re aware of the security implications that you’re opening yourself up to. For a sandbox environment I just ignore them; for anything where security matters, make sure you know exactly which server you are connecting to over SSH, and protect yourself from MitM attacks.
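
One way to limit that exposure is to apply the relaxed options to a single call rather than exporting them for the whole session. The following is only a sketch: the wrapper name pdsh_sandbox and the PDSH_BIN override are made up for illustration and are not part of pdsh itself.

```shell
# Hypothetical wrapper: applies the relaxed SSH options to one pdsh call only,
# so they never leak into the rest of the shell session.
# PDSH_BIN is an illustrative override (not a pdsh feature) that lets the
# wrapper be pointed at another binary when trying it out.
pdsh_sandbox() {
    PDSH_SSH_ARGS_APPEND="-q -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" \
        "${PDSH_BIN:-pdsh}" "$@"
}

# Example (not run here):
#   pdsh_sandbox -w root@rnmcluster02-node[01-4] date
```

Because the variable is set only as a prefix to the command, any other ssh or pdsh call made from the same session keeps the normal host key checks.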

PDSH Reference

You can find out more about PDSH at https://code.google.com/p/pdsh/wiki/UsingPDSH

Summary

When working with multiple Linux machines I would first and foremost make sure SSH keys are set up in order to ease management through password-less logins.

After SSH keys, I would recommend pdsh for parallel execution of the same SSH command across the cluster. It’s a big time saver particularly when initially setting up the cluster given the installation and configuration changes that are inevitably needed.

In the next article of this series we’ll see how the tool colmux is a powerful way to monitor OS metrics across a cluster.

So now your turn – what particular tools or tips do you have for working with a cluster of Linux machines? Leave your answers in the comments below, or tweet them to me at @rmoff.

Linux cluster sysadmin — SSH keys

In this short series of blog posts I’m going to take a look at a few very useful tools that can make your life as the sysadmin of a cluster of Linux machines easier. This may be a Hadoop cluster, or just a plain simple set of ‘normal’ machines on which you want to run the same commands and monitoring.

To start with, we’re going to use the ever-awesome ssh keys to manage security on the cluster. After that we’ll look at executing the same command across multiple machines at the same time using PDSH, and then monitoring OS metrics across a cluster with colmux.

In a nutshell, ssh keys enable us to do password-less authentication in a secure way. You can find a detailed explanation of them in a previous post that I wrote, tips and tricks for OBIEE Linux sysadmin. Beyond the obvious time-saving function of not having to enter a password each time we connect to a machine, having SSH keys in place enables the use of the tools we discuss later, pdsh and colmux.

Working with SSH keys involves taking the public key from a pair, and adding that to another machine in order to allow the owner of the pair’s private key to access that machine. What we’re going to do here is generate a unique key pair that will be used as the identity across the cluster. So each node will have a copy of the private key, in order to be able to authenticate to any other node, which will be holding a copy of the public key (as well as, in turn, the same private key).

In this example I’m going to use my own client machine to connect to the cluster. You could easily use any of the cluster nodes too if a local machine would not be appropriate.
As a side-note, this is another reason why I love the fact that the Rittman Mead standard-issue laptop is a MacBook: just under the covers of Mac OS is a *nix-based command line, meaning that a lot of sysadmin work can be done natively without the additional tools that you would need on Windows (e.g. PuTTY, WinSCP, Pageant, etc.).

SSH key strategy

We’ve several ways we could implement the SSH keys. Because it’s a purely sandbox cluster, I could use the same SSH key pair that I generate for the cluster on my machine too, so the same public/private key pair is distributed thus:

If we wanted a bit more security, a better approach might be to distribute my personal SSH key’s public key across the cluster too, and leave the cluster’s private key to truly identify cluster nodes alone. An additional benefit of this approach is that the client does not need to hold a copy of the cluster’s SSH private key, instead just continuing to use their own.

For completeness, the extreme version of the key strategy would be for each machine to have its own ssh key pair (i.e. its own security identity), with the corresponding public keys distributed to the other nodes in the cluster:

But anyway, here we’re using the second option – a unique keypair used across the cluster and the client’s public ssh key distributed across the cluster too.

Generating the SSH key pair

First, we need to generate the key. I’m going to create a folder to hold it first, because in a moment we’re going to push it and a couple of other files out to all the servers in the cluster and it’s easiest to do this from a single folder.

mkdir /tmp/rnmcluster02-ssh-keys

Note that in the ssh-keygen command below I’m specifying the target path for the key with the -f argument; if you don’t then watch out that you don’t accidentally overwrite your own key pair in the default path of ~/.ssh.

The -q -N "" flags instruct the key generation to use no passphrase for the key and to not prompt for it either. This is the lowest friction approach (you don’t need to unlock the ssh key with a passphrase before use) but also the least secure. If you’re setting up access to a machine where security matters then bear in mind that without a passphrase on an ssh key anyone who obtains it can therefore access any machine to which the key has been granted access (i.e. on which its public key has been deployed).

ssh-keygen -f /tmp/rnmcluster02-ssh-keys/id_rsa -q -N ""

This generates in the tmp folder two files – the private and public (.pub) keys of the pair:

robin@RNMMBP ~ $ ls -l /tmp/rnmcluster02-ssh-keys
total 16
-rw-------  1 robin  wheel  1675 30 Nov 17:28 id_rsa
-rw-r--r--  1 robin  wheel   400 30 Nov 17:28 id_rsa.pub
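
The overwrite risk mentioned above can be guarded against with a small wrapper around ssh-keygen. This is a minimal sketch rather than anything the tool provides; the function name safe_keygen is hypothetical, and the ssh-keygen flags are the same ones used above.

```shell
# Hypothetical helper: generate a passphrase-less key pair at the given path,
# refusing to touch a key file that already exists there.
safe_keygen() {
    key_path=$1
    if [ -e "$key_path" ] || [ -e "$key_path.pub" ]; then
        echo "refusing to overwrite existing key: $key_path" >&2
        return 1
    fi
    # Create the target folder if needed, then generate the pair
    mkdir -p "$(dirname "$key_path")" &&
        ssh-keygen -f "$key_path" -q -N ""
}

# Example (not run here):
#   safe_keygen /tmp/rnmcluster02-ssh-keys/id_rsa
```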

Preparing the authorized_keys file

Now we’ll prepare the authorized_keys file which is where the public SSH key of any identity permitted to access the machine is stored. Note that each user on a machine has their own authorized_keys file, in ~/.ssh/. So for example, the root user has the file in /root/.ssh/authorized_keys and any public key listed in that file will be able to connect to the server as the root user. Be aware of the American [mis-]spelling of “authorized” – spell it [correctly] as “authorised” and you’ll not get any obvious errors, but the ssh key login won’t work either.

So we’re going to copy the public key of the unique pair that we just created for the cluster into the authorized_keys file. In addition we will copy in our own personal ssh key (and any other public key that we want to give access to all the nodes in the cluster):

cp /tmp/rnmcluster02-ssh-keys/id_rsa.pub /tmp/rnmcluster02-ssh-keys/authorized_keys
# [optional] Now add any other keys (such as your own) into the authorized_keys file just created
cat ~/.ssh/id_rsa.pub >> /tmp/rnmcluster02-ssh-keys/authorized_keys
# NB make sure the previous step is a double >> not > since the double appends to the file, a single overwrites.
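
If you have several public keys to add, the copy-then-append steps above can be wrapped up so that there is no > versus >> to get wrong. This is a sketch under the assumption that each .pub file ends with a newline (as files written by ssh-keygen do); the function name build_authorized_keys is hypothetical.

```shell
# Hypothetical helper: build an authorized_keys file from one or more public
# key files, dropping blank lines and exact duplicate entries.
build_authorized_keys() {
    target=$1
    shift
    # cat concatenates the key files; awk keeps only the first occurrence of
    # each non-empty line, preserving the original order.
    cat "$@" | awk 'NF && !seen[$0]++' > "$target"
}

# Example (not run here):
#   build_authorized_keys /tmp/rnmcluster02-ssh-keys/authorized_keys \
#       /tmp/rnmcluster02-ssh-keys/id_rsa.pub ~/.ssh/id_rsa.pub
```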

Distributing the SSH artefacts

Now we’re going to push this set of SSH files out to the .ssh folder of the target user on each node, which in this case is the root user. From a security point of view it’s probably better to use a non-root user for login and then sudo as required, but we’re keeping things simple (and less secure) to start with here. So the files in our folder are:

  • id_rsa – the private key of the key pair
  • id_rsa.pub – the public key of the key pair. Strictly speaking this doesn’t need distributing to all nodes, but it’s conventional and handy to hold it alongside the private key.
  • authorized_keys – this is the file that the sshd daemon on each node will look at to validate the key offered by an incoming login request, and so needs to hold the public key of anyone who is allowed to access the machine as this user.

To copy the files we’ll use scp, but how you get them in place doesn’t really matter so much, so long as they get to the right place:

scp -r /tmp/rnmcluster02-ssh-keys root@rnmcluster02-node01:~/.ssh

At this point you’ll need to enter the password for the target user, but rejoice! This is the last time you’ll need to enter it as subsequent logins will be authenticated using the ssh keys that you’re now configuring.

Run the scp for all nodes in the cluster. If you’ve four nodes in the cluster your output should look something like this:

$ scp -r /tmp/rnmcluster02-ssh-keys/ root@rnmcluster02-node01:~/.ssh
root@rnmcluster02-node01's password:
authorized_keys                                                  100%  781     0.8KB/s   00:00
id_rsa                                                           100% 1675     1.6KB/s   00:00
id_rsa.pub                                                       100%  400     0.4KB/s   00:00
$ scp -r /tmp/rnmcluster02-ssh-keys/ root@rnmcluster02-node02:~/.ssh
Warning: Permanently added the RSA host key for IP address '172.28.128.7' to the list of known hosts.
root@rnmcluster02-node02's password:
authorized_keys                                                  100%  781     0.8KB/s   00:00
id_rsa                                                           100% 1675     1.6KB/s   00:00
id_rsa.pub                                                       100%  400     0.4KB/s   00:00
$ scp -r /tmp/rnmcluster02-ssh-keys/ root@rnmcluster02-node03:~/.ssh
root@rnmcluster02-node03's password:
authorized_keys                                                  100%  781     0.8KB/s   00:00
id_rsa                                                           100% 1675     1.6KB/s   00:00
id_rsa.pub                                                       100%  400     0.4KB/s   00:00
$ scp -r /tmp/rnmcluster02-ssh-keys/ root@rnmcluster02-node04:~/.ssh
root@rnmcluster02-node04's password:
authorized_keys                                                  100%  781     0.8KB/s   00:00
id_rsa                                                           100% 1675     1.6KB/s   00:00
id_rsa.pub                                                       100%  400     0.4KB/s   00:00
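
Rather than typing the scp once per node, the same thing can be done in a loop. The sketch below assumes the node names used in this article; the function name push_keys and the DRY_RUN switch are hypothetical, added only so that the commands can be inspected before being run.

```shell
# Hypothetical helper: copy the key folder to root's .ssh on each node in turn.
# Set DRY_RUN=1 (an illustrative switch, not an scp feature) to print the
# commands instead of running them.
push_keys() {
    src=$1
    shift
    for node in "$@"; do
        if [ "${DRY_RUN:-0}" = "1" ]; then
            echo "scp -r $src root@$node:~/.ssh"
        else
            # The ~ is quoted so that it is expanded on the remote side
            scp -r "$src" "root@$node:~/.ssh"
        fi
    done
}

# Example (not run here; prompts for each node's root password):
#   push_keys /tmp/rnmcluster02-ssh-keys rnmcluster02-node01 rnmcluster02-node02 \
#       rnmcluster02-node03 rnmcluster02-node04
```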

Testing login authenticated through SSH keys

The moment of truth. From your client machine, try to ssh to each of the cluster nodes. If you are prompted for a password, then something is not right – see the troubleshooting section below.

If you put your own public key in authorized_keys when you created it then you don’t need to specify which key to use when connecting because it’ll use your own private key by default:

robin@RNMMBP ~ $ ssh root@rnmcluster02-node01
Last login: Fri Nov 28 17:13:23 2014 from 172.28.128.1



[root@localhost ~]#

There we go – logged in automagically with no password prompt. If you’re using the cluster’s private key (rather than your own) you need to specify it with -i when you connect.

robin@RNMMBP ~ $ ssh -i /tmp/rnmcluster02-ssh-keys/id_rsa root@rnmcluster02-node01
Last login: Fri Nov 28 17:13:23 2014 from 172.28.128.1



[root@localhost ~]#

Troubleshooting SSH key connections

SSH keys are one of the best things in a sysadmin’s toolkit, but when they don’t work they can be a bit tricky to sort out. The first thing to check is that on the target machine the authorized_keys file that does all the magic (by listing the ssh keys that are permitted to connect inbound on a host to the given user) is in place:

[root@localhost .ssh]# ls -l ~/.ssh/authorized_keys
-rw-r--r-- 1 root root 775 Nov 30 18:55 /root/.ssh/authorized_keys

If you get this:

[root@localhost .ssh]# ls -l ~/.ssh/authorized_keys
ls: cannot access /root/.ssh/authorized_keys: No such file or directory

then you have a problem.

One possible issue in this specific instance could be that the above pre-canned scp assumes that the user’s .ssh folder doesn’t already exist (since it doesn’t, on brand new servers) and so specifies it as the target name for the whole rnmcluster02-ssh-keys folder. However if it does already exist then it ends up copying the rnmcluster02-ssh-keys folder into the .ssh folder:

[root@localhost .ssh]# ls -lR
.:
total 12
-rw------- 1 root root 1675 Nov 22  2013 id_rsa
-rw-r--r-- 1 root root  394 Nov 22  2013 id_rsa.pub
drwxr-xr-x 2 root root 4096 Nov 30 18:49 rnmcluster02-ssh-keys

./rnmcluster02-ssh-keys:
total 12
-rw-r--r-- 1 root root  775 Nov 30 18:49 authorized_keys
-rw------- 1 root root 1675 Nov 30 18:49 id_rsa
-rw-r--r-- 1 root root  394 Nov 30 18:49 id_rsa.pub
[root@localhost .ssh]#

To fix this simply move the authorized_keys from rnmcluster02-ssh-keys back into .ssh:

[root@localhost .ssh]# mv ~/.ssh/rnmcluster02-ssh-keys/authorized_keys ~/.ssh/

Other frequent causes of problems are file/folder permissions that are too lax on the target user’s .ssh folder (which can be fixed with chmod -R 700 ~/.ssh) or the connecting user’s ssh private key (fix: chmod 600 id_rsa). The latter will show on connection attempts very clearly:

robin@RNMMBP ~ $ ssh -i /tmp/rnmcluster02-ssh-keys/id_rsa root@rnmcluster02-node01
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@         WARNING: UNPROTECTED PRIVATE KEY FILE!          @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
Permissions 0777 for '/tmp/rnmcluster02-ssh-keys/id_rsa' are too open.
It is required that your private key files are NOT accessible by others.
This private key will be ignored.
bad permissions: ignore key: /tmp/rnmcluster02-ssh-keys/id_rsa
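
As a slightly more selective alternative to the recursive chmod above, the permissions can be set per file. This is a sketch that assumes the file names used in this article; the function name fix_ssh_perms is hypothetical.

```shell
# Hypothetical helper: tighten permissions on an .ssh folder and the key
# files inside it. Defaults to the current user's ~/.ssh.
fix_ssh_perms() {
    dir=${1:-$HOME/.ssh}
    # The folder itself: owner only
    chmod 700 "$dir"
    # Private key and authorized_keys: owner read/write only
    for f in "$dir/id_rsa" "$dir/authorized_keys"; do
        if [ -f "$f" ]; then
            chmod 600 "$f"
        fi
    done
    # The public key can safely be world-readable
    if [ -f "$dir/id_rsa.pub" ]; then
        chmod 644 "$dir/id_rsa.pub"
    fi
}

# Example (not run here):
#   fix_ssh_perms /tmp/rnmcluster02-ssh-keys
```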

Another one that has bitten me twice over time – and that eludes the troubleshooting I’ll demonstrate in a moment – is that SELinux gets stroppy about root access using ssh keys. I always just take this as a handy reminder to disable selinux (in /etc/selinux/config, set SELINUX=disabled), having never had cause to leave it enabled. But, if you do need it enabled you’ll need to hit the interwebs to check the exact cause/solution for this problem.

So to troubleshoot ssh key problems in general, do two things. Firstly, from the client side, specify verbosity (-v for a bit of verbosity, -vvv for the most):

ssh -v -i /tmp/rnmcluster02-ssh-keys/id_rsa root@rnmcluster02-node01

You should observe ssh trying to use the private key, and if the server rejects it it’ll fall back to any other ssh private keys it can find, and then password authentication:

[...]
debug1: Offering RSA public key: /tmp/rnmcluster02-ssh-keys/id_rsa
debug1: Authentications that can continue: publickey,gssapi-keyex,gssapi-with-mic,password
debug1: Next authentication method: password

Quite often the problem will be on the server side, so assuming that you can still connect to the server (eg through the physical console, or using password authentication) then go and check /var/log/secure where you’ll see all logs relating to attempted connections. Here’s the log file corresponding to the above client log, where ssh key authentication is attempted but fails, and then password authentication is used to successfully connect:

Nov 30 18:15:05 localhost sshd[13156]: Authentication refused: bad ownership or modes for file /root/.ssh/authorized_keys
Nov 30 18:15:15 localhost sshd[13156]: Accepted password for root from 172.28.128.1 port 59305 ssh2
Nov 30 18:15:15 localhost sshd[13156]: pam_unix(sshd:session): session opened for user root by (uid=0)

Now we can see clearly what the problem is – “bad ownership or modes for file /root/.ssh/authorized_keys”.
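
On a busy server that message can be buried among other entries, so it can help to pull out just the refusal reasons. The sketch below assumes the log line format shown above; the function name sshd_refusals is hypothetical.

```shell
# Hypothetical helper: print the reason sshd gave for each refused key login
# found in the given log file (for example /var/log/secure).
sshd_refusals() {
    # -n suppresses default output; the p flag prints only lines where the
    # "Authentication refused: " prefix was found and stripped.
    sed -n 's/^.*sshd\[[0-9]*]: Authentication refused: //p' "$1"
}

# Example (not run here):
#   sshd_refusals /var/log/secure
```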

The last roll of the troubleshooting dice is to get sshd (the ssh daemon that runs on the host we’re trying to connect to) to issue more verbose logs. You can either set LogLevel DEBUG1 (or DEBUG2, or DEBUG3) in /etc/ssh/sshd_config and restart the ssh daemon (service sshd restart), or you can actually run a (second) ssh daemon from the host with specific logging. This would be appropriate on a multi-user server where you can’t just go changing sshd configuration. To run a second instance of sshd you’d use:

/usr/sbin/sshd -D -d -p 2222

You have to run sshd from an absolute path (you’ll get told this if you try not to). The -D flag stops it running as a daemon and instead runs interactively, so we can see easily all the output from it. -d specifies the debug logging (-dd or -ddd for greater levels of verbosity), and -p 2222 tells sshd to listen on port 2222. Since we’re doing this on top of the existing sshd, we obviously can’t use the default ssh port (22) so pick another port that is available (and not blocked by a firewall).

Now on the client retry the connection, but pointing to the port of the interactive sshd instance:

ssh -v -p 2222 -i /tmp/rnmcluster02-ssh-keys/id_rsa root@rnmcluster02-node01

When you run the command on the client you should get both the client and host machine debug output go crackers for a second, giving you plenty of diagnostics to pore through and analyse the ssh handshake etc to get to the root of the issue.

Hopefully you’ve now sorted your SSH keys, because in the next article we’re going to see how we can use them to run commands against multiple servers at once using pdsh.

Summary

When working with multiple Linux machines I would first and foremost make sure SSH keys are set up in order to ease management through password-less logins.

We’ll see in the next couple of articles some other tools that are useful when working on a cluster:

  • pdsh
  • colmux

I’m interested in what you think – what particular tools or tips do you have for working with a cluster of Linux machines? Leave your answers in the comments below, or tweet them to me at @rmoff.

Going Beyond MapReduce for Hadoop ETL Pt.3 : Introducing Apache Spark

In the first two posts in this three-part series on going beyond MapReduce for Hadoop ETL, we looked at why MapReduce and Hadoop 1.0 were only really suitable for batch processing, and how the new Apache Tez framework enabled by Apache YARN on the Hadoop 2.0 platform can be swapped-in for MapReduce to improve the performance of existing Pig and Hive scripts. Today though in the final post I want to take a look at Apache Spark, the next-gen compute framework that Cloudera are backing as the long-term successor to MapReduce.

Like Tez, Apache Spark supports DAGs that describe the entire dataflow process, not just individual map and reduce jobs. Like Pig, it has a concept of datasets (Pig’s aliases and relations), but crucially these datasets (RDDs, or “resilient distributed datasets”) can be cached in-memory, fall back gracefully to disk and can be rebuilt using a graph that says how to reconstruct them. With Tez, individual jobs in the DAG can now hand-off their output to the next job in-memory rather than having to stage in HDFS, but Spark uses memory for the actual datasets and is a much better choice for the types of iterative, machine-learning tasks that you tend to do on Hadoop systems. Moreover, Spark arguably has a richer API, particularly when used with Scala, a functional programming-orientated language that uses Java libraries and whose collections framework maps well on to the types of operations you’d want to make use of with dataflow-type applications on a cluster.

Spark can run standalone, on YARN or on other cluster management platforms, and comes with a handy command-line interpreter that you can use to interactively load, filter, analyse and work with RDDs. Cloudera CDH5.2 comes with Spark 1.1.0 and can either be configured standalone or to run on YARN, with Spark as a service added to nodes in the cluster using Cloudera Manager.

So looking back at the Pig example, we create the dataflow using a number of aliases in that case, that we progressively filter, transform, join together and then aggregate to get to the final top ten set of pages from the website logs. Translating that dataflow to Spark we end up with a similar set of RDDs that take our initial set of logs, apply transformations and join the datasets to store the final aggregated output back on HDFS.

Spark supports in-memory sharing of data within a single DAG (i.e. RDD to RDD), but also between DAGs running in the same Spark instance. As such, Spark becomes a great framework for doing iterative and cyclic data analysis, and can make much better use of the memory on cluster servers whilst still using disk for overflow data and persistence.

Moreover, Spark powers a number of higher-level tools built on the core Spark engine to provide features like real-time loading and analysis (Spark Streaming), SQL access and integration with Hive (Spark SQL), machine learning (MLlib) and so forth. In fact, as well as Hive and Pig being reworked to run on Tez there are also projects underway to port them both to Spark, though to be honest they’re both at early stages compared to Tez integration and most probably you’ll be using Scala, Java or Python to work with Spark now.

So taking the Pig script we had earlier and translating that to the same dataflow in Spark and Scala, we end up with something like this:

package com.cloudera.analyzeblog

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext

// One row per parsed line of the web server access log
case class accessLogRow(host: String, identity: String, user: String, time: String, request: String, status: String, size: String, referer: String, agent: String)
// A page request, reduced to the columns needed for the aggregation
case class pageRow(host: String, request_page: String, status: String, agent: String)
// One row per blog post in the pipe-delimited posts file
case class postRow(post_id: String, title: String, post_date: String, post_type: String, author: String, url: String, generated_url: String)

object analyzeBlog {

  // Return the second space-separated token of the request string (the URL),
  // or "N/A" if the request has no second token
  def getRequestUrl(s: String): String = {
    try {
      s.split(' ')(1)
    } catch {
      case e: ArrayIndexOutOfBoundsException => "N/A"
    }
  }

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("analyzeBlog"))
    val sqlContext = new SQLContext(sc)
    // Brings sql(...) and the implicit RDD-to-SchemaRDD conversion into scope
    import sqlContext._

    val raw_logs = "/user/mrittman/rm_logs"
    //val rowRegex = """^([0-9.]+)\s([\w.-]+)\s([\w.-]+)\s(\[[^\[\]]+\])\s"((?:[^"]|\")+)"\s(\d{3})\s(\d+|-)\s"((?:[^"]|\")+)"\s"((?:[^"]|\")+)"$""".r
    val rowRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

    // Parse each log line into an accessLogRow; lines that don't match the pattern are dropped
    val logs_base = sc.textFile(raw_logs) flatMap {
      case rowRegex(host, identity, user, time, request, status, size, referer, agent) =>
        Seq(accessLogRow(host, identity, user, time, request, status, size, referer, agent))
      case _ => Nil
    }

    // Drop rows whose request string matches one of the crawler/monitoring patterns
    val logs_base_nobots = logs_base.filter( r => ! r.request.matches(".*(spider|robot|bot|slurp|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"))

    // Extract the requested URL and make sure it ends with a trailing slash
    val logs_base_page = logs_base_nobots.map { r =>
      val request = getRequestUrl(r.request)
      val request_formatted = if (request.endsWith("/")) request else request.concat("/")
      (r.host, request_formatted, r.status, r.agent)
    }

    val logs_base_page_schemaRDD = logs_base_page.map(p => pageRow(p._1, p._2, p._3, p._4))

    logs_base_page_schemaRDD.registerAsTable("logs_base_page")

    // Count hits per page and register the result as a table for the join below
    sql("SELECT request_page, count(*) as hits FROM logs_base_page GROUP BY request_page").registerAsTable("page_count")

    val postsLocation = "/user/mrittman/posts.psv"

    // Load the pipe-delimited posts file, adding a trailing slash to the generated URL
    val posts = sc.textFile(postsLocation).map { line =>
      val cols = line.split('|')
      postRow(cols(0), cols(1), cols(2), cols(3), cols(4), cols(5), cols(6).concat("/"))
    }

    posts.registerAsTable("posts")

    // Join page counts to post details and keep the ten most-requested pages
    val pages_and_posts_details = sql("SELECT p.request_page, p.hits, ps.title, ps.author FROM page_count p JOIN posts ps ON p.request_page = ps.generated_url ORDER BY hits DESC LIMIT 10")

    pages_and_posts_details.saveAsTextFile("/user/mrittman/top_10_pages_and_author4")

  }
}

I’ll do a code walkthrough for this Spark application in a future post, but for now note the map and flatMap Scala collection functions used to transform RDDs, the registerAsTable function that registers RDDs as tables, and the sql(“…”) function that then lets us manipulate their contents using SQL, including joining to other RDDs registered as tables. For now though, let’s run the application on the CDH5.2 cluster using YARN and see how long it takes to process the same set of log files (remember, the Pig script on this CDH5.2 cluster took around 5 minutes to run, and the Pig on Tez version on the Hortonworks cluster was around 2.5 minutes):

[mrittman@bdanode1 analyzeBlog]$ spark-submit --class com.cloudera.analyzeblog.analyzeBlog --master yarn target/analyzeblog-0.0.1-SNAPSHOT.jar 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/06 19:18:25 INFO SecurityManager: Changing view acls to: mrittman
14/12/06 19:18:25 INFO SecurityManager: Changing modify acls to: mrittman
...
14/12/06 19:19:41 INFO DAGScheduler: Stage 0 (takeOrdered at basicOperators.scala:171) finished in 3.585 s
14/12/06 19:19:41 INFO SparkContext: Job finished: takeOrdered at basicOperators.scala:171, took 53.591560036 s
14/12/06 19:19:41 INFO SparkContext: Starting job: saveAsTextFile at analyzeBlog.scala:56
14/12/06 19:19:41 INFO DAGScheduler: Got job 1 (saveAsTextFile at analyzeBlog.scala:56) with 1 output partitions (allowLocal=false)
14/12/06 19:19:41 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at analyzeBlog.scala:56)
14/12/06 19:19:41 INFO DAGScheduler: Parents of final stage: List()
14/12/06 19:19:41 INFO DAGScheduler: Missing parents: List()
14/12/06 19:19:41 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[15] at saveAsTextFile at analyzeBlog.scala:56), which has no missing parents
14/12/06 19:19:42 INFO MemoryStore: ensureFreeSpace(64080) called with curMem=407084, maxMem=278302556
14/12/06 19:19:42 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 62.6 KB, free 265.0 MB)
14/12/06 19:19:42 INFO MemoryStore: ensureFreeSpace(22386) called with curMem=471164, maxMem=278302556
14/12/06 19:19:42 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 21.9 KB, free 264.9 MB)
14/12/06 19:19:42 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on bdanode1.rittmandev.com:44486 (size: 21.9 KB, free: 265.3 MB)
14/12/06 19:19:42 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0
14/12/06 19:19:42 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (MappedRDD[15] at saveAsTextFile at analyzeBlog.scala:56)
14/12/06 19:19:42 INFO YarnClientClusterScheduler: Adding task set 3.0 with 1 tasks
14/12/06 19:19:42 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 215, bdanode5.rittmandev.com, PROCESS_LOCAL, 3331 bytes)
14/12/06 19:19:42 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on bdanode5.rittmandev.com:13962 (size: 21.9 KB, free: 530.2 MB)
14/12/06 19:19:42 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 215) in 311 ms on bdanode5.rittmandev.com (1/1)
14/12/06 19:19:42 INFO YarnClientClusterScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/12/06 19:19:42 INFO DAGScheduler: Stage 3 (saveAsTextFile at analyzeBlog.scala:56) finished in 0.312 s
14/12/06 19:19:42 INFO SparkContext: Job finished: saveAsTextFile at analyzeBlog.scala:56, took 0.373096676 s

It ran in just over a minute in the end, and most of that was around submitting the job to YARN – not bad. We’ll be covering more of Spark on the blog over the next few weeks including streaming and machine learning examples, and connecting it to ODI and OBIEE via Hive on Spark, and Spark SQL’s own Hive-compatible Thrift server. I’ll also be taking a look at Pig on Spark (or “Spork”…) to see how well that works, and most interestingly how Pig and Hive on Spark compare to running them on Tez – watch this space as they say.