Python ファイル操作の業務パターン解説|実務で使える実装コード集

Python
\n

Python ファイル操作の業務パターン解説|実務で使える実装コード集

\n\n

はじめに

\n

Pythonを用いた業務システム開発やデータ処理では、ファイル操作がほぼ必須のスキルです。単純なテキストファイルの読み書きから、CSV/JSON/YAMLといった構造化データの処理、ログ管理、バックアップなど、様々な場面でファイル操作が必要になります。本記事では、実務で実際に使われているパターンを中心に、Pythonのファイル操作について詳しく解説します。

\n\n

簡易的な解説

\n\n

ファイル操作の基本概念

\n

Pythonのファイル操作は、以下のサイクルで構成されます:

\n

    \n

  • オープン:ファイルを開く(読み込みモード、書き込みモード、追記モードなど)
  • \n

  • 読み書き:ファイルの内容を読み取る、または書き込む
  • \n

  • クローズ:ファイルを閉じてリソースを解放
  • \n

\n\n

Pythonでは、withステートメント(コンテキストマネージャ)を使うことで、自動的にファイルのクローズが行われるため、メモリリークを防ぐことができます。これが業務コードの鉄則です。

\n\n

業務でのユースケース

\n\n

ユースケース1:日次ログの収集と処理

\n

営業システムやAPI処理では、毎日大量のログファイルが生成されます。これらを集約し、エラーログだけを抽出して別ファイルに保存する処理は日常茶飯事です。

\n\n

ユースケース2:顧客データのCSV処理

\n

営業部門から提供されたCSVファイルを読み込み、データベースに登録する前にバリデーションや正規化を行う処理。または、データベースから抽出したデータをCSVで出力する処理です。

\n\n

ユースケース3:設定ファイルの管理

\n

本番環境と開発環境で異なる設定値を持つYAMLやJSONファイルの読み込み。環境変数との組み合わせで安全に管理します。

\n\n

ユースケース4:定期的なバックアップと世代管理

\n

重要なデータファイルを定期的にバックアップし、古いファイルは自動削除する処理。cronやスケジューラと組み合わせて運用されます。

\n\n

実装コード

\n\n

1. ログファイルの処理と分析

\n

実務では、複数のログファイルから特定のレベル(ERROR、WARNINGなど)のログだけを抽出する必要があります。以下は実際の運用コードです。

\n\n

import os\nfrom datetime import datetime, timedelta\nimport re\nfrom pathlib import Path\n\nclass LogProcessor:\n    \"\"\"ログファイル処理クラス\"\"\"\n    \n    def __init__(self, log_dir: str, output_dir: str):\n        self.log_dir = Path(log_dir)\n        self.output_dir = Path(output_dir)\n        self.output_dir.mkdir(parents=True, exist_ok=True)\n    \n    def extract_error_logs(self, days: int = 1) -> None:\n        \"\"\"過去N日間のログファイルからエラーログを抽出\"\"\"\n        cutoff_date = datetime.now() - timedelta(days=days)\n        error_pattern = re.compile(r'\\[ERROR\\]|\\[CRITICAL\\]')\n        \n        for log_file in self.log_dir.glob('*.log'):\n            # ログファイルの日付をチェック\n            file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)\n            if file_mtime < cutoff_date:\n                continue\n            \n            error_count = 0\n            output_file = self.output_dir / f\"error_{log_file.name}\"\n            \n            try:\n                with open(log_file, 'r', encoding='utf-8') as infile, \\\n                     open(output_file, 'w', encoding='utf-8') as outfile:\n                    \n                    for line_num, line in enumerate(infile, 1):\n                        if error_pattern.search(line):\n                            outfile.write(f\"{log_file.name}:{line_num}: {line}\")\n                            error_count += 1\n                \n                if error_count == 0:\n                    output_file.unlink()  # エラーがなければファイル削除\n                else:\n                    print(f\"{log_file.name}: {error_count}件のエラーを抽出\")\n                    \n            except UnicodeDecodeError:\n                # 文字コードが異なる場合の対応\n                print(f\"警告: {log_file.name} は UTF-8 でデコードできません\")\n                continue\n    \n    def cleanup_old_logs(self, days: int = 30) -> None:\n        \"\"\"30日以上前のログファイルを削除\"\"\"\n        cutoff_date = datetime.now() - timedelta(days=days)\n        deleted_count = 0\n        \n        for log_file in self.log_dir.glob('*.log'):\n            file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)\n            if file_mtime < cutoff_date:\n                try:\n                    log_file.unlink()\n                    deleted_count += 1\n                except Exception as e:\n                    print(f\"削除失敗 {log_file.name}: {e}\")\n        \n        print(f\"{deleted_count}個の古いログファイルを削除しました\")\n\n\n# 使用例\nif __name__ == \"__main__\":\n    processor = LogProcessor(\n        log_dir=\"/var/log/myapp\",\n        output_dir=\"/var/log/myapp/errors\"\n    )\n    processor.extract_error_logs(days=1)\n    processor.cleanup_old_logs(days=30)\n

\n\n

2. CSVファイルの読み込みと加工

\n

営業データをCSVから読み込み、バリデーション、正規化、そしてフィルタリングを行う実装です。

\n\n

import csv\nfrom dataclasses import dataclass\nfrom typing import List, Optional\nfrom pathlib import Path\nimport re\n\n@dataclass\nclass Customer:\n    \"\"\"顧客データモデル\"\"\"\n    customer_id: str\n    name: str\n    email: str\n    phone: str\n    created_date: str\n    \n    def is_valid(self) -> bool:\n        \"\"\"顧客データのバリデーション\"\"\"\n        # メールアドレスの形式チェック\n        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'\n        if not re.match(email_pattern, self.email):\n            return False\n        \n        # 電話番号の形式チェック(日本の場合)\n        phone_pattern = r'^0\\d{1,4}-?\\d{1,4}-?\\d{4}$'\n        if not re.match(phone_pattern, self.phone):\n            return False\n        \n        # IDが空でないことを確認\n        if not self.customer_id.strip():\n            return False\n        \n        return True\n\nclass CustomerProcessor:\n    \"\"\"顧客CSVファイル処理クラス\"\"\"\n    \n    def __init__(self, input_file: str, output_file: str, error_file: str):\n        self.input_file = Path(input_file)\n        self.output_file = Path(output_file)\n        self.error_file = Path(error_file)\n    \n    def process(self) -> dict:\n        \"\"\"CSVを読み込み、バリデーション、出力\"\"\"\n        results = {\n            'total': 0,\n            'valid': 0,\n            'invalid': 0,\n            'errors': []\n        }\n        \n        valid_customers: List[Customer] = []\n        \n        try:\n            with open(self.input_file, 'r', encoding='utf-8-sig') as infile:\n                # utf-8-sigでBOMを自動削除\n                reader = csv.DictReader(infile)\n                \n                if reader.fieldnames is None:\n                    raise ValueError(\"CSVファイルが空です\")\n                \n                required_fields = {'customer_id', 'name', 'email', 'phone', 'created_date'}\n                if not required_fields.issubset(set(reader.fieldnames)):\n                    raise ValueError(f\"必須カラムがありません。必要: {required_fields}\")\n                \n                for row_num, row in enumerate(reader, 2):  # ヘッダーをスキップして2から開始\n                    results['total'] += 1\n                    \n                    try:\n                        # 空白を削除してデータをクリーンアップ\n                        customer = Customer(\n                            customer_id=row['customer_id'].strip(),\n                            name=row['name'].strip(),\n                            email=row['email'].strip().lower(),\n                            phone=row['phone'].strip().replace(' ', ''),\n                            created_date=row['created_date'].strip()\n                        )\n                        \n                        if customer.is_valid():\n                            valid_customers.append(customer)\n                            results['valid'] += 1\n                        else:\n                            results['invalid'] += 1\n                            results['errors'].append({\n                                'row': row_num,\n                                'customer_id': customer.customer_id,\n                                'reason': 'バリデーション失敗'\n                            })\n                    except (KeyError, AttributeError) as e:\n                        results['invalid'] += 1\n                        results['errors'].append({\n                            'row': row_num,\n                            'reason': f'データ形式エラー: {str(e)}'\n                        })\n            \n            # 有効なデータを出力ファイルに書き込み\n            self._write_valid_customers(valid_customers)\n            \n            # エラーレポートを出力\n            self._write_error_report(results)\n            \n        except FileNotFoundError:\n            print(f\"エラー: ファイルが見つかりません: {self.input_file}\")\n        except Exception as e:\n            print(f\"エラー: {str(e)}\")\n        \n        return results\n    \n    def _write_valid_customers(self, customers: List[Customer]) -> None:\n        \"\"\"有効な顧客データをCSVに出力\"\"\"\n        with open(self.output_file, 'w', encoding='utf-8', newline='') as outfile:\n            fieldnames = ['customer_id', 'name', 'email', 'phone', 'created_date']\n            writer = csv.DictWriter(outfile, fieldnames=fieldnames)\n            \n            writer.writeheader()\n            for customer in customers:\n                writer.writerow({\n                    'customer_id': customer.customer_id,\n                    'name': customer.name,\n                    'email': customer.email,\n                    'phone': customer.phone,\n                    'created_date': customer.created_date\n                })\n    \n    def _write_error_report(self, results: dict) -> None:\n        \"\"\"エラーレポートをテキストファイルに出力\"\"\"\n        with open(self.error_file, 'w', encoding='utf-8') as f:\n            f.write(f\"処理結果レポート\\n\")\n            f.write(f\"=\"*50 + \"\\n\\n\")\n            f.write(f\"総件数: {results['total']}\\n\")\n            f.write(f\"有効件数: {results['valid']}\\n\")\n            f.write(f\"エラー件数: {results['invalid']}\\n\\n\")\n            \n            if results['errors']:\n                f.write(f\"エラー詳細:\\n\")\n                for error in results['errors']:\n                    f.write(f\"  行{error.get('row', 'N/A')}: {error.get('reason', 'N/A')}\\n\")\n\n\n# 使用例\nif __name__ == \"__main__\":\n    processor = CustomerProcessor(\n        input_file=\"customers.csv\",\n        output_file=\"customers_valid.csv\",\n        error_file=\"customers_error.log\"\n    )\n    result = processor.process()\n    print(f\"処理完了: {result['valid']}/{result['total']}件が有効です\")\n

\n\n

3. JSON設定ファイルの読み込みと環境別管理

\n

異なる環境(開発、テスト、本番)で設定値を切り替える実装パターンです。

\n\n

import json\nimport os\nfrom typing import Any, Dict\nfrom pathlib import Path\n\nclass ConfigManager:\n    \"\"\"設定ファイル管理クラス\"\"\"\n    \n    def __init__(self, config_dir: str = \"config\"):\n        self.config_dir = Path(config_dir)\n        self.config: Dict[str, Any] = {}\n        self.environment = os.getenv('APP_ENV', 'development')\n    \n    def load(self) -> Dict[str, Any]:\n        \"\"\"環境に応じた設定ファイルを読み込み\"\"\"\n        # デフォルト設定を読み込み\n        default_config = self._load_config_file('default.json')\n        \n        # 環境別の設定をマージ\n        env_config = self._load_config_file(f'{self.environment}.json')\n        \n        # 深いマージ(ネストされた辞書に対応)\n        self.config = self._deep_merge(default_config, env_config)\n        \n        # 環境変数でのオーバーライド\n        self._apply_env_overrides()\n        \n        return self.config\n    \n    def _load_config_file(self, filename: str) -> Dict[str, Any]:\n        \"\"\"単一の設定ファイルを読み込み\"\"\"\n        config_path = self.config_dir / filename\n        \n        if not config_path.exists():\n            print(f\"警告: {filename} が見つかりません。空の設定で続行します\")\n            return {}\n        \n        try:\n            with open(config_path, 'r', encoding='utf-8') as f:\n                return json.load(f)\n        except json.JSONDecodeError as e:\n            raise ValueError(f\"JSONパース失敗 {filename}: {str(e)}\")\n        except Exception as e:\n            raise Exception(f\"設定ファイル読み込み失敗 {filename}: {str(e)}\")\n    \n    def _deep_merge(self, base: Dict, override: Dict) -> Dict:\n        \"\"\"ネストされた辞書をマージ\"\"\"\n        result = base.copy()\n        \n        for key, value in override.items():\n            if key in result and isinstance(result[key], dict) and isinstance(value, dict):\n                result[key] = self._deep_merge(result[key], value)\n            else:\n                result[key] = value\n        \n        return result\n    \n    def _apply_env_overrides(self) -> None:\n        \"\"\"環境変数で設定をオーバーライド\"\"\"\n        # 環境変数は APP_DATABASE_HOST のような形式で指定\n        # これが self.config['database']['host'] に対応\n        for key, value in os.environ.items():\n            if key.startswith('APP_'):\n                config_key = key[4:].lower()  # 'APP_' プレフィックスを削除\n                parts = config_key.split('_')\n                \n                # ネストされた設定に対応\n                current = self.config\n                for part in parts[:-1]:\n                    if part not in current:\n                        current[part] = {}\n                    current = current[part]\n                \n                current[parts[-1]] = value\n    \n    def get(self, key: str, default: Any = None) -> Any:\n        \"\"\"ドット記法で設定値を取得\"\"\"\n        keys = key.split('.')\n        value = self.config\n        \n        for k in keys:\n            if isinstance(value, dict):\n                value = value.get(k)\n                if value is None:\n                    return default\n            else:\n                return default\n        \n        return value\n    \n    def to_dict(self) -> Dict[str, Any]:\n        \"\"\"全設定を辞書として返す\"\"\"\n        return self.config.copy()\n\n\n# 使用例\nif __name__ == \"__main__\":\n    config = ConfigManager(config_dir=\"config\")\n    config.load()\n    \n    # 設定値を取得\n    db_host = config.get('database.host', 'localhost')\n    db_port = config.get('database.port', 5432)\n    api_timeout = config.get('api.timeout', 30)\n    \n    print(f\"Database: {db_host}:{db_port}\")\n    print(f\"API Timeout: {api_timeout}\")\n

\n\n

4. ファイルのバックアップと世代管理

\n

重要なデータファイルを定期的にバックアップし、古いファイルは自動削除する実装です。

\n\n

import shutil\nfrom datetime import datetime, timedelta\nfrom pathlib import Path\nimport gzip\n\nclass BackupManager:\n    \"\"\"ファイルバックアップ管理クラス\"\"\"\n    \n    def __init__(self, data_file: str, backup_dir: str, max_backups: int = 10):\n        self.data_file = Path(data_file)\n        self.backup_dir = Path(backup_dir)\n        self.max_backups = max_backups\n        self.backup_dir.mkdir(parents=True, exist_ok=True)\n    \n    def backup(self, compress: bool = True) -> str:\n        \"\"\"ファイルをバックアップ(オプションで圧縮)\"\"\"\n        if not self.data_file.exists():\n            raise FileNotFoundError(f\"バックアップ対象ファイルが見つかりません: {self.data_file}\")\n        \n        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')\n        backup_filename = f\"{self.data_file.stem}_{timestamp}\"\n        \n        if compress:\n            backup_path = self.backup_dir / f\"{backup_filename}.gz\"\n            self._compress_file(self.data_file, backup_path)\n        else:\n            backup_path = self.backup_dir / f\"{backup_filename}{self.data_file.suffix}\"\n            shutil.copy2(self.data_file, backup_path)\n        \n        # バックアップ数の制限をチェック\n        self._cleanup_old_backups()\n        \n        print(f\"バックアップ完了: {backup_path.name}\")\n        return str(backup_path)\n    \n    def _compress_file(self, source: Path, destination: Path) -> None:\n        \"\"\"ファイルをgzip圧縮\"\"\"\n        try:\n            with open(source, 'rb') as f_in:\n                with gzip.open(destination, 'wb') as f_out:\n                    shutil.copyfileobj(f_in, f_out)\n        except Exception as e:\n            raise Exception(f\"ファイル圧縮失敗: {str(e)}\")\n    \n    def _cleanup_old_backups(self) -> None:\n        \"\"\"古いバックアップファイルを削除\"\"\"\n        # バックアップファイルを修正日時でソート\n        backup_files = sorted(\n            self.backup_dir.glob(f\"{self.data_file.stem}_*\"),\n            key=lambda x: x.stat().st_mtime,\n            reverse=True\n        )\n        \n        # max_backupsより多い場合は古いものから削除\n        for old_backup in backup_files[self.max_backups:]:\n            try:\n                old_backup.unlink()\n                print(f\"古いバックアップを削除: {old_backup.name}\")\n            except Exception as e:\n                print(f\"削除失敗 {old_backup.name}: {str(e)}\")\n    \n    def restore(self, backup_filename: str) -> None:\n        \"\"\"バックアップからファイルを復元\"\"\"\n        backup_path = self.backup_dir / backup_filename\n        \n        if not backup_path.exists():\n            raise FileNotFoundError(f\"バックアップファイルが見つかりません: {backup_filename}\")\n        \n        try:\n            if backup_filename.endswith('.gz'):\n                with gzip.open(backup_path, 'rb') as f_in:\n                    with open(self.data_file, 'wb') as f_out:\n                        shutil.copyfileobj(f_in, f_out)\n            else:\n                shutil.copy2(backup_path, self.data_file)\n            \n            print(f\"復元完了: {self.data_file} <- {backup_filename}\")\n        except Exception as e:\n            raise Exception(f\"ファイル復元失敗: {str(e)}\")\n    \n    def list_backups(self) -> list:\n        \"\"\"バックアップファイルの一覧を取得\"\"\"\n        backups = []\n        for backup_file in sorted(\n            self.backup_dir.glob(f\"{self.data_file.stem}_*\"),\n            key=lambda x: x.stat().st_mtime,\n            reverse=True\n        ):\n            mtime = datetime.fromtimestamp(backup_file.stat().st_mtime)\n            size = backup_file.stat().st_size\n            backups.append({\n                'filename': backup_file.name,\n                'size': size,\n                'modified': mtime.strftime('%Y-%m-%d %H:%M:%S')\n            })\n        return backups\n\n\n# 使用例\nif __name__ == \"__main__\":\n    backup_mgr = BackupManager(\n        data_file=\"/data/important_data.db\",\n        backup_dir=\"/data/backups\",\n        max_backups=10\n    )\n    \n    # バックアップを作成\n    backup_mgr.backup(compress=True)\n    \n    # バックアップ一覧を表示\n    for backup in backup_mgr.list_backups():\n        print(f\"{backup['filename']} ({backup['size']} bytes) - {backup['modified']}\")\n

\n\n

よくある応用パターン

\n\n

パターン1:大規模ファイルの分割処理

\n

数GB規模のCSVファイルを一度に読み込むメモリを圧迫するため、チャンク単位で読み込む必要があります。

\n\n

def process_large_csv(input_file: str, chunk_size: int = 10000):\n    \"\"\"大規模CSVファイルをチャンクで処理\"\"\"\n    chunk = []\n    \n    with open(input_file, 'r', encoding='utf-8') as f:\n        reader = csv.DictReader(f)\n        \n        for row in reader:\n            chunk.append(row)\n            \n            if len(chunk) >= chunk_size:\n                # チャンク処理(例:データベースに一括挿入)\n                process_chunk(chunk)\n                chunk = []\n        \n        # 残りのデータを処理\n        if chunk:\n            process_chunk(chunk)\n\ndef process_chunk(chunk: list):\n    \"\"\"チャンクデータを処理(実装省略)\"\"\"\n    print(f\"{len(chunk)}件のデータを処理中...\")\n

\n\n

パターン2:複数のテキストファイルをマージ

\n

ディレクトリ内の複数のログファイルを1つのファイルに統合する場合です。

\n\n

def merge_files(input_dir: str, output_file: str, pattern: str = \"*.txt\"):\n    \"\"\"複数ファイルをマージ\"\"\"\n    input_path = Path(input_dir)\n    files_to_merge = sorted(input_path.glob(pattern))\n    \n    if not files_to_merge:\n        print(f\"マージ対象ファイルが見つかりません: {pattern}\")\n        return\n    \n    with open(output_file, 'w', encoding='utf-8') as outfile:\n        for file in files_to_merge:\n            outfile.write(f\"\\n=== {file.name} ===\\n\")\n            try:\n                with open(file, 'r', encoding='utf-8') as infile:\n                    outfile.write(infile.read())\n            except UnicodeDecodeError:\n                print(f\"警告: {file.name} をスキップ(文字コード不一致)\")\n                continue\n    \n    print(f\"{len(files_to_merge)}個のファイルを {output_file} にマージしました\")\n

\n\n

パターン3:ファイル監視と自動処理

\n

特定のディレクトリを監視し、新しいファイルが追加されたら自動的に処理を実行します。

\n\n

import time\nfrom pathlib import Path\n\nclass FileWatcher:\n    \"\"\"ファイル監視クラス\"\"\"\n    \n    def __init__(self, watch_dir: str, processed_dir: str):\n        self.watch_dir = Path(watch_dir)\n        self.processed_dir = Path(processed_dir)\n        self.processed_dir.mkdir(parents=True, exist_ok=True)\n        self.processed_files = set()\n    \n    def watch(self, file_pattern: str = \"*.csv\", interval: int = 5):\n        \"\"\"ディレクトリを監視し、新しいファイルを処理\"\"\"\n        print(f\"{self.watch_dir} を監視中...\")\n        \n        while True:\n            current_files = set(self.watch_dir.glob(file_pattern))\n            new_files = current_files - self.processed_files\n            \n            for file in new_files:\n                print(f\"新しいファイルを検出: {file.name}\")\n                try:\n                    self.process_file(file)\n                    self.processed_files.add(file)\n                except Exception as e:\n                    print(f\"処理失敗: {file.name} - {str(e)}\")\n            \n            time.sleep(interval)\n    \n    def process_file(self, file: Path):\n        \"\"\"ファイルを処理\"\"\"\n        # 実装は業務要件に応じて変更\n        with open(file, 'r', encoding='utf-8') as f:\n            content = f.read()\n            # 処理内容\n            print(f\

タイトルとURLをコピーしました