connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('logs'); $queue->declare(); $queue->bind('exchange', 'logs'); while (true) { $queue->consume('callback'); } $connection->close(); function callback($envelope, $queue) { var_dump($envelope->getBody()); $queue->nack($envelope->getDeliveryTag()); }
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $exchange->publish('direct type test','logs'); var_dump("Send Message OK"); $connect->disconnect();
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('logs'); @$queue->declare(); $queue->bind('exchange', 'logs'); while (true) { $queue->consume('callback'); } $connection->close(); function callback($envelope, $queue) { var_dump($envelope->getBody()); $queue->nack($envelope->getDeliveryTag()); }
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); for ($index = 1; $index < 5; $index++) { $exchange->publish($index,'logs'); var_dump("Send:$index"); } $exchange->delete(); $connect->disconnect();
for ($index = 1; $index < 50; $index++) { $exchange->publish($index,'logs'); var_dump("Send:$index"); }
function callback($envelope, $queue) { var_dump($envelope->getBody()); sleep(3); $queue->nack($envelope->getDeliveryTag()); }
$channel = new AMQPChannel($connect);改成如下
$channel = new AMQPChannel($connect); $channel->setPrefetchCount(1);