こんにちは。JMDCの開発本部 データウェアハウス開発部の金です。
今年、JMDCではアドベントカレンダーに参加しています。
本記事は、JMDC Advent Calendar 2025 16日目の記事です。
目次
はじめに
この記事では、数十万の小さなCSVファイルをAWS Glue(PySpark)で処理する際に、Spark SQLの活用とJupyter Notebookでのローカル開発環境を導入することで、パフォーマンスと開発速度を同時に向上させた事例を紹介します。
1. 背景・概要
前提条件と課題
今回扱うデータは、フォーマット化済みのCSVファイルです。総データ量は数TB、ファイル数は数十万〜数百万に及びます。この処理において、以下の3つの課題に直面していました。 以下のコード例はサンプルとなります。
- 課題1:数十万ファイルの処理(パフォーマンス)
- S3 APIコールが膨大になり、オーバーヘッドやエラーが発生しやすい。
- 課題2:開発コストとサイクル(開発効率)
- コード修正のたびにS3アップロードとGlue実行(1回15-20分)が必要。
- デバッグが困難で、テストコスト(時間・課金)がかさむ。
- 課題3:コードの可読性と保守性(品質)
- DataFrame APIのメソッドチェーンが複雑化し、可読性が低下。
- PySpark独自の学習コストが高く、チーム開発の障壁になる。
これらの課題に対し、「実装面(Spark SQL)」と「環境面(ローカル開発)」の2つのアプローチで解決を図りました。
2. 実装の工夫:パフォーマンスと品質の向上
まずは、課題1(パフォーマンス)と課題3(品質)に対するアプローチです。
(1) CSV → Parquet変換とファイルサイズ制御(パフォーマンス向上)
S3上の大量の小ファイルを効率的に扱うため、まずはParquet形式への変換を行います。この際、Sparkの設定で読み込み・書き込みサイズを制御することで、タスク数の爆発を抑えました。
# ファイルサイズ制御 spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128MB spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000") # CSV読み込み → Parquet出力 spark.read.option("header", "true").csv(args['input_path']).write.mode("overwrite").parquet(args['output_path'])
(2) Spark SQLによる宣言的な変換(品質向上)
複雑なデータ変換処理において、PySparkのDataFrame APIではなく、Spark SQLを採用しました。
DataFrame vs Spark SQL
| 観点 | DataFrame API | Spark SQL |
|---|---|---|
| 可読性 | メソッドチェーンが長くなると読みにくい | SQLで直感的に理解可能 |
| 保守性 | Pythonコードに埋め込まれ変更が困難 | SQL部分を独立管理可能 |
| チーム開発 | PySpark習得が必要 | SQL知識があれば参加可能 |
| 複雑な処理 | ネストが深くなる | サブクエリで構造化可能 |
SQLを採用することで、ロジックをYAML設定ファイルとしてコードから分離し、SQLを知っているエンジニアであれば誰でもロジックを理解・修正できるようになりました。
実装例:SQL設定ファイルによる管理
# config/sql_config.yaml queries: user_events: sql: | SELECT id, user_id, event_date, CAST(value AS DECIMAL(10,2)) as value, DATE_FORMAT(event_date, 'yyyyMM') as year_month FROM source_data WHERE event_date IS NOT NULL AND value > 0 partition_by: year_month order_summary: sql: | SELECT order_id, customer_id, order_date, total_amount FROM source_data WHERE order_date >= '2024-01-01' partition_by: order_date
# glue_job/app.py(ポイント抜粋) # イベントタイプによる動的パス指定 event_type = args['event_type'] # 'user_events', 'order_summary' input_path = f"{args['input_path']}/{event_type}" output_path = f"{args['output_path']}/{event_type}" # 設定ファイルからSQL取得・実行 query_config = load_sql_config()['queries'][event_type] spark.read.parquet(input_path).createOrReplaceTempView("source_data") result = spark.sql(query_config['sql']) result.write.mode("overwrite").partitionBy(query_config['partition_by']).parquet(output_path)
実行例
# AWS環境での実行 python app.py --event_type user_events \ --input_path s3://bucket/data \ --output_path s3://bucket/output \ --env aws # ローカル環境での実行 python app.py --event_type order_summary \ --input_path ./tests/input \ --output_path ./tests/output \ --env local
3. 開発環境の工夫:ローカル開発による高速化
次に、課題2(開発効率)に対するアプローチです。AWS Glueにデプロイせずとも、手元で動作確認できる環境を構築しました。
(1) AWS/ローカルの自動実行環境切り替え
コードを修正することなく、ローカル環境とAWS Glue環境の両方で動作させるため、SparkSessionの生成ロジックを抽象化しました。
def create_spark_session(env): if env == "aws": # コマンドライン引数で判定 from awsglue.context import GlueContext sc = SparkContext() return GlueContext(sc).spark_session else: # ローカル環境 return SparkSession.builder \ .appName("LocalTest") \ .master("local[*]") \ .getOrCreate()
これにより、--env aws または --env local のコマンドライン引数で環境を切り替えられるようになり、開発時はローカルで高速に試行錯誤し、本番はそのままGlueで実行というフローが確立できました。
(2) Docker Composeによる環境構築
開発メンバーがすぐに環境を再現できるよう、Jupyter Notebookを含んだDocker環境を用意しました。
docker-compose.yaml
version: '3.8' services: jupyter: build: ./glue_job # Dockerfileを利用して独自イメージをビルド ports: - "8888:8888" volumes: - ./glue_job:/home/jovyan/work - ./tests:/home/jovyan/tests environment: - JUPYTER_ENABLE_LAB=yes command: start-notebook.sh --NotebookApp.token=''
また、外部Pythonライブラリを追加したい場合は、requirements.txtに必要なパッケージを記載し、Dockerfileで自動的にインストールされる仕組みになっています。
Dockerfile(抜粋)
FROM jupyter/pyspark-notebook:latest COPY requirements.txt /tmp/requirements.txt RUN pip install --no-cache-dir --cert /etc/ssl/certs/ca-certificates.crt -r /tmp/requirements.txt
requirements.txt(例)
boto3 pytest
このように、requirements.txtにパッケージを追加することで、Docker環境構築時に自動でインストールされます。
ビルド/起動
docker-compose up -d # http://localhost:8888 でJupyter起動
(3) pytestによる単体テストとカバレッジ測定
ローカルでPySparkが動くようになったことで、pytest を用いた単体テストが可能になりました。
# tests/test_transform.py @pytest.fixture(scope="session") def spark(): return SparkSession.builder.master("local[*]").getOrCreate() def test_user_events_transform(spark): # テストデータ作成 → SQL実行 → 検証 test_data = [("1", "user1", "2024-01-01", "100.0")] df = spark.createDataFrame(test_data, ["id", "user_id", "event_date", "value"]) df.createOrReplaceTempView("source_data") config = load_sql_config() result = spark.sql(config['queries']['user_events']['sql']) assert result.count() == 1 assert result.first()["year_month"] == "202401"
カバレッジ測定
# カバレッジ測定実行 pytest --cov=glue_job --cov-report=html tests/ # カバレッジレポート確認 # htmlcov/index.html を開く
これにより、Glueへのデプロイ前にロジックの正確性を担保でき、デバッグ時間が大幅に短縮されました。
4. まとめ
今回の改善により、以下の成果が得られました。
改善効果と解決策
| 課題 | 項目 | 改善前 | 改善後 |
|---|---|---|---|
| 課題1:数十万ファイルの処理 | ファイル数 | 数十万 | 数百 |
| S3 APIコール | 膨大(数十万回) | 大幅削減(数百回) | |
| 処理の安定性 | S3 APIアクセスエラー頻発 | 安定した高速処理 | |
| 課題2:開発コストとサイクル | 開発サイクル | 15-20分 | 数秒 |
| テストコスト | Glue課金 | $0 | |
| 単体テスト | 実行困難 | pytestで実行可能 | |
| カバレッジ測定 | 測定不可 | pytest-covで測定可能 | |
| 課題3:コードの可読性と保守性 | コードの可読性 | DataFrame APIで複雑 | SQLで直感的 |
| チーム開発 | PySpark習得が必要 | SQL知識で参加可能 |
明日17日目は、関田さんによる「ヘルスビッグデータ分析サービスの複数のプロダクトを統合した話」です。お楽しみに!
JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です!フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。詳細は下記の募集一覧からご確認ください。
まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。
★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください!