Netty高级用法-EventLoop实现定时调度任务
整个8月没写一篇技术文章,特地趁着空闲整理出点东西
应用场景:在netty通讯服务里个整个任务,当设备一连上服务器,就可以通过定时器向设备发送从其他消息队列接收到的消息(类如kafka等),但要保证对目前的netty性能不产生大影响。
幸好之前有创建过netty搭建的IM通讯服务,所以拿他当成自己的demo项目(如何没看过可以查看我前面的IM通讯文章)
实现要点如下
一.改造ChannelInboundHandlerAdapter(渠道绑定拦截器适配器)内容
1.主要方法有以下几个
//channel添加方法
handlerAdded(ChannelHandlerContext ctx)
//channel移除时
handlerRemoved(ChannelHandlerContext context)
//channel活跃时
channelActive(ChannelHandlerContext ctx)
//channel接收到信息时
channelRead(ChannelHandlerContext ctx, Object msg)
//channel异常时
exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
其中handlerAdded先执行,channelActive后执行,handlerRemoved在失去连接时执行,channel异常时
二实现业务场景
1.创建类继承ChannelInboundHandlerAdapter,创建一个静态组
public class ServerHandlers extends ChannelInboundHandlerAdapter { private static ChannelGroup channels =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); ... }
2.在handlerAdded方法将channel加入channels
//加入ChannelGroup if(加入条件){ channels.add(ctx.channel()); } log.info(ctx.channel().id()+"的设备上线"+"Online:"+channels.size());
3.将定时任务Task放在channelActive方法里
//启动调度任务 Task task =new Task(ctx.channel()); //判断条件 if(channels.contains(ctx.channel())){ ctx.channel().eventLoop().scheduleAtFixedRate(task, 1,3L, TimeUnit.SECONDS); }
定时任务暂时如下:
public class Task implements Runnable { @Override public void run() { System.out.printf("任务开始\n"); try { TimeUnit.SECONDS.sleep(2); channel.writeAndFlush("服务器端推送"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("任务结束\n"); } public Task(){ } public Task(Channel channel){ this.channel =channel; } }
结果如下:
三.额外补充
注意:1.查看很多文章说放在redis里比较合适,但channelId其实是无法实例化的(可能用的不是ID但是复杂),所以采用了静 态组概念
2.Netty 限制每个Channel 都由一个 Thread 处理,这种设计适用于非阻塞 IO 操作,但是NETTY本身是支持多线程
3.异常处理方法可以exceptionCaught方法处理,代码如下
@Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)throws Exception{ //移除静态组 channels.remove(ctx.channel()); //关闭调度任务 ctx.channel().eventLoop().shutdownGracefully(); log.info(ctx.channel().id()+"的设备出错"+"Online:"+channels.size()); //关闭链接 ctx.close(); }
可以 在handlerRemoved()方法也添加此方法
4.channel的ID实例化过程是随机值,注意1一点提到无法实例化,这里的判断条件就很难根据ID限定,但我们要根据实际业务变通,类如我的业务在设备链接时,需要发送心跳包检查的,发送的信息里由包含设备参数ID的,这样我可以将channels的add方法放在channelRead,根据传递msg将适合参数的ID的添加。这样定时任务会触发在channelRead方法之后,一样实现了业务
四 jmeter测试Netty长连接服务
1创建线程组
2.创建TCP取样器,保持长连接(也可以大小断言,通过断言结果对断言进行采样)
3创建固定定时器
4.创建聚合报告
测试开始!
而运行段后出现TCP链接掉线情况,很多都是由于超时(2S)
检查原因,发现Netty客户端所读取的信息如下
出现由于线程过多出现信息发送粘连问题,这里暂时没深入研究,下次可以有机会仔细研究
信息粘连的最佳解决方案:
原理
channel之前的message实际上是ByteBuf,我们可以规定以某些结尾(也可以自定义),这里直接使用base(以分隔符“\r\n”为分割的),最大是1024字节,这里是测试结果
修改后客户端输入:
服务端无显示:
加入"\r\n"后:
错误集锦:
1.org.apache.jmeter.protocol.tcp.sampler.ReadException: Error reading from server, bytes read: 0
jmeterTCP协议读取异常
2.
eventLoop循环事件在没有监听的情况下停止,有个观点是eventLoop和channel其实是绑定关系,chanenl在设备下线时未将channel移除,也可以采用
ScheduledFuture<?> future=ch.cancel(false);关闭循环任务
少量客户端没有出现此类问题,却在jmeter得到了这么多问题,问题越多就越接近真实。。。