Python loggingの業務実装パターン|本番環境対応の実践ガイド
こんにちは。今回はPythonのloggingモジュールを業務で実際に使うパターンについて、実践的な視点で解説していきます。
多くの開発者がprint文でデバッグしていたり、loggingを導入しても設定が甘いままになっていたりします。本番環境では適切なログ戦略が非常に重要です。この記事では、実務で本当に役立つloggingの設定パターンを紹介します。
1. Pythonのloggingについて|簡易解説
Pythonのloggingモジュールは、アプリケーションのイベントを記録するための標準ライブラリです。print文と異なり、ログレベルの管理、複数の出力先への同時出力、タイムスタンプやコンテキスト情報の自動追加などが可能です。
基本的な構成要素は以下の通りです:
- Logger:ログを記録するオブジェクト
- Handler:ログの出力先を制御(ファイル、コンソールなど)
- Formatter:ログのフォーマットを定義
- Filter:ログレコードをフィルタリング
ログレベルは以下の5段階があります:
- DEBUG(10):詳細なデバッグ情報
- INFO(20):一般的な情報
- WARNING(30):警告メッセージ
- ERROR(40):エラー
- CRITICAL(50):重大なエラー
2. 業務でのユースケース
実務でloggingが活躍するシーンを整理してみます。
ユースケース1:Webアプリケーション(Flask/Django)
APIのリクエスト・レスポンス、データベースクエリ、外部API連携などの追跡が必要です。
ユースケース2:バッチ処理
定期実行のバッチジョブ、大量データ処理、スケジューラとの連携など、実行中のプロセスが見えにくいため、詳細なログが不可欠です。
ユースケース3:本番環境での運用
エラー発生時の原因追跡、パフォーマンス監視、セキュリティイベントの記録など。
ユースケース4:マイクロサービス
分散システムにおいて、ログ集約ツール(ELK、Splunk、CloudWatch)との統合が必要です。
3. 実装コード|業務で実際に使うパターン
パターン1:基本的なロギング設定(開発環境用)
import logging
import sys
# シンプルな設定
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
logger.debug('デバッグ情報です')
logger.info('処理が正常に進行しています')
logger.warning('注意が必要な状態です')
logger.error('エラーが発生しました')
if __name__ == '__main__':
main()
ただし、業務ではbasicConfigは避けるべきです。理由は、モジュール間の設定競合やテストの複雑さにつながるからです。
パターン2:本番環境対応の設定(推奨)
実務で最も使われるのは、環境に応じた設定ファイルベースのアプローチです。
import logging
import logging.config
import os
import json
from pathlib import Path
def setup_logging(config_path: str = None, env: str = None):
\"\"\"
ログ設定を初期化する関数
環境変数またはconfigファイルから設定を読み込む
\"\"\"
if env is None:
env = os.getenv('APP_ENV', 'development')
if config_path is None:
config_path = os.path.join(
Path(__file__).parent,
f'logging_config_{env}.json'
)
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
logging.config.dictConfig(config)
else:
# フォールバック設定
logging.basicConfig(level=logging.INFO)
return logging.getLogger(__name__)
logger = setup_logging()
if __name__ == '__main__':
logger.info('ログシステムが初期化されました')
パターン3:JSON形式の設定ファイル(logging_config_production.json)
{
\"version\": 1,
\"disable_existing_loggers\": false,
\"formatters\": {
\"standard\": {
\"format\": \"%(asctime)s [%(levelname)s] %(name)s: %(message)s\"
},
\"json\": {
\"format\": \"{\\\"timestamp\\\": \\\"%(asctime)s\\\", \\\"level\\\": \\\"%(levelname)s\\\", \\\"logger\\\": \\\"%(name)s\\\", \\\"message\\\": \\\"%(message)s\\\"}\",
\"datefmt\": \"%Y-%m-%d %H:%M:%S\"
}
},
\"handlers\": {
\"console\": {
\"class\": \"logging.StreamHandler\",
\"level\": \"INFO\",
\"formatter\": \"standard\",
\"stream\": \"ext://sys.stdout\"
},
\"file\": {
\"class\": \"logging.handlers.RotatingFileHandler\",
\"level\": \"DEBUG\",
\"formatter\": \"json\",
\"filename\": \"logs/app.log\",
\"maxBytes\": 10485760,
\"backupCount\": 5,
\"encoding\": \"utf-8\"
},
\"error_file\": {
\"class\": \"logging.handlers.RotatingFileHandler\",
\"level\": \"ERROR\",
\"formatter\": \"json\",
\"filename\": \"logs/error.log\",
\"maxBytes\": 10485760,
\"backupCount\": 10,
\"encoding\": \"utf-8\"
}
},
\"loggers\": {
\"\": {
\"level\": \"DEBUG\",
\"handlers\": [\"console\", \"file\", \"error_file\"],
\"propagate\": false
},
\"urllib3\": {
\"level\": \"WARNING\",
\"handlers\": [\"console\"],
\"propagate\": false
}
}
}
パターン4:本番環境用のロギングマネージャークラス
複数のサービス、外部API呼び出し、データベース操作などを扱う場合、構造化ログが重要です。
import logging
import json
import traceback
import time
from typing import Any, Dict, Optional
from datetime import datetime
class StructuredLogger:
\"\"\"業務用の構造化ロガー\"\"\"
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def log_event(
self,
level: str,
message: str,
**context: Any
) -> None:
\"\"\"
構造化ログを出力
Args:
level: ログレベル(debug, info, warning, error, critical)
message: ログメッセージ
**context: 追加のコンテキスト情報
\"\"\"
log_record = {
'timestamp': datetime.utcnow().isoformat(),
'message': message,
'level': level.upper(),
**context
}
log_method = getattr(self.logger, level.lower())
log_method(json.dumps(log_record, ensure_ascii=False))
def log_api_call(
self,
service_name: str,
endpoint: str,
method: str,
status_code: int,
duration_ms: float,
**extra: Any
) -> None:
\"\"\"API呼び出しをログに記録\"\"\"
self.log_event(
'info',
f'API call to {service_name}',
service=service_name,
endpoint=endpoint,
method=method,
status_code=status_code,
duration_ms=round(duration_ms, 2),
**extra
)
def log_exception(
self,
message: str,
exception: Exception,
**context: Any
) -> None:
\"\"\"例外をログに記録\"\"\"
self.log_event(
'error',
message,
exception_type=type(exception).__name__,
exception_message=str(exception),
traceback=traceback.format_exc(),
**context
)
def log_database_query(
self,
query: str,
duration_ms: float,
rows_affected: int = 0,
**extra: Any
) -> None:
\"\"\"データベースクエリをログに記録\"\"\"
self.log_event(
'debug',
'Database query executed',
query=query,
duration_ms=round(duration_ms, 2),
rows_affected=rows_affected,
**extra
)
# 使用例
if __name__ == '__main__':
# ロガーの設定(簡略版)
logging.basicConfig(level=logging.DEBUG)
logger = StructuredLogger(__name__)
# API呼び出しのログ
logger.log_api_call(
service_name='payment_service',
endpoint='/charge',
method='POST',
status_code=200,
duration_ms=245.5,
user_id='user_12345',
amount=10000
)
# 例外のログ
try:
result = 1 / 0
except Exception as e:
logger.log_exception(
'Failed to calculate',
e,
operation='divide'
)
# データベースクエリのログ
logger.log_database_query(
query='SELECT * FROM users WHERE id = %s',
duration_ms=15.3,
rows_affected=1,
table='users'
)
パターン5:Flask/Djangoでの実装例
Webアプリケーション内でリクエストコンテキストを含むログを実装する例です。
import logging
import json
import time
from flask import Flask, request, g
from functools import wraps
from datetime import datetime
app = Flask(__name__)
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RequestIdFilter(logging.Filter):
\"\"\"リクエストIDをログに含めるフィルター\"\"\"
def filter(self, record):
record.request_id = g.get('request_id', 'N/A')
return True
# フィルターをハンドラーに追加
for handler in logger.handlers:
handler.addFilter(RequestIdFilter())
@app.before_request
def before_request():
\"\"\"リクエスト前の処理\"\"\"
g.request_id = request.headers.get('X-Request-ID', 'N/A')
g.start_time = time.time()
logger.info(
json.dumps({
'event': 'request_start',
'request_id': g.request_id,
'method': request.method,
'path': request.path,
'remote_addr': request.remote_addr,
'timestamp': datetime.utcnow().isoformat()
})
)
@app.after_request
def after_request(response):
\"\"\"リクエスト後の処理\"\"\"
duration_ms = (time.time() - g.start_time) * 1000
logger.info(
json.dumps({
'event': 'request_end',
'request_id': g.request_id,
'method': request.method,
'path': request.path,
'status_code': response.status_code,
'duration_ms': round(duration_ms, 2),
'timestamp': datetime.utcnow().isoformat()
})
)
return response
@app.route('/api/users/', methods=['GET'])
def get_user(user_id: int):
\"\"\"ユーザー情報を取得するAPI\"\"\"
try:
logger.debug(
json.dumps({
'event': 'database_query',
'request_id': g.request_id,
'query': 'SELECT * FROM users WHERE id = %s',
'user_id': user_id
})
)
# データベース処理(疑似コード)
user_data = {'id': user_id, 'name': 'John Doe'}
logger.info(
json.dumps({
'event': 'user_fetched',
'request_id': g.request_id,
'user_id': user_id
})
)
return user_data, 200
except Exception as e:
logger.error(
json.dumps({
'event': 'error',
'request_id': g.request_id,
'error_type': type(e).__name__,
'error_message': str(e),
'user_id': user_id
})
)
return {'error': 'Internal Server Error'}, 500
if __name__ == '__main__':
app.run(debug=False)
4. よくある応用パターン
応用パターン1:複数出力先への並列ログ出力
本番環境では、コンソール出力、ファイル保存、ログサービスへの送信を同時に行う必要があります。
import logging
import logging.handlers
import socket
import json
from typing import Optional
class SyslogHandler(logging.handlers.SyslogHandler):
\"\"\"JSON形式でSyslogに送信するハンドラー\"\"\"
def emit(self, record):
try:
msg = self.format(record)
if isinstance(msg, str) and msg.startswith('{'):
# JSON形式ならそのまま送信
self.sock.sendto(msg.encode(), self.address)
else:
super().emit(record)
except Exception:
self.handleError(record)
def setup_advanced_logging(
log_file: str = 'app.log',
syslog_host: Optional[str] = None,
syslog_port: int = 514
):
\"\"\"複数出力先対応のログ設定\"\"\"
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# フォーマッター定義
json_formatter = logging.Formatter(
'{\"timestamp\": \"%(asctime)s\", \"level\": \"%(levelname)s\", \"name\": \"%(name)s\", \"message\": \"%(message)s\"}'
)
standard_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# コンソール出力
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(standard_formatter)
logger.addHandler(console_handler)
# ファイル出力(ローテーション対応)
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(json_formatter)
logger.addHandler(file_handler)
# Syslog出力
if syslog_host:
try:
syslog_handler = SyslogHandler(
address=(syslog_host, syslog_port)
)
syslog_handler.setLevel(logging.WARNING)
syslog_handler.setFormatter(json_formatter)
logger.addHandler(syslog_handler)
except Exception as e:
logger.warning(f'Syslog接続失敗: {e}')
return logger
# 使用例
if __name__ == '__main__':
logger = setup_advanced_logging(
log_file='logs/app.log',
syslog_host='localhost'
)
logger.info('システムが起動しました')
logger.warning('リソースの使用率が高いです')
logger.error('データベース接続エラー')
応用パターン2:ログレベルの動的変更
本番環境で問題が発生した際、再起動なしにログレベルを変更したい場合があります。
import logging
import os
from logging.config import listen, DEFAULT_LOGGING_PORT
def configure_log_listener():
\"\"\"ログレベル変更をリッスンするサーバーを起動\"\"\"
# 環境変数で設定ポートを指定可能
port = int(os.getenv('LOGGING_CONFIG_PORT', DEFAULT_LOGGING_PORT))
# 別プロセスでリスナーを起動
import threading
import socket
def run_listener():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', port))
sock.listen(1)
print(f'ログリスナーがポート{port}で起動しました')
while True:
conn, addr = sock.accept()
print(f'ログ設定リクエスト受信: {addr}')
listen(sockobj=conn)
except Exception as e:
print(f'ログリスナーエラー: {e}')
listener_thread = threading.Thread(target=run_listener, daemon=True)
listener_thread.start()
# 使用例
if __name__ == '__main__':
import logging.config
# 基本設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# リスナー起動
configure_log_listener()
logger.info('アプリケーション起動')
logger.debug('これはDEBUGレベルです(表示されません)')
# 動的にログレベルを変更
# logger.setLevel(logging.DEBUG)
# これで以後DEBUGレベルも表示される
応用パターン3:パフォーマンス計測の統合ログ
処理時間、メモリ使用量などの計測情報をログに含める実装です。
import logging
import time
import psutil
import json
from functools import wraps
from datetime import datetime
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@contextmanager
def log_execution_time(operation_name: str, **extra_context):
\"\"\"実行時間をログに記録するコンテキストマネージャー\"\"\"
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
try:
yield
finally:
duration_ms = (time.time() - start_time) * 1000
end_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
memory_delta = end_memory - start_memory
log_data = {
'operation': operation_name,
'duration_ms': round(duration_ms, 2),
'memory_mb': round(end_memory, 2),
'memory_delta_mb': round(memory_delta, 2),
'timestamp': datetime.utcnow().isoformat(),
**extra_context
}
# 時間がかかった処理は警告をログに
if duration_ms > 1000:
logger.warning(json.dumps(log_data))
else:
logger.debug(json.dumps(log_data))
def performance_log(func):
\"\"\"関数の実行時間を自動ログするデコレータ\"\"\"
@wraps(func)
def wrapper(*args, **kwargs):
with log_execution_time(
operation_name=func.__name__,
args_count=len(args),
kwargs_keys=list(kwargs.keys())
):
return func(*args, **kwargs)
return wrapper
# 使用例
@performance_log
def heavy_calculation(n: int) -> int:
\"\"\"重い計算処理\"\"\"
total = 0
for i in range(n):
total += i ** 2
return total
@performance_log
def database_query():
\"\"\"データベースクエリ(疑似)\"\"\"
time.sleep(0.5)
return [{'id': i, 'name': f'user_{i}'} for i in range(100)]
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
result1 = heavy_calculation(1000000)
result2 = database_query()
# 手動でコンテキストマネージャーを使用
with log_execution_time('batch_process', batch_id='BATCH_001'):
time.sleep(0.3)
# 処理内容
5. 注意点・トラブルシューティング
注意点1:ログレベルの設定ミス
本番環境で誤ってDEBUGレベルを設定すると、ログ量が膨大になりディスク容量を圧迫します。環境ごとに明確にレベルを分け、テストを実施することが重要です。
import os
# 環境に応じたログレベル
LOG_LEVELS = {
'development': 'DEBUG',
'staging': 'INFO',
'production': 'WARNING'
}
env = os.getenv('APP_ENV', 'development')
log_level = LOG_LEVELS.get(env, 'INFO')
logger.setLevel(log_level)
注意点2:ログローテーションの設定忘れ
RotatingFileHandlerを使わないと、ログファイルが無限に成長し、最終的にディスク満杯エラーになります。
# 間違い:ローテーションなし
# handler = logging.FileHandler('app.log')
# 正解:ローテーション有効
handler = logging.handlers.RotatingFileHandler(
'app.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5 # 最大5世代保持
)
注意点3:機密情報のログ出力
パスワード、APIキー、個人情報などを誤ってログに出力しないよう注意が必要です。
import re
class SensitiveDataFilter(logging.Filter):
\"\"\"機密情報をマスクするフィルター\"\"\"
PATTERNS = [
(r'password[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'password=***'),
(r'api[_-]?key[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'api_key=***'),
(r'token[\"\\']?\\s*[:=]\\s*[\"\\']([^\"\\'\\'s]+)[\"\\']', 'token=***'),
(r'\\b\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}[\\s-]?\\d{4}\\b', 'XXXX-XXXX-XXXX-XXXX'),
]
def filter(self, record):
message = record.getMessage()
for pattern, replacement in self.PATTERNS:
message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
record.msg = message
return True
# 使用
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())
# テスト
logger.info('User login with password=secret123')
# 出力: User login with password=***
注意点4:スレッド安全性
loggingモジュールはスレッドセーフですが、複数プロセスからの同時アクセスはサポートされていません。マルチプロセス環境ではQueueHandlerを使用してください。
import logging
import logging.handlers
from multiprocessing import Queue
import multiprocessing
# ロギングキューを作成
log_queue = Queue()
def setup_multiprocess_logging():
\"\"\"マルチプロセス対応のログ設定\"\"\"
# メインプロセスでリスナーを起動
listener = logging.handlers.QueueListener(
log_queue,
logging.FileHandler('app.log'),
logging.StreamHandler(),
respect_handler_level=True
)
listener.start()
return listener
def worker_process(worker_id: int):
\"\"\"ワーカープロセス\"\"\"
# 各プロセスでQueueHandlerを設定
handler = logging.handlers.QueueHandler(log_queue)
logger = logging.getLogger(f'worker_{worker_id}')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.info(f'Worker {worker_id} started')
# 処理
logger.info(f'Worker {worker_id} finished')
if __name__ == '__main__':
listener = setup_multiprocess_logging()
# 複数プロセスを起動
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker_process, args=(i,))
p.start()
processes.append(p)
for p in processes:
p.join()
listener.stop()
注意点5:ログフォーマッターのJSON化時のエスケープ
JSON形式にする際、メッセージ内の特殊文字を適切にエスケープする必要があります。
import json
import logging
class JSONFormatter(logging.Formatter):
\"\"\"JSON形式でログを出力するフォーマッター\"\"\
