Python ファイル操作の業務パターン解説|実務で使える実装コード集
\n\n
はじめに
\n
Pythonを用いた業務システム開発やデータ処理では、ファイル操作がほぼ必須のスキルです。単純なテキストファイルの読み書きから、CSV/JSON/YAMLといった構造化データの処理、ログ管理、バックアップなど、様々な場面でファイル操作が必要になります。本記事では、実務で実際に使われているパターンを中心に、Pythonのファイル操作について詳しく解説します。
\n\n
簡易的な解説
\n\n
ファイル操作の基本概念
\n
Pythonのファイル操作は、以下のサイクルで構成されます:
\n
- \n
- オープン:ファイルを開く(読み込みモード、書き込みモード、追記モードなど)
- 読み書き:ファイルの内容を読み取る、または書き込む
- クローズ:ファイルを閉じてリソースを解放
\n
\n
\n
\n\n
Pythonでは、withステートメント(コンテキストマネージャ)を使うことで、自動的にファイルのクローズが行われるため、メモリリークを防ぐことができます。これが業務コードの鉄則です。
\n\n
業務でのユースケース
\n\n
ユースケース1:日次ログの収集と処理
\n
営業システムやAPI処理では、毎日大量のログファイルが生成されます。これらを集約し、エラーログだけを抽出して別ファイルに保存する処理は日常茶飯事です。
\n\n
ユースケース2:顧客データのCSV処理
\n
営業部門から提供されたCSVファイルを読み込み、データベースに登録する前にバリデーションや正規化を行う処理。または、データベースから抽出したデータをCSVで出力する処理です。
\n\n
ユースケース3:設定ファイルの管理
\n
本番環境と開発環境で異なる設定値を持つYAMLやJSONファイルの読み込み。環境変数との組み合わせで安全に管理します。
\n\n
ユースケース4:定期的なバックアップと世代管理
\n
重要なデータファイルを定期的にバックアップし、古いファイルは自動削除する処理。cronやスケジューラと組み合わせて運用されます。
\n\n
実装コード
\n\n
1. ログファイルの処理と分析
\n
実務では、複数のログファイルから特定のレベル(ERROR、WARNINGなど)のログだけを抽出する必要があります。以下は実際の運用コードです。
\n\n
import os\nfrom datetime import datetime, timedelta\nimport re\nfrom pathlib import Path\n\nclass LogProcessor:\n \"\"\"ログファイル処理クラス\"\"\"\n \n def __init__(self, log_dir: str, output_dir: str):\n self.log_dir = Path(log_dir)\n self.output_dir = Path(output_dir)\n self.output_dir.mkdir(parents=True, exist_ok=True)\n \n def extract_error_logs(self, days: int = 1) -> None:\n \"\"\"過去N日間のログファイルからエラーログを抽出\"\"\"\n cutoff_date = datetime.now() - timedelta(days=days)\n error_pattern = re.compile(r'\\[ERROR\\]|\\[CRITICAL\\]')\n \n for log_file in self.log_dir.glob('*.log'):\n # ログファイルの日付をチェック\n file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)\n if file_mtime < cutoff_date:\n continue\n \n error_count = 0\n output_file = self.output_dir / f\"error_{log_file.name}\"\n \n try:\n with open(log_file, 'r', encoding='utf-8') as infile, \\\n open(output_file, 'w', encoding='utf-8') as outfile:\n \n for line_num, line in enumerate(infile, 1):\n if error_pattern.search(line):\n outfile.write(f\"{log_file.name}:{line_num}: {line}\")\n error_count += 1\n \n if error_count == 0:\n output_file.unlink() # エラーがなければファイル削除\n else:\n print(f\"{log_file.name}: {error_count}件のエラーを抽出\")\n \n except UnicodeDecodeError:\n # 文字コードが異なる場合の対応\n print(f\"警告: {log_file.name} は UTF-8 でデコードできません\")\n continue\n \n def cleanup_old_logs(self, days: int = 30) -> None:\n \"\"\"30日以上前のログファイルを削除\"\"\"\n cutoff_date = datetime.now() - timedelta(days=days)\n deleted_count = 0\n \n for log_file in self.log_dir.glob('*.log'):\n file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)\n if file_mtime < cutoff_date:\n try:\n log_file.unlink()\n deleted_count += 1\n except Exception as e:\n print(f\"削除失敗 {log_file.name}: {e}\")\n \n print(f\"{deleted_count}個の古いログファイルを削除しました\")\n\n\n# 使用例\nif __name__ == \"__main__\":\n processor = LogProcessor(\n log_dir=\"/var/log/myapp\",\n output_dir=\"/var/log/myapp/errors\"\n )\n processor.extract_error_logs(days=1)\n processor.cleanup_old_logs(days=30)\n
\n\n
2. CSVファイルの読み込みと加工
\n
営業データをCSVから読み込み、バリデーション、正規化、そしてフィルタリングを行う実装です。
\n\n
import csv\nfrom dataclasses import dataclass\nfrom typing import List, Optional\nfrom pathlib import Path\nimport re\n\n@dataclass\nclass Customer:\n \"\"\"顧客データモデル\"\"\"\n customer_id: str\n name: str\n email: str\n phone: str\n created_date: str\n \n def is_valid(self) -> bool:\n \"\"\"顧客データのバリデーション\"\"\"\n # メールアドレスの形式チェック\n email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'\n if not re.match(email_pattern, self.email):\n return False\n \n # 電話番号の形式チェック(日本の場合)\n phone_pattern = r'^0\\d{1,4}-?\\d{1,4}-?\\d{4}$'\n if not re.match(phone_pattern, self.phone):\n return False\n \n # IDが空でないことを確認\n if not self.customer_id.strip():\n return False\n \n return True\n\nclass CustomerProcessor:\n \"\"\"顧客CSVファイル処理クラス\"\"\"\n \n def __init__(self, input_file: str, output_file: str, error_file: str):\n self.input_file = Path(input_file)\n self.output_file = Path(output_file)\n self.error_file = Path(error_file)\n \n def process(self) -> dict:\n \"\"\"CSVを読み込み、バリデーション、出力\"\"\"\n results = {\n 'total': 0,\n 'valid': 0,\n 'invalid': 0,\n 'errors': []\n }\n \n valid_customers: List[Customer] = []\n \n try:\n with open(self.input_file, 'r', encoding='utf-8-sig') as infile:\n # utf-8-sigでBOMを自動削除\n reader = csv.DictReader(infile)\n \n if reader.fieldnames is None:\n raise ValueError(\"CSVファイルが空です\")\n \n required_fields = {'customer_id', 'name', 'email', 'phone', 'created_date'}\n if not required_fields.issubset(set(reader.fieldnames)):\n raise ValueError(f\"必須カラムがありません。必要: {required_fields}\")\n \n for row_num, row in enumerate(reader, 2): # ヘッダーをスキップして2から開始\n results['total'] += 1\n \n try:\n # 空白を削除してデータをクリーンアップ\n customer = Customer(\n customer_id=row['customer_id'].strip(),\n name=row['name'].strip(),\n email=row['email'].strip().lower(),\n phone=row['phone'].strip().replace(' ', ''),\n created_date=row['created_date'].strip()\n )\n \n if customer.is_valid():\n valid_customers.append(customer)\n results['valid'] += 1\n else:\n results['invalid'] += 1\n results['errors'].append({\n 'row': row_num,\n 'customer_id': customer.customer_id,\n 'reason': 'バリデーション失敗'\n })\n except (KeyError, AttributeError) as e:\n results['invalid'] += 1\n results['errors'].append({\n 'row': row_num,\n 'reason': f'データ形式エラー: {str(e)}'\n })\n \n # 有効なデータを出力ファイルに書き込み\n self._write_valid_customers(valid_customers)\n \n # エラーレポートを出力\n self._write_error_report(results)\n \n except FileNotFoundError:\n print(f\"エラー: ファイルが見つかりません: {self.input_file}\")\n except Exception as e:\n print(f\"エラー: {str(e)}\")\n \n return results\n \n def _write_valid_customers(self, customers: List[Customer]) -> None:\n \"\"\"有効な顧客データをCSVに出力\"\"\"\n with open(self.output_file, 'w', encoding='utf-8', newline='') as outfile:\n fieldnames = ['customer_id', 'name', 'email', 'phone', 'created_date']\n writer = csv.DictWriter(outfile, fieldnames=fieldnames)\n \n writer.writeheader()\n for customer in customers:\n writer.writerow({\n 'customer_id': customer.customer_id,\n 'name': customer.name,\n 'email': customer.email,\n 'phone': customer.phone,\n 'created_date': customer.created_date\n })\n \n def _write_error_report(self, results: dict) -> None:\n \"\"\"エラーレポートをテキストファイルに出力\"\"\"\n with open(self.error_file, 'w', encoding='utf-8') as f:\n f.write(f\"処理結果レポート\\n\")\n f.write(f\"=\"*50 + \"\\n\\n\")\n f.write(f\"総件数: {results['total']}\\n\")\n f.write(f\"有効件数: {results['valid']}\\n\")\n f.write(f\"エラー件数: {results['invalid']}\\n\\n\")\n \n if results['errors']:\n f.write(f\"エラー詳細:\\n\")\n for error in results['errors']:\n f.write(f\" 行{error.get('row', 'N/A')}: {error.get('reason', 'N/A')}\\n\")\n\n\n# 使用例\nif __name__ == \"__main__\":\n processor = CustomerProcessor(\n input_file=\"customers.csv\",\n output_file=\"customers_valid.csv\",\n error_file=\"customers_error.log\"\n )\n result = processor.process()\n print(f\"処理完了: {result['valid']}/{result['total']}件が有効です\")\n
\n\n
3. JSON設定ファイルの読み込みと環境別管理
\n
異なる環境(開発、テスト、本番)で設定値を切り替える実装パターンです。
\n\n
import json\nimport os\nfrom typing import Any, Dict\nfrom pathlib import Path\n\nclass ConfigManager:\n \"\"\"設定ファイル管理クラス\"\"\"\n \n def __init__(self, config_dir: str = \"config\"):\n self.config_dir = Path(config_dir)\n self.config: Dict[str, Any] = {}\n self.environment = os.getenv('APP_ENV', 'development')\n \n def load(self) -> Dict[str, Any]:\n \"\"\"環境に応じた設定ファイルを読み込み\"\"\"\n # デフォルト設定を読み込み\n default_config = self._load_config_file('default.json')\n \n # 環境別の設定をマージ\n env_config = self._load_config_file(f'{self.environment}.json')\n \n # 深いマージ(ネストされた辞書に対応)\n self.config = self._deep_merge(default_config, env_config)\n \n # 環境変数でのオーバーライド\n self._apply_env_overrides()\n \n return self.config\n \n def _load_config_file(self, filename: str) -> Dict[str, Any]:\n \"\"\"単一の設定ファイルを読み込み\"\"\"\n config_path = self.config_dir / filename\n \n if not config_path.exists():\n print(f\"警告: {filename} が見つかりません。空の設定で続行します\")\n return {}\n \n try:\n with open(config_path, 'r', encoding='utf-8') as f:\n return json.load(f)\n except json.JSONDecodeError as e:\n raise ValueError(f\"JSONパース失敗 {filename}: {str(e)}\")\n except Exception as e:\n raise Exception(f\"設定ファイル読み込み失敗 {filename}: {str(e)}\")\n \n def _deep_merge(self, base: Dict, override: Dict) -> Dict:\n \"\"\"ネストされた辞書をマージ\"\"\"\n result = base.copy()\n \n for key, value in override.items():\n if key in result and isinstance(result[key], dict) and isinstance(value, dict):\n result[key] = self._deep_merge(result[key], value)\n else:\n result[key] = value\n \n return result\n \n def _apply_env_overrides(self) -> None:\n \"\"\"環境変数で設定をオーバーライド\"\"\"\n # 環境変数は APP_DATABASE_HOST のような形式で指定\n # これが self.config['database']['host'] に対応\n for key, value in os.environ.items():\n if key.startswith('APP_'):\n config_key = key[4:].lower() # 'APP_' プレフィックスを削除\n parts = config_key.split('_')\n \n # ネストされた設定に対応\n current = self.config\n for part in parts[:-1]:\n if part not in current:\n current[part] = {}\n current = current[part]\n \n current[parts[-1]] = value\n \n def get(self, key: str, default: Any = None) -> Any:\n \"\"\"ドット記法で設定値を取得\"\"\"\n keys = key.split('.')\n value = self.config\n \n for k in keys:\n if isinstance(value, dict):\n value = value.get(k)\n if value is None:\n return default\n else:\n return default\n \n return value\n \n def to_dict(self) -> Dict[str, Any]:\n \"\"\"全設定を辞書として返す\"\"\"\n return self.config.copy()\n\n\n# 使用例\nif __name__ == \"__main__\":\n config = ConfigManager(config_dir=\"config\")\n config.load()\n \n # 設定値を取得\n db_host = config.get('database.host', 'localhost')\n db_port = config.get('database.port', 5432)\n api_timeout = config.get('api.timeout', 30)\n \n print(f\"Database: {db_host}:{db_port}\")\n print(f\"API Timeout: {api_timeout}\")\n
\n\n
4. ファイルのバックアップと世代管理
\n
重要なデータファイルを定期的にバックアップし、古いファイルは自動削除する実装です。
\n\n
import shutil\nfrom datetime import datetime, timedelta\nfrom pathlib import Path\nimport gzip\n\nclass BackupManager:\n \"\"\"ファイルバックアップ管理クラス\"\"\"\n \n def __init__(self, data_file: str, backup_dir: str, max_backups: int = 10):\n self.data_file = Path(data_file)\n self.backup_dir = Path(backup_dir)\n self.max_backups = max_backups\n self.backup_dir.mkdir(parents=True, exist_ok=True)\n \n def backup(self, compress: bool = True) -> str:\n \"\"\"ファイルをバックアップ(オプションで圧縮)\"\"\"\n if not self.data_file.exists():\n raise FileNotFoundError(f\"バックアップ対象ファイルが見つかりません: {self.data_file}\")\n \n timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')\n backup_filename = f\"{self.data_file.stem}_{timestamp}\"\n \n if compress:\n backup_path = self.backup_dir / f\"{backup_filename}.gz\"\n self._compress_file(self.data_file, backup_path)\n else:\n backup_path = self.backup_dir / f\"{backup_filename}{self.data_file.suffix}\"\n shutil.copy2(self.data_file, backup_path)\n \n # バックアップ数の制限をチェック\n self._cleanup_old_backups()\n \n print(f\"バックアップ完了: {backup_path.name}\")\n return str(backup_path)\n \n def _compress_file(self, source: Path, destination: Path) -> None:\n \"\"\"ファイルをgzip圧縮\"\"\"\n try:\n with open(source, 'rb') as f_in:\n with gzip.open(destination, 'wb') as f_out:\n shutil.copyfileobj(f_in, f_out)\n except Exception as e:\n raise Exception(f\"ファイル圧縮失敗: {str(e)}\")\n \n def _cleanup_old_backups(self) -> None:\n \"\"\"古いバックアップファイルを削除\"\"\"\n # バックアップファイルを修正日時でソート\n backup_files = sorted(\n self.backup_dir.glob(f\"{self.data_file.stem}_*\"),\n key=lambda x: x.stat().st_mtime,\n reverse=True\n )\n \n # max_backupsより多い場合は古いものから削除\n for old_backup in backup_files[self.max_backups:]:\n try:\n old_backup.unlink()\n print(f\"古いバックアップを削除: {old_backup.name}\")\n except Exception as e:\n print(f\"削除失敗 {old_backup.name}: {str(e)}\")\n \n def restore(self, backup_filename: str) -> None:\n \"\"\"バックアップからファイルを復元\"\"\"\n backup_path = self.backup_dir / backup_filename\n \n if not backup_path.exists():\n raise FileNotFoundError(f\"バックアップファイルが見つかりません: {backup_filename}\")\n \n try:\n if backup_filename.endswith('.gz'):\n with gzip.open(backup_path, 'rb') as f_in:\n with open(self.data_file, 'wb') as f_out:\n shutil.copyfileobj(f_in, f_out)\n else:\n shutil.copy2(backup_path, self.data_file)\n \n print(f\"復元完了: {self.data_file} <- {backup_filename}\")\n except Exception as e:\n raise Exception(f\"ファイル復元失敗: {str(e)}\")\n \n def list_backups(self) -> list:\n \"\"\"バックアップファイルの一覧を取得\"\"\"\n backups = []\n for backup_file in sorted(\n self.backup_dir.glob(f\"{self.data_file.stem}_*\"),\n key=lambda x: x.stat().st_mtime,\n reverse=True\n ):\n mtime = datetime.fromtimestamp(backup_file.stat().st_mtime)\n size = backup_file.stat().st_size\n backups.append({\n 'filename': backup_file.name,\n 'size': size,\n 'modified': mtime.strftime('%Y-%m-%d %H:%M:%S')\n })\n return backups\n\n\n# 使用例\nif __name__ == \"__main__\":\n backup_mgr = BackupManager(\n data_file=\"/data/important_data.db\",\n backup_dir=\"/data/backups\",\n max_backups=10\n )\n \n # バックアップを作成\n backup_mgr.backup(compress=True)\n \n # バックアップ一覧を表示\n for backup in backup_mgr.list_backups():\n print(f\"{backup['filename']} ({backup['size']} bytes) - {backup['modified']}\")\n
\n\n
よくある応用パターン
\n\n
パターン1:大規模ファイルの分割処理
\n
数GB規模のCSVファイルを一度に読み込むメモリを圧迫するため、チャンク単位で読み込む必要があります。
\n\n
def process_large_csv(input_file: str, chunk_size: int = 10000):\n \"\"\"大規模CSVファイルをチャンクで処理\"\"\"\n chunk = []\n \n with open(input_file, 'r', encoding='utf-8') as f:\n reader = csv.DictReader(f)\n \n for row in reader:\n chunk.append(row)\n \n if len(chunk) >= chunk_size:\n # チャンク処理(例:データベースに一括挿入)\n process_chunk(chunk)\n chunk = []\n \n # 残りのデータを処理\n if chunk:\n process_chunk(chunk)\n\ndef process_chunk(chunk: list):\n \"\"\"チャンクデータを処理(実装省略)\"\"\"\n print(f\"{len(chunk)}件のデータを処理中...\")\n
\n\n
パターン2:複数のテキストファイルをマージ
\n
ディレクトリ内の複数のログファイルを1つのファイルに統合する場合です。
\n\n
def merge_files(input_dir: str, output_file: str, pattern: str = \"*.txt\"):\n \"\"\"複数ファイルをマージ\"\"\"\n input_path = Path(input_dir)\n files_to_merge = sorted(input_path.glob(pattern))\n \n if not files_to_merge:\n print(f\"マージ対象ファイルが見つかりません: {pattern}\")\n return\n \n with open(output_file, 'w', encoding='utf-8') as outfile:\n for file in files_to_merge:\n outfile.write(f\"\\n=== {file.name} ===\\n\")\n try:\n with open(file, 'r', encoding='utf-8') as infile:\n outfile.write(infile.read())\n except UnicodeDecodeError:\n print(f\"警告: {file.name} をスキップ(文字コード不一致)\")\n continue\n \n print(f\"{len(files_to_merge)}個のファイルを {output_file} にマージしました\")\n
\n\n
パターン3:ファイル監視と自動処理
\n
特定のディレクトリを監視し、新しいファイルが追加されたら自動的に処理を実行します。
\n\n
import time\nfrom pathlib import Path\n\nclass FileWatcher:\n \"\"\"ファイル監視クラス\"\"\"\n \n def __init__(self, watch_dir: str, processed_dir: str):\n self.watch_dir = Path(watch_dir)\n self.processed_dir = Path(processed_dir)\n self.processed_dir.mkdir(parents=True, exist_ok=True)\n self.processed_files = set()\n \n def watch(self, file_pattern: str = \"*.csv\", interval: int = 5):\n \"\"\"ディレクトリを監視し、新しいファイルを処理\"\"\"\n print(f\"{self.watch_dir} を監視中...\")\n \n while True:\n current_files = set(self.watch_dir.glob(file_pattern))\n new_files = current_files - self.processed_files\n \n for file in new_files:\n print(f\"新しいファイルを検出: {file.name}\")\n try:\n self.process_file(file)\n self.processed_files.add(file)\n except Exception as e:\n print(f\"処理失敗: {file.name} - {str(e)}\")\n \n time.sleep(interval)\n \n def process_file(self, file: Path):\n \"\"\"ファイルを処理\"\"\"\n # 実装は業務要件に応じて変更\n with open(file, 'r', encoding='utf-8') as f:\n content = f.read()\n # 処理内容\n print(f\

