06 Feb 2011, 12:42

Mecha Multi Mechanize

A while back, I had the pleasure of contributing a small amount of code to Corey Goldberg’s excellent multi-mechanize web performance and load testing framework. Since I have been experimenting with using zeromq in place of the python Queue object for communication between processes and threads, I decided that multi-mechanize would be an excellent code base to experiment on. So today, I started mecha-multi-mechanize. Over the next few weeks, I plan to play with incorporating zeromq (and hopefully eventlet) into the original code.

As a first step, today I ripped out the Queue object that was used to communicate between the agent threads and the resultswriter; here is the diff from the commit. I was pleased with how straightforward it was.
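In outline, the change swaps a shared Queue for a PUSH/PULL socket pair. Here's a minimal sketch of that same swap between two threads over the inproc transport; the endpoint name and message contents are mine for illustration, not taken from the multi-mechanize diff:

```python
import threading
import zmq

context = zmq.Context()

# The PULL end stands in for the Queue.get() side (the results writer)
receiver = context.socket(zmq.PULL)
receiver.bind("inproc://results")

def agent():
    # The PUSH end stands in for Queue.put() in each agent thread
    sender = context.socket(zmq.PUSH)
    sender.connect("inproc://results")
    for i in range(3):
        sender.send_string("result %d" % i)
    sender.close()

agent_thread = threading.Thread(target=agent)
agent_thread.start()

messages = [receiver.recv_string() for _ in range(3)]
agent_thread.join()
print(messages)
```

The nice property is that the two ends no longer share any python object, which is what makes the later move to separate processes (or machines) mechanical.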

05 Feb 2011, 19:36

Simple TCP to ZeroMQ Message Forwarder with Eventlet

I spent part of today playing with eventlet. Eventlet recently added zeromq support. Here’s a very simple forwarder that sends line-terminated messages received over TCP to a zeromq push socket, using the zeromq hub:

import eventlet
from eventlet.green import zmq


def read_socket(writer, reader):
    message = reader.readline()
    while message:
        writer.send(message)
        message = reader.readline()

def listen_socket(address, port):
    # Select the zeromq hub so green zmq sockets cooperate with the other greenthreads
    eventlet.hubs.use_hub("zeromq")
    server = eventlet.listen((address, port))
    # One zeromq context per process is the usual convention
    zmq_context = zmq.Context()
    while True:
        new_connection, remote_address = server.accept()
        push_socket = zmq_context.socket(zmq.PUSH)
        push_socket.connect("tcp://127.0.0.1:5558")
        eventlet.spawn_n(read_socket, push_socket, new_connection.makefile('r'))

if __name__ == '__main__':
    listen_socket('127.0.0.1', 7000)
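To watch the forwarder work, something needs to sit on the other end of the push sockets. Here's a minimal PULL receiver for that purpose (the `drain` helper is mine, not part of the forwarder):

```python
import zmq

context = zmq.Context()

# Bind the PULL end that the forwarder's PUSH sockets connect to
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://127.0.0.1:5558")

def drain(count):
    # Collect `count` forwarded lines; each TCP line arrives as one zeromq message
    return [receiver.recv() for _ in range(count)]
```

With both scripts running, something like `echo hello | nc 127.0.0.1 7000` should cause `drain(1)` to return the forwarded line.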

03 Feb 2011, 17:08

Python Multiprocessing - ZeroMQ vs Queue

As a quick follow up to my previous post, here’s a look at the performance of passing messages between two python processes using the Queue class vs using 0mq push / pull connections.  As a quick test, we will pass 10 million messages between two processes, first using Queue, then using 0mq.

Multiprocessing test with Queue

import sys
import time
from multiprocessing import Process, Queue

def worker(q):
    for task_nbr in range(10000000):
        message = q.get()
    sys.exit(1)
 
def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    for num in range(10000000):
        send_q.put("MESSAGE")

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

Multiprocessing test with 0mq

import sys
import zmq
from multiprocessing import Process
import time

def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(10000000):
        message = work_receiver.recv()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    for num in range(10000000):
        ventilator_send.send("MESSAGE")

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

Queue Results

python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq Results

python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

The numbers speak for themselves.

02 Feb 2011, 18:20

Python Multiprocessing with ZeroMQ

Recently, I’ve begun exploring the depths of ZeroMQ, and the pyzmq bindings.  This post is not an introduction to ZeroMQ, but for a basic rundown the “0MQ in A Hundred Words” blurb from the project site suffices:

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pubsub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

For more detail, I highly recommend reading 0MQ, The Guide.

As I was going over the various python code examples on github, I became interested in the taskvent / tasksink / taskwork examples. The pattern was recognizable as one I often use for distributed processing. In the past, I typically would have implemented such a workflow using the python multiprocessing library, using its Queue class to communicate between processes. Recently I’ve implemented several data processing pipelines at work using the same technique, but with zeromq channels for communication, and I’ve been extremely pleased with both the performance and the ease of use. So, I decided to write a short blog post with a simple example for others working on the same sorts of problems.

For the example, I’ve implemented a small distributed system that calculates the squares of a series of numbers, based on the python examples. The pieces are as follows:

ventilator

  • The ventilator sends messages containing the numbers to be squared.
  • Uses a ZMQ_PUSH socket to send messages to the workers.

workers

  • The workers receive messages from the ventilator, do the work, and send the results down the pipe.
  • Uses a ZMQ_PUSH socket to send answers to the results manager.
  • Uses a ZMQ_SUB socket to receive the FINISHED message from the results manager.

results manager

  • The results manager receives all answers from all workers, prints them, and sends a message to the workers to shut down when all tasks are complete.
  • Uses a ZMQ_PULL socket to receive answers from the workers.
  • Uses a ZMQ_PUB socket to send the FINISHED message to the workers.
A diagram of the basic flow:

So, without further ado, let’s look at the code!

First, we import zmq, time, and the multiprocessing Process class:

import time
import zmq
from multiprocessing import Process

ventilator

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Send the numbers between 1 and ten thousand as work messages
    for num in range(10000):
        work_message = { 'num' : num }
        ventilator_send.send_json(work_message)

    time.sleep(1)

worker

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from the work_receiver channel, square the number
        # and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_json()
            product = work_message['num'] * work_message['num']
            answer_message = { 'worker' : wrk_num, 'result' : product }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

results manager

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(10000):
        result_message = results_receiver.recv_json()
        print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

And away we go…

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(10)
    for wrk_num in range(len(worker_pool)):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()

The code from this example is available at: https://github.com/taotetek/blog_examples

You may have noticed that I am using the tcp transport for communication between my processes.  This is what I personally find the most exciting about ZeroMQ: if you use ZeroMQ for communication between processes in a multiprocess system, it is close to trivial to scale the code to run on multiple servers.
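For example, moving the worker from this post onto another machine requires nothing more than different connect addresses. A sketch of just the wiring (the host addresses are placeholders, and the helper function is mine):

```python
import zmq

def connect_worker(vent_host, sink_host):
    # Same socket wiring as the worker above; only the endpoint addresses
    # change when the processes move onto separate servers
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://%s:5557" % vent_host)
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://%s:5558" % sink_host)
    return work_receiver, results_sender

# Everything on one box, as in the post:
local = connect_worker("127.0.0.1", "127.0.0.1")

# Ventilator and results manager on other machines (placeholder addresses):
# remote = connect_worker("192.168.1.10", "192.168.1.11")
```

Because zeromq connects are lazy, workers can even be started before the ventilator is up.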

This only begins to scratch the surface of ZeroMQ.  ZeroMQ supports multiple transports and many more topologies, and the library is available for multiple languages.  It is a fantastic tool for both distributed computing and polyglot programming.
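As a small taste of one of those other topologies, here is a minimal PUB/SUB sketch over the inproc transport; the endpoint name and message contents are invented for the example. Because a PUB socket silently drops messages until the subscription has propagated, the send is retried until the subscriber sees it:

```python
import zmq

context = zmq.Context()

# PUB/SUB fanout: one publisher, any number of subscribers
publisher = context.socket(zmq.PUB)
publisher.bind("inproc://weather")

subscriber = context.socket(zmq.SUB)
subscriber.connect("inproc://weather")
# Subscribe only to messages whose prefix is b"temp"
subscriber.setsockopt(zmq.SUBSCRIBE, b"temp")
subscriber.setsockopt(zmq.RCVTIMEO, 100)  # milliseconds

message = None
for _ in range(50):
    publisher.send(b"temp 21")
    try:
        message = subscriber.recv()
        break
    except zmq.Again:
        # Subscription not propagated yet; try again
        pass

print(message)
```

Unlike PUSH/PULL, every connected subscriber whose filter matches gets a copy of each message, which is what makes the FINISHED broadcast in the example above work.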