一、先說注意事項吧:
1、Coprocessor啟動有三種方式:配置文件、shell和程序中指定,我使用的是程序指定:
static { EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); family.setInMemory(true); family.setMaxVersions(1); EP_TABLE_DISCRIPTOR.addFamily(family); try { EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); } catch (IOException ioe) { }
2、如果客戶端連接後出現如下問題:No matching handler **** for protocol in *** region,說明jar包還沒有載入到HBaes中,確保HBase已經重啟,另外檢查代碼中addCoprocessor("ict.wde.test.RowCountServer");的類名“RowCountServer”是否寫正確了
二、說下步驟
2.1編寫服務端代碼:
1)接口類(固定格式)
package ict.wde.test; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import java.io.File; import java.io.IOException; /** * Created by Michael on 2015/6/22. */ public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol { public long getRowCount() throws IOException; public long getRowCount(Filter filter) throws IOException; public String getStr() throws IOException; //public long getKeyValue() throws IOException; }2)真正起作用的類
package ict.wde.test; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.ipc.ProtocolSignature; import java.io.IOException; /** * Created by Michael on 2015/6/27. */ public class RowCountServer implements RowCountProtocol { @Override public void start(CoprocessorEnvironment env) throws IOException { } @Override public void stop(CoprocessorEnvironment env) throws IOException { } @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { return new ProtocolSignature(3, null); } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 3; } @Override public long getRowCount() throws IOException { return this.getRowCount(new FirstKeyOnlyFilter()); } @Override public long getRowCount(Filter filter) throws IOException { return this.getRowCount(filter, false); } @Override public String getStr() throws IOException { String name = "Hello Doctor Michael Zhang, again!"; return name; } // @Override // public long getKeyValueCount() { // return 0; // } public long getRowCount(Filter filter, boolean countKeyValue) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(1); if (filter != null) { scan.setFilter(filter); } return 1; } }
2.2客戶端代碼
import ict.wde.test.RowCountProtocol; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.filter.Filter; import java.io.IOException; /** * Created by Michael on 2015/6/30. */ public class EndpointTestClient { private final HTableInterface table; private final Configuration conf; private final RowCountProtocol server; private static final HTableDescriptor EP_TABLE_DISCRIPTOR; static { EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); family.setInMemory(true); family.setMaxVersions(1); EP_TABLE_DISCRIPTOR.addFamily(family); try { EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); } catch (IOException ioe) { } } public EndpointTestClient(Configuration config) throws IOException { conf = config; table = initTidTable(); server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes()); } private HTableInterface initTidTable() throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists("epTest")) { admin.createTable(EP_TABLE_DISCRIPTOR); } admin.close(); return new HTable(conf, "epTest"); } public String getStr() throws IOException { return server.getStr(); } }啟動類:
import org.apache.hadoop.conf.Configuration; import java.io.IOException; /** * Created by Michael on 2015/6/22. */ public class EndpointExample { // private final HTableInterface table; // private static final Configuration conf; // private static final HTableDescriptor EP_TABLE_DISCRIPTOR; // // static { // conf = new Configuration(); // conf.set("hbase.zookeeper.quorum", "ccf04:2181"); // // EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); // HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); // family.setInMemory(true); // family.setMaxVersions(1); // EP_TABLE_DISCRIPTOR.addFamily(family); // try { // EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); // } catch (IOException ioe) { // // } // // table = initTidTable(); // } // // private HTableInterface initTidTable() throws IOException { // HBaseAdmin admin = new HBaseAdmin(conf); // if (!admin.tableExists("epTest")) { // admin.createTable(EP_TABLE_DISCRIPTOR); // } // admin.close(); // return new HTable(conf, "epTest"); // } public static void main(String[] agrs) throws IOException { Configuration conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "ccf04:2181"); EndpointTestClient client = new EndpointTestClient(conf); String name = client.getStr(); System.out.println(name); } }