程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> HBase Java編程示例

HBase Java編程示例

編輯:JAVA綜合教程

HBase Java編程示例


HelloWorld.zip
  1. package elementary;

  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.TimeUnit;

  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.Cell;
  13. import org.apache.hadoop.hbase.HBaseConfiguration;
  14. import org.apache.hadoop.hbase.HColumnDescriptor;
  15. import org.apache.hadoop.hbase.HTableDescriptor;
  16. import org.apache.hadoop.hbase.MasterNotRunningException;
  17. import org.apache.hadoop.hbase.TableName;
  18. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  19. import org.apache.hadoop.hbase.client.Delete;
  20. import org.apache.hadoop.hbase.client.Get;
  21. import org.apache.hadoop.hbase.client.Admin;
  22. import org.apache.hadoop.hbase.client.BufferedMutator;
  23. import org.apache.hadoop.hbase.client.BufferedMutatorParams;
  24. import org.apache.hadoop.hbase.client.Connection;
  25. import org.apache.hadoop.hbase.client.ConnectionFactory;
  26. import org.apache.hadoop.hbase.client.Table;
  27. import org.apache.hadoop.hbase.client.Put;
  28. import org.apache.hadoop.hbase.client.Result;
  29. import org.apache.hadoop.hbase.client.ResultScanner;
  30. import org.apache.hadoop.hbase.client.Scan;
  31. import org.apache.hadoop.hbase.util.Bytes;
  32. import org.apache.hadoop.util.ThreadUtil;

  33. public class HelloWorld {
  34. private static Configuration conf = null;
  35. private static Connection conn = null;
  36. private static Admin admin = null;
  37. public static AtomicInteger count = new AtomicInteger();

  38. /**
  39. * 初始化配置
  40. */
  41. static {
  42. conf = HBaseConfiguration.create();
  43. //如果沒有配置文件,一定要記得手動宣告

  44. conf.set("hbase.zookeeper.quorum", "10.148.137.143");
  45. conf.set("hbase.zookeeper.property.clientPort", "2181");
  46. }

  47. static {
  48. try {
  49. conn = ConnectionFactory.createConnection();
  50. admin = conn.getAdmin();
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }

  55. static public class MyThread extends Thread
  56. {
  57. int _start;
  58. String _tablename;
  59. Connection conn;
  60. //BufferedMutator table;
  61. Table table;

  62. public MyThread(int start, String tablename) {
  63. _start = start;
  64. _tablename = tablename;
  65. }

  66. public void run() {
  67. String tablename = _tablename;
  68. Thread current = Thread.currentThread();
  69. long thread_id = current.getId();
  70. System.out.printf("thread[%d] run\n", thread_id);

  71. try {
  72. conn = ConnectionFactory.createConnection();
  73. //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
  74. //params.writeBufferSize(1024 * 4);
  75. //table = conn.getBufferedMutator(params);
  76. table = conn.getTable(TableName.valueOf(tablename));

  77. for (int j=_start; j<100; ++j) {
  78. for (int i=0; i<10000000; ++i) {
  79. // zkb_0_0
  80. String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
  81. Put put = new Put(Bytes.toBytes(zkb));
  82. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));
  83. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
  84. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
  85. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
  86. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
  87. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
  88. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
  89. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
  90. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
  91. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
  92. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
  93. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
  94. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
  95. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
  96. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
  97. //table.mutate(put);
  98. table.put(put);

  99. int m = HelloWorld.count.incrementAndGet();
  100. if (m % 10000 == 0) {
  101. Date dt = new Date();
  102. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
  103. String now = sdf.format(dt);
  104. System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
  105. }
  106. }
  107. }

  108. System.out.printf("thread[%d] over\n", thread_id);
  109. }
  110. catch (Exception e) {
  111. e.printStackTrace();
  112. }
  113. }
  114. }

  115. /**
  116. * 建立表格
  117. * @param tablename
  118. * @param cfs
  119. */
  120. public static void createTable(String tablename, String[] cfs){
  121. try {
  122. if (admin.tableExists(TableName.valueOf(tablename))) {
  123. System.out.println("table already exists!");
  124. } else {
  125. HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
  126. for (int i = 0; i < cfs.length; i++) {
  127. HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
  128. desc.setMaxVersions(3650);
  129. tableDesc.addFamily(desc);
  130. }

  131. byte[][] splitKeys = new byte[][] {
  132. Bytes.toBytes("zkb_0_0"),
  133. Bytes.toBytes("zkb_10_0"),
  134. Bytes.toBytes("zkb_20_0"),
  135. Bytes.toBytes("zkb_30_0"),
  136. Bytes.toBytes("zkb_40_0"),
  137. Bytes.toBytes("zkb_50_0"),
  138. Bytes.toBytes("zkb_60_0"),
  139. Bytes.toBytes("zkb_70_0"),
  140. Bytes.toBytes("zkb_80_0"),
  141. Bytes.toBytes("zkb_90_0"),
  142. Bytes.toBytes("zkb_100_0")
  143. };
  144. admin.createTable(tableDesc, splitKeys);
  145. admin.close();
  146. System.out.println("create table " + tablename + " ok.");
  147. }
  148. } catch (MasterNotRunningException e) {
  149. e.printStackTrace();
  150. } catch (ZooKeeperConnectionException e) {
  151. e.printStackTrace();
  152. } catch (IOException e) {
  153. e.printStackTrace();
  154. }
  155. }

  156. /**
  157. * 刪除表格
  158. * @param tablename
  159. */
  160. public static void deleteTable(String tablename){
  161. try {
  162. //Connection conn = ConnectionFactory.createConnection();
  163. //Admin admin = conn.getAdmin();
  164. admin.disableTable(TableName.valueOf(tablename));
  165. admin.deleteTable(TableName.valueOf(tablename));
  166. System.out.println("delete table " + tablename + " ok.");
  167. } catch (IOException e) {
  168. e.printStackTrace();
  169. }
  170. }

  171. /**
  172. * 刪除一筆資料
  173. * @param tableName
  174. * @param rowKey
  175. */
  176. public static void delRecord (String tableName, String rowKey){
  177. try {
  178. Table table = conn.getTable(TableName.valueOf(tableName));

  179. List<Delete> list = new ArrayList<Delete>();
  180. Delete del = new Delete(rowKey.getBytes());
  181. list.add(del);
  182. table.delete(list);
  183. System.out.println("del recored " + rowKey + " ok.");
  184. } catch (IOException e) {
  185. e.printStackTrace();
  186. }
  187. }

  188. /**
  189. * 取得一筆資料
  190. * @param tableName
  191. * @param rowKey
  192. */
  193. public static void getOneRecord (String tableName, String rowKey){
  194. try {
  195. Table table = conn.getTable(TableName.valueOf(tableName));

  196. Get get = new Get(rowKey.getBytes());
  197. Result rs = table.get(get);
  198. List<Cell> list = rs.listCells();
  199. for(Cell cell:list){
  200. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  201. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  202. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  203. System.out.print(cell.getTimestamp() + " " );
  204. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  205. System.out.println("");
  206. }
  207. } catch (IOException e) {
  208. e.printStackTrace();
  209. }
  210. }

  211. /**
  212. * 取得所有資料
  213. * @param tableName
  214. */
  215. public static void getAllRecord (String tableName) {
  216. try{
  217. //Connection conn = ConnectionFactory.createConnection();
  218. Table table = conn.getTable(TableName.valueOf(tableName));

  219. Scan scan = new Scan();
  220. ResultScanner resultscanner = table.getScanner(scan);
  221. for(Result rs:resultscanner){
  222. List<Cell> list = rs.listCells();
  223. for(Cell cell:list){
  224. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  225. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  226. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  227. System.out.print(cell.getTimestamp() + " " );
  228. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  229. System.out.println("");
  230. }
  231. }
  232. } catch (IOException e){
  233. e.printStackTrace();
  234. }
  235. }

  236. /**
  237. * 取得Family清單
  238. * @param tableName
  239. * @return
  240. */
  241. public static ArrayList<String> getAllFamilyName(String tableName) {
  242. ArrayList<String> familyname_list = new ArrayList<String>();
  243. try{
  244. //Connection conn = ConnectionFactory.createConnection();
  245. Table table = conn.getTable(TableName.valueOf(tableName));

  246. HTableDescriptor htabledescriptor = table.getTableDescriptor();
  247. HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
  248. for(int i=0;i<hdlist.length;i++){
  249. HColumnDescriptor hd = hdlist[i];
  250. familyname_list.add(hd.getNameAsString());
  251. }
  252. } catch (IOException e){
  253. e.printStackTrace();
  254. }
  255. return familyname_list;
  256. }

  257. // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
  258. public static void main(String[] args) {
  259. System.out.println("HelloWorldX");
  260. if (args.length > 0)
  261. System.out.println(args[0]);

  262. int start = 0;
  263. if (args.length > 1)
  264. start = Integer.valueOf(args[1]);
  265. if (start < 0)
  266. start = 0;

  267. int num_threads = 16;
  268. if (args.length > 2)
  269. num_threads = Integer.valueOf(args[2]);

  270. try {
  271. String tablename = "scores";
  272. String[] familys = {"grade", "course"};
  273. HelloWorld.createTable(tablename, familys);

  274. //ExecutorService thread_pool = Executors.newSingleThreadExecutor();
  275. ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
  276. Thread[] pool = new HelloWorld.MyThread[80];
  277. for (int i=0; i<pool.length; ++i) {
  278. pool[i] = new HelloWorld.MyThread(i, tablename);
  279. thread_pool.execute(pool[i]);
  280. }

  281. thread_pool.shutdown();
  282. System.out.println("over");
  283. }
  284. catch (Exception e) {
  285. e.printStackTrace();
  286. }
  287. }

  288. }

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved