TaskPushAppMessageConsumerMessage.java
6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.xiniunet.mq;
import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.xiniunet.framework.message.XNConsumerMessage;
import com.xiniunet.framework.security.Identity;
import com.xiniunet.framework.security.Passport;
import com.xiniunet.master.domain.system.User;
import com.xiniunet.master.request.system.UserFindRequest;
import com.xiniunet.master.response.system.UserFindResponse;
import com.xiniunet.master.service.MasterService;
import com.xiniunet.task.message.TaskPushAppMessageMessage;
import com.xiniunet.task.message.TaskTopic;
import com.xiniunet.xntalk.domain.UnionEventMessage;
import com.xiniunet.xntalk.request.AttachMessageBatchPushRequest;
import com.xiniunet.xntalk.request.IdentityExchangeRequest;
import com.xiniunet.xntalk.request.NotificationUnionGetRequest;
import com.xiniunet.xntalk.response.AttachMessageBatchPushResponse;
import com.xiniunet.xntalk.response.IdentityExchangeResponse;
import com.xiniunet.xntalk.response.NotificationUnionGetResponse;
import com.xiniunet.xntalk.service.UnionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
 * Message consumer that forwards task push notifications to the app as
 * {@code UNION_EVENT_MESSAGE} attach messages.
 *
 * <p>It subscribes to {@link TaskTopic#TASK_TOPIC} and handles messages tagged
 * with {@link TaskPushAppMessageMessage#MSG_TAG}. Delivery is best effort: the
 * consumer always acknowledges the message, so a failed push is logged but never
 * redelivered.
 *
 * Created on 2015-06-02.
 * @author 常展程
 * @since 2.1.0
 */
@Component
public class TaskPushAppMessageConsumerMessage extends XNConsumerMessage {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private UnionService unionService;

    @Autowired
    private MasterService masterService;

    public TaskPushAppMessageConsumerMessage() {
        setMsg_topic(TaskTopic.TASK_TOPIC);
    }

    @Override
    public String getTag() {
        return TaskPushAppMessageMessage.MSG_TAG;
    }

    /**
     * Pushes an event notification to the app for the users named in the message.
     *
     * @param message JSON text of a {@link TaskPushAppMessageMessage}
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}; any
     *         {@link Exception} raised while handling the message is logged with
     *         its stack trace and the message is still acknowledged
     */
    @Override
    public ConsumeConcurrentlyStatus consumerMessage(String message) {
        logger.warn("---------推送消息告知app收到消息通知开始:"+JSON.toJSONString(message));
        try {
            TaskPushAppMessageMessage msg = JSON.toJavaObject(JSON.parseObject(message), TaskPushAppMessageMessage.class);
            if (msg == null || msg.getPassport() == null) {
                // Nothing can be resolved without a passport; report it explicitly
                // instead of relying on a NullPointerException further down.
                logger.warn("---------推送消息告知app失败: message or passport is missing");
            } else {
                pushEventToApp(msg);
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are kept.
            logger.warn("---------推送消息告知app失败", e);
        } finally {
            // No return here: returning from finally would discard any in-flight Throwable.
            logger.warn("---------推送消息告知app收到消息通知结束:");
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Resolves sender, receivers and identity, then sends the batch push when
     * both a sender union id and at least one receiver union id are available.
     */
    private void pushEventToApp(TaskPushAppMessageMessage msg) {
        Passport passport = msg.getPassport();
        List<Long> receiptUserIdList = collectReceiverUserIds(msg);

        // Remote calls are kept in their original order.
        Long sendUnionId = findSenderUnionId(passport);
        List<Long> receiveUnionIdList = findReceiverUnionIds(receiptUserIdList, passport);
        Identity identity = exchangeIdentity(passport);

        if (sendUnionId == null || CollectionUtils.isEmpty(receiveUnionIdList)) {
            return;
        }

        AttachMessageBatchPushRequest pushRequest = new AttachMessageBatchPushRequest();
        pushRequest.setSendUnionId(sendUnionId);
        pushRequest.setBadge(false);
        pushRequest.setSave(false);
        pushRequest.setRoute(true);
        pushRequest.setNeedPushNick(false);
        pushRequest.setReceiveUnionIds(receiveUnionIdList);
        pushRequest.setMessageType("UNION_EVENT_MESSAGE");

        UnionEventMessage unionEventMessage = new UnionEventMessage();
        unionEventMessage.setTenantId(passport.getTenantId());
        // NOTE(review): 30 is an event type code defined outside this file; confirm its meaning.
        unionEventMessage.setType(30);
        unionEventMessage.setBusinessType(msg.getBusinessType());
        pushRequest.setMessageData(JSON.toJSONString(unionEventMessage));

        AttachMessageBatchPushResponse pushResponse = unionService.batchPushAttachMessage(pushRequest, identity);
        if (pushResponse.hasError()) {
            logger.warn("---------推送动态消息告知app出错:" + JSON.toJSONString(pushResponse));
        }
    }

    /** Merges the single user id and the user id list of the message into one list. */
    private List<Long> collectReceiverUserIds(TaskPushAppMessageMessage msg) {
        List<Long> userIds = new ArrayList<>();
        if (msg.getUserId() != null) {
            userIds.add(msg.getUserId());
        }
        if (CollectionUtils.isNotEmpty(msg.getUserIdList())) {
            userIds.addAll(msg.getUserIdList());
        }
        return userIds;
    }

    /** Returns the union id used as the push sender, or {@code null} when the lookup reports an error. */
    private Long findSenderUnionId(Passport passport) {
        NotificationUnionGetRequest request = new NotificationUnionGetRequest();
        request.setPassport(passport);
        NotificationUnionGetResponse response = unionService.getNotificationUnionId(request);
        if (response.hasError()) {
            logger.warn("---------推送消息告知app: notification union lookup failed:" + JSON.toJSONString(response));
            return null;
        }
        return response.getUnionId();
    }

    /** Maps receiver user ids to their non-null union ids; returns an empty list when the lookup reports an error. */
    private List<Long> findReceiverUnionIds(List<Long> receiptUserIdList, Passport passport) {
        List<Long> unionIds = new ArrayList<>();
        UserFindRequest request = new UserFindRequest();
        // NOTE(review): the id list may be empty here; confirm how findUser treats an empty id filter.
        request.setIds(receiptUserIdList);
        UserFindResponse response = masterService.findUser(request, passport);
        if (response.hasError()) {
            logger.warn("---------推送消息告知app: user lookup failed:" + JSON.toJSONString(response));
            return unionIds;
        }
        if (CollectionUtils.isNotEmpty(response.getResult())) {
            for (User user : response.getResult()) {
                if (user.getUnionId() != null) {
                    unionIds.add(user.getUnionId());
                }
            }
        }
        return unionIds;
    }

    /** Exchanges the passport for an identity; falls back to an empty {@link Identity} when the exchange reports an error. */
    private Identity exchangeIdentity(Passport passport) {
        IdentityExchangeRequest request = new IdentityExchangeRequest();
        IdentityExchangeResponse response = unionService.exchangeIdentityByPassport(request, passport);
        if (response.hasError()) {
            logger.warn("---------推送消息告知app: identity exchange failed:" + JSON.toJSONString(response));
            // Same fallback as before: the push proceeds with a blank identity.
            return new Identity();
        }
        return response.getIdentity();
    }
}