Python generatorを活用した業務システム実装パターン:メモリ効率とスケーラビリティ
現代のビジネスシステムでは、膨大なデータを効率的に処理することが求められます。Pythonのgeneratorは、メモリ使用量を削減しながら大規模なデータを扱うための強力な機能です。本記事では、実務で頻繁に遭遇するシナリオに基づいて、generatorの実装パターンを解説します。
generatorの基本理解
generatorは、値を1つずつ順番に生成する特別な関数です。listのようにすべてのデータをメモリに保持するのではなく、必要な時に1つずつ値を返します。
通常のリスト処理では、100万件のデータをメモリ上に展開する必要があります:
# 非効率な方法:全データをメモリに展開
def get_all_user_ids():
users = []
for i in range(1000000):
users.append(i)
return users
# 使用時にすべてがメモリに存在
all_ids = get_all_user_ids() # 数十MB消費
for user_id in all_ids:
process_user(user_id)
一方、generatorを使えば必要な分だけ生成できます:
# 効率的な方法:必要に応じて生成
def get_user_ids():
for i in range(1000000):
yield i
# メモリ効率的
for user_id in get_user_ids():
process_user(user_id)
業務システムでのユースケース
1. 大規模CSVファイルの処理
業務では、数ギガバイトのCSVファイルをバッチ処理することがあります。generatorを活用すれば、メモリ効率的に処理できます:
import csv
from typing import Generator, Dict
def read_large_csv(filepath: str, chunk_size: int = 1000) -> Generator[Dict, None, None]:
\"\"\"
大規模CSVファイルをチャンク単位で読み込む
実務パターン:営業データ、顧客情報、取引履歴など
\"\"\"
with open(filepath, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
# 使用例:月間の売上データ(1000万行)を処理
def process_sales_data(csv_file: str):
total_amount = 0
total_records = 0
for chunk in read_large_csv(csv_file):
# 各チャンク(1000行)をバッチ処理
for row in chunk:
try:
amount = float(row['amount'])
date = row['date']
total_amount += amount
total_records += 1
except (ValueError, KeyError):
print(f\"データ形式エラー: {row}\")
print(f\"処理完了: {total_records}件、合計金額: {total_amount:,.0f}円\")
return total_amount
# 実行
result = process_sales_data('sales_data_2024.csv')
2. APIからのページネーション処理
外部APIのデータを連続取得する場合、generatorでpagination処理をシンプルに実装できます:
import requests
from typing import Generator, Dict, Any
import time
class APIClient:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
def fetch_paginated_data(self,
endpoint: str,
params: Dict[str, Any],
page_size: int = 100) -> Generator[Dict, None, None]:
\"\"\"
APIのページネーションを自動的に処理
実務パターン:顧客管理API、在庫API、CRM連携など
\"\"\"
page = 1
has_next = True
while has_next:
try:
# APIリクエスト
response = requests.get(
f\"{self.base_url}/{endpoint}\",
params={**params, 'page': page, 'per_page': page_size},
headers={'Authorization': f'Bearer {self.api_key}'},
timeout=10
)
response.raise_for_status()
data = response.json()
# 各ページのデータを1件ずつ yield
for item in data.get('items', []):
yield item
# 次のページがあるか判定
has_next = data.get('has_next', False)
page += 1
# API rate limiting対策
time.sleep(0.1)
except requests.RequestException as e:
print(f\"API呼び出しエラー(ページ{page}): {e}\")
break
# 使用例:顧客管理システムから全顧客データを取得
def sync_customers_to_database(api_client: APIClient):
customer_count = 0
for customer in api_client.fetch_paginated_data(
'customers',
{'status': 'active'},
page_size=100
):
# 各顧客をDB保存
save_to_database(customer)
customer_count += 1
if customer_count % 1000 == 0:
print(f\"同期進行中: {customer_count}件処理\")
print(f\"同期完了: 総{customer_count}件\")
# 実行
api = APIClient('https://api.example.com', 'your-api-key')
sync_customers_to_database(api)
3. ログファイル解析とリアルタイムモニタリング
サーバーログをリアルタイムで監視し、エラーパターンを検出する場合:
import re
from typing import Generator, Tuple
from datetime import datetime
from collections import defaultdict
def tail_log_file(filepath: str) -> Generator[str, None, None]:
\"\"\"
ログファイルをリアルタイムで読み込む
実務パターン:エラーログ監視、セキュリティログ分析
\"\"\"
with open(filepath, 'r', encoding='utf-8') as f:
# ファイルの最後に移動
f.seek(0, 2)
while True:
line = f.readline()
if line:
yield line.rstrip('\\n')
else:
# 新しいログが追加されるまで待機
import time
time.sleep(0.1)
def analyze_error_logs(log_file: str) -> Generator[Dict[str, Any], None, None]:
\"\"\"
ログを解析してエラーパターンを検出
\"\"\"
error_pattern = re.compile(r'\\[(\\d{4}-\\d{2}-\\d{2})\\s([\\d:]+)\\]\\s(ERROR|WARN):\\s(.+?)(?:\\s\\|\\s(.+))?$')
error_buffer = defaultdict(int)
for line in tail_log_file(log_file):
match = error_pattern.match(line)
if match:
date, time_str, level, message, context = match.groups()
# エラーをバッファに蓄積
error_key = message[:100] # 最初の100文字をキー
error_buffer[error_key] += 1
# 同じエラーが5回以上発生したら報告
if error_buffer[error_key] == 5:
yield {
'timestamp': f\"{date} {time_str}\",
'level': level,
'error_message': message,
'occurrence_count': error_buffer[error_key],
'context': context
}
# 使用例:バックグラウンドでログ監視
def monitor_application_health(log_file: str):
for alert in analyze_error_logs(log_file):
print(f\"[アラート] {alert['timestamp']}\")
print(f\" エラー: {alert['error_message']}\")
print(f\" 発生回数: {alert['occurrence_count']}回\")
print(f\" コンテキスト: {alert['context']}\")
# このポイントでメール送信やSlack通知を実装
send_alert_notification(alert)
よくある応用パターン
パターン1: generatorの合成
複数のgeneratorを組み合わせてパイプライン処理を実装します:
from typing import Generator, Iterator
def read_data() -> Generator[Dict, None, None]:
\"\"\"データソースから読み込み\"\"\"
for i in range(1000):
yield {'id': i, 'value': i * 100}
def filter_data(data: Generator[Dict, None, None],
threshold: int) -> Generator[Dict, None, None]:
\"\"\"条件でフィルタリング\"\"\"
for item in data:
if item['value'] > threshold:
yield item
def transform_data(data: Generator[Dict, None, None]) -> Generator[Dict, None, None]:
\"\"\"データ変換\"\"\"
for item in data:
yield {
'id': item['id'],
'value': item['value'],
'category': 'high' if item['value'] > 50000 else 'medium'
}
def process_pipeline():
\"\"\"パイプライン処理\"\"\"
raw_data = read_data()
filtered = filter_data(raw_data, threshold=25000)
transformed = transform_data(filtered)
# 最終的な処理
count = 0
for item in transformed:
count += 1
if count % 100 == 0:
print(f\"処理中: {count}件\")
print(f\"パイプライン処理完了: {count}件\")
process_pipeline()
パターン2: generatorとコンテキストマネージャの組み合わせ
リソース管理を適切に行いながらデータを処理します:
from contextlib import contextmanager
from typing import Generator
import sqlite3
@contextmanager
def database_cursor(db_path: str) -> Generator:
\"\"\"データベース接続を管理\"\"\"
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
yield conn.cursor()
finally:
conn.close()
def fetch_users_from_db(db_path: str) -> Generator[Dict, None, None]:
\"\"\"DBから大量のユーザーデータを取得\"\"\"
with database_cursor(db_path) as cursor:
cursor.execute(\"\"\"
SELECT id, name, email, created_at
FROM users
ORDER BY id
\"\"\")
for row in cursor.fetchall():
yield dict(row)
# 使用例:メール送信バッチ処理
def send_newsletter(db_path: str):
sent_count = 0
for user in fetch_users_from_db(db_path):
try:
send_email(user['email'], subject='月刊ニュースレター')
sent_count += 1
if sent_count % 100 == 0:
print(f\"メール送信済: {sent_count}件\")
except Exception as e:
print(f\"メール送信失敗 ({user['email']}): {e}\")
print(f\"メール送信完了: {sent_count}件\")
def send_email(email: str, subject: str):
# 実装省略
pass
パターン3: 非同期generatorでの並列処理
非同期処理が必要な場合のパターンです:
import asyncio
from typing import AsyncGenerator
async def fetch_data_async(url: str) -> AsyncGenerator[Dict, None]:
\"\"\"非同期でAPIからデータ取得\"\"\"
async_session = aiohttp.ClientSession()
try:
page = 1
while True:
async with async_session.get(
url,
params={'page': page, 'per_page': 100}
) as response:
data = await response.json()
for item in data.get('items', []):
yield item
if not data.get('has_next'):
break
page += 1
await asyncio.sleep(0.1)
finally:
await async_session.close()
async def process_async_pipeline():
\"\"\"非同期パイプライン処理\"\"\"
count = 0
async for item in fetch_data_async('https://api.example.com/data'):
# 各itemを処理
count += 1
if count % 1000 == 0:
print(f\"処理済: {count}件\")
print(f\"完了: {count}件\")
# 実行
# asyncio.run(process_async_pipeline())
注意点と落とし穴
1. generatorの再利用不可
generatorは一度消費されると再利用できません:
# 危険な例
data_gen = read_large_csv('data.csv')
count1 = sum(1 for _ in data_gen) # generatorを消費
count2 = sum(1 for _ in data_gen) # 0になる(からの)
# 正しい方法
data_gen = read_large_csv('data.csv')
data_list = list(data_gen)
count1 = len(data_list)
count2 = len(data_list)
2. 例外処理の重要性
長時間実行されるgeneratorは、適切な例外処理が必須です:
def safe_data_processing(data_source):
\"\"\"安全なデータ処理\"\"\"
error_count = 0
success_count = 0
try:
for item in data_source:
try:
process_item(item)
success_count += 1
except ValueError as e:
error_count += 1
print(f\"個別エラー: {e}\")
if error_count > 100:
raise Exception(\"エラー数が多すぎます\")
except KeyboardInterrupt:
print(\"ユーザーが処理を中断しました\")
except Exception as e:
print(f\"致命的エラー: {e}\")
finally:
print(f\"処理結果 - 成功: {success_count}, エラー: {error_count}\")
def process_item(item):
pass
3. メモリリークの防止
generatorの内部で大きなオブジェクトを保持しないようにしましょう:
# 危険:バッファが肥大化
def bad_generator():
buffer = []
for i in range(1000000):
buffer.append(i) # メモリ蓄積
yield i
# 正しい:バッファを使わない
def good_generator():
for i in range(1000000):
yield i
# やむを得ずバッファが必要な場合は明確に管理
def buffered_generator(buffer_size=1000):
buffer = []
for i in range(1000000):
buffer.append(i)
if len(buffer) >= buffer_size:
yield from buffer
buffer = []
if buffer:
yield from buffer
パフォーマンス比較
実務では、generatorを使う効果を定量的に理解することが重要です:
import time
import sys
def benchmark_list_vs_generator():
\"\"\"リスト vs generator のメモリ・速度比較\"\"\"
# リスト方式
start_time = time.time()
data_list = [i * 100 for i in range(1000000)]
list_time = time.time() - start_time
list_size = sys.getsizeof(data_list) / 1024 / 1024 # MB
# generator方式
start_time = time.time()
data_gen = (i * 100 for i in range(1000000))
gen_time = time.time() - start_time
gen_size = sys.getsizeof(data_gen) / 1024 # KB
print(f\"生成時間 - リスト: {list_time:.4f}秒, generator: {gen_time:.4f}秒\")
print(f\"メモリ使用量 - リスト: {list_size:.2f}MB, generator: {gen_size:.2f}KB\")
print(f\"削減率: {(list_size / (gen_size/1024)) * 100:.1f}%\")
benchmark_list_vs_generator()
まとめ
Python generatorは、大規模データ処理が当たり前となった現代のビジネスシステムにおいて、不可欠な手法です。本記事で紹介した実装パターンは、以下の業務で特に有効です:
- 大規模ファイル処理:CSV、JSON、ログファイルなどのギガバイト級データ
- API連携:ページネーション処理が必要な外部API呼び出し
- リアルタイム監視:ログ解析、メトリクス収集、アラート生成
- バッチ処理:営業日ごとのデータ同期、メール送信、レポート生成
generatorを適切に活用することで、メモリ効率を数百倍改善し、スケーラブルで保守性の高いシステムを構築できます。ただし、例外処理やリソース管理には細心の注意が必要です。
実装する際は、常にメモリ使用量と処理時間を計測し、generatorが適切に効果を発揮しているか検証することをお勧めします。

