- 分布式Java應(yīng)用
- 林昊著
- 6382字
- 2018-12-27 17:09:11
1.1 基于消息方式實現(xiàn)系統(tǒng)間的通信
1.1.1 基于Java自身技術(shù)實現(xiàn)消息方式的系統(tǒng)間通信
基于Java自身包實現(xiàn)消息方式的系統(tǒng)間通信的方式有:TCP/IP+BIO、TCP/IP+NIO、UDP/IP+BIO以及UDP/IP+NIO 4種,下面分別介紹如何實現(xiàn)這4種方式的系統(tǒng)間通信。
TCP/IP+BIO
在Java中可基于Socket、ServerSocket來實現(xiàn)TCP/IP+BIO的系統(tǒng)間通信。Socket主要用于實現(xiàn)建立連接及網(wǎng)絡(luò)IO的操作,ServerSocket主要用于實現(xiàn)服務(wù)器端端口的監(jiān)聽及Socket對象的獲取。基于Socket實現(xiàn)客戶端的關(guān)鍵代碼如下:
// 創(chuàng)建連接,如果域名解析不了會拋出UnknownHostException,當(dāng)連接不上時會拋出IOException, 如果希望控制建立連接的超時,可先調(diào)用new Socket(),然后調(diào)用socket.connect(SocketAddress 類型的目標(biāo)地址,以毫秒為單位的超時時間) Socket socket=new Socket(目標(biāo)IP或域名,目標(biāo)端口); // 創(chuàng)建讀取服務(wù)器端返回流的BufferedReader BufferedReader in=new BufferedReader(new InputStreamReader(socket.getInputStream())); // 創(chuàng)建向服務(wù)器寫入流的PrintWriter PrintWriter out=new PrintWriter(socket.getOutputStream(),true); // 向服務(wù)器發(fā)送字符串信息,要注意的是,此處即使寫失敗也不會拋出異常信息,并且一直會阻塞到寫入 操作系統(tǒng)或網(wǎng)絡(luò)IO出現(xiàn)異常為止 out.println(“hello”); // 阻塞讀取服務(wù)端的返回信息,以下代碼會阻塞到服務(wù)端返回信息或網(wǎng)絡(luò)IO出現(xiàn)異常為止,如果希望在超 過一段時間后就不阻塞了,那么要在創(chuàng)建Socket對象后調(diào)用socket.setSoTimeout(以毫秒為單位的超 時時間) in.readLine();
服務(wù)器端關(guān)鍵代碼如下:
// 創(chuàng)建對本地指定端口的監(jiān)聽,如端口沖突則拋出SocketException,其他網(wǎng)絡(luò)IO方面的異常則拋出 IOException ServerSocket ss=new ServerSocket(監(jiān)聽的端口) // 接受客戶端建立連接的請求,并返回Socket對象,以便和客戶端進行交互,交互的方式和客戶端相同, 也是通過Socket.getInputStream和Socket.getOutputStream來進行讀寫操作,此方法會一直阻 塞到有客戶端發(fā)送建立連接的請求,如果希望此方法最多阻塞一定的時間,則要在創(chuàng)建ServerSocket后 調(diào)用其setSoTimeout(以毫秒為單位的超時時間) Socket socket=ss.accept();
上面是基于Socket、ServerSocket實現(xiàn)的一個簡單的系統(tǒng)間通信的例子。而在實際的系統(tǒng)中,通常要面對的是客戶端同時要發(fā)送多個請求到服務(wù)器端,服務(wù)器端則同時要接受多個連接發(fā)送的請求,上面的代碼顯然是無法滿足的。
為了滿足客戶端能同時發(fā)送多個請求到服務(wù)器端,最簡單的方法就是生成多個Socket。但這里會產(chǎn)生兩個問題:一是生成太多的Socket會消耗過多的本地資源,在客戶端機器多,服務(wù)器端機器少的情況下,客戶端生成太多Socket會導(dǎo)致服務(wù)器端須要支撐非常高的連接數(shù);二是生成Socket(建立連接)通常是比較慢的,因此頻繁地創(chuàng)建會導(dǎo)致系統(tǒng)性能不足。鑒于這兩個問題,通常采用連接池的方式來維護Socket是比較好的,一方面限制了能創(chuàng)建的Socket的個數(shù);另一方面由于將Socket放入了池中,避免了重復(fù)創(chuàng)建Socket帶來的性能下降問題。數(shù)據(jù)庫連接池就是這種方式的典型代表,但連接池的方式會帶來另一個問題,連接池中的Socket個數(shù)是有限的,但同時要用Socket的請求可能會很多,在這種情況下就會造成激烈的競爭和等待;還有一個需要注意的問題是合理控制等待響應(yīng)的超時時間,如不設(shè)定超時會導(dǎo)致當(dāng)服務(wù)器端處理變慢時,客戶端相關(guān)的請求都在做無限的等待,而客戶端的資源必然是有限的。因此這種情況下很容易造成當(dāng)服務(wù)器端出現(xiàn)問題時,客戶端掛掉的現(xiàn)象。超時時間具體設(shè)置為多少取決于客戶端能承受的請求量及服務(wù)器端的處理時間。既要保證性能,又要保證出錯率不會過高,對于直接基于TCP/IP+BIO的方式,可采用Socket.setSoTimeout來設(shè)置等待響應(yīng)的超時時間。
為了滿足服務(wù)器端能同時接受多個連接發(fā)送的請求,通常采用的方法是在accept獲取Socket后,將此Socket放入一個線程中處理,通常將此方式稱為一連接一線程。這樣服務(wù)器端就可接受多個連接發(fā)送請求了,這種方式的缺點是無論連接上是否有真實的請求,都要耗費一個線程。為避免創(chuàng)建過多的線程導(dǎo)致服務(wù)器端資源耗盡,須限制創(chuàng)建的線程數(shù)量,這就造成了在采用BIO的情況下服務(wù)器端所能支撐的連接數(shù)是有限的。
TCP/IP+NIO
在Java中可基于java.nio.channels中的Channel和Selector的相關(guān)類來實現(xiàn)TCP/IP+NIO方式的系統(tǒng)間通信。Channel有SocketChannel和ServerSocketChannel兩種,SocketChannel用于建立連接、監(jiān)聽事件及操作讀寫,ServerSocketChannel用于監(jiān)聽端口及監(jiān)聽連接事件;程序通過Selector來獲取是否有要處理的事件。基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:
SocketChannel channel=SocketChannel.open(); // 設(shè)置為非阻塞模式 channel.configureBlocking(false); //對于非阻塞模式,立刻返回false,表示連接正在建立中 channel.connect(SocketAddress); Selector selector=Selector.open(); // 向channel注冊selector以及感興趣的連接事件 channel.register(selector,SelectionKey.OP_CONNECT); // 阻塞至有感興趣的IO事件發(fā)生,或到達超時時間,如果希望一直等至有感興趣的IO事件發(fā)生,可調(diào)用 無參數(shù)的select方法,如果希望不阻塞直接返回目前是否有感興趣的事件發(fā)生,可調(diào)用selectNow方法 int nKeys=selector.select(以毫秒為單位的超時時間) // 如nKeys大于零,說明有感興趣的IO事件發(fā)生 SelectionKey sKey=null; if(nKeys>0){ Set<SelectionKey> keys=selector.selectedKeys(); for(SelectionKey key:keys){ // 對于發(fā)生連接的事件 if(key.isConnectable()){ SocketChannel sc=(SocketChannel) key.channel(); sc.configureBlocking(false); // 注冊感興趣的IO讀事件,通常不直接注冊寫事件,在發(fā)送緩沖區(qū)未滿的情況下,一 直是可寫的,因此如注冊了寫事件,而又不用寫數(shù)據(jù),很容易造成CPU消耗100%的現(xiàn)象 sKey = sc.register(selector, SelectionKey.OP_READ); // 完成連接的建立 sc.finishConnect(); } // 有流可讀取 else if(key.isReadable()){ ByteBuffer buffer=ByteBuffer.allocate(1024); SocketChannel sc=(SocketChannel) key.channel(); int readBytes=0; try{ int ret=0; try{ // 讀取目前可讀的流,sc.read返回的為成功復(fù)制到bytebuffer中的字 節(jié)數(shù),此步為阻塞操作,值可能為0;當(dāng)已經(jīng)是流的結(jié)尾時,返回-1 while((ret=sc.read(buffer))>0){ readBytes+=ret; } } finally{ buffer.flip(); } } finally{ if(buffer!=null){ buffer.clear(); } } } // 可寫入流 else if(key.isWritable()){ // 取消對OP_WRITE事件的注冊 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); SocketChannel sc=(SocketChannel) key.channel(); // 此步為阻塞操作,直到寫入操作系統(tǒng)發(fā)送緩沖區(qū)或網(wǎng)絡(luò)IO出現(xiàn)異常,返回的為成功 寫入的字節(jié)數(shù),當(dāng)操作系統(tǒng)的發(fā)送緩沖區(qū)已滿,此處會返回0 int writtenedSize=sc.write(ByteBuffer); // 如未寫入,則繼續(xù)注冊感興趣的OP_WRITE事件 if(writtenedSize==0){ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } } } selector.selectedKeys().clear(); } // 對于要寫入的流,可直接調(diào)用channel.write來完成,只有在寫入未成功時才要注冊O(shè)P_WRITE事件 int wSize=channel.write(ByteBuffer); if(wSize==0){ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); }
從上可見,NIO是典型的Reactor模式的實現(xiàn),通過注冊感興趣的事件及掃描是否有感興趣的事件發(fā)生,從而做相應(yīng)的動作。
服務(wù)器端關(guān)鍵代碼如下:
ServerSocketChannel ssc=ServerSocketChannel.open(); ServerSocket serverSocket=ssc.socket(); // 綁定要監(jiān)聽的端口 serverSocket.bind(new InetSocketAddress(port)); ssc.configureBlocking(false); // 注冊感興趣的連接建立事件 ssc.register(selector, SelectionKey.OP_ACCEPT);
之后則可采取和客戶端同樣的方式對selector.select進行輪詢,只是要增加一個對key.isAcceptable的處理,代碼如下:
if(key.isAcceptable()){ ServerSocketChannel server=(ServerSocketChannel)key.channel(); SocketChannel sc=server.accept(); if(sc==null){ continue; } sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); }
上面只是基于TCP/IP+NIO實現(xiàn)的一個簡單例子,同樣來看看基于TCP/IP+NIO如何支撐客戶端同時發(fā)送多個請求及服務(wù)器端接受多個連接發(fā)送的請求。
對于客戶端發(fā)送多個請求的需求,采用TCP/IP+NIO和采用TCP/IP+BIO的方式?jīng)]有任何不同。但NIO方式可做到不阻塞,因此如果服務(wù)器端返回的響應(yīng)能帶上請求標(biāo)識,那么客戶端則可采用連接復(fù)用的方式,即每個SocketChannel在發(fā)送消息后,不用等響應(yīng)即可繼續(xù)發(fā)送其他消息,這種方式可降低連接池帶來的資源爭搶的問題,從而提升系統(tǒng)性能;對于連接不復(fù)用的情況,可基于Socket.setSoTimeout的方式來控制同步請求的超時;對于連接復(fù)用的情況,同步請求的超時可基于BlockingQueue、對象的wait/notify機制或Future機制來實現(xiàn)。
對于服務(wù)器端接受多個連接請求的需求,通常采用的是由一個線程來監(jiān)聽連接的事件,另一個或多個線程來監(jiān)聽網(wǎng)絡(luò)流讀寫的事件。當(dāng)有實際的網(wǎng)絡(luò)流讀寫事件發(fā)生后,再放入線程池中處理。這種方式比TCP/IP+BIO的好處在于可接受很多的連接,而這些連接只在有真實的請求時才會創(chuàng)建線程來處理,這種方式通常又稱為一請求一線程。當(dāng)連接數(shù)不多,或連接數(shù)較多,且連接上的請求發(fā)送非常頻繁時,TCP/IP+NIO的方式不會帶來太大的優(yōu)勢,但在實際的場景中,通常是服務(wù)器端要支持大量的連接數(shù),但這些連接同時發(fā)送的請求并不會非常多。
在基于Sun JDK開發(fā)Java NIO程序時,尤其要注意selector.select拋出IOException異常的處理及selector.select不阻塞就直接返回的情況
。這兩種狀況都有可能造成CPU消耗達到100%,對于selector.select拋出IOException的狀況,可以采用繞過的方法為捕捉異常并將當(dāng)前Thread sleep一段時間,或是重新創(chuàng)建Selector。為避免selector.select不阻塞就直接返回,可采用bug庫中提到的修改建議。
從上面可以看出,對于高訪問量的系統(tǒng)而言,TCP/IP+NIO方式結(jié)合一定的改造在客戶端能夠帶來更高的性能,在服務(wù)器端能支撐更高的連接數(shù)。
UDP/IP+BIO
Java對UDP/IP方式的網(wǎng)絡(luò)數(shù)據(jù)傳輸同樣采用Socket機制,只是UDP/IP下的Socket沒有建立連接的要求,由于UDP/IP是無連接的,因此無法進行雙向的通信。這也就要求如果要雙向通信的話,必須兩端都成為UDP Server。
在Java中可基于DatagramSocket和DatagramPacket來實現(xiàn)UDP/IP+BIO方式的系統(tǒng)間通信, DatagramSocket負責(zé)監(jiān)聽端口及讀寫數(shù)據(jù)。DatagramPacket作為數(shù)據(jù)流對象進行傳輸,基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:
// 由于UDP/IP是無連接的,如果希望雙向通信,就必須啟動一個監(jiān)聽端口,承擔(dān)服務(wù)器的職責(zé),如不能 綁定到指定端口,則拋出SocketException DatagramSocket serverSocket=new DatagramSocket(監(jiān)聽的端口); byte[] buffer=new byte[65507]; DatagramPacket receivePacket=new DatagramPacket(buffer,buffer.length); DatagramSocket socket=new DatagramSocket(); DatagramPacket packet=new DatagramPacket(datas,datas.length,server,port); // 阻塞發(fā)送packet到指定的服務(wù)器和端口,當(dāng)出現(xiàn)網(wǎng)絡(luò)IO異常時拋出IOException,當(dāng)連不上目標(biāo)地 址和端口時,拋出PortUnreachableException socket.send(packet) // 阻塞并同步讀取流信息,如接收到的流信息比packet長度長,則刪除更長的信息,可通過調(diào)用 DatagramSocket.setSoTimeout(以毫秒為單位的超時時間)來設(shè)置讀取流的超時時間 serverSocket.receive(receivePacket)
服務(wù)器端代碼和客戶端代碼的結(jié)構(gòu)基本一致,這里就不列了。
由于UDP/IP通信的兩端不建立連接,就不會有TCP/IP通信連接競爭的問題,只是最終讀寫流的動作是同步的。
對于服務(wù)器端同時接收多請求的需求,通常采取每接收到一個packet就放入一個線程中進行處理的方式來實現(xiàn)。
UDP/IP+NIO
在Java中可通過DatagramChannel和ByteBuffer來實現(xiàn)UDP/IP+NIO方式的系統(tǒng)間通信, DatagramChannel負責(zé)監(jiān)聽端口及進行讀寫,ByteBuffer則用于數(shù)據(jù)流傳輸。基于這兩個類實現(xiàn)客戶端的關(guān)鍵代碼如下:
DatagramChannel receiveChannel=DatagramChannel.open(); receiveChannel.configureBlocking(false); DatagramSocket socket=receiveChannel.socket(); socket.bind(new InetSocketAddress(rport)); Selector selector=Selector.open(); receiveChannel.register(selector, SelectionKey.OP_READ); 之后即可采取和TCP/IP+NIO中對selector遍歷一樣的方式進行流信息的讀取。 DatagramChannel sendChannel=DatagramChannel.open(); sendChannel.configureBlocking(false); SocketAddress target=new InetSocketAddress("127.0.0.1",sport); sendChannel.connect(target); // 阻塞寫入流,如發(fā)送緩沖區(qū)已滿,則返回0,此時可通過注冊SelectionKey.OP_WRITE事件,以便 在可寫入時再進行寫操作,方式和TCP/IP+NIO基本一致 sendChannel.write(ByteBuffer);
服務(wù)端代碼和客戶端代碼基本一致,就不再一一描述。
從以上代碼來看,對于UDP/IP方式,NIO帶來的好處是只在有流要讀取或可寫入流時才做相應(yīng)的IO操作,而不用像BIO方式直接阻塞當(dāng)前線程。
以上列舉了基于Java包實現(xiàn)一對一的系統(tǒng)間通信的方式,在實際的場景中,通常還會要將消息發(fā)送到多臺機器,此時可以選擇為每個目標(biāo)機器建立一個連接,這種方式對于發(fā)送消息端會造成很大的網(wǎng)絡(luò)流量壓力。例如傳輸?shù)南⑹且曨l數(shù)據(jù)的場景,在網(wǎng)絡(luò)協(xié)議上還有一個基于UDP/IP擴展出來的多播協(xié)議,多播協(xié)議的傳輸方式是一份數(shù)據(jù)在網(wǎng)絡(luò)上進行傳輸,而不是由發(fā)送者給每個接收者都傳一份數(shù)據(jù),這樣,網(wǎng)絡(luò)的流量就大幅度下降了。
在Java中可基于MulticastSocket和DatagramPacket來實現(xiàn)多播網(wǎng)絡(luò)通信,MulticastSocket是基于DatagramSocket派生出來的類,其作用即為基于UDP/IP實現(xiàn)多播方式的網(wǎng)絡(luò)通信。在多播通信中,不同的地方在于接收數(shù)據(jù)端通過加入到多播組來進行數(shù)據(jù)的接收,同樣發(fā)送數(shù)據(jù)也要求加入到多播組進行發(fā)送,多播的目標(biāo)地址具有指定的地址范圍,在224.0.0.0和239.255.255.255之間。基于多播方式實現(xiàn)網(wǎng)絡(luò)通信的服務(wù)器端關(guān)鍵代碼如下:
// 組播地址 InetAddress groupAddress=InetAddress.getByName("224.1.1.1"); MulticastSocket server=new MulticastSocket(port); // 加入組播,如地址為非組播地址,則拋出IOException,當(dāng)已經(jīng)不希望再發(fā)送數(shù)據(jù)到組播地址,或 不希望再讀取數(shù)據(jù)時,可調(diào)用server.leaveGroup(組播地址) server.joinGroup(groupAddress); MulticastSocket client=new MulticastSocket(); client.joinGroup(groupAddress);
之后則可和UDP/IP+BIO一樣通過receive和send方法來進行讀寫操作。
Client端代碼和服務(wù)端代碼基本一致,就不再列舉了。
在Java應(yīng)用中,多播通常用于多臺機器的狀態(tài)的同步。例如JGroups,默認基于UDP/IP多播協(xié)議,由于UDP/IP協(xié)議在數(shù)據(jù)傳輸時不夠可靠,對于可靠性要求很高的系統(tǒng),會希望采用多播方式,同時又要做到可靠。對于這樣的需求,業(yè)界提出了一些能夠確保可靠實現(xiàn)多播的方式:SRM(Scalable Reliable Multicast)、URGCP(Uniform Reliable Group Communication Protocol),其中SRM是在UDP/IP多播的基礎(chǔ)上增加了確認機制,從而保證可靠,eBay采用了SRM框架來實現(xiàn)將數(shù)據(jù)從主數(shù)據(jù)庫同步到各個搜索節(jié)點機器
。
從上面的介紹來看,使用Java包來實現(xiàn)基于消息方式的系統(tǒng)間通信還是比較麻煩。為了讓開發(fā)人員能更加專注對數(shù)據(jù)進行業(yè)務(wù)處理,而不用過多關(guān)注純技術(shù)細節(jié),開源業(yè)界誕生了很多優(yōu)秀的基于以上各種協(xié)議的系統(tǒng)間通信的框架。這其中的佼佼者就是Mina了,1.1.2節(jié)將會介紹這個佼佼者。
1.1.2 基于開源框架實現(xiàn)消息方式的系統(tǒng)間通信
這一節(jié)講述基于Mina如何實現(xiàn)消息方式的系統(tǒng)間通信,同時分析開源通信框架的優(yōu)勢。
Mina是Apache的頂級項目,基于Java NIO構(gòu)建,同時支持TCP/IP和UDP/IP兩種協(xié)議。Mina對外屏蔽了Java NIO使用的復(fù)雜性,并在性能上做了不少的優(yōu)化。
在使用Mina時,關(guān)鍵的類為IoConnector、IoAcceptor、IoHandler及IoSession,Mina采用Filter Chain的方式封裝消息發(fā)送和接收的流程,在這個Filter Chain過程中可進行消息的處理、消息的發(fā)送和接收等。
IoConnector負責(zé)配置客戶端的消息處理器、IO事件處理線程池、消息發(fā)送/接收的Filter Chain等。
IoAcceptor負責(zé)配置服務(wù)器端的IO事件處理線程池、消息發(fā)送/接收的Filter Chain等。
IoHandler作為Mina和應(yīng)用的接口,當(dāng)發(fā)生了連接事件、IO事件或異常事件時,Mina都會通知應(yīng)用所實現(xiàn)的IoHandler。
IoSession有點類似SocketChannel的封裝,不過Mina對連接做了進一步的抽象,因此可進行更多連接的控制及流信息的輸出。
基于Mina實現(xiàn)TCP/IP+NIO客戶端的關(guān)鍵代碼如下:
// 創(chuàng)建一個線程池大小為CPU核數(shù)+1的SocketConnector對象 SocketConnector ioConnector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); // 設(shè)置TCP NoDelay為true ioConnector.getDefaultConfig().getSessionConfig().setTcpNoDelay(true); // 增加一個將發(fā)送對象進行序列化以及接收字節(jié)流進行反序列化的類至filter Chain ioConnector.getFilterChain().addLast("stringserialize", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // IoHandler的實現(xiàn),以便當(dāng)mina建立連接、接收到消息后通知應(yīng)用 IoHandler handler=new IoHandlerAdapter(){ public void messageReceived(IoSession session, Object message) throws Exception { System.out.println(message); } }; // 異步建立連接 ConnectFuture connectFuture = ioConnector.connect(socketAddress,handler); // 阻塞等待連接建立完畢,如須設(shè)置連接創(chuàng)建的超時時間,可調(diào)用 SocketConnectorConfig.setConnectTimeout(以秒為單位的超時時間) connectFuture.join(); IoSession session=connectFuture.getSession(); // 發(fā)送對象 session.write(Object);
使用Mina后,客戶端的代碼變得簡單多了。
服務(wù)器端關(guān)鍵代碼如下:
// 創(chuàng)建一個線程池大小為CPU核數(shù)+1的IoAcceptor對象 final IoAcceptor acceptor=new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); acceptor.getFilterChain().addLast("stringserialize", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); IoHandler handler=new IoHandlerAdapter(){ public void messageReceived(IoSession session, Object message) throws Exception { // 接收到客戶端發(fā)送的對象 } }; // 綁定監(jiān)聽的端口以及當(dāng)有連接建立、接收到對象等事件發(fā)送時需要通知的IoHandler對象 acceptor.bind(new InetSocketAddress(port), handler);
采用Mina后,服務(wù)器端代碼無須再關(guān)注建立連接時的OP_ACCEPT的事件,同樣也無須去注冊O(shè)P_READ、OP_WRITE這些Key的事件,取而代之的是更友好地使用接口。通過這些封裝使用者可以非常方便地使用,而無須過多考慮Java NIO的用法。但Mina 2.0之前的版本中并未提供連接的管理(連接的創(chuàng)建、自動重連、連接的心跳、連接池等)、同步發(fā)送數(shù)據(jù)支持,因此在實際使用中通常還需在Mina的基礎(chǔ)上進行封裝。
在使用Mina 2.0之前的版本時,以下幾個方面值得注意:
● 使用自定義的ThreadModel
通過SocketConnectorConfig.setThreadMode(l ThreadModel.MANUAL)將線程模式改為自定義模式,否則Mina會自行啟動一個最大線程數(shù)為16個的線程池來處理具體的消息,這對于多數(shù)應(yīng)用而言都不適用,因此最好是自行控制具體處理消息的線程池。
● 合理配置IO處理線程池
在創(chuàng)建SocketAcceptor或SocketConnector時要提供一個線程池及最大的線程數(shù),也就是Mina用于IO事件處理的線程數(shù),通常建議將這個線程數(shù)配置為CPU核數(shù)+1。
● 監(jiān)聽是否成功寫入操作系統(tǒng)的發(fā)送緩沖區(qū)
在調(diào)用IoSession.write時,Mina并不確保其會成功寫入操作系統(tǒng)的發(fā)送緩沖區(qū)中(例如寫入時連接剛好斷開),為了確定是否成功,可采用如下方法:
WriteFuture writeResult=session.write(data); writeResult.addListener(new IoFutureListener() { public void operationComplete(IoFuture future){ WriteFuture wfuture=(WriteFuture)future; // 寫入成功 if(wfuture.isWritten()){ return; } // 寫入失敗,自行進行處理 } });
這對于同步請求而言特別重要,通常同步請求時都會設(shè)置一個等待響應(yīng)的超時時間,如果不去監(jiān)聽是否成功寫入的話,那么同步的請求一直要等到設(shè)定的超時時間才能返回。
● 監(jiān)聽寫超時
當(dāng)接收消息方的接收緩沖區(qū)占滿時,發(fā)送方會出現(xiàn)寫超時的現(xiàn)象,這時Mina會向外拋出WriteTimeoutException,如有必要,可在IoHandler實現(xiàn)的exceptionCaught方法里進行處理。
● 借助Mina IoSession上未發(fā)送的bytes信息實現(xiàn)流控
當(dāng)IoSession上堆積了過多未發(fā)送的byte時,會造成jvm內(nèi)存消耗過多的現(xiàn)象。因此通常要控制IoSession上堆積的未發(fā)送的byte量,此值可通過Mina IoSession的getScheduledWriteBytes來獲取,從而進行流控。
● messageReceived方法占用IO處理線程
在使用Thread.MANUAL的情況下,IOHandler里的messageReceived方法會占用Mina的IO處理線程,為了避免業(yè)務(wù)處理接收消息的速度影響IO處理性能,建議在此方法中另起線程來做業(yè)務(wù)處理。
● 序列化/反序列化過程會占用IO處理線程
由于Mina的序列化/反序列化過程是在FilterChain上做的,同樣會占據(jù)IO處理線程。Mina將同一連接上需要發(fā)送和接收的消息放在隊列中串行處理。如果序列化/反序列化過程耗時較長,就會造成同一連接上其他消息的接收或發(fā)送變慢。
● 反序列化時注意繼承CumulativeProtocolDecoder
在使用NIO的情況下,每次讀取的流并不一定完整,因此要通過繼承CumulativeProtocolDecoder來確保當(dāng)流沒讀完時,下次接著讀,這同時也要求應(yīng)用在協(xié)議頭中保持此次發(fā)送流的長度信息。
● Mina 1.1.6及以前的版本中sessionClosed可能會不被調(diào)用的bug
在某些高壓力的情況下,當(dāng)連接斷開時,Mina 1.1.6及以前的版本并不會調(diào)用IoHandler中的sessionClosed方法,這對于某些要在sessionClosed做相應(yīng)處理的應(yīng)用來說會出現(xiàn)問題,這個bug在Mina 1.1.7的版本中已修復(fù)。
除了Mina之外,JBoss Netty也是現(xiàn)在一個廣受關(guān)注的Java通信框架,其作者也是Mina的作者(Trustin Lee),據(jù)評測JBoss Netty的性能好于Mina,如讀者感興趣,可訪問http://www.jboss.org/netty來了解更多的細節(jié)。
以上兩節(jié)介紹了基于Java自身包及開源通信框架來實現(xiàn)消息方式的系統(tǒng)間通信,Java系統(tǒng)內(nèi)的通信都是以Java對象調(diào)用的方式來實現(xiàn)的,例如A a =new AImpl();a.call();,但當(dāng)系統(tǒng)變成分布式后,就無法用以上的方式直接調(diào)用了,因為在調(diào)用端并不會有AImpl這個類。這時如果通過基于以上的消息方式來做,對于開發(fā)而言就會顯得比較晦澀了,因此Java中也提供了各種各樣的支持對象方式的系統(tǒng)間通信的技術(shù),例如RMI、WebService等。同樣,在Java中也有眾多的開源框架提供了RMI、WebService的實現(xiàn)和封裝,例如Spring RMI、CXF等,下面來看看基于遠程調(diào)用方式如何實現(xiàn)系統(tǒng)間的通信。
- Cloud Analytics with Microsoft Azure
- Apache Hive Essentials
- 自主研拋機器人技術(shù)
- PHP開發(fā)手冊
- Google SketchUp for Game Design:Beginner's Guide
- Windows Server 2008 R2活動目錄內(nèi)幕
- 從零開始學(xué)SQL Server
- Windows安全指南
- 運動控制系統(tǒng)(第2版)
- EJB JPA數(shù)據(jù)庫持久層開發(fā)實踐詳解
- 系統(tǒng)建模與控制導(dǎo)論
- Microsoft 365 Mobility and Security:Exam Guide MS-101
- Kubernetes Design Patterns and Extensions
- ORACLE數(shù)據(jù)庫技術(shù)實用詳解
- 數(shù)據(jù)存儲備份與災(zāi)難恢復(fù)