// TaskPushAppMessageConsumerMessage.java 6.88 KB
package com.xiniunet.mq;

import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Identity;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.framework.util.DateUtil;
import com.xiniunet.master.domain.system.User;
import com.xiniunet.master.request.system.UserFindRequest;
import com.xiniunet.master.request.system.UserGetRequest;
import com.xiniunet.master.response.system.UserFindResponse;
import com.xiniunet.master.response.system.UserGetResponse;
import com.xiniunet.master.service.MasterService;
import com.xiniunet.quartz.TaskCheckIsOverdueJob;
import com.xiniunet.quartz.TaskRemindingJob;
import com.xiniunet.quartz.TaskWeeklyRemindingJob;
import com.xiniunet.quartz.base.XNQuartzJob;
import com.xiniunet.quartz.base.XNQuartzJobData;
import com.xiniunet.task.dal.TaskMapper;
import com.xiniunet.task.message.TaskPushAppMessageMessage;
import com.xiniunet.task.message.TaskRemindJobMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.po.TaskPO;
import com.xiniunet.task.service.TaskCenterService;
import com.xiniunet.xntalk.domain.UnionEventMessage;
import com.xiniunet.xntalk.request.AttachMessageBatchPushRequest;
import com.xiniunet.xntalk.request.AttachMessagePushRequest;
import com.xiniunet.xntalk.request.IdentityExchangeRequest;
import com.xiniunet.xntalk.request.NotificationUnionGetRequest;
import com.xiniunet.xntalk.response.AttachMessageBatchPushResponse;
import com.xiniunet.xntalk.response.AttachMessagePushResponse;
import com.xiniunet.xntalk.response.IdentityExchangeResponse;
import com.xiniunet.xntalk.response.NotificationUnionGetResponse;
import com.xiniunet.xntalk.service.UnionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;


/**
 * RocketMQ consumer for {@link TaskPushAppMessageMessage}: when such a message arrives on
 * {@link TaskTopic#TASK_TOPIC}, it pushes a {@code UNION_EVENT_MESSAGE} attach message to the
 * union ids of the addressed users so that the app is notified.
 *
 * <p>Delivery is best-effort: {@link #consumerMessage(String)} always reports
 * {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} for ordinary exceptions, so a failed push
 * is logged but never redelivered.
 *
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskPushAppMessageConsumerMessage extends XNConsumerMessage {

    /** Value passed to {@code AttachMessageBatchPushRequest#setMessageType}. */
    private static final String MESSAGE_TYPE = "UNION_EVENT_MESSAGE";

    /**
     * Value passed to {@code UnionEventMessage#setType}.
     * NOTE(review): the meaning of 30 is defined by the xntalk module, not visible here — confirm.
     */
    private static final int UNION_EVENT_TYPE = 30;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private UnionService unionService;

    @Autowired
    private MasterService masterService;

    public TaskPushAppMessageConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    @Override
    public String getTag() {
        return TaskPushAppMessageMessage.MSG_TAG;
    }

    /**
     * Handles one message by pushing an event notification to the app.
     *
     * <p>Any {@link Exception} raised while handling the message is logged (with its stack
     * trace) and the message is still acknowledged. {@link Error}s are not swallowed; they
     * propagate to the caller.
     *
     * @param message JSON text of a {@link TaskPushAppMessageMessage}
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when the method returns normally
     */
    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {
        logger.warn("---------推送消息告知app收到消息通知开始:" + JSON.toJSONString(message));
        try {
            pushToApp(message);
        } catch (Exception e) {
            // Best-effort delivery: record the cause, but do not ask the broker to redeliver.
            logger.warn("---------推送消息告知app失败", e);
        } finally {
            // Logging only. A return here would silently discard any in-flight Throwable.
            logger.warn("---------推送消息告知app收到消息通知结束:");
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Parses the payload, resolves sender/receivers/identity and sends the batch push. */
    private void pushToApp(String message) {
        TaskPushAppMessageMessage msg =
                JSON.toJavaObject(JSON.parseObject(message), TaskPushAppMessageMessage.class);
        if (msg == null || msg.getPassport() == null) {
            // Every downstream call needs the passport; without it there is nothing to do.
            logger.warn("---------推送消息告知app失败: message or passport is null");
            return;
        }
        Passport passport = msg.getPassport();

        // NOTE(review): when the message names no users, findUser is still called with an
        // empty id list; what the master service returns for that is not visible here — confirm.
        List<Long> receiptUserIdList = collectReceiverUserIds(msg);

        // The remote calls are issued in this fixed order, regardless of earlier results.
        Long sendUnionId = findSenderUnionId(passport);
        List<Long> receiveUnionIdList = findReceiverUnionIds(receiptUserIdList, passport);
        Identity identity = exchangeIdentity(passport);

        // Push only when there is both a sender and at least one receiver.
        if (sendUnionId == null || CollectionUtils.isEmpty(receiveUnionIdList)) {
            return;
        }

        AttachMessageBatchPushRequest pushRequest =
                buildPushRequest(msg, passport, sendUnionId, receiveUnionIdList);
        AttachMessageBatchPushResponse pushResponse =
                unionService.batchPushAttachMessage(pushRequest, identity);
        if (pushResponse.hasError()) {
            logger.warn("---------推送动态消息告知app出错:" + JSON.toJSONString(pushResponse));
        }
    }

    /** Merges the single user id and the user id list of the message into one list. */
    private List<Long> collectReceiverUserIds(TaskPushAppMessageMessage msg) {
        List<Long> userIds = new ArrayList<>();
        if (msg.getUserId() != null) {
            userIds.add(msg.getUserId());
        }
        if (CollectionUtils.isNotEmpty(msg.getUserIdList())) {
            userIds.addAll(msg.getUserIdList());
        }
        return userIds;
    }

    /** Returns the notification union id used as the sender, or {@code null} if the lookup reports an error. */
    private Long findSenderUnionId(Passport passport) {
        NotificationUnionGetRequest request = new NotificationUnionGetRequest();
        request.setPassport(passport);
        NotificationUnionGetResponse response = unionService.getNotificationUnionId(request);
        return response.hasError() ? null : response.getUnionId();
    }

    /** Looks up the given users and returns the non-null union ids among them (possibly empty). */
    private List<Long> findReceiverUnionIds(List<Long> userIds, Passport passport) {
        List<Long> unionIds = new ArrayList<>();
        UserFindRequest request = new UserFindRequest();
        request.setIds(userIds);
        UserFindResponse response = masterService.findUser(request, passport);
        if (response.hasError() || CollectionUtils.isEmpty(response.getResult())) {
            return unionIds;
        }
        for (User user : response.getResult()) {
            if (user.getUnionId() != null) {
                unionIds.add(user.getUnionId());
            }
        }
        return unionIds;
    }

    /** Exchanges the passport for an identity; falls back to an empty {@link Identity} if the exchange reports an error. */
    private Identity exchangeIdentity(Passport passport) {
        IdentityExchangeResponse response =
                unionService.exchangeIdentityByPassport(new IdentityExchangeRequest(), passport);
        return response.hasError() ? new Identity() : response.getIdentity();
    }

    /** Builds the batch push request that carries the serialized {@link UnionEventMessage}. */
    private AttachMessageBatchPushRequest buildPushRequest(TaskPushAppMessageMessage msg,
                                                           Passport passport,
                                                           Long sendUnionId,
                                                           List<Long> receiveUnionIdList) {
        UnionEventMessage eventMessage = new UnionEventMessage();
        eventMessage.setTenantId(passport.getTenantId());
        eventMessage.setType(UNION_EVENT_TYPE);
        eventMessage.setBusinessType(msg.getBusinessType());

        AttachMessageBatchPushRequest request = new AttachMessageBatchPushRequest();
        request.setSendUnionId(sendUnionId);
        request.setBadge(false);
        request.setSave(false);
        request.setRoute(true);
        request.setNeedPushNick(false);
        request.setReceiveUnionIds(receiveUnionIdList);
        request.setMessageType(MESSAGE_TYPE);
        request.setMessageData(JSON.toJSONString(eventMessage));
        return request;
    }

}