16 Feb 2011, 09:10

PostgreSQL 9, Listen / Notify, and Jruby.

Share

The LISTEN / NOTIFY functionality of PostgreSQL received a large boost to functionality in the 9.0 release. Since I’ve lately been experimenting with JRuby, I decided to write up some example code of utilizing LISTEN / NOTIFY via JDBC.

LISTEN / NOTIFY basically provides a publish / subscribe message bus within PostgreSQL. Clients can register as a subscriber on a channel, and will receive all notifications sent to that channel.

9.0 added a “payload” option to the notifier - a notification can contain an optional text string.

I worked up a quick test of using this functionality in JRuby.

The example sets up the following table in a local PostgreSQL 9.0 database:

CREATE TABLE watched_table (
  id              SERIAL PRIMARY KEY,
  value           INT,
  date_updated    TIMESTAMP NOT NULL DEFAULT now()
);

It then sets up a trigger, that upon insert, sends a notify event:

CREATE FUNCTION notify_trigger() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', TG_TABLE_NAME || ',id,' || NEW.id );
  RETURN new;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER watched_table_trigger AFTER INSERT ON watched_table
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();

In the example, the first thread inserts into this table every three seconds:

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  cnt = 0 
  while cnt < 10
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    cnt = cnt + 1 
    sleep 3
  end 
end

The listener then connects, and polls PostgreSQL every second to see if there are new notifications. If there are, the id of the inserted record is sent in the notification, and the listener thread retrieves the record:

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  while true
    sleep 1
    puts 'polling...'

    notifications = listen_conn.get_notifications || []

    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}"
        puts "param: #{notification.parameter}"
        puts "-------------------------"

        id = notification.parameter.split(',').last
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end

        puts "-------------------------"

        stmt.close
        rs.close
      end
    end
  end
end

Here’s the code of the entire example:

require 'rubygems'
require 'bundler/setup'
require 'java'

$LOAD_PATH << 'vendor/jars/'
require 'postgresql-9.0-801.jdbc3.jar'

# set up our database connection to the example database...
java_import java.sql.DriverManager
DriverManager.register_driver(org.postgresql.Driver.new)
url = "jdbc:postgresql://localhost/listen_notify_poller"

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  cnt = 0
  while cnt < 10
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    cnt = cnt + 1
    sleep 3
  end
end

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  # register our client as a listner on the "watchers" channel....
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  # check for new notifications once a second...
  while true
    sleep 1
    puts 'polling...'

    # check for notifications
    notifications = listen_conn.get_notifications || []

    # if there are notifications, display a little info on them
    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}" 
        puts "param: #{notification.parameter}" 
        puts "-------------------------"

        # retrieve the inserted record thisnotification was for.
        id = notification.parameter.split(',').last
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end
      
        puts "-------------------------"
      
        stmt.close
        rs.close
      end
    end
  end
end

insert_thread = Thread.new{insert_thread(url)}
listen_thread = Thread.new{listen_thread(url)}

listen_thread.join
insert_thread.join