程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 關於JMS Message Pending的問題

關於JMS Message Pending的問題

編輯:關於JAVA

前段時間,有同事跟我說客戶那邊有很多狀態為receive的message,這些message只有在JMS Server或 weblogic Server充啟之後才能被消費。經過調查後,這個問題可能是weblogic的一個bug,當然也不排除 跟具體環境有關的可能。下面我們來看看問題的根本原因是什麼,這種分析有助我們更進一步理解 weblogic JMS的實現。

首先我們看一下什麼是receive,receive表示一個message已經被consumer消費,但服務端還沒有關於 這個message的ack,所以消息不能從queue中刪除, 由於queue中的消息是point-2-point的,所以某個消 息被標為receive後,這個消息自然不能被其他consumer消費。那麼這個ack由誰負責發送給Server呢,什 麼時候發送呢?這些都由我們創建JMS Session時使用的Ack_mode決定,典型的ack-mode有如下兩種:

auto-ack: 自動響應模式,consumer.receive()調用後,如果服務器端發現有可用的message,消息返 回到客戶端JMS實現層,在消息返回給客戶前,由weblogic client (JMSSession.getAsyncMessageForConsumer(),異步接受,比如MessageListener,或 JMSSession.receiveMessage(),同步接受)層實現直接調用acknowledge()通知服務器端,服務器端收到 ack後,它會負責負責將處於receive的message從物理queue中刪除。

client-ack: 客戶響應模式,consumer.receive()調用後,客戶端收到消息後,客戶端程序決定什麼 時候發送ack,可以在消息後立即發送,也可以在消息處理成功後發送,ack的發送通過 message.acknowledge()實現。後面的過程和auto-ack相同。

初看這個問題,感覺是ack沒有收到,那麼什麼情況下會出現ack丟失呢?網絡問題? 那麼客戶端或服 務器端的server log應該能夠看到異常,客戶堅持說沒有任何異常。有點不可思議,要了客戶的代碼,他 們沒有代碼,實際上他們的應用是基於Spring Framework的,通過簡單的配置來實現他們的業務需要,看 了下Spring的相關代碼,客戶之所以說沒有異常,因為Spring catch了服務器端返回的JMSException,並 吃掉了這個異常(即異常沒有打印出來),這個異常輸出是可以通過Spring的配置來實現。客戶配置後, 給了我具體的異常,如下:

java.lang.IllegalArgumentException: Delay is negative.
        at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:388)
         at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:340)
         at weblogic.messaging.kernel.internal.ReceiveRequestImpl.<init> (ReceiveRequestImp l.java:98)
        at weblogic.messaging.kernel.internal.QueueImpl.receive(QueueImpl.java:820)
        at weblogic.jms.backend.BEConsumerImpl.blockingReceiveStart(BEConsumerImpl.java:1 172)
         at weblogic.jms.backend.BEConsumerImpl.receive(BEConsumerImpl.java:1383)
         at weblogic.jms.backend.BEConsumerImpl.invoke(BEConsumerImpl.java:1088)
         at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsyncInternal (DispatcherI mpl.java:129)
        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsync(DispatcherImpl.java :112)
         at weblogic.messaging.dispatcher.Request.dispatchAsync(Request.java:1046)
         at weblogic.jms.dispatcher.Request.dispatchAsync(Request.java:72)
         at weblogic.jms.frontend.FEConsumer.receive(FEConsumer.java:557)
        at weblogic.jms.frontend.FEConsumer.invoke(FEConsumer.java:806)
        at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
         at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.j ava:276)
        at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServ erRef.java:141)
        at weblogic.messaging.dispatcher.DispatcherServerRef.access$000(DispatcherServerR ef.java:36)
        at weblogic.messaging.dispatcher.DispatcherServerRef$2.run (DispatcherServerRef.ja va:112)
        at weblogic.work.ExecuteThread.execute (ExecuteThread.java:209)
        at weblogic.work.ExecuteThread.run (ExecuteThread.java:181)

現在我們看一下Weblogic JMS的receive的基本流程,看看這個exception為什麼會被拋出來。

JMSConsumer.receive(long timewait),客戶端發起receive請求,其中timewait可有可無,不做指定 的話,說明沒有可用消息到達的話,我們會一直等下去。如要不作等待的話,可以使用receiveNoWait() 。receive()中會檢查timeout值,如果沒有指定timeout,那麼Long.maxValue會被設定成這個timeout, 如果timeout小於0,客戶端將會收到Invalid Timeout異常,接下來請求會被delegate到JMSSession。

|

JMSSession.receiveMessage(consumer,timeout),這裡timeout會被重新計算,然後我們會創建一個 FEConsumerReceiveRequest對象。這個對象中包含計算後的timeout,計算後的timeout應該是個非負值( 上面的異常就是這裡的計算導致的,至於為什麼客戶指定的timeout為1,計算後的timeout變成了負數, 從而導致上面的異常,從代碼層面,看不出有什麼問題)。FEConsumerReceiveRequest對象創建後,由JMS FrontEnd Dispatcher負責把請求交給後端的JMS Server,Dispatcher是Weblogic JMS中用於負責請求傳 輸的,它依賴於RJVM layer,這裡不做贅述。

|

RJVM layer, 負責RMI socket層的數據發送

|

FEConsumer.receive(invocableRequest),RJVM層處理完socket數據後,請求會被轉給JMSConsumer, JMSConsumer通過狀態機(state machine)來控制請求處理,沒有過多的邏輯,它會基於收到的receive request創建一個BEConsumerReceiveRequest對象,然後把這個請求通過JMS BackEnd Dispatcher轉發給 BEConsumerImpl。之所以存在FrontEnd /BackEnd Dispatcher,主要考慮到處理請求的server和queue所 在的不是同一server。

|

BEConsumerImpl.receive(request),request進入BEConsumerImpl後,它也通過state machine來控制 請求處理,下面兩個方法在調用過程中被順序調用,

BEConsumerImpl.blockingReceiveStart(request),這裡首先檢查timeout值,然後調用 QueueImpl.receive(...)從queue中獲取message,receive()的具體參數如下,包括timeout, expression(即檢查條件,我們定義的message selector就在其中)。

BEConsumerImpl.blockingReceiveProcessMessage(request)

BEConsumerImpl.blockingReceiveComplete(request)

|

QueueImpl.receive(expression,count,acknowledge,owner,timeout,started,userBlob),這裡除了 狀態檢查,沒有其他邏輯,它會根據傳進來的參數,初始化一個ReceiveRequestImpl對象。

|

ReceiveRequestImpl.new(),這個new代表ReceiveRequestImpl的構造函數。

|

QueueImpl.get(...),如果timeout = 0,即如果客戶調用的是receiveNoWait的話,我們直接去通過 QueueImpl.get(...),如果沒有match的message,那麼直接將新建request的result設定為no result,否 則將match的message設定為result。

QueueImpl.addReader(receiveRequestImpl),如果timeout != 0,我們會在 ReceiveRequestImpl.start()中調用QueueImpl.addReader(),addReader()中同樣會通過QueueImpl.get ()檢查是否有match的message,如果找到相應的message,我們會把message reference狀態改為receive。

TimerManagerImpl.schedule(timeout),如果QueueImpl.addReader()中的QueueImpl.get()沒有找到 相應的message,我們需要等待(依據客戶指定的timeout),這個等待通過timer去實現,如下:

timer = timerManager.schedule(this, timeout);

指定的timeout到達後,如果和沒有可用的message,no result將被返回。從上面的異常堆棧來看,問 題就出在這裡,如果timeout為負數,timerMangerImpl在啟動trigger的時候,會拋出如下的 runtimeException,

java.lang.IllegalArgumentException: Delay is negative.

也許你會疑問,這沒什麼問題吧,timerTrigger只有在沒有message的時候才會被schedule,既然沒有 message,那有談何狀態receive message?沒錯,起timerTrigger之前我們的確沒有修改message狀態, 但你注意到沒有,我們在起timerTrigger前,把receiveRequestImpl加入到QueueImpl去了,但我們在碰 到IllegalArgumentException時並沒有把這個receiveRequestImpl從QueueImpl中刪除,問題就在這裡。

 1   synchronized void addReader(Reader reader) throws KernelException {
  2
 3     List list = get(..);
 4     int newCount;
 5     if (list != null) {
 6
 7     } else {
 8       reader.incrementReserveCount(- reservedCount);
 9       newCount = reader.getCount();
10     }
11      if (newCount > 0) {
12       logger.debug("Adding consumer to reader list");
13       readerList.add(reader);
14     }
15   }

如果我們不把receiveRequestImpl從QueueImpl的readerList中刪除,那麼如果過一會有message sender發送一條和我們上述請求match的message到這個queue。weblogic收到這個message後,它會檢查 readerList,如果這個message match某個reader,我們會把message狀態改成receive,當由於 IllegalArgumentException,客戶端收到它的時候,客戶端會close JMSSession,也就是說這個消息雖然 有reader,但無法deliver到客戶端。

我們再來看看Weblogic JMS sender的相關流程,

QueueImpl.messageSendComplete(),消息發送過程結束後(比如涉及store的話,消息此時已經被存儲 ),到這一步的話,我們會調整系統接受的消息數,然後通過makeMessageAvailable()把消息標成 visiable或deliver給正在等待的reader。

QueueImpl.makeMessageAvailable(),它會直接調用match()去檢查readerList中是否存在正在等待它 的reader。

QueueImpl.match(),它通過finderReader()從readerList中檢查reader,如果有符合條件的reader, 它會把這個message標志為receive,同時把這個message挪到pending list中去。

前面我們說了,雖然reader還在,但與之對應的JMSConsumer已經被close,所以這個消息根本就無法 deliver出去,自然就不會有ack從客戶端返回了,這個消息也只能一直pending了。

這個問題可能是個bug,目前還在確認之中,但我同時也在和客戶溝通,可能跟他的環境有一定關系( 比如NTP時間同步問題)。

雖然這個問題能引發message pending,但並不是所有的message pending問題都是由它應起的。網絡 問題也能引發類似問題,具體問題具體分析,主要的參考客戶端的JMSException。位於receive後的狀態 是transation,也就是如果發現狀態為transaction的message的話,一般而言,是這個消息要麼就是發送 還沒結束,要麼就是消息正處於一個delete的事務單元中,這裡就不再一一羅列了。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved