第十一章 自己實現一致性hash算法,一致性hash
關於一致性hash算法的意義以及其相對於簡單求余法(除數求余法)的好處,查看第六章 memcached剖析
注意:真實的hash環的數據結構是二叉樹,這裡為了簡便使用了列表List
1、一致性hash算法的使用地方
2、真實服務器節點沒有虛擬化的一致性hash算法實現
ServerNode:真實服務器節點

![]()
1 package hash;
2
3 /**
4 * server節點
5 */
6 public class ServerNode {
7 private String serverName;
8 private long serverHash;
9
10 public String getServerName() {
11 return serverName;
12 }
13
14 public void setServerName(String serverName) {
15 this.serverName = serverName;
16 }
17
18 public long getServerHash() {
19 return serverHash;
20 }
21
22 public void setServerHash(long serverHash) {
23 this.serverHash = serverHash;
24 }
25
26 /**
27 * 下邊重寫hashcode()和equals()是為了在刪除節點的時候只根據傳入的serverName刪除即可
28 */
29 @Override
30 public int hashCode() {
31 final int prime = 31;
32 int result = 1;
33 result = prime * result
34 + ((serverName == null) ? 0 : serverName.hashCode());
35 return result;
36 }
37
38 @Override
39 public boolean equals(Object obj) {
40 if (this == obj)
41 return true;
42 if (obj == null)
43 return false;
44 if (getClass() != obj.getClass())
45 return false;
46 ServerNode other = (ServerNode) obj;
47 if (serverName == null) {
48 if (other.serverName != null)
49 return false;
50 } else if (!serverName.equals(other.serverName))
51 return false;
52 return true;
53 }
54
55
56 }
View Code
注意:
- serverName可以自己取名,這裡取名為"ip:port"
- 對於hashCode()和equals()方法的重寫僅僅是為了刪除服務器節點的時候,只根據serverName就可以刪除,而不需要再計算服務器節點的hash值
ServerComparator:真實服務器比較器

![]()
1 package hash;
2
3 import java.util.Comparator;
4
5 /**
6 * 服務器排序比較器
7 */
8 public class ServerComparator implements Comparator<ServerNode> {
9
10 public int compare(ServerNode node1, ServerNode node2) {
11 if(node1.getServerHash() <= node2.getServerHash()) {
12 return -1;//node1<node2
13 }
14 return 1;//node1>node2
15 }
16
17 }
View Code
注意:
- 關於java的比較器,有兩種:(後者用的多一些)
- javabean實現comparable接口,實現compareTo()方法
- 另外建一個類實現comparator接口,實現其中的compare()方法
ConsistentHash:一致性hash實現類

![]()
1 package hash;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.zip.CRC32;
7
8 /**
9 * 一致性hash實現(數據結構:list)(服務器沒有虛擬化)
10 * 一致性hash的真正數據結構是二叉樹
11 */
12 public class ConsistentHash {
13 private List<ServerNode> servers = new ArrayList<ServerNode>();//存放服務器
14
15 /** 計算服務器和存儲的鍵的hash值 */
16 public long hash(String str){
17 CRC32 crc32 = new CRC32();
18 crc32.update(str.getBytes());
19 return crc32.getValue();
20 }
21
22 /**
23 * 添加server到環上
24 * @param serverName ip:port
25 */
26 public void addServer(String serverName){
27
28 ServerNode node = new ServerNode();
29 node.setServerName(serverName);
30 node.setServerHash(hash(serverName));
31
32 servers.add(node);
33 Collections.sort(servers, new ServerComparator());
34 }
35
36 /**
37 * 從環上刪除server節點
38 */
39 public void deleteServer(String serverName){
40
41 ServerNode node = new ServerNode();
42 node.setServerName(serverName);
43
44 servers.remove(node);
45 }
46
47 /**
48 * 獲取一個緩存key應該存放的位置
49 * @param cachekey 緩存的key
50 * @return 緩存的服務器節點
51 */
52 public ServerNode getServer(String cachekey){
53 long keyHash = hash(cachekey);
54
55 for(ServerNode node : servers){
56 if(keyHash<=node.getServerHash()){
57 return node;
58 }
59 }
60
61 return servers.get(0);//如果node沒有合適放置位置,放在第一台服務器上去
62 }
63
64 /****************測試*******************/
65 public void printServers(){
66 for(ServerNode server : servers){
67 System.out.println(server.getServerName()+"-->"+server.getServerHash());
68 }
69 }
70
71 public static void main(String[] args) {
72 ConsistentHash ch = new ConsistentHash();
73 ch.addServer("127.0.0.1:11211");
74 ch.addServer("127.0.0.1:11212");
75 ch.addServer("127.0.0.2:11211");
76 ch.addServer("127.0.0.2:11212");
77
78 ch.printServers();
79
80 ServerNode node = ch.getServer("hello");
81 System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getServerHash());
82
83 ServerNode node2 = ch.getServer("name");
84 System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getServerHash());
85
86 ServerNode node3 = ch.getServer("a");
87 System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getServerHash());
88
89 /********************刪除節點*********************/
90 ch.deleteServer("127.0.0.1:11212");
91 ch.printServers();
92
93 ServerNode node0 = ch.getServer("hello");
94 System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getServerHash());
95
96 ServerNode node02 = ch.getServer("name");
97 System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getServerHash());
98
99 ServerNode node03 = ch.getServer("a");
100 System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getServerHash());
101
102 }
103 }
View Code
注意:
- 在計算服務器節點和存儲的key的hash值的時候,不僅僅可以使用crc32算法,還可以使用MD5算法等等,只要是最後得出的結果是一個>=0&&<=232的數就好
- 在這個實現中,並沒有將真實服務器節點進行虛擬化
3、真實服務器節點虛擬化後的一致性hash算法實現
為什麼要虛擬化,查看第六章 memcached剖析 ,這裡只列出幾條原因:
- 在memcached服務器較少的情況下,很難平均的分布到hash環上,這樣就會造成負載不均衡--引入虛擬化節點,可以解決這個問題
- 當一台memcached宕機時,其原先所承受的壓力全部給了其下一個節點,為了將其原先所承受的壓力盡可能的分布給所有剩余的memcached節點,引入虛擬化節點可以達到這個目的
- 當新添加了一台memcached服務器server1時,server1只會緩解其中的一台服務器(即server1插入環後,server1的下一個節點)的壓力,為了可以讓server1盡可能的緩解所有的memcached服務器的壓力,引入虛擬節點可以達到這個目的
VirtualServerNode:虛擬節點

![]()
1 package hash2;
2
3 /**
4 * 虛擬節點
5 */
6 public class VirtualServerNode {
7 private String serverName;//真實節點名稱
8 private long virtualServerHash;//虛擬節點hash
9
10 public String getServerName() {
11 return serverName;
12 }
13
14 public void setServerName(String serverName) {
15 this.serverName = serverName;
16 }
17
18 public long getVirtualServerHash() {
19 return virtualServerHash;
20 }
21
22 public void setVirtualServerHash(long virtualServerHash) {
23 this.virtualServerHash = virtualServerHash;
24 }
25
26 }
View Code
注意:
- 該類中的serverName是該虛擬節點對應的真實節點的名稱,這裡就是"ip:port"
- 真正的虛擬節點的名稱是serverName-i(其中,i是0~virtualCount的整數值),這一塊兒請查看ConsistentHashWithVirtualNode的addServer(String serverName)
VirtualServerComparator:虛擬節點比較器

![]()
1 package hash2;
2
3 import java.util.Comparator;
4
5 /**
6 * 虛擬節點比較器
7 */
8 public class VirtualServerComparator implements Comparator<VirtualServerNode> {
9
10 public int compare(VirtualServerNode node1, VirtualServerNode node2) {
11 if(node1.getVirtualServerHash() <= node2.getVirtualServerHash()) {
12 return -1;
13 }
14 return 1;
15 }
16
17 }
View Code
ConsistentHashWithVirtualNode:真實節點虛擬化後的一致性hash算法

![]()
1 package hash2;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.List;
6 import java.util.zip.CRC32;
7
8 /**
9 * 具有虛擬節點的一致性hash實現(數據結構:list)
10 * 一致性hash的真正數據結構是二叉樹
11 */
12 public class ConsistentHashWithVirtualNode {
13 private List<VirtualServerNode> virtualServers = new ArrayList<VirtualServerNode>();//存放虛擬節點
14 private static final int virtualCount = 8;//每個真實節點虛擬成8個虛擬節點
15
16 /** 計算服務器和存儲的鍵的hash值 */
17 public long hash(String str){
18 CRC32 crc32 = new CRC32();
19 crc32.update(str.getBytes());
20 return crc32.getValue();
21 }
22
23 /**
24 * 添加server的虛擬節點到環上
25 * @param serverName ip:port
26 */
27 public void addServer(String serverName){
28
29 for(int count=0;count<virtualCount;count++){
30 VirtualServerNode node = new VirtualServerNode();
31 node.setServerName(serverName);
32 node.setVirtualServerHash(hash(serverName+"-"+count));//虛擬節點的名字:serverName+"-"+count
33 virtualServers.add(node);
34 }
35
36 Collections.sort(virtualServers, new VirtualServerComparator());
37 }
38
39 /**
40 * 從環上刪除server節點(需要刪除所有的該server節點對應的虛擬節點)
41 */
42 public void deleteServer(String serverName){
43
44 /*
45 * 在這種刪除的時候,會出現java.util.ConcurrentModificationException
46 * 這是因為此處的遍歷方式為使用ArrayList內部類Itr進行遍歷,
47 * 在遍歷的過程中發生了remove、add等操作,導致modCount發生了變化,
48 * 產生並發修改異常,
49 * 可以使用下邊的那一種方式來進行遍歷(遍歷方式不是Itr),
50 * 再這樣的遍歷過程中,add和remove都是沒有問題的
51 */
52 /*for(VirtualServerNode node : virtualServers){
53 if(node.getServerName().equals(serverName)){
54 virtualServers.remove(node);
55 }
56 }*/
57 for(int i=0;i<virtualServers.size();i++) {
58 VirtualServerNode node = virtualServers.get(i);
59 if(node.getServerName().equals(serverName)) {
60 virtualServers.remove(node);
61 }
62 }
63
64 }
65
66 /**
67 * 獲取一個緩存key應該存放的位置
68 * @param cachekey 緩存的key
69 * @return 緩存的服務器節點
70 */
71 public VirtualServerNode getServer(String cachekey){
72 long keyHash = hash(cachekey);
73
74 for(VirtualServerNode node : virtualServers){
75 if(keyHash<=node.getVirtualServerHash()){
76 return node;
77 }
78 }
79
80 return virtualServers.get(0);//如果node沒有合適放置位置,放在第一台服務器上去
81 }
82
83 /****************測試*******************/
84 public void printServers(){
85 for(VirtualServerNode server : virtualServers){
86 System.out.println(server.getServerName()+"-->"+server.getVirtualServerHash());
87 }
88 }
89
90 public static void main(String[] args) {
91 ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode();
92 ch.addServer("127.0.0.1:11211");
93 ch.addServer("127.0.0.1:11212");
94 ch.addServer("127.0.0.2:11211");
95 ch.addServer("127.0.0.2:11212");
96
97 ch.printServers();
98
99 VirtualServerNode node = ch.getServer("hello");
100 System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getVirtualServerHash());
101
102 VirtualServerNode node2 = ch.getServer("name");
103 System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getVirtualServerHash());
104
105 VirtualServerNode node3 = ch.getServer("a");
106 System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getVirtualServerHash());
107
108 /*********************刪除節點之後**********************/
109 ch.deleteServer("127.0.0.1:11212");
110 ch.printServers();
111
112 VirtualServerNode node0 = ch.getServer("hello");
113 System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getVirtualServerHash());
114
115 VirtualServerNode node02 = ch.getServer("name");
116 System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getVirtualServerHash());
117
118 VirtualServerNode node03 = ch.getServer("a");
119 System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getVirtualServerHash());
120
121 }
122 }
View Code
注意:
- 在實際操作中,一台memcached服務器虛擬成150台比較合適(100~200)
- 從環上刪除節點的算法寫的較差,但是考慮到刪除節點的操作在實際使用中用的比較少(宕機比較少,人為的刪除節點也較少),也無所謂
- 刪除節點的時候,注意使用foreach語法糖去遍歷的時候,在遍歷的過程中不可以做刪除、增加操作,否則會拋出並發修改異常,具體的原因見注釋和第二章 ArrayList源碼解析;想要實現在遍歷的過程中進行刪除、增加操作,使用簡單for循環,見如上代碼