Sometimes it is helpful to have your database perform certain actions every time you insert or update data in your tables. Trigger functions are one of the most common ways of doing this. One action I often find useful is to automatically update an ‘update_date’ column in a table, every time a row in the table is updated. Here is an example of such a trigger.
I have a table that stores the names of New York City streets, conveniently called ‘street’. Actually it’s called ‘crash.street’, as it’s in the ‘crash’ schema. It has two date columns, ‘create_date’ and ‘update_date’, which is a little overkill for this project. Both dates are populated by default when a new row is inserted. However, I want the ‘update_date’ column to be set to the current timestamp whenever that row is updated.
DROP TABLE IF EXISTS crash.street;

CREATE TABLE crash.street (
    id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    name TEXT NOT NULL,
    borough_code VARCHAR(2) NOT NULL,
    zip_code VARCHAR(5),
    create_date timestamp with time zone DEFAULT now(),
    update_date timestamp with time zone DEFAULT now(),
    CONSTRAINT s_nbz_u UNIQUE (name, borough_code, zip_code)
);
First, create a trigger function called ‘update_street_update_date_func’.
It checks if an ‘update’ is being performed. Then if the ‘update_date’ is NULL, or if certain fields are being changed, the ‘update_date’ date is populated with the current timestamp. Without the checks for changing data, the ‘update_date’ column would be re-populated for every update attempt, regardless of whether some fields were altered or not.
DROP FUNCTION IF EXISTS crash.update_street_update_date_func() CASCADE;

CREATE FUNCTION crash.update_street_update_date_func()
RETURNS trigger
LANGUAGE plpgsql
VOLATILE
CALLED ON NULL INPUT
SECURITY INVOKER
COST 100
AS $$
BEGIN
    IF (TG_OP = 'UPDATE') THEN
        IF (NEW.update_date IS NULL) THEN
            NEW.update_date = now();
        -- IS DISTINCT FROM also detects changes to or from NULL (zip_code is nullable);
        -- a plain != would evaluate to NULL in that case and skip this branch.
        ELSIF (NEW.name IS DISTINCT FROM OLD.name
               OR NEW.borough_code IS DISTINCT FROM OLD.borough_code
               OR NEW.zip_code IS DISTINCT FROM OLD.zip_code) THEN
            NEW.update_date = now();
        END IF;
    END IF;
    RETURN NEW;
END;
$$;
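One subtlety with this kind of change check is worth spelling out. In the table above, ‘zip_code’ is nullable, and in SQL a plain != comparison against NULL evaluates to NULL rather than true, so an IF branch guarded by it is skipped. PostgreSQL's IS DISTINCT FROM treats NULL as an ordinary comparable value. The sketch below models both behaviours in plain Python, with None standing in for NULL. The helper names sql_not_equal, is_distinct_from and row_changed are made up for illustration and are not part of the project.

```python
# Illustrative model only: None stands in for SQL NULL.

def sql_not_equal(a, b):
    """Mimic SQL's `a != b`: the result is NULL (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def is_distinct_from(a, b):
    """Mimic SQL's `a IS DISTINCT FROM b`: NULLs compare like ordinary values."""
    # In Python, None != None is False and None != 'x' is True.
    return a != b


def row_changed(old, new, compare, cols=("name", "borough_code", "zip_code")):
    """Return True if any watched column changed according to `compare`.

    A None (NULL) comparison result counts as 'not true', as in an SQL IF.
    """
    return any(compare(new[c], old[c]) is True for c in cols)


old = {"name": "main st", "borough_code": "q", "zip_code": None}
new = {"name": "main st", "borough_code": "q", "zip_code": "11111"}

print(row_changed(old, new, sql_not_equal))     # False: the change is missed
print(row_changed(old, new, is_distinct_from))  # True: the change is detected
```

In this model, only the NULL-safe comparison notices that the zip code went from NULL to a real value.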
Then create the trigger called ‘street_update_date_trigger’, which will invoke our trigger function ‘update_street_update_date_func()’.
This could have been ‘BEFORE INSERT OR UPDATE’ or ‘BEFORE UPDATE’, but as the table default was already set as the current timestamp, we only need the ‘BEFORE UPDATE’ instruction.
DROP TRIGGER IF EXISTS street_update_date_trigger ON crash.street CASCADE;

CREATE TRIGGER street_update_date_trigger
BEFORE UPDATE ON crash.street
FOR EACH ROW
EXECUTE PROCEDURE crash.update_street_update_date_func();
Next up will be to create a Python interface to use this database table. I will use the ‘psycopg2’ library.
pip install psycopg2
I created a module of functions called ‘collision_db.py‘, which can be imported and used by any Python script. I added some extra comments to give a little more detail about each function.
import os
import sys

import psycopg2
import psycopg2.extras


# Simple log function. Prints this module name and
# the line where the function was called from.
def log_this(msg):
    print(f'{os.path.basename(__file__)}: {sys._getframe().f_back.f_lineno}> {msg}')


def get_db_connection():
    c = None
    my_schema = 'crash'
    # To avoid having to specify the schema name
    # when referencing 'crash' schema tables
    search_path = f'-c search_path=pg_catalog,public,{my_schema}'
    try:
        c = psycopg2.connect(
            database='nyc_data',
            user='postgres',
            password='xyz',
            options=search_path,
        )
    except psycopg2.DatabaseError as e:
        log_this(f'Error {e}')
        sys.exit(99)
    # No need to issue a 'commit' after transactions.
    c.autocommit = True
    return c


# Inserts into street if not existing already. The street
# id is returned for both new and existing streets.
def insert_into_street(*, con, street):
    street_id = None
    cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute(
        "SELECT id FROM street WHERE name=%(name)s "
        "AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s",
        street)
    log_this(f'Cursor status: {cur.statusmessage}')
    log_this(f'Cursor desc: {cur.description}')
    if cur.statusmessage == 'SELECT 0':
        log_this(f"Inserting new street {street}")
        cur.execute(
            "INSERT INTO street (name, borough_code, zip_code) "
            "VALUES (%(name)s, %(borough_code)s, %(zip_code)s) RETURNING id",
            street)
    # Either the SELECT or the INSERT ... RETURNING left one row to fetch.
    street_id = cur.fetchone()[0]
    return street_id


# Update a street. Pass in a 'street' dictionary.
# Return a count of affected rows. Should be 1.
def update_street_name(*, con, street):
    sql = ("UPDATE street SET name = %(new_name)s WHERE name=%(name)s "
           "AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s")
    cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute(sql, street)
    return cur.rowcount


# Return a street that matches a given set of values.
def find_one_from_street(*, con, street):
    new_street = None
    with con.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute(
            "SELECT id FROM street WHERE name=%(name)s "
            "AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s",
            street)
        new_street = cur.fetchone()
    return new_street


#--------------------------------------------------------
# Generic Functions
#--------------------------------------------------------

# Find from any given table that has an 'id' column.
def find_from_table(*, con, table_name, cols, id):
    found = None
    sql = f"SELECT {', '.join(cols)} FROM {table_name} WHERE id = %s"
    with con.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute(sql, (id,))
        found = cur.fetchone()
    return found


# Delete from any table that has an 'id' column
# Return a count of affected rows. Should be 1
# Warning!! Check the arguments being passed in here
def delete_from_table(*, con, table_name, id):
    cur = con.cursor()
    sql = f"DELETE FROM {table_name} WHERE id = %s"
    cur.execute(sql, (id,))
    return cur.rowcount
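The warning on ‘delete_from_table’ is there because the table name is pasted into the SQL text with an f-string, so only the id travels as a bound parameter. One way to guard the generic functions, sketched here on the assumption that the set of tables is known in advance, is to check the name against an allow-list before building the statement. ALLOWED_TABLES, checked_table_name and build_delete_sql are hypothetical names, not part of ‘collision_db.py’. (psycopg2 also ships a ‘psycopg2.sql’ module for composing identifiers safely, which may be the better fit in a real project.)

```python
# Hypothetical allow-list; adjust to the tables in your own schema.
ALLOWED_TABLES = frozenset({"street"})


def checked_table_name(table_name):
    """Return table_name unchanged if it is allow-listed, else raise ValueError."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"Unexpected table name: {table_name!r}")
    return table_name


def build_delete_sql(table_name):
    """Build the same DELETE statement as delete_from_table, after the check."""
    return f"DELETE FROM {checked_table_name(table_name)} WHERE id = %s"


print(build_delete_sql("street"))

try:
    build_delete_sql("street; DROP TABLE street")
except ValueError as e:
    print(f"Rejected: {e}")
```

The id still goes through the driver as a parameter; only the table name needs this extra check.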
OK, in my last post, ‘Insert Data With Python and PostgreSQL’, I created a test to verify that the insert function would return the street id even if the street table row existed already. I’ll create another test to verify that the ‘street_update_date_trigger’ trigger updates the date every time that a row is updated. Ideally it would also test that the date is not updated unless an actual change is made to the table row.
test_collision_db_1.py
from datetime import datetime, date, timezone, tzinfo
import pytz as tz
import os
import sys
import psycopg2
import psycopg2.extras
import pytest
import time

BIN_DIR = os.path.abspath(os.path.dirname(__file__))
LIB_DIR = os.path.join(BIN_DIR, '..', 'lib')
sys.path.append(LIB_DIR)

from collision_db import delete_from_table, get_db_connection, find_from_table, insert_into_street, update_street_name

CON = None

#--------------------------------------------------------
# Utility Functions
#--------------------------------------------------------
def log_this(msg):
    print(f'{os.path.basename(__file__)}: {sys._getframe().f_back.f_lineno}> {msg}')


def setup_function():
    global CON
    CON = get_db_connection()
    log_this("In setup!")


def teardown_function():
    global CON
    CON.close()
    log_this("In teardown!")


def is_con_open():
    global CON
    return not CON.closed


def got_correct_values(expect_dict, order_of_vals, got_touple):
    if (expect_dict is None) or (got_touple is None):
        log_this("Test 'got_correct_values' Not comparing None")
        return False
    if len(expect_dict.values()) != len(got_touple):
        log_this(f"Test 'got_correct_values' Order of cols has incorrect size: {len(got_touple)}")
        return False
    # Compare each expected value with the value at the same position.
    for ct, key_name in enumerate(order_of_vals):
        expect_val = expect_dict[key_name]
        got_val = got_touple[ct]
        if expect_val != got_val:
            log_this(f"Test 'got_correct_values' For key {key_name}, expected: {expect_val}, got: {got_val}")
            return False
    return True


def compare_dates(expect_datetime, got_datetime):
    expect_ymd = expect_datetime.strftime("%Y-%m-%d")
    got_ymd = got_datetime.strftime("%Y-%m-%d")
    if expect_ymd != got_ymd:
        log_this(f"'compare_dates' Expected Ymd {expect_ymd} not eq {got_ymd}")
        return False
    expect_hm = expect_datetime.strftime("%H:%M")
    got_hm = got_datetime.strftime("%H:%M")
    if expect_hm != got_hm:
        log_this(f"'compare_dates' Expected H:M {expect_hm} not eq {got_hm}")
        return False
    return True


#-----------------------------------------
# Test functions
#-----------------------------------------
...

def test_street_update_trigger():
    global CON
    expect_street = {'name': 'trigger_test_st', 'borough_code': 'q', 'zip_code': '11111'}
    ny_tz = tz.timezone('America/New_York')
    expect_dt = datetime.now(tz=ny_tz)

    got_id = insert_into_street(con=CON, street=expect_street)
    cols = ['id', 'name', 'borough_code', 'zip_code', 'create_date', 'update_date']
    expect_street['id'] = got_id
    got_street = find_from_table(con=CON, table_name='street', cols=cols, id=got_id)
    log_this(f"Got Street: {got_street}")

    # Only need to compare id, name, borough_code, zip_code
    assert got_correct_values(expect_street, cols[0:4], got_street[0:4])

    got_create_date_1 = got_street['create_date']
    got_update_date_1 = got_street['update_date']
    assert got_create_date_1 == got_update_date_1
    assert compare_dates(expect_dt, got_create_date_1)
    log_this(f"Got C date: {got_create_date_1}")

    expect_street['new_name'] = 'new_trigger_test_st'
    # Force a new update time
    time.sleep(1)
    row_ct = update_street_name(con=CON, street=expect_street)
    assert row_ct == 1

    expect_street['name'] = expect_street['new_name']
    expect_street.pop('new_name')
    got_street = find_from_table(con=CON, table_name='street', cols=cols, id=got_id)
    assert got_correct_values(expect_street, cols[0:4], got_street[0:4])

    got_create_date_2 = got_street['create_date']
    got_update_date_2 = got_street['update_date']
    # The create date should remain the same
    assert got_create_date_1 == got_create_date_2
    # The update date should change
    assert got_update_date_1 != got_update_date_2

    row_ct = delete_from_table(con=CON, table_name='street', id=got_street['id'])
    assert row_ct == 1
The ‘setup_function’ and ‘teardown_function’ names are recognised by Pytest. The former is called before, and the latter after, each test function in the module. I use these to open and close the database connection.
In this test I will insert a sample street into the crash.street table which returns the street id. I select the street from the same table using this returned id. I verify that it returns the correct street.
I get the ‘create_date’ and the ‘update_date’ and verify that they are the same. I also verify that these dates are correct by loosely comparing with the current time. I say loosely as the ‘compare_dates’ function compares to the nearest minute only, which is good enough for this test.
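Comparing formatted hour and minute strings can fail if the insert happens to straddle a minute boundary. A possible alternative, shown only as a sketch, is to compare the absolute difference between the two timestamps against a tolerance. The name dates_within and the 60 second default are assumptions for illustration, and both arguments are expected to be timezone-aware.

```python
from datetime import datetime, timedelta, timezone


def dates_within(expect_datetime, got_datetime, tolerance=timedelta(seconds=60)):
    """Return True if two timezone-aware datetimes differ by at most `tolerance`."""
    return abs(got_datetime - expect_datetime) <= tolerance


# Two instants two seconds apart, on either side of a minute boundary
# and expressed in different UTC offsets.
utc = timezone.utc
minus_four = timezone(timedelta(hours=-4))
a = datetime(2020, 5, 30, 22, 26, 59, tzinfo=utc)
b = datetime(2020, 5, 30, 18, 27, 1, tzinfo=minus_four)  # 22:27:01 UTC

print(dates_within(a, b))                                  # True
print(dates_within(a, b, tolerance=timedelta(seconds=1)))  # False
```

Because aware datetimes are subtracted as instants, the differing offsets do not matter here, whereas a string comparison on "%H:%M" would report 22:26 against 18:27.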
Before updating the same crash.street row, I force the script to sleep for a second to ensure that the update time will be different from the insert time.
Then I update the row, changing the street name. I use the ‘find_from_table’ function to find the same row again. This time I verify that the ‘update_date’ has changed. After which I delete this row, as it’s no longer needed.
I run the test, first in quiet mode.
λ pytest test_collision_db_1.py::test_street_update_trigger
==================================================================== test session starts ====================================================================
platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item

test_collision_db_1.py .                                                                                                                               [100%]

===================================================================== 1 passed in 1.26s =====================================================================
Yeaaaaah! it passed. How convenient.
And next in noisy mode, to give a little more detail of what’s happening behind the scenes.
λ pytest test_collision_db_1.py::test_street_update_trigger -v -s
==================================================================== test session starts ====================================================================
platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1 -- c:\python37\python.exe
cachedir: .pytest_cache
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item

test_collision_db_1.py::test_street_update_trigger test_collision_db_1.py: 25> In setup!
collision_db.py: 45> Cursor status: SELECT 0
collision_db.py: 46> Cursor desc: (Column(name='id', type_code=23),)
collision_db.py: 49> Inserting new street {'name': 'trigger_test_st', 'borough_code': 'q', 'zip_code': '11111'}
test_collision_db_1.py: 119> Got Street: [36, 'trigger_test_st', 'q', '11111', datetime.datetime(2020, 5, 30, 18, 26, 7, 487330, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None)), datetime.datetime(2020, 5, 30, 18, 26, 7, 487330, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None))]
test_collision_db_1.py: 128> Got C date: 2020-05-30 18:26:07.487330-04:00
PASSED test_collision_db_1.py: 30> In teardown!

===================================================================== 1 passed in 1.31s =====================================================================
I could of course add more tests to verify that the ‘update_date’ is correct, and verify that it doesn’t change unless the row is actually updated with new data. But that can wait for now.
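To give a feel for what such a "no change, no new date" test might look like without needing the PostgreSQL database, here is a rough stand-in using Python's built-in ‘sqlite3’ module. This is a swapped-in technique, not the project's setup: SQLite triggers cannot assign to NEW, so the sketch uses an AFTER UPDATE trigger with a WHEN clause, and a hypothetical ‘update_count’ column replaces ‘update_date’ so that no sleep is needed to observe a change. The trigger name ‘street_update_trigger’ and the helper get_update_count are likewise made up for illustration.

```python
import sqlite3

# Stand-in schema: SQLite, not PostgreSQL. A counter replaces the timestamp
# so the example can observe a change without sleeping.
SCHEMA = """
CREATE TABLE street (
    id           INTEGER PRIMARY KEY,
    name         TEXT NOT NULL,
    borough_code TEXT NOT NULL,
    zip_code     TEXT,
    update_count INTEGER NOT NULL DEFAULT 0
);

CREATE TRIGGER street_update_trigger
AFTER UPDATE OF name, borough_code, zip_code ON street
FOR EACH ROW
WHEN NEW.name IS NOT OLD.name
  OR NEW.borough_code IS NOT OLD.borough_code
  OR NEW.zip_code IS NOT OLD.zip_code
BEGIN
    UPDATE street SET update_count = update_count + 1 WHERE id = NEW.id;
END;
"""


def get_update_count(con, street_id):
    """Return the current update_count for one street row."""
    row = con.execute(
        "SELECT update_count FROM street WHERE id = ?", (street_id,)
    ).fetchone()
    return row[0]


con = sqlite3.connect(":memory:")
con.executescript(SCHEMA)
cur = con.execute(
    "INSERT INTO street (name, borough_code, zip_code) VALUES (?, ?, ?)",
    ("trigger_test_st", "q", "11111"),
)
street_id = cur.lastrowid

# An update that writes the same name back should not bump the counter.
con.execute("UPDATE street SET name = ? WHERE id = ?", ("trigger_test_st", street_id))
count_after_noop = get_update_count(con, street_id)

# An update that really changes the name should bump it once.
con.execute("UPDATE street SET name = ? WHERE id = ?", ("new_trigger_test_st", street_id))
count_after_change = get_update_count(con, street_id)

print(count_after_noop, count_after_change)  # 0 1
con.close()
```

The real test against ‘crash.street’ would follow the same shape: update with identical values, check that ‘update_date’ is unchanged, then update with new values and check that it moved.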