netty心跳检查之udp篇 -欧洲杯足彩官网

`

netty心跳检查之udp篇

  部分udp通信场景中,需要客户端定期发送心跳信息,以获取终端的状态,并获取终端ip,以便服务器主动发送控制命令。如移动通信,内网穿越等。
  使用tcp方式通信,心跳是比较容易实现的,使用idlestatehandler监控channel,然后在自定义的handler中处理几个对应的事件就可以了。但是对于udp,就不灵了。

  学习研究netty,做了一个简单而完善的例子:通过udp通信,客户端上线,发送一条信息,服务器响应(不在handler中响应,在其他线程中处理)。服务器主动向客户端发问候消息,监控到无心跳后,踢掉客户端。
  程序逻辑比较简单,不多解释,请看注释。

一、辅助类
package com.wallimn.iteye.netty.heart;
import java.net.inetsocketaddress;
import io.netty.util.attributekey;
/**
 * 记录一些常量。真正的应用要从配置文件中读取。
 * 
 * 
*
时间:2019年9月14日 下午11:41:26,作者:wallimn */ public class config { private config(){}; public static final attributekey server_addr_attr=attributekey.newinstance("server_addr_attr"); //原来打算将客户端的id记录在channel的属性中,后来发现对于udp不适用。 //public static final attributekey client_id=attributekey.newinstance("client_id"); public static final int idle_time=5;//允许的发呆时间 public static final int server_port=8585; public static final string server_ip="localhost"; public static final long client_valid_threshold=5000;//客户端地址有效的时间阀值。单位为毫秒。 }


package com.wallimn.iteye.netty.heart;
import java.util.concurrent.concurrenthashmap;
import java.util.concurrent.concurrentlinkedqueue;
import java.util.concurrent.concurrentmap;
/**
 * 用来模拟持有数据
 * 
 * 
*
时间:2019年9月14日 下午7:58:40,作者:wallimn */ public class dataholder { private dataholder(){} /** * 记录客户端的消息 */ public static concurrentlinkedqueue clientmessagequeue = new concurrentlinkedqueue(); /** * 记录由心跳获取的客户端地址,用于服务器主动给客户端发消息 */ public static concurrentmap clientinformationmap = new concurrenthashmap(); }


package com.wallimn.iteye.netty.heart;
import java.net.inetsocketaddress;
import java.util.date;
import lombok.data;
/**
 * 客户端信息
 * 
 * 
*
时间:2019年9月14日 下午8:55:36,作者:wallimn */ @data public class clientinformation { /** * 客户端唯一标识 */ private string id; /** * 收到时间 */ private date recordtime; /** * 客户端地址, */ private inetsocketaddress address; }


package com.wallimn.iteye.netty.heart;
import lombok.data;
/**
 * 客户端发来的消息
 * 
 * 
*
时间:2019年9月14日 下午8:55:36,作者:wallimn */ @data public class clientmessage { /** * 消息 */ private string message; /** * 客户端信息 */ private clientinformation client; }


二、客户端代码
package com.wallimn.iteye.netty.heart;
import java.net.inetsocketaddress;
import java.util.uuid;
import io.netty.buffer.unpooled;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.channel.socket.datagrampacket;
import io.netty.util.charsetutil;
/**
 * 客户端处理器(handler),并无太多逻辑。仅发了一条访问时间的信息(time)、读取服务器信息并显示。
 * 
 * 
*
时间:2019年9月14日 下午9:09:47,作者:wallimn */ public class clienthandler extends simplechannelinboundhandler { @override protected void channelread0(channelhandlercontext ctx, datagrampacket packet) throws exception { string msg = packet.content().tostring(charsetutil.utf_8); system.out.println(msg); //如果收到exit信息,关闭channel if("exit".equals(msg)){ ctx.close(); } } @override public void channelactive(channelhandlercontext ctx) throws exception { inetsocketaddress addr = ctx.channel().attr(config.server_addr_attr).get(); string clientid; string message; //发送1条心跳 clientid = uuid.randomuuid().tostring().touppercase().replace("-", ""); // message = clientid ";" "heart";//发送的信息 // ctx.writeandflush(new datagrampacket(unpooled.copiedbuffer(message,charsetutil.utf_8),addr)); // system.out.println("发送一条心跳");//不用专门发心跳信息,任何发到服务器的信息都可以用于服务器更新心跳记录 message = clientid ";" "time";//发送的信息 ctx.writeandflush(new datagrampacket(unpooled.copiedbuffer(message,charsetutil.utf_8),addr)); system.out.println("发送对时信息"); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { cause.printstacktrace(); ctx.close(); } }


package com.wallimn.iteye.netty.heart;
import java.net.inetsocketaddress;
import io.netty.bootstrap.bootstrap;
import io.netty.channel.channel;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niodatagramchannel;
/**
 * 客户端程序
 * 启动命令:java -classpath .;netty-all-4.1.38.final.jar com.wallimn.iteye.netty.heart.clientapp
 * 
 * 
*
时间:2019年9月14日 上午8:58:13,作者:wallimn */ public class clientapp { public static void main(string[] args) { int port = config.server_port; new clientapp().run(port); } public void run(int port){ eventloopgroup group = new nioeventloopgroup(); bootstrap b = new bootstrap(); b.group(group) .channel(niodatagramchannel.class) //.option(channeloption.so_broadcast, true) .handler(new clienthandler()); inetsocketaddress addr = new inetsocketaddress(config.server_ip,port); b.attr(config.server_addr_attr, addr); try { channel ch = b.bind(0).sync().channel();//使用一个随机端口 //最长运行30秒 if(!ch.closefuture().await(30000)){ system.out.println("操作超时"); } system.out.println("退出"); } catch (interruptedexception e) { e.printstacktrace(); } finally{ group.shutdowngracefully(); } } }


三、服务器端代码
package com.wallimn.iteye.netty.heart;
import java.util.date;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.channel.socket.datagrampacket;
import io.netty.util.charsetutil;
/**
 * 服务器处理器(handler)
 * 
 * 
*
时间:2019年9月15日 上午8:37:15,作者:wallimn */ public class serverhandler extends simplechannelinboundhandler { @override protected void channelread0(channelhandlercontext ctx, datagrampacket packet) throws exception { system.out.println("读取信息,channelshortid=" ctx.channel().id().asshorttext()); string msg = packet.content().tostring(charsetutil.utf_8); system.out.println("host: " packet.sender().gethoststring()); system.out.println("port: " packet.sender().getport()); system.out.println("content: " msg); string[] fields = msg.split(";"); if (fields.length != 2) { return; } clientinformation client = new clientinformation(); client.setid(fields[0]); client.setrecordtime(new date()); client.setaddress(packet.sender()); clientmessage message = new clientmessage(); message.setclient(client); message.setmessage(fields[1]); system.out.println("加入待处理数据队列"); //标注客户端的id // 不对消息进行处理,只是加入队列,由其他线程进行处理 if(!"heart".equals(message.getmessage())){//如果不是心跳消息 dataholder.clientmessagequeue.add(message); } //不管什么消息,更新客户端的信息 dataholder.clientinformationmap.put(client.getid(), client); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { super.exceptioncaught(ctx, cause); } //这个方法对于udp没有什么意义 // @override // public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { // if (evt instanceof idlestateevent) { // idlestateevent e = (idlestateevent) evt; // switch (e.state()) { // case reader_idle: // system.out.println("reader_idle"); // break; // case writer_idle: // system.out.println("writer_idle"); // break; // case all_idle: // system.out.println("all_idle"); // break; // default: // break; // } // } // } }


package com.wallimn.iteye.netty.heart;
import java.util.date;
import java.util.iterator;
import java.util.map.entry;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
import io.netty.bootstrap.bootstrap;
import io.netty.buffer.unpooled;
import io.netty.channel.channel;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.datagrampacket;
import io.netty.channel.socket.nio.niodatagramchannel;
import io.netty.util.charsetutil;
import io.netty.util.hashedwheeltimer;
import io.netty.util.timeout;
import io.netty.util.timertask;
/**
 * 服务器应用
 * 启动命令:java -classpath .;netty-all-4.1.38.final.jar com.wallimn.iteye.netty.heart.serverapp
 * 
*
* 时间:2019年9月14日 上午9:47:29,作者:wallimn */ public class serverapp { //public static concurrentmap clientmessagemap = new concurrenthashmap(); // 内部放置多个task, public static hashedwheeltimer timer = new hashedwheeltimer(executors.defaultthreadfactory(), 1, // tick一下的时间 timeunit.seconds, 3);// 放置timer的数量 public static channel channel = null; public static final long response_timeer_delay = 1l; public static final long check_timeer_delay = 5l; public static final long hello_timeer_delay = 2l; /** * 处理客户端发来的消息 */ public static timertask responsetasker = new timertask() { public void run(timeout timeout) throws exception { timer.newtimeout(this, response_timeer_delay, timeunit.seconds); if (channel == null){ system.out.println("channel is null"); return; } if (channel.isactive() == false) { system.out.println("channel is inactive"); timeout.cancel(); return; } //system.out.println("responsetasker run, size is " dataholder.clientmessagequeue.size()); clientmessage message; for (iterator iterator=dataholder.clientmessagequeue.iterator();iterator.hasnext();) { message = iterator.next(); system.out.println(message.getmessage()); if("time".equals(message.getmessage())){ channel.writeandflush( new datagrampacket(unpooled.copiedbuffer("18:18", charsetutil.utf_8), message.getclient().getaddress())); } else{ ; } //处理完后清除 iterator.remove(); } } }; /** * 用于检查客户端是否有效 */ public static timertask checktasker = new timertask() { public void run(timeout timeout) throws exception { timer.newtimeout(this, check_timeer_delay, timeunit.seconds); clientinformation client = null; long now = new date().gettime(); for (entry entry : dataholder.clientinformationmap.entryset()) { client = entry.getvalue(); if (now - client.getrecordtime().gettime() > config.client_valid_threshold) { system.out.println("client kick : " client.getid()); dataholder.clientinformationmap.remove(entry.getkey()); } } } }; /** * 用于模拟主动向客户端发送消息 */ public static timertask hellotimer = new timertask(){ public void run(timeout timeout) throws exception { if (channel == null){ system.out.println("channel is null"); return; } if (channel.isactive() == false) { system.out.println("channel is inactive"); timeout.cancel(); return; } timer.newtimeout(this, hello_timeer_delay, timeunit.seconds); clientinformation client = null; for (entry entry : dataholder.clientinformationmap.entryset()) { client = entry.getvalue(); system.out.println("hellotimer run. send to " client.getid()); channel.writeandflush( new datagrampacket(unpooled.copiedbuffer("hello", charsetutil.utf_8), client.getaddress())); } } }; public static void main(string[] args) throws exception { int port = config.server_port; timer.newtimeout(responsetasker, response_timeer_delay, timeunit.seconds); timer.newtimeout(checktasker, check_timeer_delay, timeunit.seconds); timer.newtimeout(hellotimer, hello_timeer_delay, timeunit.seconds); new serverapp().run(port); } public void run(int port) throws exception { eventloopgroup group = new nioeventloopgroup(); bootstrap b = new bootstrap(); b.group(group).channel(niodatagramchannel.class) // .option(channeloption.so_broadcast, true) .handler(new channelinitializer(){ @override protected void initchannel(channel ch) throws exception { //ch.pipeline().addlast(new idlestatehandler(5, 0, 0)); ch.pipeline().addlast(new serverhandler()); } }); channelfuture future = b.bind(port).sync(); channel = future.channel(); system.out.println("服务器准备就绪,channelshortid=" channel.id().asshorttext()); channel.closefuture().await(); group.shutdowngracefully(); } }



2
0
分享到:
评论

相关推荐

    springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...

    spring整合netty心跳检测,spring整合netty心跳检测,spring整合netty心跳检测,spring整合netty心跳检测

    java异步nio框架netty实现高性能高并发,通过netty搭建tcp、udp服务,支持物联网设备上行,下行

    这个小程序使用netty5进行udp网络通讯,客户端有两种,1:用netty5类库发送datagrampacket和接收 2:直接使用datagramsocket发送接收datagrampacket 先运行netty_server的quoteofthemomentserver, 在运行netty_...

    netty案例,netty4.1中级拓展篇八《netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=mzixmdawmdaxmw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect

    基于netty 的udp字节数据接 收服务,发送服务实例 基于netty 的udp字节数据接收服务,发送服务实例

    基于netty的心跳检测技术,测试过可以在java端和android上使用

    本源码是《nio框架入门(三):ios与mina2、netty4的跨平台udp双向通信实战》一文的服务端源码实现(netty4版),详见:http://www.52im.net/thread-378-1-1.html

    netty心跳检测机制

    非常简单明了的netty心跳检测代码,标准的eclipse工程代码,导入后即可运行。由于代码很精简,可以轻松学会netty心跳检测的原理

    这是更具netty的一个demo自己再修改一下 有问题可以联系我

    java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例

    netty的udp通信心得

    netty 心跳实现

    netty案例,netty4.1基础入门篇十一《netty udp通信方式案例demo》源码 https://mp.weixin.qq.com/s?__biz=mzixmdawmdaxmw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect

    scanfish-ii 型声呐系统数据接口协议,对接tcp转发app,json封装

    断电断网的心跳检测,完美解决了websocket断电断网之后服务端不能收到关闭的通知,倒置客户端不能收到信息

    面试官:netty心跳检测机制是什么,怎么自定义检测间隔时间?.doc

    java的netty实现的可靠udp网络库

    netty心跳连接代码

global site tag (gtag.js) - google analytics
网站地图