Python pandas DataFrameの実務的な使い方|データ処理の実装パターン集
はじめに
Pythonでデータ処理を行う際、pandas DataFrameは避けて通れません。しかし、教科書的な使い方と実務での使い方には大きな隔たりがあります。本記事では、実際のプロジェクトで遭遇する課題と、それを解決するための実装パターンを紹介します。
pandas DataFrameの基礎理解
pandas DataFrameは、表形式のデータ構造です。Excelのスプレッドシートのようなもので、行と列からなり、様々な型のデータを扱えます。実務では単なるデータ保持だけでなく、データの加工、変換、集計、検証といった複数の処理を効率的に行う必要があります。
DataFrameの基本的な構造:
- インデックス(行のラベル)
- カラム(列のラベル)
- 複数のデータ型をサポート
- 欠損値の処理機能
- 統計関数の豊富さ
これらの特性を活かすことが、実務での効率化の鍵となります。
実務でのユースケース
実務では、以下のようなシーンでDataFrameが活躍します:
1. CSV/Excelファイルの読み込みと加工
営業データ、顧客情報、売上レポートなど、外部ファイルから定期的にデータを読み込み、必要な形式に変換して分析に使用します。
2. データベースからの抽出と集計
SQLクエリで取得したデータをDataFrameに変換し、複雑な集計や分析を行う場合があります。
3. 複数データソースの統合
異なるシステムからのデータを統合し、一つの分析基盤として整形することが多いです。
4. データ品質チェックと異常検知
本処理前の前処理として、欠損値や異常値の検出と対処が必須です。
5. 定期的なレポート生成
毎日・毎週のレポート自動生成は実務でよく遭遇するニーズです。
実装コード:実務パターン集
パターン1:複数ファイルの読み込みと統合
実務では、日次ファイルなど複数のCSVを一つのDataFrameに統合することが頻繁にあります。
import pandas as pd
import glob
import os
from pathlib import Path
# 複数のCSVファイルを読み込んで統合
def load_and_combine_csv_files(directory_path: str, pattern: str = '*.csv') -> pd.DataFrame:
\"\"\"
指定ディレクトリの複数CSVファイルを読み込んで統合
Args:
directory_path: ファイルを探すディレクトリ
pattern: ファイルのパターン(デフォルト: *.csv)
Returns:
統合されたDataFrame
\"\"\"
file_list = glob.glob(os.path.join(directory_path, pattern))
if not file_list:
raise FileNotFoundError(f'No files found in {directory_path}')
dataframes = []
for file_path in sorted(file_list):
try:
# ファイル名から日付を抽出する場合もある
df = pd.read_csv(
file_path,
encoding='utf-8',
dtype={'customer_id': str, 'product_code': str} # 型を明示的に指定
)
df['source_file'] = Path(file_path).name # ファイル追跡用
dataframes.append(df)
print(f'Successfully loaded: {Path(file_path).name}')
except Exception as e:
print(f'Error loading {file_path}: {e}')
continue
# DataFrameを縦に統合
combined_df = pd.concat(dataframes, ignore_index=True)
return combined_df
# 使用例
df = load_and_combine_csv_files('./data/daily_reports')
print(f'Combined shape: {df.shape}')
print(df.head())
パターン2:データクリーニングと前処理
実務では、読み込んだデータがそのまま使える状態にあることは稀です。欠損値、重複、異常値への対処が必須です。
def clean_and_preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
\"\"\"
実務でよく必要になるデータクリーニング処理
\"\"\"
df = df.copy() # 元データを保護
# 1. 重複行の削除(全列で重複判定)
initial_rows = len(df)
df = df.drop_duplicates(subset=None, keep='first')
print(f'Removed {initial_rows - len(df)} duplicate rows')
# 2. 欠損値の確認
missing_info = df.isnull().sum()
if missing_info.any():
print(f'Missing values found:\\n{missing_info[missing_info > 0]}')
# 3. 欠損値の処理(戦略によって異なる)
# 数値列:平均値で補填
numeric_columns = df.select_dtypes(include=['number']).columns
for col in numeric_columns:
if df[col].isnull().any():
df[col].fillna(df[col].mean(), inplace=True)
# カテゴリ列:'Unknown'で補填
categorical_columns = df.select_dtypes(include=['object']).columns
for col in categorical_columns:
if df[col].isnull().any():
df[col].fillna('Unknown', inplace=True)
# 4. 日付列の変換
if 'order_date' in df.columns:
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
# 5. 異常値の検出(数値列の外れ値)
for col in numeric_columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
if len(outliers) > 0:
print(f'Found {len(outliers)} outliers in {col}')
# 6. 文字列列の空白を削除
for col in categorical_columns:
df[col] = df[col].str.strip()
# 7. 不要な列の削除
columns_to_drop = [col for col in df.columns if df[col].isnull().sum() > len(df) * 0.5]
if columns_to_drop:
print(f'Dropping columns with >50% missing: {columns_to_drop}')
df = df.drop(columns=columns_to_drop)
return df
# 使用例
df_clean = clean_and_preprocess_data(df)
print(f'Cleaned data shape: {df_clean.shape}')
パターン3:複雑な集計と分析
実務では、複数の条件で集計し、複数の集計関数を同時に適用することが多いです。
def aggregate_sales_data(df: pd.DataFrame) -> pd.DataFrame:
\"\"\"
営業データの複雑な集計例
\"\"\"
# グループ化と複数の集計関数の同時適用
aggregation_dict = {
'amount': ['sum', 'count', 'mean', 'min', 'max'],
'quantity': ['sum', 'mean'],
'customer_id': 'nunique'
}
# 月別・商品別の集計
df['year_month'] = df['order_date'].dt.to_period('M')
summary = df.groupby(['year_month', 'product_category']).agg(aggregation_dict)
# マルチレベルインデックスを整理
summary.columns = ['_'.join(col).strip() for col in summary.columns.values]
summary = summary.reset_index()
# カラム名を分かりやすく
summary.columns = ['year_month', 'product_category', 'total_amount', 'transaction_count',
'avg_amount', 'min_amount', 'max_amount', 'total_quantity',
'avg_quantity', 'unique_customers']
return summary
# より複雑な集計パターン:ピボットテーブル
def create_pivot_summary(df: pd.DataFrame) -> pd.DataFrame:
\"\"\"
ピボットテーブルで商品別×月別の売上を作成
\"\"\"
df['year_month'] = df['order_date'].dt.to_period('M')
pivot_df = df.pivot_table(
values='amount',
index='year_month',
columns='product_category',
aggfunc='sum',
fill_value=0,
margins=True # 合計行を追加
)
return pivot_df
# 使用例
summary_df = aggregate_sales_data(df_clean)
print(summary_df.head(10))
pivot_df = create_pivot_summary(df_clean)
print(pivot_df)
パターン4:新しい列の動的生成と変換
実務では、既存の列から新しい情報を生成することがよくあります。
def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
\"\"\"
既存のデータから新しい有用な列を生成
\"\"\"
df = df.copy()
# 1. 日付関連の列を生成
df['order_date'] = pd.to_datetime(df['order_date'])
df['year'] = df['order_date'].dt.year
df['month'] = df['order_date'].dt.month
df['quarter'] = df['order_date'].dt.quarter
df['day_of_week'] = df['order_date'].dt.day_name()
df['is_weekend'] = df['order_date'].dt.dayofweek.isin([5, 6])
# 2. 金額関連の計算
df['tax'] = df['amount'] * 0.1
df['total_with_tax'] = df['amount'] + df['tax']
# 3. カテゴリの統一と変換
df['product_category'] = df['product_category'].str.upper().str.strip()
# 4. 顧客の分類(RFM分析など)
customer_stats = df.groupby('customer_id').agg({
'order_date': 'max',
'amount': ['sum', 'count']
}).reset_index()
customer_stats.columns = ['customer_id', 'last_purchase_date', 'total_spent', 'purchase_count']
# 顧客セグメントの定義
def segment_customer(row):
if row['total_spent'] > 100000:
return 'VIP'
elif row['total_spent'] > 50000:
return 'Gold'
elif row['total_spent'] > 10000:
return 'Silver'
else:
return 'Bronze'
customer_stats['customer_segment'] = customer_stats.apply(segment_customer, axis=1)
# 元のDataFrameに結合
df = df.merge(customer_stats[['customer_id', 'customer_segment']],
on='customer_id', how='left')
return df
# 使用例
df_enriched = enrich_dataframe(df_clean)
print(df_enriched[['order_date', 'year', 'month', 'quarter', 'customer_segment']].head())
パターン5:データの検証とレポート生成
実務では、処理の各段階でデータが正しいか確認する必要があります。
def validate_and_report(df: pd.DataFrame) -> dict:
\"\"\"
データの品質を検証し、レポートを生成
\"\"\"
validation_report = {}
# 1. 行数と列数
validation_report['total_rows'] = len(df)
validation_report['total_columns'] = len(df.columns)
# 2. 欠損値の比率
missing_ratio = (df.isnull().sum() / len(df) * 100).round(2)
validation_report['missing_values'] = missing_ratio[missing_ratio > 0].to_dict()
# 3. データ型の確認
validation_report['data_types'] = df.dtypes.astype(str).to_dict()
# 4. 数値列の統計情報
validation_report['numeric_statistics'] = df.describe().to_dict()
# 5. カテゴリ列のユニーク値数
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
unique_count = df[col].nunique()
validation_report[f'{col}_unique_count'] = unique_count
# 6. 日付の範囲確認
date_cols = df.select_dtypes(include=['datetime64']).columns
for col in date_cols:
validation_report[f'{col}_min'] = df[col].min()
validation_report[f'{col}_max'] = df[col].max()
# 7. ビジネスルールの検証
if 'amount' in df.columns:
negative_amounts = len(df[df['amount'] < 0])
validation_report['negative_amounts'] = negative_amounts
if 'quantity' in df.columns:
zero_quantities = len(df[df['quantity'] == 0])
validation_report['zero_quantities'] = zero_quantities
return validation_report
# 使用例
report = validate_and_report(df_enriched)
for key, value in report.items():
print(f'{key}: {value}')
よくある応用パターン
パターン1:Excelファイルへの書き込みと複数シート
実務では、分析結果をExcelファイルで納品することが多いです。複数シートを含むファイル生成は頻出パターンです。
def export_to_excel(df_dict: dict, output_path: str):
\"\"\"
複数のDataFrameを複数シートを持つExcelファイルに書き込み
Args:
df_dict: {'シート名': DataFrame} の辞書
output_path: 出力ファイルパス
\"\"\"
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
for sheet_name, df in df_dict.items():
df.to_excel(writer, sheet_name=sheet_name, index=False)
# シートの列幅を自動調整する場合
worksheet = writer.sheets[sheet_name]
for idx, col in enumerate(df.columns):
max_length = max(
df[col].astype(str).map(len).max(),
len(col)
)
worksheet.column_dimensions[chr(65 + idx)].width = min(max_length + 2, 50)
# 使用例
output_dict = {
'売上集計': summary_df,
'ピボット': pivot_df.reset_index(),
'顧客分析': df_enriched[['customer_id', 'customer_segment', 'amount']].drop_duplicates(),
'データ品質': pd.DataFrame(report.items(), columns=['Item', 'Value'])
}
export_to_excel(output_dict, './output/analysis_report.xlsx')
パターン2:時系列データの処理
売上や在庫など、時系列データの処理は実務で頻繁に登場します。
def process_timeseries_data(df: pd.DataFrame) -> pd.DataFrame:
\"\"\"
時系列データの処理と分析
\"\"\"
df = df.copy()
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values('date')
# 移動平均の計算(トレンド把握)
df['moving_avg_7d'] = df['amount'].rolling(window=7, min_periods=1).mean()
df['moving_avg_30d'] = df['amount'].rolling(window=30, min_periods=1).mean()
# 前日比の計算
df['daily_change'] = df['amount'].diff()
df['daily_change_pct'] = df['amount'].pct_change() * 100
# 前年同日比(年次比較)
df['year'] = df['date'].dt.year
df['month_day'] = df['date'].dt.strftime('%m-%d')
# 季節性の分析
df['season'] = df['date'].dt.month.apply(
lambda x: 'Q1' if x <= 3 else ('Q2' if x <= 6 else ('Q3' if x <= 9 else 'Q4'))
)
return df
# 使用例
df_timeseries = process_timeseries_data(df_clean)
print(df_timeseries[['date', 'amount', 'moving_avg_7d', 'daily_change_pct']].head(20))
パターン3:条件付きの複雑なフィルタリング
複数の条件でデータをフィルタリングすることは日常茶飯事です。
def complex_filtering(df: pd.DataFrame) -> dict:
\"\"\"
複数の条件でデータをフィルタリング
\"\"\"
results = {}
# 条件1:特定期間の高額取引
df['order_date'] = pd.to_datetime(df['order_date'])
high_value_recent = df[
(df['order_date'] >= '2024-01-01') &
(df['amount'] > df['amount'].quantile(0.75))
]
results['high_value_recent'] = high_value_recent
# 条件2:特定カテゴリで購入回数が多い顧客
customer_counts = df.groupby('customer_id').size()
frequent_customers = customer_counts[customer_counts > 10].index
frequent_category_buyers = df[
(df['customer_id'].isin(frequent_customers)) &
(df['product_category'].isin(['Electronics', 'Home']))
]
results['frequent_category_buyers'] = frequent_category_buyers
# 条件3:異常値を除外したデータ
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
normal_range = df[
(df['amount'] >= Q1 - 1.5 * IQR) &
(df['amount'] <= Q3 + 1.5 * IQR)
]
results['normal_range'] = normal_range
return results
# 使用例
filtered_results = complex_filtering(df_enriched)
for key, result_df in filtered_results.items():
print(f'{key}: {len(result_df)} rows')
実務での注意点
1. メモリ効率を考慮した大規模データ処理
大規模なCSVやデータベースからデータを読む際、全データをメモリに読み込むと問題が発生します。チャンク処理を検討しましょう。
def read_large_csv_in_chunks(filepath: str, chunksize: int = 10000):
\"\"\"
大規模CSVをチャンク単位で読み込み処理
\"\"\"
chunks = []
for chunk in pd.read_csv(filepath, chunksize=chunksize):
# 各チャンクに対して処理
chunk_cleaned = chunk.dropna()
chunk_cleaned['processed'] = True
chunks.append(chunk_cleaned)
# 必要に応じてチャンクを統合
df = pd.concat(chunks, ignore_index=True)
return df
2. インデックスの適切な設定
大規模なマージ操作を行う場合、インデックスを設定することで処理速度が大幅に向上します。
# 不効率な書き方
result = df1.merge(df2, on='customer_id', how='left')
# より効率的な書き方
df1 = df1.set_index('customer_id')
df2 = df2.set_index('customer_id')
result = df1.join(df2, how='left')
result = result.reset_index()
3. コピーとビューの違いの理解
DataFrameの変更が思わぬ副作用を起こすことがあります。必要に応じて明示的にコピーしましょう。
# 元のDataFrameを保護
df_working = df.copy() # ディープコピー
# 特定の列だけが必要な場合
df_subset = df[['col1', 'col2']].copy()
4. チェーンの終わりでコピーを取る
複数の操作を連鎖させる場合、最後にコピーを取ることでパフォーマンスを向上させます。
# 効率的なチェーン処理
df = (df
.drop(columns=['unnecessary'])
.fillna(0)
.query('amount > 100')
.assign(year=lambda x: x['date'].dt.year)
.copy() # チェーンの最後でコピー
)
5. エラーハンドリングの重要性
実務では、予期しないデータ形式やエラーに対処する必要があります。
def safe_data_processing(filepath: str) -> pd.DataFrame:
\"\"\"
エラーハンドリングを含むロバストなデータ処理
\"\"\"
try:
df = pd.read_csv(filepath, encoding='utf-8')
except UnicodeDecodeError:
print('UTF-8でのデコード失敗。Shift-JISを試行します')
df = pd.read_csv(filepath, encoding='shift_jis')
except FileNotFoundError:
print(f'ファイルが見つかりません: {filepath}')
return pd.DataFrame()
except Exception as e:
print(f'予期しないエラーが発生しました: {e}')
return pd.DataFrame()
# データ検証
required_columns = ['customer_id', 'amount', 'date']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f'必須カラムが見つかりません: {missing_columns}')
return df
6. 命名規則と可読性
実務では、後で自分や他のメンバーがコードを読むことを想定しましょう。変数名は明確に。
# 悪い例
df = df[df['a'] > 100]
x = df.groupby('b')['c'].sum()
# 良い例
high_value_transactions = df[df['amount'] > 100]
sales_by_category = high_value_transactions.groupby('product_category')['amount'].sum()
パフォーマンス最適化のコツ
実務では、処理速度も重要です。以下のポイントに注意しましょう。
- dtypeの明示的指定:読み込み時に型を指定することで高速化
- 不要な列の早期削除:処理を進める前に不要な列を削除
- ジェネレータとitertuples()の活用:ループが必要な場合はapplyより高速
- カテゴリ型の活用:メモリ効率が良く、処理も高速
- クエリの使用:複雑なフィルタリングはquery()メソッドが効率的
# 高速な行処理の例
for idx, row in df.itertuples(index=False):
# 処理
pass
# または
for row in df.values:
# 処理
pass
# カテゴリ型の活用
df['product_category'] = df['product_category'].astype('category')
# queryメソッドの活用
result = df.query('amount > @threshold and date >= @start_date')
まとめ
pandas DataFrameは、Pythonでのデータ処理に不可欠なツールです。本記事で紹介した実装パターンは、実務で実際に頻繁に遭遇するシーンばかりです。
重要なポイントをもう一度:
- 前処理が8割:分析の質は、データの品質に大きく依存します
- エラーハンドリング:予期しないデータに対応する余裕を持たせましょう
- パフォーマンス意識:大規模データを扱う際は、メモリと処理速度を常に念頭に
- コードの可読性:自分や他のチームメンバーが後で理解できるコードを書く
- ドキュメント化:関数にはdocstringを付け、複雑なロジックにはコメントを付ける
これらの実装パターンとベストプラクティスをマスターすれば、実務でのデータ処理作業は格段に効率化されます。最初は複雑に感じるかもしれませんが、繰り返し使うことで自然と身につきます。実際のプロジェクトで試しながら、自分のコーディングスタイルを確立していってください。

