TaskAutoOverdueConsumerMessage.java 7.04 KB
package com.xiniunet.mq;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Identity;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.master.request.system.UserGetRequest;
import com.xiniunet.master.response.system.UserGetResponse;
import com.xiniunet.master.service.MasterService;
import com.xiniunet.task.domain.Task;
import com.xiniunet.task.message.TaskAutoOverdueMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.request.TaskGetRequest;
import com.xiniunet.task.request.TaskOverdueRequest;
import com.xiniunet.task.response.TaskGetResponse;
import com.xiniunet.task.response.TaskOverdueResponse;
import com.xiniunet.task.service.TaskCenterService;
import com.xiniunet.xntalk.request.AttachMessagePushRequest;
import com.xiniunet.xntalk.request.IdentityExchangeRequest;
import com.xiniunet.xntalk.request.NotificationUnionGetRequest;
import com.xiniunet.xntalk.response.AttachMessagePushResponse;
import com.xiniunet.xntalk.response.IdentityExchangeResponse;
import com.xiniunet.xntalk.response.NotificationUnionGetResponse;
import com.xiniunet.xntalk.service.UnionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;


/**
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskAutoOverdueConsumerMessage extends XNConsumerMessage {

    public TaskAutoOverdueConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    @Autowired
    private TaskCenterService taskCenterService;

    @Autowired
    private UnionService unionService;

    @Autowired
    private MasterService masterService;
    @Override
    public String getTag() {
        return TaskAutoOverdueMessage.MSG_TAG;
    }

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {

        logger.warn("---------任务过期收到消息通知开始:");
        logger.warn(message);

        TaskAutoOverdueMessage msg = JSON.toJavaObject(JSON.parseObject(message), TaskAutoOverdueMessage.class);
        Passport passport = msg.getPassport();
        TaskGetRequest taskGetRequest = new TaskGetRequest();
        taskGetRequest.setId(msg.getId());
        TaskGetResponse taskGetResponse = taskCenterService.getTask(taskGetRequest,passport);
        if(taskGetResponse.hasError()){
            logger.warn("---------任务过期出错:"+JSON.toJSONString(taskGetResponse));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        Task task = taskGetResponse.getTask();
        //任务没有完成没有终止时进行是否逾期判断
        Date now = new Date();
        if(task.getIsDone() != null && !task.getIsDone() && task.getIsAbort() !=null && !task.getIsAbort()){
            //当前时间大于等于任务结束时间进行逾期操作
            if(now.getTime() >= task.getEndTime().getTime()){
                TaskOverdueRequest taskOverdueRequest = new TaskOverdueRequest();
                taskOverdueRequest.setIsOverdue(true);
                taskOverdueRequest.setId(task.getId());
                //设置任务状态为已逾期
                TaskOverdueResponse taskOverdueResponse = taskCenterService.setTaskOverdue(taskOverdueRequest,passport);
                if(taskOverdueResponse.hasError()){
                    logger.warn("---------任务过期出错:"+JSON.toJSONString(taskOverdueResponse));
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //推送逾期消息告知app
                //获取推送这个unionId
                Long sendUnionId = null;
                Long receiveUnionId = null;
                NotificationUnionGetRequest notificationUnionGetRequest = new NotificationUnionGetRequest();
                notificationUnionGetRequest.setPassport(passport);
                NotificationUnionGetResponse notificationUnionGetResponse = unionService.getNotificationUnionId(notificationUnionGetRequest);
                if(!notificationUnionGetResponse.hasError()){
                    sendUnionId = notificationUnionGetResponse.getUnionId();
                }
                //获取推送对象的ReceiveUnionId
                UserGetRequest userGetRequest = new UserGetRequest();
                userGetRequest.setTenantId(passport.getTenantId());
                userGetRequest.setId(task.getOwnerUserId());
                UserGetResponse userGetResponse = masterService.getUser(userGetRequest,passport);
                if(!userGetResponse.hasError() && userGetResponse.getUser()!= null && userGetResponse.getUser().getUnionId() != null){
                    receiveUnionId =userGetResponse.getUser().getUnionId();
                }
                //获取Identity
                Identity identity = new Identity();
                IdentityExchangeRequest identityExchangeRequest = new IdentityExchangeRequest();
                IdentityExchangeResponse identityExchangeResponse = unionService.exchangeIdentityByPassport(identityExchangeRequest,passport);
                if(!identityExchangeResponse.hasError()){
                    identity = identityExchangeResponse.getIdentity();
                }
                //开始推送逾期消息给app
                if(sendUnionId != null && receiveUnionId != null){

                    AttachMessagePushRequest attachMessagePushRequest = new AttachMessagePushRequest();
                    attachMessagePushRequest.setSendUnionId(sendUnionId);
                    attachMessagePushRequest.setBadge(false);
                    attachMessagePushRequest.setSave(false);
                    attachMessagePushRequest.setNeedPushNick(false);
                    attachMessagePushRequest.setReceiveUnionId(receiveUnionId);
                    attachMessagePushRequest.setMessageType("UNION_EVENT_MESSAGE");
                    attachMessagePushRequest.setMessageData("{type:30," +
                            "messageType:30," +
                            "data:{" +
                            "tenantId:"+passport.getTenantId()+
                            ",businessType:\"DELAY_TASK_UPDATE\"" +
                            "}" +
                            "}");
                    attachMessagePushRequest.setRoute(true);
                    AttachMessagePushResponse attachMessagePushResponse = unionService.pushAttachMessage(attachMessagePushRequest,identity);
                    if(attachMessagePushResponse.hasError()){
                        logger.warn("---------任务过期出错:"+JSON.toJSONString(attachMessagePushResponse));
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }
        }
        logger.warn("---------任务过期收到消息通知结束:");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}