一、SSE是什么?

SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。

注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持,但谷歌、火狐、360是可以的,IE不可以。

优点:SSE和WebSocket相比,最大的优势是便利,服务端不需要其他的类库,开发难度较低,SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。

缺点:如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数

sse 规范:在 html5 的定义中,服务端 sse,一般需要遵循以下要求:

Content-Type: text/event-stream;

charset=UTF-8Cache-Control: no-cache

Connection: keep-alive

实现一个例子

后端代码

/**

* 用于创建连接

*/

@GetMapping("/connect/{userId}")

public SseEmitter connect(@PathVariable String userId) {

return SseEmitterUtil.connect(userId);

}

/**

* 推送给所有人

* @param message

* @return

*/

@GetMapping("/push/{message}")

public ResponseEntity push(@PathVariable(name = "message") String message) {

// 获取连接人数

int userCount = SseEmitterUtil.getUserCount();

// 如果无在线人数,返回

if (userCount < 1) {

return ResponseEntity.status(500).body("无人在线!");

}

SseEmitterUtil.batchSendMessage(message);

return ResponseEntity.ok("发送成功!");

}

SseEmitterUtil代码

public class SseEmitterUtil {

/**

* 当前连接数

*/

private static AtomicInteger count = new AtomicInteger(0);

/**

* 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面

*/

private static Map sseEmitterMap = new ConcurrentHashMap<>();

/**

* 创建用户连接并返回 SseEmitter

* @param userId 用户ID

* @return SseEmitter

*/

public static SseEmitter connect(String userId) {

// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException

SseEmitter sseEmitter = new SseEmitter(0L);

// 注册回调

sseEmitter.onCompletion(completionCallBack(userId));

sseEmitter.onError(errorCallBack(userId));

sseEmitter.onTimeout(timeoutCallBack(userId));

sseEmitterMap.put(userId, sseEmitter);

// 数量+1

count.getAndIncrement();

log.info("创建新的sse连接,当前用户:{}", userId);

return sseEmitter;

}

/**

* 给指定用户发送信息

*/

public static void sendMessage(String userId, String message) {

if (sseEmitterMap.containsKey(userId)) {

try {

// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);

sseEmitterMap.get(userId).send(message);

}

catch (IOException e) {

log.error("用户[{}]推送异常:{}", userId, e.getMessage());

removeUser(userId);

}

}

}

/**

* 群发消息

*/

public static void batchSendMessage(String wsInfo, List ids) {

ids.forEach(userId -> sendMessage(wsInfo, userId));

}

/**

* 群发所有人

*/

public static void batchSendMessage(String wsInfo) {

sseEmitterMap.forEach((k, v) -> {

try {

v.send(wsInfo, MediaType.APPLICATION_JSON);

}

catch (IOException e) {

log.error("用户[{}]推送异常:{}", k, e.getMessage());

removeUser(k);

}

});

}

/**

* 移除用户连接

*/

public static void removeUser(String userId) {

sseEmitterMap.remove(userId);

// 数量-1

count.getAndDecrement();

log.info("移除用户:{}", userId);

}

/**

* 获取当前连接信息

*/

public static List getIds() {

return new ArrayList<>(sseEmitterMap.keySet());

}

/**

* 获取当前连接数量

*/

public static int getUserCount() {

return count.intValue();

}

private static Runnable completionCallBack(String userId) {

return () -> {

log.info("结束连接:{}", userId);

removeUser(userId);

};

}

private static Runnable timeoutCallBack(String userId) {

return () -> {

log.info("连接超时:{}", userId);

removeUser(userId);

};

}

private static Consumer errorCallBack(String userId) {

return throwable -> {

log.info("连接异常:{}", userId);

removeUser(userId);

};

}

}

前端代码

查看原文