你用的是IBM MQ还是Apache的? 一般通过JAVA的JMS可以取得.
例如IBM MQ里有个MQQueue 对象
// 获取队列实例
MQQueue queue = qMgr.accessQueue("TEST_QUEUE", openOptions);
//获取当前队列最长消息的长度
queue.getMaximumMessageLength()
//获取当前队列最长深度
{
//前面是准备管理器和队列
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage rcvMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options ◆ MQConstants.MQGMO_WAIT ◆ MQConstants.MQGMO_SYNCPOINT;
//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似.
queue.get(rcvMessage, gmo);
//后面就是操作消息的部分【略】
}catch(Exception e){{
}catch(Exception e){
之前写了一个ActiveMQ发送消息的例子.现在记录一下java接收ActiveMQ消息的代码.都是本人工作中写过的.希望给大家一点帮助.代码如下:
Java代码
package com.syxp.dns.receive;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
publicclass ReceiveMessageFromMQ {
privatestatic String user = "";
privatestatic String password = "";
privatestatic Logger logger = Logger.getLogger(ReceiveMessageFromMQ.class);
publicvoid receiveMessage(){
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
// 创建连接
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
// 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination = session.createQueue("integratedalarm.subject");
// 创建消息消费者
MessageConsumer consumer = session;
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
logger.info("接收的消息:"◆"\n"◆text);
} else {
logger.info("接收的消息:"◆"\n"◆message);
}
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
publicstaticvoid main(String[] args) {
ReceiveMessageFromMQ receiveMessageFromMQ = new ReceiveMessageFromMQ();
receiveMessageFromMQ.receiveMessage();
上面有详细的注释,运行了上面的接收的方法之后,会打印出一条相应队列的未接收消息.在ActiveMQ的监视控制页面中,可以看到有一条消息已经被消费.
我用的方法是:
MQQueueManager?qMgr?=?new?MQQueueManager("BVMTEST");
System.out.println("queue?manager?is?connected!");
int?openOptions?=?MQC.MQOO_OUTPUT?|?MQC.MQOO_FAIL_IF_QUIESCING;
/*?打开队列?*/
com.ibm.mq.MQQueue?queue?=?qMgr.accessQueue("test1",?openOptions);
然后在调用queue.getCurrentDepth()的方法的时候居然报了异常:
如果我不在此处调用这个方法,而在后面进行
queue.put(outMsg,?new?MQPutMessageOptions());方法,居然可以成功放入测试信息.
给你一个有用的代码大全:
ActiveMQ持久化消息的二种方式;
①.、持久化为文件
这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了.涉及到的配置和代码有:
persistenceAdapter
kahaDB directory="${activemq.base}/data/kahadb"/
/persistenceAdapter
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
此时此刻呢修改配置文件
jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/
在配置文件中的broker节点外增加
bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"
property name="driverClassName" value="com.mysql.jdbc.Driver"/
property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/
property name="username" value="activemq"/
property name="password" value="activemq"/
property name="poolPreparedStatements" value="true"/
/bean
从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库.
①.:activemq_acks
以上就是土嘎嘎小编为大家整理的javamq取消息代码相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!