Pythonでデータベース接続を実装する実務パターン|SQLiteからPostgreSQLまで

Python

Pythonでデータベース接続を実装する実務パターン|SQLiteからPostgreSQLまで

Pythonを使った業務システム開発では、データベース接続は必須スキルです。しかし教科書的なサンプルコードだけでは、実務で遭遇する課題に対応できません。本記事では、実際のプロジェクトで使えるデータベース接続パターンを、具体的なユースケースとともに解説します。

Pythonのデータベース接続について

Pythonでデータベースに接続する際、開発者が選択できる選肢は複数あります。SQLiteのような軽量なファイルベースのデータベースから、PostgreSQLやMySQL、Oracle Database といったエンタープライズレベルのデータベースまで、シーンに応じて使い分けることが重要です。

基本的な流れは、ドライバのインポート → 接続の確立 → クエリ実行 → 結果の処理 → 接続のクローズという一連のステップになります。ただし実務では、エラーハンドリング、コネクションプール、トランザクション管理など、より複雑な要件に対応する必要があります。

実務で直面するユースケース

1. Webアプリケーションでの複数リクエスト対応

Flask や Django を使った Web アプリケーションでは、複数のリクエストが同時に到着します。毎回新しい接続を確立していると、パフォーマンスが低下します。実務ではコネクションプールを使い、複数の接続を使い回すことが必須です。

2. バッチ処理での大量データ処理

夜間バッチで数百万件のデータをインポート・エクスポートする場合、メモリ効率とトランザクション管理が重要になります。すべてのデータをメモリに読み込むのではなく、チャンク単位での処理が必要です。

3. マイグレーション時の複数DB対応

古いシステムから新しいシステムへのデータ移行では、複数のデータベースに同時接続する必要があります。接続設定の管理が煩雑になりやすいため、設定ファイルの外出しと接続情報の一元管理が求められます。

実装コード:基本パターンから応用まで

パターン1:SQLiteを使った基本的な接続

まず最もシンプルなパターンです。開発環境やテスト環境で使うことが多いSQLiteの実装例です。

import sqlite3
from contextlib import contextmanager
from typing import Optional, List, Dict, Any

class SQLiteDatabase:
    def __init__(self, db_path: str):
        self.db_path = db_path
    
    @contextmanager
    def get_connection(self):
        \"\"\"コンテキストマネージャで接続を管理\"\"\"
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # 辞書形式で結果を取得
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
    
    def select_users(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
        \"\"\"ユーザー情報を取得\"\"\"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if user_id:
                cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
            else:
                cursor.execute('SELECT * FROM users')
            return [dict(row) for row in cursor.fetchall()]
    
    def insert_user(self, name: str, email: str) -> int:
        \"\"\"ユーザーを新規登録\"\"\"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                'INSERT INTO users (name, email) VALUES (?, ?)',
                (name, email)
            )
            return cursor.lastrowid

# 使用例
db = SQLiteDatabase('app.db')
users = db.select_users()
print(users)

パターン2:PostgreSQLでのコネクションプール実装

本番環境ではPostgreSQLが使われることが多いです。psycopg2ライブラリとSimpleConnectionPoolを組み合わせて、複数接続を管理します。

import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
from typing import Optional, List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class PostgreSQLDatabase:
    def __init__(
        self,
        host: str,
        database: str,
        user: str,
        password: str,
        min_conn: int = 2,
        max_conn: int = 10
    ):
        \"\"\"コネクションプールを初期化\"\"\"
        try:
            self.pool = pool.SimpleConnectionPool(
                min_conn,
                max_conn,
                host=host,
                database=database,
                user=user,
                password=password,
                connect_timeout=5
            )
        except Exception as e:
            logger.error(f\"Failed to create connection pool: {e}\")
            raise
    
    @contextmanager
    def get_connection(self):
        \"\"\"プールから接続を取得\"\"\"
        conn = None
        try:
            conn = self.pool.getconn()
            yield conn
            conn.commit()
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f\"Database error: {e}\")
            raise
        finally:
            if conn:
                self.pool.putconn(conn)
    
    def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        \"\"\"クエリを実行して結果を取得\"\"\"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                # カラム名を取得
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            except Exception as e:
                logger.error(f\"Query execution failed: {e}\")
                raise
    
    def execute_update(self, query: str, params: tuple = ()) -> int:
        \"\"\"INSERT/UPDATE/DELETEを実行\"\"\"
        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.rowcount
            except Exception as e:
                logger.error(f\"Update execution failed: {e}\")
                raise
    
    def close_all(self):
        \"\"\"すべての接続をクローズ\"\"\"
        self.pool.closeall()

# 使用例
db = PostgreSQLDatabase(
    host='localhost',
    database='myapp',
    user='postgres',
    password='password',
    max_conn=20
)

# SELECT実行
users = db.execute_query(
    'SELECT * FROM users WHERE created_at > %s',
    ('2024-01-01',)
)

# INSERT実行
affected_rows = db.execute_update(
    'INSERT INTO users (name, email) VALUES (%s, %s)',
    ('John Doe', 'john@example.com')
)

db.close_all()

パターン3:複数データベースへの同時接続(マイグレーション)

レガシーシステムから新しいシステムへのデータ移行では、複数DBへの同時接続が必要です。

from dataclasses import dataclass
from typing import Dict
import psycopg2
from contextlib import contextmanager

@dataclass
class DatabaseConfig:
    host: str
    database: str
    user: str
    password: str
    port: int = 5432

class MultiDatabaseManager:
    def __init__(self, configs: Dict[str, DatabaseConfig]):
        self.configs = configs
        self.connections: Dict[str, psycopg2.extensions.connection] = {}
    
    def connect(self, db_name: str):
        \"\"\"特定のデータベースに接続\"\"\"
        if db_name not in self.configs:
            raise ValueError(f\"Database '{db_name}' not configured\")
        
        config = self.configs[db_name]
        try:
            conn = psycopg2.connect(
                host=config.host,
                database=config.database,
                user=config.user,
                password=config.password,
                port=config.port
            )
            self.connections[db_name] = conn
        except Exception as e:
            raise Exception(f\"Failed to connect to {db_name}: {e}\")
    
    def get_connection(self, db_name: str):
        \"\"\"接続を取得(未接続の場合は接続)\"\"\"
        if db_name not in self.connections:
            self.connect(db_name)
        return self.connections[db_name]
    
    def execute_and_transfer(
        self,
        source_db: str,
        target_db: str,
        query: str
    ) -> int:
        \"\"\"ソースDBから取得したデータをターゲットDBに転送\"\"\"
        source_conn = self.get_connection(source_db)
        target_conn = self.get_connection(target_db)
        
        try:
            source_cursor = source_conn.cursor()
            target_cursor = target_conn.cursor()
            
            # ソースDBからデータを取得
            source_cursor.execute(query)
            rows = source_cursor.fetchall()
            
            if not rows:
                return 0
            
            # ターゲットDBに挿入
            insert_count = 0
            for row in rows:
                try:
                    # 実際のカラム構成に合わせてクエリを調整
                    target_cursor.execute(
                        'INSERT INTO users (id, name, email, created_at) VALUES (%s, %s, %s, %s)',
                        row
                    )
                    insert_count += 1
                except psycopg2.IntegrityError:
                    # 重複キーは無視
                    target_conn.rollback()
                    continue
            
            target_conn.commit()
            return insert_count
        
        except Exception as e:
            target_conn.rollback()
            raise Exception(f\"Transfer failed: {e}\")
    
    def close_all(self):
        \"\"\"すべての接続をクローズ\"\"\"
        for conn in self.connections.values():
            conn.close()

# 使用例
configs = {
    'legacy': DatabaseConfig(
        host='legacy-db.example.com',
        database='old_system',
        user='legacy_user',
        password='legacy_pass'
    ),
    'new': DatabaseConfig(
        host='new-db.example.com',
        database='new_system',
        user='new_user',
        password='new_pass'
    )
}

manager = MultiDatabaseManager(configs)
transferred = manager.execute_and_transfer(
    'legacy',
    'new',
    'SELECT id, name, email, created_at FROM users LIMIT 1000'
)
print(f\"Transferred {transferred} records\")
manager.close_all()

パターン4:バッチ処理での大量データ挿入

数百万件のデータを効率的に挿入する場合、バッチ挿入とメモリ管理が重要です。

import psycopg2
from psycopg2.extras import execute_batch
from typing import List, Tuple, Iterator
import gc

class BulkDataImporter:
    def __init__(self, conn_config: dict):
        self.conn_config = conn_config
    
    def batch_insert(
        self,
        table: str,
        columns: List[str],
        data: Iterator[Tuple],
        batch_size: int = 5000
    ) -> int:
        \"\"\"バッチモードでデータを挿入\"\"\"
        conn = psycopg2.connect(**self.conn_config)
        cursor = conn.cursor()
        
        total_inserted = 0
        batch_buffer = []
        
        try:
            for row in data:
                batch_buffer.append(row)
                
                if len(batch_buffer) >= batch_size:
                    # バッチ実行
                    placeholders = ','.join(['%s'] * len(columns))
                    query = f\"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})\"
                    
                    execute_batch(cursor, query, batch_buffer, page_size=1000)
                    conn.commit()
                    
                    total_inserted += len(batch_buffer)
                    batch_buffer = []
                    
                    # メモリ解放
                    gc.collect()
            
            # 残りのデータを処理
            if batch_buffer:
                placeholders = ','.join(['%s'] * len(columns))
                query = f\"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})\"
                execute_batch(cursor, query, batch_buffer, page_size=1000)
                conn.commit()
                total_inserted += len(batch_buffer)
        
        except Exception as e:
            conn.rollback()
            raise Exception(f\"Batch insert failed: {e}\")
        finally:
            cursor.close()
            conn.close()
        
        return total_inserted
    
    def stream_insert(self, table: str, columns: List[str], csv_file_path: str) -> int:
        \"\"\"CSVファイルから大量データをインポート\"\"\"
        conn = psycopg2.connect(**self.conn_config)
        cursor = conn.cursor()
        
        try:
            with open(csv_file_path, 'r', encoding='utf-8') as f:
                # COPY コマンドで高速インポート
                cursor.copy_from(
                    f,
                    table,
                    columns=columns,
                    sep=',',
                    null=''
                )
            conn.commit()
            return cursor.rowcount
        
        except Exception as e:
            conn.rollback()
            raise Exception(f\"Stream insert failed: {e}\")
        finally:
            cursor.close()
            conn.close()

# 使用例:大量のテストデータを生成して挿入
def data_generator(count: int) -> Iterator[Tuple]:
    \"\"\"テストデータジェネレーター\"\"\"
    for i in range(count):
        yield (f\"User{i}\", f\"user{i}@example.com\", i * 100)

importer = BulkDataImporter({
    'host': 'localhost',
    'database': 'myapp',
    'user': 'postgres',
    'password': 'password'
})

# 100万件を5000件ずつバッチで挿入
inserted = importer.batch_insert(
    'users',
    ['name', 'email', 'score'],
    data_generator(1000000),
    batch_size=5000
)
print(f\"Total inserted: {inserted}\")

よくある応用パターン

1. ORMの活用(SQLAlchemy)

大規模なアプリケーションでは、生のSQLではなくORMを使うことで、コード量を削減できます。ただし、大量データ処理時はORMのオーバーヘッドを考慮する必要があります。

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from typing import Optional

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(100))

class UserRepository:
    def __init__(self, database_url: str, pool_size: int = 10):
        self.engine = create_engine(
            database_url,
            pool_size=pool_size,
            max_overflow=20,
            pool_pre_ping=True  # 接続の有効性を確認
        )
        self.SessionLocal = sessionmaker(bind=self.engine)
    
    def get_user(self, user_id: int) -> Optional[User]:
        session: Session = self.SessionLocal()
        try:
            return session.query(User).filter(User.id == user_id).first()
        finally:
            session.close()
    
    def create_user(self, name: str, email: str) -> User:
        session: Session = self.SessionLocal()
        try:
            user = User(name=name, email=email)
            session.add(user)
            session.commit()
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            raise
        finally:
            session.close()

# 使用例
repo = UserRepository('postgresql://user:password@localhost/myapp')
user = repo.create_user('Alice', 'alice@example.com')

2. リトライロジックの実装

ネットワーク障害やデータベース再起動時に一時的な接続エラーが発生します。リトライロジックで対応します。

import time
import psycopg2
from functools import wraps
from typing import Callable, Any

def retry_on_db_error(max_retries: int = 3, backoff_factor: float = 1.0):
    \"\"\"データベースエラー時にリトライするデコレーター\"\"\"
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        wait_time = backoff_factor * (2 ** attempt)
                        print(f\"Attempt {attempt + 1} failed. Retrying in {wait_time}s...\")
                        time.sleep(wait_time)
            
            raise last_exception
        return wrapper
    return decorator

@retry_on_db_error(max_retries=3, backoff_factor=0.5)
def fetch_user_data(db, user_id: int):
    \"\"\"リトライ機能付きのデータ取得\"\"\"
    return db.execute_query(
        'SELECT * FROM users WHERE id = %s',
        (user_id,)
    )

3. トランザクション管理(ACID特性の保証)

複数のテーブルに関連するデータを扱う場合、トランザクション管理が不可欠です。

class TransactionManager:
    def __init__(self, db_config: dict):
        self.db_config = db_config
    
    def transfer_funds(self, from_user_id: int, to_user_id: int, amount: float) -> bool:
        \"\"\"2つのアカウント間の資金移動(トランザクション)\"\"\"
        conn = psycopg2.connect(**self.db_config)
        cursor = conn.cursor()
        
        try:
            # トランザクション開始
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
            
            # 送信元のバランスを確認・更新
            cursor.execute(
                'SELECT balance FROM accounts WHERE user_id = %s FOR UPDATE',
                (from_user_id,)
            )
            result = cursor.fetchone()
            
            if not result or result[0] < amount:
                conn.rollback()
                return False
            
            # 送信元から減額
            cursor.execute(
                'UPDATE accounts SET balance = balance - %s WHERE user_id = %s',
                (amount, from_user_id)
            )
            
            # 受信側に加算
            cursor.execute(
                'UPDATE accounts SET balance = balance + %s WHERE user_id = %s',
                (amount, to_user_id)
            )
            
            # トランザクションログに記録
            cursor.execute(
                'INSERT INTO transaction_logs (from_user_id, to_user_id, amount, status) VALUES (%s, %s, %s, %s)',
                (from_user_id, to_user_id, amount, 'completed')
            )
            
            conn.commit()
            return True
        
        except Exception as e:
            conn.rollback()
            print(f\"Transfer failed: {e}\")
            return False
        finally:
            cursor.close()
            conn.close()

実務での注意点

セキュリティ:SQLインジェクション対策

ユーザー入力を直接SQLに埋め込むのは絶対に禁止です。必ずプレースホルダーを使用します。

import os

# ❌ 悪い例:SQLインジェクション脆弱性あり
user_input = \"'; DROP TABLE users; --\"
query = f\"SELECT * FROM users WHERE name = '{user_input}'\"

# ✅ 良い例:プレースホルダーを使用
cursor.execute('SELECT * FROM users WHERE name = %s', (user_input,))

パフォーマンス:インデックスとクエリ最適化

テーブルサイズが大きくなると、クエリパフォーマンスが急速に低下します。適切なインデックス設定とクエリの検証が重要です。

class QueryAnalyzer:
    def __init__(self, conn_config: dict):
        self.conn = psycopg2.connect(**conn_config)
    
    def explain_query(self, query: str, params: tuple = ()):
        \"\"\"クエリの実行計画を確認\"\"\"
        cursor = self.conn.cursor()
        try:
            explain_query = f\"EXPLAIN ANALYZE {query}\"
            cursor.execute(explain_query, params)
            return cursor.fetchall()
        finally:
            cursor.close()

# 使用例
analyzer = QueryAnalyzer(db_config)
plan = analyzer.explain_query(
    'SELECT * FROM users WHERE email = %s',
    ('user@example.com',)
)
# 結果を確認してインデックスの必要性を判断

接続管理:タイムアウトと接続数の制限

不適切な接続管理は、データベースのリソース枯渇につながります。接続タイムアウトと最大接続数の設定が必須です。

config = {
    'host': 'localhost',
    'database': 'myapp',
    'user': 'postgres',
    'password': 'password',
    'connect_timeout': 5,  # 5秒でタイムアウト
    'options': '-c statement_timeout=30000'  # クエリは30秒でタイムアウト
}

# コネクションプールの設定
pool = psycopg2.pool.SimpleConnectionPool(
    1, 20,  # 最小1、最大20接続
    **config
)
# max_connectionsはデータベース側の設定も確認

環境変数での認証情報管理

パスワードをコードに直書きしてはいけません。環境変数や設定ファイルで管理します。

import os
from dotenv import load_dotenv

load_dotenv()  # .envファイルから環境変数を読み込み

config = {
    'host': os.getenv('DB_HOST', 'localhost'),
    'database': os.getenv('DB_NAME', 'myapp'),
    'user': os.getenv('DB_USER', 'postgres'),
    'password': os.getenv('DB_PASSWORD'),  # 絶対に環境変数から取得
    'port': int(os.getenv('DB_PORT', '5432'))
}

if not config['password']:
    raise ValueError(\"DB_PASSWORD environment variable is not set\")

まとめ

Pythonでのデータベース接続は、基本的な実装は簡単ですが、実務レベルの要件に対応するには多くの考慮が必要です。本記事で紹介した重要ポイントは以下の通りです。

1. コネクションプール活用:毎回接続を確立するのではなく、プールして使い回すことで大幅なパフォーマンス向上が実現できます。

2. エラーハンドリング:トランザクションのロールバック、リトライロジック、ログ記録など、堅牢性を高める対応が必須です。

3. セキュリティ対策:SQLインジェクション対策のためにプレースホルダーを使用し、パスワードは環境変数で管理します。

4. パフォーマンス最適化:インデックス設計、バッチ処理、EXPLAIN ANALYZEでのクエリ検証により、大規模データ処理でも効率的に動作させることができます。

5. 複数DB対応:マイグレーションやレガシー連携では、複数データベースへの同時接続管理が必要になることもあります。

本記事で紹介したパターンを参考に、プロジェクトの要件に合わせてカスタマイズしながら実装することで、実務で即座に使えるコードが完成します。特にコネクションプール、トランザクション管理、エラーハンドリングの3点を確実に実装することが、品質の高いシステムを構築する近道となるでしょう。

タイトルとURLをコピーしました