博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消费者端的Spring JMS 连接ActiveMQ接收生产者Oozie Server发送的Oozie作业执行结果
阅读量:6172 次
发布时间:2019-06-21

本文共 5724 字,大约阅读时间需要 19 分钟。

一,介绍

Oozie是一个Hadoop工作流服务器,接收Client提交的作业(MapReduce作业)请求,并把该作业提交给MapReduce执行。同时,Oozie还可以实现消息通知功能,只要配置好消息服务器,Oozie Server就可以把作业的执行结果发送到消息服务器上,而Client只需要订阅其感兴趣的消息即可。具体的配置参考这篇文章:

由于Spring内置了JMS相关的服务,因此这里记录在Spring中如何配置消费者连接ActiveMQ,从而接收生产者Oozie发送的消息。

 

二,Oozie Server作为生产者的相关配置

这主要在这篇文章 已经提到了。

其中Oozie的配置文件 oozie-default.xml中相关配置如下:

oozie.jms.producer.connection.properties
java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://localhost:61616;connectionFactoryNames#ConnectionFactory
   
       
oozie.service.JMSAccessorService.connectioncontext.impl
       
        org.apache.oozie.jms.DefaultConnectionContext        
       
        Specifies the Connection Context implementation        
   

 

Destination的相关配置如下,这里的Destination是一个Topic,即生产者发送消息的目的地,也是消费者取消息的地方。

oozie.service.JMSTopicService.topic.name
default=${username}
Topic options are ${username} or ${jobId} or a fixed string which can be specified as default or for a particular job type. For e.g To have a fixed string topic for workflows, coordinators and bundles, specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2} where job type can be WORKFLOW, COORDINATOR or BUNDLE. e.g. Following defines topic for workflow job, workflow action, coordinator job, coordinator action, bundle job and bundle action WORKFLOW=workflow, COORDINATOR=coordinator, BUNDLE=bundle For jobs with no defined topic, default topic will be ${username}

 

三,在Spring中配置消费者的连接信息

这里采用JNDI连接ActiveMQ,连接信息配置如下:

org.apache.activemq.jndi.ActiveMQInitialContextFactory
tcp://192.168.121.35:61616
system
manager

 

配置连接工厂:

我是怎么知道连接工厂的value="ConnectionFactory"的呢?由于我大部分采用的是Oozie的默认配置,根据Oozie官网提供的一个示例程序,调试出的Oozie使用的连接工厂的。

 

//获得Oozie中关于JMS的相关配置信息,如Transport Connectors        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();        Properties jndiProperties = jmsInfo.getJNDIProperties();        Context jndiContext = new InitialContext(jndiProperties);

这段代码建立到ActiveMQ的连接上下文,调试上述代码可以看到下面的一些信息:

{java.naming.provider.url=tcp://192.168.121.35:61616, java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory, connectionFactoryNames=ConnectionFactory}

 

配置Topic

Topic就是Destination啊。由于从oozie-default.xml中得到生产者的Topic为 ${username},而我们这里的用户名为cdhfive ,故Topic的配置如上。

 

配置监听器

concurrentConsumers,表示消费者的数量。由于使用的是Pub/Sub模型,每个Consumer都会收到同样的消息。

destination,就是Topic的地址。

messageListener,就是监听器的实现bean,该bean 实现了 javax.jms.MessageListener接口

 

配置Spring 订阅者收到消息后,自动向ActiveMQ返回确认模式:一个有三种:①AUTO_ACKNOWLEDGE;②CLIENT_ACKNOWLEDGE;③DUPS_OK_ACKNOWLEDGE

设置DefaultMessageListenerContainer类的sessionAcknowledgeMode属性来配置确认模式。关于这三种确认模式在何时进行确认呢?

AUTO_ACKNOWLEDGEAutomatic message acknowledgment before listener execution; no redelivery in case of exception thrown.CLIENT_ACKNOWLEDGEAutomatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.DUPS_OK_ACKNOWLEDGELazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.

可以看出:AUTO_ACKNOWLEDGE是在 onMessage方法调用之前,Spring就已经给ActiveMQ确认消息,并且若在onMessage方法中抛出异常了,消息不会重发。

CLIENT_ACKNOWLEDGE是在onMessage方法成功执行之后,Spring才向ActiveMQ确认消息,若在onMessage方法中抛出异常了,消息不会重发。

DUPS_OK_ACKNOWLEDGE是在onMessage方法成功执行之后,Spring才向ActiveMQ确认消息(会有延迟确认),若在onMessage方法中抛出异常了,消息可能会重发(potential redelivery)。

 

至此,大部分的配置已经完成了。

 

四,实现监听器MessageListener接口,接收消息

当有消息推送给订阅者时,javax.jms.MessageListener接口的onMessage()方法被自动调用,就可以在该方法中处理收到的消息了。

@Override    public void onMessage(Message message) {        String parentJobId = null;        String jobId = null;        String errorMessage = null;        String status = null;        Date startTime = null;        Date endTime = null;        long runTime = -1;//-1 means job run error                try {            // 普通用户作业和解释作业                        if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(                    AppType.WORKFLOW_JOB.name())) {                                WorkflowJobMessage wfJobMessage = JMSMessagingUtils                        .getEventMessage(message);                // 是普通作业                jobId = wfJobMessage.getId();                errorMessage = wfJobMessage.getErrorMessage();                status = wfJobMessage.getStatus().toString();                startTime = wfJobMessage.getStartTime();                endTime = wfJobMessage.getEndTime();                                                if(endTime != null){                    runTime = endTime.getTime() - startTime.getTime();                    System.out.println(jobId + "执行了:" + (endTime.getTime()-startTime.getTime())/1000 + "s");                                    } //other code.....

 

五,参考资料

《JAVA消息服务》电子工业出版社

https://oozie.apache.org/docs/4.0.0/DG_JMSNotifications.html

转载地址:http://rnxba.baihongyu.com/

你可能感兴趣的文章
sorting, two pointers(cf div.3 1113)
查看>>
Scala并发编程【消息机制】
查看>>
win10下安装Oracle 11g 32位客户端遇到INS-13001环境不满足最低要求
查看>>
AngularJS-01.AngularJS,Module,Controller,scope
查看>>
【MySQL 安装过程1】顺利安装MySQL完整过程
查看>>
Inno Setup入门(二十)——Inno Setup类参考(6)
查看>>
图片自适应
查看>>
amd cmd
查看>>
Linux下的uml画图工具
查看>>
xml返回数组数据
查看>>
约瑟夫问题总结
查看>>
spring mybatis 批量插入返回主键
查看>>
指针函数小用
查看>>
开源力量公开课第二十三期-从SVN到Git,次时代代码管理
查看>>
输入挂
查看>>
升级迁移前,存储过程统计各个用户下表的数据量,和迁移后的比对
查看>>
sql注入分类
查看>>
初识CSS选择器版本4
查看>>
[Hadoop in China 2011] 朱会灿:探析腾讯Typhoon云计算平台
查看>>
JavaScript之数组学习
查看>>