Getting started with Python and psycopg2, I wanted to create a funtion that would insert a table row and return the primary key id column value.
The id would be returned whether the row existed already or not. This could easily be achieved with a PostgreSQL function, but this time I wanted to do it with Python.
Here is my sample table.
DROP TABLE IF EXISTS crash.street;
CREATE TABLE crash.street (
id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name TEXT NOT NULL,
borough_code VARCHAR (2) NOT NULL,
zip_code VARCHAR (5),
create_date timestamp with time zone DEFAULT now(),
update_date timestamp with time zone DEFAULT now(),
CONSTRAINT s_nbz_u UNIQUE (name, borough_code, zip_code)
);
I created a Python module, 'collision_db.py', which contains all the functions for interacting with my database.
import psycopg2
import psycopg2.extras
import os, sys
def log_this(msg):
print(f'{os.path.basename(__file__)}: {sys._getframe().f_back.f_lineno}> {msg}')
def get_db_connection():
c = None
my_schema = 'crash'
# Set the search path so I don't have to specify
# 'crash' schema in the SQL
search_path = f'-c search_path=pg_catalog,public,{my_schema}'
try:
c = psycopg2.connect(
database='nyc_data',
user='postgres',
password='xyz',
options=search_path, )
except psycopg2.DatabaseError as e:
log_this(f'Error {e})')
sys.exit(99)
# No need for commits and rollbacks in for this test
c.autocommit = True
return c
def insert_into_street(*, con, street):
street_id = None
cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute(
"SELECT id FROM street WHERE name=%(name)s AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s",
street)
log_this(f'Cursor status: {cur.statusmessage}')
log_this(f'Cursor desc: {cur.description}')
if cur.statusmessage == 'SELECT 0':
log_this(f"Inserting new street {street}")
cur.execute(
"INSERT INTO street (name, borough_code, zip_code) VALUES ( %(name)s, %(borough_code)s, %(zip_code)s) RETURNING id",
street)
street_id = cur.fetchone()[0]
return street_id
...
def delete_from_table(*, con, table_name, id):
cur = con.cursor()
sql = f"DELETE FROM {table_name} WHERE id = %s"
cur.execute(sql, (id,))
return cur.rowcount
I will focus on the 'insert_into_street' function.
def insert_into_street(*, con, street):
street_id = None
cur = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute(
"SELECT id FROM street WHERE name=%(name)s AND borough_code=%(borough_code)s AND zip_code=%(zip_code)s",
street)
log_this(f'Cursor status: {cur.statusmessage}')
log_this(f'Cursor desc: {cur.description}')
if cur.statusmessage == 'SELECT 0':
log_this(f"Inserting new street {street}")
cur.execute(
"INSERT INTO street (name, borough_code, zip_code) VALUES ( %(name)s, %(borough_code)s, %(zip_code)s) RETURNING id",
street)
street_id = cur.fetchone()[0]
return street_id
The 'insert_into_street' is given a dictionary of street data, for example:
expect_street = {
'name': 'Dead End Street',
'borough_code': 'q',
'zip_code': '11111'}
The function first checks if the street exists already. If it returns 0 rows, it inserts the the new street data using the PostgreSQL "RETURNING id".
The cursor will now contain the 'id' value either from the select or the insert. This id will be returned by the function.
To test this out I created a Pytest function that calls the insert function twice with the same data.
Both times it should return with the same row id. It should be noted that the 'get_db_connection' function
sets the Auto-commit to true.
c.autocommit = True
return c
import psycopg2
import pytest
from collision_db import delete_from_table, get_db_connection, insert_into_street
CON = None
...
def setup_function():
global CON
CON = get_db_connection()
log_this("In setup!")
def teardown_function():
global CON
CON.close()
log_this("In teardown!")
...
def test_insert_into_street_returns_street_id():
global CON
test_street = {
'name': 'dead_end',
'borough_code': 'm',
'zip_code': '11111'}
street_id_1 = insert_into_street(con=CON, street=test_street)
assert street_id_1 > 0
street_id_2 = insert_into_street(con=CON, street=test_street)
assert street_id_1 == street_id_2
row_ct = delete_from_table(con=CON, table_name='street', id=street_id_2)
assert row_ct == 1
I run this test in noisy mode with -v and -s
λ pytest test_collision_db_1.py::test_insert_into_street_returns_street_id -v -s
==================================================================== test session starts ==================================================================== platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1 -- c:\python37\python.exe
cachedir: .pytest_cache
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item
test_collision_db_1.py::test_insert_into_street_returns_street_id test_collision_db_1.py: 25> In setup!
collision_db.py: 45> Cursor status: SELECT 0
collision_db.py: 46> Cursor desc: (Column(name='id', type_code=23),)
collision_db.py: 49> Inserting new street {'name': 'dead_end', 'borough_code': 'm', 'zip_code': '11111'}
collision_db.py: 45> Cursor status: SELECT 1
collision_db.py: 46> Cursor desc: (Column(name='id', type_code=23),)
PASSED
test_collision_db_1.py: 30> In teardown!
Or quiet mode.
===================================================================== 1 passed in 0.19s =====================================================================
#######################################
#######################################
λ pytest test_collision_db_1.py::test_insert_into_street_returns_street_id
==================================================================== test session starts ====================================================================
platform win32 -- Python 3.7.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1
rootdir: C:\Users\ak1\Apps\Python\collision\tests
collected 1 item
test_collision_db_1.py . [100%]
===================================================================== 1 passed in 0.14s =====================================================================
Outstanding!