JMS发送和接收说明
概述
旧版JMS是在business的jms文件夹里面分别建立send和receive文件夹,用于发送和接收mq的请求。如果项目代码量比较大,这样设置会造成jms文件夹内的代码量增加,并且MQ主题存放于jms属性文件内,而基于属性文件管理无法对命名不规范,重复命名的问题进行管理。因而对jms的开发规范进行了更新,其实最主要原因还是额外增加的代码造成代码可读性减弱以及相关开发的工作量增加。
新版JMS实现取消JMS文件夹的设置,使用公用类发送请求,并且基于注解进行MQ消息监听。MQ主题统一存在于常量类(constant)中。基于主题常量类可以反向追踪调用情况,能够迅速定位到调用主题的类,从而也解决了新版实现因主题调用分散而不易查找的问题。
JMS发送请求说明
填写mq主题
在类:JmsConstant中创建需要发送MQ主题。
类路径:
package io.raycom.components.constant;
字段定义示例:
//mq的消息目的地配置项
public final static String VENDOR_IMP_STATUS_TO_UPS = "vendorimpStatusUps";
- 字段备注一定要填写
- 字段格式说明:
- 字段全部大写,单词之间用下划线("_")分割
- 字段格式要求:(推送)业务主题简写_TO_接收系统
- 字段格式要求:(接收)业务主题简写_FR_推送系统
- 主题内容(一般情况):业务相关的词语+发送系统
- 主题内容(特殊情况:多个发送对同一个接收):业务相关的词语+接收系统
发送主题
使用公用类在业务代码中直接发送,发送代码:
JmsRaycomHelper.sendAsyncMsg(rdata,JmsConstant.VENDOR_PUSH_STATUS_TO_UHQ);
参数说明:
- 第一个参数:消息内容
- 第二个参数:mq的主题
JMS接收请求说明
填写mq主题
该部分与发送的填写主题说明一致,参见: 填写mq主题
注意事项:
接收主题的字段名命名规则是:业务主题简写_FR_推送系统
监听并接收消息
延续之前TASK类处理消息的机制,并做了扩展。代码如下:
包路径:
io.raycom.business.task
构建task类
@Component
@RaycomJms(value=JmsConstant.ACCOUNT_TO_USC,desc="对账单信息接收")
public class AccountTask extends JmsTask{
@Override
public void messageHandle(String msg) {
//业务代码
}
}
说明
- 类需要继承JmsTask类型,并实现方法messageHandle(String msg)
- messageHandle(String msg)的参数msg即为接收到的消息的json字符串,在这个方法内进行业务处理
- 增加注解:@Component
- 增加注解:@RaycomJms(value=JmsConstant.ACCOUNT_TO_USC,desc="对账单信息接收")
@RaycomJms注解说明
- RaycomJms注解需要提供两个参数,
- 第一个参数:MQ的主题
- 第二个参数:主题的描述(mq出错后会记录日志表中,平时没用)
以上就可以接收到mq的消息并按照常规的方法进行处理。
原理说明
后续章节是介绍实现原理,仅供参考。实际开发时按照前面3章内容即可
mq推送相关原理
实际上是复写旧版的sender方法,抽象为公用类,其代码如下:
@Component
public class JmsRaycomHelper{
private static JmsRaycomTemple jmsRaycomTemple;
private static JmsRaycomTemple getJmsRaycomTemple(){
if (jmsRaycomTemple == null){
jmsRaycomTemple = SpringContextHolder.getBean(JmsRaycomTemple.class);
}
return jmsRaycomTemple;
}
@Async
public static void sendAsyncMsg(Object obj,String destinationName){
getJmsRaycomTemple().sendMessage(obj, destinationName);
}
public static void sendMsg(Object obj,String destinationName){
getJmsRaycomTemple().sendMessage(obj, destinationName);
}
}
以前在sender类中内置了主题,调用的时候直接调用方法即可,现在需要在调用的时候指定主题,虽然新版在代码解耦层面耦合性增加,但整体可读性增强。取舍之间,倾向于可读性。
附JmsRaycomTemple发送mq的方法:
//发送jms消息
public void sendMessage(final Object obj,String destinationName){
destinationName = getRealDestinationName(destinationName);
msgLog.insertMQLog(JsonMapper.getInstance().toJson(obj), destinationName, "MQ SEND");
jmsTemplate.send(raycomDestination.getMQQueue(destinationName), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(JsonMapper.toJsonString(obj));
}
});
}
//根据开发模式决定是否在主题内容前增加开发标志,即IP或者设置的机器名
private String getRealDestinationName(String destinationName) {
if(Constant.MQ_MODEL_DEV.equals(properties.mqModel)){
if(StringUtils.isEmpty(properties.machineName)
||"${machine.name}".equals(properties.machineName)) {
InetAddress addr;
try {
addr = InetAddress.getLocalHost();
destinationName = addr.getHostAddress().toString()+destinationName;
} catch (UnknownHostException e) {
e.printStackTrace();
}
}else {
destinationName=properties.machineName+destinationName;
}
}
return destinationName;
}
mq接收消息原理
接收消息部分则使用到了注解,实现过程稍微复杂一些,但基本原理极为简单,即:
在解析消息的TASK类上使用自定义注解(RaycomJms)指明其监听的mq消息,项目启动时会通过Spring的@Import语法触发bean的自动检测,之后Spring容器会基于注解的配置项生成相应的JMS消息监听类,并且将注解所在的task类注入到消息监听类中。
这样监听类一旦接收到MQ推送的消息,就会调用注入其中的TASK类进行消息处理,从而实现的消息的监听和处理。
创建注解
注解基础知识参见:自定义注解详解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(RaycomImportBeanDefinitionRegistrar.class)
public @interface RaycomJms {
String value() default "";
String desc() default "";
}
这里使用到了Spring的@Import注解,并利用了ImportBeanDefinitionRegistrar相关的机制,其实现原理不在此进行描述,可参见:Spring源码窥探之:ImportBeanDefinitionRegistrar
定义BeanDefinitionRegistrar实现类
public class RaycomImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar{
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
//获取注解配置项
Map<String, Object> annotationAttributes =
importingClassMetadata.getAnnotationAttributes(RaycomJms.class.getName());
String className = (String) importingClassMetadata.getClassName();
//使用GenericBeanDefinition创建bean
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(RaycomMessageListenerContainer.class);
beanDefinition.setAutowireCandidate(true);
//设置bean属性
MutablePropertyValues mpv = beanDefinition.getPropertyValues();
mpv.addPropertyValue("destinationName", annotationAttributes.get("value"));
mpv.addPropertyValue("taskClassName", className);
mpv.addPropertyValue("destinationDesc", annotationAttributes.get("desc"));
//注册bean
registry.registerBeanDefinition("jms"
+className.substring(
className.lastIndexOf(".")+1),
beanDefinition);
}
使用GenericBeanDefinition来定义Bean,具体其实现原理自行百度即可。构筑bean的过程比较简单,即获取注解配置的mq主题以及主题描述,并将当前的task类名(全路径)设置到bean中即可。
task类Bean注入到监听类中
上一步我们将task类名提供给了监听类,在监听类创建时,我们根据类名从spring容器中获取其对应的bean,并赋给监听类的task变量中,即:
@PostConstruct
public void init() {
try {
if(StringUtils.isNotEmpty(taskClassName)) {
task = (JmsTask)SpringContextHolder.getBean(Class.forName(taskClassName));
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
基于上述操作,消息监听类在接收到消息后,会调用task的messageHandle方法进行消息处理,而此方法需要各个业务TASK类进行重写,从而将mq消息传入到具体的业务处理类中,相关代码如下:
public void messageHandle(TextMessage message) {
if(task!=null) {
String msg="";
try {
msg=message.getText();
task.messageHandle(msg);
}catch (Exception e) {
msgLog.insertExceptionLog(msg,destinationDesc, destinationName);
throw e;
}
}
}
JmsTask类
作为task类的基类,声明了一个抽象的messageHandle方法,作为对外提供的消息处理API
public abstract class JmsTask {
public abstract void messageHandle(String msg);
}