本文共 4901 字,大约阅读时间需要 16 分钟。
在实现TCP长连接功能中,客户端断线重连是一个常见问题。当我们使用Netty实现断线重连时,是否考虑过以下几个关键问题呢?这些都是笔者在实践中遇到的挑战,也是思考的起点。
首先,让我们看一下服务端的基本配置。以下是项目的主要依赖项:
org.springframework.boot spring-boot-starter 2.4.1 io.netty netty-all 4.1.56.Final org.jboss.marshalling jboss-marshalling-serial 2.0.10.Final
服务端主要负责处理客户端连接和数据传输。以下是服务端的核心业务处理类:
@ChannelHandler.Sharablepublic class SimpleServerHandler extends ChannelInboundHandlerAdapter { private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class); private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); log.info("客户端连接成功:地址:{}", ctx.channel().remoteAddress()); log.info("当前共有 {} 个客户端连接", channels.size()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("server channelRead: {}", msg); ctx.channel().writeAndFlush("hello netty"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive:客户端关闭"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof java.io.IOException) { log.warn("exceptionCaught:客户端关闭"); } else { cause.printStackTrace(); } }} 为了确保客户端在线状态,我们在服务端启用了心跳机制。以下是心跳处理类:
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter { private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("server channelRead: {}", msg); if (msg.equals("ping")) { ctx.channel().writeAndFlush("pong"); } else { ctx.fireChannelRead(msg); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { log.info("超过心跳时间,关闭与服务端的连接:{}", ctx.channel().remoteAddress()); ctx.channel().close(); } } else { super.userEventTriggered(ctx, evt); } }} 我们使用Jboss Marshalling进行编解码。以下是自定义编解码工具类:
public final class MarshallingCodeFactory { /** 创建Jboss marshalling解码器 */ public static MarshallingDecoder buildMarshallingDecoder() { MarshallerFactory factory = Marshalling.getProvidedMarshallingFactory("serial"); MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration); return new MarshallingDecoder(provider, 1024); } /** 创建Jboss marshalling编码器 */ public static MarshallingEncoder buildMarshallingEncoder() { MarshallerFactory factory = Marshalling.getProvidedMarshallingFactory("serial"); MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration); return new MarshallingEncoder(provider); }} 以下是用于数据传输的实体类:
public class UserInfo implements Serializable { private static final long serialVersionUID = 6271330872494117382L; private String username; private int age; public UserInfo() { } public UserInfo(String username, int age) { this.username = username; this.age = age; } //省略getter、setter、toString等方法} 客户端实现需要处理连接和断线重连。以下是客户端的核心代码:
public class SimpleClientHandler extends ChannelInboundHandlerAdapter { private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class); private NettyClient client; public SimpleClientHandler(NettyClient client) { this.client = client; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("client receive: {}", msg); }} 在实际应用中,客户端断线重连涉及以下关键问题:
如何监听到客户端和服务端连接断开?
如何实现断线后重新连接?
Netty客户端线程给多大比较合理?
通过上述实现,我们可以看到Netty在断线重连中的关键作用。服务端需要处理连接状态和心跳机制,而客户端则需要实现自动重连功能。线程配置也是一个重要考虑因素,需要根据实际需求进行优化。
转载地址:http://uncfk.baihongyu.com/