Генерация событий средствами PostgreSQL

Введение

Бывают специфичные кейсы, когда надо отслеживать изменение данных в таблицах БД и передавать их во внешние системы, а сделать это через отдлеьные серверы не представляется возможным. В таком случае средствами PostgreSQL можно генерировтать эти события и с помощью механизма оповещений отлавливать их во внешнем сервисе (например внешний журнал событий).

Подготовка стенда

Для демонстрации создадим базы:

createdb pubsub_test

Далее создадим тестовую таблицу:

CREATE TABLE price (
	id serial PRIMARY KEY,
	lot VARCHAR (50),
    price float
)

Изменение отслеживание изменения данных в таблице

Допустим что надо отслеживать добавление, изменени и удаление данных в заданной таблице и складывать их в формате json в определенный журнал.

Для отслеживания изменения данных в таблице можно использовать триггеры.

Чтобы реализовать данную возможность нужно создать таблицу журнала изменений:

-- создаем журнал
CREATE TABLE log (
    ts timestamp without time zone,
    value json
)

Далее необходимо создать функцию, которую будет вызывать триггер:

-- создаем функцию отслеживания изменений
CREATE OR REPLACE FUNCTION log_change_process() RETURNS TRIGGER 
AS $log$
    DECLARE
        event json;
    BEGIN
        IF (TG_OP = 'DELETE') THEN
            event = json_build_object('id',OLD.id,'event',TG_OP)::varchar;
        ELSIF (TG_OP = 'UPDATE') THEN
            event = json_build_object('id',OLD.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
        ELSIF (TG_OP = 'INSERT') THEN
            event = json_build_object('id',NEW.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
        END IF;
        
        INSERT INTO log VALUES (now(), event);

        RETURN NULL; -- возвращаемое значение для триггера AFTER игнорируется
    END;
$log$
LANGUAGE plpgsql;

В данной функции используются служебные переменные, доступные в триггерных функция и содержат они следующее:

  • TG_OP - cтрока, содержащая значение типа опреации (INSERT, UPDATE, DELETE или TRUNCATE)
  • OLD - старые данные изменяемой записи
  • NEW - новые данные изменяемой записи

Подробней про эти переменные можно прочитать в документации по триггерным функциям.

Далее создаем триггер, который для каждой строки будет вызывать выше описанную функцию:

-- создаем триггер для вызова функции отслеживания изменений
CREATE TRIGGER change_data_monitoring
    AFTER INSERT OR UPDATE OR DELETE ON price
    FOR EACH ROW
    EXECUTE FUNCTION log_change_process();

Теперь при вставке, изменении или удалении записи в таблице price в журнале будут отображены эти действия:

pubsub_test=# insert into price (lot, price) values ('test', 10.0);
INSERT 0 1
pubsub_test=# select * from log;
             ts             |                             value
----------------------------+---------------------------------------------------------------
 2020-08-24 15:17:06.158615 | {"id" : 14, "lot" : "test", "price" : 10, "event" : "INSERT"}
(1 row)

pubsub_test=# delete from price where id = 14;
DELETE 1
pubsub_test=# select * from log;
             ts             |                             value
----------------------------+---------------------------------------------------------------
 2020-08-24 15:17:06.158615 | {"id" : 14, "lot" : "test", "price" : 10, "event" : "INSERT"}
 2020-08-24 15:17:36.064112 | {"id" : 14, "event" : "DELETE"}
(2 rows)

Работа с Notify

После того как в таблицу записать данные у нас получилось можно попробывать использовать механизм Notify для работы с асинхронными сообщениями в PostgreSQL.

Для того чтобы оправлялись сообщения через Notify, необходимо подправить нашу процеруду следующим образом:

-- создаем функцию отслеживания изменений
CREATE OR REPLACE FUNCTION log_change_process() RETURNS TRIGGER 
AS $log$
    DECLARE
        event json;
    BEGIN
        IF (TG_OP = 'DELETE') THEN
            event = json_build_object('id',OLD.id,'event',TG_OP)::varchar;
        ELSIF (TG_OP = 'UPDATE') THEN
            event = json_build_object('id',OLD.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
        ELSIF (TG_OP = 'INSERT') THEN
            event = json_build_object('id',NEW.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
        END IF;
        
        PERFORM pg_notify('events', event::text);

        RETURN NULL; -- возвращаемое значение для триггера AFTER игнорируется
    END;
$log$
LANGUAGE plpgsql;

После этого мы запускаем прослушивание событий:

LISTEN events;

Теперь после добавления данных мы увидим следующее:

pubsub_test=# insert into price (lot, price) values ('test', 10.0);
INSERT 0 1
Asynchronous notification "events" with payload "{"id" : 15, "lot" : "test", "price" : 10, "event" : "INSERT"}" received from server process with PID 8360.

Соответственно триггер и notify работают корректно.

Сервис на Go для генерации событий

Заключительным шагом будет создание сервиса на Go, который сможет обрабатывать сообщения из канала events.

В библиотеке lib/pq есть штатный функционал для работы с механизмом notify. Код будет следующий:

package main

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"
	"github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
	for {
		select {
		case n := <-l.Notify:
			fmt.Println("Получены данные из канала [", n.Channel, "] :")
			
			var prettyJSON bytes.Buffer
			if err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t"); err != nil {
				fmt.Println("Error processing JSON: ", err)
				return
			}
			fmt.Println(string(prettyJSON.Bytes()))
			return
		case <-time.After(90 * time.Second):
			fmt.Println("Не было данных 90 секунд, проверка соединения")
			go func() {
				l.Ping()
			}()
			return
		}
	}
}

func main() {
	var conninfo string = "dbname=pubsub_test_outer user=postgres"

	if _, err := sql.Open("postgres", conninfo); err != nil {
		panic(err)
	}

    // открываем чтение notify канала
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
	}
    
	listener := pq.NewListener(conninfo, 10*time.Second, time.Minute, reportProblem)
	if err = listener.Listen("events"); err != nil {
		panic(err)
	}

	fmt.Println("Старт мониторинга PostgreSQL...")
	for {
		waitForNotification(listener)
	}
}

В данном листинге функция waitForNotification отвечает за обработку данных которые пришли из канала events, а если данных не было в течении определенного интервала, то проверяется соединение.

В результате можно увидеть следующее:

полигон с подложкой

Как видно из скриншота данные добавленные в таблицу через запрос успешно отображаются в сервисе и обрабатываются.

Заключение

В результате получилась простенькая Event Sourcing система для обработки событий в которой события генерятся автоматически при изменении записей в БД.

Такая система может выспупать переходным этапом от старого монолита к событийной микросервесной архитектуре или же использоваться в различных интеграционных проектов для минимизации числа компонентов.

Ссылки

  1. Триггерные функции
 
comments powered by Disqus