JMDC TECH BLOG

JMDCのエンジニアブログです

Glueジョブ開発の実践:PySpark + Jupyterによる大量データ変換と単体テスト

こんにちは。JMDCの開発本部 データウェアハウス開発部の金です。

今年、JMDCではアドベントカレンダーに参加しています。

qiita.com

本記事は、JMDC Advent Calendar 2025 16日目の記事です。

目次


はじめに

この記事では、数十万の小さなCSVファイルをAWS Glue(PySpark)で処理する際に、Spark SQLの活用とJupyter Notebookでのローカル開発環境を導入することで、パフォーマンスと開発速度を同時に向上させた事例を紹介します。

1. 背景・概要

前提条件と課題

今回扱うデータは、フォーマット化済みのCSVファイルです。総データ量は数TB、ファイル数は数十万〜数百万に及びます。この処理において、以下の3つの課題に直面していました。 以下のコード例はサンプルとなります。

  • 課題1:数十万ファイルの処理(パフォーマンス)
    • S3 APIコールが膨大になり、オーバーヘッドやエラーが発生しやすい。
  • 課題2:開発コストとサイクル(開発効率)
    • コード修正のたびにS3アップロードとGlue実行(1回15-20分)が必要。
    • デバッグが困難で、テストコスト(時間・課金)がかさむ。
  • 課題3:コードの可読性と保守性(品質)
    • DataFrame APIのメソッドチェーンが複雑化し、可読性が低下。
    • PySpark独自の学習コストが高く、チーム開発の障壁になる。

これらの課題に対し、「実装面(Spark SQL)」と「環境面(ローカル開発)」の2つのアプローチで解決を図りました。

2. 実装の工夫:パフォーマンスと品質の向上

まずは、課題1(パフォーマンス)と課題3(品質)に対するアプローチです。

(1) CSV → Parquet変換とファイルサイズ制御(パフォーマンス向上)

S3上の大量の小ファイルを効率的に扱うため、まずはParquet形式への変換を行います。この際、Sparkの設定で読み込み・書き込みサイズを制御することで、タスク数の爆発を抑えました。

# ファイルサイズ制御
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  # 128MB
spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")

# CSV読み込み → Parquet出力
spark.read.option("header", "true").csv(args['input_path']).write.mode("overwrite").parquet(args['output_path'])

(2) Spark SQLによる宣言的な変換(品質向上)

複雑なデータ変換処理において、PySparkのDataFrame APIではなく、Spark SQLを採用しました。

DataFrame vs Spark SQL

観点 DataFrame API Spark SQL
可読性 メソッドチェーンが長くなると読みにくい SQLで直感的に理解可能
保守性 Pythonコードに埋め込まれ変更が困難 SQL部分を独立管理可能
チーム開発 PySpark習得が必要 SQL知識があれば参加可能
複雑な処理 ネストが深くなる サブクエリで構造化可能

SQLを採用することで、ロジックをYAML設定ファイルとしてコードから分離し、SQLを知っているエンジニアであれば誰でもロジックを理解・修正できるようになりました。

実装例:SQL設定ファイルによる管理

# config/sql_config.yaml
queries:
  user_events:
    sql: |
      SELECT 
        id,
        user_id,
        event_date,
        CAST(value AS DECIMAL(10,2)) as value,
        DATE_FORMAT(event_date, 'yyyyMM') as year_month
      FROM source_data
      WHERE event_date IS NOT NULL AND value > 0
    partition_by: year_month
  
  order_summary:
    sql: |
      SELECT 
        order_id,
        customer_id,
        order_date,
        total_amount
      FROM source_data
      WHERE order_date >= '2024-01-01'
    partition_by: order_date
# glue_job/app.py(ポイント抜粋)

# イベントタイプによる動的パス指定
event_type = args['event_type']  # 'user_events', 'order_summary'
input_path = f"{args['input_path']}/{event_type}"
output_path = f"{args['output_path']}/{event_type}"

# 設定ファイルからSQL取得・実行
query_config = load_sql_config()['queries'][event_type]
spark.read.parquet(input_path).createOrReplaceTempView("source_data")
result = spark.sql(query_config['sql'])
result.write.mode("overwrite").partitionBy(query_config['partition_by']).parquet(output_path)

実行例

# AWS環境での実行
python app.py --event_type user_events \
  --input_path s3://bucket/data \
  --output_path s3://bucket/output \
  --env aws

# ローカル環境での実行
python app.py --event_type order_summary \
  --input_path ./tests/input \
  --output_path ./tests/output \
  --env local

3. 開発環境の工夫:ローカル開発による高速化

次に、課題2(開発効率)に対するアプローチです。AWS Glueにデプロイせずとも、手元で動作確認できる環境を構築しました。

(1) AWS/ローカルの自動実行環境切り替え

コードを修正することなく、ローカル環境とAWS Glue環境の両方で動作させるため、SparkSessionの生成ロジックを抽象化しました。

def create_spark_session(env):
  if env == "aws":  # コマンドライン引数で判定
    from awsglue.context import GlueContext
    sc = SparkContext()
    return GlueContext(sc).spark_session
  else:
    # ローカル環境
    return SparkSession.builder \
      .appName("LocalTest") \
      .master("local[*]") \
      .getOrCreate()

これにより、--env aws または --env local のコマンドライン引数で環境を切り替えられるようになり、開発時はローカルで高速に試行錯誤し、本番はそのままGlueで実行というフローが確立できました。

(2) Docker Composeによる環境構築

開発メンバーがすぐに環境を再現できるよう、Jupyter Notebookを含んだDocker環境を用意しました。

docker-compose.yaml

version: '3.8'
services:
  jupyter:
    build: ./glue_job  # Dockerfileを利用して独自イメージをビルド
    ports:
      - "8888:8888"
    volumes:
      - ./glue_job:/home/jovyan/work
      - ./tests:/home/jovyan/tests
    environment:
      - JUPYTER_ENABLE_LAB=yes
    command: start-notebook.sh --NotebookApp.token=''

また、外部Pythonライブラリを追加したい場合は、requirements.txtに必要なパッケージを記載し、Dockerfileで自動的にインストールされる仕組みになっています。

Dockerfile(抜粋)

FROM jupyter/pyspark-notebook:latest
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --cert /etc/ssl/certs/ca-certificates.crt -r /tmp/requirements.txt

requirements.txt(例)

boto3
pytest

このように、requirements.txtにパッケージを追加することで、Docker環境構築時に自動でインストールされます。

ビルド/起動

docker-compose up -d
# http://localhost:8888 でJupyter起動

(3) pytestによる単体テストとカバレッジ測定

ローカルでPySparkが動くようになったことで、pytest を用いた単体テストが可能になりました。

# tests/test_transform.py
@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[*]").getOrCreate()

def test_user_events_transform(spark):
    # テストデータ作成 → SQL実行 → 検証
    test_data = [("1", "user1", "2024-01-01", "100.0")]
    df = spark.createDataFrame(test_data, ["id", "user_id", "event_date", "value"])
    df.createOrReplaceTempView("source_data")
    
    config = load_sql_config()
    result = spark.sql(config['queries']['user_events']['sql'])
    
    assert result.count() == 1
    assert result.first()["year_month"] == "202401"

カバレッジ測定

# カバレッジ測定実行
pytest --cov=glue_job --cov-report=html tests/

# カバレッジレポート確認
# htmlcov/index.html を開く

これにより、Glueへのデプロイ前にロジックの正確性を担保でき、デバッグ時間が大幅に短縮されました。

4. まとめ

今回の改善により、以下の成果が得られました。

改善効果と解決策

課題 項目 改善前 改善後
課題1:数十万ファイルの処理 ファイル数 数十万 数百
S3 APIコール 膨大(数十万回) 大幅削減(数百回)
処理の安定性 S3 APIアクセスエラー頻発 安定した高速処理
課題2:開発コストとサイクル 開発サイクル 15-20分 数秒
テストコスト Glue課金 $0
単体テスト 実行困難 pytestで実行可能
カバレッジ測定 測定不可 pytest-covで測定可能
課題3:コードの可読性と保守性 コードの可読性 DataFrame APIで複雑 SQLで直感的
チーム開発 PySpark習得が必要 SQL知識で参加可能

明日17日目は、関田さんによる「ヘルスビッグデータ分析サービスの複数のプロダクトを統合した話」です。お楽しみに!

JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です!フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。詳細は下記の募集一覧からご確認ください。

hrmos.co

まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。

hrmos.co

★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください!

twitter.com

bsky.app