04 Mar 2017, 14:43

Problem: I want to use Go service discovery clients with CZMQ

Solution: zdiscgo


At work, I have a tier of Rsyslog instances on an Apache Mesos cluster for two purposes: indexing logs into Elasticsearch via the Rsyslog omelasticsearch plugin, and publishing logs on ZeroMQ endpoints for people who wish to subscribe to logs from specific hosts and services. The rsyslog instances are scheduled to run on arbitrary hosts, and the ZeroMQ sockets are bound to random ports on the hosts the instances end up on. The ports themselves are labeled, so that they can be searched for using the scheduler’s task API.

The first client I used with these endpoints was a command line client that I wrote in Go, using GoCZMQ. It was fairly painless to use the Marathon scheduler’s API (Marathon is a scheduler that works with Mesos) via the go-marathon library to look up the current hosts and ports the ZeroMQ endpoints are exposed on and use that information to connect to them.

Recently, I started on another client written in straight C. I wanted to use the liblognorm log normalization library in this client, and did not want to deal with the overhead of using cgo to call the library from Go. That being said, I also did not want to go through the pain of writing Marathon API calls in straight C - making HTTP requests and parsing JSON responses would be a lot of code.

As I was puzzling over which way I wanted to solve this problem, I stumbled on a great blog post from Vladimir Vivien, “Calling Go Functions from Other Languages”. Since I had plans to start experimenting with ZeroMQ + Kubernetes in addition to solving this specific problem, I decided to solve the problem of “calling Go service discovery libraries from C” in a general sense.

The end result is zdiscgo, a czmq zactor implementation that can load Go libraries at run time for service discovery. Let’s take a look at how this works!

Problem: I need a Go interface for my shared library

First I designed a minimal interface for the Go code. I’m using the term “interface” somewhat loosely here, as C does not know about Go interfaces and will just directly load the function. ZDiscgoDiscoverEndpoints(url, key string) *C.char seemed like enough to get started with. It accepts a URL for a service discovery API endpoint, and a key. It makes the assumption that the key will be used in some fashion to look up ZeroMQ endpoints via an API call to the service at the URL, and that a comma delimited list of ZeroMQ endpoints will be returned. The CZMQ API has constructor functions for zsock_t * (a handle to a ZeroMQ socket) that can accept a comma delimited list of endpoints, so the result can be passed directly to those functions.

Since C functions can only return one result, the function is expected to handle an error by simply returning an empty string. In the interest of not having to deal with passing references to Go data structures to C, the function is expected to return a *C.char. Since you need to import C in order to export a function to C anyway, I feel that this isn’t too onerous.

For testing, the zdiscgo repo includes libmockdiscgo.go, a mock implementation that combines the passed in url and key and returns them:

package main

import "C"
import "fmt"

//export ZDiscgoDiscoverEndpoints
func ZDiscgoDiscoverEndpoints(url, key string) *C.char {
	// C.CString copies the Go string into C-allocated memory.
	return C.CString(fmt.Sprintf("inproc://%s-%s", url, key))
}

func main() {}

This code can then be compiled with -buildmode=c-shared into a shared library that can be used from C:

go build -o libmockdiscgo.so -buildmode=c-shared libmockdiscgo.go 

The above will create both the libmockdiscgo.so shared library, and a C header. The next step was to write a minimal API to make using this from C as pleasant as possible.
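
A real plugin would replace the mock's body with an API lookup and then format whatever it finds. The formatting half might look like the sketch below. The task type and the formatEndpoints helper are hypothetical names, and the tcp:// transport is an assumption about how the endpoints are exposed.

```go
package main

import (
	"fmt"
	"strings"
)

// task is a hypothetical, simplified view of one scheduler task:
// the host it landed on and the port its ZeroMQ socket is bound to.
type task struct {
	Host string
	Port int
}

// formatEndpoints builds the comma-delimited endpoint list that the
// plugin contract expects. It returns "" when there is nothing usable,
// which is how the contract signals an error to the C side.
func formatEndpoints(tasks []task) string {
	endpoints := make([]string, 0, len(tasks))
	for _, t := range tasks {
		if t.Host == "" || t.Port <= 0 {
			continue // skip incomplete entries rather than emit a bad endpoint
		}
		endpoints = append(endpoints, fmt.Sprintf("tcp://%s:%d", t.Host, t.Port))
	}
	return strings.Join(endpoints, ",")
}

func main() {
	tasks := []task{{"10.0.0.5", 31001}, {"10.0.0.9", 31744}}
	// prints tcp://10.0.0.5:31001,tcp://10.0.0.9:31744
	fmt.Println(formatEndpoints(tasks))
}
```

In an exported function, the resulting string would still need to be wrapped in C.CString before being returned to C.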

Problem: I need a friendly C API to wrap my Go library

Now that I had the Go library worked out, I needed a C API for using it. zdiscgoplugin.c provides a simple C API for loading Go plugins that are written to the above specification. It is written in accordance with 21/CLASS, the style guide that is typically used for ZeroMQ organization projects. The selftest provides a good overview of how to use it:

    //  @selftest
    zdiscgoplugin_t *self = zdiscgoplugin_new ("./go/libmockdiscgo.so");
    assert (self);

    const char *endpoints = zdiscgoplugin_discover_endpoints (self, "url", "key");
    assert (streq ("inproc://url-key", endpoints));

    zdiscgoplugin_destroy (&self);
    //  @end
    printf ("OK\n");

Let’s take a look at how it works, starting with the zdiscgoplugin_t struct. This struct holds a handle to the dynamically loaded library, and a reference to our Go ZDiscgoDiscoverEndpoints function:

struct _zdiscgoplugin_t {
    void *handle;
    char * (*discover)(go_str, go_str);
};

The constructor accepts the path to a dynamic library and returns a pointer to the struct:

zdiscgoplugin_t *
zdiscgoplugin_new (char *libpath)

The first thing the constructor does is allocate memory for the struct. If there is an error allocating this memory, it returns a NULL. Unlike Go, C only supports a single return value for a function. A caller of this function is responsible for checking that the return is not NULL.

    zdiscgoplugin_t *self = (zdiscgoplugin_t *) zmalloc (sizeof (zdiscgoplugin_t));
    if (!self)
        return NULL;

dlopen loads the shared library file and returns a handle to the loaded object. On Linux this function is declared in dlfcn.h, which defines functions for working with shared libraries.

    self->handle = dlopen (libpath, RTLD_NOW);

dlsym accepts a handle to a shared library and the name of a symbol in that library, and returns the memory address of the symbol, which is then stored in the struct as a function pointer:

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-pedantic"
    self->discover = (char * (*)(go_str, go_str)) dlsym(self->handle, "ZDiscgoDiscoverEndpoints");
#pragma GCC diagnostic pop 

    return self;

But what is all this #pragma stuff? As it turns out, dlsym returns an object pointer, and ISO C does not allow the conversion of an object pointer to a function pointer. Compiling this code with pedantic warnings enabled produces a diagnostic, which -Werror promotes to an error:

src/zdiscgoplugin.c:46:22: error: ISO C forbids conversion of object pointer to function pointer type [-Werror=pedantic]

I used a pragma to turn off pedantic warnings for this line of code.

The zdiscgoplugin_discover_endpoints call accepts a zdiscgoplugin_t * and two char *s (the URL of a service discovery service, and the key to use for a lookup):

const char *
zdiscgoplugin_discover_endpoints (zdiscgoplugin_t *self, char *url, char *key)

The first thing the function does is convert the url and key to a go_str, which is a custom C type that is memory compatible with the Go string type. This type is defined in zdiscgoplugin.h:

typedef long long go_int;
typedef struct{const char *p; go_int len;} go_str;

The conversion itself is relatively straightforward:

    go_str discover_url = {url, strlen (url)};
    go_str discover_key = {key, strlen (key)};
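
Go strings carry their length alongside the data pointer and are not NUL-terminated, which is why the C side has to supply strlen explicitly. The sketch below mirrors go_str as a hypothetical Go struct (goStr) and checks that it occupies the same number of bytes as a real string value. This holds under the standard Go toolchain, but it is a sanity check rather than something the language specification promises.

```go
package main

import (
	"fmt"
	"unsafe"
)

// goStr is a hypothetical Go mirror of the C go_str typedef:
// a pointer to the bytes followed by an integer length.
type goStr struct {
	p *byte
	n int
}

// sameSize reports whether a Go string value and goStr occupy the same
// number of bytes, which is what passing go_str by value relies on.
func sameSize() bool {
	var s string
	return unsafe.Sizeof(s) == unsafe.Sizeof(goStr{})
}

func main() {
	fmt.Println("string and goStr are the same size:", sameSize())
}
```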

Now, we can call the Go function through the reference held in self->discover:

    char *endpoints = self->discover (discover_url, discover_key);
    return endpoints;

The zdiscgoplugin_destroy function cleans up our memory when we are finished. In this case it does not really do much, but it makes for a nice complete API and will be handy in the future as we expand on things:

void
zdiscgoplugin_destroy (zdiscgoplugin_t **self_p)
{
    if (*self_p) {
        zdiscgoplugin_t *self = *self_p;
        //  Free class properties here
        //  Free object itself
        free (self);
        *self_p = NULL;
    }
}

Problem: I don’t want service discovery calls to block my application.

Making calls over the network to service discovery systems can take quite a bit of time. In order to allow asynchronous requests and replies, I wanted the ability to run the service discovery interface in its own thread. The CZMQ library provides a class for exactly this purpose: zactor. A zactor is similar in concept to a microservice, except it runs as a thread in your local process. Like a microservice, all communication to and from the thread is via protocol messages (as opposed to sharing memory between threads). Since ZeroMQ uses the same socket API for communicating between threads as it does for communicating over network transports, the code looks the same as it would if you were talking to a remote service.
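
For Go programmers, the same pattern can be sketched with a goroutine and channels. This is only an analogy, not how zactor is implemented: the request type and the startDiscovery function are names invented for illustration, and the lookup function stands in for the plugin call.

```go
package main

import "fmt"

// request is one discovery lookup plus the channel to answer on.
type request struct {
	url, key string
	reply    chan string
}

// startDiscovery launches a goroutine that owns the lookup function.
// Callers never touch its state directly; they only exchange messages.
// The returned done channel is closed once the goroutine has exited.
func startDiscovery(lookup func(url, key string) string) (chan<- request, <-chan struct{}) {
	requests := make(chan request)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for req := range requests {
			req.reply <- lookup(req.url, req.key)
		}
	}()
	return requests, done
}

func main() {
	requests, done := startDiscovery(func(url, key string) string {
		return "inproc://" + url + "-" + key
	})

	reply := make(chan string, 1)
	requests <- request{"url", "key", reply}
	fmt.Println(<-reply) // prints inproc://url-key

	close(requests) // plays the role of zactor_destroy
	<-done
}
```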

The first thing I did was design a small command protocol for the actor. Each command in the command protocol is a command frame followed by 1 or more argument frames. These commands are handled internally by zdiscgo_recv_api.

First, the handler receives the full message in the form of a zmsg_t *. It pops the first frame off the message and checks the command.

The VERBOSE command sets verbose mode for the service. Debugging information will be sent to stdout. The command handling code simply sets a verbose flag in the zdiscgo_t struct.

The CONFIGURE command is a 2 frame command. The second frame contains the path to the Go shared library to load. The command handling code attempts to construct a zdiscgoplugin_t * using the supplied path. It returns a 0 on success and a -1 if there is an error.

The DISCOVER command is a 3 frame command. The second and third frames are the url and key used by ZDiscgoDiscoverEndpoints. It calls our Go library and sends back a single frame containing a comma delimited list of endpoints on success or an empty string on failure.
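
The command protocol above can be modeled in a few lines of Go. This is a sketch of the dispatch logic only, not the actual zdiscgo_recv_api: the service type, its load hook (standing in for zdiscgoplugin_new), and the use of string frames for every reply are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// discoverFunc has the same shape as the Go plugin function.
type discoverFunc func(url, key string) string

// service is a hypothetical stand-in for the actor's state.
type service struct {
	verbose bool
	load    func(path string) (discoverFunc, error) // stands in for loading the shared library
	plugin  discoverFunc                            // nil until CONFIGURE succeeds
}

var errBadCommand = errors.New("malformed or unknown command")

// handle processes one message (command frame plus argument frames)
// and returns the reply frames, or nil when the command has no reply.
func (s *service) handle(frames []string) ([]string, error) {
	if len(frames) == 0 {
		return nil, errBadCommand
	}
	cmd, args := frames[0], frames[1:]
	switch {
	case cmd == "VERBOSE" && len(args) == 0:
		s.verbose = true
		return nil, nil
	case cmd == "CONFIGURE" && len(args) == 1:
		plugin, err := s.load(args[0])
		if err != nil {
			return []string{"-1"}, nil
		}
		s.plugin = plugin
		return []string{"0"}, nil
	case cmd == "DISCOVER" && len(args) == 2:
		if s.plugin == nil {
			return []string{""}, nil // empty string signals failure
		}
		return []string{s.plugin(args[0], args[1])}, nil
	}
	return nil, errBadCommand
}

func main() {
	s := &service{load: func(path string) (discoverFunc, error) {
		return func(url, key string) string { return "inproc://" + url + "-" + key }, nil
	}}
	for _, msg := range [][]string{
		{"VERBOSE"},
		{"CONFIGURE", "./go/libmockdiscgo.so"},
		{"DISCOVER", "url", "key"},
	} {
		reply, err := s.handle(msg)
		fmt.Printf("%v -> %q %v\n", msg, reply, err)
	}
}
```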

These details are abstracted away from the user’s perspective. You don’t have to worry about dealing with threads or inter-thread communication. You just make a new instance of the class and use it.

    //  Create a zdiscgo instance. This will spin up a new OS level
    //  thread that will handle service discovery requests.

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);

    //  We communicate with the service discovery thread over
    //  a ZMQ_PAIR socket. You can pass the zdiscgo instance
    //  to any CZMQ methods that accept zsock_t *. 
    //  Let's set the service to verbose mode.

    zdiscgo_verbose (zdiscgo);

    //  Next, let's configure the service by telling it to load
    //  our Go shared library. Under the hood this sends a
    //  CONFIGURE command and waits for the result code.
    int rc = zdiscgo_load_plugin (zdiscgo, "./go/libmockdiscgo.so");
    assert (rc == 0);

    //  Now let's get some endpoints! We send a DISCOVER command
    //  that consists of the url of a service discovery service,
    //  and the identifier the service should use to find the
    //  endpoints we want.

    char *endpoints = zdiscgo_discover (zdiscgo, "url", "key");

    //  Check that we have the correct response

    assert (streq (endpoints, "inproc://url-key"));

    //  Shut down the zdiscgo instance and clean up memory.

    zactor_destroy (&zdiscgo);

Let’s take a deeper look at how all of this works, starting with zdiscgo_t:

struct _zdiscgo_t {
    zsock_t *pipe;              //  Actor command pipe
    zpoller_t *poller;          //  Socket poller
    zdiscgoplugin_t *plugin;    //  zdiscgoplugin_t reference
    bool terminated;            //  Did caller ask us to quit?
    bool verbose;               //  Verbose logging enabled?
};

  • zsock_t *pipe holds the ZeroMQ socket that is used to send and receive messages between threads. If you are a Go programmer, you can think of this as somewhat similar to a Go channel.
  • zpoller_t *poller holds the poller that is used to watch for incoming messages on the socket.
  • zdiscgoplugin_t *plugin holds the plugin, which manages communicating with the Go shared library.
  • bool terminated is set to true when zactor_destroy is called and triggers shutdown / cleanup.
  • bool verbose is initially set to false, and is set to true by a call to zdiscgo_verbose.

Next up is the zactor function itself. The zactor constructor expects a function of type void (zactor_fn) (zsock_t *pipe, void *args):

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);

Here is my implementation of the function:

void
zdiscgo_actor (zsock_t *pipe, void *args)
{
    zdiscgo_t * self = zdiscgo_new (pipe, args);
    if (!self)
        return;          //  Interrupted

    //  Signal actor successfully initiated
    zsock_signal (self->pipe, 0);

    while (!self->terminated) {
        zsock_t *which = (zsock_t *) zpoller_wait (self->poller, 0);
        if (which == self->pipe)
            zdiscgo_recv_api (self);
       //  Add other sockets when you need them.
    }
    zdiscgo_destroy (&self);
}

This function constructs an instance of zdiscgo_t and then goes into a while loop. It waits for command messages from another thread, and passes those commands to zdiscgo_recv_api, which contains the callbacks for each command. Let’s take a look at what happens behind the scenes of the int rc = zdiscgo_load_plugin (zdiscgo, "./go/libmockdiscgo.so") call:

int
zdiscgo_load_plugin (zactor_t *self, char *path) {
    zstr_sendx (self, "CONFIGURE", path, NULL);
    int rc;
    zsock_recv (self, "i", &rc);
    return rc;
}

zstr_sendx accepts either a zsock_t * or a zactor_t * as its first argument, followed by a variadic list of strings that it sends as a multi-frame ZeroMQ message (one frame per argument). Passing NULL as the last argument tells the function there are no more frames.

As explained above, the zdiscgo_actor implementation running in another thread will pass the receive socket to the zdiscgo_recv_api for handling. Here is the code which handles the CONFIGURE command:

    if (streq (command, "CONFIGURE")) {
        if (self->verbose)
            zsys_debug ("received 'CONFIGURE' command");

        char *libpath = zmsg_popstr (request);
        int rc;
        self->plugin = zdiscgoplugin_new (libpath);
        if (self->plugin) {
            rc = 0;
            if (self->verbose)
                zsys_debug ("loaded plugin: '%s'", libpath);
        }
        else {
            rc = -1;
            if (self->verbose)
                zsys_error ("could not load plugin: '%s'", libpath);
        }

        zsock_send (self->pipe, "i", rc);
    }

  1. The path argument is popped out of the received message.
  2. It is passed to zdiscgoplugin_new, which is responsible for loading the go plugin.
  3. A reply message is sent, containing a single frame with either a 0 on success or -1 on failure.

The rest of the commands are implemented in a similar fashion.

Next Steps

I’m currently using this library in a tool at work that uses Marathon service discovery as described at the start of this article. Here’s a snippet of code showing it in use:

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);
    zdiscgo_verbose (zdiscgo);
    rc = zdiscgo_load_plugin (zdiscgo, lib);
    if (rc != 0)
        return 1;

    char *endpoints = zdiscgo_discover (zdiscgo, "url", "key");
    if (verbose)
        zsys_debug ("attaching socket to %s", endpoints);
    // connect the socket
    rc = zsock_attach (client, endpoints, false);
    if (rc == -1) {
        zsys_error ("zsock_attach failed");
        zsock_destroy (&client);
        return 1;
    }

I hope to soon experiment with a couple of other service discovery systems, and tweak things as I find usability issues and better ways to do things. I’ve released the code to the ZeroMQ organization under the Mozilla Public License. While the problem it solves is fairly esoteric, if it solves a problem for you, please let me know, and feel free to contribute!

26 Sep 2015, 16:57

Problem: I want to use ZeroMQ with Go

Solution: GoCZMQ


ZeroMQ is a distributed messaging library focused on efficiency and simplicity. CZMQ is a high level C binding for ZeroMQ. It provides a clean API across multiple versions of ZeroMQ that additionally provides a suite of useful services. I have been using ZeroMQ and CZMQ in projects for several years and occasionally contributing to CZMQ and other ZeroMQ related projects. When I first started experimenting with Go, I felt its focus on simple concurrency was a good match with the guiding philosophy behind ZeroMQ.

While Pebbe’s Go bindings for ZeroMQ were available at the time and looked to be high quality, I decided writing a Go language binding for CZMQ would be a fun way to learn about Go’s cgo interface to C.

I decided to develop the bindings as a ZeroMQ organization project using the Collective Code Construction Contract process. While I had contributed to ZeroMQ projects using this process, I had never started a project from scratch using it. I started the project with the pull request “Problem: there is no documentation” on September 6th, 2014.

Since then, I picked up a regular collaborator (hi Luna!) and five other contributors. The API is stable, and others and I are building projects on top of it. The experience has been great, and there are many things I could discuss about what I’ve learned on the way - but for now, let’s just answer the question “how do I use it?”.


These instructions are for building GoCZMQ from git master. We’ll build it against CZMQ from git master, which is in turn built against ZeroMQ from git master. If you’re used to stable releases, this may seem odd. In ZeroMQ organization projects, we do not use development branches. We also don’t care much about traditional “stable” releases. Version tags are mostly to make packaging easier for OS maintainers who care about such things. This practice is codified in the C4.1 process documentation:

“The project SHALL have one branch (“master”) that always holds the latest in-progress version and SHOULD always build.”

“The project SHALL NOT use topic branches for any reason. Personal forks MAY use topic branches.”


First, we need libsodium. Sodium is a “modern, easy-to-use software library for encryption, decryption, signatures, password hashing and more”. ZeroMQ relies on it for encryption. It is likely there are dev packages for it for your OS, but here are the instructions for building it just in case:

wget https://download.libsodium.org/libsodium/releases/libsodium-1.0.3.tar.gz
wget https://download.libsodium.org/libsodium/releases/libsodium-1.0.3.tar.gz.sig
wget https://download.libsodium.org/jedi.gpg.asc
gpg --import jedi.gpg.asc
gpg --verify libsodium-1.0.3.tar.gz.sig libsodium-1.0.3.tar.gz
tar zxvf libsodium-1.0.3.tar.gz
cd libsodium-1.0.3
./configure; make check
sudo make install
sudo ldconfig

Next up, we build ZeroMQ with libsodium support:

git clone git@github.com:zeromq/libzmq.git
cd libzmq
./autogen.sh
./configure --with-libsodium
make check
sudo make install
sudo ldconfig

Now, we’ll build CZMQ against ZeroMQ. For an overview of what the CZMQ API provides, see the reference manual.

git clone git@github.com:zeromq/czmq.git
cd czmq
./autogen.sh
./configure
make check
sudo make install
sudo ldconfig

Now, finally we can build GoCZMQ itself:

mkdir -p $GOPATH/src/github.com/zeromq
cd $GOPATH/src/github.com/zeromq
git clone git@github.com:zeromq/goczmq.git
cd goczmq
go test -v
go install

For an overview of the GoCZMQ API, see the documentation on godoc.org.

Hello World

No “getting started” post is complete without the requisite “hello world” example. In this example, we’ll connect a “push” socket to a “pull” socket in the same process, and send a message. This is probably not something we’d do in a real use case, but it’s a nice demonstration of how ZeroMQ is asynchronous.

package main

import (
	"fmt"
	"log"

	"github.com/zeromq/goczmq"
)

func main() {
	push, err := goczmq.NewPush("tcp://127.0.0.1:31337")
	if err != nil {
		log.Fatal(err)
	}

	pull, err := goczmq.NewPull("tcp://*:31337")
	if err != nil {
		log.Fatal(err)
	}

	err = push.SendFrame([]byte("Hello World"), goczmq.FlagNone)
	if err != nil {
		log.Fatal(err)
	}

	frame, sz, err := pull.RecvFrame()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("We received a message of size %d\n", sz)
	fmt.Printf("The message was: '%s'\n", frame)

	push.Destroy()
	pull.Destroy()
}

Let’s go over what is happening in this short example in detail. First, we create a ZMQ_PUSH socket:

	push, err := goczmq.NewPush("tcp://127.0.0.1:31337")
	if err != nil {
		log.Fatal(err)
	}

The NewPush function does quite a bit for us under the hood. Since it’s the first socket we’ve created in this program, it creates a ZeroMQ context for us. Then, it creates the socket and starts trying to connect the socket to the endpoint.

Note that we’ve constructed a TCP socket that is connecting before we’ve created the bound socket! With ZeroMQ, the order of bind and connect calls does not matter. The socket will retry connecting in the background until it is successful.

Next, we’ll construct a ZMQ_PULL socket which will bind to the endpoint. After it is bound, the ZMQ_PUSH socket will successfully connect to it.

	pull, err := goczmq.NewPull("tcp://*:31337")
	if err != nil {
		log.Fatal(err)
	}

Now comes the exciting part - sending a message! In ZMTP (the protocol implemented by ZeroMQ), a “message” consists of one or more byte agnostic frames. We will send a message by using the SendFrame API call. SendFrame accepts a []byte followed by flags. In this case, FlagNone indicates there are no frames following this frame, so the single frame should be treated as a full message. If this frame were the first frame in a multi-part message, we would use goczmq.FlagMore.

	err = push.SendFrame([]byte("Hello World"), goczmq.FlagNone)
	if err != nil {
		log.Fatal(err)
	}
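
To make the flag semantics concrete, here is a small model of how a receiver groups frames into messages. It does not use GoCZMQ at all: the frame type, the flagNone and flagMore constants, and the assemble function are stand-ins defined locally for illustration.

```go
package main

import "fmt"

// Local stand-ins for the "more frames follow" flag; these are not
// the GoCZMQ constants.
const (
	flagNone = 0 // this frame ends the message
	flagMore = 1 // at least one more frame follows
)

// frame pairs a payload with the flag it was sent with.
type frame struct {
	data []byte
	flag int
}

// assemble groups a stream of frames into messages, closing the
// current message each time it sees a frame sent with flagNone.
func assemble(frames []frame) [][][]byte {
	var messages [][][]byte
	var current [][]byte
	for _, f := range frames {
		current = append(current, f.data)
		if f.flag == flagNone {
			messages = append(messages, current)
			current = nil
		}
	}
	return messages
}

func main() {
	stream := []frame{
		{[]byte("header"), flagMore},
		{[]byte("body"), flagNone},
		{[]byte("Hello World"), flagNone},
	}
	for i, msg := range assemble(stream) {
		fmt.Printf("message %d has %d frame(s)\n", i, len(msg))
	}
}
```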

An important detail to note here is that ZeroMQ is asynchronous and provides a buffer under the hood - so our program execution continues after the call to SendFrame even though the message has not yet been received. This is what allows this example to send and receive in the same thread.
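
Go programmers can think of this as similar to a buffered channel, where a send completes as long as there is room in the buffer. The sketch below is only an analogy built on the standard library; roundTrip is a hypothetical helper and no ZeroMQ sockets are involved.

```go
package main

import "fmt"

// roundTrip sends msg into a buffered channel and then receives it
// again, all from the calling goroutine. With an unbuffered channel
// the send would block forever because nobody is receiving yet.
func roundTrip(msg []byte) []byte {
	queue := make(chan []byte, 1) // room for one in-flight message
	queue <- msg                  // does not block: the buffer has space
	return <-queue
}

func main() {
	frame := roundTrip([]byte("Hello World"))
	// prints: We received a message of size 11
	fmt.Printf("We received a message of size %d\n", len(frame))
}
```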

Now it’s time to receive the message:

	frame, sz, err := pull.RecvFrame()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("We received a message of size %d\n", sz)
	fmt.Printf("The message was: '%s'\n", frame)

After we’re done with the sockets, we call Destroy on each of them (push.Destroy() and pull.Destroy()) to clean up memory. While Go is a garbage collected language, we’re wrapping a C API, so we need to clean up after ourselves.


Next Steps

ZeroMQ is a large topic, and ZeroMQ combined with Go doubly so. I’ll be working on a series of articles covering the usage of interesting parts of the GoCZMQ API as well as lessons learned from working on the project.


20 Sep 2015, 07:54

Problem: I Want To Write To Redis From Rsyslog

Solution: Use the omhiredis output plugin.

I recently saw Radu Gheorghe express an interest in feeding Logstash from Rsyslog via a Redis queue. I realized that I’d neglected the omhiredis plugin since I’d originally written it as a proof of concept. This seemed like a good opportunity to revisit the plugin and start fixing it up. For the rsyslog 8.13 release I added support for LPUSH and PUBLISH.



Configure rsyslog with omhiredis support:

~/git/rsyslog> ./configure --enable-omhiredis <other options>
~/git/rsyslog> make
~/git/rsyslog> sudo make install


Queue Mode

In queue mode, omhiredis will LPUSH each message into a Redis list stored at the defined key. You may define an optional template using the “template” parameter. If a template is not defined, the action will default to RSYSLOG_ForwardFormat.



Here is an example of RPOPing a log line from Redis that was inserted using the above action:

> redis-cli> RPOP my_queue

"<46>2015-09-17T10:54:50.080252-04:00 myhost rsyslogd: [origin software=\"rsyslogd\" swVersion=\"8.13.0.master\" x-pid=\"6452\" x-info=\"http://www.rsyslog.com\"] start"
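
Because omhiredis pushes on the left and the consumer pops from the right, the list behaves as a first-in, first-out queue. The sketch below models that ordering with a plain Go slice; the list type and its methods are illustrative stand-ins, not a Redis client.

```go
package main

import "fmt"

// list is a toy stand-in for a Redis list; index 0 is the head (left).
type list struct{ items []string }

// lpush inserts a value at the head of the list.
func (l *list) lpush(v string) {
	l.items = append([]string{v}, l.items...)
}

// rpop removes and returns the value at the tail of the list.
// ok is false when the list is empty.
func (l *list) rpop() (v string, ok bool) {
	if len(l.items) == 0 {
		return "", false
	}
	last := len(l.items) - 1
	v = l.items[last]
	l.items = l.items[:last]
	return v, true
}

func main() {
	var q list
	q.lpush("first log line")
	q.lpush("second log line")

	// Lines come back in the order they were pushed.
	for {
		v, ok := q.rpop()
		if !ok {
			break
		}
		fmt.Println(v)
	}
}
```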

Publish Mode

In publish mode, omhiredis will PUBLISH each message onto a Redis channel stored at the defined key. Like queue mode, you may define a template using the “template” parameter, and if not set the template defaults to RSYSLOG_ForwardFormat.



Here is an example of subscribing to the channel defined in the above template, and receiving a published message from it:

> redis-cli> subscribe my_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel"
3) (integer) 1
1) "message"
2) "my_channel"
3) "<46>2015-09-17T10:55:44.486416-04:00 myhost rsyslogd-pstats: {\"name\":\"imuxsock\",\"origin\":\"imuxsock\",\"submitted\":0,\"ratelimit.discarded\":0,\"ratelimit.numratelimiters\":0}"

Template Mode

In template mode, omhiredis will send the message constructed by the template directly to Redis as a command. Originally, this was the only mode the plugin supported. Please note that there is an outstanding bug in this mode - it will not properly handle commands with spaces in the message payload. For example, manually constructing an LPUSH of a full message using template mode will not work properly. I hope to find the time to fix this in the future. Pull requests are of course accepted!
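
One way to see why spaces are a problem: if a command string is split into arguments on whitespace, a payload containing spaces turns into several arguments. The sketch below shows the effect using strings.Fields. Whether this is exactly how the plugin's bug arises is an assumption, and naiveArgs and explicitArgs are hypothetical helpers that only show the general shape of the problem and of a fix.

```go
package main

import (
	"fmt"
	"strings"
)

// naiveArgs splits a whole command line on whitespace, so a payload
// with spaces is broken into multiple arguments.
func naiveArgs(command string) []string {
	return strings.Fields(command)
}

// explicitArgs keeps the payload as one argument no matter what
// characters it contains.
func explicitArgs(verb, key, payload string) []string {
	return []string{verb, key, payload}
}

func main() {
	payload := "disk is almost full"
	fmt.Println(len(naiveArgs("LPUSH my_queue " + payload)))     // prints 6
	fmt.Println(len(explicitArgs("LPUSH", "my_queue", payload))) // prints 3
}
```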

Here’s an example of keeping a tally of the number of messages seen by program name. Note that mode is not set, as it’s the default mode for this module. Additionally, there’s a config parsing bug that is triggered if you explicitly set it, which will be fixed in 8.14.


  string="HINCRBY progcount %programname% 1"


Here, we take a look at the counts stored in Redis:

> redis-cli> HGETALL progcount
1) "rsyslogd"
2) "35"
3) "rsyslogd-pstats"
4) "4302"

Pipelining with queue.dequeuebatchsize

The omhiredis plugin supports pipelining using Rsyslog queuing with the queue.dequeuebatchsize parameter. Note that the plugin does not currently check for errors in the replies. If the plugin becomes something people are seriously interested in, error handling should definitely be added. For those interested in internals, here’s the current end transaction block in the code:

    dbgprintf("omhiredis: endTransaction called\n");
    int i;
    pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count );
    for ( i = 0; i < pWrkrData->count; i++ ) {
        redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] );
        /*  TODO: add error checking here! */
        freeReplyObject ( pWrkrData->replies[i] );
    }
    free ( pWrkrData->replies );

The rest of the source may be viewed on github.

Future Plans

I believe the next steps for this plugin should be:

  • Fixing “template” mode so that message payloads containing spaces work properly.
  • Error handling to prevent message loss on Redis failure.
  • A complementary “imhiredis” plugin that can receive from Redis.