TaskRemindJobConsumerMessage.java 9.27 KB
package com.xiniunet.mq;

import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.framework.util.DateUtil;
import com.xiniunet.quartz.TaskCheckIsOverdueJob;
import com.xiniunet.quartz.TaskRemindingJob;
import com.xiniunet.quartz.TaskUserRemindingJob;
import com.xiniunet.quartz.TaskWeeklyRemindingJob;
import com.xiniunet.quartz.base.XNQuartzJob;
import com.xiniunet.quartz.base.XNQuartzJobData;
import com.xiniunet.task.dal.TaskMapper;
import com.xiniunet.task.dal.TaskUserMapper;
import com.xiniunet.task.message.TaskRemindJobMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.po.TaskPO;
import com.xiniunet.task.po.TaskUserPO;
import com.xiniunet.task.request.TaskUserFindRequest;
import com.xiniunet.task.service.TaskCenterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;


/**
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskRemindJobConsumerMessage extends XNConsumerMessage {

    public TaskRemindJobConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    @Autowired
    private TaskCenterService taskCenterService;
    @Autowired
    private TaskMapper taskMapper;

    @Autowired
    private TaskUserMapper taskUserMapper;
    @Override
    public String getTag() {
        return TaskRemindJobMessage.MSG_TAG;
    }

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {

        logger.warn("---------创建调度器收到消息通知开始:"+JSON.toJSONString(message));
        try {
            TaskRemindJobMessage msg = JSON.toJavaObject(JSON.parseObject(message), TaskRemindJobMessage.class);
            Passport passport = msg.getPassport();
            //进程睡眠半秒钟,防止调用接口时,主事务还没有提交
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //查询任务
            TaskPO taskPO = taskMapper.getById(msg.getId(),passport);
            if(taskPO == null){
                logger.warn("---------任务信息不存在:");
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            XNQuartzJobData xnQuartzJobData = new XNQuartzJobData();
            xnQuartzJobData.setId(msg.getId());
            xnQuartzJobData.setPassport(passport);
            if(taskPO.getEndTime() != null){
                //调用检查逾期时间调度器
                TaskCheckIsOverdueJob taskCheckIsOverdueJob = new TaskCheckIsOverdueJob();
                taskCheckIsOverdueJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                taskCheckIsOverdueJob.setTriggerTag(String.valueOf(taskPO.getId()),  "TaskCheckIsOverdueJob");
                taskCheckIsOverdueJob.setCronTrigger(XNQuartzJob.getCron(taskPO.getEndTime()));
            }else{
                //设置一个过期时间
                TaskCheckIsOverdueJob taskCheckIsOverdueJob = new TaskCheckIsOverdueJob();
                taskCheckIsOverdueJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                taskCheckIsOverdueJob.setTriggerTag(String.valueOf(taskPO.getId()),  "TaskCheckIsOverdueJob");
                taskCheckIsOverdueJob.setCronTrigger(XNQuartzJob.getCron(new Date(new Date().getTime()-1*60*1000)));
            }
            if(taskPO.getRemindingTime() != null){
                //调用任务提醒时间调度器
                TaskRemindingJob taskRemindingJob = new TaskRemindingJob();
                taskRemindingJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                taskRemindingJob.setTriggerTag(String.valueOf(taskPO.getId()),  "TaskRemindingJob");
                taskRemindingJob.setCronTrigger(XNQuartzJob.getCron(taskPO.getRemindingTime()));
            }else{
                //设置一个过期时间
                TaskRemindingJob taskRemindingJob = new TaskRemindingJob();
                taskRemindingJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                taskRemindingJob.setTriggerTag(String.valueOf(taskPO.getId()), "TaskRemindingJob");
                taskRemindingJob.setCronTrigger(XNQuartzJob.getCron(new Date(new Date().getTime()-1*60*1000)));
            }
            //需要修改每周几定时检查提醒调度器
            if(msg.getUpdateWeeklyJob() != null && msg.getUpdateWeeklyJob()){
                if(taskPO.getAlarmTime() != null){
                    //调用任务每周几定时检查提醒调度器
                    TaskWeeklyRemindingJob taskWeeklyRemindingJob = new TaskWeeklyRemindingJob();
                    taskWeeklyRemindingJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                    taskWeeklyRemindingJob.setTriggerTag(String.valueOf(taskPO.getId()), "TaskWeeklyRemindingJob");
                    StringBuffer stringBuffer = new StringBuffer("");
                    stringBuffer.append(DateUtil.formatDate(taskPO.getAlarmTime(), "ss mm HH"));
                    stringBuffer.append(" ? * ");
                    if (taskPO.getAlarmWeek1() && taskPO.getAlarmWeek2() && taskPO.getAlarmWeek3()
                            && taskPO.getAlarmWeek4() && taskPO.getAlarmWeek5()
                            && taskPO.getAlarmWeek6() && taskPO.getAlarmWeek7()) {
                        stringBuffer.append("*");
                    } else {
                        //每周以周日为开始的第一天
                        if (taskPO.getAlarmWeek1()) {
                            stringBuffer.append("2,");
                        }
                        if (taskPO.getAlarmWeek2()) {
                            stringBuffer.append("3,");
                        }
                        if (taskPO.getAlarmWeek3()) {
                            stringBuffer.append("4,");
                        }
                        if (taskPO.getAlarmWeek4()) {
                            stringBuffer.append("5,");
                        }
                        if (taskPO.getAlarmWeek5()) {
                            stringBuffer.append("6,");
                        }
                        if (taskPO.getAlarmWeek6()) {
                            stringBuffer.append("7,");
                        }
                        if (taskPO.getAlarmWeek7()) {
                            stringBuffer.append("1");
                        }
                    }

                    String cron = stringBuffer.toString();
                    if(cron.endsWith(",")){
                        cron = cron.substring(0,cron.length()-1);
                    }
                    taskWeeklyRemindingJob.setCronTrigger(cron);
                }else{
                    //设置一个过期时间
                    TaskWeeklyRemindingJob taskWeeklyRemindingJob = new TaskWeeklyRemindingJob();
                    taskWeeklyRemindingJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                    taskWeeklyRemindingJob.setTriggerTag(String.valueOf(taskPO.getId()), "TaskWeeklyRemindingJob");
                    taskWeeklyRemindingJob.setCronTrigger(XNQuartzJob.getCron(new Date(new Date().getTime()-1*60*1000)));
                }
            }
            if(msg.getUpdateUserRemind() != null && msg.getUpdateUserRemind()){
                TaskUserFindRequest taskUserFindRequest = new TaskUserFindRequest();
                taskUserFindRequest.setTaskId(msg.getId());
                taskUserFindRequest.setPageSize(0);
                List<TaskUserPO> taskUserPOList= taskUserMapper.find(taskUserFindRequest,passport);
                if(CollectionUtils.isNotEmpty(taskUserPOList)){
                    for(TaskUserPO taskUserPO : taskUserPOList){
                        if(taskUserPO.getIsTimeAlarm() != null && taskUserPO.getIsTimeAlarm() && taskUserPO.getAheadTimeCount() != null){
                            Date remindTime = (new Date(taskPO.getEndTime().getTime()-taskUserPO.getAheadTimeCount()*60*1000));
                            xnQuartzJobData.setUserTaskId(taskUserPO.getId());
                            //调用任务提醒时间调度器
                            TaskUserRemindingJob taskUserRemindingJob = new TaskUserRemindingJob();
                            taskUserRemindingJob.setTriggerData(JSON.toJSONString(xnQuartzJobData));
                            //设置为用户任务的id
                            taskUserRemindingJob.setTriggerTag(String.valueOf(taskUserPO.getId()),  "TaskUserRemindingJob");
                            taskUserRemindingJob.setCronTrigger(XNQuartzJob.getCron(remindTime));

                        }
                    }
                }
            }
        }catch (Exception e){
            logger.warn("---------创建调度器失败");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }finally {
            logger.warn("---------创建调度器收到消息通知结束:");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

}