程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> rabbitmq-c初探

rabbitmq-c初探

編輯:C++入門知識

 RabbitMQ著實是個好東西,當然了也有對C語言client開發的支持。例子和文檔少的可憐,只能去項目裡去查看example來理解,簡單整理了一些,以免走些彎路。主要是在版本對應上,這點就沒Maven好了,只能對好類庫和例子。接下來我們簡單看看需要的東東。

環境:Ubuntu 13.04

rabbitmq-server 默認的3.0.2-1

librabbitmq-dev 默認的0.0.1.hg216-1

項目構造用的qmake(這樣簡單不少)

1 consumer

1.1 consumer.pro的內容
SOURCES=utils.cpp amqp_consumer.cpp platform_utils.cpp

HEADERS=utils.h

VPATH+=/usr/include

CONFIG+=release

TARGET=consumer

LIBS += -lrabbitmq

1.2 amqp_consumer.cpp代碼
  這裡的代碼來自於rabbitmq-c-v0.3.0 具體查看 https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_consumer.c。(對於幾個特殊的宏引用作了調整)

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <amqp.h>

#include <amqp_framing.h>

#include <assert.h>

#include "utils.h"

#define SUMMARY_EVERY_US 1000000

 


static void run(amqp_connection_state_t conn)

{

  uint64_t start_time = now_microseconds();

  int received = 0;

  int previous_received = 0;

  uint64_t previous_report_time = start_time;

  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

  amqp_frame_t frame;

  int result;

  size_t body_received;

  size_t body_target;

  uint64_t now;

 


  while (1) {

    now = now_microseconds();

    if (now > next_summary_time) {

      int countOverInterval = received - previous_received;

      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

      printf("%d ms: Received %d - %d since last report (%d Hz)\n",

    (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

      previous_received = received;

      previous_report_time = now;

      next_summary_time += SUMMARY_EVERY_US;

    }

    amqp_maybe_release_buffers(conn);

    result = amqp_simple_wait_frame(conn, &frame);

    if (result < 0)

      return;

    if (frame.frame_type != AMQP_FRAME_METHOD)

      continue;

    if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)

      continue;

 


    result = amqp_simple_wait_frame(conn, &frame);

    if (result < 0)

      return;

    if (frame.frame_type != AMQP_FRAME_HEADER) {

      fprintf(stderr, "Expected header!");

      abort();

    }

    body_target = frame.payload.properties.body_size;

    body_received = 0;

 


    while (body_received < body_target) {

      result = amqp_simple_wait_frame(conn, &frame);

      if (result < 0)

        return;

     if (frame.frame_type != AMQP_FRAME_BODY) {

        fprintf(stderr, "Expected body!");

        abort();

      }

      body_received += frame.payload.body_fragment.len;

      assert(body_received <= body_target);

      amqp_dump(frame.payload.body_fragment.bytes,frame.payload.body_fragment.len);

    }

    received++;

  }

}

 


int main(int argc, char const * const *argv) {

  char const *hostname;

  int port;

  char const *exchange;

  char const *bindingkey;

  int sockfd;

  amqp_connection_state_t conn;

  amqp_bytes_t queuename;

 


  if (argc < 3) {

    fprintf(stderr, "Usage: amqp_consumer host port\n");

    return 1;

  }

  hostname = argv[1];

  port = atoi(argv[2]);

  exchange = "amq.direct"; /* argv[3]; */

  bindingkey = "test queue"; /* argv[4]; */

  conn = amqp_new_connection();

  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

  amqp_set_sockfd(conn, sockfd);

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

   "Logging in");

  amqp_channel_open(conn, 1);

  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  {

    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 0, 0, 1,

   AMQP_EMPTY_TABLE/*amqp_empty_table*/);

    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

    queuename = amqp_bytes_malloc_dup(r->queue);

    if (queuename.bytes == NULL) {

      fprintf(stderr, "Out of memory while copying queue name");

      return 1;

    }

  }

  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

 AMQP_EMPTY_TABLE/*amqp_empty_table*/);

  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

  amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES/*amqp_empty_bytes*/, 0, 1, 0, AMQP_EMPTY_TABLE/*amqp_empty_table*/);

  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

  run(conn);

  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

  die_on_error(amqp_destroy_connection(conn), "Ending connection");

  return 0;

}


2 producer
2.1 producer.pro的內容
SOURCES=utils.cpp amqp_producer.cpp platform_utils.cpp
HEADERS=utils.h
VPATH+=/usr/include
CONFIG+=release
TARGET=producer
LIBS += -lrabbitmq
2.2 amqp_producer.cpp代碼
  這裡的代碼來自於rabbitmq-c-v0.3.0 具體查看https://github.com/alanxz/rabbitmq-c/blob/rabbitmq-c-v0.3.0/examples/amqp_producer.c。(對於幾個特殊的宏引用作了調整)

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <amqp.h>

#include <amqp_framing.h>

#include "utils.h"

#define SUMMARY_EVERY_US 1000000

 


static void send_batch(amqp_connection_state_t conn,

      char const *queue_name,

      int rate_limit,

      int message_count)

{

  uint64_t start_time = now_microseconds();

  int i;

  int sent = 0;

  int previous_sent = 0;

  uint64_t previous_report_time = start_time;

  uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

  char message[256];

  amqp_bytes_t message_bytes;

 


  for (i = 0; i < (int)sizeof(message); i++) {

    message[i] = i & 0xff;

  }

  message_bytes.len = sizeof(message);

  message_bytes.bytes = message;

  for (i = 0; i < message_count; i++) {

    uint64_t now = now_microseconds();

    die_on_error(amqp_basic_publish(conn,1,amqp_cstring_bytes("amq.direct"),amqp_cstring_bytes(queue_name),

      0,0,NULL,message_bytes),"Publishing");

    sent++;

    if (now > next_summary_time) {

      int countOverInterval = sent - previous_sent;

      double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

      printf("%d ms: Sent %d - %d since last report (%d Hz)\n",(int)(now - start_time) / 1000, sent,

         countOverInterval, (int) intervalRate);

      previous_sent = sent;

      previous_report_time = now;

      next_summary_time += SUMMARY_EVERY_US;

    }

    while (((i * 1000000.0) / (now - start_time)) > rate_limit) {

      microsleep(2000);

      now = now_microseconds();

    }

  }

  {

    uint64_t stop_time = now_microseconds();

    int total_delta = stop_time - start_time;

    printf("PRODUCER - Message count: %d\n", message_count);

    printf("Total time, milliseconds: %d\n", total_delta / 1000);

    printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));

  }

}

 


int main(int argc, char const * const *argv) {

  char const *hostname;

  int port;

  int rate_limit;

  int message_count;

 int sockfd;

  amqp_connection_state_t conn;

  if (argc < 5) {

    fprintf(stderr, "Usage: amqp_producer host port rate_limit message_count\n");

    return 1;

  }

  hostname = argv[1];

  port = atoi(argv[2]);

  rate_limit = atoi(argv[3]);

  message_count = atoi(argv[4]);

  conn = amqp_new_connection();

  die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

  amqp_set_sockfd(conn, sockfd);

  die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

   "Logging in");

  amqp_channel_open(conn, 1);

  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

  send_batch(conn, "test queue", rate_limit, message_count);

  die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

  die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

  die_on_error(amqp_destroy_connection(conn), "Ending connection");

  return 0;

}

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved