Введение
Бывают специфичные кейсы, когда надо отслеживать изменение данных в таблицах БД и передавать их во внешние системы, а сделать это через отдлеьные серверы не представляется возможным. В таком случае средствами PostgreSQL можно генерировтать эти события и с помощью механизма оповещений отлавливать их во внешнем сервисе (например внешний журнал событий).
Подготовка стенда
Для демонстрации создадим базы:
createdb pubsub_test
Далее создадим тестовую таблицу:
CREATE TABLE price (
id serial PRIMARY KEY,
lot VARCHAR (50),
price float
)
Изменение отслеживание изменения данных в таблице
Допустим что надо отслеживать добавление, изменени и удаление данных в заданной таблице и складывать их в формате json в определенный журнал.
Для отслеживания изменения данных в таблице можно использовать триггеры.
Чтобы реализовать данную возможность нужно создать таблицу журнала изменений:
-- создаем журнал
CREATE TABLE log (
ts timestamp without time zone,
value json
)
Далее необходимо создать функцию, которую будет вызывать триггер:
-- создаем функцию отслеживания изменений
CREATE OR REPLACE FUNCTION log_change_process() RETURNS TRIGGER
AS $log$
DECLARE
event json;
BEGIN
IF (TG_OP = 'DELETE') THEN
event = json_build_object('id',OLD.id,'event',TG_OP)::varchar;
ELSIF (TG_OP = 'UPDATE') THEN
event = json_build_object('id',OLD.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
ELSIF (TG_OP = 'INSERT') THEN
event = json_build_object('id',NEW.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
END IF;
INSERT INTO log VALUES (now(), event);
RETURN NULL; -- возвращаемое значение для триггера AFTER игнорируется
END;
$log$
LANGUAGE plpgsql;
В данной функции используются служебные переменные, доступные в триггерных функция и содержат они следующее:
TG_OP
- cтрока, содержащая значение типа опреации (INSERT, UPDATE, DELETE или TRUNCATE)OLD
- старые данные изменяемой записиNEW
- новые данные изменяемой записи
Подробней про эти переменные можно прочитать в документации по триггерным функциям.
Далее создаем триггер, который для каждой строки будет вызывать выше описанную функцию:
-- создаем триггер для вызова функции отслеживания изменений
CREATE TRIGGER change_data_monitoring
AFTER INSERT OR UPDATE OR DELETE ON price
FOR EACH ROW
EXECUTE FUNCTION log_change_process();
Теперь при вставке, изменении или удалении записи в таблице price
в журнале будут отображены эти действия:
pubsub_test=# insert into price (lot, price) values ('test', 10.0);
INSERT 0 1
pubsub_test=# select * from log;
ts | value
----------------------------+---------------------------------------------------------------
2020-08-24 15:17:06.158615 | {"id" : 14, "lot" : "test", "price" : 10, "event" : "INSERT"}
(1 row)
pubsub_test=# delete from price where id = 14;
DELETE 1
pubsub_test=# select * from log;
ts | value
----------------------------+---------------------------------------------------------------
2020-08-24 15:17:06.158615 | {"id" : 14, "lot" : "test", "price" : 10, "event" : "INSERT"}
2020-08-24 15:17:36.064112 | {"id" : 14, "event" : "DELETE"}
(2 rows)
Работа с Notify
После того как в таблицу записать данные у нас получилось можно попробывать использовать механизм Notify для работы с асинхронными сообщениями в PostgreSQL.
Для того чтобы оправлялись сообщения через Notify, необходимо подправить нашу процеруду следующим образом:
-- создаем функцию отслеживания изменений
CREATE OR REPLACE FUNCTION log_change_process() RETURNS TRIGGER
AS $log$
DECLARE
event json;
BEGIN
IF (TG_OP = 'DELETE') THEN
event = json_build_object('id',OLD.id,'event',TG_OP)::varchar;
ELSIF (TG_OP = 'UPDATE') THEN
event = json_build_object('id',OLD.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
ELSIF (TG_OP = 'INSERT') THEN
event = json_build_object('id',NEW.id,'lot',NEW.lot,'price',NEW.price,'event',TG_OP)::varchar;
END IF;
PERFORM pg_notify('events', event::text);
RETURN NULL; -- возвращаемое значение для триггера AFTER игнорируется
END;
$log$
LANGUAGE plpgsql;
После этого мы запускаем прослушивание событий:
LISTEN events;
Теперь после добавления данных мы увидим следующее:
pubsub_test=# insert into price (lot, price) values ('test', 10.0);
INSERT 0 1
Asynchronous notification "events" with payload "{"id" : 15, "lot" : "test", "price" : 10, "event" : "INSERT"}" received from server process with PID 8360.
Соответственно триггер и notify работают корректно.
Сервис на Go для генерации событий
Заключительным шагом будет создание сервиса на Go, который сможет обрабатывать сообщения из канала events
.
В библиотеке lib/pq есть штатный функционал для работы с механизмом notify. Код будет следующий:
package main
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/lib/pq"
)
func waitForNotification(l *pq.Listener) {
for {
select {
case n := <-l.Notify:
fmt.Println("Получены данные из канала [", n.Channel, "] :")
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t"); err != nil {
fmt.Println("Error processing JSON: ", err)
return
}
fmt.Println(string(prettyJSON.Bytes()))
return
case <-time.After(90 * time.Second):
fmt.Println("Не было данных 90 секунд, проверка соединения")
go func() {
l.Ping()
}()
return
}
}
}
func main() {
var conninfo string = "dbname=pubsub_test_outer user=postgres"
if _, err := sql.Open("postgres", conninfo); err != nil {
panic(err)
}
// открываем чтение notify канала
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
listener := pq.NewListener(conninfo, 10*time.Second, time.Minute, reportProblem)
if err = listener.Listen("events"); err != nil {
panic(err)
}
fmt.Println("Старт мониторинга PostgreSQL...")
for {
waitForNotification(listener)
}
}
В данном листинге функция waitForNotification
отвечает за обработку данных которые пришли из канала events
, а если данных не было в течении определенного интервала, то проверяется соединение.
В результате можно увидеть следующее:

Как видно из скриншота данные добавленные в таблицу через запрос успешно отображаются в сервисе и обрабатываются.
Заключение
В результате получилась простенькая Event Sourcing система для обработки событий в которой события генерятся автоматически при изменении записей в БД.
Такая система может выспупать переходным этапом от старого монолита к событийной микросервесной архитектуре или же использоваться в различных интеграционных проектов для минимизации числа компонентов.