04 Mar 2017, 14:43

Problem: I want to use Go service discovery clients with CZMQ

Solution: zdiscgo

Background

At work, I have a tier of Rsyslog instances on an Apache Mesos cluster for two purposes: indexing logs into Elasticsearch via the Rsyslog omelasticsearch plugin, and publishing logs on ZeroMQ endpoints for people who wish to subscribe to logs from specific hosts and services. The rsyslog instances are scheduled to run on arbitrary hosts, and the ZeroMQ sockets are bound to random ports on the hosts the instances end up on. The ports themselves are labeled, so that they can be searched for using the scheduler’s task API.

The first client I used with these endpoints was a command line client that I wrote in Go, using GoCZMQ. It was fairly painless to use the Marathon scheduler’s API (Marathon is a scheduler that works with Mesos) via the go-marathon library to look up the current hosts and ports the ZeroMQ endpoints are exposed on and use that information to connect to them.

Recently, I started on another client written in straight C. I wanted to use the liblognorm log normalization library in this client, and did not want to deal with the overhead of using cgo to call the library from Go. That being said, I also did not want to go through the pain of writing Marathon API calls in straight C: making HTTP requests and parsing JSON responses would be a lot of code.

As I was puzzling over which way I wanted to solve this problem, I stumbled on a great blog post from Vladimir Vivien, “Calling Go Functions from Other Languages”. Since I had plans to start experimenting with ZeroMQ + Kubernetes in addition to solving this specific problem, I decided to solve the problem of “calling Go service discovery libraries from C” in a general sense.

The end result is zdiscgo, a czmq zactor implementation that can load Go libraries at run time for service discovery. Let’s take a look at how this works!

Problem: I need a Go interface for my shared library

First I designed a minimal interface for the Go code. I’m using the term “interface” somewhat loosely here, as C does not know about Go interfaces and will just load the function directly. ZDiscgoDiscoverEndpoints(url, key string) *C.char seemed like enough to get started with. It accepts a URL for a service discovery API endpoint, and a key. It assumes that the key will be used in some fashion to look up ZeroMQ endpoints via an API call to the service at the URL, and that a comma delimited list of ZeroMQ endpoints will be returned. The CZMQ API has constructor functions for zsock_t * (a handle to a ZeroMQ socket) that can accept a comma delimited list of endpoints, so the result can be passed directly to those functions.
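
To make the return format concrete, here is a minimal sketch in C of walking a comma delimited endpoint list. CZMQ does its own splitting when it is handed such a list, so something like this would only be useful for logging or sanity-checking what a plugin returned. The names endpoints_foreach, endpoint_fn and print_endpoint are hypothetical and not part of zdiscgo or CZMQ.

```c
#include <stdio.h>
#include <string.h>

//  Hypothetical callback type: receives one endpoint as a pointer and a
//  length, because the endpoint is a slice of the original string and is
//  not NUL terminated on its own.
typedef void (endpoint_fn) (const char *endpoint, size_t len, void *arg);

//  Hypothetical helper: calls fn once per non-empty endpoint in a comma
//  delimited list and returns how many endpoints were seen. The input
//  string is not modified. fn may be NULL to only count.
size_t
endpoints_foreach (const char *endpoints, endpoint_fn *fn, void *arg)
{
    size_t count = 0;
    if (!endpoints)
        return 0;
    const char *start = endpoints;
    while (*start) {
        //  Length of the run of characters before the next comma
        size_t len = strcspn (start, ",");
        if (len > 0) {
            if (fn)
                fn (start, len, arg);
            count++;
        }
        start += len;
        if (*start == ',')
            start++;            //  Skip the delimiter
    }
    return count;
}

//  Example callback: print each endpoint on its own line
void
print_endpoint (const char *endpoint, size_t len, void *arg)
{
    (void) arg;
    printf ("%.*s\n", (int) len, endpoint);
}
```

Calling endpoints_foreach (endpoints, print_endpoint, NULL) prints one endpoint per line and returns the number of endpoints found.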

Since C functions can only return one result, the function is expected to handle an error by simply returning an empty string. In the interest of not having to deal with passing references to Go data structures to C, the function is expected to return a *C.char. Since you need to import C in order to export a function to C anyway, I feel that this isn’t too onerous.

For testing, the zdiscgo repo includes libmockdiscgo.go, a mock implementation that combines the passed in url and key and returns them:

package main

import "C"
import "fmt"

//export ZDiscgoDiscoverEndpoints
func ZDiscgoDiscoverEndpoints(url, key string) *C.char {
	// C.CString copies the Go string into memory allocated by C
	return C.CString(fmt.Sprintf("inproc://%s-%s", url, key))
}

func main() {}

This code can then be compiled with -buildmode=c-shared into a shared library that can be used from C:

go build -o libmockdiscgo.so -buildmode=c-shared libmockdiscgo.go 

The above will create both the libmockdiscgo.so shared library, and a C header. The next step was to write a minimal API to make using this from C as pleasant as possible.

Problem: I need a friendly C API to wrap my Go library

Now that I had the Go library worked out, I needed a C API for using it. zdiscgoplugin.c provides a simple C API for loading Go plugins that are written to the above specification. It is written in accordance with 21/CLASS, the style guide that is typically used for ZeroMQ organization projects. The selftest provides a good overview of how to use it:

    //  @selftest
    zdiscgoplugin_t *self = zdiscgoplugin_new ("./go/libmockdiscgo.so");
    assert (self);

    const char *endpoints = zdiscgoplugin_discover_endpoints (self, "url", "key");
    assert (streq ("inproc://url-key", endpoints));

    zdiscgoplugin_destroy (&self);
    //  @end
    printf ("OK\n");

Let’s take a look at how it works, starting with the zdiscgoplugin_t struct. This struct holds a handle to the dynamically loaded library, and a reference to our Go ZDiscgoDiscoverEndpoints function:

struct _zdiscgoplugin_t {
    void *handle;
    char * (*discover)(go_str, go_str);
};

The constructor accepts the path to a dynamic library and returns a pointer to the struct:

zdiscgoplugin_t *
zdiscgoplugin_new (char *libpath)

The first thing the constructor does is allocate memory for the struct. If there is an error allocating this memory, it returns a NULL. Unlike Go, C only supports a single return value for a function. A caller of this function is responsible for checking that the return is not NULL.

    zdiscgoplugin_t *self = (zdiscgoplugin_t *) zmalloc (sizeof (zdiscgoplugin_t));
    if (!self)
        return NULL;

dlopen loads the shared library file and returns a handle to the loaded object. On Linux this function is declared in dlfcn.h, along with the other functions for working with shared libraries.

    self->handle = dlopen (libpath, RTLD_NOW);

dlsym accepts a handle to a shared library and the name of a symbol in that library, and returns the memory address of the symbol, which is then stored in the struct as a function pointer:

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-pedantic"
    self->discover = (char * (*)(go_str, go_str)) dlsym(self->handle, "ZDiscgoDiscoverEndpoints");
#pragma GCC diagnostic pop 

    return self;
}

But what is all this #pragma stuff? As it turns out, dlsym returns an object pointer (void *), and ISO C does not allow the conversion of an object pointer to a function pointer. Compiling this code with -pedantic and -Werror fails with an error:

src/zdiscgoplugin.c:46:22: error: ISO C forbids conversion of object pointer to function pointer type [-Werror=pedantic]

I used a pragma to turn off pedantic warnings for this line of code.
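
An alternative to the pragma is the workaround shown in the Linux dlopen(3) manual page, which assigns the result of dlsym through a void ** so that no object-to-function cast appears in the source. The following is a sketch of that idiom, with the dlopen and dlsym error checks that the constructor above leaves out. The plugin_lookup helper and the discover_fn typedef are hypothetical and not part of zdiscgo.

```c
#include <dlfcn.h>
#include <stdio.h>

//  Same shape as the go_str type in zdiscgoplugin.h
typedef long long go_int;
typedef struct { const char *p; go_int len; } go_str;

//  Hypothetical name for the function type of the exported Go function
typedef char *(discover_fn) (go_str, go_str);

//  Hypothetical helper: opens libpath, looks up symbol, and stores the
//  resulting function pointer in *fn_p. Returns the dlopen handle on
//  success, or NULL on failure.
void *
plugin_lookup (const char *libpath, const char *symbol, discover_fn **fn_p)
{
    void *handle = dlopen (libpath, RTLD_NOW);
    if (!handle) {
        fprintf (stderr, "dlopen failed: %s\n", dlerror ());
        return NULL;
    }
    dlerror ();                 //  Clear any stale error state

    //  Assign through a void ** instead of casting the return value
    *(void **) (fn_p) = dlsym (handle, symbol);

    const char *error = dlerror ();
    if (error || !*fn_p) {
        fprintf (stderr, "dlsym failed: %s\n", error? error: "symbol is NULL");
        //  The handle is deliberately left open in this sketch; whether a
        //  Go shared library can be unloaded safely is not covered here.
        *fn_p = NULL;
        return NULL;
    }
    return handle;
}
```

On success the caller ends up with the same two things the struct above holds: the library handle and a callable function pointer.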

The zdiscgoplugin_discover_endpoints call accepts a zdiscgoplugin_t *, and two char *s (a URL for a service discovery service, and the key to use for a lookup):

const char *
zdiscgoplugin_discover_endpoints (zdiscgoplugin_t *self, char *url, char *key)

The first thing the function does is convert the url and key to a go_str, which is a custom C type that is memory compatible with the Go string type. This type is defined in zdiscgoplugin.h:

typedef long long go_int;
typedef struct{const char *p; go_int len;} go_str;

The conversion itself is relatively straightforward:

    go_str discover_url = {url, strlen (url)};
    go_str discover_key = {key, strlen (key)};
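
The same conversion can be wrapped in a small helper. This is only a sketch: go_str_from is a hypothetical name, and treating a NULL input as an empty string is my own assumption about what a caller would want. Note that the length excludes the trailing NUL, and that no copy is made, so the C string has to stay valid for the duration of the call into Go.

```c
#include <string.h>

//  Same definitions as in zdiscgoplugin.h
typedef long long go_int;
typedef struct { const char *p; go_int len; } go_str;

//  Hypothetical helper: builds a go_str view of a C string. The Go side
//  sees len bytes starting at p. A NULL input becomes an empty string
//  rather than being passed to strlen.
go_str
go_str_from (const char *s)
{
    go_str result = { s? s: "", s? (go_int) strlen (s): 0 };
    return result;
}
```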

Now, we can call the Go function through the reference held in self->discover:

    char *endpoints = self->discover (discover_url, discover_key);
    return endpoints;
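
Because the plugin interface signals failure by returning an empty string, a caller will usually want to check the result before handing it to a socket. A minimal sketch of such a check follows; endpoints_ok is a hypothetical helper, and treating NULL as a failure as well is an assumption on my part rather than something the interface promises.

```c
#include <stdbool.h>
#include <stddef.h>

//  Hypothetical helper: true when a discovery call produced at least one
//  character of endpoint data. NULL is treated the same as the empty
//  string that the plugin interface uses to signal an error.
bool
endpoints_ok (const char *endpoints)
{
    return endpoints != NULL && endpoints [0] != '\0';
}
```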

The zdiscgoplugin_destroy function cleans up our memory when we are finished. In this case it does not really do much, but it makes for a nice complete API and will be handy in the future as we expand on things:

void
zdiscgoplugin_destroy (zdiscgoplugin_t **self_p)
{
    if (*self_p) {
        zdiscgoplugin_t *self = *self_p;
        //  Free class properties here
        //  Free object itself
        free (self);
        *self_p = NULL;
    }
}

Problem: I don’t want service discovery calls to block my application.

Making calls over the network to service discovery systems can take quite a bit of time. In order to allow asynchronous requests and replies, I wanted the ability to run the service discovery interface in its own thread. The CZMQ library provides a class for exactly this purpose: zactor. A zactor is similar in concept to a microservice, except it runs as a thread in your local process. Like a microservice, all communication to and from the thread is via protocol messages (as opposed to sharing memory between threads). Since ZeroMQ uses the same socket API for communicating between threads as it does for communicating over network transports, the code looks the same as it would if you were talking to a remote service.

The first thing I did was design a small command protocol for the actor. Each command in the command protocol is a command frame followed by zero or more argument frames. These commands are handled internally by zdiscgo_recv_api.

First, the handler receives the full message in the form of a zmsg_t *. It pops the first frame off the message and checks the command.

The VERBOSE command sets verbose mode for the service. Debugging information will be sent to stdout. The command handling code simply sets a verbose flag in the zdiscgo_t struct.

The CONFIGURE command is a 2 frame command. The second frame contains the path to the Go shared library to load. The command handling code attempts to construct a zdiscgoplugin_t * using the supplied path. It returns a 0 on success and a -1 if there is an error.

The DISCOVER command is a 3 frame command. The second and third frames are the url and key used by ZDiscgoDiscoverEndpoints. The handler calls our Go library and sends back a single frame containing a comma delimited list of endpoints on success or an empty string on failure.
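
The shape of the protocol can be summarized as a table of command names and frame counts. The sketch below is plain C and is not taken from zdiscgo: command_spec_t and command_validate are hypothetical, and the count of one frame for VERBOSE is an assumption based on the description above, which mentions no arguments for it.

```c
#include <stddef.h>
#include <string.h>

//  Total frames per command, including the command frame itself
typedef struct {
    const char *name;
    size_t frames;
} command_spec_t;

static const command_spec_t command_specs [] = {
    { "VERBOSE",   1 },     //  Assumed: command frame only
    { "CONFIGURE", 2 },     //  Command + library path
    { "DISCOVER",  3 },     //  Command + url + key
};

//  Hypothetical helper: returns 0 if the command is known and arrived
//  with the expected number of frames, -1 otherwise.
int
command_validate (const char *command, size_t frames)
{
    size_t count = sizeof (command_specs) / sizeof (command_specs [0]);
    for (size_t index = 0; index < count; index++) {
        if (strcmp (command, command_specs [index].name) == 0)
            return command_specs [index].frames == frames? 0: -1;
    }
    return -1;                  //  Unknown command
}
```

In a CZMQ handler, the frame count could plausibly come from zmsg_size on the received message, taken before the command frame is popped.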

These details are abstracted away from the user’s perspective. You don’t have to worry about dealing with threads or inter-thread communication. You just make a new instance of the class and use it.

    //  Create a zdiscgo instance. This will spin up a new OS level
    //  thread that will handle service discovery requests.

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);

    //  We communicate with the service discovery thread over
    //  a ZMQ_PAIR socket. You can pass the zdiscgo instance
    //  to any CZMQ methods that accept zsock_t *. 
    //  Let's set the service to verbose mode.

    zdiscgo_verbose (zdiscgo);

    //  Next, let's configure the service by telling it to load 
    //  our Go shared library. zdiscgo_load_plugin sends a CONFIGURE
    //  command to the actor and returns 0 on success, -1 on failure.
    
    int rc = zdiscgo_load_plugin (zdiscgo, "./go/libmockdiscgo.so");
    assert (rc == 0);

    //  Now let's get some endpoints! We send a DISCOVER command
    //  that consists of the URL of a service discovery service,
    //  and the identifier the service should use to find the
    //  endpoints we want.

    char *endpoints = zdiscgo_discover (zdiscgo, "url", "key");

    //  Check that we have the correct response

    assert (streq (endpoints, "inproc://url-key"));

    //  Shut down the zdiscgo instance and clean up memory.

    zactor_destroy (&zdiscgo);

Let’s take a deeper look at how all of this works, starting with zdiscgo_t:

struct _zdiscgo_t {
    zsock_t *pipe;              //  Actor command pipe
    zpoller_t *poller;          //  Socket poller
    zdiscgoplugin_t *plugin;    //  zdiscgoplugin_t reference
    bool terminated;            //  Did caller ask us to quit?
    bool verbose;               //  Verbose logging enabled?
};
  • zsock_t *pipe holds the zeromq socket that is used to send and receive messages between threads. If you are a Go programmer, you can think of this as somewhat similar to a Go channel.
  • zpoller_t *poller holds the poller that is used to watch for incoming messages on the socket.
  • zdiscgoplugin_t *plugin holds the plugin, which manages communicating with the Go shared library.
  • bool terminated is set to true when zactor_destroy is called and triggers shutdown / cleanup.
  • bool verbose is initially set to false, and is set to true by a call to zdiscgo_verbose.

Next up is the zactor function itself. The zactor constructor expects a function of type void (zactor_fn) (zsock_t *pipe, void *args):

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);

Here is my implementation of the function:

void
zdiscgo_actor (zsock_t *pipe, void *args)
{
    zdiscgo_t * self = zdiscgo_new (pipe, args);
    if (!self)
        return;          //  Interrupted

    //  Signal actor successfully initiated
    zsock_signal (self->pipe, 0);

    while (!self->terminated) {
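        //  A timeout of -1 blocks until a message arrives; a timeout
        //  of 0 would return immediately and spin the loop
        zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);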
        if (which == self->pipe)
            zdiscgo_recv_api (self);
       //  Add other sockets when you need them.
    }
    zdiscgo_destroy (&self);
}

This function constructs an instance of zdiscgo_t and then goes into a while loop. It waits for command messages from another thread, and passes those commands to zdiscgo_recv_api, which contains the callbacks for each command. Let’s take a look at what happens behind the scenes of the int rc = zdiscgo_load_plugin (zdiscgo, "./go/libmockdiscgo.so") call:

int 
zdiscgo_load_plugin (zactor_t *self, char *path) {
    zstr_sendx (self, "CONFIGURE", path, NULL);
    int rc;
    zsock_recv (self, "i", &rc);
    return rc;
}

zstr_sendx accepts either a zsock_t * or a zactor_t * as its first argument, followed by a variadic list of strings that it sends as a single multi-frame ZeroMQ message (one frame per string). Passing NULL as the last argument tells the function there are no more frames.
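
If the NULL sentinel convention is unfamiliar, here is a small standalone illustration of how a variadic C function can walk its string arguments until it reaches the terminating NULL. This is not CZMQ's implementation; count_frames is a hypothetical function that only counts what zstr_sendx would send as frames.

```c
#include <stdarg.h>
#include <stddef.h>

//  Hypothetical illustration of the NULL sentinel convention: counts the
//  string arguments up to, but not including, the terminating NULL.
size_t
count_frames (const char *first, ...)
{
    size_t count = 0;
    va_list args;
    va_start (args, first);
    //  Each va_arg call fetches the next argument; stop at the sentinel
    for (const char *frame = first; frame; frame = va_arg (args, const char *))
        count++;
    va_end (args);
    return count;
}
```

Forgetting the trailing NULL in a call like this means the function keeps reading past the arguments that were actually passed, which is undefined behavior.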

As explained above, the zdiscgo_actor implementation running in the other thread calls zdiscgo_recv_api when a message arrives on its pipe. Here is the code which handles the CONFIGURE command:

    if (streq (command, "CONFIGURE")) {
        if (self->verbose)
            zsys_debug ("received 'CONFIGURE' command");

        char *libpath = zmsg_popstr (request);
        int rc; 
        self->plugin = zdiscgoplugin_new (libpath);
        if (self->plugin) {
            rc = 0;
            if (self->verbose)
                zsys_debug ("loaded plugin: '%s'", libpath);
        } 
        else {
            rc = -1;
            if (self->verbose)
                zsys_error ("could not load plugin: '%s'", libpath);
        }

        zsock_send (self->pipe, "i", rc);
    }
  1. The path argument is popped out of the received message.
  2. It is passed to zdiscgoplugin_new, which is responsible for loading the Go plugin.
  3. A reply message is sent, containing a single frame with either a 0 on success or -1 on failure.

The rest of the commands are implemented in a similar fashion.

Next Steps

I’m currently using this library in a tool at work that uses Marathon service discovery as described at the start of this article. Here’s a snippet of code showing it in use:

    zactor_t *zdiscgo = zactor_new (zdiscgo_actor, NULL);
    zdiscgo_verbose (zdiscgo);
    rc = zdiscgo_load_plugin (zdiscgo, lib);
    if (rc != 0)
        return 1;

    char *endpoints = zdiscgo_discover (zdiscgo, "url", "key");
    if (verbose)
        zsys_debug ("attaching socket to %s", endpoints);
 
    // connect the socket
    rc = zsock_attach (client, endpoints, false);
    if (rc == -1) {
        zsys_error ("zsock_attach failed");
        zsock_destroy (&client);
        return 1;
    }

I hope to soon experiment with a couple of other service discovery systems, and tweak things as I find usability issues and better ways to do things. I’ve released the code to the ZeroMQ organization under the Mozilla Public License. While the problem it solves is fairly esoteric, if it solves a problem for you, please let me know, and feel free to contribute!