官术网_书友最值得收藏!

1.1 基于消息方式實現(xiàn)系統(tǒng)間的通信

1.1.1 基于Java自身技術(shù)實現(xiàn)消息方式的系統(tǒng)間通信

基于Java自身包實現(xiàn)消息方式的系統(tǒng)間通信的方式有:TCP/IP+BIO、TCP/IP+NIO、UDP/IP+BIO以及UDP/IP+NIO 4種,下面分別介紹如何實現(xiàn)這4種方式的系統(tǒng)間通信。

TCP/IP+BIO

在Java中可基于Socket、ServerSocket來實現(xiàn)TCP/IP+BIO的系統(tǒng)間通信。Socket主要用于實現(xiàn)建立連接及網(wǎng)絡(luò)IO的操作,ServerSocket主要用于實現(xiàn)服務(wù)器端端口的監(jiān)聽及Socket對象的獲取。基于Socket實現(xiàn)客戶端的關(guān)鍵代碼如下:

    // 創(chuàng)建連接,如果域名解析不了會拋出UnknownHostException,當(dāng)連接不上時會拋出IOException,
    如果希望控制建立連接的超時,可先調(diào)用new Socket(),然后調(diào)用socket.connect(SocketAddress
    類型的目標(biāo)地址,以毫秒為單位的超時時間)
    Socket socket=new Socket(目標(biāo)IP或域名,目標(biāo)端口);
    // 創(chuàng)建讀取服務(wù)器端返回流的BufferedReader
    BufferedReader in=new BufferedReader(new
    InputStreamReader(socket.getInputStream()));
    // 創(chuàng)建向服務(wù)器寫入流的PrintWriter
    PrintWriter out=new PrintWriter(socket.getOutputStream(),true);
    // 向服務(wù)器發(fā)送字符串信息,要注意的是,此處即使寫失敗也不會拋出異常信息,并且一直會阻塞到寫入
    操作系統(tǒng)或網(wǎng)絡(luò)IO出現(xiàn)異常為止
    out.println(“hello”);
    // 阻塞讀取服務(wù)端的返回信息,以下代碼會阻塞到服務(wù)端返回信息或網(wǎng)絡(luò)IO出現(xiàn)異常為止,如果希望在超
    過一段時間后就不阻塞了,那么要在創(chuàng)建Socket對象后調(diào)用socket.setSoTimeout(以毫秒為單位的超
    時時間)
    in.readLine();

服務(wù)器端關(guān)鍵代碼如下:

    // 創(chuàng)建對本地指定端口的監(jiān)聽,如端口沖突則拋出SocketException,其他網(wǎng)絡(luò)IO方面的異常則拋出
    IOException ServerSocket ss=new ServerSocket(監(jiān)聽的端口)
    // 接受客戶端建立連接的請求,并返回Socket對象,以便和客戶端進行交互,交互的方式和客戶端相同,
    也是通過Socket.getInputStream和Socket.getOutputStream來進行讀寫操作,此方法會一直阻
    塞到有客戶端發(fā)送建立連接的請求,如果希望此方法最多阻塞一定的時間,則要在創(chuàng)建ServerSocket后
    調(diào)用其setSoTimeout(以毫秒為單位的超時時間)
    Socket socket=ss.accept();

上面是基于Socket、ServerSocket實現(xiàn)的一個簡單的系統(tǒng)間通信的例子。而在實際的系統(tǒng)中,通常要面對的是客戶端同時要發(fā)送多個請求到服務(wù)器端,服務(wù)器端則同時要接受多個連接發(fā)送的請求,上面的代碼顯然是無法滿足的。

為了滿足客戶端能同時發(fā)送多個請求到服務(wù)器端,最簡單的方法就是生成多個Socket。但這里會產(chǎn)生兩個問題:一是生成太多的Socket會消耗過多的本地資源,在客戶端機器多,服務(wù)器端機器少的情況下,客戶端生成太多Socket會導(dǎo)致服務(wù)器端須要支撐非常高的連接數(shù);二是生成Socket(建立連接)通常是比較慢的,因此頻繁地創(chuàng)建會導(dǎo)致系統(tǒng)性能不足。鑒于這兩個問題,通常采用連接池的方式來維護Socket是比較好的,一方面限制了能創(chuàng)建的Socket的個數(shù);另一方面由于將Socket放入了池中,避免了重復(fù)創(chuàng)建Socket帶來的性能下降問題。數(shù)據(jù)庫連接池就是這種方式的典型代表,但連接池的方式會帶來另一個問題,連接池中的Socket個數(shù)是有限的,但同時要用Socket的請求可能會很多,在這種情況下就會造成激烈的競爭和等待;還有一個需要注意的問題是合理控制等待響應(yīng)的超時時間,如不設(shè)定超時會導(dǎo)致當(dāng)服務(wù)器端處理變慢時,客戶端相關(guān)的請求都在做無限的等待,而客戶端的資源必然是有限的。因此這種情況下很容易造成當(dāng)服務(wù)器端出現(xiàn)問題時,客戶端掛掉的現(xiàn)象。超時時間具體設(shè)置為多少取決于客戶端能承受的請求量及服務(wù)器端的處理時間。既要保證性能,又要保證出錯率不會過高,對于直接基于TCP/IP+BIO的方式,可采用Socket.setSoTimeout來設(shè)置等待響應(yīng)的超時時間。

為了滿足服務(wù)器端能同時接受多個連接發(fā)送的請求,通常采用的方法是在accept獲取Socket后,將此Socket放入一個線程中處理,通常將此方式稱為一連接一線程。這樣服務(wù)器端就可接受多個連接發(fā)送請求了,這種方式的缺點是無論連接上是否有真實的請求,都要耗費一個線程。為避免創(chuàng)建過多的線程導(dǎo)致服務(wù)器端資源耗盡,須限制創(chuàng)建的線程數(shù)量,這就造成了在采用BIO的情況下服務(wù)器端所能支撐的連接數(shù)是有限的。

TCP/IP+NIO

在Java中可基于java.nio.channels中的Channel和Selector的相關(guān)類來實現(xiàn)TCP/IP+NIO方式的系統(tǒng)間通信。Channel有SocketChannel和ServerSocketChannel兩種,SocketChannel用于建立連接、監(jiān)聽事件及操作讀寫,ServerSocketChannel用于監(jiān)聽端口及監(jiān)聽連接事件;程序通過Selector來獲取是否有要處理的事件。基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:

    SocketChannel channel=SocketChannel.open();
    // 設(shè)置為非阻塞模式
    channel.configureBlocking(false);
    //對于非阻塞模式,立刻返回false,表示連接正在建立中
    channel.connect(SocketAddress);
    Selector selector=Selector.open();
    // 向channel注冊selector以及感興趣的連接事件
    channel.register(selector,SelectionKey.OP_CONNECT);
    // 阻塞至有感興趣的IO事件發(fā)生,或到達超時時間,如果希望一直等至有感興趣的IO事件發(fā)生,可調(diào)用
    無參數(shù)的select方法,如果希望不阻塞直接返回目前是否有感興趣的事件發(fā)生,可調(diào)用selectNow方法
    int nKeys=selector.select(以毫秒為單位的超時時間)
    // 如nKeys大于零,說明有感興趣的IO事件發(fā)生
    SelectionKey sKey=null;
    if(nKeys>0){
      Set<SelectionKey> keys=selector.selectedKeys();
    for(SelectionKey key:keys){
        // 對于發(fā)生連接的事件
                if(key.isConnectable()){
                    SocketChannel sc=(SocketChannel) key.channel();
                    sc.configureBlocking(false);
                    // 注冊感興趣的IO讀事件,通常不直接注冊寫事件,在發(fā)送緩沖區(qū)未滿的情況下,一
    直是可寫的,因此如注冊了寫事件,而又不用寫數(shù)據(jù),很容易造成CPU消耗100%的現(xiàn)象
    sKey = sc.register(selector, SelectionKey.OP_READ);
    // 完成連接的建立
                    sc.finishConnect();
            }
            // 有流可讀取
                else if(key.isReadable()){
                    ByteBuffer buffer=ByteBuffer.allocate(1024);
                    SocketChannel sc=(SocketChannel) key.channel();
                    int readBytes=0;
                    try{
                        int ret=0;
                        try{
                            // 讀取目前可讀的流,sc.read返回的為成功復(fù)制到bytebuffer中的字
    節(jié)數(shù),此步為阻塞操作,值可能為0;當(dāng)已經(jīng)是流的結(jié)尾時,返回-1
    while((ret=sc.read(buffer))>0){
                              readBytes+=ret;
                            }
                         }
                         finally{
                             buffer.flip();
                         }
                      }
                      finally{
                         if(buffer!=null){
                             buffer.clear();
                         }
                      }
                   }
                   // 可寫入流
                   else if(key.isWritable()){
                       // 取消對OP_WRITE事件的注冊
                       key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                       SocketChannel sc=(SocketChannel) key.channel();
                       // 此步為阻塞操作,直到寫入操作系統(tǒng)發(fā)送緩沖區(qū)或網(wǎng)絡(luò)IO出現(xiàn)異常,返回的為成功
    寫入的字節(jié)數(shù),當(dāng)操作系統(tǒng)的發(fā)送緩沖區(qū)已滿,此處會返回0
    int writtenedSize=sc.write(ByteBuffer);
    // 如未寫入,則繼續(xù)注冊感興趣的OP_WRITE事件
    if(writtenedSize==0){
       key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }
    }
          }
       selector.selectedKeys().clear();
    }
    // 對于要寫入的流,可直接調(diào)用channel.write來完成,只有在寫入未成功時才要注冊O(shè)P_WRITE事件
    int wSize=channel.write(ByteBuffer);
    if(wSize==0){
       key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

從上可見,NIO是典型的Reactor模式的實現(xiàn),通過注冊感興趣的事件及掃描是否有感興趣的事件發(fā)生,從而做相應(yīng)的動作。

服務(wù)器端關(guān)鍵代碼如下:

    ServerSocketChannel ssc=ServerSocketChannel.open();
      ServerSocket serverSocket=ssc.socket();
      // 綁定要監(jiān)聽的端口
    serverSocket.bind(new InetSocketAddress(port));
      ssc.configureBlocking(false);
    // 注冊感興趣的連接建立事件
    ssc.register(selector, SelectionKey.OP_ACCEPT);

之后則可采取和客戶端同樣的方式對selector.select進行輪詢,只是要增加一個對key.isAcceptable的處理,代碼如下:

    if(key.isAcceptable()){
      ServerSocketChannel server=(ServerSocketChannel)key.channel();
      SocketChannel sc=server.accept();
      if(sc==null){
            continue;
    }
    sc.configureBlocking(false);
    sc.register(selector,SelectionKey.OP_READ);
    }

上面只是基于TCP/IP+NIO實現(xiàn)的一個簡單例子,同樣來看看基于TCP/IP+NIO如何支撐客戶端同時發(fā)送多個請求及服務(wù)器端接受多個連接發(fā)送的請求。

對于客戶端發(fā)送多個請求的需求,采用TCP/IP+NIO和采用TCP/IP+BIO的方式?jīng)]有任何不同。但NIO方式可做到不阻塞,因此如果服務(wù)器端返回的響應(yīng)能帶上請求標(biāo)識,那么客戶端則可采用連接復(fù)用的方式,即每個SocketChannel在發(fā)送消息后,不用等響應(yīng)即可繼續(xù)發(fā)送其他消息,這種方式可降低連接池帶來的資源爭搶的問題,從而提升系統(tǒng)性能;對于連接不復(fù)用的情況,可基于Socket.setSoTimeout的方式來控制同步請求的超時;對于連接復(fù)用的情況,同步請求的超時可基于BlockingQueue、對象的wait/notify機制或Future機制來實現(xiàn)。

對于服務(wù)器端接受多個連接請求的需求,通常采用的是由一個線程來監(jiān)聽連接的事件,另一個或多個線程來監(jiān)聽網(wǎng)絡(luò)流讀寫的事件。當(dāng)有實際的網(wǎng)絡(luò)流讀寫事件發(fā)生后,再放入線程池中處理。這種方式比TCP/IP+BIO的好處在于可接受很多的連接,而這些連接只在有真實的請求時才會創(chuàng)建線程來處理,這種方式通常又稱為一請求一線程。當(dāng)連接數(shù)不多,或連接數(shù)較多,且連接上的請求發(fā)送非常頻繁時,TCP/IP+NIO的方式不會帶來太大的優(yōu)勢,但在實際的場景中,通常是服務(wù)器端要支持大量的連接數(shù),但這些連接同時發(fā)送的請求并不會非常多。

在基于Sun JDK開發(fā)Java NIO程序時,尤其要注意selector.select拋出IOException異常的處理http://bugs.sun.com/view_bug.do?bug_id=6693490及selector.select不阻塞就直接返回的情況http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933。這兩種狀況都有可能造成CPU消耗達到100%,對于selector.select拋出IOException的狀況,可以采用繞過的方法為捕捉異常并將當(dāng)前Thread sleep一段時間,或是重新創(chuàng)建Selector。為避免selector.select不阻塞就直接返回,可采用bug庫中提到的修改建議。

從上面可以看出,對于高訪問量的系統(tǒng)而言,TCP/IP+NIO方式結(jié)合一定的改造在客戶端能夠帶來更高的性能,在服務(wù)器端能支撐更高的連接數(shù)。

UDP/IP+BIO

Java對UDP/IP方式的網(wǎng)絡(luò)數(shù)據(jù)傳輸同樣采用Socket機制,只是UDP/IP下的Socket沒有建立連接的要求,由于UDP/IP是無連接的,因此無法進行雙向的通信。這也就要求如果要雙向通信的話,必須兩端都成為UDP Server。

在Java中可基于DatagramSocket和DatagramPacket來實現(xiàn)UDP/IP+BIO方式的系統(tǒng)間通信, DatagramSocket負責(zé)監(jiān)聽端口及讀寫數(shù)據(jù)。DatagramPacket作為數(shù)據(jù)流對象進行傳輸,基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:

    // 由于UDP/IP是無連接的,如果希望雙向通信,就必須啟動一個監(jiān)聽端口,承擔(dān)服務(wù)器的職責(zé),如不能
    綁定到指定端口,則拋出SocketException
    DatagramSocket serverSocket=new DatagramSocket(監(jiān)聽的端口);
    byte[] buffer=new byte[65507];
    DatagramPacket receivePacket=new DatagramPacket(buffer,buffer.length);
    DatagramSocket socket=new DatagramSocket();
    DatagramPacket packet=new DatagramPacket(datas,datas.length,server,port);
    // 阻塞發(fā)送packet到指定的服務(wù)器和端口,當(dāng)出現(xiàn)網(wǎng)絡(luò)IO異常時拋出IOException,當(dāng)連不上目標(biāo)地
    址和端口時,拋出PortUnreachableException
    socket.send(packet)
    // 阻塞并同步讀取流信息,如接收到的流信息比packet長度長,則刪除更長的信息,可通過調(diào)用
    DatagramSocket.setSoTimeout(以毫秒為單位的超時時間)來設(shè)置讀取流的超時時間
    serverSocket.receive(receivePacket)

服務(wù)器端代碼和客戶端代碼的結(jié)構(gòu)基本一致,這里就不列了。

由于UDP/IP通信的兩端不建立連接,就不會有TCP/IP通信連接競爭的問題,只是最終讀寫流的動作是同步的。

對于服務(wù)器端同時接收多請求的需求,通常采取每接收到一個packet就放入一個線程中進行處理的方式來實現(xiàn)。

UDP/IP+NIO

在Java中可通過DatagramChannel和ByteBuffer來實現(xiàn)UDP/IP+NIO方式的系統(tǒng)間通信, DatagramChannel負責(zé)監(jiān)聽端口及進行讀寫,ByteBuffer則用于數(shù)據(jù)流傳輸。基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:

    DatagramChannel receiveChannel=DatagramChannel.open();
      receiveChannel.configureBlocking(false);
      DatagramSocket socket=receiveChannel.socket();
      socket.bind(new InetSocketAddress(rport));
      Selector selector=Selector.open();
      receiveChannel.register(selector, SelectionKey.OP_READ);
      之后即可采取和TCP/IP+NIO中對selector遍歷一樣的方式進行流信息的讀取。
      DatagramChannel sendChannel=DatagramChannel.open();
      sendChannel.configureBlocking(false);
      SocketAddress target=new InetSocketAddress("127.0.0.1",sport);
    sendChannel.connect(target);
    // 阻塞寫入流,如發(fā)送緩沖區(qū)已滿,則返回0,此時可通過注冊SelectionKey.OP_WRITE事件,以便
    在可寫入時再進行寫操作,方式和TCP/IP+NIO基本一致
    sendChannel.write(ByteBuffer);

服務(wù)端代碼和客戶端代碼基本一致,就不再一一描述。

從以上代碼來看,對于UDP/IP方式,NIO帶來的好處是只在有流要讀取或可寫入流時才做相應(yīng)的IO操作,而不用像BIO方式直接阻塞當(dāng)前線程。

以上列舉了基于Java包實現(xiàn)一對一的系統(tǒng)間通信的方式,在實際的場景中,通常還會要將消息發(fā)送到多臺機器,此時可以選擇為每個目標(biāo)機器建立一個連接,這種方式對于發(fā)送消息端會造成很大的網(wǎng)絡(luò)流量壓力。例如傳輸?shù)南⑹且曨l數(shù)據(jù)的場景,在網(wǎng)絡(luò)協(xié)議上還有一個基于UDP/IP擴展出來的多播協(xié)議,多播協(xié)議的傳輸方式是一份數(shù)據(jù)在網(wǎng)絡(luò)上進行傳輸,而不是由發(fā)送者給每個接收者都傳一份數(shù)據(jù),這樣,網(wǎng)絡(luò)的流量就大幅度下降了。

在Java中可基于MulticastSocket和DatagramPacket來實現(xiàn)多播網(wǎng)絡(luò)通信,MulticastSocket是基于DatagramSocket派生出來的類,其作用即為基于UDP/IP實現(xiàn)多播方式的網(wǎng)絡(luò)通信。在多播通信中,不同的地方在于接收數(shù)據(jù)端通過加入到多播組來進行數(shù)據(jù)的接收,同樣發(fā)送數(shù)據(jù)也要求加入到多播組進行發(fā)送,多播的目標(biāo)地址具有指定的地址范圍,在224.0.0.0和239.255.255.255之間。基于多播方式實現(xiàn)網(wǎng)絡(luò)通信的服務(wù)器端關(guān)鍵代碼如下:

    // 組播地址
    InetAddress groupAddress=InetAddress.getByName("224.1.1.1");
    MulticastSocket server=new MulticastSocket(port);
      // 加入組播,如地址為非組播地址,則拋出IOException,當(dāng)已經(jīng)不希望再發(fā)送數(shù)據(jù)到組播地址,或
    不希望再讀取數(shù)據(jù)時,可調(diào)用server.leaveGroup(組播地址)
    server.joinGroup(groupAddress);
      MulticastSocket client=new MulticastSocket();
    client.joinGroup(groupAddress);

之后則可和UDP/IP+BIO一樣通過receive和send方法來進行讀寫操作。

Client端代碼和服務(wù)端代碼基本一致,就不再列舉了。

在Java應(yīng)用中,多播通常用于多臺機器的狀態(tài)的同步。例如JGroups,默認基于UDP/IP多播協(xié)議,由于UDP/IP協(xié)議在數(shù)據(jù)傳輸時不夠可靠,對于可靠性要求很高的系統(tǒng),會希望采用多播方式,同時又要做到可靠。對于這樣的需求,業(yè)界提出了一些能夠確保可靠實現(xiàn)多播的方式:SRM(Scalable Reliable Multicast)http://ee.lbl.gov/papers/srm_ton.pdf、URGCP(Uniform Reliable Group Communication Protocol),其中SRM是在UDP/IP多播的基礎(chǔ)上增加了確認機制,從而保證可靠,eBay采用了SRM框架來實現(xiàn)將數(shù)據(jù)從主數(shù)據(jù)庫同步到各個搜索節(jié)點機器http://highscalability.com/ebay-architecture

從上面的介紹來看,使用Java包來實現(xiàn)基于消息方式的系統(tǒng)間通信還是比較麻煩。為了讓開發(fā)人員能更加專注對數(shù)據(jù)進行業(yè)務(wù)處理,而不用過多關(guān)注純技術(shù)細節(jié),開源業(yè)界誕生了很多優(yōu)秀的基于以上各種協(xié)議的系統(tǒng)間通信的框架。這其中的佼佼者就是Mina了,1.1.2節(jié)將會介紹這個佼佼者。

1.1.2 基于開源框架實現(xiàn)消息方式的系統(tǒng)間通信

這一節(jié)講述基于Mina如何實現(xiàn)消息方式的系統(tǒng)間通信,同時分析開源通信框架的優(yōu)勢。

Mina是Apache的頂級項目http://mina.apache.org,基于Java NIO構(gòu)建,同時支持TCP/IP和UDP/IP兩種協(xié)議。Mina對外屏蔽了Java NIO使用的復(fù)雜性,并在性能上做了不少的優(yōu)化。

在使用Mina時,關(guān)鍵的類為IoConnector、IoAcceptor、IoHandler及IoSession,Mina采用Filter Chain的方式封裝消息發(fā)送和接收的流程,在這個Filter Chain過程中可進行消息的處理、消息的發(fā)送和接收等。

IoConnector負責(zé)配置客戶端的消息處理器、IO事件處理線程池、消息發(fā)送/接收的Filter Chain等。

IoAcceptor負責(zé)配置服務(wù)器端的IO事件處理線程池、消息發(fā)送/接收的Filter Chain等。

IoHandler作為Mina和應(yīng)用的接口,當(dāng)發(fā)生了連接事件、IO事件或異常事件時,Mina都會通知應(yīng)用所實現(xiàn)的IoHandler。

IoSession有點類似SocketChannel的封裝,不過Mina對連接做了進一步的抽象,因此可進行更多連接的控制及流信息的輸出。

基于Mina實現(xiàn)TCP/IP+NIO客戶端的關(guān)鍵代碼如下:

    // 創(chuàng)建一個線程池大小為CPU核數(shù)+1的SocketConnector對象
    SocketConnector ioConnector = new
    SocketConnector(Runtime.getRuntime().availableProcessors() + 1,
    Executors.newCachedThreadPool());
    // 設(shè)置TCP NoDelay為true
    ioConnector.getDefaultConfig().getSessionConfig().setTcpNoDelay(true);
    // 增加一個將發(fā)送對象進行序列化以及接收字節(jié)流進行反序列化的類至filter Chain
    ioConnector.getFilterChain().addLast("stringserialize", new
    ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
    // IoHandler的實現(xiàn),以便當(dāng)mina建立連接、接收到消息后通知應(yīng)用
    IoHandler handler=new IoHandlerAdapter(){
                public void messageReceived(IoSession session, Object message)
                        throws Exception {
                    System.out.println(message);
                }
      };
            // 異步建立連接
    ConnectFuture connectFuture = ioConnector.connect(socketAddress,handler);
            // 阻塞等待連接建立完畢,如須設(shè)置連接創(chuàng)建的超時時間,可調(diào)用
    SocketConnectorConfig.setConnectTimeout(以秒為單位的超時時間)
      connectFuture.join();
    IoSession session=connectFuture.getSession();
    // 發(fā)送對象
    session.write(Object);

使用Mina后,客戶端的代碼變得簡單多了。

服務(wù)器端關(guān)鍵代碼如下:

    // 創(chuàng)建一個線程池大小為CPU核數(shù)+1的IoAcceptor對象
    final IoAcceptor acceptor=new
    SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1,
                                  Executors.newCachedThreadPool());
    acceptor.getFilterChain().addLast("stringserialize", new
    ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
    IoHandler handler=new IoHandlerAdapter(){
                public void messageReceived(IoSession session, Object message)
                        throws Exception {
                    // 接收到客戶端發(fā)送的對象
                }
            };
    // 綁定監(jiān)聽的端口以及當(dāng)有連接建立、接收到對象等事件發(fā)送時需要通知的IoHandler對象
    acceptor.bind(new InetSocketAddress(port), handler);

采用Mina后,服務(wù)器端代碼無須再關(guān)注建立連接時的OP_ACCEPT的事件,同樣也無須去注冊O(shè)P_READ、OP_WRITE這些Key的事件,取而代之的是更友好地使用接口。通過這些封裝使用者可以非常方便地使用,而無須過多考慮Java NIO的用法。但Mina 2.0之前的版本中并未提供連接的管理(連接的創(chuàng)建、自動重連、連接的心跳、連接池等)、同步發(fā)送數(shù)據(jù)支持,因此在實際使用中通常還需在Mina的基礎(chǔ)上進行封裝。

在使用Mina 2.0之前的版本時,以下幾個方面值得注意:

● 使用自定義的ThreadModel

通過SocketConnectorConfig.setThreadMode(l ThreadModel.MANUAL)將線程模式改為自定義模式,否則Mina會自行啟動一個最大線程數(shù)為16個的線程池來處理具體的消息,這對于多數(shù)應(yīng)用而言都不適用,因此最好是自行控制具體處理消息的線程池。

● 合理配置IO處理線程池

在創(chuàng)建SocketAcceptor或SocketConnector時要提供一個線程池及最大的線程數(shù),也就是Mina用于IO事件處理的線程數(shù),通常建議將這個線程數(shù)配置為CPU核數(shù)+1。

● 監(jiān)聽是否成功寫入操作系統(tǒng)的發(fā)送緩沖區(qū)

在調(diào)用IoSession.write時,Mina并不確保其會成功寫入操作系統(tǒng)的發(fā)送緩沖區(qū)中(例如寫入時連接剛好斷開),為了確定是否成功,可采用如下方法:

    WriteFuture writeResult=session.write(data);
    writeResult.addListener(new IoFutureListener() {
      public void operationComplete(IoFuture future){
          WriteFuture wfuture=(WriteFuture)future;
          // 寫入成功
          if(wfuture.isWritten()){
              return;
          }
              // 寫入失敗,自行進行處理
      }
    });

這對于同步請求而言特別重要,通常同步請求時都會設(shè)置一個等待響應(yīng)的超時時間,如果不去監(jiān)聽是否成功寫入的話,那么同步的請求一直要等到設(shè)定的超時時間才能返回。

● 監(jiān)聽寫超時

當(dāng)接收消息方的接收緩沖區(qū)占滿時,發(fā)送方會出現(xiàn)寫超時的現(xiàn)象,這時Mina會向外拋出WriteTimeoutException,如有必要,可在IoHandler實現(xiàn)的exceptionCaught方法里進行處理。

● 借助Mina IoSession上未發(fā)送的bytes信息實現(xiàn)流控

當(dāng)IoSession上堆積了過多未發(fā)送的byte時,會造成jvm內(nèi)存消耗過多的現(xiàn)象。因此通常要控制IoSession上堆積的未發(fā)送的byte量,此值可通過Mina IoSession的getScheduledWriteBytes來獲取,從而進行流控。

● messageReceived方法占用IO處理線程

在使用Thread.MANUAL的情況下,IOHandler里的messageReceived方法會占用Mina的IO處理線程,為了避免業(yè)務(wù)處理接收消息的速度影響IO處理性能,建議在此方法中另起線程來做業(yè)務(wù)處理。

● 序列化/反序列化過程會占用IO處理線程

由于Mina的序列化/反序列化過程是在FilterChain上做的,同樣會占據(jù)IO處理線程。Mina將同一連接上需要發(fā)送和接收的消息放在隊列中串行處理。如果序列化/反序列化過程耗時較長,就會造成同一連接上其他消息的接收或發(fā)送變慢。

● 反序列化時注意繼承CumulativeProtocolDecoder

在使用NIO的情況下,每次讀取的流并不一定完整,因此要通過繼承CumulativeProtocolDecoder來確保當(dāng)流沒讀完時,下次接著讀,這同時也要求應(yīng)用在協(xié)議頭中保持此次發(fā)送流的長度信息。

● Mina 1.1.6及以前的版本中sessionClosed可能會不被調(diào)用的bug

在某些高壓力的情況下,當(dāng)連接斷開時,Mina 1.1.6及以前的版本并不會調(diào)用IoHandler中的sessionClosed方法,這對于某些要在sessionClosed做相應(yīng)處理的應(yīng)用來說會出現(xiàn)問題,這個bughttps://issues.apache.org/jira/browse/DIRMINA-549在Mina 1.1.7的版本中已修復(fù)。

除了Mina之外,JBoss Netty也是現(xiàn)在一個廣受關(guān)注的Java通信框架,其作者也是Mina的作者(Trustin Lee),據(jù)評測JBoss Netty的性能好于Minahttp://www.jboss.org/netty/performance,如讀者感興趣,可訪問http://www.jboss.org/netty來了解更多的細節(jié)。

以上兩節(jié)介紹了基于Java自身包及開源通信框架來實現(xiàn)消息方式的系統(tǒng)間通信,Java系統(tǒng)內(nèi)的通信都是以Java對象調(diào)用的方式來實現(xiàn)的,例如A a =new AImpl();a.call();,但當(dāng)系統(tǒng)變成分布式后,就無法用以上的方式直接調(diào)用了,因為在調(diào)用端并不會有AImpl這個類。這時如果通過基于以上的消息方式來做,對于開發(fā)而言就會顯得比較晦澀了,因此Java中也提供了各種各樣的支持對象方式的系統(tǒng)間通信的技術(shù),例如RMI、WebService等。同樣,在Java中也有眾多的開源框架提供了RMI、WebService的實現(xiàn)和封裝,例如Spring RMI、CXF等,下面來看看基于遠程調(diào)用方式如何實現(xiàn)系統(tǒng)間的通信。

主站蜘蛛池模板: 信阳市| 峨边| 湖口县| 宝坻区| 乡城县| 卢湾区| 永登县| 苏尼特左旗| 乌鲁木齐市| 汶上县| 承德县| 分宜县| 无为县| 苍梧县| 刚察县| 汉沽区| 深泽县| 宝山区| 芮城县| 遵义市| 榕江县| 兴业县| 长阳| 泊头市| 延长县| 郸城县| 武乡县| 石嘴山市| 西城区| 通海县| 龙岩市| 磐石市| 靖远县| 云阳县| 龙游县| 赫章县| 姜堰市| 宜春市| 石嘴山市| 长沙县| 治多县|