Python pandas DataFrameの実務的な使い方|データ処理の実装パターン集

Python

Python pandas DataFrameの実務的な使い方|データ処理の実装パターン集

はじめに

Pythonでデータ処理を行う際、pandas DataFrameは避けて通れません。しかし、教科書的な使い方と実務での使い方には大きな隔たりがあります。本記事では、実際のプロジェクトで遭遇する課題と、それを解決するための実装パターンを紹介します。

pandas DataFrameの基礎理解

pandas DataFrameは、表形式のデータ構造です。Excelのスプレッドシートのようなもので、行と列からなり、様々な型のデータを扱えます。実務では単なるデータ保持だけでなく、データの加工、変換、集計、検証といった複数の処理を効率的に行う必要があります。

DataFrameの基本的な構造:

  • インデックス(行のラベル)
  • カラム(列のラベル)
  • 複数のデータ型をサポート
  • 欠損値の処理機能
  • 統計関数の豊富さ

これらの特性を活かすことが、実務での効率化の鍵となります。

実務でのユースケース

実務では、以下のようなシーンでDataFrameが活躍します:

1. CSV/Excelファイルの読み込みと加工

営業データ、顧客情報、売上レポートなど、外部ファイルから定期的にデータを読み込み、必要な形式に変換して分析に使用します。

2. データベースからの抽出と集計

SQLクエリで取得したデータをDataFrameに変換し、複雑な集計や分析を行う場合があります。

3. 複数データソースの統合

異なるシステムからのデータを統合し、一つの分析基盤として整形することが多いです。

4. データ品質チェックと異常検知

本処理前の前処理として、欠損値や異常値の検出と対処が必須です。

5. 定期的なレポート生成

毎日・毎週のレポート自動生成は実務でよく遭遇するニーズです。

実装コード:実務パターン集

パターン1:複数ファイルの読み込みと統合

実務では、日次ファイルなど複数のCSVを一つのDataFrameに統合することが頻繁にあります。

import pandas as pd
import glob
import os
from pathlib import Path

# 複数のCSVファイルを読み込んで統合
def load_and_combine_csv_files(directory_path: str, pattern: str = '*.csv') -> pd.DataFrame:
    \"\"\"
    指定ディレクトリの複数CSVファイルを読み込んで統合
    
    Args:
        directory_path: ファイルを探すディレクトリ
        pattern: ファイルのパターン(デフォルト: *.csv)
    
    Returns:
        統合されたDataFrame
    \"\"\"
    file_list = glob.glob(os.path.join(directory_path, pattern))
    
    if not file_list:
        raise FileNotFoundError(f'No files found in {directory_path}')
    
    dataframes = []
    
    for file_path in sorted(file_list):
        try:
            # ファイル名から日付を抽出する場合もある
            df = pd.read_csv(
                file_path,
                encoding='utf-8',
                dtype={'customer_id': str, 'product_code': str}  # 型を明示的に指定
            )
            df['source_file'] = Path(file_path).name  # ファイル追跡用
            dataframes.append(df)
            print(f'Successfully loaded: {Path(file_path).name}')
        except Exception as e:
            print(f'Error loading {file_path}: {e}')
            continue
    
    # DataFrameを縦に統合
    combined_df = pd.concat(dataframes, ignore_index=True)
    
    return combined_df

# 使用例
df = load_and_combine_csv_files('./data/daily_reports')
print(f'Combined shape: {df.shape}')
print(df.head())

パターン2:データクリーニングと前処理

実務では、読み込んだデータがそのまま使える状態にあることは稀です。欠損値、重複、異常値への対処が必須です。

def clean_and_preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    \"\"\"
    実務でよく必要になるデータクリーニング処理
    \"\"\"
    df = df.copy()  # 元データを保護
    
    # 1. 重複行の削除(全列で重複判定)
    initial_rows = len(df)
    df = df.drop_duplicates(subset=None, keep='first')
    print(f'Removed {initial_rows - len(df)} duplicate rows')
    
    # 2. 欠損値の確認
    missing_info = df.isnull().sum()
    if missing_info.any():
        print(f'Missing values found:\\n{missing_info[missing_info > 0]}')
    
    # 3. 欠損値の処理(戦略によって異なる)
    # 数値列:平均値で補填
    numeric_columns = df.select_dtypes(include=['number']).columns
    for col in numeric_columns:
        if df[col].isnull().any():
            df[col].fillna(df[col].mean(), inplace=True)
    
    # カテゴリ列:'Unknown'で補填
    categorical_columns = df.select_dtypes(include=['object']).columns
    for col in categorical_columns:
        if df[col].isnull().any():
            df[col].fillna('Unknown', inplace=True)
    
    # 4. 日付列の変換
    if 'order_date' in df.columns:
        df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
    
    # 5. 異常値の検出(数値列の外れ値)
    for col in numeric_columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
        if len(outliers) > 0:
            print(f'Found {len(outliers)} outliers in {col}')
    
    # 6. 文字列列の空白を削除
    for col in categorical_columns:
        df[col] = df[col].str.strip()
    
    # 7. 不要な列の削除
    columns_to_drop = [col for col in df.columns if df[col].isnull().sum() > len(df) * 0.5]
    if columns_to_drop:
        print(f'Dropping columns with >50% missing: {columns_to_drop}')
        df = df.drop(columns=columns_to_drop)
    
    return df

# 使用例
df_clean = clean_and_preprocess_data(df)
print(f'Cleaned data shape: {df_clean.shape}')

パターン3:複雑な集計と分析

実務では、複数の条件で集計し、複数の集計関数を同時に適用することが多いです。

def aggregate_sales_data(df: pd.DataFrame) -> pd.DataFrame:
    \"\"\"
    営業データの複雑な集計例
    \"\"\"
    # グループ化と複数の集計関数の同時適用
    aggregation_dict = {
        'amount': ['sum', 'count', 'mean', 'min', 'max'],
        'quantity': ['sum', 'mean'],
        'customer_id': 'nunique'
    }
    
    # 月別・商品別の集計
    df['year_month'] = df['order_date'].dt.to_period('M')
    
    summary = df.groupby(['year_month', 'product_category']).agg(aggregation_dict)
    
    # マルチレベルインデックスを整理
    summary.columns = ['_'.join(col).strip() for col in summary.columns.values]
    summary = summary.reset_index()
    
    # カラム名を分かりやすく
    summary.columns = ['year_month', 'product_category', 'total_amount', 'transaction_count',
                      'avg_amount', 'min_amount', 'max_amount', 'total_quantity',
                      'avg_quantity', 'unique_customers']
    
    return summary

# より複雑な集計パターン:ピボットテーブル
def create_pivot_summary(df: pd.DataFrame) -> pd.DataFrame:
    \"\"\"
    ピボットテーブルで商品別×月別の売上を作成
    \"\"\"
    df['year_month'] = df['order_date'].dt.to_period('M')
    
    pivot_df = df.pivot_table(
        values='amount',
        index='year_month',
        columns='product_category',
        aggfunc='sum',
        fill_value=0,
        margins=True  # 合計行を追加
    )
    
    return pivot_df

# 使用例
summary_df = aggregate_sales_data(df_clean)
print(summary_df.head(10))

pivot_df = create_pivot_summary(df_clean)
print(pivot_df)

パターン4:新しい列の動的生成と変換

実務では、既存の列から新しい情報を生成することがよくあります。

def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    \"\"\"
    既存のデータから新しい有用な列を生成
    \"\"\"
    df = df.copy()
    
    # 1. 日付関連の列を生成
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['year'] = df['order_date'].dt.year
    df['month'] = df['order_date'].dt.month
    df['quarter'] = df['order_date'].dt.quarter
    df['day_of_week'] = df['order_date'].dt.day_name()
    df['is_weekend'] = df['order_date'].dt.dayofweek.isin([5, 6])
    
    # 2. 金額関連の計算
    df['tax'] = df['amount'] * 0.1
    df['total_with_tax'] = df['amount'] + df['tax']
    
    # 3. カテゴリの統一と変換
    df['product_category'] = df['product_category'].str.upper().str.strip()
    
    # 4. 顧客の分類(RFM分析など)
    customer_stats = df.groupby('customer_id').agg({
        'order_date': 'max',
        'amount': ['sum', 'count']
    }).reset_index()
    customer_stats.columns = ['customer_id', 'last_purchase_date', 'total_spent', 'purchase_count']
    
    # 顧客セグメントの定義
    def segment_customer(row):
        if row['total_spent'] > 100000:
            return 'VIP'
        elif row['total_spent'] > 50000:
            return 'Gold'
        elif row['total_spent'] > 10000:
            return 'Silver'
        else:
            return 'Bronze'
    
    customer_stats['customer_segment'] = customer_stats.apply(segment_customer, axis=1)
    
    # 元のDataFrameに結合
    df = df.merge(customer_stats[['customer_id', 'customer_segment']], 
                  on='customer_id', how='left')
    
    return df

# 使用例
df_enriched = enrich_dataframe(df_clean)
print(df_enriched[['order_date', 'year', 'month', 'quarter', 'customer_segment']].head())

パターン5:データの検証とレポート生成

実務では、処理の各段階でデータが正しいか確認する必要があります。

def validate_and_report(df: pd.DataFrame) -> dict:
    \"\"\"
    データの品質を検証し、レポートを生成
    \"\"\"
    validation_report = {}
    
    # 1. 行数と列数
    validation_report['total_rows'] = len(df)
    validation_report['total_columns'] = len(df.columns)
    
    # 2. 欠損値の比率
    missing_ratio = (df.isnull().sum() / len(df) * 100).round(2)
    validation_report['missing_values'] = missing_ratio[missing_ratio > 0].to_dict()
    
    # 3. データ型の確認
    validation_report['data_types'] = df.dtypes.astype(str).to_dict()
    
    # 4. 数値列の統計情報
    validation_report['numeric_statistics'] = df.describe().to_dict()
    
    # 5. カテゴリ列のユニーク値数
    categorical_cols = df.select_dtypes(include=['object']).columns
    for col in categorical_cols:
        unique_count = df[col].nunique()
        validation_report[f'{col}_unique_count'] = unique_count
    
    # 6. 日付の範囲確認
    date_cols = df.select_dtypes(include=['datetime64']).columns
    for col in date_cols:
        validation_report[f'{col}_min'] = df[col].min()
        validation_report[f'{col}_max'] = df[col].max()
    
    # 7. ビジネスルールの検証
    if 'amount' in df.columns:
        negative_amounts = len(df[df['amount'] < 0])
        validation_report['negative_amounts'] = negative_amounts
    
    if 'quantity' in df.columns:
        zero_quantities = len(df[df['quantity'] == 0])
        validation_report['zero_quantities'] = zero_quantities
    
    return validation_report

# 使用例
report = validate_and_report(df_enriched)
for key, value in report.items():
    print(f'{key}: {value}')

よくある応用パターン

パターン1:Excelファイルへの書き込みと複数シート

実務では、分析結果をExcelファイルで納品することが多いです。複数シートを含むファイル生成は頻出パターンです。

def export_to_excel(df_dict: dict, output_path: str):
    \"\"\"
    複数のDataFrameを複数シートを持つExcelファイルに書き込み
    
    Args:
        df_dict: {'シート名': DataFrame} の辞書
        output_path: 出力ファイルパス
    \"\"\"
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        for sheet_name, df in df_dict.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)
            
            # シートの列幅を自動調整する場合
            worksheet = writer.sheets[sheet_name]
            for idx, col in enumerate(df.columns):
                max_length = max(
                    df[col].astype(str).map(len).max(),
                    len(col)
                )
                worksheet.column_dimensions[chr(65 + idx)].width = min(max_length + 2, 50)

# 使用例
output_dict = {
    '売上集計': summary_df,
    'ピボット': pivot_df.reset_index(),
    '顧客分析': df_enriched[['customer_id', 'customer_segment', 'amount']].drop_duplicates(),
    'データ品質': pd.DataFrame(report.items(), columns=['Item', 'Value'])
}

export_to_excel(output_dict, './output/analysis_report.xlsx')

パターン2:時系列データの処理

売上や在庫など、時系列データの処理は実務で頻繁に登場します。

def process_timeseries_data(df: pd.DataFrame) -> pd.DataFrame:
    \"\"\"
    時系列データの処理と分析
    \"\"\"
    df = df.copy()
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values('date')
    
    # 移動平均の計算(トレンド把握)
    df['moving_avg_7d'] = df['amount'].rolling(window=7, min_periods=1).mean()
    df['moving_avg_30d'] = df['amount'].rolling(window=30, min_periods=1).mean()
    
    # 前日比の計算
    df['daily_change'] = df['amount'].diff()
    df['daily_change_pct'] = df['amount'].pct_change() * 100
    
    # 前年同日比(年次比較)
    df['year'] = df['date'].dt.year
    df['month_day'] = df['date'].dt.strftime('%m-%d')
    
    # 季節性の分析
    df['season'] = df['date'].dt.month.apply(
        lambda x: 'Q1' if x <= 3 else ('Q2' if x <= 6 else ('Q3' if x <= 9 else 'Q4'))
    )
    
    return df

# 使用例
df_timeseries = process_timeseries_data(df_clean)
print(df_timeseries[['date', 'amount', 'moving_avg_7d', 'daily_change_pct']].head(20))

パターン3:条件付きの複雑なフィルタリング

複数の条件でデータをフィルタリングすることは日常茶飯事です。

def complex_filtering(df: pd.DataFrame) -> dict:
    \"\"\"
    複数の条件でデータをフィルタリング
    \"\"\"
    results = {}
    
    # 条件1:特定期間の高額取引
    df['order_date'] = pd.to_datetime(df['order_date'])
    high_value_recent = df[
        (df['order_date'] >= '2024-01-01') &
        (df['amount'] > df['amount'].quantile(0.75))
    ]
    results['high_value_recent'] = high_value_recent
    
    # 条件2:特定カテゴリで購入回数が多い顧客
    customer_counts = df.groupby('customer_id').size()
    frequent_customers = customer_counts[customer_counts > 10].index
    frequent_category_buyers = df[
        (df['customer_id'].isin(frequent_customers)) &
        (df['product_category'].isin(['Electronics', 'Home']))
    ]
    results['frequent_category_buyers'] = frequent_category_buyers
    
    # 条件3:異常値を除外したデータ
    Q1 = df['amount'].quantile(0.25)
    Q3 = df['amount'].quantile(0.75)
    IQR = Q3 - Q1
    normal_range = df[
        (df['amount'] >= Q1 - 1.5 * IQR) &
        (df['amount'] <= Q3 + 1.5 * IQR)
    ]
    results['normal_range'] = normal_range
    
    return results

# 使用例
filtered_results = complex_filtering(df_enriched)
for key, result_df in filtered_results.items():
    print(f'{key}: {len(result_df)} rows')

実務での注意点

1. メモリ効率を考慮した大規模データ処理

大規模なCSVやデータベースからデータを読む際、全データをメモリに読み込むと問題が発生します。チャンク処理を検討しましょう。

def read_large_csv_in_chunks(filepath: str, chunksize: int = 10000):
    \"\"\"
    大規模CSVをチャンク単位で読み込み処理
    \"\"\"
    chunks = []
    for chunk in pd.read_csv(filepath, chunksize=chunksize):
        # 各チャンクに対して処理
        chunk_cleaned = chunk.dropna()
        chunk_cleaned['processed'] = True
        chunks.append(chunk_cleaned)
    
    # 必要に応じてチャンクを統合
    df = pd.concat(chunks, ignore_index=True)
    return df

2. インデックスの適切な設定

大規模なマージ操作を行う場合、インデックスを設定することで処理速度が大幅に向上します。

# 不効率な書き方
result = df1.merge(df2, on='customer_id', how='left')

# より効率的な書き方
df1 = df1.set_index('customer_id')
df2 = df2.set_index('customer_id')
result = df1.join(df2, how='left')
result = result.reset_index()

3. コピーとビューの違いの理解

DataFrameの変更が思わぬ副作用を起こすことがあります。必要に応じて明示的にコピーしましょう。

# 元のDataFrameを保護
df_working = df.copy()  # ディープコピー

# 特定の列だけが必要な場合
df_subset = df[['col1', 'col2']].copy()

4. チェーンの終わりでコピーを取る

複数の操作を連鎖させる場合、最後にコピーを取ることでパフォーマンスを向上させます。

# 効率的なチェーン処理
df = (df
    .drop(columns=['unnecessary'])
    .fillna(0)
    .query('amount > 100')
    .assign(year=lambda x: x['date'].dt.year)
    .copy()  # チェーンの最後でコピー
)

5. エラーハンドリングの重要性

実務では、予期しないデータ形式やエラーに対処する必要があります。

def safe_data_processing(filepath: str) -> pd.DataFrame:
    \"\"\"
    エラーハンドリングを含むロバストなデータ処理
    \"\"\"
    try:
        df = pd.read_csv(filepath, encoding='utf-8')
    except UnicodeDecodeError:
        print('UTF-8でのデコード失敗。Shift-JISを試行します')
        df = pd.read_csv(filepath, encoding='shift_jis')
    except FileNotFoundError:
        print(f'ファイルが見つかりません: {filepath}')
        return pd.DataFrame()
    except Exception as e:
        print(f'予期しないエラーが発生しました: {e}')
        return pd.DataFrame()
    
    # データ検証
    required_columns = ['customer_id', 'amount', 'date']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f'必須カラムが見つかりません: {missing_columns}')
    
    return df

6. 命名規則と可読性

実務では、後で自分や他のメンバーがコードを読むことを想定しましょう。変数名は明確に。

# 悪い例
df = df[df['a'] > 100]
x = df.groupby('b')['c'].sum()

# 良い例
high_value_transactions = df[df['amount'] > 100]
sales_by_category = high_value_transactions.groupby('product_category')['amount'].sum()

パフォーマンス最適化のコツ

実務では、処理速度も重要です。以下のポイントに注意しましょう。

  • dtypeの明示的指定:読み込み時に型を指定することで高速化
  • 不要な列の早期削除:処理を進める前に不要な列を削除
  • ジェネレータとitertuples()の活用:ループが必要な場合はapplyより高速
  • カテゴリ型の活用:メモリ効率が良く、処理も高速
  • クエリの使用:複雑なフィルタリングはquery()メソッドが効率的
# 高速な行処理の例
for idx, row in df.itertuples(index=False):
    # 処理
    pass

# または
for row in df.values:
    # 処理
    pass

# カテゴリ型の活用
df['product_category'] = df['product_category'].astype('category')

# queryメソッドの活用
result = df.query('amount > @threshold and date >= @start_date')

まとめ

pandas DataFrameは、Pythonでのデータ処理に不可欠なツールです。本記事で紹介した実装パターンは、実務で実際に頻繁に遭遇するシーンばかりです。

重要なポイントをもう一度:

  1. 前処理が8割:分析の質は、データの品質に大きく依存します
  2. エラーハンドリング:予期しないデータに対応する余裕を持たせましょう
  3. パフォーマンス意識:大規模データを扱う際は、メモリと処理速度を常に念頭に
  4. コードの可読性:自分や他のチームメンバーが後で理解できるコードを書く
  5. ドキュメント化:関数にはdocstringを付け、複雑なロジックにはコメントを付ける

これらの実装パターンとベストプラクティスをマスターすれば、実務でのデータ処理作業は格段に効率化されます。最初は複雑に感じるかもしれませんが、繰り返し使うことで自然と身につきます。実際のプロジェクトで試しながら、自分のコーディングスタイルを確立していってください。

タイトルとURLをコピーしました