Netty–分隔符和定长解码器的应用

2018/03 05 20:03

TCP以流的方式进行数据传输,上层的应用协议为了对数据进行区分,往往采用如下4中方式

(1)消息长度固定,累计读取到长度综合为定长LEN的报文后,就认为读取到了一个完整的消息:将计数器位置,重新开始读取下一个数据报;

(2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;

(3)将读书的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符;

(4)通过在消息头中定义长度字段来表示消息的总长度.

Netty对上面4种应用做了统一的抽象,提供了4种解码器来解决对应的问题,使用起来非常方便.有了这些解码器,用户不需要自己读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包

DelimiterBaseFrameDecoder可以自动完成以分隔符做结束标志的消息的解码

FixedLengthFrameDecoder可以自动完成对定长度消息的解码

DelimiterBaseFrameDecoder应用开发,消息以"$_"作为分隔符

EchoServerHandler类

[cc lang="java"]
public class EchoServerHandler extends ChannelHandlerAdapter {

int counter = 0;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is " + ++counter + "times receive client :[" + body + "]");
body += "$_";
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();//发生异常,关闭链路
}
}
[/cc]

EchoServer类
[cc lang="java"]
public class EchoServer {
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
//优雅推出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (null != args && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}

new EchoServer().bind(port);
}

}
[/cc]

EchoClientHandler类
[cc lang="java"]
public class EchoClientHandler extends ChannelHandlerAdapter {

private int counter;

static final String ECHO_REQ = "Hi,Lilinfeng.Welcome to Netty.$_";

public EchoClientHandler() {

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("This is " + ++counter + " times receive server :[" + msg + "]");
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

[/cc]

EchoClient类
[cc lang="java"]
public class EchoClient {
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
//发起一步连接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放Nio线程组
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (null != args && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new EchoClient().connect(port, "127.0.0.1");
}
}
[/cc]

FixedLengthFrameDecoder应用开发,长度设置为20
EchoServerHandler类
[cc lang="java"]
public class EchoServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Receive client : [" + msg + "]");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();//发生异常,关闭链路
}
}
[/cc]

EchoServer类
[cc lang="java"]
public class EchoServer {
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
//优雅推出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (null != args && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}

new EchoServer().bind(port);
}

}
[/cc]

利用telnet命令行测试FixedLengthFrameDecoder测试类

--转载请注明: https://www.guangboyuan.cn/%e5%88%86%e9%9a%94%e7%ac%a6%e5%92%8c%e5%ae%9a%e9%95%bf%e8%a7%a3%e7%a0%81%e5%99%a8%e7%9a%84%e5%ba%94%e7%94%a8/

发表回复

(必填)