前言
介绍Netty的概述和基础用法。
- 组件
- 简单实现
概述
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务端和客户端。这里的异步并没有使用AIO,只是使用了多线程。Netty在网络应用框架中的地位,相当于Spring框架在javaEE开发中的地位。
Netty的优势:
简单实现
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class HelloServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) // 4. boss负责连接,child(即worker)负责读写,这里添加worker事件触发的具体处理(handler) .childHandler( // 5. 和客户端连接Channel的初始化器,负责增强这个channel new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()); nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }) .bind(8080); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class HelloClient { public static void main(String[] args) throws InterruptedException { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringEncoder()); } }) .connect("localhost", 8080) .sync() .channel() .writeAndFlush("hello world"); } }
|
理解
- Channel理解成数据的传输通道
- 把Msg理解成channel中流动的数据,最开始是ByteBuf,通过pipeline流水线进行加工,最终变成其他的样子,最后又通过ByteBuf输出
- 把handler理解成数据处理的工序,处理工序合在一起就是pipeline(ChannelPipeline),这个流水线可以在channel的各种事件的前后切点处织入方法(方法可以自定义)
- handler分为Inbound和outbound(数据入栈和出栈)
- eventLoop理解成工人(selector + 单线程线程池)
- 工人管理多个channel的io操作,并且会和这些channel绑定(selector绑定, IO绑定)
- 工人既可以执行io操作,也可以进行数据逻辑处理(数据的处理和selector就没关系了),每个工人有任务队列(阻塞),任务可以分为定时任务和普通任务
- 工人根据pipeline的顺序,去处理数据,每道工序都可以指派给不同的工人(离谱),但最后还是由这个工人来执行最后的IO。
- 换句话讲,就是第一个handle负责绑定EventLoop,尾部handle负责IO,尾部必须使用首部绑定的EventLoop
组件
Eventloop
事件循环对象。
Eventloop本质是一个单线程执行器(同时维护了一个selector),里面有run方法执行源源不断的IO事件。
继承关系(复杂)
SchelduledExecutorService
:包含了线程池中所有方法
- 自己的
OrderedEventExecutor
boolean inEventloop(Thread thread)
:判断线程是否属于这个loop
parent()
查看自己属于哪个Group
EventloopGroup
事件循环组。
一组Eventloop,channel会调用group中的register方法绑定其中的一个Eventloop,后续的IO操作都有这个Eventloop处理。
基本操作样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class TestEventLoop { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(2);
EventLoop loop = group.next(); loop.submit(() -> System.out.println(1111)); group.next().scheduleAtFixedRate(() -> System.out.println(2222), 0L, 1L, TimeUnit.SECONDS); } }
|
channel绑定案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class EventLoopServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(4)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(StandardCharsets.UTF_8)); } }); } }) .bind(8080); } }
public class EventLoopClient { public static void main(String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); System.out.println(channel); System.out.println(""); } }
|
问题:如果一个EventLoop绑定了很多channel,处理某一个channel的handle时间过长,则会阻塞其他channel的事件,这怎么办?
答:做一次细分,创建一个独立的EventLoop来处理handle的任务,将IO和数据逻辑处理分离。
优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Sl4fj public class EventLoopServer { public static void main(String[] args) { DefaultEventLoopGroup taskGroup = new DefaultEventLoopGroup(); new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(4)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(StandardCharsets.UTF_8)); ctx.fireChannelRead(msg); } }).addLast(taskGroup, "handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(StandardCharsets.UTF_8)); } }); } }) .bind(8080); } }
|
handle切换线程原理
关键代码:io.netty.channel.AbstractChannelHandlerContext
每个handler都要判断下一个handler是否绑定了当前的EventLoop。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
|
Channel
方法:
close()
:用来关闭channel(异步关闭,返回一个ChannelFutrue)
closeFuture()
:用来处理close的关闭
sync()
:同步等待channel关闭
addListener()
:异步等待关闭
pipeline()
:添加处理器
write()
:数据写入,可能会先进入缓冲
wtiteAndFlush()
:数据写入并刷出
ChannelFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ChannelFutureClient { public static void main(String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nc) throws Exception { } }) .connect("localhost", 8080); Channel channel = channelFuture.channel(); channel.writeAndFlush("1111"); } }
|
问题:为什么无法发送数据呢?
答:
connect
方法是异步非阻塞的,主线程调用connect,但是由另一个NIO线程去建立连接。
- 主线程去如果不执行
sync
,连接还未建立,write自然就不生效了。
发送数据
一般带有Future和Promise的类就是为了用来异步处理结果的。
获取异步处理结果,让Channel能发送数据的方法如下:
sync
:阻塞主线程直到NIO成功建立连接
addListener
:异步等待channel连接的结果(给一个回调对象)
1 2 3 4 5 6 7 8 9 10
| channelFuture.sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush("1111");
channelFuture.addListener((ChannelFutureListener) future -> { Channel channel1 = future.channel(); channel.writeAndFlush("qqqqqqqq"); });
|
关闭连接
1 2 3 4 5 6 7 8 9 10 11 12
| Channel channel = channelFuture.sync().channel(); new Thread(() -> { Scanner in = new Scanner(System.in); while (true) { String next = in.next(); if ("q".equals(next)) { channel.close(); break; } channel.writeAndFlush(next); } },"input").start();
|
因为channel.close
异步,那如何在channel关闭后,处理业务逻辑?
1 2 3 4 5 6 7
| ChannelFuture future = channel.closeFuture();
future.sync();
future.addListener((f) -> {});
|
还有问题:channel关闭后,主线程并没有退出,占用资源,如果优雅退出?
- 把EventLoopGroup关闭掉:
group.shutdownGracefully()
Future && Promise
- Netty中的Future继承自JDK中的Future接口
- Netty中的Promise继承自Netty的Future接口
功能:
- JDK的Future只能同步等待任务结束才能得到结果
- Netty的Futrue可以通过异步方式获取结果(addListener)
- Netty的Promise可以脱离任务存在,只作为两个线程间传递数据的容器
Promise样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j public class TestNettyPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup loopGroup = new NioEventLoopGroup(); EventLoop loop = loopGroup.next(); DefaultPromise<Object> promise = new DefaultPromise<>(loop); new Thread(() -> { log.info("...开始计算"); try { Thread.sleep(1000); }catch (Exception e) { e.printStackTrace(); } promise.setSuccess(800); }).start();
log.info("等待结果..."); log.info("结果:{}", promise.get()); } }
|
Handler && pipeline
ChannelHandler用来处理Channel上的各种事件,分为入站和出站handler(读入和写出),所有的ChannelHandler组成一串,就是pipeline。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Slf4j public class TestNettyHandler { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup(), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) // head -- h1 -- h2 -- h3 -- h4 -- h5 -- h6 -- tail (双向) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nc) throws Exception { nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("1"); super.channelRead(ctx, msg); } }).addLast("h2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("2"); super.channelRead(ctx, msg); } }).addLast("h3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("3"); nc.writeAndFlush("111"); super.channelRead(ctx, msg); } }).addLast("h4", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("4"); super.write(ctx, msg, promise); } }).addLast("h5", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("5"); super.write(ctx, msg, promise); } }).addLast("h6", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("6"); super.write(ctx, msg, promise); } }); } }) .bind(8080);
} }
###### 客户端发送数据后 ##### 服务端: 15:28:06.481 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 1 15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 2 15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 3 15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 6 15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 5 15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 4
|
为什么要写Handler? —> 业务处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| {...}.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nc) throws Exception { nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; String jsonString = buf.toString(StandardCharsets.UTF_8); super.channelRead(ctx, jsonString); } }).addLast("h2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object jsonString) throws Exception { String s = (String) jsonString; char[] chars = s.toCharArray(); super.channelRead(ctx, chars); } }).addLast("h3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { nc.writeAndFlush(msg); } }).{.....}
|
EmbeddedChannel
用来测试的内部Channel,里面可以绑定多个channel,这样就不用模拟一个服务端和客户端了。
- 构造器里面按顺序传handler
wirteInbound
: 模拟入站
writeOutbound
:模拟出站
ByteBuf
创建
1
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
|
- 创建一个默认的ByteBuf(池化基于直接内存的ByteBuf),初始容量为256(默认,也可以自己指定)
1 2 3 4 5 6 7 8 9 10 11 12
| public static void log(ByteBuf buf) { int length = buf.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder sb = new StringBuilder(rows * 80 * 2) .append("readIdx: ").append(buf.readerIndex()) .append(" writeIdx: ").append(buf.writerIndex()) .append(" capacity: ").append(buf.capacity()) .append(NEWLINE); appendPrettyHexDump(sb, buf); System.out.println(sb.toString()); }
|
直接内存和堆内存
直接内存分配慢,但读取快,要手动释放或归还,默认直接内存
堆内存分配快,读取慢,容易受垃圾回收影响
初始化堆内存
1
| ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
|
初始化直接内存
1
| ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();
|
池化和非池化
池化的最大意义在ByteBuf可以重用,优点是:
- 没有池化,每次都要创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会加重GC负担
- 有池化,重用池中的ByteBuf实例,采用了与jemalloc类似的内存分配算法提高分配效率
- 高并发时,池化功能更加节约内存,减少内存溢出的可能性
池化功能是否开启,可以 通过下面的系统环境变量来设置。
1
| -Dio.netty.allocator.type={unpooled|pooled}
|
- 4.1 以后池化是默认开启
- 直接内存和堆内存都有池化功能
组成
- 灰色是已读部分
- 绿箭头是读指针,绿色是未读已写
蓝箭头是写指针,蓝色是可写部分
橙色区域是可扩容容量。容量按需扩容。
写入
- writeBoolean(boolean bool):写入一个字节(0:false;1:true)
- writeByte(int val)
- writeInt(int value):小端写入(先写低位字节)
- writeIntE(int value)::大端写入(先写高位字节),网络编程一般采用大端
- ….
- writeBytes(byte[] bytes)
- writeBytes(ByteBuf buf)
- int writeCharSequence(CharSequence chars, Charset charset):写入字符串
注意:
- 除了最后一个,其余方法都是返回ByteBuf
- 网络编程一般都是大端写入
扩容
- 写入的数据大小未超过512字节,按照16的整数倍扩容(写入33,扩容成16 * 3 = 48)
- 写入的数据大小超过512字节,按照2的n次方扩容(写入513,扩容成2的10次方=1024)
读取
- readByte() :每次读取一个字节,读过的就废弃了,不能再读了
- markReadIndex():标记
- reset():读指针重置到标记
retain && release
Netty中有堆外内存的的ByteBuf实现,堆外内存最好手动释放,而不是等GC回收。
- UnpooledHeapByteBuf使用JVM内存,等GC回收内存就好
- UnpooledDirectByteBuf使用的就是直接内存,可以等GC间接回收,但不建议,需要特殊方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更负责的机制来回收内存
回收内存的源码实现,下列方法的不同实现
abstract void deallocate()
Netty使用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象初始计数为1
- 调用release方法,计数减1,计数为0则回收ByteBuf
- 调用retain方法,计数加1,免得其他handler调用release影响到我正在使用的buf
- 当计数为0,底层内存内回收,即使ByteBuf对象还在,也不能使用了
谁来负责release?
谁最后负责ByteBuf,谁负责ByteBuf的释放。
- 如果ByteBuf传递到header和tail,它们会处理
- 如果ByteBuf没到最后(可能是已经转成了字符串),那么就不会处理
Slice
【零拷贝】的体现之一。对原始的ByteBuf进行切片,切片后的ByteBuf没有进行内存复制,还是沿用原始的ByteBuf内存,切片后的ByteBuf维护独立的read,write指针。
slice(int idx, int len)
:切片过程中没有发生数据复制(非常牛逼!!!)
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class TestByteBufSlice { public static void main(String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'}); ByteBuf buf1 = buf.slice(0, 4); ByteBuf buf2 = buf.slice(4, 6); log(buf1); buf2.setByte(2, 'x'); log(buf2); log(buf); } }
|
问题:切片后要是想加额外的字节,岂不是乱套了?
答:不允许添加, 要加限制,原始内存不能释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class TestByteBufSlice { public static void main(String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'}); ByteBuf buf1 = buf.slice(0, 4); buf1.retain(); ByteBuf buf2 = buf.slice(4, 6); buf.retain(); log(buf1); buf1.release(); buf2.setByte(2, 'x'); log(buf2); buf2.release(); log(buf); } }
|
duplicate && copy
composite
零拷贝,把ByteBuf整合。和writeBytes
深拷贝对应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class TestByteComposite { public static void main(String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(); buf1.writeBytes(new byte[]{1, 2, 3, 4, 5}); buf1.retain(); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(); buf2.retain(); buf2.writeBytes(new byte[]{6, 7, 8, 9, 10}); ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer(); buf3.retain(); buf3.writeBytes(new byte[]{11, 12, 13, 14, 15});
CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true, buf1, buf2); buf1.release();buf2.release(); compositeByteBuf.addComponent(buf3); compositeByteBuf.addComponent(true, buf3); buf3.release(); log(compositeByteBuf); } }
|
注意
使用零拷贝,最好都要retain和release。
UDP的简单实现(纯原创,欢迎交流)
上面都是TCP,咱们参照github,写一段UDP的链接,Netty-github官网。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Slf4j public class UDPServer { public static void main(String[] args) { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(eventLoopGroup) .channel(NioDatagramChannel.class) .handler(new UDPServerHandler()); Channel udpChannel = b.bind(8888).sync().channel(); log.info("开启线程...."); udpChannel.closeFuture().await(); } catch (Exception e) { log.error(e.getMessage()); } finally { log.error("完蛋了,服务端挂了....."); eventLoopGroup.shutdownGracefully(); } } }
public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket udpPack) throws Exception { log.info(udpPack.toString()); ByteBuf content = udpPack.content(); ByteBuf xxlx_buf = content.slice(0, 4); xxlx_buf.retain(); int xxlx = xxlx_buf.readInt(); log.info(String.valueOf(xxlx)); xxlx_buf.release(); ByteBuf msg_buf = content.slice(4, content.writerIndex()); msg_buf.retain(); log.info(msg_buf.toString(CharsetUtil.UTF_8)); msg_buf.release(); ctx.write( new DatagramPacket( Unpooled.wrappedBuffer("success".getBytes(CharsetUtil.UTF_8)) , udpPack.sender() ) ); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| @Slf4j public class UDPClient { public static <T> void send(String ip, int port, T obj) { send(ip, port, obj, (data) -> { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(128); buf.writeInt(5514); int count = 0; while (count++ < 124) { buf.writeByte('x'); } return buf; }); }
private static <T> void send(String ip, int port, T obj, PackageFunc<T> func) { InetSocketAddress address = new InetSocketAddress(ip, port); Bootstrap client = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop watcher = group.next(); DefaultPromise<Object> promise = new DefaultPromise<>(watcher); try { client.group(group) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, false) .handler(new ChannelInitializer<NioDatagramChannel>() { @Override protected void initChannel(NioDatagramChannel ch) throws Exception { ch.pipeline().addLast("watcher", new SimpleChannelInboundHandler<DatagramPacket>() { @Override protected void channelRead0(ChannelHandlerContext cxt, DatagramPacket udpPack) throws Exception { ByteBuf content = udpPack.content(); String s = content.toString(CharsetUtil.UTF_8); log.info(s); if ("success".equals(s)) { promise.setSuccess("success"); } } }); } }); Channel channel = client.bind(0).sync().channel(); channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync();
AtomicInteger count = new AtomicInteger(0); while (count.getAndIncrement() < 3) { boolean await = promise.await(7, TimeUnit.SECONDS); if (await) { log.info("发送成功!"); break; } else { log.error("第{}次发送失败,正在重试...", count.get()); channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync(); } } if (count.get() > 3) log.error("草拟吗连不上啊!!"); channel.close(); }catch (Exception e) { log.error(e.getMessage()); } finally { group.shutdownGracefully(); watcher.shutdownGracefully(); } }
public static void main(String[] args) { send("localhost", 8888, null); } }
|