Saturday, 26 July 2014

Publish Consume Blob and Stream Message with JMS ActiveMQ

A few months back, I was scratching my head and Googling all day for a way to publish and consume Blob data (like PDF, Word, Excel files) through a message queue. I couldn't find any complete solution, only some tips and a special URL for blob messages. So I am writing this tutorial on publishing and consuming Blob and Stream messages through a message queue. I will be using ActiveMQ for this tutorial.
Note: ActiveMQ supports BlobMessage, which is specific to ActiveMQ. JMS does not have any BlobMessage wrapper; it supports StreamMessage.

 Producer:

import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class BlobMessageProducer {
private Connection connection = null;
private ActiveMQSession session = null;
private Destination destination = null;
private MessageProducer producer = null;
private File file;

  private void init(String fileName) throws Exception {
file = new File(fileName);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");
connection = connectionFactory.createConnection();
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("File.Transport");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
}

public void sendFile(String fileName) {
try {
 System.out.println("Send File Start >>");
init(fileName);
BlobMessage blobMessage = session.createBlobMessage(file);
blobMessage.setStringProperty("FILE.NAME", file.getName());
blobMessage.setLongProperty("FILE.SIZE", file.length());
producer.send(blobMessage);
  System.out.println("Send File End>>");
} catch (Exception e) {
e.printStackTrace();
} finally {
close();
}
}

private void close() {

try {
if (connection != null) {
connection.close();
}
logger.info("--producer close end--");
} catch (JMSException e) {
e.printStackTrace();
}
System.exit(0);
}

public static void main(String argv[]) {
String fileName = "/home/kuntal/practice/config-data/test.pdf";
new BlobMessageProducer().sendFile(fileName);
}

}



Consumer:

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
import org.apache.log4j.Logger; // assuming log4j 1.x for the Logger used below

public class BlobMessageConsumer {
private MessageConsumer consumer;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private static Logger logger = Logger.getLogger(BlobMessageConsumer.class);
private BufferedOutputStream bos;

private void init() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("File.Transport");
consumer = session.createConsumer(destination);
}

public void receiveFile(String targetFilePath) {
try {
init();
while (true) {
Message message = consumer.receive(5000);
if (message == null) {
break;
}

if (message instanceof BlobMessage) {
byte[] buffer = new byte[2048];
int length = 0;
BlobMessage blobMessage = (BlobMessage) message;
String fileName = blobMessage
.getStringProperty("FILE.NAME");

File file = new File(targetFilePath + File.separator
+ fileName);
OutputStream os = new FileOutputStream(file);
bos = new BufferedOutputStream(os);

InputStream inputStream = blobMessage.getInputStream();
while ((length = inputStream.read(buffer)) > 0) {
bos.write(buffer, 0, length);
}
// flush the current file and release the blob stream before handling the next message
bos.flush();
inputStream.close();
}
}
} catch (Exception e) {
logger.error("--receiveFile fail--", e);
} finally {
close();
}
}

private void close() {

try {
if (bos != null) {
bos.close();
}
if (connection != null) {
connection.close();
}

} catch (IOException e) {
logger.error("--close OutputStream fail--", e);
} catch (JMSException e) {
logger.error("--close connection fail--", e);
}
System.exit(0);
}

public static void main(String[] args) {
String targetFileFolder = "/home/kuntal/practice/config-data/output";
new BlobMessageConsumer().receiveFile(targetFileFolder);
}
}

For streaming data, JMS provides StreamMessage, which is very handy for pushing and pulling streaming data (such as logs or text files) through the message queue. Here is how to publish and consume streaming data with ActiveMQ.

Producer:
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger; // assuming log4j 1.x for the Logger used below

public class StreamProducer {
private Connection connection;
private Session session;
private Destination destination;
private MessageProducer producer;
private InputStream in;

private static Logger logger = Logger.getLogger(StreamProducer.class);

public void sendFile(String fileName) {
logger.info("--sendFile start--");
try {
init(fileName);
byte[] buffer = new byte[1024];
int c = -1;
while ((c = in.read(buffer)) > 0) {
StreamMessage smsg = session.createStreamMessage();
smsg.writeBytes(buffer, 0, c);
producer.send(smsg);
logger.info("send: " + c);
}
logger.info("--sendFile end--");
} catch (Exception e) {
logger.error("--sendFile fail--", e);
} finally {
close();
}
}

private void init(String fileName) throws Exception {
File file = new File(fileName);
in = new FileInputStream(file);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("queue1");
producer = session.createProducer(destination);
// producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
}

private void close() {
logger.info("--producer close start--");
try {
if (in != null) {
in.close();
}
if (connection != null) {
connection.close();
}
logger.info("--producer close end--");
} catch (IOException e) {
logger.error("--close InputStream fail--", e);
} catch (JMSException e) {
logger.error("--close connection fail--", e);
}
System.exit(0);
}

public static void main(String argv[]) {
ClassLoader loader = StreamProducer.class.getClassLoader();
//URL url = loader.getResource("test.txt");
String fileName = "/home/kuntal/practice/config-data/test.txt";
new StreamProducer().sendFile(fileName);
}

}

Consumer:
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger; // assuming log4j 1.x for the Logger used below

public class StreamConsumer {
private MessageConsumer consumer;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private static Logger logger = Logger.getLogger(StreamConsumer.class);
private BufferedOutputStream bos = null;

private void init(String targetFileName) throws Exception {
logger.info("--init start--");
logger.info("--targetFileName--" + targetFileName);
OutputStream out = new FileOutputStream(targetFileName);
bos = new BufferedOutputStream(out);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("queue1");
consumer = session.createConsumer(destination);
logger.info("--init end--");
}

public void receiveFile(String targetFileName) {
logger.info("--receive file start--");
try {
init(targetFileName);
byte[] buffer = new byte[2048];
while (true) {
Message msg = consumer.receive(5000);
if (msg == null) {
break;
}

if (msg instanceof StreamMessage) {
StreamMessage smsg = (StreamMessage) msg;
int c = smsg.readBytes(buffer);
String tempStr = new String(buffer, 0, c);
logger.info("Receive str: " + tempStr);
// write the raw bytes rather than tempStr.getBytes(), so multi-byte characters
// split across message boundaries are not corrupted in the output file
bos.write(buffer, 0, c);
}
}
logger.info("--receive file end--");
} catch (Exception e) {
logger.error("--sendFile fail--", e);
} finally {
close();
}
}

private void close() {
logger.info("--consumer close start--");
try {
if (bos != null) {
bos.close();
}
if (connection != null) {
connection.close();
}
logger.info("--consumer close end--");
} catch (IOException e) {
logger.error("--close OutputStream fail--", e);
} catch (JMSException e) {
logger.error("--close connection fail--", e);
}
System.exit(0);
}

public static void main(String[] args) {
new StreamConsumer().receiveFile("/home/kuntal/practice/config-data/output2.txt");
}

}

Hope this helps you and saves your valuable time!

Sunday, 20 July 2014

Building Native Hadoop Libraries to Fix VM Stack Guard error on 64 bit machine

Sometimes the Node Manager may not start on a 64-bit machine. You may see a message similar to the one below if you are running the Apache distribution on a 64-bit server without modification:
Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /home/hduser/bigdata/hadoop-2.4.1/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/02/01 17:02:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable.


Solution:
To get started, you need the following setup:
Basic dev tools already installed:  gcc, make
Java 1.6+ installed
Maven 3.1.1
Starting from this setup, you also need to install these components (Ubuntu users can use sudo apt-get install):

  • g++
  • cmake
  • zlib1g-dev
  • protobuf 2.5


First Step: Building the native libraries requires using protobuf 2.5
 You will need to download and build it yourself.  You can get the download from here .  Download version 2.5, which is the latest version as of this post.

To build protobuf, run these commands from the main protobuf directory:

> ./configure
> make
Once the build has finished, run this command to execute the unit tests and verify that protobuf was built successfully:

> make check
Look for this in the output:

==================
All 5 tests passed
==================
If you see this, then protobuf was built successfully, and you can move on to building the Hadoop libraries.

Second Step:  Building the Hadoop Libraries
To build the Hadoop libraries, start off with a Hadoop src distribution archive from here.
  Extract the archive, then move into the hadoop-common-project/hadoop-common directory:

$ cd hadoop-common-project/hadoop-common
Before building, you need to define the location of protoc in the protobuf code:

$ export HADOOP_PROTOC_PATH=[path to protobuf]/src/protoc
From this directory, use Maven to build the native code:

$ mvn compile -Pnative
Look for the typical Maven BUILD SUCCESS message to indicate that you have built the libraries properly:

[INFO] --------------------------------------------
[INFO] BUILD SUCCESS
[INFO] --------------------------------------------
Maven will generate the libraries in target/native/target/usr/local/lib .

Final step:  Copying the libraries into Hadoop
Once the libraries are built, all you need to do is copy them to your Hadoop installation.  If you have been following the instructions to set up a cluster on this site, that path is $HADOOP_HOME .  Copy the files as the hduser since that user has permissions to write to the Hadoop installation:

hdfs> cp target/native/target/usr/local/lib/libhadoop.a $HADOOP_HOME/lib/native
hdfs> cp target/native/target/usr/local/lib/libhadoop.so.1.0.0 $HADOOP_HOME/lib/native


Now restart (stop then start) your Hadoop daemons. The Node Manager will come up along with the other services, as you can verify with jps. :)


MBR (Master Boot Record) VS GPT (Guid Partition Table)

Recently, while setting up dual boot on my laptop, I came across this issue. It was all related to MBR and GPT, so if you are a system admin or a developer, you should be familiar with the basic partitioning scheme that an OS follows, the way partition entry details are stored, and which one suits your requirement.
If you have played with your hard disk and are always formatting and partitioning it, you will surely come across the terms "MBR" and "GPT". This is especially evident when you are dual-booting your OS and are faced with the problem of having to switch from GPT to MBR.
Hard Disk Partitions: You probably know that you can split your hard disk into several partitions. The question is, how does the OS know the partition structure of the hard disk? That information has to come from somewhere. This is where MBR (Master Boot Record) and GPT (GUID Partition Table) come into play. While the two are architecturally different, both play the same role of governing and providing information about the partitions on the hard disk.

Master Boot Record (MBR):
MBR is the old standard for managing the partitions of a hard disk, and it is still used extensively by many people. The MBR resides at the very beginning of the hard disk and holds the information on how the logical partitions are organized in the storage device. In addition, the MBR also contains executable code that can scan the partitions for the active OS and load the boot code/procedure for that OS.
For an MBR disk, you can only have four primary partitions. To create more partitions, you can set the fourth partition as an extended partition and then create more sub-partitions (or logical drives) within it. As MBR uses 32 bits to record the partition size, each partition can only go up to a maximum of 2TB. This is what a typical MBR disk layout looks like:


There are several pitfalls with MBR. First of all, you can only have four partitions on the hard disk, and each partition is limited to 2TB in size. This is not going to work well with hard disks of large capacity, say 100TB. Secondly, the MBR is the only place that holds the partition information. If it ever gets corrupted (and yes, it can get corrupted very easily), the entire hard disk is unreadable.
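As a rough check on that 2TB figure (assuming the traditional 512-byte sector size): MBR records partition start and length as 32-bit sector counts, so the largest addressable partition is 2^32 sectors × 512 bytes/sector = 2^41 bytes = 2 TiB (about 2.2 TB).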


GUID Partition Table (GPT): 
GPT is the latest standard for laying out the partitions of a hard disk. It makes use of globally unique identifiers (GUIDs) to define the partitions, and it is part of the UEFI standard. This means that on a UEFI-based system (which is required for the Windows 8 Secure Boot feature), using GPT is a must. With GPT, you can create a theoretically unlimited number of partitions on the hard disk, even though most OSes generally restrict it to 128 partitions. Unlike MBR, which limits each partition to 2TB in size, each partition in GPT can be up to 2^64 blocks in length (as it uses 64-bit block addresses), which is equivalent to 9.44 ZB for a 512-byte block (1 ZB is 1 billion terabytes). In Microsoft Windows, that size is limited to 256TB.
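To see where the 9.44 ZB figure comes from: 2^64 blocks × 512 bytes/block = 2^73 bytes ≈ 9.44 × 10^21 bytes, i.e. roughly 9.44 ZB when 1 ZB is taken as 10^21 bytes.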


From the GPT Table Scheme diagram above, you can see that there is a primary GPT at the beginning of the hard disk and a secondary GPT at the end. This is what makes GPT more useful than MBR. GPT stores a backup header and partition table at the end of the disk, so they can be recovered if the primary tables are corrupted. It also carries CRC32 checksums to detect errors and corruption of the header and partition table.
You can also see that there is a protective MBR at the first sector of the hard disk. Such a hybrid setup allows a BIOS-based system to boot from a GPT disk using a boot loader stored in the protective MBR’s code area. In addition, it protects the GPT disk from damage by GPT-unaware disk utilities.



Saturday, 12 July 2014

Real-Time Big Data Architecture Patterns

  1. The app server writes to a shared in-memory cluster with a TTL and to a queue dispatcher.
  2. The queue dispatcher asynchronously writes to a persistent bigdata database (without TTL).
  3. Reads first look in the in-memory cache database; if that misses, they look in the persistent bigdata database (see the sketch below).
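A minimal Java sketch of this read/write path (not from the original post), assuming hypothetical CacheCluster, QueueDispatcher and BigdataStore interfaces; the real stores could be, for example, a Memcached/Redis cluster and HBase/Cassandra:

// Hypothetical interfaces standing in for the shared cache, the queue dispatcher
// and the persistent bigdata database.
interface CacheCluster {
    String get(String key);
    void put(String key, String value, int ttlSeconds);
}

interface QueueDispatcher {
    void enqueue(String key, String value); // consumed asynchronously and persisted without TTL
}

interface BigdataStore {
    String read(String key);
}

public class CacheAsideDataAccess {
    private static final int TTL_SECONDS = 300; // example TTL only
    private final CacheCluster cache;
    private final QueueDispatcher dispatcher;
    private final BigdataStore store;

    public CacheAsideDataAccess(CacheCluster cache, QueueDispatcher dispatcher, BigdataStore store) {
        this.cache = cache;
        this.dispatcher = dispatcher;
        this.store = store;
    }

    // Write path: cache with TTL, then hand off to the dispatcher for asynchronous persistence.
    public void write(String key, String value) {
        cache.put(key, value, TTL_SECONDS);
        dispatcher.enqueue(key, value);
    }

    // Read path: cache first; on a miss, fall back to the persistent bigdata database.
    public String read(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = store.read(key);
            if (value != null) {
                cache.put(key, value, TTL_SECONDS); // repopulate the cache for subsequent reads
            }
        }
        return value;
    }
}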


BIGDATA ARCHITECTURE PATTERN2 – DISTRIBUTED MERGE & UPDATE

  1. The Read-Modify-Write step happens in a single RPC.
  2. This saves data flow over the network (see the sketch below).
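As one concrete example of this pattern (not taken from the original post): HBase's Increment performs the read-modify-write on the region server inside a single RPC, so the client never has to pull the current value over the network. A rough sketch, where the table name "metrics", column family "d", qualifier "pageviews" and the row key are illustrative only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleRpcMerge {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "metrics");
        try {
            Increment inc = new Increment(Bytes.toBytes("page#home"));
            inc.addColumn(Bytes.toBytes("d"), Bytes.toBytes("pageviews"), 1L);
            // The region server reads the current counter, adds 1 and writes it back,
            // all inside this one RPC; only the new value travels back to the client.
            Result result = table.increment(inc);
            long newValue = Bytes.toLong(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("pageviews")));
            System.out.println("pageviews = " + newValue);
        } finally {
            table.close();
        }
    }
}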

 

BIGDATA ARCHITECTURE PATTERN3 – DISTRIBUTED MERGE & UPDATE

  1. Good for write-once, read-many workloads.
  2. An embedded database (with TTL) serves as a local in-process cache (see the sketch below).
  3. The embedded database is used as a cache and is designed to use memory and SSD efficiently.
  4. The bigdata database handles distributed writes, sharding and failover.
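A minimal sketch of the local in-process cache idea, using a plain map with per-entry expiry in place of a real embedded store (such as RocksDB or LevelDB); the BigdataStore interface is hypothetical:

import java.util.concurrent.ConcurrentHashMap;

public class LocalTtlCache {

    interface BigdataStore {
        String read(String key); // the remote, distributed store
    }

    private static class Entry {
        final String value;
        final long expiresAtMillis;
        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<String, Entry>();
    private final BigdataStore store;
    private final long ttlMillis;

    public LocalTtlCache(BigdataStore store, long ttlMillis) {
        this.store = store;
        this.ttlMillis = ttlMillis;
    }

    // Read-through: serve from the in-process cache while the entry is fresh,
    // otherwise fetch from the distributed store and cache the result with a TTL.
    public String get(String key) {
        Entry entry = cache.get(key);
        if (entry != null && entry.expiresAtMillis > System.currentTimeMillis()) {
            return entry.value;
        }
        String value = store.read(key);
        if (value != null) {
            cache.put(key, new Entry(value, System.currentTimeMillis() + ttlMillis));
        } else {
            cache.remove(key);
        }
        return value;
    }
}

This suits write-once, read-many data: entries never change once written, so the only staleness concern is the TTL itself.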

Friday, 11 July 2014

Next Generation Cache will be in SSD+DRAM from DRAM+NETWORK

There are four fundamental components leveraged for processing bigdata
  1. CPU
  2. Memory
  3. Disk
  4. Network
Architects and developers play with these ingredients to balance the operational requirements against the cost-per-byte parameter. The result: numerous hybrid architectural models, largely determined by the nature of the application in context.
 
One such application pattern in the bigdata world is fetching large volumes of data for analysis. To perform interactive business intelligence, the data has to be served from a cache layer. An effective cache layer means more cache hits, and that is determined by analyst behavior. Is the most recently analyzed dataset requested again? In most cases it is: during an analysis, analysts keep playing with a subset of data to discover patterns. In these scenarios, a cache layer works wonders by serving hot data from memory, saving the overhead of going to disk, looking up the inode table and seeking across the cylinders.
 
For the last decade, memcache served this layer wonderfully in distributed system setups. However, stretching this approach started putting stress on the network layer. The first attempt at Facebook was to short-circuit the packet layer by running memcache over UDP, and the result was a phenomenal increase in throughput and performance. However, the fundamental problem of network saturation still remained unaddressed with this solution: with more and more distributed components hammering the network layer, it continues to reach its saturation point.
 

FUTURE BIGDATA CACHE LAYER

As these distributed systems were evolving, a new memory hardware, the SSD, was making its way into enterprises as well as retail laptops through mass production. The maturity of SSDs, along with the price drop, made the component viable for architects to play with.

So the OLD model of DRAM + NETWORK has been challenged with new generation caches on DRAM + SSD.

To me, RocksDB is the first step in this direction. Going forward, more and more vendors and products will explore this area.

Hadoop and HBase integration with Ganglia

Hadoop and HBase use the GangliaContext class to send the metrics collected by each daemon (such as the datanode, tasktracker, jobtracker, HMaster, etc.) to gmonds.
Once you have set up Ganglia successfully, you may want to edit /etc/hadoop/conf/hadoop-metrics.properties and /etc/hbase/conf/hadoop-metrics.properties to announce Hadoop- and HBase-related metrics to Ganglia. Since we use CDH 4.0.1, which is compatible with Ganglia 3.1.x releases, we use the newly introduced GangliaContext31 class (instead of the older GangliaContext class) in the properties files.

Metrics configuration for slaves

# /etc/hadoop/conf/hadoop-metrics.properties
...
dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
dfs.period=10
dfs.servers=hadoop-slave1.IP.address:8649
...
mapred.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
mapred.period=10
mapred.servers=hadoop-slave1.IP.address:8649
...

Metrics configuration for master

Should be the same as for slaves – just use hadoop-master.IP.address:8649 (instead of hadoop-slave1.IP.address:8649) for example:
# /etc/hbase/conf/hadoop-metrics.properties
...
hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
hbase.period=10
hbase.servers=hadoop-master.IP.address:8649
...
Remember to edit both properties files (/etc/hadoop/conf/hadoop-metrics.properties for Hadoop and /etc/hbase/conf/hadoop-metrics.properties for HBase) on all nodes and then restart Hadoop and HBase clusters. No further configuration is necessary.

Some more details

Actually, I was surprised that Hadoop’s daemons really send data somewhere, instead of just being polled for it. What does that mean? It means, for example, that every single slave node runs several processes (e.g. gmond, datanode, tasktracker and regionserver) that collect the metrics and send them to the gmond running on the slave1 node. If we stop the gmonds on slave2, slave3 and slave4, but still run Hadoop’s daemons, we will still get the Hadoop-related metrics (but not the metrics about memory and CPU usage, since those were to be sent by the stopped gmonds). Please look at the slave2 node in the picture below to see (more or less) how it works (tt, dd and rs denote tasktracker, datanode and regionserver respectively, while slave4 was removed to increase readability).

Single points of failure

This configuration works well until nodes start to fail. And we know that they will! And we know that, unfortunately, our configuration has at least two single points of failure (SPoF):
  • gmond on slave1 (if this node fails, all monitoring statistics about all slave nodes will be unavailable)
  • gmetad and the web frontend on master (if this node fails, the full monitoring system will be unavailable. It means that we not only lose the most important Hadoop node (actually, it should be called SUPER-master since it has so many master daemons installed ;), but we also lose a valuable source of monitoring information that may help us detect the cause of the failure by looking at the graphs and metrics for this node generated just a moment before the failure)

Avoiding Ganglia’s SPoF on slave1 node

Fortunately, you may specify as many udp_send_channels as you like to send metrics redundantly to other gmonds (assuming that these gmonds specify udp_recv_channels to listen to incoming metrics).
In our case, we may select slave2 to be an additional lead node (together with slave1) to collect metrics redundantly (and announce them to gmetad):
  • update gmond.conf on all slave nodes and define an additional udp_send_channel section to send metrics to slave2 (port 8649)
  • update gmond.conf on slave2 to define udp_recv_channel (port 8649) to listen for incoming metrics and tcp_accept_channel (port 8649) to announce them (the same settings should already be in place in gmond.conf on slave1)
  • update hadoop-metrics.properties file for Hadoop and HBase daemons running on slave nodes to send their metrics to both slave1 and slave2 e.g.:
# /etc/hbase/conf/hadoop-metrics.properties
...
hbase.class=org.apache.hadoop.metrics.ganglia.GangliaContext31
hbase.period=10
hbase.servers=hadoop-slave1.IP.address:8649,hadoop-slave2.IP.address:8649
  • finally update data_source “hadoop-slaves” in gmetad.conf to poll data from two redundant gmonds (if gmetad cannot pull the data from slave1.node.IP.address, it will continue trying slave2.node.IP.address):
data_source "hadoop-slaves" slave1.node.IP.address slave2.node.IP.address
Perhaps the picture below is not the most fortunate (so many arrows), but it intends to show that if slave1 fails, gmetad will be able to take metrics from the gmond on the slave2 node (since all slave nodes send metrics redundantly to the gmonds running on slave1 and slave2).

Avoiding Ganglia’s SPoF on master node

The main idea here is not to collocate gmetad (and the web frontend) with the Hadoop master daemons, so that we do not lose monitoring statistics if the master node fails (or simply becomes unavailable). One idea is, for example, to move gmetad (and the web frontend) from slave1 to slave3 (or slave4), or simply to introduce a redundant gmetad running on slave3 (or slave4). The former idea seems quite OK, while the latter makes things quite complicated for such a small cluster.
I guess an even better idea is to introduce an additional node (called an “edge” node, if possible) that runs gmetad and the web frontend (it may also have the base Hadoop and HBase packages installed, but it does not run any of Hadoop’s daemons – it acts as a client machine only, used to launch MapReduce jobs and access HBase). Actually, an “edge” node is a commonly used practice to provide the interface between users and the cluster (e.g. it runs Pig, Hive and Oozie).

Troubleshooting and tips that may help

Since debugging various aspects of the configuration was the longest part of setting up Ganglia, I share some tips here. Note that this does not cover all possible troubleshooting; it is rather based on the problems that we encountered and finally managed to solve.

Start small

Although the configuration process of Ganglia is not so complex, it is good to start with only two nodes and, if that works, grow it into a larger cluster. But before you install any Ganglia daemon…

Try to send “Hello” from node1 to node2

Make sure that you can talk to port 8649 on the given target host using the UDP protocol. netcat is a simple tool that helps you verify it. Open port 8649 on node1 (called the “lead node” later) for inbound UDP connections, and then send some text to it from node2.
# listen (-l option) for inbound UDP (-u option) connections on port 8649 
# and prints received data
akawa@hadoop-slave1:~$ nc -u -l -p 8649
# create a UDP (-u option) connection to hadoop-slave1:8649 
# and send text from stdin to that node:
akawa@hadoop-slave2:~$ nc -u hadoop-slave1 8649
Hello My Lead Node
# look at slave1's console to see if the text was successfully delivered
akawa@hadoop-slave1:~$
Hello My Lead Node
If it does not work, please double check whether your iptables rules (iptables, or ip6tables if you use IPv6) open port 8649 for both UDP and TCP connections.

Let gmond send some data to another gmond

Install gmond on two nodes and verify that one can send its metrics to the other over a UDP connection on port 8649. You may use the following settings in the gmond.conf file on both nodes:
cluster {
  name = "hadoop-slaves"
}
udp_send_channel {
  host = the.lead.node.IP.address
  port = 8649
}
udp_recv_channel {
  port = 8649
}
tcp_accept_channel {}
After running gmonds (sudo /etc/init.d/ganglia-monitor start), you can use lsof to check if the connection was established:
akawa@hadoop-slave1:~$ sudo lsof -i :8649
COMMAND   PID    USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
gmond   48746 ganglia    4u  IPv4 201166172      0t0  UDP *:8649 
gmond   48746 ganglia    5u  IPv4 201166173      0t0  TCP *:8649 (LISTEN)
gmond   48746 ganglia    6u  IPv4 201166175      0t0  UDP hadoop-slave1:35702->hadoop-slave1:8649
akawa@hadoop-slave2:~$ sudo lsof -i :8649
COMMAND   PID    USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
gmond   31025 ganglia    6u  IPv4 383110679      0t0  UDP hadoop-slave2:60789->hadoop-slave1:8649
To see if any data is actually sent to the lead node, use tcpdump to dump network traffic on port 8649:
akawa@hadoop-slave1:~$ sudo tcpdump -i eth-pub udp port 8649
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on eth-pub, link-type EN10MB (Ethernet), capture size 65535 bytes
18:08:02.236625 IP hadoop-slave2.60789 > hadoop-slave1.8649: UDP, length 224
18:08:02.236652 IP hadoop-slave2.60789 > hadoop-slave1.8649: UDP, length 52
18:08:02.236661 IP hadoop-slave2.60789 > hadoop-slave1.8649: UDP, length 236

Use debug option

tcpdump shows that some data is transferred, but it does not tell you what kind of data is sent ;)
Hopefully, running gmond or gmetad in debugging mode gives us more information (it does not run as a daemon in debugging mode, so you can simply stop it with Ctrl+C).
akawa@hadoop-slave1:~$ sudo /etc/init.d/ganglia-monitor stop
akawa@hadoop-slave1:~$ sudo /usr/sbin/gmond -d 2
 
loaded module: core_metrics
loaded module: cpu_module
...
udp_recv_channel mcast_join=NULL mcast_if=NULL port=-1 bind=NULL
tcp_accept_channel bind=NULL port=-1
udp_send_channel mcast_join=NULL mcast_if=NULL host=hadoop-slave1.IP.address port=8649
 
 metric 'cpu_user' being collected now
 metric 'cpu_user' has value_threshold 1.000000
        ...............
 metric 'swap_free' being collected now
 metric 'swap_free' has value_threshold 1024.000000
 metric 'bytes_out' being collected now
 ********** bytes_out:  21741.789062
        ....
Counting device /dev/mapper/lvm0-rootfs (96.66 %)
Counting device /dev/mapper/360a980006467435a6c5a687069326462 (35.31 %)
For all disks: 8064.911 GB total, 5209.690 GB free for users.
 metric 'disk_total' has value_threshold 1.000000
 metric 'disk_free' being collected now
        .....
 sent message 'cpu_num' of length 52 with 0 errors
 sending metadata for metric: cpu_speed
We see that various metrics are collected and sent to host=hadoop-slave1.IP.address port=8649. Unfortunately, this alone does not tell us whether they are delivered successfully, since they were sent over UDP…

Do not mix IPv4 and IPv6

Let’s have a look at a real situation that we encountered on our cluster (and which was the root cause of a mysterious and annoying Ganglia misconfiguration). First, look at the lsof results.
akawa@hadoop-slave1:~$ sudo  lsof -i :8649
COMMAND   PID    USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
gmond   38431 ganglia    4u  IPv4 197424417      0t0  UDP *:8649 
gmond   38431 ganglia    5u  IPv4 197424418      0t0  TCP *:8649 (LISTEN)
gmond   38431 ganglia    6u  IPv4 197424422      0t0  UDP hadoop-slave1:58304->hadoop-slave1:8649
akawa@hadoop-slave2:~$ sudo  lsof -i :8649
COMMAND   PID    USER   FD   TYPE    DEVICE SIZE/OFF NODE NAME
gmond   23552 ganglia    6u  IPv6 382340910      0t0  UDP hadoop-slave2:36999->hadoop-slave1:8649
Here hadoop-slave2 sends metrics to hadoop-slave1 on the right port, and hadoop-slave1 listens on the right port as well. Everything is almost the same as in the snippets in the previous section, except for one important detail – hadoop-slave2 sends over IPv6, but hadoop-slave1 reads over IPv4!
The initial guess was to update ip6tables (apart from iptables) rules to open port 8649 for both UDP and TCP connections over IPv6. But it did not work.
It worked when we changed the hostname “hadoop-slave1.vls” to its IP address in the gmond.conf files (yes, earlier I used hostnames instead of IP addresses in every file).
Make sure that your IP address is correctly resolved to a hostname, or vice versa.

Get cluster summary with gstat

If you managed to send metrics from slave2 to slave1, it means your cluster is working. In Ganglia’s nomenclature, a cluster is a set of hosts that share the same cluster name attribute in the gmond.conf file, e.g. “hadoop-slaves”. There is a useful tool provided by Ganglia called gstat that prints the list of hosts that are represented by the gmond running on a given node.
akawa@hadoop-slave1:~$ gstat --all
CLUSTER INFORMATION
       Name: hadoop-slaves
      Hosts: 2
Gexec Hosts: 0
 Dead Hosts: 0
  Localtime: Tue Aug 21 22:46:21 2012
 
CLUSTER HOSTS
Hostname                     LOAD                       CPU              Gexec
 CPUs (Procs/Total) [     1,     5, 15min] [  User,  Nice, System, Idle, Wio]
hadoop-slave2
   48 (    0/  707) [  0.01,  0.07,  0.09] [   0.1,   0.0,   0.1,  99.8,   0.0] OFF
hadoop-slave1
   48 (    0/  731) [  0.01,  0.06,  0.07] [   0.0,   0.0,   0.1,  99.9,   0.0] OFF

Check where gmetad polls metrics from

Run the following command on the host that runs gmetad to check which clusters and hosts it is polling metrics from (you can grep the output to display only the useful lines):
akawa@hadoop-master:~$ nc localhost 8651 | grep hadoop
 
<GRID NAME="Hadoop_And_HBase" AUTHORITY="http://hadoop-master/ganglia/" LOCALTIME="1345642845">
<CLUSTER NAME="hadoop-masters" LOCALTIME="1345642831" OWNER="ICM" LATLONG="unspecified" URL="http://ceon.pl">
<HOST NAME="hadoop-master" IP="hadoop-master.IP.address" REPORTED="1345642831" TN="14" TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1345632023">
<CLUSTER NAME="hadoop-slaves" LOCALTIME="1345642835" OWNER="ICM" LATLONG="unspecified" URL="http://ceon.pl">
<HOST NAME="hadoop-slave4" IP="..." REPORTED="1345642829" TN="16" TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1345478489">
<HOST NAME="hadoop-slave2" IP="..." REPORTED="1345642828" TN="16" TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1345581519">
<HOST NAME="hadoop-slave3" IP="..." REPORTED="1345642829" TN="15" TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1345478489">
<HOST NAME="hadoop-slave1" IP="..." REPORTED="1345642833" TN="11" TMAX="20" DMAX="0" LOCATION="unspecified" GMOND_STARTED="1345572002">

Other issues

Other issues that I saw while using Ganglia are as follows: