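Summary: During development we often need to manually change the offset at which a consumer instance consumes a Kafka topic. How do we change it, and why does that work? It is easy to understand if we look at it from a different angle. Suppose I were implementing a Kafka consumer myself. How would my consumer code control its consumption of a topic? How would I let different consumer groups each consume the same message of the same topic, while different consumers within one group consume different messages of that topic? If you had to implement such a framework, how would you do it?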
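Here I demonstrate consumption with Storm's KafkaSpout. KafkaSpout uses Kafka's low-level API, so the structure of the data it stores in ZooKeeper differs from the structure used by the high-level API of the Kafka Java client. For how the high-level API lays out its data in ZooKeeper, see this article: Apache Kafka series: storage structure in ZooKeeper. The original article, where you can discuss it with the author: http://www.cnblogs.com/intsmaze/p/6212913.html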
Available for website development and Java development work.
Sina Weibo: intsmaze劉洋洋哥
WeChat: intsmaze
Create a Kafka topic named intsmazeX with 3 partitions.
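Use KafkaSpout to create a consumer instance for this topic, storing its metadata under the ZooKeeper path /kafka-offset with the instance id onetest. After starting Storm, we see the following output: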
INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_0 --> null //the spout reads this ZooKeeper path to see whether consumption info for the partition is stored
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset //no partition info, so the spout asks the Kafka broker for the partition's latest offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop002.icccuat.com:0 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_1 --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop003.icccuat.com:1 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_2 --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop001.icccuat.com:2 from offset 0
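The log shows where a KafkaSpout instance keeps its state: one ZooKeeper node per partition, at <zkRoot>/<id>/partition_<n>. The Python sketch below models only that naming scheme. The class SpoutSettings and its fields are hypothetical. In the Java topology, the two values correspond to the zkRoot and id arguments of storm-kafka's SpoutConfig constructor.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SpoutSettings:
    """Hypothetical mirror of the zkRoot / id / topic settings of a KafkaSpout."""

    zk_root: str   # ZooKeeper root for offsets, e.g. "/kafka-offset"
    spout_id: str  # instance id, e.g. "onetest"; plays the role of a group
    topic: str     # Kafka topic, e.g. "intsmazeX"

    def instance_path(self) -> str:
        # Directory that holds every partition node of this instance.
        return f"{self.zk_root.rstrip('/')}/{self.spout_id}"

    def partition_path(self, partition: int) -> str:
        # One node per partition, e.g. /kafka-offset/onetest/partition_0
        return f"{self.instance_path()}/partition_{partition}"


settings = SpoutSettings("/kafka-offset", "onetest", "intsmazeX")
for p in range(3):
    print(settings.partition_path(p))
```

Two spouts configured with the same zkRoot and id resolve to the same nodes, and the later experiments with the id twotest depend on exactly that.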
At this point no directory named onetest has been created under /kafka-offset in ZooKeeper, because no data has been produced to intsmazeX yet.
Use a Kafka producer to send 3 messages, then check the information under the corresponding ZooKeeper directory:
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":2,"broker":{"host":"hadoop001.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
After 30 seconds (KafkaSpout is configured here to commit consumed offsets to ZooKeeper every 30 seconds), the instance has recorded an offset of 1 for each partition.
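The offsets appear in ZooKeeper only after the commit interval because the spout writes its position periodically, not after every message. Below is a toy Python model of this, assuming one partition and a fixed interval. OffsetCommitter and its methods are hypothetical names. In storm-kafka the interval is, as far as I know, the stateUpdateIntervalMs field of SpoutConfig.

```python
class OffsetCommitter:
    """Toy model of a spout that commits its in-memory offset periodically."""

    def __init__(self, interval_s: float = 30.0) -> None:
        self.interval_s = interval_s
        self.in_memory = 0          # offset the running spout has reached
        self.committed = 0          # offset last written to ZooKeeper
        self._last_commit_at = 0.0  # time of the last write, in seconds

    def consume(self, n_messages: int) -> None:
        self.in_memory += n_messages

    def tick(self, now_s: float) -> None:
        # Write to ZooKeeper only once the interval has elapsed.
        if now_s - self._last_commit_at >= self.interval_s:
            self.committed = self.in_memory
            self._last_commit_at = now_s


c = OffsetCommitter()
c.consume(1)
c.tick(10.0)
print(c.committed)  # 0: the interval has not elapsed yet
c.tick(30.0)
print(c.committed)  # 1
```

If the process stops between two commits, ZooKeeper keeps the older value, and a restarted spout resumes from there.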
Kill the topology, then produce 6 more messages to intsmazeX. Each partition of the topic now has a latest offset of 3 on the broker.
Then change the offset in each partition node under /kafka-offset/onetest/ to 3.
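Changing an offset means rewriting the JSON stored in each partition node while keeping its other fields. The Python sketch below does that rewrite using only the standard json module. with_offset is a hypothetical helper, and the sample node is the partition 0 record shown above.

```python
import json


def with_offset(node_data: bytes, new_offset: int) -> bytes:
    """Return a copy of a KafkaSpout partition node with its offset replaced.

    Only "offset" changes; topology, broker, partition and topic are kept.
    """
    if not isinstance(new_offset, int) or new_offset < 0:
        raise ValueError("offset must be a non-negative integer")
    state = json.loads(node_data.decode("utf-8"))
    state["offset"] = new_offset
    return json.dumps(state, separators=(",", ":")).encode("utf-8")


original = (b'{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196",'
            b'"name":"intsmaze-20161222-143121"},"offset":1,"partition":0,'
            b'"broker":{"host":"hadoop002.icccuat.com","port":6667},'
            b'"topic":"intsmazeX"}')
updated = with_offset(original, 3)
print(json.loads(updated)["offset"])  # 3
```

Against a live ZooKeeper, one option is the kazoo client. Read each partition_<n> node with KazooClient.get(path), which returns the data and a stat object, and write the result back with KazooClient.set(path, new_data). The get and set commands of zkCli.sh work as well. Either way, do it only while no topology with this id is running.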
When we redeploy the topology, it does not consume the 6 messages just produced. If we send 3 more messages, the topology consumes those three immediately.
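Kill the topology. The instance's consumed offset for each partition is now 4. Change the offset to 6 and start the topology. The broker's latest offset for each partition is 4, not 6, so let's see what happens when the consumer's offset for a partition is greater than that partition's current offset.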
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
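The consumer's recorded offset for each partition is automatically reset to that partition's current latest offset. KafkaSpout first tries to fetch from offset 6 and finds that the offset is out of range. It then obtains the partition's latest offset from the broker.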
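The recovery behaviour in the WARN log can be summarised as a small rule. The Python sketch below encodes what this experiment observed: an out-of-range offset is replaced by the partition's latest offset. next_fetch_offset is a hypothetical name. Other storm-kafka versions or settings may fall back elsewhere; the log itself reports a configured start offset time of -2, which denotes the earliest offset. Treating an offset below the earliest available one the same way is an assumption, since the experiment only tried offsets above the latest.

```python
def next_fetch_offset(requested: int, earliest: int, latest: int) -> int:
    """Offset actually fetched, following what this run's WARN log shows.

    earliest/latest are the partition's first available offset and its
    current end offset on the broker.
    """
    if earliest <= requested <= latest:
        return requested  # in range: fetch from the stored offset
    # Out of range (e.g. 6 or 7000 when the partition ends at 4):
    # fall back to the latest offset, as observed in the experiment.
    return latest


for stored in (3, 6, 7000):
    print(stored, "->", next_fetch_offset(stored, earliest=0, latest=4))
```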
{"topology":{"id":"818ab9cc-d56f-454f-88b2-06dd830d54c1","name":"intsmaze-20161222-150006"},"offset":4,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"} ....
Setting the offset to 7000 gives the same result: after the topology starts, the offset is updated to each partition's latest offset.
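Now deploy another topology that consumes this topic, with the id twotest. When it starts, it receives none of the messages produced before it started. On start-up the topology needs a starting offset, and no offset is stored for twotest. In this setup it therefore uses the current latest offset of each partition. (Partition offsets only increase and partition data is deleted periodically, so the earliest offset still available in a partition is not fixed.)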
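Starting from the latest offset when no node exists is what makes a brand-new id miss earlier messages. The Python sketch below assumes one partition, modelled as a list whose indices are offsets, and uses a hypothetical start_offset helper. The numbers follow the twotest run, where the spout started at offset 7.

```python
from typing import Optional


def start_offset(stored: Optional[int], latest: int) -> int:
    """Where a spout instance starts on one partition, in this setup.

    stored is the offset read from <zkRoot>/<id>/partition_<n>, or None when
    the node does not exist yet (a brand-new id such as "twotest").
    """
    return latest if stored is None else stored


partition_log = [f"msg-{i}" for i in range(7)]  # offsets 0..6 already written
offset = start_offset(None, latest=len(partition_log))
partition_log.append("msg-7")                   # produced after start-up
print(offset, partition_log[offset:])           # 7 ['msg-7']
```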
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Read partition information from: /kafka-offset/twotest/partition_0 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop002.icccuat.com:0 from offset 7
Read partition information from: /kafka-offset/twotest/partition_1 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop003.icccuat.com:1 from offset 7
Read partition information from: /kafka-offset/twotest/partition_2 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop001.icccuat.com:2 from offset 7
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Send three messages, then look at this instance's directory:
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
Start one more topology, keeping the instance id twotest:
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [1/2] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Deleted partition managers: []
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [1/2] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] New partition managers: [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Read partition information from: /kafka-offset/twotest/partition_0 --> {"topic":"intsmazeX","partition":0,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop002.icccuat.com"},"offset":8}
[INFO] Read partition information from: /kafka-offset/twotest/partition_1 --> {"topic":"intsmazeX","partition":1,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop003.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop002.icccuat.com:0 from offset 8
[INFO] Starting Kafka hadoop003.icccuat.com:1 from offset 8
[INFO] Task [2/2] Finished refreshing
[INFO] Read partition information from: /kafka-offset/twotest/partition_2 --> {"topic":"intsmazeX","partition":2,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop001.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop001.icccuat.com:2 from offset 8
[INFO] Task [1/2] Finished refreshing
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [2/2] New partition managers: []
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
When we then send messages, both topologies consume them, because the two topologies share the same metadata.
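Both topologies resolve to the same nodes, so each reads the same starting offset and then fetches the same messages on its own. Below is a toy Python model of this. A dict stands in for ZooKeeper, and the class names are hypothetical. The model ignores how the second topology splits partitions between its two tasks.

```python
from __future__ import annotations


class ToyZooKeeper:
    """Stands in for ZooKeeper: node path -> committed offset."""

    def __init__(self) -> None:
        self.nodes: dict[str, int] = {}


class ToySpout:
    """One topology's spout reading a single partition."""

    def __init__(self, zk: ToyZooKeeper, path: str) -> None:
        self.zk, self.path = zk, path
        self.offset = zk.nodes.get(path, 0)  # read once at start-up

    def poll(self, log: list[str]) -> list[str]:
        batch = log[self.offset:]
        self.offset = len(log)
        return batch

    def commit(self) -> None:
        self.zk.nodes[self.path] = self.offset  # overwrites the shared node


zk = ToyZooKeeper()
path = "/kafka-offset/twotest/partition_0"
zk.nodes[path] = 8                                # committed by the first topology
log = [f"m{i}" for i in range(8)] + ["m8", "m9"]  # new messages at offsets 8, 9
first, second = ToySpout(zk, path), ToySpout(zk, path)
batch_1, batch_2 = first.poll(log), second.poll(log)
print(batch_1, batch_2)
```

In this model each commit() simply overwrites the node, so the stored value is whatever the last committing topology wrote.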
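This process has a few pitfalls to watch out for:
1: When using kafka-spout, we must specify the ZooKeeper path where this Kafka consumer stores its offsets (here /kafka-offset) and the instance id (here onetest). KafkaSpout differs from the Kafka client code: it has no concept of a consumer group. More precisely, it stores its data differently, and different instance ids play the role of different consumer groups.
2: Before modifying the offsets of a KafkaSpout instance, we must kill the topologies that use that id. We hit a big pitfall in our project: topologies deployed from the same code had KafkaSpouts with the same id, so they shared the same directory. We did not kill these topologies but only deactivated them. We then modified the offsets in ZooKeeper and reactivated the topologies, and the modification had no effect: the offset went back to its previous value. The reason is that a topology that has not been killed keeps its current consumed offset in memory and periodically writes it back to ZooKeeper.
3: When killing a topology, give it a wait time (for example with the -w option of storm kill). The topology commits its offsets to ZooKeeper only periodically, every 30 seconds in this setup.
There are two ways to change the offsets. You can edit the offsets in ZooKeeper before deploying the topology, or you can delete the instance's directory in ZooKeeper. After redeployment, the topology starts from the edited offset, or, if the directory was deleted, from the latest offset of each partition.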
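Pitfall 2 comes down to the running process holding its own copy of the offset. Below is a toy Python model under that assumption. The dict stands in for ZooKeeper, ToySpout is hypothetical, and rewinding from 4 to 2 is an arbitrary example.

```python
PATH = "/kafka-offset/onetest/partition_0"


class ToySpout:
    """One spout task; the dict passed in stands in for ZooKeeper."""

    def __init__(self, zk: dict, path: str) -> None:
        self.zk, self.path = zk, path
        self.offset = zk[path]  # ZooKeeper is read only when the task starts

    def commit(self) -> None:
        # Periodic commit: the in-memory offset overwrites the node.
        self.zk[self.path] = self.offset


# Case 1: topology only deactivated, so its process (and memory) survives.
zk1 = {PATH: 4}
old_task = ToySpout(zk1, PATH)
zk1[PATH] = 2           # manual rewind while the task is still alive
old_task.commit()       # next periodic commit after reactivation
print(zk1[PATH])        # 4: the manual edit was overwritten

# Case 2: topology killed first, offset edited, then redeployed.
zk2 = {PATH: 4}
zk2[PATH] = 2           # manual rewind while nothing is running
new_task = ToySpout(zk2, PATH)
print(new_task.offset)  # 2: the new task starts from the edited value
```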
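First, a consumer group is created by the client, which stores the group name in ZooKeeper.
Second, after a consumer is created, it stores its own name in ZooKeeper, under the folder of the consumer group it belongs to.
Third, once a consumer exists, we must specify which messages of the topic it may consume. I would expect the Kafka broker to control this. It would keep watching ZooKeeper for changes to the consumers of every consumer group, and whenever a consumer is added or removed it would know a reassignment is needed. It would then compute the assignment and write into each consumer's file the partition numbers that consumer may consume, along with each partition's offset.
Fourth, how does the broker know the partitions of each topic? When a topic is created, its number of partitions and replicas is specified. A topic folder is then created in ZooKeeper, with one file per partition. Each file holds that partition's location, its replica locations, and similar information. The partition's consumption offset is probably not recorded there, because each consumer group consumes the partition at a different offset.
Fifth, I then guessed that each consumer's file records the offsets it has consumed, and that offsets must move along with partitions when partitions are reassigned. Otherwise, after reassignment, consumers would re-consume data that was already consumed. But this design has a problem: when a consumer is deleted, its offsets are deleted with it. Whoever receives its former partitions then does not know where to start.
Looking at Kafka's storage structure in ZooKeeper, we find the following:
There is a consumers folder (/consumers), and under it each folder represents one consumer group.
A consumer group's folder contains three folders. The first (ids) stores the group's consumers, one file per consumer. The second (offsets) stores the topics this group consumes, one folder per topic. Under each topic folder are the topic's partitions, and each partition file records how far this group has consumed that partition.
This guarantees that when consumers are added or removed, the offsets of the partitions they consumed are still there. After reassignment, consumers will not re-consume data, because they know how far each partition has been consumed.
But how do we know which consumer consumes which partition? Storing the partition numbers in each consumer's file seems workable. Losing them when a consumer is deleted does not matter, because the broker watches for changes in the number of consumers and reassigns whenever it changes. (The only drawback I can think of: suppose some consumer is not consuming any data and we delete it. We only see that the set of consumers changed, not whether any partition stopped being consumed because of the deletion, so a reassignment happens anyway, which is unnecessary in this case.) So let's try another approach. The guess above was wrong. Within a consumer group, each message of a topic is consumed by only one consumer; in fact, each partition of a topic corresponds to exactly one consumer of the group. Conversely, one consumer group should be able to consume several topics, so a consumer in the group can consume one partition from each of several topics.
Or can a group consume several topics while each consumer consumes only one partition of one topic?
My tests show that a single consumer can consume several topics.
How would one consumer consume one partition from each of several topics?
There is one last folder (owners). Under it there are also folders for the topics, each containing that topic's partitions. I would have each partition record the name of the consumer that consumes it.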
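The layout reached in the reasoning above has two parts: group-level offsets per partition, and one owning consumer per partition. Both can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Kafka's implementation. The round-robin split copies what the storm-kafka log earlier shows for two tasks, and the group, consumer, and function names are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict

TOPIC = "intsmazeX"


def assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Give every partition to exactly one consumer of the group.

    Round-robin over sorted partitions, the same split the storm-kafka log
    shows (Task [1/2] got partitions 0 and 2, Task [2/2] got partition 1).
    """
    owners: dict[str, list[int]] = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        owners[consumers[i % len(consumers)]].append(p)
    return owners


# Offsets are keyed by (group, topic, partition) rather than by consumer,
# so they survive when a consumer leaves and its partitions move elsewhere.
offsets: defaultdict[tuple[str, str, int], int] = defaultdict(int)

before = assign([0, 1, 2], ["c1", "c2"])  # c1 owns 0 and 2, c2 owns 1
offsets[("g1", TOPIC, 0)] = 5             # c1 consumed partition 0 up to 5
after = assign([0, 1, 2], ["c2"])         # c1 leaves, so reassign
resume_at = offsets[("g1", TOPIC, 0)]     # c2 picks up partition 0 at 5
other_group = offsets[("g2", TOPIC, 0)]   # group g2 keeps its own offset (0)
print(before, after, resume_at, other_group)
```

Keying offsets by group lets two groups read the same message independently. Giving each partition a single owner keeps two consumers of the same group from reading the same message.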
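What follows is how I reasoned, when I was first learning Kafka, about how I would decide which consumer in which consumer group consumes which data if I were writing Kafka myself. Frameworks greatly boost our productivity. Still, a thoughtful programmer should also look at a framework from another angle, rather than only asking what features it has and then using them. Otherwise we will keep thinking the framework is impressive without understanding how it works inside.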
If I were to implement Kafka's functionality myself: