NAME AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client VERSION version 0.01 SYNOPSIS use strict; use warnings; use AnyEvent::RabbitMQ::Simple; # create main loop my $loop = AE::cv; my $rmq = AnyEvent::RabbitMQ::Simple->new( host => '127.0.0.1', port => 5672, user => 'username', pass => 'password', vhost => '/', timeout => 1, tls => 0, verbose => 0, confirm_publish => 1, prefetch_count => 10, failure_cb => sub { my ($event, $details, $why) = @_; if ( ref $why ) { my $method_frame = $why->method_frame; $why = $method_frame->reply_text; } $loop->croak("[ERROR] $event($details): $why" ); }, # routing layout # [========== exchanges ===================] [===== queues ==============] # [ (type/routing key) ] [ (routing key) ] # logger ----------> stats --------------> stats-logs # |(fanout) (direct) (mail.stats) # | | # | | \----------> errors -------------> ftp-error-logs # | | | (topic:*.error.#) (ftp.error.#) # | | | # | | \-------------------> mail-error-logs # | | (mail.error.#) # | | # | \-----------> info ---------------> info-logs # | (topic:*.info.#) (*.info.#) # | # \------------------------------------> debug-queue # declare exchanges exchanges => [ 'logger' => { durable => 0, type => 'fanout', internal => 0, auto_delete => 1, }, 'stats' => { durable => 0, type => 'direct', internal => 0, auto_delete => 1, }, 'errors' => { durable => 0, type => 'topic', internal => 0, auto_delete => 1, }, 'info' => { durable => 0, type => 'topic', internal => 0, auto_delete => 1, }, ], # declare queues queues => [ 'debug-queue' => { durable => 0, auto_delete => 1, }, 'stats-logs' => { durable => 0, auto_delete => 1, }, 'ftp-error-logs' => { durable => 0, auto_delete => 1, }, 'mail-error-logs' => { durable => 0, auto_delete => 1, }, 'info-logs' => { durable => 0, auto_delete => 1, }, ], # exchange to exchange bindings, with optional routing key bind_exchanges => [ { 'stats' => 'logger' }, { 'errors' => [ 'logger', '*.error.#' ] }, { 'info' => [ 'logger', '*.info.#' ] }, ], # queue to exchange bindings, with optional routing key bind_queues => [ { 'debug-queue' => 'logger' }, { 'ftp-error-logs' => [ 'errors', 'ftp.error.#' ] }, { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] }, { 'info-logs' => [ 'info', 'info.#' ] }, { 'stats-logs' => [ 'stats', 'mail.stats' ] }, ], ); # publisher timer my $t; # connect and set up channel my $conn = $rmq->connect(); $conn->cb( sub { print "waiting for channel..\n"; my $channel = shift->recv or $loop->croak("Could not open channel"); print "************* consuming\n"; for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) { consume($channel, $q); } print "************* starting publishing\n"; $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) }; } ); # consumes from requested queue sub consume { my ($channel, $queue) = @_; my $consumer_tag; $channel->consume( queue => $queue, no_ack => 0, on_success => sub { my $frame = shift; $consumer_tag = $frame->method_frame->consumer_tag; print "************* consuming from $queue with $consumer_tag\n"; }, on_consume => sub { my $res = shift; my $body = $res->{body}->payload; print "+++++++++++++ consumed($queue): $body\n"; $channel->ack( delivery_tag => $res->{deliver}->method_frame->delivery_tag ); }, on_failure => sub { print "************* failed to consume($queue)\n"; } ); } # randomly generates routing key and message body sub publish { my ($channel, $msg) = @_; unless ( $channel->is_open ) { warn "Cannot publish, channel closed"; return; } my @system = qw( mail ftp web ); my @levels = qw( debug info error stats ); my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ]; $msg = sprintf("[%s] %s", uc($routing_key), $msg); print "\n------- publishing: $msg\n"; $channel->publish( routing_key => $routing_key, exchange => 'logger', body => $msg, on_ack => sub { print "------- published: $msg\n"; }, on_return => sub { print "************* failed to publish: $msg\n"; } ); } # wait forever or die on error my $done = $loop->recv; DESCRIPTION This module is meant to simplify the process of setting up the RabbitMQ channel, so you can start publishing and/or consuming messages without chaining "on_success" callbacks. METHODS new my $rmq = AnyEvent::RabbitMQ::Simple->new( ... ); Returns configured the object using following parameters: host my $rmq = AnyEvent::RabbitMQ::Simple->new( host => '127.0.0.1', # default ... ); Host IP. port my $rmq = AnyEvent::RabbitMQ::Simple->new( port => 5672, # default ... ); Port number. vhost my $rmq = AnyEvent::RabbitMQ::Simple->new( vhost => '/', # default ... ); Virtual host namespace. user my $rmq = AnyEvent::RabbitMQ::Simple->new( user => 'guest', # default ... ); User name. pass my $rmq = AnyEvent::RabbitMQ::Simple->new( pass => 'guest', # default ... ); Password. tune my $rmq = AnyEvent::RabbitMQ::Simple->new( tune => { heartbeat => $connection_heartbeat, channel_max => $max_channel_number, frame_max => $max_frame_size }, ... ); Connection tuning options. timeout my $rmq = AnyEvent::RabbitMQ::Simple->new( timeout => 0, # default ... ); Connection timeout. tls my $rmq = AnyEvent::RabbitMQ::Simple->new( tls => 0, # default ... ); Use TLS. verbose my $rmq = AnyEvent::RabbitMQ::Simple->new( verbose => 0, # default ... ); Turn on protocol debug. confirm_publish my $rmq = AnyEvent::RabbitMQ::Simple->new( confirm_publish => 0, # default ... ); Turn on confirm mode on channel. If set it enables the "on_ack" callback of channel's "publish" method. prefetch_count my $rmq = AnyEvent::RabbitMQ::Simple->new( prefetch_count => 0, # default ... ); Specify the number of prefetched messages when consuming from the channel. exchange my $rmq = AnyEvent::RabbitMQ::Simple->new( exchange => 'name_of_exchange', ... ); Optional name of exchange to declare with its default configuration options. See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details. exchanges my $rmq = AnyEvent::RabbitMQ::Simple->new( exchanges => [ 'name_of_exchange' => { durable => 1, type => 'fanout', ... # other exchange configuration parameters }, ... ], ... ); Optional list of exchanges to declare with their configuration options. See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details. queue my $rmq = AnyEvent::RabbitMQ::Simple->new( queue => 'name_of_queue', ... ); Optional name of queue to declare with its default configuration options. If no queues were declared or empty name has been specified a unique generated queue name will be available: my $gen_queue = $rmq->gen_queue; See "declare_queue" in AnyEvent::RabbitMQ::Channel for details. queues my $rmq = AnyEvent::RabbitMQ::Simple->new( queues => [ 'name_of_queue' => { durable => 1, no_ack => 0, ... # other queue configuration parameters }, ... ], ... ); Optional list of queues to declare with their configuration options. See "declare_queue" in AnyEvent::RabbitMQ::Channel for details. gen_queue my $gen_queue = $rmq->gen_queue; Name of the generated queue if no queues were declared (or queue with empty name has been specified). bind_exchanges my $rmq = AnyEvent::RabbitMQ::Simple->new( bind_exchanges => [ # without routing key { 'destination1' => 'source' }, # with routing key { 'destination2' => [ 'source', 'routing_key' ] }, ... ], ... ); Optional list of exchange-to-exchange bindings. See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details. bind_queues my $rmq = AnyEvent::RabbitMQ::Simple->new( bind_queues => [ # without routing key { 'queue1' => 'exchange' }, # with routing key { 'queue2' => [ 'exchange', 'routing_key' ] }, ... ], ... ); Optional list of queue-to-exchange bindings. See "bind_queue" in AnyEvent::RabbitMQ::Channel for details. failure_cb my $rmq = AnyEvent::RabbitMQ::Simple->new( failure_cb => sub { my ($event, $details, $why) = @_; if ( ref $why ) { my $method_frame = $why->method_frame; $why = $method_frame->reply_text; } $loop->croak("[ERROR] $event($details): $why" ); }, ... ); Generic error handling callback. The value of $event is one of: ConnectOnFailure ConnectOnReadFailure ConnectOnReturn ConnectOnClose OpenChannelOnFailure OpenChannelOnReturn OpenChannelOnClose DeclareExchangeOnFailure Value of $details has following format: "name:$name_of_exchange". BindExchangeOnFailure Value of $details has following format: "source:$name_of_source_exchange, destination:$name_of_destination_exchange". DeclareQueueOnFailure Value of $details has following format: "name:$name_of_queue". BindQueueOnFailure Value of $details has following format: "queue:$name_of_queue, exchange:$name_of_exchange". ConfirmChannelOnFailure QosChannelOnFailure connect my $conn = $rmq->connect(); $conn->cb( sub { my $channel = shift->recv or $loop->croak("Could not open channel"); ... } ); Returns the AnyEvent condvar that returns AnyEvent::RabbitMQ::Channel object after all the configuration steps were successful. disconnect $rmq->disconnect(); Disconnects from RabbitMQ server. SEE ALSO * AnyEvent::RabbitMQ * AUTHOR Alex J. G. Burzyński COPYRIGHT AND LICENSE This software is copyright (c) 2016 by Alex J. G. Burzyński . This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.