Commit 9c42690c authored by 张大伟's avatar 张大伟

Merge branch 'master' of http://103.249.252.28:90/wangxiahui/znks

parents b6f402c7 a8de367d
......@@ -271,6 +271,16 @@
</configuration>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version>
<configuration>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
......
package com.adc.da.znks.util.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
private final int port;
public NettyServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(group) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定使用的channel
.localAddress(this.port)// 绑定监听端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("connected...; Client:" + ch.remoteAddress());
ch.pipeline().addLast(new ServerHandler()); // 客户端触发操作
}
});
ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
System.out.println(NettyServer.class + " started and listen on " + cf.channel().localAddress());
cf.channel().closeFuture().sync(); // 关闭服务器通道
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
}
}
}
\ No newline at end of file
package com.adc.da.znks.util.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 57所netty 服务端接受数据 拆包
*
* @ClassName:RequestDecoder
* @author: DuYunbao
* @date: 2018/3/22 14:27
*/
public class RequestDecoder extends ByteToMessageDecoder {
private byte[] headAndLength = new byte[24];
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
//可读长度必须大于 同步位置到包长度属性 的长度
if (buffer.readableBytes() >= headAndLength.length) {
//防止socket字节流攻击
/*if (buffer.readableBytes() > 2048) {
buffer.skipBytes(buffer.readableBytes());
}*/
//记录包头开始的index
int beginReader;
byte[] synchroSign;
while (true) {
beginReader = buffer.readerIndex();
buffer.markReaderIndex();
synchroSign = new byte[4];
buffer.readBytes(synchroSign);
//开始判断当前读取数据是否是包头数据
if (synchroSign[0] == -2 && synchroSign[1] == -17 && synchroSign[2] == -97 && synchroSign[3] == -7) {
break;
}
//长度又变得不满足
if (buffer.readableBytes() < headAndLength.length) {
return;
}
}
//版本号
byte[] versionNumber = new byte[2];
buffer.readBytes(versionNumber);
//设备标识
byte[] deviceSign = new byte[14];
buffer.readBytes(deviceSign);
//包长度
byte[] packageData = new byte[4];
buffer.readBytes(packageData);
int dataLength = ByteToInt(packageData);
//判断请求数据包数据是否到齐
if (buffer.readableBytes() < dataLength - 24) {
//还原读指针
buffer.readerIndex(beginReader);
return;
}
//读取data数据
byte[] data = new byte[dataLength - 24];
buffer.readBytes(data);
//继续往下传递
byte[] bytes = new byte[dataLength];
System.arraycopy(synchroSign, 0, bytes, 0, synchroSign.length);
int index1 = synchroSign.length;
System.arraycopy(versionNumber, 0, bytes, index1, versionNumber.length);
int index2 = versionNumber.length + synchroSign.length;
System.arraycopy(deviceSign, 0, bytes, index2, deviceSign.length);
int index3 = versionNumber.length + synchroSign.length + deviceSign.length;
System.arraycopy(packageData, 0, bytes, index3, packageData.length);
int index4 = versionNumber.length + synchroSign.length + deviceSign.length + packageData.length;
System.arraycopy(data, 0, bytes, index4, data.length);
out.add(bytes);
}
}
public int ByteToInt(byte[] bArr) {
if(bArr.length!=4){
return -1;
}
return (int) ((((bArr[3] & 0xff) << 24)
| ((bArr[2] & 0xff) << 16)
| ((bArr[1] & 0xff) << 8)
| ((bArr[0] & 0xff) << 0)));
}
}
package com.adc.da.znks.util.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead...; received:" + msg);
System.out.println(convertByteBufToString((ByteBuf) msg));
//ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channelReadComplete..");
// 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server occur exception:" + cause.getMessage());
cause.printStackTrace();
ctx.close(); // 关闭发生异常的连接
}
public String convertByteBufToString(ByteBuf buf) {
String str;
if(buf.hasArray()) { // 处理堆缓冲区
str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
} else { // 处理直接缓冲区以及复合缓冲区
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
str = new String(bytes, 0, buf.readableBytes());
}
return str;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment