說是推送系統有點大,其實就是一個消息廣播功能吧。作用其實也就是由服務端接收到消息然後推送到訂閱的客戶端。
對於推送最關鍵的是服務端向客戶端發送數據,客戶端向服務端訂閱自己想要的消息。這樣的好處就是有消息後才向客戶端推送,相比於拉取數據不會產生許多無效的查詢,實時性也高。
xmpp這種即時通信協議基於TCP長連接還是比較符合這種場景的。只需要在服務端增加一個模塊用於接收用戶訂閱與數據的推送就完成了主體功能。
在xmpp協議裡可以擴展組件,這樣我們寫一個組件,然後連接到xmpp服務器,這樣就可以應用於不同的xmpp服務器。
因為我比較熟悉openfire的體系,所以自然就用它。客戶端暫時沒有特別的需求,只是用於接收數據,所以用smack或者任何一款xmpp 客戶端都可以。我為了簡單就用smack寫一個簡單的代碼。
用到的了whack的core,在maven工程裡直接引用即可,相關的依賴包會自動加載進來
<dependency>
<groupId>org.igniterealtime.whack</groupId>
<artifactId>core</artifactId>
<version>2.0.1-SNAPSHOT</version>
<type>jar</type>
</dependency>
推送服務就是等待或者獲得需要推送的消息數據後向用戶廣播出去的服務。因為這裡暫時沒有設定數據的場景,所以就簡單的用一個阻塞隊列來表示。步驟:
在此我寫了一個PushServer的類用於表示推送服務,這個類裡包含了:
//消息列表
private BlockingQueue<Packet> packetQueue;
使用到了生產者消費者模式,所以用了一個阻塞隊列,用於存放等待發送的消息數據。
private class PacketSenderThread extends Thread {
private volatile Boolean shutdown = false;
private BlockingQueue<Packet> queue;
private Component component;
private ComponentManager componentManager;
public PacketSenderThread(ComponentManager componentManager, Component component, BlockingQueue<Packet> queue) {
this.componentManager = componentManager;
this.component = component;
this.queue = queue;
}
public void run() {
while (!shutdown) {
Packet p;
try {
p = queue.take();
componentManager.sendPacket(component, p);
} catch (InterruptedException e1) {
System.err.println(e1.getStackTrace());
} catch (ComponentException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
shutdown = true;
this.interrupt();
}
}
這個線程繼承了Thread,線程的功能很簡單,就是一直從queue中獲得消息,因為是阻塞的隊列,所以沒有消息時會阻塞,一旦有消息就會執行發送sendPacket將包發送出去。
這裡使用到了componentManager,這個是openfire實現的一個組件管理類,通過這個類的對象可以發送xmpp數據包。
增加shutdown方法,使得線程可以在外部進行退出操作。
//訂閱列表
private Set<JID> subscriptions;
public synchronized void subscription(JID jid) {
subscriptions.add(jid);
}
public synchronized void unsubscription(JID jid) {
subscriptions.remove(jid);
}
只有訂閱了這個推送服務的客戶端才會進行推送操作,這裡的代碼就是用於訂閱與退訂操作。用了一個HashSet來存儲。
public class PushComponent extends AbstractComponent{
public PushComponent() {
}
@Override
public String getDescription() {
return "用於消息推送服務組件,主要功能就是將消息轉發給具體的客戶端,實現消息中轉的功能";
}
@Override
public String getName() {
return "pusher";
}
@Override
protected void handleMessage(Message message) {
}
}
public class PushManager {
private static PushManager _instance = new PushManager();
private Map<String, PushServer> pushServers;
private ExternalComponentManager manager;
private PushManager() {
pushServers = new ConcurrentHashMap<String, PushServer>();
manager = new ExternalComponentManager("192.168.149.214", 5275);
manager.setSecretKey("push", "test");
manager.setMultipleAllowed("push", true);
}
public static PushManager getInstance() {
return _instance;
}
public void init() {
try {
//初始化PushServer
PushServer pushSvr = new PushServer("push", manager);
pushServers.put("push", pushSvr);
//注冊Component到xmpp服務器
manager.addComponent(pushSvr.getPushDomain(), pushSvr.getComp());
} catch (ComponentException e) {
e.printStackTrace();
}
}
public PushServer getPushServer(String pushDomain) {
return pushServers.get(pushDomain);
}
}
這裡的PushComponent就是一個xmpp組件,相當於一個擴展模塊,可以接收消息並處理消息,也就是自己寫一些和xmpp相關的業務功能。
PushManager就是管理組件並連接到xmpp服務器的一個類。
public class App
{
public static void main( String[] args )
{
PushManager.getInstance().init();
//推送消息
PushServer ps = PushManager.getInstance().getPushServer("push");
ps.start();
JID client1 = new JID("1twja8e8yr@domain/1twja8e8yr");
ps.subscription(client1);
try {
for (Integer i = 0; i< 200; i++) {
ps.putPacket("推送消息200:" + i.toString());
Thread.sleep(1);
}
Thread.sleep(5000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
ps.stop();
System.out.println("go die");
}
}
這段代碼模擬了服務的啟動,同時為了簡化功能這裡直接添加了一個訂閱用戶。
public class TestAnonymous {
public static void main(String[] args) {
AbstractXMPPConnection connection = SesseionHelper.newConn("192.168.149.214", 5223, "domain");
try {
connection.login();//匿名登錄
connection.addAsyncStanzaListener(new StanzaListener() {
@Override
public void processPacket(Stanza packet) throws NotConnectedException {
System.out.println((new Date()).toString()+ ":" + packet.toXML());
}
}, new StanzaFilter() {
@Override
public boolean accept(Stanza stanza) {
return stanza instanceof Message;
}
});
} catch (XMPPException | SmackException | IOException e) {
e.printStackTrace();
}
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
客戶端代碼啟動一個xmpp連接,然後登錄到服務器,同時訂閱消息,將收到的消息print出來。
整個過程就完成了。
注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文鏈接! 若您覺得這篇文章還不錯請點擊下右下角的推薦,非常感謝! http://www.cnblogs.com/5207