程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 高可用的池化 Thrift Client 實現(源碼分享),池化thrift

高可用的池化 Thrift Client 實現(源碼分享),池化thrift

編輯:JAVA綜合教程

高可用的池化 Thrift Client 實現(源碼分享),池化thrift


本文將分享一個高可用的池化 Thrift Client 及其源碼實現,歡迎閱讀源碼(Github)並使用,同時歡迎提出寶貴的意見和建議,本人將持續完善。

本文的主要目標讀者是對 Thrift 有一定了解並使用的童鞋,如對 Thrift 的基礎知識了解不多或者想重溫一下基礎知識,推薦先閱讀本站文章《和 Thrift 的一場美麗邂逅》。

下面進入正題。

為什麼我們需要這麼一個組件?

我們知道,Thrift 是一個 RPC 框架體系,可以非常方便的進行跨語言 RPC 服務的開發和調用。然而,它並沒有提供針對多個 Server 的 Smart Client【1】。比如,你有一個服務 service,分別部署在 116.31.1.1 和 116.31.1.2 兩台服務器上,當你需要從 Client 端調用該 service 的某個遠程方法的時候,你只能在代碼中顯式指定使用 116.31.1.1 或者 116.31.1.2 其中的一個。這種情況下,你調用的時候無法預知所指定 IP 對應的服務是否可用,並且當該服務不可用時,無法隱式自動切換到調用另外一個 IP 對應的服務。也就是說,服務的狀態對你並不是透明的,並且無法做到服務的負載均衡和高可用。

此外,當你調用遠程方法時,每次你都得新建一個連接,當請求量很大時,不斷的創建、刪除連接所耗費的服務資源是巨大的。

因此,我們需要這麼一個組件,使服務狀態透明化並底層實現負載均衡和高可用,讓你可以專注於業務邏輯的實現,提升工作效率和服務的質量。下面我們就對該組件(ThrifJ)進行詳細的剖析。

它到底能做些什麼?

特性

  • 鏈式調用API,簡潔直觀
  • 完善的默認配置,無需擔心調用時配置不全導致拋錯
  • 池化連接對象,高效管理連接的生命周期
  • 異常服務自動隔離與恢復
  • 多種可配置的負載均衡策略,支持隨機、輪詢、權重和哈希
  • 多種可配置的服務級別,並自動根據服務級別進行服務降級

該如何使用它?

目前最新版本為1.0.1(點此關注最新版本的更新),首先在項目中引入 thriftj-1.0.1.jar,或在 Maven 依賴中加入:

<dependency>
    <groupId>com.github.cyfonly</groupId>
    <artifactId>thriftj</artifactId>
    <version>1.0.1</version>
</dependency>

 需要注意的是,ThriftJ 基於 slf4j 構建,因此你需要在項目中增加具體日志實現的依賴,比如 log4j 或 logback。

然後在項目中,參照以下這段代碼進行調用:

//Thrift server 列表
private static final String servers = "127.0.0.1:10001,127.0.0.1:10002";

//TTransport 驗證器
ConnectionValidator validator = new ConnectionValidator() {
    @Override
    public boolean isValid(TTransport object) {
        return object.isOpen();
    }
};

//連接對象池配置
GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();

//failover 策略
FailoverStrategy failoverStrategy = new FailoverStrategy();

//構造 ThriftClient 對象並配置
final ThriftClient thriftClient = new ThriftClient();
thriftClient.servers(servers)
            .loadBalance(Constant.LoadBalance.RANDOM)
            .connectionValidator(validator)
            .poolConfig(poolConfig)
            .failoverStrategy(failoverStrategy)
            .connTimeout(5)
            .backupServers("")
            .serviceLevel(Constant.ServiceLevel.NOT_EMPTY)
            .start();

//打印從 ThriftClient 獲取到的可用服務列表
List<ThriftServer> servers = thriftClient.getAvailableServers();
for(ThriftServer server : servers){
    System.out.println(server.getHost() + ":" + server.getPort());
}

//服務調用
if(servers.size()>0){
    try{
            TestThriftJ.Client client = thriftClient.iface(TestThriftJ.Client.class);
            QryResult result = client.qryTest(1);
            System.out.println("result[code=" + result.code + " msg=" + result.msg + "]");
      }catch(Throwable t){
            logger.error("-------------exception happen", t);
      }
}

 友情提示:除 servers 必須配置外,其他配置均為可選(使用默認配置)

它是如何設計並實現的呢?

整體設計

連接池對象工廠及連接對象的管理

基於 commons-pool2 中的 KeyedPooledObjectFactory,以 ThriftServer 為 key,TTransport 為 value 進行實現。關鍵代碼如下:

@Override
public PooledObject<TTransport> makeObject(ThriftServer thriftServer) throws Exception {
    TSocket tsocket = new TSocket(thriftServer.getHost(), thriftServer.getPort());
    tsocket.setTimeout(timeout);
    TFramedTransport transport = new TFramedTransport(tsocket);
    
    transport.open();
    DefaultPooledObject<TTransport> result = new DefaultPooledObject<TTransport>(transport);
    logger.trace("Make new thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort());
    
    return result;
}

@Override
public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) {
    boolean isValidate;
    try {
        if (failoverChecker == null) {
            isValidate = pooledObject.getObject().isOpen();
        } else {
            ConnectionValidator validator = failoverChecker.getConnectionValidator();
            isValidate = pooledObject.getObject().isOpen() && (validator == null || validator.isValid(pooledObject.getObject()));
        }
    } catch (Throwable e) {
        logger.warn("Fail to validate tsocket: {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e});
        isValidate = false;
    }
    if (failoverChecker != null && !isValidate) {
        failoverChecker.getFailoverStrategy().fail(thriftServer);
    }
    logger.info("ValidateObject isValidate:{}", isValidate);
    
    return isValidate;
}

@Override
public void destroyObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) throws Exception {
    TTransport transport = pooledObject.getObject();
    if (transport != null) {
        transport.close();
        logger.trace("Close thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort());
    }
}

在使用連接對象時,根據用戶的自定義連接池配置創建連接池,並實現連接對象的獲取、回池、清除以及連接池的關閉操作。關鍵代碼如下:

public DefaultThriftConnectionPool(KeyedPooledObjectFactory<ThriftServer, TTransport> factory, GenericKeyedObjectPoolConfig config) {
	connections = new GenericKeyedObjectPool<>(factory, config);
}

@Override
public TTransport getConnection(ThriftServer thriftServer) {
	try {
		return connections.borrowObject(thriftServer);
	} catch (Exception e) {
		logger.warn("Fail to get connection for {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e});
		throw new RuntimeException(e);
	}
}

@Override
public void returnConnection(ThriftServer thriftServer, TTransport transport) {
	connections.returnObject(thriftServer, transport);
}

@Override
public void returnBrokenConnection(ThriftServer thriftServer, TTransport transport) {
	try {
		connections.invalidateObject(thriftServer, transport);
	} catch (Exception e) {
		logger.warn("Fail to invalid object:{},{}", new Object[] { thriftServer, transport, e });
	}
}

@Override
public void close() {
	connections.close();
}

@Override
public void clear(ThriftServer thriftServer) {
	connections.clear(thriftServer);
}

異常服務自動隔離與恢復

需要實現服務狀態的透明化,就必須在底層實現服務的監測、隔離和恢復。在 ThriftJ 中,調用 ThriftClient 時會啟動一個線程對服務進行異步監測,用戶可以指定檢驗規則(對應配置為 ConnectionValidator)以及 failover 策略(對應配置為 FailoverStrategy,可以指定失敗的次數、失效持續時間和恢復持續時間)。默認情況下,服務驗證規則為判斷 TTransport 是否處於開啟狀態,即:

if (this.validator == null) {
  this.validator = new ConnectionValidator() {
    @Override
    public boolean isValid(TTransport object) {
      return object.isOpen();
    }
  };
}

 而默認的 failover 策略為

  • 失敗次數:10(次),表示通過 ConnectionValidator 檢驗失敗 10 次後才考慮將該服務失效,需要配合失效持續時間一起使用
  • 時效持續時間:1(分鐘),表示在一個檢驗周期內,首次檢驗失敗的時間持續達到該值後才考慮將該服務失效,配合失敗次數一起使用
  • 恢復持續時間:1(分鐘),表示在判定某服務失效並隔離後,經過該值後將服務重新恢復

以上功能基於 Guava cache 實現,關鍵代碼如下:

/**
 * 使用默認 failover 策略
 */
public FailoverStrategy() {
	this(DEFAULT_FAIL_COUNT, DEFAULT_FAIL_DURATION, DEFAULT_RECOVER_DURATION);
}

/**
 * 自定義 failover 策略
 * @param failCount 失敗次數
 * @param failDuration 失效持續時間
 * @param recoverDuration 恢復持續時間
 */
public FailoverStrategy(final int failCount, long failDuration, long recoverDuration) {
	this.failDuration = failDuration;
	this.failedList = CacheBuilder.newBuilder().weakKeys().expireAfterWrite(recoverDuration, TimeUnit.MILLISECONDS).build();
	this.failCountMap = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<T, EvictingQueue<Long>>() {
		@Override
		public EvictingQueue<Long> load(T key) throws Exception {
			return EvictingQueue.create(failCount);
		}
	});
}

public void fail(T object) {
	logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort());
	boolean addToFail = false;
	try {
		EvictingQueue<Long> evictingQueue = failCountMap.get(object);
		synchronized (evictingQueue) {
			evictingQueue.add(System.currentTimeMillis());
			if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) {
				addToFail = true;
			}
		}
	} catch (ExecutionException e) {
		logger.error("Ops.", e);
	}
	if (addToFail) {
		failedList.put(object, Boolean.TRUE);
		logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort());
	}
}

public Set<T> getFailed() {
	return failedList.asMap().keySet();
}

負載均衡

ThriftJ 提供了四種可選的負載均衡策略:

  • 隨機
  • 輪詢
  • 權重
  • 哈希

在用戶不顯式指定的情況下,默認采用隨機算法。具體算法的實現在此就不再進行過多的描述了。

需要注意的是,ThriftJ 嚴格規范了調用的語義,比如使用哈希策略時,必須要指定 hash key;當使用非哈希的其他策略時,一定不能指定 key,避免造成理解的二義性。

服務級別與服務降級

ThriftJ 提供了多種可配置的服務級別,並根據服務級別進行服務降級處理,其對應關系如下:

  • SERVERS_ONLY:最高級別,僅返回配置的 servers 列表中可用的服務
  • ALL_SERVERS:中等級別,當 servers 列表中的服務全部不可用時,返回 backupServers 列表中的可用服務
  • NOT_EMPTY:最低級別,當 servers 和 backupServers 列表中的服務全部不可用時,返回 servers 列表中的所有服務

其中 ThriftJ 默認使用的服務級別是 NOT_EMPTY。服務降級處理的關鍵代碼如下:

private List<ThriftServer> getAvailableServers(boolean all) {
	List<ThriftServer> returnList = new ArrayList<>();
	Set<ThriftServer> failedServers = failoverStrategy.getFailed();
	for (ThriftServer thriftServer : serverList) {
		if (!failedServers.contains(thriftServer))
			returnList.add(thriftServer);
	}
	if (this.serviceLevel == Constant.ServiceLevel.SERVERS_ONLY) {
		return returnList;
	}
	if ((all || returnList.isEmpty()) && !backupServerList.isEmpty()) {
		for (ThriftServer thriftServer : backupServerList) {
			if (!failedServers.contains(thriftServer))
				returnList.add(thriftServer);
		}
	}
	if (this.serviceLevel == Constant.ServiceLevel.ALL_SERVERS) {
		return returnList;
	}
	if(returnList.isEmpty()){
		returnList.addAll(serverList);
	}
	return returnList;
}

我還有話要說

技術的提升源自無私的分享,好的技術或工具分享出來,並不會讓自己失去什麼,反而可以在大家共同研究和溝通後使之獲得更好的完善。不要擔心自己寫的工具不夠好,不要害怕自己的技術不夠牛,誰能一步就登天呢?

請熱愛你的熱愛!

 

 

【1】Smart Client:比如 MongoClient,可自動發現集群服務節點、自動故障轉移和負載均衡。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved