DataBase Foreign Data Wrapper

有时候我们需要将多个数据源的 DataBase 放在一个地方,最 Naive 的方法就是把其他 DataBase 的数据备份出来,再全部导入到一个 DataBase,但是这样比较麻烦,而且当数据库很大时也会比较耗时。这时候使用 FDW 就非常方便了。FDW 全称 Foreign Data Wrapper,这里有一些基本的介绍:Foreign data wrappers - PostgreSQL wiki,FDW 非常简单且效果不错,下面逐步介绍(基于 postgres)基本操作和注意事项。

场景描述

当我们需要从一个 DB(B)获取另一个 DB(A)的表时(常见于各类 BI 需求),一种方法是把 A 的数据完整备份到 B(这里不涉及主从问题),这样做不是不可以,但是比较麻烦;还有一种更为直接的方法就是使用 FDW 插件连接两个 DB,然后就可以直接通过 B 获取 A 的数据。

当然,实际中我们往往还会对从 A 映射过来的数据 Materialized Views 化,简单来说就是对 Table 或 View 进行本地化处理(物理存储)以提升访问速度,唯一需要注意的是需要定期 REFRESH 才能将源 DB 的数据更新,不过这可以通过设置一个定时任务非常简单地实现。

执行步骤

接下来就是这一系列步骤了,包括使用 FDW 映射到 Materialized Views 再到定时任务整个过程的处理。为此,我们事先假设好一个场景,下面的例子将以场景中指定的名字演示。

假设我们有个文本生成的应用:TextGeneration,应用本身只有一张表:text_generation,其他的表如 auth_*django_* 是 Django 自动创建的表,DB 名字为 tg

然后我们还有一个应用:BI,DB 名字为 bi-data,现在想在 BI 的 DB 中使用 TextGeneration 的那张表。

FDW

Step 1: Install the postgres_fdw extension

1
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

Step 2: Create a server

1
2
DROP SERVER IF EXISTS tg_server CASCADE; # prevent duplicated
CREATE SERVER tg_server FOREIGH DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'tg', port '5432');

Step3: Create the user mapping

1
CREATE USER MAPPING FOR bi_read_tg_only_user SERVER tg_server OPTIONS (user 'tg_read_only_user', password 'xxxxxxx');

这里需要说明一下如何创建用户:

  • 源 DB 创建为目标 DB 创建一个只读账号,比如 tg_read_only_user
  • 目标 DB 创建一个读写账号,针对从源 DB 映射过来的 schema 只读,或者创建一个只读账号比如 bi_read_tg_only_user
  • 使用目标 DB 的账号去 map 源 DB 为目标 DB 创建的只读账号,即使用 bi_read_only_user 去 map tg_read_only_user
  • 使用目标 DB 的读写或只读账号访问数据,即使用 bi_read_only_user 访问数据

如果两个 DB 在同一个数据库实例里面,那只要注意给不同用户赋权即可。或者简单起见可以只创建一个用户,分别只读源 DB,读写或只读目标 DB

需要特别说明的两点:

  • 有读写权限的用户操作从其他源 DB 映射过来的 schema 时,源 DB 的数据也会发生变化,安全起见建议创建一个只读账号。
  • 如果在目标 DB 中创建了只读用户,默认只有 public 的权限,需要注意给你想要给的 schema 的权限(就是你想读取的从其他 DB 映射过来的 schema)

关于如何创建只读用户以及如何给不同的 schema 权限赋权,可以参考下这篇文章

Step4: Create new schema to access source DB

1
2
DROP SCHEMA IF EXISTS tg_schema; # prevent duplicated
CREATE SCHEMA tg_schema;

Step5: Import schema of source DB

1
2
3
4
IMPORT FOREIGN SCHEMA public FROM SERVER tg_server INTO tg_schema;
# or only import some tables
IMPORT FOREIGN SCHEMA public LIMIT TO ( text_generation )
FROM SERVER tg_server INTO tg_schema;

这里我们第一行命令是把源 DB 的 public schema 导入过来了,你也可以根据需要导入其他 schema。第二行则是只导入指定表格的例子。

到这里我们的 FDW 操作其实已经完成了,此时,你可以在目标 DB 中通过 schema_name.table_name 使用源 DB 的数据啦,注意 table name 是源 DB 的表。

1
SELECT * from tg_schema.text_generation;

Materialized Views

Foreign tables 只是真实 data 的一个代理,为了独立使用这些数据,实际中最简单的复制数据的方法就是使用 materialized views。

Step 1: Create materialized view

1
2
CREATE MATERIALIZED VIEW IF NOT EXISTS tgmv_text_generation AS
SELECT * FROM tg_schema.text_generation; # 这里还可以执行其他 sql

这样就把映射过来的 tg_schema 中的 text_generation 表(也就是源 DB 中的那张表)转为 materialized view 了,我们把 view 命名为 tgmv_text_generation。当然,在 materialized 时可以执行许多其他有用的 SQL 命令,比如 COUNT,GROUP BY 等。

这时就可以直接使用 tgmv_text_generation 了,和上面从 tg_schema.text_generation 中获取的数据应该是一致的。

1
SELECT * from tgmv_text_generation;

那么,当源 DB 的数据发生变化时怎么更新到我们的 materialized views 中呢?这时候就需要进行 REFRESH。

Step2: Refresh data

1
REFRESH MATERIALIZED VIEW tgmv_text_generation;

本来到这里就可以了,但是这里有个 EXCLUSIVE lock 的问题,会导致刷新速度巨慢,详见 Materialized Views 的参考文献。

解决方式是使用 CONCURRENTLY,不过需要定义至少一个 UNIQUE index:

1
2
CREATE UNIQUE INDEX IF NOT EXISTS id ON tgmv_text_generation (id);
REFRESH MATERIALIZED VIEW CONCURRENTLY tgmv_text_generation;

这里的 id 就是源 DB text_generation 表里的 id 列。

批量脚本

实际应用中我们肯定不止一张表格,不可能手动一步一步去执行这些操作,这时候可以通过一个 SQL 命令脚本完成,还以上面的案例为例。

Step 1: Create SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
-- create.sql
-- 映射和赋权可以单独登陆 psql 创建
DO $$
DECLARE r record;
BEGIN
-- 映射
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
DROP SERVER IF EXISTS tg_server CASCADE;
CREATE SERVER tg_server FOREIGH DATA WRAPPER postgres_fdw OPTIONS (host 'xxx.xxx.xxx.xxx', dbname 'tg', port '5432');
CREATE USER MAPPING FOR bi_read_tg_only_user SERVER tg_server OPTIONS (user 'tg_read_only_user', password 'xxxxxxx');
DROP SCHEMA IF EXISTS tg_schema;
CREATE SCHEMA tg_schema;
IMPORT FOREIGN SCHEMA public FROM SERVER tg_server INTO tg_schema;

-- 赋权
GRANT USAGE ON SCHEMA tg_schema TO bi_read_tg_only_user;
GRANT SELECT ON ALL TABLES IN SCHEMA tg_schema TO bi_read_tg_only_user;
-- grant access to the new table automatically
ALTER DEFAULT PRIVILEGES IN SCHEMA tg_schema GRANT SELECT ON TABLES TO bi_read_tg_only_user;

-- 持久化
CREATE EXTENSION IF NOT EXISTS btree_gist;
FOR r IN SELECT tname FROM (VALUES
('auth_group'),
('auth_group_permissions'),
('auth_permission'),
('auth_user'),
('auth_user_groups'),
('auth_user_user_permissions'),
('django_admin_log'),
('django_content_type'),
('django_migrations'),
('django_session'),
('django_site'),
('text_generation')) AS x(tname)
LOOP
-- ignore django_* tables.
IF r.tname like 'django_%' THEN
RAISE NOTICE 'tgmv_% need not to process....', r.tname;
ELSE
EXECUTE format('CREATE MATERIALIZED VIEW IF NOT EXISTS tgmv_%s AS '
'SELECT * FROM tg_schema.%s',
r.tname, r.tname);
EXECUTE format('CREATE UNIQUE INDEX IF NOT EXISTS id ON tgmv_%s (id)',
r.tname);
-- 下面这句是刷新时用的,创建时可以不需要
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY tgmv_%s', r.tname);
END IF;
END LOOP;
END$$

然后执行 SQL 命令就可以自动在 DB bi-data 中创建了:

1
$ psql -h [HOST] -p [PORT] -U bi_read_tg_only_user -d bi-data -a -f create.sql

Step 2: Refresh SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
-- refresh.sql
SELECT id from tg_schema.auth_group LIMIT 1; -- 先重新连接 server
DO $$
DECLARE r record;
BEGIN
FOR r IN SELECT tname FROM (VALUES
('auth_group'),
('auth_group_permissions'),
('auth_permission'),
('auth_user'),
('auth_user_groups'),
('auth_user_user_permissions'),
('django_admin_log'),
('django_content_type'),
('django_migrations'),
('django_session'),
('django_site'),
('text_generation')) AS x(tname)
LOOP
-- ignore django_* tables.
IF r.tname like 'django_%' THEN
RAISE NOTICE 'tgmv_% need not to process....', r.tname;
ELSE
EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY tgmv_%s', r.tname);
END IF;
END LOOP;
END$$

脚本中我们把所有的表格都放进去了,实际可以只放自己需要的,这样就不需要条件判断了。

特别注意:在使用的过程中发现有时候会出现刷新不成功的情况,具体的报错是:ERROR: cannot set parameter “role” within security-restricted operation,这种情况猜测是因为 server 长时间没有连接导致的,可以在刷新之前 select schema 中的任意一张表(具体见上面的 sql)即可解决。

定时 REFRESH

实际应用中我们需要创建一个定时执行的脚本任务来批量刷新表格。当然首先推荐的就是 Crontab 了,因为它很简单而且很好用。

Step 1: Create bash script

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# refres.sh
LOGFILE=/tmp/log.txt # absolute path, log file
echo "begin to refresh view..."
echo "=======================" >> $LOGFILE
date +%Y年%m月%d日%H点%M分%S秒 >> $LOGFILE
echo "开始刷新" >> $LOGFILE
echo "=======================" >> $LOGFILE
export PGPASSWORD="xxxxxxxx" # 这里使用了环境变量就不用输密码了
set -e
psql -h [HOST] -p [PORT] -U bi_read_tg_only_user -d bi-data -a -f /absolute/path/to/refresh.sql >> $LOGFILE 2>&1
echo "=======================" >> $LOGFILE
date +%Y年%m月%d日%H点%M分%S秒 >> $LOGFILE
echo "结束刷新" >> $LOGFILE
echo "=======================\n\n" >> $LOGFILE
echo "all views in bi-data have been refreshed..."

Step 2: Create crontab task

1
crontab -e

添加一条命令,比如我们要每天凌晨两点刷新:

1
0 2 * * * /bin/sh /absolute/path/to/refresh.sh

这样每天两点的时候就会自动执行 refresh.sh 并且把日志写入设置好的文件: tmp/log.txt

这一步很有可能会遇到不少坑,最主要的就是定时任务不生效。如果不幸有人遇到了,我建议大家按这里的指导去排查:linux - CronJob not running - Stack Overflow,相信我一定是哪里没搞对,这个东西绝对是可以用的,也要相信自己一定可以搞定。

当然,如果你实在是搞不定或者不喜欢用这个小工具,也可以使用 schedule 起一个独立的服务,专门做数据更新或备份。

参考文献

CHANGELOG

  • 191030 更新:增加刷新 bug 处理