第二次课:netty学习
分类: springboot vue 专栏: 物联网项目 标签: netty学习
2023-05-29 23:38:36 982浏览
Java共支持3种网络编程模型/IO模式:BIO、NIO、AIO
BIO
同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
最初的写法
public class BioServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(9000); while(true){ System.out.println("等待连接"); //阻塞方法 Socket clientSocket = serverSocket.accept(); System.out.println("有客户端连接了"); handler(clientSocket);//处理客户端连接 } } private static void handler(Socket clientSocket) { byte[] bytes= new byte[1024]; System.out.println("准备reade.."); //接受客户端的数据,阻塞方法,没有数据可读时就阻塞 int read = 0; try { read = clientSocket.getInputStream().read(bytes); System.out.println("read完毕"); if (read!=-1) { System.out.println("接收到客户端的数据:"+new String(bytes,0,read)); } System.out.println("end.."); } catch (IOException e) { e.printStackTrace(); }finally { try { clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
存在的问题:只要是高并发过来,压根没法处理。
多线程写法
new Thread(new Runnable() { @Override public void run() { try { handler(clientSocket);//处理客户端连接 } catch (Exception e) { e.printStackTrace(); }finally { } } }).start();
存在的问题:c10k(客户端同时来了1万个连接)问题,以上方式虽然可以解决阻塞问题,但又有新问题,高并发的时候要创建很多个线程(假如是c10M呢)。造成内存溢出问题(oom)
线程池写法
上述问题的解决方案:用线程池的方式(设置pool固定大小),这样的话也有弊端——就是支持的并发数受到了线程池的大小限制,另外还有一个问题,就是假设线程池设置的是500,已有的500个客户端连接都只连接不发消息的话,就导致这500个线程全部阻塞住了,第501个客户端即使等再长的时间都连不上。因为没有一个线程池连接空闲出来。
newCachedThreadPool.execute(new Runnable() { public void run() { //我们重写 handler(clientSocket);//处理客户端连接 } });
适用场景
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
NIO
同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理 。
最初的写法
public class NioServer { //保存客户端连接 static List<SocketChannel> channelList = new ArrayList<>(); public static void main(String[] args) throws Exception { //创建nio ServerSocketChannel,与bio的serversSocket类似 ServerSocketChannel serverSocket=ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(9000)); //设置ServerSocketChannel为非阻塞 serverSocket.configureBlocking(false); System.out.println("服务器端启动成功"); while (true){ //非阻寒模式accept方法不会阻塞,否则会阻塞 // NIO的非阻寨是由操作系统内部实现的,底层调用了linux内核的accept丽数 SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { //如果有客户端进行连接 System.out.println("连接成功"); //设置SocketChanneL为非阻塞 socketChannel.configureBlocking(false); //保存客户端连接在List中 channelList.add(socketChannel); } //遍历连接进行数据读取 Iterator<SocketChannel> iterator =channelList.iterator(); while (iterator.hasNext()) { SocketChannel sc = iterator.next(); ByteBuffer byteBuffer = ByteBuffer. allocate(6); //非阻塞模式read方法不会阻寨,否则会阻寨 int len = sc.read(byteBuffer); //如果有数据,把数据打印出来 if(len>0){ System.out.println("接收到消息:"+ new String(byteBuffer.array())); } else if (len == -1) { //如果客户端断开,把socket从集合中去掉 iterator. remove(); System.out.println("客户端断开连接"); } } } } }
存在的问题:空转,浪费cpu,假设有1万个客户端与服务端连接上了,但只有一个跟服务端之间有通讯,为了监控这个客户端的通讯,不得不遍历循环1万个连接,这就对cpu造成了极大的浪费。
解决方案:把有数据收发的连接单独拿出来,只对有数据收发的连接进行遍历,当处理完后,将线程阻塞,让出cpu
多路复用器写法
public class NioSelectorServer { public static void main(String[] args) throws Exception { //创建nio ServerSocketChannel,与bio的serversSocket类似 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9000)); //设置ServerSocketChannel为非阻塞 serverSocketChannel.configureBlocking(false); //打开selector Selector selector = Selector.open(); //把ServerSocketChanneL注册到selector上,并iselector对客户端accept连接操作感兴趣 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//监听连接事件 System.out.println("服务启动成功"); while (true){ selector.select(); // 阻塞等待需要处理的事件发生 //获取selector中注册的全部事件的SelectionKey实例 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); // 可接收连接 能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 接受客户端连接 SocketChannel client = server.accept(); client.configureBlocking(false); // 设置客户端通道非阻塞 // 为客户端通道注册 OP_READ 事件 client.register(selector,SelectionKey.OP_READ); System.out.println("客户端连接成功"); } // 可读数据 if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer byteBuffer =ByteBuffer.allocate(6); int len = client.read(byteBuffer); //如果有数据,把数据打印出来 if(len>0){ System.out.println("接收到消息:"+ new String(byteBuffer.array())); } else if (len == -1) { //如果客户端断开,把socket从集合中去掉 System.out.println("客户端断开连接"); client.close(); } } //从事件集合中删除本次处理的key,防止下次select重复处理 iterator.remove(); } } } }
适用场景
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
缺点
NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。
AIO
Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
适用场景
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
netty
什么是netty
Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。
Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程
Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
Netty的应用场景
互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架(远程过程调用)使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。
Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器。 地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
Netty的优点
Netty 对 JDK 自带的 NIO 的 API 进行了封装。
- 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;
- 基于灵活且可扩展的事件模型,可以清晰地分离关注点;
- 高度可定制的线程模型 - 单线程,一个或多个线程池;
- 使用方便:详细记录的 Javadoc,用户指南和示例;
- 没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
- 高性能、吞吐量更高:延迟更低;
- 减少资源消耗;
- 最小化不必要的内存复制。
- 安全:完整的 SSL/TLS 和 StartTLS 支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入
单机模式下都就可以承受百万级别的并发
springboot集成netty
依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.77.Final</version> </dependency>
服务端
// netty server类 @Component public class NettyServer { @Value("${netty-port}") private int port; public void start() throws InterruptedException { /** * 创建两个线程组 bossGroup 和 workerGroup * bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成 * 两个都是无线循环 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组 bootstrap.group(bossGroup, workerGroup) //使用NioServerSocketChannel 作为服务器的通道实现 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 .childOption(ChannelOption.TCP_NODELAY, true) //可以给 bossGroup 加个日志处理器 .handler(new LoggingHandler(LogLevel.INFO)) //给workerGroup 的 EventLoop 对应的管道设置处理器 .childHandler(new ChannelInitializer<SocketChannel>() { //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new NettyServerHandler()); } }); //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象 ChannelFuture cf = bootstrap.bind(port).sync(); if (cf.isSuccess()) { System.out.println("socket server start---------------"); } //对关闭通道进行监听 cf.channel().closeFuture().sync(); } finally { //发送异常关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } // handler类 public class NettyServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class); protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception { log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj); SocketChannel socketChannel = (SocketChannel) context.channel(); /** * 服务器返回客户端消息 */ Map map = new HashMap(); map.put("msg","我是服务端,收到你的消息了"); socketChannel.writeAndFlush(JSON.toJSONString(map)); ReferenceCountUtil.release(obj); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } // springboot 集成启动 netty server,同时不影响tomcat接口 @Component public class NettyBoot implements CommandLineRunner { @Autowired private NettyServer nettyServer; public void run(String... args) throws Exception { try { nettyServer.start(); } catch (Exception e) { e.printStackTrace(); } } }
客户端
// netty client客户端 @Component public class NettyClient { private int port = 9999; private String host = "localhost"; public Channel channel; public void start() { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); if (future.isSuccess()) { channel = future.channel(); System.out.println("connect server 成功---------"); } // 给关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } public void sendMsg(String msg) { this.channel.writeAndFlush(msg); } } // handler处理类 public class NettyClientHandler extends SimpleChannelInboundHandler<Object> { private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>连接"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>退出"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { log.info(">>>>>>>>>>>>>userEventTriggered:{}", evt); } /** * 客户端接收到服务端发的数据 * @param channelHandlerContext * @param obj * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) { log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj); ReferenceCountUtil.release(obj); } /** * socket通道处于活动状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket建立了"); super.channelActive(ctx); } /** * socket通道不活动了 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket关闭了"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } // springboot集成,同时不影响tomact接口 @Component public class NettyBoot implements CommandLineRunner { @Autowired private NettyClient nettyClient; public void run(String... args) throws Exception { nettyClient.start(); } }
补充:项目启动的时候就把netty-server也启动起来(除了上面的写法外,还有一种常用写法)
// 服务器端启动,并绑定 19080 端口 //@PostConstruct是Java自带的注解,在方法上加该注解会在项目启动的时候执行该方法, // 也可以理解为在spring容器初始化的时候执行该方法。 @PostConstruct
服务端主动给客户端发消息
在server的handler里加ChannelGroup属性
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static ChannelGroup getChannels() { return channelGroup; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //客户端上线了 Channel channel = ctx.channel(); channelGroup.add(channel); log.info(ctx.channel().remoteAddress()+"上线了"); }
ChannelGroup channelGroup= ServerHandler.getChannels(); Iterator channelIterator = channelGroup.iterator(); while(channelIterator.hasNext()){ Channel channel = (Channel)channelIterator.next(); InetSocketAddress insocket = (InetSocketAddress)channel.remoteAddress(); //给指定客户端ip发送消息 if(ip.equals(insocket.toString())){ channel.writeAndFlush("hello server I am client"); } }
聊天im小案例
server服务端
/** * @author:xiaojie * @create: 2023-05-29 22:46 * @Description: 聊天服务端 */ public class ImServer { private static EventLoopGroup bossGroup = new NioEventLoopGroup(1); private static EventLoopGroup workerGroup = new NioEventLoopGroup(); public static void main(String[] args) { try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组 bootstrap.group(bossGroup, workerGroup) //使用NioServerSocketChannel 作为服务器的通道实现 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 .childOption(ChannelOption.TCP_NODELAY, true) //给workerGroup 的 EventLoop 对应的管道设置处理器 .childHandler(new ChannelInitializer<SocketChannel>() { //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new ImServerHandler()); } }); //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象 ChannelFuture cf = null; cf = bootstrap.bind(9000).sync(); if (cf.isSuccess()) { System.out.println("socket server start---------------"); } //对关闭通道进行监听 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //发送异常关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服务端处理器
public class ImServerHandler extends SimpleChannelInboundHandler<Object> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //客户端跟服务端连接上后触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将该客户加入聊天的信息推送给其它在线的客户瑞 //该方法会将channeLGroup 中所有的channel 遍历。井发送消息 channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"上线了 "+ sdf.format (new Date()) + "\n"); //将当前channel 加入到channeLGroup channelGroup.add(channel); System.out.println(ctx.channel().remoteAddress() +"上线了"+ "\n"); } //读取客户端数据 @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { //获取到当前channeL Channel channel = ctx.channel(); //这时我们遍历channeLGroup, 根据不同的情况。回送 不同的消息 channelGroup.forEach(ch -> { if (channel != ch) { //不是当前的channel,转发消息 ch.writeAndFlush( "[客户瑞]"+ channel.remoteAddress() +"发送了消息:"+ msg + "\n" ); } else {//回显自己发送的消息给自己 ch.writeAndFlush("[自己]发送了消息:" + msg + "\n"); } }); } //表示channel处于不活动状态,提示离线 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将客户离开信息推送给当前在线的客户 channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"下线了 "+ sdf.format (new Date()) + "\n"); System.out.println(channel.remoteAddress()+"下线了"+"\n"); } }
client客户端
public class ImClient { private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); private static int port = 9000; private static String host = "localhost"; public static Channel channel; public static void main(String[] args) { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器 pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器 pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2)); pipeline.addLast(new ImClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); if (future.isSuccess()) { channel = future.channel(); System.out.println("=====" + channel.localAddress() + "==="); //客户端需要输入信息。 创建一个扫描源 Scanner scanner = new Scanner(System.in) ; while (scanner.hasNextLine() ) { String msg = scanner.nextLine(); //通过channeL发送到服务器端 channel.writeAndFlush(msg); } } // 给关闭通道进行监听 future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } }
客户端处理器
public class ImClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg.trim()); } }
好博客就要一起分享哦!分享海报
您可能感兴趣的博客