TaskAutoOverdueConsumerMessage.java
7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.xiniunet.mq;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Identity;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.master.request.system.UserGetRequest;
import com.xiniunet.master.response.system.UserGetResponse;
import com.xiniunet.master.service.MasterService;
import com.xiniunet.task.constant.Constant;
import com.xiniunet.task.dal.TaskMapper;
import com.xiniunet.task.domain.Task;
import com.xiniunet.task.message.TaskAutoOverdueMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.task.po.TaskPO;
import com.xiniunet.task.request.GroupAddTaskRequest;
import com.xiniunet.task.request.TaskGetRequest;
import com.xiniunet.task.request.TaskOverdueRequest;
import com.xiniunet.task.response.GroupAddTaskResponse;
import com.xiniunet.task.response.TaskGetResponse;
import com.xiniunet.task.response.TaskOverdueResponse;
import com.xiniunet.task.service.TaskService;
import com.xiniunet.xntalk.domain.XntalkMessage;
import com.xiniunet.xntalk.request.AttachMessagePushRequest;
import com.xiniunet.xntalk.request.IdentityExchangeRequest;
import com.xiniunet.xntalk.request.NotificationUnionGetRequest;
import com.xiniunet.xntalk.response.AttachMessagePushResponse;
import com.xiniunet.xntalk.response.IdentityExchangeResponse;
import com.xiniunet.xntalk.response.NotificationUnionGetResponse;
import com.xiniunet.xntalk.service.UnionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Created on 2015-06-02.
* 抽奖活动中奖消息处理
* 生产者:relationship com.xiniunet.relationship.message.LotteryActivityWinMessage
* 消费者:relationship 当前类
* 处理中奖后的逻辑处理(积分/红包发放等)
* @author 常展程
* @since 2.1.0
*/
@Component
public class TaskAutoOverdueConsumerMessage extends XNConsumerMessage {
public TaskAutoOverdueConsumerMessage() {
setMsg_topic(TaskTopic.TASK_TOPIC);
}
@Autowired
private TaskService taskService;
@Autowired
private UnionService unionService;
@Autowired
private MasterService masterService;
@Override
public String getTag() {
return TaskAutoOverdueMessage.MSG_TAG;
}
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public ConsumeConcurrentlyStatus consumerMessage(String message) {
logger.warn("---------任务过期收到消息通知开始:");
logger.warn(message);
TaskAutoOverdueMessage msg = JSON.toJavaObject(JSON.parseObject(message), TaskAutoOverdueMessage.class);
Passport passport = msg.getPassport();
TaskGetRequest taskGetRequest = new TaskGetRequest();
taskGetRequest.setId(msg.getId());
TaskGetResponse taskGetResponse = taskService.getTask(taskGetRequest,passport);
if(taskGetResponse.hasError()){
logger.warn("---------任务过期出错:"+JSON.toJSONString(taskGetResponse));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
Task task = taskGetResponse.getTask();
//任务没有完成没有终止时进行是否逾期判断
Date now = new Date();
if(task.getIsDone() != null && !task.getIsDone() && task.getIsAbort() !=null && !task.getIsAbort()){
//当前时间大于等于任务结束时间进行逾期操作
if(now.getTime() >= task.getEndTime().getTime()){
TaskOverdueRequest taskOverdueRequest = new TaskOverdueRequest();
taskOverdueRequest.setIsOverdue(true);
taskOverdueRequest.setId(task.getId());
//设置任务状态为已逾期
TaskOverdueResponse taskOverdueResponse = taskService.setTaskOverdue(taskOverdueRequest,passport);
if(taskOverdueResponse.hasError()){
logger.warn("---------任务过期出错:"+JSON.toJSONString(taskOverdueResponse));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//推送逾期消息告知app
//获取推送这个unionId
Long sendUnionId = null;
Long receiveUnionId = null;
NotificationUnionGetRequest notificationUnionGetRequest = new NotificationUnionGetRequest();
notificationUnionGetRequest.setPassport(passport);
NotificationUnionGetResponse notificationUnionGetResponse = unionService.getNotificationUnionId(notificationUnionGetRequest);
if(!notificationUnionGetResponse.hasError()){
sendUnionId = notificationUnionGetResponse.getUnionId();
}
//获取推送对象的ReceiveUnionId
UserGetRequest userGetRequest = new UserGetRequest();
userGetRequest.setTenantId(passport.getTenantId());
userGetRequest.setId(task.getOwnerUserId());
UserGetResponse userGetResponse = masterService.getUser(userGetRequest,passport);
if(!userGetResponse.hasError() && userGetResponse.getUser()!= null && userGetResponse.getUser().getUnionId() != null){
receiveUnionId =userGetResponse.getUser().getUnionId();
}
//获取Identity
Identity identity = new Identity();
IdentityExchangeRequest identityExchangeRequest = new IdentityExchangeRequest();
IdentityExchangeResponse identityExchangeResponse = unionService.exchangeIdentityByPassport(identityExchangeRequest,passport);
if(!identityExchangeResponse.hasError()){
identity = identityExchangeResponse.getIdentity();
}
//开始推送逾期消息给app
if(sendUnionId != null && receiveUnionId != null){
AttachMessagePushRequest attachMessagePushRequest = new AttachMessagePushRequest();
attachMessagePushRequest.setSendUnionId(sendUnionId);
attachMessagePushRequest.setBadge(false);
attachMessagePushRequest.setSave(true);
attachMessagePushRequest.setNeedPushNick(false);
attachMessagePushRequest.setReceiveUnionId(receiveUnionId);
attachMessagePushRequest.setMessageType("UNION_EVENT_MESSAGE");
attachMessagePushRequest.setMessageData("{type:\"10\", data:\"有新的逾期任务\"}");
attachMessagePushRequest.setRoute(true);
AttachMessagePushResponse attachMessagePushResponse = unionService.pushAttachMessage(attachMessagePushRequest,identity);
if(attachMessagePushResponse.hasError()){
logger.warn("---------任务过期出错:"+JSON.toJSONString(attachMessagePushResponse));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
}
logger.warn("---------任务过期收到消息通知结束:");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}