Python subprocessで業務自動化:実務パターン別実装ガイド

Python

Python subprocessで業務自動化:実務パターン別実装ガイド

Pythonで業務システムを構築していると、外部ツールやコマンドを呼び出す必要が頻繁に出てきます。画像変換、ファイル処理、データベースバックアップなど、既に存在するコマンドラインツールを活用することで、開発効率が大幅に向上します。その際に活躍するのがsubprocessモジュールです。本記事では、教科書的な例ではなく、実際の業務で使われているパターンを中心に解説します。

subprocess モジュールの基礎

subprocessは、Pythonから他のプロセスを起動・制御するための標準ライブラリです。シェルコマンドの実行、外部アプリケーションとの連携、システムコマンドの実行などに用いられます。

主な機能は以下の通りです:

  • 外部プロセスの起動
  • プロセスの入出力ストリーム制御
  • プロセスの終了コード取得
  • タイムアウト設定
  • 環境変数の設定

Python 3.5以降では、subprocess.run()の使用が推奨されており、より安全で使いやすいAPIが提供されています。

業務でのユースケース

企業のIT部門やシステム運用では、以下のような場面でsubprocessが活躍します:

1. 定期バッチ処理の外部コマンド実行

毎日夜間に実行される売上データの集計処理など、既存のコマンドラインツールをPythonスクリプトから呼び出すケースです。

2. ファイル形式の自動変換

ExcelをCSVに変換したり、画像をWebP形式に圧縮したりする際に、ImageMagickやLibreOfficeなどの外部ツールを活用します。

3. Gitやデータベースの自動操作

バージョン管理やバックアップの自動化では、gitコマンドやmysqldumpなどをPythonから制御します。

4. サーバーメンテナンス作業の自動化

ログローテーション、キャッシュクリア、システムの再起動通知など、複数のシステムコマンドを組合わせた自動化です。

5. 外部APIツールとの連携

CloudflareやAWSのCLIツール、Dockerコマンドなど、CLIで提供されているサービスとの連携です。

実務で使える実装コード

パターン1:基本的なコマンド実行と出力取得

最もシンプルで頻出するパターンです。外部コマンドを実行し、その結果を取得します。

import subprocess
import json
from datetime import datetime

def execute_backup_command(database_name: str, output_path: str) -> bool:
    """
    データベースのバックアップを実行する実務コード
    MySQLdumpを使用した実際の運用パターン
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"{output_path}/backup_{database_name}_{timestamp}.sql"
    
    command = [
        "mysqldump",
        "-u", "backup_user",
        "-p" + "password",  # 本来は環境変数から取得
        "--single-transaction",
        "--quick",
        database_name
    ]
    
    try:
        with open(backup_file, 'w') as f:
            result = subprocess.run(
                command,
                stdout=f,
                stderr=subprocess.PIPE,
                text=True,
                timeout=3600  # 1時間でタイムアウト
            )
        
        if result.returncode == 0:
            print(f"バックアップ成功: {backup_file}")
            return True
        else:
            print(f"バックアップエラー: {result.stderr}")
            return False
            
    except subprocess.TimeoutExpired:
        print(f"バックアップがタイムアウトしました(1時間以上経過)")
        return False
    except Exception as e:
        print(f"予期しないエラー: {e}")
        return False

# 使用例
if __name__ == "__main__":
    success = execute_backup_command("production_db", "/backups")
    if not success:
        # アラート送信など
        pass

パターン2:複数コマンドのパイプ処理

unixのパイプを活用した複合処理も、subprocessで実装できます。ログファイルの解析などで頻出します。

import subprocess
from typing import List

def analyze_access_logs(log_file: str, limit: int = 10) -> List[tuple]:
    """
    Webサーバーのアクセスログから最頻出IPを取得
    実務:毎日朝に実行され、セキュリティ監視に使用される
    """
    try:
        # ステップ1: ログをカット
        cut_process = subprocess.Popen(
            ["cut", "-d", " ", "-f", "1", log_file],
            stdout=subprocess.PIPE
        )
        
        # ステップ2: ソート
        sort_process = subprocess.Popen(
            ["sort"],
            stdin=cut_process.stdout,
            stdout=subprocess.PIPE
        )
        
        # ステップ3: ユニーク化とカウント
        uniq_process = subprocess.Popen(
            ["uniq", "-c"],
            stdin=sort_process.stdout,
            stdout=subprocess.PIPE
        )
        
        # ステップ4: ソート(逆順)
        final_result = subprocess.run(
            ["sort", "-rn"],
            stdin=uniq_process.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=60
        )
        
        if final_result.returncode != 0:
            print(f"ログ解析エラー: {final_result.stderr}")
            return []
        
        # 結果をパース
        results = []
        for line in final_result.stdout.split('\n')[:limit]:
            if line.strip():
                parts = line.strip().split()
                count = int(parts[0])
                ip = parts[1]
                results.append((ip, count))
        
        return results
        
    except subprocess.TimeoutExpired:
        print("ログ解析がタイムアウト")
        return []
    except Exception as e:
        print(f"解析中にエラー: {e}")
        return []

# 使用例
if __name__ == "__main__":
    top_ips = analyze_access_logs("/var/log/apache2/access.log")
    for ip, count in top_ips:
        print(f"{ip}: {count}件")

パターン3:インタラクティブなプロセス制御

Gitの操作など、対話的な処理が必要なケースもあります。入出力をリアルタイムで制御します。

import subprocess
import os

def git_auto_commit(repo_path: str, message: str, branch: str = "main") -> bool:
    """
    Gitリポジトリへの自動コミット
    実務:夜間バッチで自動生成されたレポートをコミット
    """
    try:
        os.chdir(repo_path)
        
        # ステージに追加
        result = subprocess.run(
            ["git", "add", "."],
            capture_output=True,
            text=True,
            timeout=30
        )
        
        if result.returncode != 0:
            print(f"git add エラー: {result.stderr}")
            return False
        
        # ステータス確認
        status = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True,
            text=True
        )
        
        # 変更がない場合はスキップ
        if not status.stdout.strip():
            print("変更がありません")
            return True
        
        # コミット実行
        result = subprocess.run(
            ["git", "commit", "-m", message],
            capture_output=True,
            text=True,
            timeout=30
        )
        
        if result.returncode != 0:
            print(f"git commit エラー: {result.stderr}")
            return False
        
        # プッシュ実行
        result = subprocess.run(
            ["git", "push", "origin", branch],
            capture_output=True,
            text=True,
            timeout=60
        )
        
        if result.returncode != 0:
            print(f"git push エラー: {result.stderr}")
            return False
        
        print(f"正常にコミットしました: {message}")
        return True
        
    except subprocess.TimeoutExpired:
        print("Git操作がタイムアウト")
        return False
    except Exception as e:
        print(f"エラー: {e}")
        return False

# 使用例
if __name__ == "__main__":
    success = git_auto_commit(
        "/var/reports",
        "Daily report generation"
    )

パターン4:環境変数付きのプロセス実行

外部ツールがAPI キーなどの環境変数に依存する場合、セキュアに渡す必要があります。

import subprocess
import os
from typing import Optional

def deploy_with_docker(image_name: str, registry_url: str, api_key: str) -> bool:
    """
    Dockerイメージをプッシュしてデプロイ
    実務:CI/CDパイプラインの一部として使用
    """
    try:
        # 親プロセスの環境変数をコピー
        env = os.environ.copy()
        
        # APIキーを追加(機密情報)
        env['DOCKER_CONFIG'] = '/tmp/docker_config'
        env['REGISTRY_PASSWORD'] = api_key
        
        # ログイン
        login_cmd = [
            "docker", "login",
            "-u", "deployment_user",
            "--password-stdin",
            registry_url
        ]
        
        result = subprocess.run(
            login_cmd,
            input=api_key,
            capture_output=True,
            text=True,
            timeout=30,
            env=env
        )
        
        if result.returncode != 0:
            print(f"Dockerログインエラー: {result.stderr}")
            return False
        
        # イメージをタグ付け
        full_image_name = f"{registry_url}/{image_name}:latest"
        tag_cmd = ["docker", "tag", image_name, full_image_name]
        
        result = subprocess.run(
            tag_cmd,
            capture_output=True,
            text=True,
            timeout=30
        )
        
        if result.returncode != 0:
            print(f"Dockerタグエラー: {result.stderr}")
            return False
        
        # プッシュ
        push_cmd = ["docker", "push", full_image_name]
        
        result = subprocess.run(
            push_cmd,
            capture_output=True,
            text=True,
            timeout=300
        )
        
        if result.returncode != 0:
            print(f"Dockerプッシュエラー: {result.stderr}")
            return False
        
        print(f"デプロイ成功: {full_image_name}")
        return True
        
    except subprocess.TimeoutExpired:
        print("Docker操作がタイムアウト")
        return False
    except Exception as e:
        print(f"エラー: {e}")
        return False

# 使用例
if __name__ == "__main__":
    api_key = os.getenv('REGISTRY_API_KEY')
    success = deploy_with_docker(
        "my-app:v1.0",
        "registry.example.com",
        api_key
    )

パターン5:エラーハンドリングとリトライ機構

本番環境では、ネットワーク障害やタイムアウトへの対応が必須です。実務レベルのエラーハンドリングを示します。

import subprocess
import time
import logging
from typing import Optional
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def retry_on_failure(max_retries: int = 3, delay: int = 5):
    """
    関数のリトライデコレータ
    一時的なエラーに対応
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except subprocess.TimeoutExpired as e:
                    last_exception = e
                    logger.warning(
                        f"タイムアウト(試行 {attempt + 1}/{max_retries}): {e}"
                    )
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                except subprocess.CalledProcessError as e:
                    last_exception = e
                    logger.warning(
                        f"プロセスエラー(試行 {attempt + 1}/{max_retries}): {e}"
                    )
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                except Exception as e:
                    # リトライ不可能なエラーはすぐに失敗
                    logger.error(f"リトライ不可能なエラー: {e}")
                    raise
            
            # すべてのリトライが失敗した場合
            logger.error(f"最大リトライ回数に達しました")
            raise last_exception
        
        return wrapper
    return decorator

@retry_on_failure(max_retries=3, delay=10)
def export_data_to_s3(bucket_name: str, file_path: str) -> bool:
    """
    データをAWS S3にエクスポート
    実務:大規模ファイル転送でエラーが発生しやすいため、リトライ付き
    """
    command = [
        "aws", "s3", "cp",
        file_path,
        f"s3://{bucket_name}/data/",
        "--storage-class", "STANDARD_IA",
        "--sse", "AES256"
    ]
    
    result = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=600,  # 10分
        check=True  # 0以外のリターンコードで例外発生
    )
    
    logger.info(f"S3アップロード完了: {file_path}")
    return True

# 使用例
if __name__ == "__main__":
    try:
        export_data_to_s3("my-bucket", "/tmp/export.csv")
    except Exception as e:
        logger.critical(f"エクスポート失敗: {e}")
        # アラート送信など

よくある応用パターン

標準出力をリアルタイム処理

長時間実行されるコマンドから、出力をリアルタイムで処理する必要がある場合があります。

import subprocess
import sys

def stream_command_output(command: list):
    """
    コマンドの出力をリアルタイムで表示・処理
    実務:ビルド処理やテスト実行の進捗監視
    """
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1  # 行バッファリング
        )
        
        for line in process.stdout:
            # リアルタイムで処理
            line = line.rstrip()
            print(f"[プロセス出力] {line}")
            
            # 特定の文字列に対する処理
            if "ERROR" in line:
                print(f"エラーを検出しました: {line}")
            elif "WARNING" in line:
                print(f"警告を検出しました: {line}")
        
        process.wait(timeout=300)
        
        if process.returncode == 0:
            print("プロセス正常終了")
        else:
            print(f"プロセス異常終了: コード {process.returncode}")
        
        return process.returncode
        
    except subprocess.TimeoutExpired:
        process.kill()
        print("プロセスがタイムアウトしました")
        return -1

# 使用例
if __name__ == "__main__":
    stream_command_output(["python", "long_running_task.py"])

複数プロセスの並列実行

複数のコマンドを並列に実行し、すべての完了を待つパターンです。

import subprocess
import concurrent.futures
from typing import List, Tuple

def run_parallel_jobs(jobs: List[Tuple[str, list]]) -> dict:
    """
    複数のジョブを並列実行
    実務:複数サーバーへのバックアップ、データベース複数インスタンスの処理など
    """
    results = {}
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {}
        
        for job_name, command in jobs:
            future = executor.submit(
                subprocess.run,
                command,
                capture_output=True,
                text=True,
                timeout=300
            )
            futures[future] = job_name
        
        for future in concurrent.futures.as_completed(futures):
            job_name = futures[future]
            try:
                result = future.result()
                results[job_name] = {
                    'status': 'success' if result.returncode == 0 else 'failed',
                    'returncode': result.returncode,
                    'stdout': result.stdout[:500],  # 最初の500文字のみ
                    'stderr': result.stderr[:500]
                }
            except Exception as e:
                results[job_name] = {
                    'status': 'error',
                    'message': str(e)
                }
    
    return results

# 使用例
if __name__ == "__main__":
    jobs = [
        ("バックアップDB1", ["mysqldump", "-u", "user", "db1", ">", "/tmp/db1.sql"]),
        ("バックアップDB2", ["mysqldump", "-u", "user", "db2", ">", "/tmp/db2.sql"]),
        ("バックアップDB3", ["mysqldump", "-u", "user", "db3", ">", "/tmp/db3.sql"]),
    ]
    
    results = run_parallel_jobs(jobs)
    
    for job_name, result in results.items():
        print(f"{job_name}: {result['status']}")
        if result['status'] != 'success':
            print(f"  エラー: {result.get('stderr', result.get('message'))}")

注意点と落とし穴

1. セキュリティ:シェルインジェクション対策

ユーザー入力を含むコマンド文字列は危険です。常にリスト形式で渡しましょう。

import subprocess

# ❌ 危険:シェルインジェクション可能
user_input = "test; rm -rf /"
subprocess.run(f"echo {user_input}", shell=True)

# ✅ 安全:リスト形式を使用
subprocess.run(["echo", user_input])

2. リソースリーク:パイプの正しい扱い

パイプを使用する場合、すべてのプロセスの出力を読み込まないとデッドロックが発生します。

import subprocess

# ❌ デッドロック可能
p1 = subprocess.Popen(["cat", "large_file.txt"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["grep", "pattern"], stdin=p1.stdout, stdout=subprocess.PIPE)
# p1.stdout を閉じずに p2.stdout を読まずに p2.wait() するとデッドロック

# ✅ 安全
p1 = subprocess.Popen(["cat", "large_file.txt"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["grep", "pattern"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # p1.stdoutを明示的に閉じる
output, _ = p2.communicate()  # communicate()は自動的に待機と出力読み込み

3. エンコーディング問題

テキストベースの出力を扱う際は、エンコーディングに注意が必要です。

import subprocess
import locale

# ❌ 環境によってエラーが発生する可能性
result = subprocess.run(
    ["ls", "-la"],
    capture_output=True
)  # バイナリ形式で返される

# ✅ 明示的にエンコーディングを指定
result = subprocess.run(
    ["ls", "-la"],
    capture_output=True,
    text=True,  # テキスト形式で返される
    encoding='utf-8'
)

# または
result = subprocess.run(
    ["ls", "-la"],
    capture_output=True,
    encoding=locale.getpreferredencoding(False)
)

4. タイムアウト設定の忘れ

無限ループに入ったプロセスを防ぐため、必ずタイムアウトを設定しましょう。

import subprocess

# ❌ タイムアウトなし:プロセスが無限に待機する可能性
result = subprocess.run(["some_command"])

# ✅ タイムアウト付き
try:
    result = subprocess.run(
        ["some_command"],
        timeout=30  # 30秒でタイムアウト
    )
except subprocess.TimeoutExpired:
    print("タイムアウト発生")

5. 大容量出力の取得

大量の出力を持つプロセスの場合、メモリが枯渇する可能性があります。

import subprocess

# ❌ 大容量出力の場合メモリ不足
result = subprocess.run(
    ["cat", "1GB_file.txt"],
    capture_output=True
)
print(len(result.stdout))  # メモリ上に全データ

# ✅ ストリーム処理
process = subprocess.Popen(
    ["cat", "1GB_file.txt"],
    stdout=subprocess.PIPE,
    text=True
)

for line in process.stdout:
    # 1行ずつ処理
    process_line(line)

process.wait()

まとめ

Pythonのsubprocessモジュールは、外部ツール・コマンドとの連携を実現する強力なツールです。本記事で紹介した5つの実務パターンと注意点を押さえることで、以下のような業務の自動化が実現できます:

  • データベースのバックアップ・リストア
  • ファイル形式の自動変換
  • ログファイルの自動解析
  • Gitなどのバージョン管理との連携
  • Docker・AWS CLIなどクラウドツールの自動操作

重要なのは、実務での使用を想定したエラーハンドリング、リトライ機構、タイムアウト設定です。教科書的なコードではなく、本番環境で耐える堅牢なコードを心がけることで、運用負荷を大幅に削減できます。

セキュリティ面では、シェルインジェクション対策とリソースリークの防止が必須です。常にリスト形式でコマンドを指定し、パイプ処理ではすべての出力を正しく処理することを忘れずにください。

業務の自動化を通じて、チーム全体の生産性向上に貢献する仕組みを構築していきましょう。

タイトルとURLをコピーしました