博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty实现websocket客户端(附:测试服务端代码)
阅读量:4363 次
发布时间:2019-06-07

本文共 10756 字,大约阅读时间需要 35 分钟。

1,客户端启动类

package test3;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.group.ChannelGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.http.DefaultHttpHeaders;import io.netty.handler.codec.http.HttpClientCodec;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;import io.netty.handler.codec.http.websocketx.WebSocketFrame;import io.netty.handler.codec.http.websocketx.WebSocketVersion;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URI;public final class WebSocketClient {    static final String URL = System.getProperty("url", "ws://127.0.0.1:5688/ws");    public void connect(String URL, ChannelGroup clients, ChannelHandlerContext ctx) throws Exception {        URI uri = new URI(URL);        String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();        final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();        final int port;        if (uri.getPort() == -1) {            if ("ws".equalsIgnoreCase(scheme)) {                port = 80;            } else if ("wss".equalsIgnoreCase(scheme)) {                port = 443;            } else {                port = -1;            }        } else {            port = uri.getPort();        }        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {            System.err.println("Only WS(S) is supported.");            return;        }        final boolean ssl = "wss".equalsIgnoreCase(scheme);        final SslContext sslCtx;        if (ssl) {            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();        } else {            sslCtx = null;        }        EventLoopGroup group = new NioEventLoopGroup();        try {            final WebSocketClientHandler handler = new WebSocketClientHandler(ctx, WebSocketClientHandshakerFactory                    .newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler); } }); Channel ch = b.connect(uri.getHost(), port).sync().channel(); handler.handshakeFuture().sync(); /* * String msg = "222"; WebSocketFrame frame = new * TextWebSocketFrame(msg); ch.writeAndFlush(frame); */ // ch.writeAndFlush("222"); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String msg = console.readLine(); if (msg == null) { break; } else if ("bye".equals(msg.toLowerCase())) { ch.writeAndFlush(new CloseWebSocketFrame()); ch.closeFuture().sync(); break; } else if ("ping".equals(msg.toLowerCase())) { WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 })); ch.writeAndFlush(frame); } else { WebSocketFrame frame = new TextWebSocketFrame(msg); ch.writeAndFlush(frame); } } } finally { group.shutdownGracefully(); } }}

(2)

package test3;import java.time.LocalDateTime;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.handler.codec.http.FullHttpResponse;import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;import io.netty.handler.codec.http.websocketx.WebSocketFrame;import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;import io.netty.util.CharsetUtil;import io.netty.util.concurrent.GlobalEventExecutor;import test.SocketHandlerInitializer;public class WebSocketClientHandler extends SimpleChannelInboundHandler {    private Channel outboundChannel;    private ChannelHandlerContext channel;    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    private final WebSocketClientHandshaker handshaker;    private ChannelPromise handshakeFuture;            public WebSocketClientHandler(ChannelHandlerContext channel, WebSocketClientHandshaker handshaker) {        this.handshaker = handshaker;        this.channel = channel;    }    public ChannelFuture handshakeFuture() {        return handshakeFuture;    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) {        System.out.println("handlerAdded");        handshakeFuture = ctx.newPromise();    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        System.out.println("channelActive");        handshaker.handshake(ctx.channel());    }    @Override    public void channelInactive(ChannelHandlerContext ctx) {        System.out.println("WebSocket Client 链接失败!");    }    @Override    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("channelRead0");        Channel ch = ctx.channel();        if (!handshaker.isHandshakeComplete()) {            try {                handshaker.finishHandshake(ch, (FullHttpResponse) msg);                System.out.println("WebSocket Client connected!");                handshakeFuture.setSuccess();            } catch (WebSocketHandshakeException e) {                System.out.println("WebSocket Client failed to connect");                handshakeFuture.setFailure(e);                            }            return;        }        if (msg instanceof FullHttpResponse) {            FullHttpResponse response = (FullHttpResponse) msg;            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus()                    + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');        }        WebSocketFrame frame = (WebSocketFrame) msg;        if (frame instanceof TextWebSocketFrame) {            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;            // resposnse(ctx, frame);                        channel.writeAndFlush(textFrame.text());            System.out.println("WebSocket Client received message: " + textFrame.text());        } else if (frame instanceof PongWebSocketFrame) {            System.out.println("WebSocket Client received pong");        } else if (frame instanceof CloseWebSocketFrame) {            System.out.println("WebSocket Client received closing");            ch.close();        }    }    private void response(ChannelHandlerContext ctx, final WebSocketFrame msg) {        // 获取客户端传输过来的消息        String content = msg.toString();        clients.writeAndFlush(new TextWebSocketFrame("[服务器收到相应]" + LocalDateTime.now() + "接受萨达到消息, 消息为:" + content));        final Channel inboundChannel = ctx.channel();        Bootstrap b = new Bootstrap();        b.group(inboundChannel.eventLoop()).channel(ctx.channel().getClass())                .handler(new SocketHandlerInitializer(inboundChannel));        ChannelFuture f = b.connect("127.0.0.1", 5688);        outboundChannel = f.channel();        msg.retain();        ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() {            public void operationComplete(ChannelFuture future) throws Exception {                if (future.isSuccess()) {                    System.out.println("isSuccess:true");                    outboundChannel.writeAndFlush("2222222222");                } else {                    System.out.println("isSuccess:false");                    inboundChannel.close();                }            }        });    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        if (!handshakeFuture.isDone()) {            handshakeFuture.setFailure(cause);        }        ctx.close();    }}

(3)测试用的服务端代码

package com.googosoft.websocket;import java.io.IOException;import javax.websocket.DecodeException;import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.ServerEndpoint;@ServerEndpoint("/echo")public class EchoServer {        @OnOpen    public void initSession(Session session) {    }        @OnMessage    public void onMessage(String message, Session session)         throws IOException, InterruptedException {        System.out.println("Received: " + message);        session.getBasicRemote().sendText("This is the first server message");    }        @OnError      public void handleError(Throwable thw) {        thw.printStackTrace();        if (thw instanceof DecodeException) {           System.out.println("Error decoding incoming message: " + ((DecodeException)thw).getText());       } else {           System.out.println("Server WebSocket error: " + thw.getMessage());       }    }        @OnClose    public void processClose(Session session){            }}

在测试的时候,服务端的代码我把它放在了一个web项目里面充当服务端 

客户端就用的普通的Java项目,在main方法里面建立链接,实现通信

转载于:https://www.cnblogs.com/excellencesy/p/11246992.html

你可能感兴趣的文章
android 布局中 layout_gravity、gravity、orientation、layout_weight
查看>>
highcharts
查看>>
【学员管理系统】0x02 学生信息管理功能
查看>>
什么是Entity Framework(ORM)
查看>>
软件质量理解
查看>>
jquery 在 table 中修改某行值
查看>>
pyc文件是什么【转载】
查看>>
POM.xml 标签详解
查看>>
hdu 3635 Dragon Balls (并查集)
查看>>
文件操作
查看>>
7.java集合,泛型简单总结,IO流
查看>>
杭电2007 平方和与立方和
查看>>
JS邮箱验证-正则验证
查看>>
Quartz 2D绘图
查看>>
JS Fetch
查看>>
EJB 笔记
查看>>
【delete】Android自定义控件(四) 自定义ImageView动态设置ImageView的高度
查看>>
HDUOJ------(1230)火星A+B
查看>>
Servlet
查看>>
基于jquery地图特效全国网点查看代码
查看>>