Protobuf序列化的字節流數據是不能自描述的,當我們通過socket把數據發送到Client時,Client必須知道發送的是什麼類型的數據,才能正確的反序列化它。這嚴重影響限制了C/S功能的實現,不解決的話信道事實上只能傳輸一種類型的數據。本文講解一下我用的解決辦法,雖然我覺得應該有官方的實現更合理,即原生支持Protobuf的自描述。
(在金融領域,有一個叫FAST的協議,基本原理和Protobuf相同,並且有更高的壓縮率,並且序列化後的字節流是自描述的,可以自動反序列化為對應的模板的數據(模板相當於.proto文件),但是時間效率比protobuf差,大家也可以關注一下。)
首先,介紹另外一種實現,在protobuf官方wiki中描述的一種workaround,通過定義一種用於自描述的類型:
message SelfDescribingMessage { // Set of .proto files which define the type. required FileDescriptorSet proto_files = 1; // Name of the message type. Must be defined by one of the files in // proto_files. required string type_name = 2; // The message data. required bytes message_data = 3; }
(參考:https://developers.google.com/protocol-buffers/docs/techniques#self-description)
把實際要傳輸的類型的字節數組放在message_data字段中,用proto_files和type_name字段來描述它的proto文件和類型。這樣,信道上傳輸的都是SelfDescribingMessage類型,但是其上的負載可以是任何類型的數據。
我沒有試過這種方式。我不太願意使用這種方式的原因是,很顯然,這樣做需要進行2次序列化和2次反序列化,byte數組也要被創建2次。如果對應時延和性能敏感的系統,這樣做不夠好。
今天主要要介紹的方案。在protobuf序列化的前面,加上一個自定義的頭,這個頭包含序列化的長度和它的類型。在解壓的時候根據包頭來反序列化。
假設socket上要傳輸2個類型的數據,股票行情信息和期權行情信息:
股票的.proto定義:
syntax = "proto3"; package test.model.protobuf; option java_package = "test.model.protobuf"; message StockTick { string stockId = 1; int price = 2; }
期權的.proto定義:
syntax = "proto3"; package test.model.protobuf; option java_package = "test.model.protobuf"; message OptionTick { string optionId = 1; string securityId = 2; int price = 3; }
netty4官方事實上已經實現了protobuf的編解碼的插件,但是只能用於傳輸單一類型的protobuf序列化。我這裡截取一段netty代碼,熟悉netty的同學馬上就能理解它的作用:
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new CustomProtoServerHandler()); }
看以上代碼高亮部分,netty4官方的編解碼器必須指定單一的protobuf類型才行。具體每個類的作用:
ProtobufEncoder:用於對Probuf類型序列化。
ProtobufVarint32LengthFieldPrepender:用於在序列化的字節數組前加上一個簡單的包頭,只包含序列化的字節長度。
ProtobufVarint32FrameDecoder:用於decode前解決半包和粘包問題(利用包頭中的包含數組長度來識別半包粘包)
ProtobufDecoder:反序列化指定的Probuf字節數組為protobuf類型。
我們可以參考以上官方的編解碼代碼,將實現我們客戶化的protobuf編解碼插件,但是要支持多種不同類型protobuf數據在一個socket上傳輸:
import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 參考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder */ @Sharable public class CustomProtobufEncoder extends MessageToByteEncoder<MessageLite> { HangqingEncoder hangqingEncoder; public CustomProtobufEncoder(HangqingEncoder hangqingEncoder) { this.hangqingEncoder = hangqingEncoder; } @Override protected void encode( ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception { byte[] body = msg.toByteArray(); byte[] header = encodeHeader(msg, (short)body.length); out.writeBytes(header); out.writeBytes(body); return; } private byte[] encodeHeader(MessageLite msg, short bodyLength) { byte messageType = 0x0f; if (msg instanceof StockTickOuterClass.StockTick) { messageType = 0x00; } else if (msg instanceof OptionTickOuterClass.OptionTick) { messageType = 0x01; } byte[] header = new byte[4]; header[0] = (byte) (bodyLength & 0xff); header[1] = (byte) ((bodyLength >> 8) & 0xff); header[2] = 0; // 保留字段 header[3] = messageType; return header; } }
CustomProtobufEncoder序列化傳入的protobuf類型,並且為它創建了一個4個字節的包頭,格式如下
body長度(low) body長度
其中的encodeHeader方法具體的實現要根據你要傳輸哪些protobuf類型來修改代碼,也可以稍加設計避免使用太多的if…else。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import com.google.protobuf.MessageLite; /** * 參考ProtobufVarint32FrameDecoder 和 ProtobufDecoder */ public class CustomProtobufDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() > 4) { // 如果可讀長度小於包頭長度,退出。 in.markReaderIndex(); // 獲取包頭中的body長度 byte low = in.readByte(); byte high = in.readByte(); short s0 = (short) (low & 0xff); short s1 = (short) (high & 0xff); s1 <<= 8; short length = (short) (s0 | s1); // 獲取包頭中的protobuf類型 in.readByte(); byte dataType = in.readByte(); // 如果可讀長度小於body長度,恢復讀指針,退出。 if (in.readableBytes() < length) { in.resetReaderIndex(); return; } // 讀取body ByteBuf bodyByteBuf = in.readBytes(length); byte[] array; int offset; int readableLen= bodyByteBuf.readableBytes(); if (bodyByteBuf.hasArray()) { array = bodyByteBuf.array(); offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex(); } else { array = new byte[readableLen]; bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen); offset = 0; } //反序列化 MessageLite result = decodeBody(dataType, array, offset, readableLen); out.add(result); } } public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception { if (dataType == 0x00) { return StockTickOuterClass.StockTick.getDefaultInstance(). getParserForType().parseFrom(array, offset, length); } else if (dataType == 0x01) { return OptionTickOuterClass.OptionTick.getDefaultInstance(). getParserForType().parseFrom(array, offset, length); } return null; // or throw exception } }
CustomProtobufDecoder實現了2個功能,1)通過包頭中的長度信息來解決半包和粘包。 2)把消息body反序列化為對應的protobuf類型(根據包頭中的類型信息)。
其中的decodeBody方法具體的實現要根據你要傳輸哪些protobuf類型來修改代碼,也可以稍加設計避免使用太多的if…else。
如何把我們自定義的編解碼用於netty Server:
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder",new CustomProtobufDecoder()); pipeline.addLast("encoder",new CustomProtobufEncoder()); pipeline.addLast(new CustomProtoServerHandler()); }
Binhua Liu原創文章,轉載請注明原地址http://www.cnblogs.com/Binhua-Liu/p/5577622.html