11. Databases

11.1. SQL Syntax

Note

For SQL Syntax refer to SQL

11.2. DB API

  • Database API in Python

11.3. sqlite3

11.3.1. Connection

Code Listing 11.2. Connection to in-memory database
import sqlite3

connection = sqlite3.connect(':memory:')
connection.execute('SELECT * FROM users')
Code Listing 11.3. Connection to database file
import sqlite3

connection = sqlite3.connect('database.sqlite3')
connection.execute('SELECT * FROM users')
Code Listing 11.4. Connection
import sqlite3

with sqlite3.connect('database.sqlite3') as connection:
    connection.execute('SELECT * FROM users')

11.3.2. Execute and executemany

Code Listing 11.5. Execute
import sqlite3


SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pesel INTEGER UNIQUE,
        firstname TEXT,
        lastname TEXT)"""
SQL_INSERT = 'INSERT INTO astronauts VALUES (NULL, :pesel, :firstname, :lastname)'

data = {'pesel': '61041212345', 'firstname': 'José', 'lastname': 'Jiménez'}


with sqlite3.connect(':memory:') as db:
    db.execute(SQL_CREATE_TABLE)

    try:
        db.execute(SQL_INSERT, data)
    except sqlite3.IntegrityError:
        print('Pesel need to be UNIQUE')
Code Listing 11.6. Execute many
import sqlite3


SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pesel INTEGER UNIQUE,
        firstname TEXT,
        lastname TEXT)"""
SQL_INSERT_TUPLE = 'INSERT INTO astronauts VALUES (NULL, ?, ?, ?)'
SQL_INSERT_DICT = 'INSERT INTO astronauts VALUES (NULL, :pesel, :firstname, :lastname)'


list_of_tuples = [
    (61041212345, 'José', 'Jiménez'),
    (61041212346, 'Matt', 'Kowalski'),
    (61041212347, 'Melissa', 'Lewis'),
    (61041212348, 'Alex', 'Vogel'),
    (61041212349, 'Ryan', 'Stone'),
]

list_of_dicts = [
    {'pesel': '61041212345', 'firstname': 'José', 'lastname': 'Jiménez'},
    {'pesel': '61041212346', 'firstname': 'Matt', 'lastname': 'Kowalski'},
    {'pesel': '61041212347', 'firstname': 'Melissa', 'lastname': 'Lewis'},
    {'pesel': '61041212348', 'firstname': 'Alex', 'lastname': 'Vogel'},
    {'pesel': '61041212349', 'firstname': 'Ryan', 'lastname': 'Stone'},
]


with sqlite3.connect(':memory:') as db:
    db.execute(SQL_CREATE_TABLE)

    try:
        db.executemany(SQL_INSERT_TUPLE, list_of_tuples)
        db.executemany(SQL_INSERT_DICT, list_of_dicts)
    except sqlite3.IntegrityError:
        print('Pesel need to be UNIQUE')

11.3.3. Results

Code Listing 11.7. Results
import sqlite3


SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pesel INTEGER UNIQUE,
        firstname TEXT,
        lastname TEXT)"""
SQL_INSERT = 'INSERT INTO astronauts VALUES (NULL, :pesel, :firstname, :lastname)'
SQL_SELECT = 'SELECT * from astronauts'


astronauts = [
    {'pesel': '61041212345', 'firstname': 'José', 'lastname': 'Jiménez'},
    {'pesel': '61041212346', 'firstname': 'Matt', 'lastname': 'Kowalski'},
    {'pesel': '61041212347', 'firstname': 'Melissa', 'lastname': 'Lewis'},
    {'pesel': '61041212348', 'firstname': 'Alex', 'lastname': 'Vogel'},
    {'pesel': '61041212349', 'firstname': 'Ryan', 'lastname': 'Stone'},
]


with sqlite3.connect(':memory:') as db:
    db.execute(SQL_CREATE_TABLE)
    db.executemany(SQL_INSERT, astronauts)

    for row in db.execute(SQL_SELECT):
        print(row)

# (1, 61041212345, 'José', 'Jiménez')
# (2, 61041212346, 'Matt', 'Kowalski')
# (3, 61041212347, 'Melissa', 'Lewis')
# (4, 61041212348, 'Alex', 'Vogel')
# (5, 61041212349, 'Ryan', 'Stone')
Code Listing 11.8. Results with dict
import sqlite3


SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pesel INTEGER UNIQUE,
        firstname TEXT,
        lastname TEXT)"""
SQL_INSERT = 'INSERT INTO astronauts VALUES (NULL, :pesel, :firstname, :lastname)'
SQL_SELECT = 'SELECT * from astronauts'


astronauts = [
    {'pesel': '61041212345', 'firstname': 'José', 'lastname': 'Jiménez'},
    {'pesel': '61041212346', 'firstname': 'Matt', 'lastname': 'Kowalski'},
    {'pesel': '61041212347', 'firstname': 'Melissa', 'lastname': 'Lewis'},
    {'pesel': '61041212348', 'firstname': 'Alex', 'lastname': 'Vogel'},
    {'pesel': '61041212349', 'firstname': 'Ryan', 'lastname': 'Stone'},
]


with sqlite3.connect(':memory:') as db:
    db.execute(SQL_CREATE_TABLE)
    db.executemany(SQL_INSERT, astronauts)

    db.row_factory = sqlite3.Row

    for row in db.execute(SQL_SELECT):
        print(dict(row))

# {'id': 1, 'pesel': 61041212345, 'firstname': 'José', 'lastname': 'Jiménez'}
# {'id': 2, 'pesel': 61041212346, 'firstname': 'Matt', 'lastname': 'Kowalski'}
# {'id': 3, 'pesel': 61041212347, 'firstname': 'Melissa', 'lastname': 'Lewis'}
# {'id': 4, 'pesel': 61041212348, 'firstname': 'Alex', 'lastname': 'Vogel'}
# {'id': 5, 'pesel': 61041212349, 'firstname': 'Ryan', 'lastname': 'Stone'}

11.3.4. Cursor

Code Listing 11.9. Results with cursor
import sqlite3


SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS astronauts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pesel INTEGER UNIQUE,
        firstname TEXT,
        lastname TEXT)"""
SQL_INSERT = 'INSERT INTO astronauts VALUES (NULL, :pesel, :firstname, :lastname)'
SQL_SELECT = 'SELECT * from astronauts'


astronauts = [
    {'pesel': '61041212345', 'firstname': 'José', 'lastname': 'Jiménez'},
    {'pesel': '61041212346', 'firstname': 'Matt', 'lastname': 'Kowalski'},
    {'pesel': '61041212347', 'firstname': 'Melissa', 'lastname': 'Lewis'},
    {'pesel': '61041212348', 'firstname': 'Alex', 'lastname': 'Vogel'},
    {'pesel': '61041212349', 'firstname': 'Ryan', 'lastname': 'Stone'},
]


with sqlite3.connect(':memory:') as db:
    db.execute(SQL_CREATE_TABLE)
    db.executemany(SQL_INSERT, astronauts)
    cursor = db.cursor()

    for row in cursor.execute(SQL_SELECT):
        print(row)

# (1, 61041212345, 'José', 'Jiménez')
# (2, 61041212346, 'Matt', 'Kowalski')
# (3, 61041212347, 'Melissa', 'Lewis')
# (4, 61041212348, 'Alex', 'Vogel')
# (5, 61041212349, 'Ryan', 'Stone')

11.4. pyMySQL

$ pip install PyMySQL
Code Listing 11.10. PyMySQL
import pymysql.cursors

# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    db='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

try:
    with connection.cursor() as cursor:
        # Create a new record
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        cursor.execute(sql, ('[email protected]', 'very-secret'))

    # connection is not autocommit by default. So you must commit to save
    # your changes.
    connection.commit()

    with connection.cursor() as cursor:
        # Read a single record
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
        cursor.execute(sql, ('[email protected]',))
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()

11.5. psycopg2

$ pip install psycopg2
Code Listing 11.11. Psycopg2
import psycopg2

# Connect to an existing database
conn = psycopg2.connect("dbname=test user=postgres")

# Open a cursor to perform database operations
cur = conn.cursor()

# Execute a command: this creates a new table
cur.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")

# Pass data to fill a query placeholders and let Psycopg perform
# the correct conversion (no more SQL injections!)
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def"))

# Query the database and obtain data as Python objects
cur.execute("SELECT * FROM test;")
cur.fetchone()
# (1, 100, "abc'def")

# Make the changes to the database persistent
conn.commit()

# Close communication with the database
cur.close()
conn.close()

11.6. pymongo

$ pip install pymongo
import datetime
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client.test_database
posts = db.posts

data = {
    "name": "José Jiménez",
    "catchphrase": "My name... José Jiménez",
    "tags": ["astronaut", "nasa", "space"],
    "date": datetime.datetime.utcnow()
}

posts.insert_one(data).inserted_id
# ObjectId('...')
for post in posts.find():
    print(post)

for post in posts.find({"author": "Mike"}):
    print(post)

11.7. ORM

11.7.1. SQLAlchemy

11.7.2. Django ORM

11.8. Database Schema Migration

11.8.1. SQLAlchemy

11.8.2. Django

11.8.3. FlywayDB

11.8.4. LiquidBase

11.9. Data exploration

# Install superset
pip install superset

# Create an admin user (you will be prompted to set username, first and last name before setting a password)
fabmanager create-admin --app superset

# Initialize the database
superset db upgrade

# Load some data to play with
superset load_examples

# Create default roles and permissions
superset init

# Start the web server on port 8088, use -p to bind to another port
superset runserver

# To start a development web server, use the -d switch
# superset runserver -d
superset_config.py:
 
import os

#---------------------------------------------------------
# Superset specific config
#---------------------------------------------------------
ROW_LIMIT = 5000
SUPERSET_WORKERS = 4

SUPERSET_WEBSERVER_PORT = 8088
#---------------------------------------------------------

#---------------------------------------------------------
# Flask App Builder configuration
#---------------------------------------------------------
# Your App secret key
SECRET_KEY = '\2\1secretkey\1\2\e\y\y\h'

# The SQLAlchemy connection string to your database backend
# This connection defines the path to the database that stores your
# superset metadata (slices, connections, tables, dashboards, ...).
# Note that the connection information to connect to the datasources
#you want to explore are managed directly in the web UI
SQLALCHEMY_DATABASE_URI = os.environ.get('HEROKU_POSTGRESQL_GREEN_URL', None)

# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
# Add endpoints that need to be exempt from CSRF protection
WTF_CSRF_EXEMPT_LIST = []

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ''
gunicorn \
    -w 10 \
    -k gevent \
    --timeout 120 \
    -b  0.0.0.0:6666 \
    --limit-request-line 0 \
    --limit-request-field_size 0 \
    --statsd-host localhost:8125 \
    superset:app

11.10. Case Study

Code Listing 11.12. Zapisywanie do bazy danych wyników pobranych z sensorów podłączonych po USB
#!/usr/bin/env python3

import json
import datetime
import logging
import sqlite3
import serial


logging.basicConfig(
    format='[%(asctime).19s] %(levelname)s %(message)s',
    level=logging.INFO
)


DATABASE = '/home/pi/database/sensor-data.sqlite3'
DEVICE = '/dev/ttyACM0'
MEASUREMENTS = {
    'air_temperature': 'C',
    'air_humidity': '%',
    'water_temperature': 'C',
    'luminosity': 'lux',
    'power_k1': 'on/off',
    'power_k2': 'on/off',
    'power_k3': 'on/off',
    'power_k4': 'on/off',
}


with sqlite3.connect(DATABASE) as db:
    db.execute("""CREATE TABLE IF NOT EXISTS sensor_data (
        datetime DATETIME PRIMARY KEY,
        sync_datetime DATETIME DEFAULT NULL,
        device VARCHAR(255),
        parameter VARCHAR(255),
        value REAL,
        unit VARCHAR(255));""")
    db.execute('CREATE UNIQUE INDEX IF NOT EXISTS sensor_data_datetime_index ON sensor_data (datetime);')
    db.execute('CREATE INDEX IF NOT EXISTS sensor_data_sync_datetime_index ON sensor_data (sync_datetime);')


def save_to_sqlite3(data):
    for parameter, value in data.items():
        unit = MEASUREMENTS.get(parameter, None)

        with sqlite3.connect(DATABASE) as db:
            db.execute('INSERT INTO sensor_data VALUES (:datetime, NULL, :device, :parameter, :value, :unit)', {
                'datetime': datetime.datetime.now(datetime.timezone.utc),
                'parameter': parameter,
                'value': float(value),
                'unit': unit,
                'device': 'hydroponics',
            })


if __name__ == '__main__':
    with serial.Serial(port=DEVICE, baudrate=115200) as ser:
        while True:
            line = ser.readline()
            try:
                data = json.loads(line)
                save_to_sqlite3(data)
                logging.info(data)
            except json.decoder.JSONDecodeError:
                logging.error(line)

11.11. Assignments

11.11.1. Iris Database

  1. Bazę pomiarów Irysów przekonwertuj na tabelę w sqlite3

  2. Nazwy poszczególnych kolumn:

    • id - int
    • species - str
    • datetime - datetime
    • sepal_length - float
    • sepal_width - float
    • petal_length - float
    • petal_width - float
  3. Do połączenia wykorzystaj context manager (with)

  4. Dane wrzuć do bazy za pomocą .executemany() podając dict

  5. Do bazy danych zapisz species jako nazwę gatunku (str), a nie jego id (int) (wersja z gwiazdką: nie korzystaj z if-ów do tego)

  6. Dodaj kolumnę datetime z datą i czasem dodania (UTC)

  7. Załóż index na datetime

  8. Wyniki wypisz z bazy danych (SELECT * FROM iris ORDER BY datetime DESC)

  9. Zwracaj dane jako sqlite3.Row

About:
  • Filename: db_iris.py
  • Lines of code to write: 30 lines
  • Estimated time of completion: 30 min
The whys and wherefores:
 
  • Parsowanie plików csv
  • Wykorzystywanie bazy sqlite3
  • Tworzenie bazy danych
  • Zakładanie indeksów na bazie danych
  • Wrzucanie danych do bazy
  • Wyciąganie danych z bazy
  • Konwersja typów
  • Korzystanie z datetime

11.11.2. Tworzenie bazy danych i proste zapytania

  1. Wykorzystaj kod z listingu Code Listing 11.13. oraz Code Listing 11.14.
  2. Nie wykorzystuj relacji, a dane adresowe zapisz zserializowane i rozdzielone średnikami ;
  3. Wykorzystaj cursor oraz połączenia jako context manager (with)
  4. Dane powinny być zwracane w postaci listy dict
  5. Do wpisywania danych wykorzystaj konstrukcję execute wykorzystując dict jako argument
About:
  • Filename: db_addressbook.py
  • Lines of code to write: 15 lines
  • Estimated time of completion: 20 min
Zadanie z gwiazdką:
 
  • Dodaj obsługę wielu adresów
  • Dodaj obsługę relacji w aplikacji
Code Listing 11.13. Address Book SQL queries
CREATE TABLE IF NOT EXISTS contact (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created DATETIME,
    modified DATETIME,
    first_name TEXT,
    last_name TEXT,
    date_of_birth DATE
);

CREATE UNIQUE INDEX IF NOT EXISTS last_name_index ON contact (last_name);
CREATE INDEX IF NOT EXISTS modified_index ON contact (modified);

CREATE TABLE IF NOT EXISTS address (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    contact_id INTEGER,
    street TEXT,
    city TEXT,
    state TEXT,
    code INT,
    country TEXT
);

INSERT INTO contact VALUES (
    NULL,
    :created,
    :modified,
    :first_name,
    :last_name,
    :date_of_birth
);

INSERT INTO address VALUES (
    NULL,
    :contact_id
    :street,
    :city,
    :state,
    :code,
    :country
);

UPDATE contact SET
    first_name=:firstname,
    last_name=:lastname,
    modified=:modified
WHERE id=:id;

SELECT * FROM contact;
Code Listing 11.14. Address Book
class Contact:
    def __init__(self, first_name, last_name, addresses=[]):
        self.first_name = first_name
        self.last_name = last_name
        self.addresses = addresses


class Address:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)


addressbook = [
    Contact(first_name='Matt', last_name='Kowalski', addresses=[
        Address(street='2101 E NASA Pkwy', city='Houston', state='Texas', code='77058', country='USA'),
        Address(street=None, city='Kennedy Space Center', code='32899', country='USA'),
        Address(street='4800 Oak Grove Dr', city='Pasadena', code='91109', country='USA'),
        Address(street='2825 E Ave P', city='Palmdale', state='California', code='93550', country='USA', data_urodzenia=None),
    ]),
    Contact(first_name='José', last_name='Jiménez'),
    Contact(first_name='Иван', last_name='Иванович', addresses=[]),
]