Python loggingの業務実装パターン|本番環境対応の実践ガイド

Python

Python loggingの業務実装パターン|本番環境対応の実践ガイド

こんにちは。今回はPythonのloggingモジュールを業務で実際に使うパターンについて、実践的な視点で解説していきます。

多くの開発者がprint文でデバッグしていたり、loggingを導入しても設定が甘いままになっていたりします。本番環境では適切なログ戦略が非常に重要です。この記事では、実務で本当に役立つloggingの設定パターンを紹介します。

1. Pythonのloggingについて|簡易解説

Pythonのloggingモジュールは、アプリケーションのイベントを記録するための標準ライブラリです。print文と異なり、ログレベルの管理、複数の出力先への同時出力、タイムスタンプやコンテキスト情報の自動追加などが可能です。

基本的な構成要素は以下の通りです:

  • Logger:ログを記録するオブジェクト
  • Handler:ログの出力先を制御(ファイル、コンソールなど)
  • Formatter:ログのフォーマットを定義
  • Filter:ログレコードをフィルタリング

ログレベルは以下の5段階があります:

  • DEBUG(10):詳細なデバッグ情報
  • INFO(20):一般的な情報
  • WARNING(30):警告メッセージ
  • ERROR(40):エラー
  • CRITICAL(50):重大なエラー

2. 業務でのユースケース

実務でloggingが活躍するシーンを整理してみます。

ユースケース1:Webアプリケーション(Flask/Django)

APIのリクエスト・レスポンス、データベースクエリ、外部API連携などの追跡が必要です。

ユースケース2:バッチ処理

定期実行のバッチジョブ、大量データ処理、スケジューラとの連携など、実行中のプロセスが見えにくいため、詳細なログが不可欠です。

ユースケース3:本番環境での運用

エラー発生時の原因追跡、パフォーマンス監視、セキュリティイベントの記録など。

ユースケース4:マイクロサービス

分散システムにおいて、ログ集約ツール(ELK、Splunk、CloudWatch)との統合が必要です。

3. 実装コード|業務で実際に使うパターン

パターン1:基本的なロギング設定(開発環境用)

import logging
import sys

# シンプルな設定
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

def main():
    logger.debug('デバッグ情報です')
    logger.info('処理が正常に進行しています')
    logger.warning('注意が必要な状態です')
    logger.error('エラーが発生しました')

if __name__ == '__main__':
    main()

ただし、業務ではbasicConfigは避けるべきです。理由は、モジュール間の設定競合やテストの複雑さにつながるからです。

パターン2:本番環境対応の設定(推奨)

実務で最も使われるのは、環境に応じた設定ファイルベースのアプローチです。

import logging
import logging.config
import os
import json
from pathlib import Path

def setup_logging(config_path: str = None, env: str = None):
    \"\"\"
    ログ設定を初期化する関数
    環境変数またはconfigファイルから設定を読み込む
    \"\"\"
    if env is None:
        env = os.getenv('APP_ENV', 'development')
    
    if config_path is None:
        config_path = os.path.join(
            Path(__file__).parent,
            f'logging_config_{env}.json'
        )
    
    if os.path.exists(config_path):
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    else:
        # フォールバック設定
        logging.basicConfig(level=logging.INFO)
    
    return logging.getLogger(__name__)

logger = setup_logging()

if __name__ == '__main__':
    logger.info('ログシステムが初期化されました')

パターン3:JSON形式の設定ファイル(logging_config_production.json)

{
  \"version\": 1,
  \"disable_existing_loggers\": false,
  \"formatters\": {
    \"standard\": {
      \"format\": \"%(asctime)s [%(levelname)s] %(name)s: %(message)s\"
    },
    \"json\": {
      \"format\": \"{\\\"timestamp\\\": \\\"%(asctime)s\\\", \\\"level\\\": \\\"%(levelname)s\\\", \\\"logger\\\": \\\"%(name)s\\\", \\\"message\\\": \\\"%(message)s\\\"}\",
      \"datefmt\": \"%Y-%m-%d %H:%M:%S\"
    }
  },
  \"handlers\": {
    \"console\": {
      \"class\": \"logging.StreamHandler\",
      \"level\": \"INFO\",
      \"formatter\": \"standard\",
      \"stream\": \"ext://sys.stdout\"
    },
    \"file\": {
      \"class\": \"logging.handlers.RotatingFileHandler\",
      \"level\": \"DEBUG\",
      \"formatter\": \"json\",
      \"filename\": \"logs/app.log\",
      \"maxBytes\": 10485760,
      \"backupCount\": 5,
      \"encoding\": \"utf-8\"
    },
    \"error_file\": {
      \"class\": \"logging.handlers.RotatingFileHandler\",
      \"level\": \"ERROR\",
      \"formatter\": \"json\",
      \"filename\": \"logs/error.log\",
      \"maxBytes\": 10485760,
      \"backupCount\": 10,
      \"encoding\": \"utf-8\"
    }
  },
  \"loggers\": {
    \"\": {
      \"level\": \"DEBUG\",
      \"handlers\": [\"console\", \"file\", \"error_file\"],
      \"propagate\": false
    },
    \"urllib3\": {
      \"level\": \"WARNING\",
      \"handlers\": [\"console\"],
      \"propagate\": false
    }
  }
}

パターン4:本番環境用のロギングマネージャークラス

複数のサービス、外部API呼び出し、データベース操作などを扱う場合、構造化ログが重要です。

import logging
import json
import traceback
import time
from typing import Any, Dict, Optional
from datetime import datetime

class StructuredLogger:
    \"\"\"業務用の構造化ロガー\"\"\"
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
    
    def log_event(
        self,
        level: str,
        message: str,
        **context: Any
    ) -> None:
        \"\"\"
        構造化ログを出力
        
        Args:
            level: ログレベル(debug, info, warning, error, critical)
            message: ログメッセージ
            **context: 追加のコンテキスト情報
        \"\"\"
        log_record = {
            'timestamp': datetime.utcnow().isoformat(),
            'message': message,
            'level': level.upper(),
            **context
        }
        
        log_method = getattr(self.logger, level.lower())
        log_method(json.dumps(log_record, ensure_ascii=False))
    
    def log_api_call(
        self,
        service_name: str,
        endpoint: str,
        method: str,
        status_code: int,
        duration_ms: float,
        **extra: Any
    ) -> None:
        \"\"\"API呼び出しをログに記録\"\"\"
        self.log_event(
            'info',
            f'API call to {service_name}',
            service=service_name,
            endpoint=endpoint,
            method=method,
            status_code=status_code,
            duration_ms=round(duration_ms, 2),
            **extra
        )
    
    def log_exception(
        self,
        message: str,
        exception: Exception,
        **context: Any
    ) -> None:
        \"\"\"例外をログに記録\"\"\"
        self.log_event(
            'error',
            message,
            exception_type=type(exception).__name__,
            exception_message=str(exception),
            traceback=traceback.format_exc(),
            **context
        )
    
    def log_database_query(
        self,
        query: str,
        duration_ms: float,
        rows_affected: int = 0,
        **extra: Any
    ) -> None:
        \"\"\"データベースクエリをログに記録\"\"\"
        self.log_event(
            'debug',
            'Database query executed',
            query=query,
            duration_ms=round(duration_ms, 2),
            rows_affected=rows_affected,
            **extra
        )

# 使用例
if __name__ == '__main__':
    # ロガーの設定(簡略版)
    logging.basicConfig(level=logging.DEBUG)
    
    logger = StructuredLogger(__name__)
    
    # API呼び出しのログ
    logger.log_api_call(
        service_name='payment_service',
        endpoint='/charge',
        method='POST',
        status_code=200,
        duration_ms=245.5,
        user_id='user_12345',
        amount=10000
    )
    
    # 例外のログ
    try:
        result = 1 / 0
    except Exception as e:
        logger.log_exception(
            'Failed to calculate',
            e,
            operation='divide'
        )
    
    # データベースクエリのログ
    logger.log_database_query(
        query='SELECT * FROM users WHERE id = %s',
        duration_ms=15.3,
        rows_affected=1,
        table='users'
    )

パターン5:Flask/Djangoでの実装例

Webアプリケーション内でリクエストコンテキストを含むログを実装する例です。

import logging
import json
import time
from flask import Flask, request, g
from functools import wraps
from datetime import datetime

app = Flask(__name__)

# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RequestIdFilter(logging.Filter):
    \"\"\"リクエストIDをログに含めるフィルター\"\"\"
    def filter(self, record):
        record.request_id = g.get('request_id', 'N/A')
        return True

# フィルターをハンドラーに追加
for handler in logger.handlers:
    handler.addFilter(RequestIdFilter())

@app.before_request
def before_request():
    \"\"\"リクエスト前の処理\"\"\"
    g.request_id = request.headers.get('X-Request-ID', 'N/A')
    g.start_time = time.time()
    
    logger.info(
        json.dumps({
            'event': 'request_start',
            'request_id': g.request_id,
            'method': request.method,
            'path': request.path,
            'remote_addr': request.remote_addr,
            'timestamp': datetime.utcnow().isoformat()
        })
    )

@app.after_request
def after_request(response):
    \"\"\"リクエスト後の処理\"\"\"
    duration_ms = (time.time() - g.start_time) * 1000
    
    logger.info(
        json.dumps({
            'event': 'request_end',
            'request_id': g.request_id,
            'method': request.method,
            'path': request.path,
            'status_code': response.status_code,
            'duration_ms': round(duration_ms, 2),
            'timestamp': datetime.utcnow().isoformat()
        })
    )
    
    return response

@app.route('/api/users/', methods=['GET'])
def get_user(user_id: int):
    \"\"\"ユーザー情報を取得するAPI\"\"\"
    try:
        logger.debug(
            json.dumps({
                'event': 'database_query',
                'request_id': g.request_id,
                'query': 'SELECT * FROM users WHERE id = %s',
                'user_id': user_id
            })
        )
        
        # データベース処理(疑似コード)
        user_data = {'id': user_id, 'name': 'John Doe'}
        
        logger.info(
            json.dumps({
                'event': 'user_fetched',
                'request_id': g.request_id,
                'user_id': user_id
            })
        )
        
        return user_data, 200
    
    except Exception as e:
        logger.error(
            json.dumps({
                'event': 'error',
                'request_id': g.request_id,
                'error_type': type(e).__name__,
                'error_message': str(e),
                'user_id': user_id
            })
        )
        return {'error': 'Internal Server Error'}, 500

if __name__ == '__main__':
    app.run(debug=False)

4. よくある応用パターン

応用パターン1:複数出力先への並列ログ出力

本番環境では、コンソール出力、ファイル保存、ログサービスへの送信を同時に行う必要があります。

import logging
import logging.handlers
import socket
import json
from typing import Optional

class SyslogHandler(logging.handlers.SyslogHandler):
    \"\"\"JSON形式でSyslogに送信するハンドラー\"\"\"
    
    def emit(self, record):
        try:
            msg = self.format(record)
            if isinstance(msg, str) and msg.startswith('{'):
                # JSON形式ならそのまま送信
                self.sock.sendto(msg.encode(), self.address)
            else:
                super().emit(record)
        except Exception:
            self.handleError(record)

def setup_advanced_logging(
    log_file: str = 'app.log',
    syslog_host: Optional[str] = None,
    syslog_port: int = 514
):
    \"\"\"複数出力先対応のログ設定\"\"\"
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    
    # フォーマッター定義
    json_formatter = logging.Formatter(
        '{\"timestamp\": \"%(asctime)s\", \"level\": \"%(levelname)s\", \"name\": \"%(name)s\", \"message\": \"%(message)s\"}'
    )
    standard_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # コンソール出力
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(standard_formatter)
    logger.addHandler(console_handler)
    
    # ファイル出力(ローテーション対応)
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(json_formatter)
    logger.addHandler(file_handler)
    
    # Syslog出力
    if syslog_host:
        try:
            syslog_handler = SyslogHandler(
                address=(syslog_host, syslog_port)
            )
            syslog_handler.setLevel(logging.WARNING)
            syslog_handler.setFormatter(json_formatter)
            logger.addHandler(syslog_handler)
        except Exception as e:
            logger.warning(f'Syslog接続失敗: {e}')
    
    return logger

# 使用例
if __name__ == '__main__':
    logger = setup_advanced_logging(
        log_file='logs/app.log',
        syslog_host='localhost'
    )
    
    logger.info('システムが起動しました')
    logger.warning('リソースの使用率が高いです')
    logger.error('データベース接続エラー')

応用パターン2:ログレベルの動的変更

本番環境で問題が発生した際、再起動なしにログレベルを変更したい場合があります。

import logging
import os
from logging.config import listen, DEFAULT_LOGGING_PORT

def configure_log_listener():
    \"\"\"ログレベル変更をリッスンするサーバーを起動\"\"\"
    # 環境変数で設定ポートを指定可能
    port = int(os.getenv('LOGGING_CONFIG_PORT', DEFAULT_LOGGING_PORT))
    
    # 別プロセスでリスナーを起動
    import threading
    import socket
    
    def run_listener():
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('localhost', port))
            sock.listen(1)
            print(f'ログリスナーがポート{port}で起動しました')
            
            while True:
                conn, addr = sock.accept()
                print(f'ログ設定リクエスト受信: {addr}')
                listen(sockobj=conn)
        except Exception as e:
            print(f'ログリスナーエラー: {e}')
    
    listener_thread = threading.Thread(target=run_listener, daemon=True)
    listener_thread.start()

# 使用例
if __name__ == '__main__':
    import logging.config
    
    # 基本設定
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # リスナー起動
    configure_log_listener()
    
    logger.info('アプリケーション起動')
    logger.debug('これはDEBUGレベルです(表示されません)')
    
    # 動的にログレベルを変更
    # logger.setLevel(logging.DEBUG)
    # これで以後DEBUGレベルも表示される

応用パターン3:パフォーマンス計測の統合ログ

処理時間、メモリ使用量などの計測情報をログに含める実装です。

import logging
import time
import psutil
import json
from functools import wraps
from datetime import datetime
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def log_execution_time(operation_name: str, **extra_context):
    \"\"\"実行時間をログに記録するコンテキストマネージャー\"\"\"
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    
    try:
        yield
    finally:
        duration_ms = (time.time() - start_time) * 1000
        end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        memory_delta = end_memory - start_memory
        
        log_data = {
            'operation': operation_name,
            'duration_ms': round(duration_ms, 2),
            'memory_mb': round(end_memory, 2),
            'memory_delta_mb': round(memory_delta, 2),
            'timestamp': datetime.utcnow().isoformat(),
            **extra_context
        }
        
        # 時間がかかった処理は警告をログに
        if duration_ms > 1000:
            logger.warning(json.dumps(log_data))
        else:
            logger.debug(json.dumps(log_data))

def performance_log(func):
    \"\"\"関数の実行時間を自動ログするデコレータ\"\"\"
    @wraps(func)
    def wrapper(*args, **kwargs):
        with log_execution_time(
            operation_name=func.__name__,
            args_count=len(args),
            kwargs_keys=list(kwargs.keys())
        ):
            return func(*args, **kwargs)
    return wrapper

# 使用例
@performance_log
def heavy_calculation(n: int) -> int:
    \"\"\"重い計算処理\"\"\"
    total = 0
    for i in range(n):
        total += i ** 2
    return total

@performance_log
def database_query():
    \"\"\"データベースクエリ(疑似)\"\"\"
    time.sleep(0.5)
    return [{'id': i, 'name': f'user_{i}'} for i in range(100)]

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    
    result1 = heavy_calculation(1000000)
    result2 = database_query()
    
    # 手動でコンテキストマネージャーを使用
    with log_execution_time('batch_process', batch_id='BATCH_001'):
        time.sleep(0.3)
        # 処理内容

5. 注意点・トラブルシューティング

注意点1:ログレベルの設定ミス

本番環境で誤ってDEBUGレベルを設定すると、ログ量が膨大になりディスク容量を圧迫します。環境ごとに明確にレベルを分け、テストを実施することが重要です。

import os

# 環境に応じたログレベル
LOG_LEVELS = {
    'development': 'DEBUG',
    'staging': 'INFO',
    'production': 'WARNING'
}

env = os.getenv('APP_ENV', 'development')
log_level = LOG_LEVELS.get(env, 'INFO')

logger.setLevel(log_level)

注意点2:ログローテーションの設定忘れ

RotatingFileHandlerを使わないと、ログファイルが無限に成長し、最終的にディスク満杯エラーになります。

# 間違い:ローテーションなし
# handler = logging.FileHandler('app.log')

# 正解:ローテーション有効
handler = logging.handlers.RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5  # 最大5世代保持
)

注意点3:機密情報のログ出力

パスワード、APIキー、個人情報などを誤ってログに出力しないよう注意が必要です。

import re

class SensitiveDataFilter(logging.Filter):
    \"\"\"機密情報をマスクするフィルター\"\"\"
    
    PATTERNS = [
        (r'password[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'password=***'),
        (r'api[_-]?key[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'api_key=***'),
        (r'token[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'token=***'),
        (r'\\b\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}\\b', 'XXXX-XXXX-XXXX-XXXX'),
    ]
    
    def filter(self, record):
        message = record.getMessage()
        for pattern, replacement in self.PATTERNS:
            message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
        record.msg = message
        return True

# 使用
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())

# テスト
logger.info('User login with password=secret123')
# 出力: User login with password=***

注意点4:スレッド安全性

loggingモジュールはスレッドセーフですが、複数プロセスからの同時アクセスはサポートされていません。マルチプロセス環境ではQueueHandlerを使用してください。

import logging
import logging.handlers
from multiprocessing import Queue
import multiprocessing

# ロギングキューを作成
log_queue = Queue()

def setup_multiprocess_logging():
    \"\"\"マルチプロセス対応のログ設定\"\"\"
    # メインプロセスでリスナーを起動
    listener = logging.handlers.QueueListener(
        log_queue,
        logging.FileHandler('app.log'),
        logging.StreamHandler(),
        respect_handler_level=True
    )
    listener.start()
    
    return listener

def worker_process(worker_id: int):
    \"\"\"ワーカープロセス\"\"\"
    # 各プロセスでQueueHandlerを設定
    handler = logging.handlers.QueueHandler(log_queue)
    logger = logging.getLogger(f'worker_{worker_id}')
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    
    logger.info(f'Worker {worker_id} started')
    # 処理
    logger.info(f'Worker {worker_id} finished')

if __name__ == '__main__':
    listener = setup_multiprocess_logging()
    
    # 複数プロセスを起動
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process, args=(i,))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    listener.stop()

注意点5:ログフォーマッターのJSON化時のエスケープ

JSON形式にする際、メッセージ内の特殊文字を適切にエスケープする必要があります。

import json
import logging

class JSONFormatter(logging.Formatter):
\"\"\"JSON形式でログを出力するフォーマッター\"\"\

タイトルとURLをコピーしました