新聞中心
數(shù)據(jù)審計(jì)是一個(gè)跟蹤表內(nèi)容隨時(shí)間變化的系統(tǒng),在現(xiàn)在安全合規(guī)方面數(shù)據(jù)審計(jì)是必須要的功能之一。PostgreSQL作為一個(gè)強(qiáng)大現(xiàn)代的開源關(guān)系數(shù)據(jù)庫,也有一個(gè)相關(guān)插件PGAudit可以提供審計(jì)功能。

關(guān)于PGAudit插件以后有機(jī)會可以詳細(xì)介紹,本文我們介紹一個(gè)簡單SQL語句實(shí)現(xiàn)的數(shù)據(jù)集審計(jì)功能。
概述
最終實(shí)現(xiàn)效果為:
創(chuàng)建一個(gè)示例表:
create extension supa_audit cascade;
create table public.account(
id int primary key,
name text not null
);
啟用審計(jì):
select audit.enable_tracking('public.account'::regclass);增改刪操作:
insert into public.account(id, name)
values (1, 'Chongchong');
update public.account set name = 'CC' where id = 1;
delete from public.account where id = 1;
清空表:
truncate table public.account;
查看審計(jì)日志:
select * from audit.record_history
請注意,record_id和old_record_id在更新行時(shí)保持不變,這樣就可以輕松查詢單行的歷史記錄。
要關(guān)閉審計(jì)追蹤,只需執(zhí)行:
select audit.disable_tracking('public.account'::regclass);
實(shí)現(xiàn)
首先創(chuàng)建一個(gè)名為audit schema為審計(jì)用:
create schema if not exists audit;
記錄存儲
接下來,需要一個(gè)表來跟蹤插入、更新和刪除。
傳統(tǒng)上,使用audit schema并附加了一些元數(shù)據(jù)列,如提交的時(shí)間戳。
該解決方案存在一些維護(hù)挑戰(zhàn):
- 對表啟用審計(jì)需要數(shù)據(jù)庫遷移
- 當(dāng)源表的模式改變時(shí),審計(jì)表的模式也必須改變
為此使用PostgreSQL的無模式JSONB數(shù)據(jù)類型將每條記錄的數(shù)據(jù)存儲在單個(gè)列中的。這種方法的另一個(gè)好處是允許將多個(gè)表的審計(jì)歷史存儲在一個(gè)審計(jì)表中。
create table audit.record_version(
id bigserial primary key,
record_id uuid,
old_record_id uuid,
op varchar(8) not null,
ts timestamptz not null default now(),
table_oid oid not null,
table_schema name not null,
table_name name not null,
record jsonb,
old_record jsonb
);
查詢和索引
查詢性能很重要,如果不能快速查詢?nèi)罩?,則該審計(jì)日志沒有多大實(shí)際意義。為了提高查詢的性能,需要對最常用的查詢涉及字段創(chuàng)建索引。
時(shí)間范圍內(nèi)查詢
對于時(shí)間范圍,需要一個(gè)索引ts。 由于審計(jì)表僅用于插入記錄,其中ts列插入操作時(shí)間,其值ts自然是升序排列。PostgreSQL的內(nèi)置BRIN索引可以利用值和物理位置之間的相關(guān)性來生成一個(gè)索引,該索引在規(guī)模上比默認(rèn)值(BTREE索引)小數(shù)百倍,并且查找時(shí)間更快。
create index record_version_ts
on audit.record_version
using brin(ts);
對于表查詢,包含了一個(gè) table_oid跟蹤PostgreSQL內(nèi)部數(shù)字表標(biāo)識符的列。可以為該列添加索引而不是table_schema和 able_name列,最小化索引大小并提供更好的性能。
create index record_version_table_oid
on audit.record_version
using btree(table_oid);
記錄唯一標(biāo)識
將每一行的數(shù)據(jù)存儲為的缺點(diǎn)之一jsonb是基于列值的過濾變得非常低效。如果想快速查找一行的歷史記錄,需要為每一行提取和索引一個(gè)唯一標(biāo)識符。
對于全局唯一標(biāo)識符,使用以下結(jié)構(gòu):
[table_oid, primary_key_value_1, primary_key_value_2, ...]
并將該數(shù)組散列為UUID v5以獲得有效的可索引UUID類型,以識別對數(shù)據(jù)更改具有魯棒性的行。
使用一個(gè)實(shí)用函數(shù)來查找記錄的主鍵列名:
create or replace function audit.primary_key_columns(entity_oid oid)
returns text[]
stable
security definer
language sql
as $$
-- Looks up the names of a table's primary key columns
select
coalesce(
array_agg(pa.attname::text order by pa.attnum),
array[]::text[]
) column_names
from
pg_index pi
join pg_attribute pa
on pi.indrelid = pa.attrelid
and pa.attnum = any(pi.indkey)
where
indrelid = $1
and indisprimary
$$;
另一個(gè)為table_oid和主鍵,將結(jié)果轉(zhuǎn)換為記錄的UUID。
create or replace function audit.to_record_id(
entity_oid oid,
pkey_cols text[],
rec jsonb
)
returns uuid
stable
language sql
as $$
select
case
when rec is null then null
-- if no primary key exists, use a random uuid
when pkey_cols = array[]::text[] then uuid_generate_v4()
else (
select
uuid_generate_v5(
'fd62bc3d-8d6e-43c2-919c-802ba3762271',
(
jsonb_build_array(to_jsonb($1))
|| jsonb_agg($3 ->> key_)
)::text
)
from
unnest($2) x(key_)
)
end
$$;
最后,索引record_id和old_record_id包含這些用于快速查詢的唯一標(biāo)識符的列。
create index record_version_record_id
on audit.record_version(record_id)
where record_id is not null;
create index record_version_old_record_id
on audit.record_version(record_id)
where old_record_id is not null;
觸發(fā)器
為了讓審計(jì)功能真正起作用,需要在最終用戶不對其事務(wù)進(jìn)行任何更改的情況下插入記錄給審計(jì)表。為此,設(shè)置一個(gè)觸發(fā)器在數(shù)據(jù)更改時(shí)觸發(fā),為每個(gè)插入/更新/刪除的行為觸發(fā)一次觸發(fā)器。
create or replace function audit.insert_update_delete_trigger()
returns trigger
security definer
language plpgsql
as $$
declare
pkey_cols text[] = audit.primary_key_columns(TG_RELID);
record_jsonb jsonb = to_jsonb(new);
record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);
old_record_jsonb jsonb = to_jsonb(old);
old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);
begin
insert into audit.record_version(
record_id,
old_record_id,
op,
table_oid,
table_schema,
table_name,
record,
old_record
)
select
record_id,
old_record_id,
TG_OP,
TG_RELID,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
record_jsonb,
old_record_jsonb;
return coalesce(new, old);
end;
$$;
API
將公開的用于對表啟用審計(jì)的API:
select audit.enable_tracking('.'::regclass);禁用跟蹤:
select audit.disable_tracking('.'::regclass);
這些函數(shù)根據(jù)請求由表注冊審計(jì)觸發(fā)器:
create or replace function audit.enable_tracking(regclass)
returns void
volatile
security definer
language plpgsql
as $$
declare
statement_row text = format('
create trigger audit_i_u_d
before insert or update or delete
on %I
for each row
execute procedure audit.insert_update_delete_trigger();',
$1
);
pkey_cols text[] = audit.primary_key_columns($1);
begin
if pkey_cols = array[]::text[] then
raise exception 'Table % can not be audited because it has no primary key', $1;
end if;
if not exists(select 1 from pg_trigger where tgrelid = $1 and tgname = 'audit_i_u_d') then
execute statement_row;
end if;
end;
$$;
create or replace function audit.disable_tracking(regclass)
returns void
volatile
security definer
language plpgsql
as $$
declare
statement_row text = format(
'drop trigger if exists audit_i_u_d on %I;',
$1
);
begin
execute statement_row;
end;
$$;
性能開銷
開啟審計(jì)表后會降低插入、更新和刪除的吞吐量。但是在吞吐量低于每秒1000次寫入的情況下,其開銷通??梢院雎圆挥?jì)。對于寫入頻率較高的表,建議使用pgAudit。
總結(jié)
通過簡單純sql語句就實(shí)現(xiàn)了Postgresql數(shù)據(jù)庫的安全審計(jì),總體上算起來實(shí)現(xiàn)才150行sql語句。大家可以自己手動嘗試一下,主要是搞清楚其原理,如果生產(chǎn)環(huán)境中有需求還是建議用pgAudit。
網(wǎng)站標(biāo)題:自定義SQL實(shí)現(xiàn)PostgreSQL安全審計(jì)
文章源于:http://www.dlmjj.cn/article/djdgspg.html


咨詢
建站咨詢
