引入maven依赖

org.springframework.boot

spring-boot-starter-websocket

 WebScoket配置处理器

import org.springframework.boot.web.servlet.ServletContextInitializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.ServletContext;

/**

* WebScoket配置处理器

*/

@Configuration

public class WebSocketConfig implements ServletContextInitializer {

/**

* ServerEndpointExporter 作用

*

* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint

*

* @return

*/

@Bean

public ServerEndpointExporter serverEndpointExporter() {

return new ServerEndpointExporter();

}

//设置websocket发送内容长度

@Override

public void onStartup(ServletContext servletContext) {

servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");

}

}

webScoket消息对象

import com.alibaba.fastjson.annotation.JSONField;

import lombok.Data;

import java.util.Date;

/**

* @author: ws

* @date: 20223/10/26 15:59

* @Description: WebSocketMessage

*/

@Data

public class WebSocketMessage {

/**

* 用户ID

*/

private String fromId;

/**

* 对方ID

*/

private String toOtherId;

//消息内容

private String message;

//发送时间

@JSONField(format="yyyy-MM-dd HH:mm:ss")

public Date date;

}

WebSocket操作类

import cn.hutool.core.collection.ListUtil;

import com.alibaba.fastjson.JSON;

import com.ws.wxyinghang.entity.WebSocketMessage;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import javax.websocket.*;

import javax.websocket.server.PathParam;

import javax.websocket.server.ServerEndpoint;

import java.io.IOException;

import java.util.Iterator;

import java.util.List;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.CopyOnWriteArraySet;

/**

* @author: ws

* @date: 20223/10/26 15:59

* @Description: WebSocket操作类

*/

@ServerEndpoint("/websocket/{userId}")

@Component

@Slf4j

public class WebSocketSever {

// 与某个客户端的连接会话,需要通过它来给客户端发送数据

private Session session;

private String userId;

// session集合,存放对应的session

private static ConcurrentHashMap sessionPool = new ConcurrentHashMap<>();

// concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。

private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet<>();

// 用于存放离线消息

private static ConcurrentHashMap> offlineMessageMap = new ConcurrentHashMap();

/**

* 建立WebSocket连接

*

* @param session

* @param userId 用户ID

*/

@OnOpen

public void onOpen(Session session, @PathParam(value = "userId") String userId) {

log.info("WebSocket建立连接中,连接用户ID:{}", userId);

try {

Session historySession = sessionPool.get(userId);

// historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象

if (historySession != null) {

webSocketSet.remove(historySession);

historySession.close();

}

} catch (IOException e) {

log.error("重复登录异常,错误信息:" + e.getMessage(), e);

}

// 建立连接

this.session = session;

this.userId = userId;

webSocketSet.add(this);

sessionPool.put(userId, session);

//从离线消息队列里面获取消息

if (offlineMessageMap.containsKey(userId)) {

List list = offlineMessageMap.get(userId);

Iterator it = list.iterator();

while (it.hasNext()) {

Object x = it.next();

//离线消息接收成功后删除消息

Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));

if (bb) {

System.out.println("从队列中删除离线消息" + x);

it.remove();

}

}

offlineMessageMap.remove(userId);

}

log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());

}

/**

* 发生错误

*

* @param throwable e

*/

@OnError

public void onError(Throwable throwable) {

throwable.printStackTrace();

}

/**

* 连接关闭

*/

@OnClose

public void onClose() {

webSocketSet.remove(this);

sessionPool.remove(this.userId);

log.info("连接断开,当前在线人数为:{}", webSocketSet.size());

}

/**

* 接收客户端消息

*

* @param message 接收的消息

*/

@OnMessage

public void onMessage(String message) {

log.info("收到客户端发来的消息:{}", message);

sendMessageByUser(message);

}

/**

* 推送消息到指定用户

*

* @param message 发送的消息

*/

public static Boolean sendMessageByUser(String message) {

WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);

log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);

Session session = sessionPool.get(msg.getToOtherId());

//判断session是否正常

if (session == null || !session.isOpen()) {

log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");

if (offlineMessageMap.containsKey(msg.getToOtherId())) {

List list = offlineMessageMap.get(msg.getToOtherId());

list.add(msg);

offlineMessageMap.put(msg.getToOtherId(), list);

} else {

offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));

}

}//发送消息

else {

try {

session.getBasicRemote().sendText(message);

} catch (IOException e) {

log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);

return false;

}

}

return true;

}

//发送离线消息

public static Boolean sendOfflineMessageByUser(String message) {

WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);

log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);

Session session = sessionPool.get(msg.getToOtherId());

try {

session.getBasicRemote().sendText(message);

} catch (IOException e) {

log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);

return false;

}

return true;

}

/**

* 群发消息

*

* @param message 发送的消息

*/

public static void sendAllMessage(String message) {

log.info("发送消息:{}", message);

for (WebSocketSever webSocket : webSocketSet) {

try {

webSocket.session.getBasicRemote().sendText(message);

} catch (IOException e) {

log.error("群发消息发生错误:" + e.getMessage(), e);

}

}

}

}

启动项目,使用apiFox测试,新建webScoket接口

新建websocket1,连接后发送消息 

 

新建webScoket2 ,可以看到连接后接收到了消息 

 

如果webScoket2断开连接后, webScoket1继续发送消息,等webScoket2连接后就会收到离线的消息。

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。