程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA編程入門知識 >> hadoop client與datanode的通信協議分析

hadoop client與datanode的通信協議分析

編輯:JAVA編程入門知識

本文主要分析了hadoop客戶端read和write block的流程. 以及client和datanode通信的協議, 數據流格式等.

hadoop客戶端與namenode通信通過RPC協議, 但是client 與datanode通信並沒有使用RPC, 而是直接使用socket, 其中讀寫時的協議也不同, 本文分析了hadoop 0.20.2版本的(0.19版本也是一樣的)client與datanode通信的原理與通信協議.  另外需要強調的是0.23及以後的版本中client與datanode的通信協議有所變化, 使用了protobuf作為序列化方式.

Write block

1. 客戶端首先通過namenode.create, 向namenode請求創建文件, 然後啟動dataStreamer線程

2. client包括三個線程, main線程負責把本地數據讀入內存, 並封裝為Package對象, 放到隊列dataQueue中.

3. dataStreamer線程檢測隊列dataQueue是否有package, 如果有, 則先創建BlockOutPutStream對象(一個block創建一次, 一個block可能包括多個package), 創建的時候會和相應的datanode通信, 發送DATA_TRANSFER_HEADER信息並獲取返回. 然後創建ResponseProcessor線程, 負責接收datanode的返回ack確認信息, 並進行錯誤處理.

4. dataStreamer從dataQueue中拿出Package對象, 發送給datanode. 然後繼續循環判斷dataQueue是否有數據…..

下圖展示了write block的流程.

image

下圖是報文的格式

image

Read block

主要在BlockReader類中實現.

初始化newBlockReader時,

1. 通過傳入參數sock創建new SocketOutputStream(socket, timeout), 然後寫通信信息, 與寫block的header不大一樣.

//write the header.

out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );

out.write( DataTransferProtocol.OP_READ_BLOCK );

out.writeLong( blockId );

out.writeLong( genStamp );

out.writeLong( startOffset );

out.writeLong( len );

Text.writeString(out, clientName);

out.flush();

2. 創建輸入流 new SocketInputStream(socket, timeout)

3. 判斷返回消息 in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS

4. 根據輸入流創建checksum : DataChecksum checksum = DataChecksum.newDataChecksum( in )

5. 讀取第一個Chunk的位置: long firstChunkOffset = in.readLong()

注: 512個字節為一個chunk計算checksum(4個字節)

6. 接下來在BlockReader的read方法中讀取具體數據: result = readBuffer(buf, off, realLen)

7. 一個一個chunk的讀取

int packetLen = in.readInt();

long offsetInBlock = in.readLong();

long seqno = in.readLong();

boolean lastPacketInBlock = in.readBoolean();

int dataLen = in.readInt();

IOUtils.readFully(in, checksumBytes.array(), 0,

checksumBytes.limit());

IOUtils.readFully(in, buf, offset, chunkLen);

8. 讀取數據後checksum驗證; FSInputChecker.verifySum(chunkPos)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved