20 Sep 2015, 07:54

Problem: I Want To Write To Redis From Rsyslog

Solution: Use the omhiredis output plugin.

I recently saw Radu Gheorghe express an interest in feeding Logstash from Rsyslog via a Redis queue. I realized that I’d neglected the omhiredis plugin since I’d originally written it as a proof of concept. This seemed like a good opportunity to revisit the plugin and start fixing it up. For the rsyslog 8.13 release I added support for LPUSH and PUBLISH.

Building

Requirements

Configure rsyslog with omhiredis support:

~/git/rsyslog> ./configure --enable-omhiredis <other options>
~/git/rsyslog> make
~/git/rsyslog> sudo make install

Usage

Queue Mode

In queue mode, omhiredis will LPUSH each message into a Redis list stored at the defined key. You may define an optional template using the “template” parameter. If a template is not defined, the action will default to RSYSLOG_ForwardFormat.

module(load="omhiredis")

action(
  name="push_redis"
  type="omhiredis"
  mode="queue"
  key="testqueue"
)

Here is an example of RPOPing a log line from Redis that was inserted using the above action:

> redis-cli 
127.0.0.1:6379> RPOP testqueue
"<46>2015-09-17T10:54:50.080252-04:00 myhost rsyslogd: [origin software=\"rsyslogd\" swVersion=\"8.13.0.master\" x-pid=\"6452\" x-info=\"http://www.rsyslog.com\"] start"
127.0.0.1:6379> 
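A consumer that pops these entries will usually want to split the line into fields. The sketch below shows one way to do that in Python for the line shown above. The function name parse_forward_format and the regular expression are illustrative assumptions based on that single sample, not part of rsyslog or omhiredis. The only fixed rule it relies on is the standard syslog encoding, where the <PRI> value equals facility * 8 + severity.

```python
import re

# Hypothetical helper: splits a line shaped like the RPOP output above
# ("<PRI>TIMESTAMP HOSTNAME TAG: MSG"). The pattern is an assumption based on
# that one sample, not a complete syslog parser.
LINE_RE = re.compile(r"^<(\d{1,3})>(\S+) (\S+) ([^:\s]+): ?(.*)$", re.DOTALL)


def parse_forward_format(line):
    match = LINE_RE.match(line)
    if match is None:
        raise ValueError("line does not look like the sample format")
    pri, timestamp, host, tag, msg = match.groups()
    # Standard syslog encoding: PRI = facility * 8 + severity.
    facility, severity = divmod(int(pri), 8)
    return {
        "facility": facility,
        "severity": severity,
        "timestamp": timestamp,
        "host": host,
        "tag": tag,
        "msg": msg,
    }


# Same line as in the RPOP output, without the quote escaping added by redis-cli.
sample = (
    '<46>2015-09-17T10:54:50.080252-04:00 myhost rsyslogd: '
    '[origin software="rsyslogd" swVersion="8.13.0.master" '
    'x-pid="6452" x-info="http://www.rsyslog.com"] start'
)
fields = parse_forward_format(sample)
print(fields["facility"], fields["severity"], fields["tag"])
```

For the sample line, a PRI of 46 decodes to facility 5 and severity 6.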

Publish Mode

In publish mode, omhiredis will PUBLISH each message onto a Redis channel stored at the defined key. Like queue mode, you may define a template using the “template” parameter, and if not set the template defaults to RSYSLOG_ForwardFormat.

module(load="omhiredis")

action(
  name="push_redis"
  type="omhiredis"
  mode="publish"
  key="my_channel"
)

Here is an example of subscribing to the channel defined in the above action and receiving a published message from it:

> redis-cli 
127.0.0.1:6379> subscribe my_channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel"
3) (integer) 1
1) "message"
2) "my_channel"
3) "<46>2015-09-17T10:55:44.486416-04:00 myhost rsyslogd-pstats: {\"name\":\"imuxsock\",\"origin\":\"imuxsock\",\"submitted\":0,\"ratelimit.discarded\":0,\"ratelimit.numratelimiters\":0}"
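The published payload in this example happens to carry a JSON object after the syslog header. As a rough sketch of what a subscriber might do with it, the Python below pulls that object out and decodes it. It assumes the JSON starts at the first "{" in the line, which holds for this sample but is not guaranteed for arbitrary messages.

```python
import json

# Payload of the "message" frame shown above, without the quote escaping
# added by redis-cli.
published = (
    '<46>2015-09-17T10:55:44.486416-04:00 myhost rsyslogd-pstats: '
    '{"name":"imuxsock","origin":"imuxsock","submitted":0,'
    '"ratelimit.discarded":0,"ratelimit.numratelimiters":0}'
)

# Assumption: the JSON object begins at the first "{" in the line.
header, brace, body = published.partition("{")
stats = json.loads(brace + body)
print(stats["name"], stats["submitted"])
```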

Template Mode

In template mode, omhiredis will send the message constructed by the template directly to Redis as a command. Originally, this was the only mode the plugin supported. Please note that there is an outstanding bug in this mode - it will not properly handle commands with spaces in the message payload. For example, manually constructing an LPUSH of a full message using template mode will not work properly. I hope to find the time to fix this in the future. Pull requests are of course accepted!
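To see why spaces matter, it helps to look at how a Redis command is framed on the wire: each argument is sent as its own length-prefixed string. The Python sketch below assumes, purely for illustration, that a template-built command string gets split into arguments on whitespace; the exact cause of the bug is not spelled out here, and encode_command is a hypothetical helper rather than anything in omhiredis or hiredis.

```python
def encode_command(args):
    """Encode a list of arguments as a RESP array of bulk strings."""
    out = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg.encode("utf-8")
        out.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(out)


payload = "disk almost full"

# Assumed behaviour: the rendered template is one flat string, so splitting it
# on whitespace breaks the payload into separate arguments.
flat = ("LPUSH testqueue " + payload).split()

# Intended command: the payload stays a single argument.
intended = ["LPUSH", "testqueue", payload]

print(len(flat), flat)          # five arguments, so three list elements
print(len(intended), intended)  # three arguments, so one list element
print(encode_command(intended))
```

Under that assumption, a command whose arguments contain no spaces is unaffected, which is why simple counters and similar commands remain usable in template mode.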

Here’s an example of keeping a tally of the number of messages seen by program name. Note that mode is not set, because template is the default mode for this module. Additionally, there’s a config parsing bug that is triggered if you set the mode explicitly; it will be fixed in 8.14.

module(load="omhiredis")

template(
  name="program_count_tmpl"
  type="string"
  string="HINCRBY progcount %programname% 1"
)

action(
  name="count_programs"
  server="my-redis-server.example.com"
  serverport="6379"
  type="omhiredis"
  template="program_count_tmpl"
)

Here, we take a look at the counts stored in Redis:

> redis-cli 
127.0.0.1:6379> HGETALL progcount
1) "rsyslogd"
2) "35"
3) "rsyslogd-pstats"
4) "4302"

Pipelining with queue.dequeuebatchsize

The omhiredis plugin supports pipelining through Rsyslog queuing with the queue.dequeuebatchsize parameter. Note that the plugin does not currently check for errors in the replies. If the plugin becomes something people are seriously interested in, error handling should definitely be added. For those interested in internals, here’s the current end transaction block in the code:

BEGINendTransaction
CODESTARTendTransaction
    dbgprintf("omhiredis: endTransaction called\n");
    int i;
    pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count );
    for ( i = 0; i < pWrkrData->count; i++ ) {
        redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] );
        /*  TODO: add error checking here! */
        freeReplyObject ( pWrkrData->replies[i] );
    }
    free ( pWrkrData->replies );
ENDendTransaction

The rest of the source may be viewed on GitHub.
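The endTransaction block above reads one reply per queued command and discards it. As a language-neutral illustration of what per-reply checking could look like, here is a small Python simulation. Everything in it is hypothetical: FakeConnection is an in-memory stand-in, not a Redis client, and its rule of rejecting empty payloads exists only to produce a failure to detect.

```python
from collections import deque


class FakeConnection:
    """In-memory stand-in for a pipelined Redis connection (not a real client)."""

    def __init__(self):
        self._pending = deque()

    def append_command(self, *args):
        # Queue a command without waiting for its reply.
        self._pending.append(args)

    def get_reply(self):
        # Simulated server: reject commands whose last argument is empty.
        args = self._pending.popleft()
        if args[-1] == "":
            return ("error", "ERR empty payload")
        return ("integer", 1)


def end_transaction(conn, count):
    """Read one reply per queued command and report the ones that failed."""
    failed = []
    for index in range(count):
        kind, value = conn.get_reply()
        if kind == "error":
            failed.append((index, value))
    return failed


conn = FakeConnection()
batch = ["first message", "", "third message"]
for message in batch:
    conn.append_command("LPUSH", "testqueue", message)

failed = end_transaction(conn, len(batch))
print(failed)
```

Knowing which positions in the batch failed is what would let a caller retry or suspend instead of silently dropping messages.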

Future Plans

I believe the next steps for this plugin should be:

  • Fixing “template” mode so that message payloads containing spaces work properly.
  • Error handling to prevent message loss on Redis failure.
  • A complementary “imhiredis” plugin that can receive from Redis.