Java 控制台程序实现类似广播功能
服务器端代码
添加 maven 依赖
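<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <scope>provided</scope>
</dependency>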
服务器端代码
package com.seliote.web.http;import javax.websocket.*;import javax.websocket.server.ServerEndpoint;import java.io.IOException;import java.io.InputStream;import java.util.ArrayList;import java.util.List;/** * 每次有 WebSocket 连接请求都会创建一个该类的实例 */@ServerEndpoint(value = "/broadcast")public class SocketServer { private static final ListonlinePeople = new ArrayList<>(); @OnOpen public void onOpen(Session aSession) { System.out.println(System.currentTimeMillis() + ": OnOpen:::" + onlinePeople.size() + 1); if (!onlinePeople.contains(aSession)) { onlinePeople.add(aSession); } } // 该方法是用于被动接收信息的 @OnMessage public void onMessage(Session aSession, String aS) throws IOException { System.out.println(System.currentTimeMillis() + ": OnMessage:::" + aS); for (Session session : onlinePeople) { session.getBasicRemote().sendText(aS); } } // OnMessage 可以有多个不同签名的 @OnMessage public void onMessage(Session aSession, InputStream aInputStream) { System.out.println(System.currentTimeMillis() + ": OnMessage"); // TODO } /** * 每次有客户端异常关闭该方法也会调用 * @param aSession * @param aCloseReason */ @OnClose public void onClose(Session aSession, CloseReason aCloseReason) { System.out.println(System.currentTimeMillis() + ": OnClose:::" + aCloseReason.getReasonPhrase()); if (onlinePeople.contains(aSession)) { onlinePeople.remove(aSession); } } @OnError public void onError(Session aSession, Throwable aThrowable) { System.out.println(System.currentTimeMillis() + ": OnError"); aThrowable.printStackTrace(); }}
如果连接时需要携带客户端信息,那么可以在路径中加入参数。例如客户端在路径中加入用户 Token,路径变为 127.0.0.1/broadcast/123456,服务器端的注解就可以改为 @ServerEndpoint(value = "/broadcast/{token}"),之后在 @OnOpen 方法中就可以增加一个 @PathParam("token") String aToken 参数,代表客户端传入的 Token。
客户端代码
添加 maven 依赖,注意这里使用的是 tyrus-standalone-client 而非 javax.websocket-client-api:后者只包含 API、不包含实现,运行时会报错。
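<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.3.3</version>
    <scope>compile</scope>
</dependency>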
客户端代码
package com.seliote;import javax.websocket.*;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.Scanner;@ClientEndpointpublic class Demo { private static Session sSession; public static void main(String... args) throws URISyntaxException, DeploymentException, IOException { // https 协议对应使用 wss URI uri = new URI("ws", "127.0.0.1:8080", "/broadcast", null, null); // 通过 ContainerProvider 的 static 方法 getWebSocketContainer() 获得 WebSocketContainer sSession = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri); try (Scanner scanner = new Scanner(System.in)) { String broadcastMsg = ""; while (true) { broadcastMsg = scanner.nextLine(); // 通过 Session 对象主动发送信息 sSession.getBasicRemote().sendText(broadcastMsg); //sSession.getBasicRemote().getSendStream().write(....); } } } @OnOpen public void onOpen() { System.out.println(System.currentTimeMillis() + ": OnOpen "); } // 该方法是用于被动接收信息的 @OnMessage public void onMessage(String aS) { System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS); }}
在一个客户端输入信息后服务器会及时收到信息并广播给所有在线的客户端
------------------------------------------2019.01.09 更新
如果需要直接收发自定义的实体类型,WebSocket 服务器端需要配置相应的编码器与解码器(原文此处指向服务器端示例的链接已丢失),而客户端配置如下
Maven 依赖(这里用了 JSONObject 而不是服务器端的 Jackson)
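<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.3.3</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180813</version>
    <scope>compile</scope>
</dependency>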
package com.seliote.demo;/** * @author seliote * @date 2019-01-09 * @description WebSocket 信息实体 */@SuppressWarnings({"unused", "WeakerAccess"})public class BroadcastMsg { private String mSessionId; private String mTimestamp; private String mMsg; public BroadcastMsg() {} public BroadcastMsg(String aSessionId, String aTimestamp, String aMsg) { mSessionId = aSessionId; mTimestamp = aTimestamp; mMsg = aMsg; } public String getSessionId() { return mSessionId; } public void setSessionId(String aSessionId) { mSessionId = aSessionId; } public String getTimestamp() { return mTimestamp; } public void setTimestamp(String aTimestamp) { mTimestamp = aTimestamp; } public String getMsg() { return mMsg; } public void setMsg(String aMsg) { mMsg = aMsg; } @Override public String toString() { return mSessionId + " - " + mTimestamp + " - " + mMsg; }}
package com.seliote.demo;import org.json.JSONObject;import javax.websocket.Decoder;import javax.websocket.Encoder;import javax.websocket.EndpointConfig;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.nio.charset.StandardCharsets;/** * @author seliote * @date 2019-01-09 * @description BroadcastMsg 用于 WebSocket 的编码与解码器 */public class BroadcastMsgCoder implements Encoder.BinaryStream, Decoder.BinaryStream { @Override public void init(EndpointConfig aEndpointConfig) { } @Override public void destroy() { } @Override public void encode(BroadcastMsg aBroadcastMsg, OutputStream aOutputStream) throws IOException { aOutputStream.write(new JSONObject(aBroadcastMsg).toString().getBytes(StandardCharsets.UTF_8)); } @Override public BroadcastMsg decode(InputStream aInputStream) throws IOException { byte[] buffer = new byte[1024]; int length; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); while ((length = aInputStream.read(buffer)) != -1) { byteArrayOutputStream.write(buffer, 0, length); } String json = new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8); JSONObject jsonObject = new JSONObject(json); return new BroadcastMsg( jsonObject.getString("sessionId"), jsonObject.getString("timestamp"), jsonObject.getString("msg") ); }}
package com.seliote.demo;import javax.websocket.ClientEndpoint;import javax.websocket.ContainerProvider;import javax.websocket.DeploymentException;import javax.websocket.EncodeException;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.util.Scanner;@ClientEndpoint( encoders = BroadcastMsgCoder.class, decoders = BroadcastMsgCoder.class)public class Demo { public static void main(String... args) throws URISyntaxException, DeploymentException, IOException { // https 协议对应使用 wss URI uri = new URI("ws", "127.0.0.1:8080", "/time/1", null, null); // 通过 ContainerProvider 的 static 方法 getWebSocketContainer() 获得 WebSocketContainer Session session = ContainerProvider.getWebSocketContainer().connectToServer(Demo.class, uri); try (Scanner scanner = new Scanner(System.in)) { //noinspection InfiniteLoopStatement while (true) { // 通过 Session 对象主动发送信息 try { String msg = scanner.nextLine(); BroadcastMsg broadcastMsg = new BroadcastMsg( session.getId(), System.currentTimeMillis() + "", msg ); session.getBasicRemote().sendObject(broadcastMsg); } catch (EncodeException exp) { exp.printStackTrace(); } //sSession.getBasicRemote().getSendStream().write(....); } } } @OnOpen public void onOpen() { System.out.println(System.currentTimeMillis() + ": OnOpen "); } @OnMessage public void onMessage(String aS) { System.out.println(System.currentTimeMillis() + ": OnMessage::: " + aS); } @OnMessage public void onMessage(BroadcastMsg aBroadcastMsg) { System.out.println(aBroadcastMsg); }}