demo/amqp_clock.py
author barryp@macbook.home
Mon, 07 Jan 2008 17:40:49 -0600
changeset 90 092773c4c5a5
parent 69 16681db5f574
child 108 0c18dce0e372
permissions -rwxr-xr-x
Added tag 0.3 for changeset f784f2d5d1bb
     1 #!/usr/bin/env python
     2 """
     3 AMQP Clock
     4 
     5 Fires off simple messages at one-minute intervals to a topic
     6 exchange named 'clock', with the topic of the message being
     7 the local time as 'year.month.date.dow.hour.minute',
     8 for example: '2007.11.26.1.12.33', where the dow (day of week)
     9 is 0 for Sunday, 1 for Monday, and so on (similar to Unix crontab).
    10 
    11 A consumer could then bind a queue to the routing key '#.00'
    12 for example to get a message at the beginning of each hour.
    13 
    14 2007-11-26 Barry Pederson <bp@barryp.org>
    15 
    16 """
    17 from datetime import datetime
    18 from optparse import OptionParser
    19 from time import sleep
    20 
    21 import amqplib.client_0_8 as amqp
# Shorthand for the message class used when publishing.
Message = amqp.Message

# Name of the exchange the clock messages are published to.
EXCHANGE_NAME = 'clock'
# Routing-key layout: year.month.day.dow.hour.minute, each field zero-padded
# by strftime() except %w, which is the weekday as a single digit (0 = Sunday).
TOPIC_PATTERN = '%Y.%m.%d.%w.%H.%M' # Python datetime.strftime() pattern
    26 
    27 def main():
    28     parser = OptionParser()
    29     parser.add_option('--host', dest='host',
    30                         help='AMQP server to connect to (default: %default)',
    31                         default='localhost')
    32     parser.add_option('-u', '--userid', dest='userid',
    33                         help='AMQP userid to authenticate as (default: %default)',
    34                         default='guest')
    35     parser.add_option('-p', '--password', dest='password',
    36                         help='AMQP password to authenticate with (default: %default)',
    37                         default='guest')
    38     parser.add_option('--ssl', dest='ssl', action='store_true',
    39                         help='Enable SSL with AMQP server (default: not enabled)',
    40                         default=False)
    41 
    42     options, args = parser.parse_args()
    43 
    44     conn = amqp.Connection(options.host, options.userid, options.password)
    45     ch = conn.channel()
    46     ch.access_request('/data', write=True, active=True)
    47     ch.exchange_declare(EXCHANGE_NAME, type='topic')
    48 
    49     # Make sure our first message is close to the beginning
    50     # of a minute
    51     now = datetime.now()
    52     if now.second > 0:
    53         sleep(60 - now.second)
    54 
    55     while True:
    56         now = datetime.now()
    57         msg = Message(timestamp=now)
    58         topic = now.strftime(TOPIC_PATTERN)
    59         ch.basic_publish(msg, EXCHANGE_NAME, routing_key=topic)
    60 
    61         # Don't know how long the basic_publish took, so
    62         # grab the time again.
    63         now = datetime.now()
    64         sleep(60 - now.second)
    65 
    66     ch.close()
    67     conn.close()
    68 
# Start the clock publisher only when run as a script, not when imported.
if __name__ == '__main__':
    main()