// TaskProcessCreateConsumerMessage.java 2.76 KB
package com.xiniunet.mq;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.task.enumeration.BusinessTypeEnum;
import com.xiniunet.task.message.TaskProcessCreateMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.request.TaskProcessCreateRequest;
import com.xiniunet.task.response.TaskProcessCreateResponse;
import com.xiniunet.task.service.TaskCenterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;



/**
 * Message consumer that turns a {@link TaskProcessCreateMessage} received on
 * {@link TaskTopic#TASK_TOPIC} into a call to
 * {@link TaskCenterService#createTaskProcess}.
 *
 * <p>The incoming JSON payload is mapped both to a {@link TaskProcessCreateMessage}
 * (for the passport, id and system-message flag) and to a
 * {@link TaskProcessCreateRequest} (the request actually sent to the service).
 *
 * <p>Every outcome is acknowledged with
 * {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}, including a service response
 * that reports errors.
 * NOTE(review): this means a failed creation is never handed back for redelivery
 * by this class — confirm that is intended.
 *
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskProcessCreateConsumerMessage extends XNConsumerMessage {

    /**
     * Delay, in milliseconds, applied before calling the task service so that the
     * transaction which published the message has a chance to commit first.
     */
    private static final long COMMIT_WAIT_MILLIS = 1000L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private TaskCenterService taskCenterService;

    /** Binds this consumer to the task topic. */
    public TaskProcessCreateConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    /**
     * @return the tag of the messages this consumer handles,
     *         {@link TaskProcessCreateMessage#MSG_TAG}
     */
    @Override
    public String getTag() {
        return TaskProcessCreateMessage.MSG_TAG;
    }

    /**
     * Handles one "create task process" message.
     *
     * @param message JSON text of a {@link TaskProcessCreateMessage}
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {

        logger.warn("---------创建进度收到消息通知开始:{}", JSON.toJSONString(message));

        // Parse the payload once and map the same tree onto both target types.
        JSON parsed = JSON.parseObject(message);
        if (parsed == null) {
            // A null/empty payload parses to null; there is nothing to process and
            // dereferencing it below would only fail with a NullPointerException.
            logger.warn("---------创建进度消息内容为空,忽略处理");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        TaskProcessCreateMessage msg = JSON.toJavaObject(parsed, TaskProcessCreateMessage.class);
        Passport passport = msg.getPassport();

        waitForPublisherCommit();

        TaskProcessCreateRequest request = JSON.toJavaObject(parsed, TaskProcessCreateRequest.class);
        request.setBusinessId(msg.getId());
        request.setBusinessType(resolveBusinessType(msg));

        TaskProcessCreateResponse response = taskCenterService.createTaskProcess(request, passport);

        if (response.hasError()) {
            logger.warn("---------创建进度出错:{}", JSON.toJSONString(response));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        logger.warn("---------创建进度收到消息通知结束:");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Sleeps for {@link #COMMIT_WAIT_MILLIS} (one second) so that the main
     * transaction is committed before the service interface is called.
     * If the thread is interrupted, the interrupt flag is restored and processing
     * continues without waiting any longer.
     */
    private void waitForPublisherCommit() {
        try {
            Thread.sleep(COMMIT_WAIT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting before creating the task process", e);
        }
    }

    /**
     * Chooses the business type for the request: {@code SYSTEM} when the message is
     * flagged as a system message (system comments cannot be revoked), otherwise
     * {@code COMMON}. A {@code null} flag is treated as "not a system message".
     */
    private String resolveBusinessType(TaskProcessCreateMessage msg) {
        if (Boolean.TRUE.equals(msg.getIsSystemMessage())) {
            return BusinessTypeEnum.SYSTEM.toString();
        }
        return BusinessTypeEnum.COMMON.toString();
    }

}