// TaskProcessCreateConsumerMessage.java 2.51 KB
package com.xiniunet.mq;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.master.service.MasterService;
import com.xiniunet.task.message.TaskProcessCreateMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.request.TaskProcessCreateRequest;
import com.xiniunet.task.response.TaskProcessCreateResponse;
import com.xiniunet.task.service.TaskCenterService;
import com.xiniunet.xntalk.service.UnionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;



/**
 * Message consumer that turns a {@link TaskProcessCreateMessage} into a call to
 * {@link TaskCenterService#createTaskProcess}.
 *
 * <p>The constructor sets the message topic to {@link TaskTopic#TASK_TOPIC} and
 * {@link #getTag()} reports {@link TaskProcessCreateMessage#MSG_TAG}; presumably the
 * {@link XNConsumerMessage} base class uses both to select which messages reach
 * {@link #consumerMessage(String)} (confirm against the framework).
 *
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskProcessCreateConsumerMessage extends XNConsumerMessage {

    private static final Logger logger = LoggerFactory.getLogger(TaskProcessCreateConsumerMessage.class);

    @Autowired
    private TaskCenterService taskCenterService;

    // The two services below are not referenced in this class; they are kept so the
    // bean's injected dependencies stay exactly as before.
    @Autowired
    private UnionService unionService;

    @Autowired
    private MasterService masterService;

    public TaskProcessCreateConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    /**
     * @return the tag of the messages handled by this consumer
     */
    @Override
    public String getTag() {
        return TaskProcessCreateMessage.MSG_TAG;
    }

    /**
     * Deserializes the JSON payload and asks the task center to create the task process.
     *
     * @param message JSON text of a {@link TaskProcessCreateMessage}
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when the service call reports
     *         no error; {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} when the payload
     *         cannot be turned into a message object or the service response has an error
     */
    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {

        logger.warn("---------创建进度收到消息通知开始:");
        logger.warn(message);

        TaskProcessCreateMessage msg;
        try {
            msg = JSON.toJavaObject(JSON.parseObject(message), TaskProcessCreateMessage.class);
        } catch (RuntimeException e) {
            // fastjson reports malformed input with an unchecked exception. Log it with the
            // payload and answer with the same status the existing failure branch uses,
            // instead of letting the exception escape the listener.
            logger.error("Unable to parse task process create message: {}", message, e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        if (msg == null) {
            // A null, empty or literal "null" payload is expected to deserialize to null;
            // without this guard the getPassport() call below would throw a
            // NullPointerException.
            logger.error("Task process create message is empty, nothing to process: {}", message);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        Passport passport = msg.getPassport();

        TaskProcessCreateRequest taskProcessCreateRequest = new TaskProcessCreateRequest();
        taskProcessCreateRequest.setId(msg.getId());
        taskProcessCreateRequest.setContent(msg.getContent());
        taskProcessCreateRequest.setAnonymous(false);
        TaskProcessCreateResponse taskProcessCreateResponse =
                taskCenterService.createTaskProcess(taskProcessCreateRequest, passport);

        if (taskProcessCreateResponse.hasError()) {
            logger.warn("---------创建进度出错:{}", JSON.toJSONString(taskProcessCreateResponse));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        logger.warn("---------创建进度收到消息通知结束:");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}