一般来说,在与RPC框架直接接触时,内部为粘性分包创建了解决方案。让我们一起来了解一下这个方便的意思,包括编码解码。
(一)粘包分包概念
- 粘包
TCP
由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象(确切来讲,对于基于TCP协议的应用,不应用包来描述,而应 用 流来描述),个人认为服务器接收端产生的粘包应该与linux内核处理socket的方式 select轮询机制的线性扫描频度无关。
UDP
本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。
- 分包
可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。
- TCP当中,只有流的概念,没有包的概念(根本原因)
简单的概括
(1)粘包:
1.服务端
原因收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据
2.客户端
原因TCP为提高传输效率,要收集到足够多的数据后才发送一包数据
(2).分包:
1.应用程序写入的字节大小大于套接字发送缓冲区的大小
2.进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS
3.以太网帧的payload(净荷)大于MTU(1500字节)进行ip分片
(二)netty粘包分包现象演示
源码:pack目录下的error
Server.java
package com.dig8.;
import org.jbo;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
import java.net.InetSocketAddress;
import java.u;
import java.u;
/**
* netty服务端
*
* @author idig8.com
*/
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Execu();
// worker线程负责数据读写
ExecutorService worker = Execu();
// 设置niosocket工厂
boo(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
// 管道过滤器
("myHandler", new ServerHandler());
return pipeline;
}
});
// 服务类绑定端口
boo(new InetSocketAddress(8888));
}
}
ServerHandler.java
package com.dig8.;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
/**
* @author idig8.com
*/
public class ServerHandler extends SimpleChannelHandler{
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
byte[] bs = bu();
Sy("server receive data: " +new String(bs));
}
}
Client.java
package com.dig8.;
import org.jbo;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
import org.jbo;
import java.net.InetSocketAddress;
import java.u;
import java.u;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Execu();
ExecutorService worker = Execu();
//socket工厂
boo(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
("1", new StringEncoder());
("2", new ClientHandler());
return pipeline;
}
});
//连接服务端
boo(new InetSocketAddress("127.0.0.1", 8888)).sync();
}
}
Clien
package com.dig8.;
import org.jbo;
import org.jbos;
import org.jbo.*;
/**
* 客户端消息处理类
* @author idig8.com
*/
public class ClientHandler extends SimpleChannelHandler {
// 包头
private static final int HEAD_FLAG = -32323231;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = c();
String msg = "Hello,idig8.com";
for (int i = 0; i < 1000; i++) {
c(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
运行后出现粘包和分包现象
(三)粘包分包问题解决思路
服务端和客户端约定好稳定的数据包结构
1.客户端根据约定的数据包结构发送数据
2.服务端根据约定的数据包结构来读取数据
通过MyDecoder集成FrameDecoder的方式来
源码:pack目录下的custom
Server.java
package com.dig8.;
import java.net.InetSocketAddress;
import java.u;
import java.u;
import org.jbo;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
/**
* netty服务端
*
* @author idig8.com
*/
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Execu();
// worker线程负责数据读写
ExecutorService worker = Execu();
// 设置niosocket工厂
boo(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
// 管道过滤器
("myDecoder", new MyDecoder());
("myHandler", new ServerHandler());
return pipeline;
}
});
// 服务类绑定端口
boo(new InetSocketAddress(7778));
}
}
ServerHandler.java
package com.dig8.;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
/**
* @author idig8.com
*/
public class ServerHandler extends SimpleChannelHandler{
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
/*ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
byte[] bs = bu();*/
Sy("server receive data: " + e.getMessage());
}
}
Client.java
package com.dig8.;
import java.net.InetSocketAddress;
import java.u;
import java.u;
import org.jbo;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
import org.jbo;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Execu();
ExecutorService worker = Execu();
//socket工厂
boo(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
("1", new StringEncoder());
("2", new ClientHandler());
return pipeline;
}
});
//连接服务端
boo(new InetSocketAddress("127.0.0.1", 7778)).sync();
}
}
Clien
package com.dig8.;
import org.jbo;
import org.jbos;
import org.jbo.Channel;
import org.jbo;
import org.jbo.ChannelStateEvent;
import org.jbo.ExceptionEvent;
import org.jbo;
/**
* 客户端消息处理类
* @author idig8.com
*/
public class ClientHandler extends SimpleChannelHandler {
// 包头
private static final int HEAD_FLAG = -32323231;
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = c();
String msg = "Hello,idig8.com 通过定义包头+长度+数据 防止粘包和分包";
byte[] bytes = m();
// 定义数据包 ,结构为:包头 + 长度 + 数据
ChannelBuffer buffer = C();
// 1.写包头
bu(HEAD_FLAG);// 4字节
// 2.写长度
bu);// 4字节
// 3.写数据本身
bu(bytes);
for (int i = 0; i < 1000; i++) {
c(buffer);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
MyDecoder.java
/**
*
*/
package com.dig8.;
import org.jbo;
import org.jbo.Channel;
import org.jbo;
import org.jbo;
/**
* @author idig8.com
*/
public class MyDecoder extends FrameDecoder{
// 包头
private static final int HEAD_FLAG = -32323231;
// 数据包基本长度
private final static int BASE_LENGTH = 4 + 4;
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// 收到数据之后,先判断buffer中可读的数据长度是否大于数据包的基本长度
i() > BASE_LENGTH){
// 防止socket攻击:
i() > 4096 * 2){ // 4k
Sy("socket 攻击了");
bu());
}
// 记录包头开始的位置
int headIndex;
while(true){
headIndex = bu();
bu();
// 代码很关键
i() < 4){// 包头的长度
bu(headIndex);
return null;
}
// 此时说明包头的长度是足够的
// 正好读取的是包头
i() == HEAD_FLAG ){
break;
}
// [1,2,3,4] 1 1 1 1 1
// 如果不是包头,需要略过一个字节,在略过之前,需要还原读指针位置
bu();
bu();// 略过一个字节
i() < BASE_LENGTH){
return null;
}
}
// 此时说明有数据包到来
// 做标记(记住当前读指针的位置)
// bu();
// 1.读长度
int dataLength = bu();
i() < dataLength){
// 说明数据本身的长度还不够, 肯定要继续等待后面的数据到来
// 还原读指针的位置
bu(headIndex);
return null;
}
// 此时说明数据包已经位置
// 2.读数据本身
byte[] dst = new byte[dataLength];
bus(dst);
// 继续传递下去
// ?
// 如果此时buffer中的数据还没有读完,那么剩下的数据怎么办?
return new String(dst);
}
// return null 表示此时的数据包不完整,需要继续等待下一个数据包的到来 ?
return null;
}
}
源码:源码/『互联网架构』软件架构-io与nio线程模型reactor模型(上)(53)/nio
(三)Netty自带粘包分包解决方案
消息定长
1.FixedLengthFrameDecoder
行分隔符
2.LineBasedFrameDecoder
自定义特殊符号进行分割
3.DelimiterBasedFrameDecoder
源码:pack目录下的nettysolution
Server.java
package com.dig8.;
import java.net.InetSocketAddress;
import java.u;
import java.u;
import org.jbo;
import org.jbos;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
/**
* 服务端
* @author idig8.com
*/
public class Server {
public static void main(String[] args) throws Exception {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程,主要监听端口和获取worker线程及分配socketChannel给worker线程
ExecutorService boss = Execu();
// worker线程负责数据读写
ExecutorService worker = Execu();
// 设置niosocket工厂
boo(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
// 管道过滤器
// 方案1:消息定长
//("fixedLength", new FixedLengthFrameDecoder(18));
// 方案2:行分隔符
//("fixedLength", new LineBasedFrameDecoder(1024));
// 方案3:自定义特殊符号进行分割
("delimiter", new DelimiterBasedFrameDecoder(1024,
C("#@#".getBytes())));
("1",new StringDecoder());
("2",new ServerMessageHandler());
return pipeline;
}
});
// 服务类绑定端口
boo(new InetSocketAddress(7777));
Sy("服务端启动…");
}
}
ServerMe
package com.dig8.;
import org.jbo;
import org.jbo.ExceptionEvent;
import org.jbo;
import org.jbo;
/**
* 服务端消息处理类
* @author idig8.com
*/
public class ServerMessageHandler extends SimpleChannelHandler {
/**
* 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Sy("receive request: " + e.getMessage());
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
Client.java
package com.dig8.;
import java.net.InetSocketAddress;
import java.u;
import java.u;
import org.jbo;
import org.jbo;
import org.jbos;
import org.jbo.ChannelFuture;
import org.jbo;
import org.jboFactory;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
import org.jbo;
/**
* 客户端
*
* @author idig8.com
*/
public class Client {
public static void main(String[] args) throws Exception {
//服务类
ClientBootstrap bootstrap = new ClientBootstrap();
//线程池
ExecutorService boss = Execu();
ExecutorService worker = Execu();
//socket工厂
boo(new NioClientSocketChannelFactory(boss, worker));
//管道工厂
boo(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = C();
// 方案1:消息定长
//("fixedLength", new FixedLengthFrameDecoder(18));
// 方案2:行分隔符
//("fixedLength", new LineBasedFrameDecoder(1024));
// 方案3:自定义特殊符号进行分割
("delimiter", new DelimiterBasedFrameDecoder(1024,
C("#@#".getBytes())));
("1",new StringEncoder());
("2", new ClientMessageHandler());
return pipeline;
}
});
//连接服务端
@SuppressWarnings("unused")
ChannelFuture connect = boo(new InetSocketAddress("127.0.0.1", 7777)).sync();
// Channel channel = connect.getChannel();
// Sy("client start");
// Scanner scanner = new Scanner);
// while(true){
// Sy("请输入:");
// c());
// }
}
}
ClientMe
package com.dig8.;
import org.jbo.Channel;
import org.jbo;
import org.jbo.ChannelStateEvent;
import org.jbo.ExceptionEvent;
import org.jbo;
import org.jbo;
/**
* 客户端消息接受处理类
* @author idig8.com
*/
public class ClientMessageHandler extends SimpleChannelHandler {
/**
* 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Sy("server response : " + e.getMessage());
}
/**
* 新连接
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = c();
String separator = "#@#";("line.separator");// 系统换行符
String msg = "idig8.com send cmd";
for (int i = 0; i < 1000; i++) {
c(msg + i + separator);
}
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
}
}
PS:基本上netty针对tcp 分包粘包已经说完了,确实有了netty真的很方便比传统的socket方便很多。
感谢您能够看完这篇文章,喜欢的话要记得关注哦!35岁的IT老兵!
1.《unhandled exception caught专题之网络通讯如何解决出现分包粘包,来一起看看netty的解决思路》援引自互联网,旨在传递更多网络信息知识,仅代表作者本人观点,与本网站无关,侵删请联系页脚下方联系方式。
2.《unhandled exception caught专题之网络通讯如何解决出现分包粘包,来一起看看netty的解决思路》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。
3.文章转载时请保留本站内容来源地址,https://www.cxvn.com/gl/djyxgl/229198.html