A few months back, I was scratching my head and Googling all day for a way to publish and consume blob data (such as PDF, Word, or Excel files) through a message queue. I couldn't find any solution other than a few tips and a special URL for blob messages. So I'm writing this tutorial on publishing and consuming Blob and Stream messages through a message queue. I will be using ActiveMQ for this tutorial.
Note: ActiveMQ supports BlobMessage. It is specific to ActiveMQ; JMS doesn't have any BlobMessage wrapper, but it does support StreamMessage.
Producer:
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
/**
 * Sends one file to the "File.Transport" queue as an ActiveMQ {@link BlobMessage}.
 *
 * <p>The connection URL configures a blob upload URL
 * ({@code http://localhost:8161/fileserver/}); the message carries the file
 * name and size as the {@code FILE.NAME} and {@code FILE.SIZE} properties.
 */
public class BlobMessageProducer {
    private Connection connection = null;
    private ActiveMQSession session = null;
    private Destination destination = null;
    private MessageProducer producer = null;
    private File file;

    /**
     * Validates the source file and opens the connection, session and producer.
     *
     * @param fileName path of the file to send
     * @throws Exception if the path is not a regular file or the broker cannot be reached
     */
    private void init(String fileName) throws Exception {
        file = new File(fileName);
        // Fail fast, before any broker resources are opened.
        if (!file.isFile()) {
            throw new IllegalArgumentException("Not a regular file: " + fileName);
        }
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");
        connection = connectionFactory.createConnection();
        // createBlobMessage is ActiveMQ-specific, hence the cast to ActiveMQSession.
        session = (ActiveMQSession) connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("File.Transport");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }

    /**
     * Sends the given file as a blob message, then closes the connection and
     * terminates the JVM (see {@link #close()}).
     *
     * @param fileName path of the file to send
     */
    public void sendFile(String fileName) {
        try {
            System.out.println("Send File Start >>");
            init(fileName);
            BlobMessage blobMessage = session.createBlobMessage(file);
            blobMessage.setStringProperty("FILE.NAME", file.getName());
            blobMessage.setLongProperty("FILE.SIZE", file.length());
            producer.send(blobMessage);
            System.out.println("Send File End>>");
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Send File Failed: " + e);
        } finally {
            close();
        }
    }

    /**
     * Closes the connection (if one was opened) and exits the JVM.
     *
     * <p>Note that the exit status is 0 whether or not the send succeeded.
     */
    private void close() {
        try {
            if (connection != null) {
                connection.close();
            }
            // This class declares no logger, so it reports on stdout like sendFile does.
            System.out.println("--producer close end--");
        } catch (JMSException e) {
            System.err.println("--close connection fail-- " + e);
        }
        System.exit(0);
    }

    public static void main(String argv[]) {
        String fileName = "/home/kuntal/practice/config-data/test.pdf";
        new BlobMessageProducer().sendFile(fileName);
    }
}
Consumer:
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
/**
 * Drains ActiveMQ {@link BlobMessage}s from the "File.Transport" queue and
 * writes each one to a file in a target directory.
 *
 * <p>The file name is taken from the message's {@code FILE.NAME} property.
 * Consumption stops once no message arrives within 5000 ms.
 */
public class BlobMessageConsumer {
    private MessageConsumer consumer;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private static Logger logger = Logger.getLogger(BlobMessageConsumer.class);

    /**
     * Opens the connection, session and consumer for the "File.Transport" queue.
     *
     * @throws Exception if the broker cannot be reached
     */
    private void init() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("File.Transport");
        consumer = session.createConsumer(destination);
    }

    /**
     * Receives blob messages until the queue stays empty for 5000 ms, saving
     * each into {@code targetFilePath}; then closes the connection and
     * terminates the JVM (see {@link #close()}).
     *
     * @param targetFilePath directory in which received files are written
     */
    public void receiveFile(String targetFilePath) {
        try {
            init();
            while (true) {
                Message message = consumer.receive(5000);
                if (message == null) {
                    // Timed out with nothing to read: treat the queue as drained.
                    break;
                }
                if (message instanceof BlobMessage) {
                    saveBlob((BlobMessage) message, targetFilePath);
                }
            }
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            logger.error("--receiveFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Copies the payload of one blob message into a file under the target
     * directory. Both streams are closed before returning, so every file is
     * fully flushed even when several messages are processed in one run.
     *
     * @param blobMessage    message whose payload is saved
     * @param targetFilePath directory in which the file is written
     * @throws JMSException if the message properties or payload cannot be read
     * @throws IOException  if reading the payload or writing the file fails
     */
    private void saveBlob(BlobMessage blobMessage, String targetFilePath)
            throws JMSException, IOException {
        String fileName = blobMessage.getStringProperty("FILE.NAME");
        if (fileName == null) {
            logger.error("--skip blob message without FILE.NAME--");
            return;
        }
        // The name comes from the sender: keep only its last path segment so a
        // value such as "../../x" cannot escape the target directory.
        String safeName = new File(fileName).getName();
        if (safeName.length() == 0 || ".".equals(safeName) || "..".equals(safeName)) {
            logger.error("--skip blob message with unusable FILE.NAME: " + fileName + "--");
            return;
        }
        File file = new File(targetFilePath + File.separator + safeName);

        InputStream inputStream = blobMessage.getInputStream();
        if (inputStream == null) {
            // NOTE(review): defensive; confirm whether the provider can return null here.
            logger.error("--skip blob message without payload stream: " + safeName + "--");
            return;
        }
        try {
            OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
            try {
                byte[] buffer = new byte[2048];
                int length;
                // -1 is the end-of-stream sentinel of InputStream.read.
                while ((length = inputStream.read(buffer)) != -1) {
                    out.write(buffer, 0, length);
                }
            } finally {
                out.close();
            }
        } finally {
            inputStream.close();
        }
    }

    /**
     * Closes the connection (if one was opened) and exits the JVM.
     *
     * <p>Note that the exit status is 0 whether or not receiving succeeded.
     */
    private void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        System.exit(0);
    }

    public static void main(String[] args) {
        String targetFileFolder = "/home/kuntal/practice/config-data/output";
        new BlobMessageConsumer().receiveFile(targetFileFolder);
    }
}
For streaming data, JMS provides StreamMessage, which is very handy for pushing and pulling streaming data (such as log or text files) through the message queue. This is how to publish and consume streaming data with ActiveMQ.
Producer:
/**
 * Reads a file in chunks of up to 1024 bytes and sends each chunk to the
 * "queue1" queue as a separate JMS {@link StreamMessage}.
 */
public class StreamProducer {
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageProducer producer;
    private InputStream in;
    private static Logger logger = Logger.getLogger(StreamProducer.class);

    /**
     * Sends the file chunk by chunk, then releases all resources and
     * terminates the JVM (see {@link #close()}).
     *
     * @param fileName path of the file to send
     */
    public void sendFile(String fileName) {
        logger.info("--sendFile start--");
        try {
            init(fileName);
            byte[] buffer = new byte[1024];
            int c;
            // -1 is the end-of-stream sentinel of InputStream.read.
            while ((c = in.read(buffer)) != -1) {
                StreamMessage smsg = session.createStreamMessage();
                smsg.writeBytes(buffer, 0, c);
                producer.send(smsg);
                logger.info("send: " + c);
            }
            logger.info("--sendFile end--");
        } catch (Exception e) {
            logger.error("--sendFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Opens the source file and the connection, session and producer.
     *
     * @param fileName path of the file to send
     * @throws Exception if the file cannot be opened or the broker cannot be reached
     */
    private void init(String fileName) throws Exception {
        File file = new File(fileName);
        in = new FileInputStream(file);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("queue1");
        producer = session.createProducer(destination);
        connection.start();
    }

    /**
     * Closes the input stream and the connection, then exits the JVM.
     *
     * <p>Each resource is closed in its own try block so that a failure to
     * close the stream does not prevent the connection from being closed.
     * Note that the exit status is 0 whether or not the send succeeded.
     */
    private void close() {
        logger.info("--producer close start--");
        try {
            if (in != null) {
                in.close();
            }
        } catch (IOException e) {
            logger.error("--close InputStream fail--", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        logger.info("--producer close end--");
        System.exit(0);
    }

    public static void main(String argv[]) {
        String fileName = "/home/kuntal/practice/config-data/test.txt";
        new StreamProducer().sendFile(fileName);
    }
}
Note: ActiveMQ supports BlobMessage. It is specific to ActiveMQ; JMS doesn't have any BlobMessage wrapper, but it does support StreamMessage.
Producer:
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
/**
 * Sends one file to the "File.Transport" queue as an ActiveMQ {@link BlobMessage}.
 *
 * <p>The connection URL configures a blob upload URL
 * ({@code http://localhost:8161/fileserver/}); the message carries the file
 * name and size as the {@code FILE.NAME} and {@code FILE.SIZE} properties.
 */
public class BlobMessageProducer {
    private Connection connection = null;
    private ActiveMQSession session = null;
    private Destination destination = null;
    private MessageProducer producer = null;
    private File file;

    /**
     * Validates the source file and opens the connection, session and producer.
     *
     * @param fileName path of the file to send
     * @throws Exception if the path is not a regular file or the broker cannot be reached
     */
    private void init(String fileName) throws Exception {
        file = new File(fileName);
        // Fail fast, before any broker resources are opened.
        if (!file.isFile()) {
            throw new IllegalArgumentException("Not a regular file: " + fileName);
        }
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");
        connection = connectionFactory.createConnection();
        // createBlobMessage is ActiveMQ-specific, hence the cast to ActiveMQSession.
        session = (ActiveMQSession) connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("File.Transport");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }

    /**
     * Sends the given file as a blob message, then closes the connection and
     * terminates the JVM (see {@link #close()}).
     *
     * @param fileName path of the file to send
     */
    public void sendFile(String fileName) {
        try {
            System.out.println("Send File Start >>");
            init(fileName);
            BlobMessage blobMessage = session.createBlobMessage(file);
            blobMessage.setStringProperty("FILE.NAME", file.getName());
            blobMessage.setLongProperty("FILE.SIZE", file.length());
            producer.send(blobMessage);
            System.out.println("Send File End>>");
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Send File Failed: " + e);
        } finally {
            close();
        }
    }

    /**
     * Closes the connection (if one was opened) and exits the JVM.
     *
     * <p>Note that the exit status is 0 whether or not the send succeeded.
     */
    private void close() {
        try {
            if (connection != null) {
                connection.close();
            }
            // This class declares no logger, so it reports on stdout like sendFile does.
            System.out.println("--producer close end--");
        } catch (JMSException e) {
            System.err.println("--close connection fail-- " + e);
        }
        System.exit(0);
    }

    public static void main(String argv[]) {
        String fileName = "/home/kuntal/practice/config-data/test.pdf";
        new BlobMessageProducer().sendFile(fileName);
    }
}
Consumer:
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
/**
 * Drains ActiveMQ {@link BlobMessage}s from the "File.Transport" queue and
 * writes each one to a file in a target directory.
 *
 * <p>The file name is taken from the message's {@code FILE.NAME} property.
 * Consumption stops once no message arrives within 5000 ms.
 */
public class BlobMessageConsumer {
    private MessageConsumer consumer;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private static Logger logger = Logger.getLogger(BlobMessageConsumer.class);

    /**
     * Opens the connection, session and consumer for the "File.Transport" queue.
     *
     * @throws Exception if the broker cannot be reached
     */
    private void init() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("File.Transport");
        consumer = session.createConsumer(destination);
    }

    /**
     * Receives blob messages until the queue stays empty for 5000 ms, saving
     * each into {@code targetFilePath}; then closes the connection and
     * terminates the JVM (see {@link #close()}).
     *
     * @param targetFilePath directory in which received files are written
     */
    public void receiveFile(String targetFilePath) {
        try {
            init();
            while (true) {
                Message message = consumer.receive(5000);
                if (message == null) {
                    // Timed out with nothing to read: treat the queue as drained.
                    break;
                }
                if (message instanceof BlobMessage) {
                    saveBlob((BlobMessage) message, targetFilePath);
                }
            }
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            logger.error("--receiveFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Copies the payload of one blob message into a file under the target
     * directory. Both streams are closed before returning, so every file is
     * fully flushed even when several messages are processed in one run.
     *
     * @param blobMessage    message whose payload is saved
     * @param targetFilePath directory in which the file is written
     * @throws JMSException if the message properties or payload cannot be read
     * @throws IOException  if reading the payload or writing the file fails
     */
    private void saveBlob(BlobMessage blobMessage, String targetFilePath)
            throws JMSException, IOException {
        String fileName = blobMessage.getStringProperty("FILE.NAME");
        if (fileName == null) {
            logger.error("--skip blob message without FILE.NAME--");
            return;
        }
        // The name comes from the sender: keep only its last path segment so a
        // value such as "../../x" cannot escape the target directory.
        String safeName = new File(fileName).getName();
        if (safeName.length() == 0 || ".".equals(safeName) || "..".equals(safeName)) {
            logger.error("--skip blob message with unusable FILE.NAME: " + fileName + "--");
            return;
        }
        File file = new File(targetFilePath + File.separator + safeName);

        InputStream inputStream = blobMessage.getInputStream();
        if (inputStream == null) {
            // NOTE(review): defensive; confirm whether the provider can return null here.
            logger.error("--skip blob message without payload stream: " + safeName + "--");
            return;
        }
        try {
            OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
            try {
                byte[] buffer = new byte[2048];
                int length;
                // -1 is the end-of-stream sentinel of InputStream.read.
                while ((length = inputStream.read(buffer)) != -1) {
                    out.write(buffer, 0, length);
                }
            } finally {
                out.close();
            }
        } finally {
            inputStream.close();
        }
    }

    /**
     * Closes the connection (if one was opened) and exits the JVM.
     *
     * <p>Note that the exit status is 0 whether or not receiving succeeded.
     */
    private void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        System.exit(0);
    }

    public static void main(String[] args) {
        String targetFileFolder = "/home/kuntal/practice/config-data/output";
        new BlobMessageConsumer().receiveFile(targetFileFolder);
    }
}
For streaming data, JMS provides StreamMessage, which is very handy for pushing and pulling streaming data (such as log or text files) through the message queue. This is how to publish and consume streaming data with ActiveMQ.
Producer:
/**
 * Reads a file in chunks of up to 1024 bytes and sends each chunk to the
 * "queue1" queue as a separate JMS {@link StreamMessage}.
 */
public class StreamProducer {
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageProducer producer;
    private InputStream in;
    private static Logger logger = Logger.getLogger(StreamProducer.class);

    /**
     * Sends the file chunk by chunk, then releases all resources and
     * terminates the JVM (see {@link #close()}).
     *
     * @param fileName path of the file to send
     */
    public void sendFile(String fileName) {
        logger.info("--sendFile start--");
        try {
            init(fileName);
            byte[] buffer = new byte[1024];
            int c;
            // -1 is the end-of-stream sentinel of InputStream.read.
            while ((c = in.read(buffer)) != -1) {
                StreamMessage smsg = session.createStreamMessage();
                smsg.writeBytes(buffer, 0, c);
                producer.send(smsg);
                logger.info("send: " + c);
            }
            logger.info("--sendFile end--");
        } catch (Exception e) {
            logger.error("--sendFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Opens the source file and the connection, session and producer.
     *
     * @param fileName path of the file to send
     * @throws Exception if the file cannot be opened or the broker cannot be reached
     */
    private void init(String fileName) throws Exception {
        File file = new File(fileName);
        in = new FileInputStream(file);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("queue1");
        producer = session.createProducer(destination);
        connection.start();
    }

    /**
     * Closes the input stream and the connection, then exits the JVM.
     *
     * <p>Each resource is closed in its own try block so that a failure to
     * close the stream does not prevent the connection from being closed.
     * Note that the exit status is 0 whether or not the send succeeded.
     */
    private void close() {
        logger.info("--producer close start--");
        try {
            if (in != null) {
                in.close();
            }
        } catch (IOException e) {
            logger.error("--close InputStream fail--", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        logger.info("--producer close end--");
        System.exit(0);
    }

    public static void main(String argv[]) {
        String fileName = "/home/kuntal/practice/config-data/test.txt";
        new StreamProducer().sendFile(fileName);
    }
}
Consumer:
/**
 * Drains JMS {@link StreamMessage}s from the "queue1" queue and appends the
 * bytes carried by each one to a single output file.
 *
 * <p>Consumption stops once no message arrives within 5000 ms.
 */
public class StreamConsumer {
    private MessageConsumer consumer;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private static Logger logger = Logger.getLogger(StreamConsumer.class);
    private BufferedOutputStream bos = null;

    /**
     * Opens the output file and the connection, session and consumer.
     *
     * @param targetFileName path of the file to write
     * @throws Exception if the file cannot be opened or the broker cannot be reached
     */
    private void init(String targetFileName) throws Exception {
        logger.info("--init start--");
        logger.info("--targetFileName--" + targetFileName);
        OutputStream out = new FileOutputStream(targetFileName);
        bos = new BufferedOutputStream(out);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("queue1");
        consumer = session.createConsumer(destination);
        logger.info("--init end--");
    }

    /**
     * Receives stream messages until the queue stays empty for 5000 ms and
     * writes their bytes to {@code targetFileName}; then releases all
     * resources and terminates the JVM (see {@link #close()}).
     *
     * @param targetFileName path of the file to write
     */
    public void receiveFile(String targetFileName) {
        logger.info("--receive file start--");
        try {
            init(targetFileName);
            byte[] buffer = new byte[2048];
            while (true) {
                Message msg = consumer.receive(5000);
                if (msg == null) {
                    // Timed out with nothing to read: treat the queue as drained.
                    break;
                }
                if (msg instanceof StreamMessage) {
                    StreamMessage smsg = (StreamMessage) msg;
                    int c;
                    // Keep reading while the buffer comes back full: a byte field
                    // larger than the buffer needs several readBytes calls.
                    do {
                        c = smsg.readBytes(buffer);
                        // c may be -1 (null field) or 0 (empty field): nothing to write.
                        if (c > 0) {
                            // The decoded text is for the log only; it uses the platform
                            // charset and may be garbled for binary or multi-byte data.
                            logger.info("Receive str: " + new String(buffer, 0, c));
                            // Write the raw bytes so the file is a byte-exact copy.
                            bos.write(buffer, 0, c);
                        }
                    } while (c == buffer.length);
                }
            }
            logger.info("--receive file end--");
        } catch (Exception e) {
            logger.error("--receiveFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Closes the output stream and the connection, then exits the JVM.
     *
     * <p>Each resource is closed in its own try block so that a failure to
     * close the stream does not prevent the connection from being closed.
     * Note that the exit status is 0 whether or not receiving succeeded.
     */
    private void close() {
        logger.info("--consumer close start--");
        try {
            if (bos != null) {
                bos.close();
            }
        } catch (IOException e) {
            logger.error("--close OutputStream fail--", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        logger.info("--consumer close end--");
        System.exit(0);
    }

    public static void main(String[] args) {
        new StreamConsumer().receiveFile("/home/kuntal/practice/config-data/output2.txt");
    }
}
Hope this helps you and saves your valuable time!