在netty基本組件介紹中,我們大致了解了netty的一些基本組件,今天我們來搭建一個基于netty的Tcp服務(wù)端程序,通過代碼來了解和熟悉這些組件的功能和使用方法。
首先我們自己創(chuàng)建一個Server類,命名為TCPServer
第一步初始化ServerBootstrap,ServerBootstrap是netty中的一個服務(wù)器引導(dǎo)類,對ServerBootstrap的實例化就是創(chuàng)建netty服務(wù)器的入口
public class TCPServer {
private Logger log = LoggerFactory.getLogger(getClass());
//端口號
private int port=5080;
//服務(wù)器運行狀態(tài)
private volatile boolean isRunning = false;
//處理Accept連接事件的線程,這里線程數(shù)設(shè)置為1即可,netty處理鏈接事件默認為單線程,過度設(shè)置反而浪費cpu資源
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//處理hadnler的工作線程,其實也就是處理IO讀寫 。線程數(shù)據(jù)默認為 CPU 核心數(shù)乘以2
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
public void init() throws Exception{
//創(chuàng)建ServerBootstrap實例
ServerBootstrap serverBootstrap=new ServerBootstrap();
//初始化ServerBootstrap的線程模型
serverBootstrap.group(workerGroup,workerGroup);//
//設(shè)置將要被實例化的ServerChannel類
serverBootstrap.channel(NioServerSocketChannel.class);//
//在ServerChannelInitializer中初始化ChannelPipeline責(zé)任鏈,并添加到serverBootstrap中
serverBootstrap.childHandler(new ServerChannelInitializer());
//標(biāo)識當(dāng)服務(wù)器請求處理線程全滿時,用于臨時存放已完成三次握手的請求的隊列的最大長度
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// 是否啟用心跳?;顧C機制
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//綁定端口后,開啟監(jiān)聽
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
if(channelFuture.isSuccess()){
System.out.println(“TCP服務(wù)啟動 成功—————“);
}
}
/**
* 服務(wù)啟動
*/
public synchronized void startServer() {
try {
this.init();
}catch(Exception ex) {
}
}
/**
* 服務(wù)關(guān)閉
*/
public synchronized void stopServer() {
if (!this.isRunning) {
throw new IllegalStateException(this.getName() + ” 未啟動 .”);
}
this.isRunning = false;
try {
Future<?> future = this.workerGroup.shutdownGracefully().await();
if (!future.isSuccess()) {
log.error(“workerGroup 無法正常停止:{}”, future.cause());
}
future = this.bossGroup.shutdownGracefully().await();
if (!future.isSuccess()) {
log.error(“bossGroup 無法正常停止:{}”, future.cause());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
this.log.info(“TCP服務(wù)已經(jīng)停止…”);
}
private String getName() {
return “TCP-Server”;
}
}
上面的代碼中主要使用到的ServerBootstrap類的方法有以下這些:
group :設(shè)置SeverBootstrap要用到的EventLoopGroup,也就是定義netty服務(wù)的線程模型,處理Acceptor鏈接的主”線程池”以及用于I/O工作的從”線程池”;
channel:設(shè)置將要被實例化的SeverChannel類;
option :指定要應(yīng)用到新創(chuàng)建SeverChannel的ChannelConfig的ChannelOption.其實也就是服務(wù)本身的一些配置;
chidOption:子channel的ChannelConfig的ChannelOption。也就是與客戶端建立的連接的一些配置;
childHandler:設(shè)置將被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其實就是讓你在里面定義處理連接收發(fā)數(shù)據(jù),需要哪些ChannelHandler按什么順序去處理;
第二步接下來我們實現(xiàn)ServerChannelInitializer類,這個類繼承實現(xiàn)自netty的ChannelInitializer抽象類,這個類的作用就是對channel(連接)的ChannelPipeline進行初始化工作,說白了就是你要把處理數(shù)據(jù)的方法添加到這個任務(wù)鏈中去,netty才知道每一步拿著socket連接和數(shù)據(jù)去做什么。
@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
public ServerChannelInitializer() throws InterruptedException {
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//IdleStateHandler心跳機制,如果超時觸發(fā)Handle中userEventTrigger()方法
pipeline.addLast(“idleStateHandler”,
new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
// netty基于分割符的自帶解碼器,根據(jù)提供的分隔符解析報文,這里是0x7e;1024表示單條消息的最大長度,解碼器在查找分隔符的時候,達到該長度還沒找到的話會拋異常
// pipeline.addLast(
// new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
// Unpooled.copiedBuffer(new byte[] { 0x7e })));
//自定義編解碼器
pipeline.addLast(
new MessagePacketDecoder(),
new MessagePacketEncoder()
);
//自定義Hadler
pipeline.addLast(“handler”,new TCPServerHandler());
//自定義Hander,可用于處理耗時操作,不阻塞IO處理線程
pipeline.addLast(group,”BussinessHandler”,new BussinessHandler());
}
}
這里我們注意下
pipeline.addLast(group,”BussinessHandler”,new BussinessHandler());
在這里我們可以把一些比較耗時的操作(如存儲、入庫)等操作放在BussinessHandler中進行,因為我們?yōu)樗鼏为毞峙淞薊ventExecutorGroup 線程池執(zhí)行,所以說即使這里發(fā)生阻塞,也不會影響TCPServerHandler中數(shù)據(jù)的接收。
最后就是各個部分的具體實現(xiàn)
解碼器的實現(xiàn):
public class MessagePacketDecoder extends ByteToMessageDecoder
{
public MessagePacketDecoder() throws Exception
{
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
{
try {
if (buffer.readableBytes() > 0) {
// 待處理的消息包
byte[] bytesReady = new byte[buffer.readableBytes()];
buffer.readBytes(bytesReady);
//這之間可以進行報文的解析處理
out.add(bytesReady );
}
}finally {
}
}
}
編碼器的實現(xiàn)
public class MessagePacketEncoder extends MessageToByteEncoder<Object>
{
public MessagePacketEncoder()
{
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
{
try {
//在這之前可以實現(xiàn)編碼工作。
out.writeBytes((byte[])msg);
}finally {
}
}
}
TCPServerHandler的實現(xiàn)
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
public TCPServerHandler() {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//拿到傳過來的msg數(shù)據(jù),開始處理
}
//檢測到空閑連接,觸發(fā)
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//這里可做一些斷開連接的處理
}
}
BussinessHandler的實???與TCPServerHandler基本類似,它可以處理一些相對比較耗時的操作,我們這里就不實現(xiàn)了。
通過以上的代碼我們可以看到,一個基于netty的TCP服務(wù)的搭建基本就是三大塊:
1、對引導(dǎo)服務(wù)器類ServerBootstrap的初始化;
2、對ChannelPipeline的定義,也就是把多個ChannelHandler組成一條任務(wù)鏈;
3、對 ChannelHandler的具體實現(xiàn),其中可以有編解碼器,可以有對收發(fā)數(shù)據(jù)的業(yè)務(wù)處理邏輯;
以上代碼只是在基于netty框架搭建一個最基本的TCP服務(wù),其中包含了一些netty基本的特性和功能,當(dāng)然這只是netty運用的一個簡單的介紹,如有不正確的地方還望指出與海涵。