官术网_书友最值得收藏!

1.9 Java網(wǎng)絡(luò)編程模型

1.9.1 阻塞I/O模型

阻塞I/O模型是常見的I/O模型,在讀寫數(shù)據(jù)時(shí)客戶端會(huì)發(fā)生阻塞。阻塞I/O模型的工作流程為:在用戶線程發(fā)出I/O請求之后,內(nèi)核會(huì)檢查數(shù)據(jù)是否就緒,此時(shí)用戶線程一直阻塞等待內(nèi)存數(shù)據(jù)就緒;在內(nèi)存數(shù)據(jù)就緒后,內(nèi)核將數(shù)據(jù)復(fù)制到用戶線程中,并返回I/O執(zhí)行結(jié)果到用戶線程,此時(shí)用戶線程將解除阻塞狀態(tài)并開始處理數(shù)據(jù)。典型的阻塞I/O模型的例子為data = socket.read(),如果內(nèi)核數(shù)據(jù)沒有就緒,Socket線程就會(huì)一直阻塞在read()中等待內(nèi)核數(shù)據(jù)就緒。

1.9.2 非阻塞I/O模型

非阻塞I/O模型指用戶線程在發(fā)起一個(gè)I/O操作后,無須阻塞便可以馬上得到內(nèi)核返回的一個(gè)結(jié)果。如果內(nèi)核返回的結(jié)果為false,則表示內(nèi)核數(shù)據(jù)還沒準(zhǔn)備好,需要稍后再發(fā)起I/O操作。一旦內(nèi)核中的數(shù)據(jù)準(zhǔn)備好了,并且再次收到用戶線程的請求,內(nèi)核就會(huì)立刻將數(shù)據(jù)復(fù)制到用戶線程中并將復(fù)制的結(jié)果通知用戶線程。

在非阻塞I/O模型中,用戶線程需要不斷詢問內(nèi)核數(shù)據(jù)是否就緒,在內(nèi)存數(shù)據(jù)還未就緒時(shí),用戶線程可以處理其他任務(wù),在內(nèi)核數(shù)據(jù)就緒后可立即獲取數(shù)據(jù)并進(jìn)行相應(yīng)的操作。典型的非阻塞I/O模型一般如下:

while(true){
  data  =  socket.read();
    if(data == true){//1:內(nèi)核數(shù)據(jù)就緒
      //獲取并處理內(nèi)核數(shù)據(jù)
      break;
   }else{   //2:內(nèi)核數(shù)據(jù)未就緒,用戶線程處理其他任務(wù)
  }
}

1.9.3 多路復(fù)用I/O模型

多路復(fù)用I/O模型是多線程并發(fā)編程用得較多的模型,Java NIO就是基于多路復(fù)用I/O模型實(shí)現(xiàn)的。在多路復(fù)用I/O模型中會(huì)有一個(gè)被稱為Selector的線程不斷輪詢多個(gè)Socket的狀態(tài),只有在Socket有讀寫事件時(shí),才會(huì)通知用戶線程進(jìn)行I/O讀寫操作。

因?yàn)樵诙嗦窂?fù)用I/O模型中只需一個(gè)線程就可以管理多個(gè)Socket(阻塞I/O模型和非阻塞1/O模型需要為每個(gè)Socket都建立一個(gè)單獨(dú)的線程處理該Socket上的數(shù)據(jù)),并且在真正有Socket讀寫事件時(shí)才會(huì)使用操作系統(tǒng)的I/O資源,大大節(jié)約了系統(tǒng)資源。

Java NIO在用戶的每個(gè)線程中都通過selector.select()查詢當(dāng)前通道是否有事件到達(dá),如果沒有,則用戶線程會(huì)一直阻塞。而多路復(fù)用I/O模型通過一個(gè)線程管理多個(gè)Socket通道,在Socket有讀寫事件觸發(fā)時(shí)才會(huì)通知用戶線程進(jìn)行I/O讀寫操作。因此,多路復(fù)用I/O模型在連接數(shù)眾多且消息體不大的情況下有很大的優(yōu)勢。尤其在物聯(lián)網(wǎng)領(lǐng)域比如車載設(shè)備實(shí)時(shí)位置、智能家電狀態(tài)等定時(shí)上報(bào)狀態(tài)且字節(jié)數(shù)較少的情況下優(yōu)勢更加明顯,一般一個(gè)經(jīng)過優(yōu)化后的16核32GB服務(wù)器能承載約10萬臺設(shè)備連接。

非阻塞I/O模型在每個(gè)用戶線程中都進(jìn)行Socket狀態(tài)檢查,而在多路復(fù)用I/O模型中是在系統(tǒng)內(nèi)核中進(jìn)行Socket狀態(tài)檢查的,這也是多路復(fù)用I/O模型比非阻塞I/O模型效率高的原因。

多路復(fù)用I/O模型通過在一個(gè)Selector線程上以輪詢方式檢測在多個(gè)Socket上是否有事件到達(dá),并逐個(gè)進(jìn)行事件處理和響應(yīng)。因此,對于多路復(fù)用I/O模型來說,在事件響應(yīng)體(消息體)很大時(shí),Selector線程就會(huì)成為性能瓶頸,導(dǎo)致后續(xù)的事件遲遲得不到處理,影響下一輪的事件輪詢。在實(shí)際應(yīng)用中,在多路復(fù)用方法體內(nèi)一般不建議做復(fù)雜邏輯運(yùn)算,只做數(shù)據(jù)的接收和轉(zhuǎn)發(fā),將具體的業(yè)務(wù)操作轉(zhuǎn)發(fā)給后面的業(yè)務(wù)線程處理。

1.9.4 信號驅(qū)動(dòng)I/O模型

在信號驅(qū)動(dòng)I/O模型中,在用戶線程發(fā)起一個(gè)I/O請求操作時(shí),系統(tǒng)會(huì)為該請求對應(yīng)的Socket注冊一個(gè)信號函數(shù),然后用戶線程可以繼續(xù)執(zhí)行其他業(yè)務(wù)邏輯;在內(nèi)核數(shù)據(jù)就緒時(shí),系統(tǒng)會(huì)發(fā)送一個(gè)信號到用戶線程,用戶線程在接收到該信號后,會(huì)在信號函數(shù)中調(diào)用對應(yīng)的I/O讀寫操作完成實(shí)際的I/O請求操作。

1.9.5 異步I/O模型

在異步I/O模型中,用戶線程會(huì)發(fā)起一個(gè)asynchronous read操作到內(nèi)核,內(nèi)核在接收到synchronous read請求后會(huì)立刻返回一個(gè)狀態(tài),來說明請求是否成功發(fā)起,在此過程中用戶線程不會(huì)發(fā)生任何阻塞。接著,內(nèi)核會(huì)等待數(shù)據(jù)準(zhǔn)備完成并將數(shù)據(jù)復(fù)制到用戶線程中,在數(shù)據(jù)復(fù)制完成后內(nèi)核會(huì)發(fā)送一個(gè)信號到用戶線程,通知用戶線程asynchronous讀操作已完成。在異步I/O模型中,用戶線程不需要關(guān)心整個(gè)I/O操作是如何進(jìn)行的,只需發(fā)起一個(gè)請求,在接收到內(nèi)核返回的成功或失敗信號時(shí)說明I/O操作已經(jīng)完成,直接使用數(shù)據(jù)即可。

在異步I/O模型中,I/O操作的兩個(gè)階段(請求的發(fā)起、數(shù)據(jù)的讀取)都是在內(nèi)核中自動(dòng)完成的,最終發(fā)送一個(gè)信號告知用戶線程I/O操作已經(jīng)完成,用戶直接使用內(nèi)存寫好的數(shù)據(jù)即可,不需要再次調(diào)用I/O函數(shù)進(jìn)行具體的讀寫操作,因此在整個(gè)過程中用戶線程不會(huì)發(fā)生阻塞。

在信號驅(qū)動(dòng)模型中,用戶線程接收到信號便表示數(shù)據(jù)已經(jīng)就緒,需要用戶線程調(diào)用I/O函數(shù)進(jìn)行實(shí)際的I/O讀寫操作,將數(shù)據(jù)讀取到用戶線程;而在異步I/O模型中,用戶線程接收到信號便表示I/O操作已經(jīng)完成(數(shù)據(jù)已經(jīng)被復(fù)制到用戶線程),用戶可以開始使用該數(shù)據(jù)了。

異步I/O需要操作系統(tǒng)的底層支持,在Java 7中提供了Asynchronous I/O操作。

1.9.6 Java I/O

在整個(gè)Java.io包中最重要的是5個(gè)類和1個(gè)接口。5個(gè)類指的是File、OutputStream、InputStream、Writer、Reader,1個(gè)接口指的是Serializable。具體的使用方法請參考JDK API。

1.9.7 Java NIO

Java NIO的實(shí)現(xiàn)主要涉及三大核心內(nèi)容:Selector(選擇器)、Channel(通道)和Buffer(緩沖區(qū))。Selector用于監(jiān)聽多個(gè)Channel的事件,比如連接打開或數(shù)據(jù)到達(dá),因此,一個(gè)線程可以實(shí)現(xiàn)對多個(gè)數(shù)據(jù)Channel的管理。傳統(tǒng)I/O基于數(shù)據(jù)流進(jìn)行I/O讀寫操作;而Java NIO基于Channel和Buffer進(jìn)行I/O讀寫操作,并且數(shù)據(jù)總是被從Channel讀取到Buffer中,或者從Buffer寫入Channel中。

Java NIO和傳統(tǒng)I/O的最大區(qū)別如下。

(1)I/O是面向流的,NIO是面向緩沖區(qū)的:在面向流的操作中,數(shù)據(jù)只能在一個(gè)流中連續(xù)進(jìn)行讀寫,數(shù)據(jù)沒有緩沖,因此字節(jié)流無法前后移動(dòng)。而在NIO中每次都是將數(shù)據(jù)從一個(gè)Channel讀取到一個(gè)Buffer中,再從Buffer寫入Channel中,因此可以方便地在緩沖區(qū)中進(jìn)行數(shù)據(jù)的前后移動(dòng)等操作。該功能在應(yīng)用層主要用于數(shù)據(jù)的粘包、拆包等操作,在網(wǎng)絡(luò)不可靠的環(huán)境下尤為重要。

(2)傳統(tǒng)I/O的流操作是阻塞模式的,NIO的流操作是非阻塞模式的。在傳統(tǒng)I/O下,用戶線程在調(diào)用read()或write()進(jìn)行I/O讀寫操作時(shí),該線程將一直被阻塞,直到數(shù)據(jù)被讀取或數(shù)據(jù)完全寫入。NIO通過Selector監(jiān)聽Channel上事件的變化,在Channel上有數(shù)據(jù)發(fā)生變化時(shí)通知該線程進(jìn)行讀寫操作。對于讀請求而言,在通道上有可用的數(shù)據(jù)時(shí),線程將進(jìn)行Buffer的讀操作,在沒有數(shù)據(jù)時(shí),線程可以執(zhí)行其他業(yè)務(wù)邏輯操作。對于寫操作而言,在使用一個(gè)線程執(zhí)行寫操作將一些數(shù)據(jù)寫入某通道時(shí),只需將Channel上的數(shù)據(jù)異步寫入Buffer即可,Buffer上的數(shù)據(jù)會(huì)被異步寫入目標(biāo)Channel上,用戶線程不需要等待整個(gè)數(shù)據(jù)完全被寫入目標(biāo)Channel就可以繼續(xù)執(zhí)行其他業(yè)務(wù)邏輯。

非阻塞I/O模型中的Selector線程通常將I/O的空閑時(shí)間用于執(zhí)行其他通道上的I/O操作,所以一個(gè)Selector線程可以管理多個(gè)輸入和輸出通道,如圖1-18所示。

圖1-18

1.Channel

Channel和I/O中的Stream(流)類似,只不過Stream是單向的(例如InputStream、OutputStream),而Channel是雙向的,既可以用來進(jìn)行讀操作,也可以用來進(jìn)行寫操作。

NIO中Channel的主要實(shí)現(xiàn)有:FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel,分別對應(yīng)文件的I/O、UDP、TCP I/O、Socket Client和Socker Server操作。

2.Buffer

Buffer實(shí)際上是一個(gè)容器,其內(nèi)部通過一個(gè)連續(xù)的字節(jié)數(shù)組存儲(chǔ)I/O上的數(shù)據(jù)。在NIO中,Channel在文件、網(wǎng)絡(luò)上對數(shù)據(jù)的讀取或?qū)懭攵急仨毥?jīng)過Buffer。

如圖1-19所示,客戶端在向服務(wù)端發(fā)送數(shù)據(jù)時(shí),必須先將數(shù)據(jù)寫入Buffer中,然后將Buffer中的數(shù)據(jù)寫到服務(wù)端對應(yīng)的Channel上。服務(wù)端在接收數(shù)據(jù)時(shí)必須通過Channel將數(shù)據(jù)讀入Buffer中,然后從Buffer中讀取數(shù)據(jù)并處理。

圖1-19

在NIO中,Buffer是一個(gè)抽象類,對不同的數(shù)據(jù)類型實(shí)現(xiàn)不同的Buffer操作。常用的Buffer實(shí)現(xiàn)類有:ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer。

3.Selector

Selector用于檢測在多個(gè)注冊的Channel上是否有I/O事件發(fā)生,并對檢測到的I/O事件進(jìn)行相應(yīng)的響應(yīng)和處理。因此通過一個(gè)Selector線程就可以實(shí)現(xiàn)對多個(gè)Channel的管理,不必為每個(gè)連接都創(chuàng)建一個(gè)線程,避免線程資源的浪費(fèi)和多線程之間的上下文切換導(dǎo)致的開銷。同時(shí),Selector只有在Channel上有讀寫事件發(fā)生時(shí),才會(huì)調(diào)用I/O函數(shù)進(jìn)行讀寫操作,可極大減少系統(tǒng)開銷,提高系統(tǒng)的并發(fā)量。

4.Java NIO使用

要實(shí)現(xiàn)Java NIO,就需要分別實(shí)現(xiàn)Server和Client。具體的Server實(shí)現(xiàn)代碼如下:

public  class  MyServer  {
    private  int  size  =  1024;
    private  ServerSocketChannel  serverSocketChannel;
    private  ByteBuffer  byteBuffer;
    private  Selector  selector;
    private  int  remoteClientNum  =  0;
    public  MyServer(int  port)  {
      try  {
          //在構(gòu)造函數(shù)中初始化Channel監(jiān)聽
          initChannel(port);
      }  catch  (IOException  e)  {
          e.printStackTrace();
          System.exit(-1);
      }
    }
    //Channel的初始化
    public  void  initChannel(int  port)  throws  IOException  {
      //打開Channel
      serverSocketChannel  =  ServerSocketChannel.open();
      //設(shè)置為非阻塞模式
      serverSocketChannel.configureBlocking(false);
      //綁定端口
      serverSocketChannel.bind(new  InetSocketAddress(port));
      System.out.println("listener  on  port:  "  +  port);
      //選擇器的創(chuàng)建
      selector  =  Selector.open();
      //向選擇器注冊通道
      serverSocketChannel.register(selector,  SelectionKey.OP_ACCEPT);
      //分配緩沖區(qū)的大小
      byteBuffer  =  ByteBuffer.allocate(size);
    }
    //監(jiān)聽器,用于監(jiān)聽Channel上的數(shù)據(jù)變化
    private  void  listener()  throws  Exception  {
      while  (true)  {
          //返回的int值表示有多少個(gè)Channel處于就緒狀態(tài)
          int  n  =  selector.select();
          if  (n  ==  0)  {
              continue;
          }
          //每個(gè)selector對應(yīng)多個(gè)SelectionKey,每個(gè)SelectionKey對應(yīng)一個(gè)Channel
          Iterator<SelectionKey>  iterator  =
                                    selector.selectedKeys().iterator();
          while  (iterator.hasNext())  {
              SelectionKey  key  =  iterator.next();
              //如果SelectionKey處于連接就緒狀態(tài),則開始接收客戶端的連接
              if  (key.isAcceptable())  {
                //獲取Channel
            ServerSocketChannel  server =  (ServerSocketChannel) key.channel();
                //Channel接收連接
                SocketChannel  channel  =  server.accept();
                //Channel注冊
                registerChannel(selector,  channel,  SelectionKey.OP_READ);
                //遠(yuǎn)程客戶端的連接數(shù)
                remoteClientNum++;
                System.out.println("online  client  num="+remoteClientNum);
                write(channel, "hello  client".getBytes());
              }
              //如果通道已經(jīng)處于讀就緒狀態(tài)
              if  (key.isReadable())  {
                read(key);
              }
              iterator.remove();
          }
      }
    }
    private  void  read(SelectionKey  key)  throws  IOException  {
      SocketChannel  socketChannel  =  (SocketChannel)  key.channel();
      int  count;
      byteBuffer.clear();
      //從通道中讀數(shù)據(jù)到緩沖區(qū)
      while  ((count  =  socketChannel.read(byteBuffer))  >  0)  {
          //byteBuffer寫模式變?yōu)樽x模式
          byteBuffer.flip();
          while  (byteBuffer.hasRemaining())  {
              System.out.print((char)byteBuffer.get());
          }
          byteBuffer.clear();
      }
      if  (count  <  0)  {
          socketChannel.close();
      }
    }
    private  void  write(SocketChannel  channel, byte[]  writeData)  throws IOException  {
      byteBuffer.clear();
      byteBuffer.put(writeData);
      //byteBuffer從寫模式變成讀模式
      byteBuffer.flip();
      //將緩沖區(qū)的數(shù)據(jù)寫入通道中
      channel.write(byteBuffer);
    }
    private  void  registerChannel(Selector  selector,  SocketChannel  channel, int  opRead)  throws  IOException  {
      if  (channel  ==  null)  {
          return;
      }
      channel.configureBlocking(false);
      channel.register(selector,  opRead);
    }
    public  static  void  main(String[]  args)  {
      try  {
          MyServer  myServer  =  new  MyServer(9999);
          myServer.listener();
      }  catch  (Exception  e)  {
          e.printStackTrace();
      }
    }


}

在以上代碼中定義了名為MyServer的服務(wù)端實(shí)現(xiàn)類,在該類中定義了serverSocketChannel用于ServerSocketChannel的建立和端口的綁定;byteBuffer用于不同Channel之間的數(shù)據(jù)交互;selector用于監(jiān)聽服務(wù)器各個(gè)Channel上數(shù)據(jù)的變化并做出響應(yīng)。同時(shí),在類構(gòu)造函數(shù)中調(diào)用了初始化ServerSocketChannel的操作,定義了listener方法來監(jiān)聽Channel上的數(shù)據(jù)變化,解析客戶端的數(shù)據(jù)并對客戶端的請求做出響應(yīng)。

具體的Client實(shí)現(xiàn)代碼如下:

public  class  MyClient  {
    private  int  size  =  1024;
    private  ByteBuffer  byteBuffer;
    private  SocketChannel  socketChannel;
    public  void  connectServer()  throws  IOException  {
      socketChannel  =  socketChannel.open();
      socketChannel.connect(new  InetSocketAddress("127.0.0.1",  9999));
      socketChannel.configureBlocking(false);
      byteBuffer  =  ByteBuffer.allocate(size);
      receive();
    }
    private  void  receive()  throws  IOException  {
      while  (true)  {
          byteBuffer.clear();
          int  count;
          //如果沒有數(shù)據(jù)可讀,則read方法一直阻塞,直到讀取到新的數(shù)據(jù)
          while  ((count  =  socketChannel.read(byteBuffer))  >  0)  {
              byteBuffer.flip();
              while  (byteBuffer.hasRemaining())  {
                  System.out.print((char)byteBuffer.get());
              }
              send2Server("say  hi".getBytes());
              byteBuffer.clear();
          }
      }
    }
    private  void  send2Server(byte[]  bytes)  throws  IOException  {
      byteBuffer.clear();
      byteBuffer.put(bytes);
      byteBuffer.flip();
      socketChannel.write(byteBuffer);
    }
    public  static  void  main(String[]  args)  throws  IOException  {
      new  MyClient().connectServer();
    }
}

在以上代碼中定義了MyClient類來實(shí)現(xiàn)客戶端的Channel邏輯,其中,connectServer方法用于和服務(wù)端建立連接,receive方法用于接收服務(wù)端發(fā)來的數(shù)據(jù),send2Server用于向服務(wù)端發(fā)送數(shù)據(jù)。

主站蜘蛛池模板: 石棉县| 安吉县| 基隆市| 久治县| 菏泽市| 城口县| 麟游县| 清原| 胶州市| 东乡县| 长岛县| 盐源县| 星座| 海兴县| 陵川县| 中阳县| 新巴尔虎左旗| 甘肃省| 玛纳斯县| 清新县| 惠东县| 广丰县| 曲麻莱县| 丰顺县| 巴彦淖尔市| 石柱| 渝中区| 夏邑县| 邯郸县| 云南省| 渑池县| 疏勒县| 莱芜市| 安泽县| 东宁县| 韩城市| 普兰店市| 丽江市| 浑源县| 孝感市| 乌苏市|