# PostgreSQL Snippets
Postgres Architecture Explained
# Data Type
# Array Type
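A minimal sketch of declaring and querying an array column (the contacts table and phones column here are illustrative, not an existing schema):
CREATE TABLE contacts (
    id     serial PRIMARY KEY,
    name   text,
    phones text[]                  -- array column
);
INSERT INTO contacts (name, phones)
VALUES ('Alice', ARRAY['111-222', '333-444']);
-- arrays are 1-based; ANY searches the elements
SELECT name, phones[1] FROM contacts WHERE '333-444' = ANY (phones);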
# JSON Type
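A minimal sketch using jsonb, the binary, indexable JSON type (the events table and payload shape are illustrative):
CREATE TABLE events (
    id      serial PRIMARY KEY,
    payload jsonb
);
INSERT INTO events (payload) VALUES ('{"type": "login", "user": {"id": 42}}');
-- -> returns jsonb, ->> returns text; @> tests containment
SELECT payload->'user'->>'id' FROM events WHERE payload @> '{"type": "login"}';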
# Composite Type
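A minimal sketch of a composite type used as a column (type, table, and values are illustrative):
CREATE TYPE full_address AS (
    street text,
    city   text,
    zip    text
);
CREATE TABLE companies (
    id      serial PRIMARY KEY,
    address full_address           -- column of the composite type
);
INSERT INTO companies (address) VALUES (ROW('1 Main St', 'Springfield', '12345'));
-- parenthesize the column to access a field of the composite value
SELECT (address).city FROM companies;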
# Recursive CTE
WITH RECURSIVE cte_name AS (
CTE_query_definition -- non-recursive term
UNION [ALL]
CTE_query_definition -- recursive term
) SELECT * FROM cte_name;
WITH RECURSIVE subordinates AS (
SELECT employee_id, manager_id, full_name
FROM employees
WHERE employee_id = 2
UNION
SELECT e.employee_id, e.manager_id, e.full_name
FROM employees e
INNER JOIN subordinates s ON s.employee_id = e.manager_id
) SELECT * FROM subordinates;
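The example assumes an employees table roughly like the following (a guess at the schema, inferred only from the columns used in the query):
CREATE TABLE employees (
    employee_id serial PRIMARY KEY,
    full_name   text NOT NULL,
    manager_id  integer REFERENCES employees (employee_id)  -- NULL for the top-level manager
);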
# Materialized View
Create
CREATE MATERIALIZED VIEW "UserDashboard"
AS
SELECT count(1) AS count, u."isAdmin"
FROM "User" u
GROUP BY u."isAdmin"
WITH DATA;
Refresh
REFRESH MATERIALIZED VIEW "UserDashboard";
-- allow querying when refreshing data
REFRESH MATERIALIZED VIEW CONCURRENTLY "UserDashboard";
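REFRESH ... CONCURRENTLY only works if the materialized view has at least one unique index; for the view above, "isAdmin" is the natural candidate (the index name is illustrative):
CREATE UNIQUE INDEX "UserDashboard_isAdmin_idx" ON "UserDashboard" ("isAdmin");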
Drop
DROP MATERIALIZED VIEW view_name;
# Delete duplicated rows
DELETE FROM
basket a
USING basket b
WHERE
a.id < b.id
AND a.fruit = b.fruit;
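If the table has no usable id column, a variant sketch based on the system column ctid keeps one arbitrary row per fruit (assuming the same basket(fruit) layout):
DELETE FROM basket
WHERE ctid IN (
    SELECT ctid
    FROM (
        SELECT ctid,
               ROW_NUMBER() OVER (PARTITION BY fruit) AS rn
        FROM basket
    ) dup
    WHERE dup.rn > 1               -- everything beyond the first row per fruit
);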
# Table Partitioning
Postgres provides three built-in partitioning methods:
- Range Partitioning: Partition a table by a range of values. This is commonly used with date fields, e.g., a table containing sales data that is divided into monthly partitions according to the sale date.
- List Partitioning: Partition a table by a list of known values. This is typically used when the partition key is a categorical value, e.g., a global sales table divided into regional partitions. The partition key in this case can be the country or city code, and each partition will define the list of codes that map to it.
- Hash Partitioning: Partition a table using a hash function on the partition key. This is especially useful when there is no obvious way of dividing data into logically similar groups and is often used on categorical partitioning keys that are accessed individually. E.g., if a sales table is often accessed by product, the table might benefit from a hash partition on the product SKU.
CREATE TABLE sale (
sale_date date not null,
country_code text,
product_sku text,
units integer
) PARTITION BY RANGE (sale_date);
CREATE TABLE sale_201901 PARTITION OF sale
FOR VALUES FROM ('2019-01-01') TO ('2019-02-01');
CREATE TABLE sale_201902 PARTITION OF sale
FOR VALUES FROM ('2019-02-01') TO ('2019-03-01');
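The list and hash methods described above are declared the same way. A sketch on the same column layout (the region groupings and partition counts are illustrative):
-- list partitioning on country_code
CREATE TABLE sale_by_region (
    sale_date    date not null,
    country_code text,
    product_sku  text,
    units        integer
) PARTITION BY LIST (country_code);
CREATE TABLE sale_emea PARTITION OF sale_by_region FOR VALUES IN ('DE', 'FR', 'UK');
CREATE TABLE sale_apac PARTITION OF sale_by_region FOR VALUES IN ('JP', 'AU', 'SG');
-- hash partitioning on product_sku, spread over 4 partitions
CREATE TABLE sale_by_product (
    sale_date    date not null,
    country_code text,
    product_sku  text,
    units        integer
) PARTITION BY HASH (product_sku);
CREATE TABLE sale_by_product_p0 PARTITION OF sale_by_product FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE sale_by_product_p1 PARTITION OF sale_by_product FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE sale_by_product_p2 PARTITION OF sale_by_product FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE sale_by_product_p3 PARTITION OF sale_by_product FOR VALUES WITH (MODULUS 4, REMAINDER 3);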
# Change Data Capture (CDC)
# 1. Using Triggers
CREATE OR REPLACE FUNCTION audit_log() RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'DELETE') THEN
INSERT INTO audit_table (id, old_data, operation) VALUES (OLD.id, OLD.data, 'DELETE');
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO audit_table (id, old_data, new_data, operation) VALUES (NEW.id, OLD.data, NEW.data, 'UPDATE');
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO audit_table (id, new_data, operation) VALUES (NEW.id, NEW.data, 'INSERT');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON my_table
FOR EACH ROW
EXECUTE FUNCTION audit_log();
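The trigger assumes my_table and audit_table shaped roughly like this (a sketch; the original does not show their definitions):
CREATE TABLE my_table (
    id   integer PRIMARY KEY,
    data text
);
CREATE TABLE audit_table (
    audit_id   bigserial PRIMARY KEY,
    id         integer,                      -- id of the row in my_table
    old_data   text,
    new_data   text,
    operation  text,                         -- 'INSERT', 'UPDATE' or 'DELETE'
    changed_at timestamptz DEFAULT now()     -- not used by the trigger; a common addition
);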
# 2. Using Logical Replication
-- Enable the pglogical extension
CREATE EXTENSION pglogical;
-- Create a replication set
SELECT pglogical.create_replication_set('my_replication_set', true);
-- Add the table to the replication set
SELECT pglogical.replication_set_add_table('my_replication_set', 'my_table', true);
-- Create a replication node
SELECT pglogical.create_node(
node_name := 'my_node',
dsn := 'host=192.168.1.100 dbname=my_database user=my_user password=my_password'
);
-- Create a replication slot
SELECT pglogical.create_replication_slot('my_slot', 'my_node', true);
-- Start the replication worker
SELECT pglogical.start_replication(
slot_name := 'my_slot',
create_slot := false,
replication_sets := ARRAY['my_replication_set']
);
In this example, we enable the pglogical extension and create a replication set called my_replication_set. We then add the my_table table to the replication set and create a replication node called my_node that connects to the publisher database. Finally, we create a replication slot called my_slot and start the replication worker to capture data changes in real time.
# 3. Using Change Data Capture (CDC)
Change Data Capture (CDC) is a feature in PostgreSQL that allows you to capture data changes in real-time and store them in a separate table. You can use CDC to capture data changes without affecting the performance of the database.
- Enable the wal_level parameter in the postgresql.conf file. This parameter determines the amount of information written to the Write-Ahead Log (WAL). Set it to logical to enable logical decoding, which is required for CDC.
wal_level = logical
- Restart the Postgres server to apply the changes made to the postgresql.conf file.
- Create a replication slot using the pg_create_logical_replication_slot function. This function creates a replication slot that can be used to capture changes.
SELECT pg_create_logical_replication_slot('cdc_slot', 'wal2json');
In the above example, we create a replication slot named cdc_slot using the wal2json output plugin. The wal2json plugin converts the Write-Ahead Log (WAL) into a JSON format, making it easier to consume the changes. Note that wal2json is a third-party output plugin and must be installed separately; the test_decoding plugin shipped with PostgreSQL can be used instead for quick experiments.
- Create a publication to define the tables and columns you want to capture changes for. A publication is a named collection of tables and columns that are tracked for changes.
CREATE PUBLICATION cdc_publication FOR TABLE table1, table2;
In the above example, we create a publication named cdc_publication for table1 and table2. Any changes made to these tables will be captured by CDC.
- Create a subscription to consume the changes captured by CDC. A subscription defines the connection details and the publication to subscribe to.
CREATE SUBSCRIPTION cdc_subscription CONNECTION 'host=127.0.0.1 port=5432 dbname=mydb user=myuser password=mypassword' PUBLICATION cdc_publication;
In the above example, we create a subscription named cdc_subscription that connects to the database mydb using the provided connection details. It subscribes to the cdc_publication publication.
Once you have completed these steps, CDC will be set up in your Postgres database. Any changes made to the tables included in the publication will be captured and made available for consumption through the subscription.
You can query the changes using the pg_logical_slot_get_changes function or any other method supported by the output plugin you have chosen.
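For example, to read the pending changes from the slot created earlier:
-- consumes the changes it returns; pg_logical_slot_peek_changes reads without consuming
SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL);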
Remember to monitor the size of the replication slot and the disk space used by the changes captured by CDC to ensure it does not grow excessively.
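One way to watch how much WAL each slot is holding back, using the standard catalog views:
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;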
# Example 2
In this example, we create a publication called my_publication for the my_table table on the publisher database. We then create a subscription called my_subscription on the subscriber database that connects to the publisher database and subscribes to the my_publication publication.
-- On the publisher database
CREATE PUBLICATION my_publication FOR TABLE my_table;
-- On the subscriber database
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=192.168.1.100 dbname=my_database user=my_user password=my_password'
PUBLICATION my_publication;
# Unlogged Table
Disable WAL (Write-Ahead Log).
Use cases:
- large data sets (e.g. intermediate computation results) that will be used only a few times
- dynamic data with an expiration date, e.g. user sessions
- volatile data that can easily be regenerated
CREATE UNLOGGED TABLE daily_summary(...);
ALTER TABLE daily_summary SET UNLOGGED;
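The change is reversible; switching back rewrites the table and writes it to the WAL:
ALTER TABLE daily_summary SET LOGGED;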
List all unlogged tables
SELECT relname FROM pg_class WHERE relpersistence = 'u';
# Managing Disk Space
- Hot Data
- Warm Data
- Cold Data
CREATE TABLESPACE hot_data LOCATION '/disk0/ssd/';
CREATE TABLESPACE warm_data LOCATION '/disk0/sata2/';
CREATE DATABASE logistics
TABLESPACE ts_primary;
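Existing objects can be moved between tablespaces as data cools down. A sketch reusing the tablespaces above and the sale partitions from the partitioning section (those names are only illustrative here):
-- create new objects directly on the fast disk
CREATE INDEX idx_sale_201902_sku ON sale_201902 (product_sku) TABLESPACE hot_data;
-- move an old partition to slower storage as it cools down
ALTER TABLE sale_201901 SET TABLESPACE warm_data;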
# 2PC
Use this feature with one coordinator and two participant nodes. Note that the GUCs and the two_phase_commit server option used below come from the proposed atomic commit for foreign data wrappers patch (see the wiki link at the end of this section) and are not part of stock PostgreSQL.
- Steps marked [C] are operations on the coordinator node,
- steps marked [P] are operations on the foreign servers (participant nodes).
# Configuration
- [C] Set GUC parameter max_prepared_foreign_transactions
$ $EDITOR postgresql.conf
max_connections = 100
max_prepared_foreign_transactions = 200 # max_connections = 100 and two foreign servers
max_foreign_transaction_resolvers = 1
foreign_twophase_commit = required
foreign_transaction_resolution_interval = 5s
foreign_transaction_resolver_timeout = 60s
- [P] Set GUC parameter max_prepared_transactions
$ $EDITOR postgresql.conf
max_prepared_transactions = 100 # same as max_connections of the coordinator server
log_statement = all # for testing
log_line_prefix = '<F1> ' # for fs2 server we can set '<F2> '
- [C] Create the postgres_fdw extension
- [C] Create foreign servers with two_phase_commit parameter = on
=# CREATE SERVER fs1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'fs1', dbname 'postgres', port '5432');
CREATE SERVER
=# CREATE SERVER fs2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'fs2', dbname 'postgres', port '5432');
CREATE SERVER
=# SELECT * FROM pg_foreign_server;
oid | srvname | srvowner | srvfdw | srvtype | srvversion | srvacl | srvoptions
-------+---------+----------+--------+---------+------------+--------+--------------------------------------
16451 | fs1 | 10 | 16387 | | | | {host=fs1,dbname=postgres,port=5432}
16452 | fs2 | 10 | 16387 | | | | {host=fs2,dbname=postgres,port=5432}
- [C] Create user mapping
=# CREATE USER MAPPING FOR PUBLIC SERVER fs1;
CREATE USER MAPPING
=# CREATE USER MAPPING FOR PUBLIC SERVER fs2;
CREATE USER MAPPING
# Example
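The example inserts into foreign tables ft1 and ft2, which are not defined above. Based on the server logs further down (INSERT INTO public.s1(col) / public.s2(col)), they would look roughly like this:
=# CREATE FOREIGN TABLE ft1 (col int) SERVER fs1 OPTIONS (table_name 's1');
CREATE FOREIGN TABLE
=# CREATE FOREIGN TABLE ft2 (col int) SERVER fs2 OPTIONS (table_name 's2');
CREATE FOREIGN TABLE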
=# BEGIN;
=# INSERT INTO ft1 VALUES (1);
=# INSERT INTO ft2 VALUES (1);
=# COMMIT;
We will see the following server logs on the fs1 and fs2 servers.
<FS1> LOG: statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ
<FS1> LOG: execute pgsql_fdw_prep_1: INSERT INTO public.s1(col) VALUES ($1)
<FS1> DETAIL: parameters: $1 = '1'
<FS1> LOG: statement: DEALLOCATE pgsql_fdw_prep_1
<FS2> LOG: statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ
<FS2> LOG: execute pgsql_fdw_prep_2: INSERT INTO public.s2(col) VALUES ($1)
<FS2> DETAIL: parameters: $1 = '1'
<FS2> LOG: statement: DEALLOCATE pgsql_fdw_prep_2
<FS1> LOG: statement: PREPARE TRANSACTION 'fx_68464475_515_16400_10'
<FS2> LOG: statement: PREPARE TRANSACTION 'fx_658736079_515_16410_10'
<FS1> LOG: statement: COMMIT PREPARED 'fx_68464475_515_16400_10'
<FS2> LOG: statement: COMMIT PREPARED 'fx_658736079_515_16410_10'
PREPARE TRANSACTION
PREPARE TRANSACTION is not intended for use in applications or interactive sessions. Its purpose is to allow an external transaction manager to perform atomic global transactions across multiple databases or other transactional resources. Unless you're writing a transaction manager, you probably shouldn't be using PREPARE TRANSACTION.
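For completeness, a sketch of the manual two-phase flow and the view listing pending prepared transactions (requires max_prepared_transactions > 0; the table and gid are illustrative):
BEGIN;
INSERT INTO accounts (id, balance) VALUES (1, 100);   -- hypothetical table
PREPARE TRANSACTION 'tx_example';                     -- phase 1: persist and detach the transaction
SELECT gid, prepared, owner FROM pg_prepared_xacts;   -- transactions waiting for phase 2
COMMIT PREPARED 'tx_example';                         -- phase 2 (or ROLLBACK PREPARED 'tx_example')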
https://wiki.postgresql.org/wiki/Atomic_Commit_of_Distributed_Transactions