新增websocket通信(中继服务同步相关)

dev2.0
anthonywj 2 years ago
parent 24739fb392
commit 7ed62f3b38

@ -54,12 +54,12 @@
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>javax.servlet</groupId>-->
<!-- <artifactId>javax.servlet-api</artifactId>-->
<!-- <version>3.1.0</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>javax.servlet</groupId>-->
<!-- <artifactId>javax.servlet-api</artifactId>-->
<!-- <version>3.1.0</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!--让配置文件有提示-->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -348,6 +348,13 @@
<artifactId>knife4j-spring-boot-starter</artifactId>
<version>2.0.9</version>
</dependency>
<!-- https://github.com/TooTallNate/Java-WebSocket-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.4</version>
</dependency>
</dependencies>
<build>

@ -0,0 +1,8 @@
package com.glxp.api.constant;
public interface SocketMsgType {
String DL_ALL_DATA = "DL_ALL_DATA"; //生产入库
String DL_NOTICE = "DL_NOTICE"; //通知类消息
}

@ -6,7 +6,9 @@ import com.glxp.api.annotation.Log;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.BusinessType;
import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.entity.sync.BasicDownloadStatusEntity;
import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.req.sync.BasicDownloadRequest;
import com.glxp.api.req.sync.SpsSyncDataRequest;
import com.glxp.api.req.system.DeleteRequest;
@ -14,6 +16,7 @@ import com.glxp.api.res.PageSimpleResponse;
import com.glxp.api.service.sync.BasicDownloadService;
import com.glxp.api.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@ -31,13 +34,29 @@ public class SpsSyncDownloadStatusController {
BasicDownloadService basicDownloadService;
@Resource
RedisUtil redisUtil;
@Value("${WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
@GetMapping("/udispsync/sync/testConnection")
public BaseResponse connectTest(SpsSyncDataRequest spsSyncDataRequest) {
return ResultVOUtils.success("连接成功");
}
/**
*
*
*/
@AuthRuleAnnotation("")
@GetMapping("/spssync/download/now")
public BaseResponse noticeDlNow() {
SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build();
spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
return ResultVOUtils.success("已通知中级服务下载待同步数据!");
}
@AuthRuleAnnotation("")
@GetMapping("/spssync/download/basic/udiinfo/getStatus")
public BaseResponse getStatus(BasicDownloadRequest basicDownloadRequest) {

@ -0,0 +1,138 @@
package com.glxp.api.controller.sync;
import cn.hutool.core.util.StrUtil;
import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.exception.ServiceException;
import com.glxp.api.util.ByteArraySplitter;
import com.glxp.api.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint(value = "/spms/sync/{type}/{token}")
@Slf4j
public class SpsSyncWebSocket {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
@Value("${WEBSOCKET_TOKEN}")
private String socketToken;
private static int onlineCount = 0;
private static ConcurrentHashMap<String, SpsSyncWebSocket> socketServersMap = new ConcurrentHashMap<>();
private String key = "";
/**
*
*
* @param session
* @param type typeregister/verify
*/
@OnOpen
public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) {
this.session = session;
key = type + ":" + token + ":" + session.getId();
log.info("onOpen->>key:{}", key);
boolean flag = socketServersMap.containsKey(key);
if (flag) {
socketServersMap.remove(key);
socketServersMap.put(key, this);
} else {
socketServersMap.put(key, this);
addOnlineCount();
}
// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) {
// throw new ServiceException("token验证异常");
// }
log.info("online number:{}", getOnlineCount());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId());
}
@OnClose
public void onClose() {
log.info("onClose : {}" + key);
if (socketServersMap.containsKey(key)) {
socketServersMap.remove(key);
subOnlineCount(); //在线数减1
}
log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount());
}
@OnError
public void onError(Throwable error) {
socketServersMap.remove(key);
subOnlineCount();
log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage());
}
/**
* @param socketMsgEntity ,
* @param sessionId token
*/
public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) {
synchronized (this.getClass()) {
for (Map.Entry<String, SpsSyncWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) {
try {
String key = stringMyWebSocketEntry.getKey();
SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
if (key.contains(sessionId)) {
log.info("推送的消息为:" + key);
value.session.getBasicRemote().sendText(JsonUtils.toJsonString(socketMsgEntity));
// List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024);
// for (byte[] bytes : result) {
// String s = new String(bytes);
// value.session.getBasicRemote().sendText(s);
// }
// 推送结束符
// value.session.getBasicRemote().sendText("#end#");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 线
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 线+1
*/
public static synchronized void addOnlineCount() {
SpsSyncWebSocket.onlineCount++;
}
/**
* 线-1
*/
public static synchronized void subOnlineCount() {
SpsSyncWebSocket.onlineCount--;
}
}

@ -1,17 +1,24 @@
package com.glxp.api.controller.sync;
import cn.hutool.core.util.StrUtil;
import com.github.pagehelper.PageInfo;
import com.glxp.api.annotation.Log;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.BusinessType;
import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.idc.utils.IDCUtils;
import com.glxp.api.res.inout.IoCodeResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.File;
import java.util.List;
@ -19,6 +26,11 @@ import java.util.List;
@RestController
public class TestController {
@Value("${WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
@PostMapping("/test/file/uplaod")
@Log(title = "测试上传数据", businessType = BusinessType.INSERT)
public BaseResponse uploadProducts(@RequestParam("file") List<MultipartFile> files, @RequestParam("key") String key) {
@ -44,4 +56,14 @@ public class TestController {
}
return ResultVOUtils.success("上传成功");
}
@GetMapping("/testWebSocket")
public BaseResponse testWebSocket(String message) {
SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content(message).remark("下载基础信息").build();
spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
// spsSyncWebSocket.sendMessage(message, "1:07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b");
return ResultVOUtils.success(socketMsgEntity);
}
}

@ -0,0 +1,19 @@
package com.glxp.api.entity.sync;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* websocket
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SocketMsgEntity {
private String type;
private String content;
private String remark;
}

@ -9,12 +9,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.*;
import com.glxp.api.controller.sync.SpsSyncWebSocket;
import com.glxp.api.dao.basic.BasicProductsDao;
import com.glxp.api.entity.basic.*;
import com.glxp.api.entity.inout.*;
import com.glxp.api.entity.purchase.*;
import com.glxp.api.entity.sync.BasicExportStatusEntity;
import com.glxp.api.entity.sync.BasicExportStatusTimeEntity;
import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.entity.sync.SyncDataBustypeEntity;
import com.glxp.api.entity.thrsys.ThrBusTypeOriginEntity;
import com.glxp.api.req.basic.ProductInfoFilterRequest;
@ -130,6 +132,8 @@ public class SpsSyncDownloadService {
}
}
basicExportService.insertExportStatus(orderStatusEntity);
SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build();
spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
}
}
}
@ -466,6 +470,9 @@ public class SpsSyncDownloadService {
// 异常回滚
this.exportTimeRollback(totalTimeMap, exportType, fileFullPath);
}
SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build();
spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
return true;
} catch (IOException e) {
logger.error(String.format("syncIdcSps----process------------生成[%s]文件及更改库操作异常,异常信息<%s>"
@ -481,6 +488,10 @@ public class SpsSyncDownloadService {
}
}
@Value("${WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
private final IoCodeLostService ioCodeLostService;

@ -0,0 +1,47 @@
package com.glxp.api.util;
import cn.hutool.core.util.ArrayUtil;
import com.beust.jcommander.internal.Lists;
import java.util.List;
public class ByteArraySplitter {
/**
* StringList<byte[]>
*
* @param source
* @param size
* @return
*/
public static List<byte[]> split(String source, int size) {
// 存放最终结果
List<byte[]> result = Lists.newArrayList();
if (StringUtils.isEmpty(source)) {
return null;
}
byte[] sourceBytes = source.getBytes();
if (size > sourceBytes.length) {
result.add(sourceBytes);
return result;
}
// 开始进行split
int startIndex = 0;
int endIndex = sourceBytes.length - 1;
boolean isRunning = true;
while (isRunning) {
if ((endIndex + 1) - startIndex > size) {
result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size));
startIndex += size;
} else {
result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1));
isRunning = false;
}
}
return result;
}
}

@ -53,3 +53,4 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 平潭协和医院
WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -53,3 +53,5 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 漳州市中医院
WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -396,5 +396,6 @@
INNER JOIN sup_manufacturer on sup_product.manufacturerIdFk = sup_manufacturer.manufacturerId
WHERE sup_manufacturer.companyName = #{manufactory}
and sup_product.recordProductName = #{cpmctymc}
limit 1
</select>
</mapper>

Loading…
Cancel
Save