之前有做过消息推送相关的应用,使用的Netty框架,一直对这个框架非常感兴趣,也学习了一些它的原理,但感觉还是不够,所以想从今天开始对Netty框架写一个系列的使用及原理学习的博客,提升自己,也希望对看到这篇博客的朋友有所帮助,欢迎大家一起讨论。
我一直从事Android开发岗位,后台知识是自学,没有真正参加一个后台项目,所以在文中后台开发比较简单,如有问题欢迎指出,共同学习。
今天写第一篇博客,还是先从Netty框架的使用开始,我自己做了一个easyIM的简单Demo,可以实现简单的聊天功能,使用Protocol Buffer传输数据,以后会继续完善它的功能。
服务端代码地址 Github/easyImServer
客户端代码地址 Github/easyIm
一、服务端
使用SpringBoot搭建的后台服务,比较简单。
创建服务端主逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| fun start() { val boss = NioEventLoopGroup() val worker = NioEventLoopGroup()
try { val port = nettyConfig.port val bootStrap = ServerBootstrap() bootStrap.group(boss, worker) .channel(NioServerSocketChannel::class.java) .childHandler(ProtocolPipeline()) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true)
val future = bootStrap.bind(port).sync() logger.info("server start finish,the port is $port")
future.channel().closeFuture().sync()
} catch (e: InterruptedException) { logger.error("server start error ${e.message.toString()}") } finally { boss.shutdownGracefully() worker.shutdownGracefully() } }
|
ProtocolPipeline数据处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class ProtocolPipeline : ChannelInitializer<SocketChannel>() { override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline()
pipeline.addLast("send heartbeat", IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
pipeline.addLast(ProtobufVarint32FrameDecoder()) pipeline.addLast("proto decoder", ProtobufDecoder(IMessage.Protocol.getDefaultInstance())) pipeline.addLast(ProtobufVarint32LengthFieldPrepender()) pipeline.addLast("proto encoder", ProtobufEncoder()) pipeline.addLast(ServerHandler()) } }
|
传输数据
数据使用protobuf,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| syntax = "proto2";
message Protocol { optional ContentType contentType = 1; optional bytes content = 2; }
enum ContentType { Register_INFO = 0; Register_UUID = 1; Message_INFO = 2; HEART_BEAT = 3; }
enum MessageType { ALL = 0; ONE = 1; }
message Register { optional string name = 1; }
message RegisterUUID { optional string name = 1; optional string UUID = 2; }
message Message { optional MessageType type = 1; required string uuid = 2; optional string message = 3; }
message HeartBeat_Ping{ required string time = 1; required string uuid = 2; }
message HeartBeat_Pong{ required string time = 1; required string uuid = 2; }
|
ServerHandler处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| class ServerHandler : ChannelInboundHandlerAdapter() {
private val logger = LoggerFactory.getLogger(ServerHandler::class.java)
private var counter: Int = 0
@Throws(Exception::class) override fun channelActive(ctx: ChannelHandlerContext) { logger.info("有人加入了!") }
override fun channelInactive(ctx: ChannelHandlerContext) { logger.info("有人退出") super.channelInactive(ctx) ChannelMapController.removeByChannle(ctx.channel()) }
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any?) { if (evt is IdleStateEvent) { if (counter >= 3) { ctx.channel()?.close()?.sync() ChannelMapController.removeByChannle(ctx.channel()) logger.info("已与Client断开连接") } else { counter++ logger.info("丢失了第 $counter 个心跳包") } } }
override fun channelRead(ctx: ChannelHandlerContext, msg: Any?) { val protoMsg = msg as IMessage.Protocol val contentType = protoMsg.contentType if (contentType == IMessage.ContentType.HEART_BEAT) { counter = 0 logger.info("收到心跳包") } else { handlerMessage(ctx, msg) } }
private fun handlerMessage(ctx: ChannelHandlerContext, msg: IMessage.Protocol) { counter = 0 val contentType = msg.contentType when (contentType) { IMessage.ContentType.Message_INFO -> { val message: IMessage.Message = IMessage.Message.parseFrom(msg.content) if (message.type == IMessage.MessageType.ALL) { logger.info("收到全员广播消息: ${message.message}") ChannelMapController.sendMsgToAll(ProtocolFactory.getMessage(message.message, IMessage.MessageType.ONE, ""), ctx.channel()) } else if (message.type == IMessage.MessageType.ONE) { logger.info("收到个人消息: ${message.message}") } } IMessage.ContentType.Register_INFO -> { logger.info("收到注册消息") val register: IMessage.Register = IMessage.Register.parseFrom(msg.content) val uuid = UUIDGenerator.getUUID() ChannelMapController.put(uuid, ctx.channel()) ctx.writeAndFlush(ProtocolFactory.getUUIDProto(register.name, uuid)) }
else -> {
} } } }
|
二、客户端
创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| fun start() { mGroup = NioEventLoopGroup() try { val b = Bootstrap() b.group(mGroup) .channel(NioSocketChannel::class.java) .remoteAddress(InetSocketAddress("172.18.157.43", 1088)) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(ProtocolPipeline())
mChannelFuture = b.connect().awaitUninterruptibly() mChannelFuture!!.channel().closeFuture().sync() } finally { mGroup!!.shutdownGracefully().sync() } }
|
ProtocolPipeline数据处理
1 2 3 4 5 6 7 8 9 10 11 12 13
| class ProtocolPipeline : ChannelInitializer<SocketChannel>() { override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline()
pipeline.addLast("send heartbeat", IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)) pipeline.addLast(ProtobufVarint32FrameDecoder()) pipeline.addLast("proto decoder", ProtobufDecoder(IMessage.Protocol.getDefaultInstance())) pipeline.addLast(ProtobufVarint32LengthFieldPrepender()) pipeline.addLast("proto encoder", ProtobufEncoder()) pipeline.addLast(ClientHandler()) } }
|
ClientHandler处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class ClientHandler : SimpleChannelInboundHandler<IMessage.Protocol>() { private val TAG = "ClientHandler"
override fun channelActive(ctx: ChannelHandlerContext) { SendMsgController.setChannelHandler(ctx) }
override fun channelRead0(p0: ChannelHandlerContext?, message: IMessage.Protocol) { Log.e(TAG, "get form server $message") val contentType = message.contentType when (contentType) { IMessage.ContentType.HEART_BEAT -> { } IMessage.ContentType.Message_INFO -> { }
IMessage.ContentType.Register_UUID -> { } else -> {
} } }
override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) { if (evt is IdleStateEvent) { if (evt.state() == IdleState.WRITER_IDLE) { Log.d(TAG, "send heartbeat!") ctx?.writeAndFlush(ProtocolFactory.getHeartBeat()) } else { Log.d(TAG, "其他超时:${evt.state()}") } }
super.userEventTriggered(ctx, evt) }
}
|
单例SendMsgController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| object SendMsgController {
val TAG = SendMsgController::class.java.simpleName var channelHandlerContext: ChannelHandlerContext? = null
fun setChannelHandler(channelHandlerContext: ChannelHandlerContext) { this.channelHandlerContext = channelHandlerContext }
fun sendMsg(msg: IMessage.Protocol) { if (channelHandlerContext != null) { channelHandlerContext!!.writeAndFlush(msg) } else { Log.e(TAG, "channelHandlerContext is null") }
}
fun sendMsg(msg: IMessage.Protocol, future: ChannelFutureListener) { if (channelHandlerContext != null) { channelHandlerContext!!.writeAndFlush(msg).addListener(future) } else { Log.e(TAG, "channelHandlerContext is null") } }
fun close() {
} }
|
在连接建立后就将channel保存在一个单例中,之后所有channel相关的操作都可以使用这个单例。