データウェアハウス開発部の高野です。現在はオンプレミスの電子カルテデータ基盤のAWS移行のプロジェクトに参画しています。
今年、JMDCではアドベントカレンダーに参加しています。
本記事は、JMDC Advent Calendar 2024 7日目の記事です。
はじめに
電子カルテデータ基盤のAWS移行を進めている中、オンプレミスの旧データ基盤のデータ移行が要件の1つとしてありました。AWSでは主なデータベースとしてAmazon Redshift Serverlessを採用しており、そちらに移行データを連携したい、データ移行に必要なデータ形式が様々だったことからデータ移行はAWS Glueジョブ(PySpark)を使って対応しました。データウェアハウス開発部ではSQLでのデータ変換が主流ですが、今回AWS Glueジョブ(PySpark)を使って良かった点について書いてみたいと思います。
Sparkは大容量のデータを複数のノードに処理対象を分散させ、高速で処理させることを目的としたフレームワークであり、PySparkはSparkを実行するためのPython APIとなります。 AWS GlueはAWSで提供されているサーバーレスのETLサービスです。ETLジョブとしてPySparkのジョブを作成することもでき、AWS Glueでは拡張機能も提供されています。 詳しくは以下を参照ください。
www.databricks.com www.databricks.com docs.aws.amazon.com
AWS Glue バージョンは、現時点で最新の4.0を使っています。
旧データ基盤の処理フローとデータ移行のポイント
以下の図のとおり、医療機関単位でETLを行っており、その後、データベースから顧客に納品するため条件に応じたデータセット(csv)を出力し、csvやExcelの変換リストを使い、データクレンジング処理をかけてクライアントに納品していました。
データ移行要件としては、データベースのデータに加えて、データクレンジングした結果も必要でした。
以上から、ETLの変換をした結果の変換後csvがデータベースのデータと同等であり、データクレンジングするベースでもあるので、それをAWSにアップロードしデータ移行に使おうと考えました。
必要なデータの調査等を進め、以下がデータ移行の主なポイントとなりました。
- データクレンジングの再現に必要な変換リストのデータ形式が様々
- 旧データ基盤のマスターテーブル(AWS Glueデータカタログ)
事業部門からAWS上に保管要望があり、Glueデータカタログ化する要件もあったのでそれをデータ移行でも活用したい - データセット出力後の変換リスト(csv、Excel)
Excelデータもなるべく手間なく活用したい
- 旧データ基盤のマスターテーブル(AWS Glueデータカタログ)
- 同じテーブルにロードした変換後csvでも医療機関ごとにカラムが不揃い
医療機関ごとに取得できる情報が異なる場合等もあり、ETLする上で取得できる項目のみを変換後csvにしていたため
医療機関ごとにテーブルにカラム指定でロードしていたため旧データ基盤上はそれで良かった - AWS上に構築する新データ基盤(Redshift Serverless)から移行データも活用できるようにする
なるべく手間なくできればベター
データ移行のポイントに対するAWS Glueジョブ(PySpark)での対応
DataFrameで必要なデータを簡単に組み合わせて処理できた
DataFrameとは名前付きの列を持つデータの分散コレクションです。概念的にはリレーショナルデータベースのテーブルやPythonのPandas DataFrameと似ていますが、Sparkで最適化が行われているものとなります。
変換後csvと組み合わせる必要があったデータとして、データクレンジングに必要だった以下の項目がありましたが、何れもDataFrameに変換して組み合わせることができました。
AWS Glueデータカタログ
AWS Glueデータカタログは、Amazon S3 データセットの構造メタデータ等(簡単に言うとテーブル定義)を保存するリポジトリです。
AWS Glueデータカタログのテーブルは、PySparkからも以下のサンプルコードのように一旦、AWS Glueの拡張機能であるDynamicFrameとして読み込んだ後にDataFrameに変換できます。
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) # Glueデータカタログのテーブルから必要なカラムをselectしたものをDataFrameとして返す def load_master(db_name, table_name, cols): dynamic_frame = glueContext.create_dynamic_frame_from_catalog(database=db_name, table_name=table_name) return dynamic_frame.toDF().select(cols) master_df = load_master("master_db", "master_table", ["id", "code", "name"])
Excelの変換リスト
PySparkでは、Pythonのライブラリも使えるメリットを活かして以下のサンプルコードのようにS3バケットに格納してあるExcelファイルをPandas DataFrameとして読み込んだ後にSpark DataFrameに変換できます。
from pyspark.context import SparkContext from awsglue.context import GlueContext import pandas as pd sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session excel_key = "s3://input_bucket/resorces/sample.xlsx" # pandasでExcelファイルの読み込み(1行目がヘッダの場合) pandas_df = pd.read_excel(excel_key, header=0) # spark DataFrameに変換 spark_df = spark.createDataFrame(pandas_df)
DataFrameでカラム不揃いのcsvファイル読み込み対応が簡単にできた
旧データ基盤の変換後csvを調査の結果、テーブルごとにcsvカラム数でパターン分けできたので、以下の対応で対処しました。
schemaをカラム数により使い分け
DataFrameでschema指定で読み込むと、先頭から順にcsvのカラムが割り当てられて、余ったカラムはNULLとなるので、StructFieldの順を変えて読み込むことで対処しました。
移行データのテーブルのカラム構成は、移行元データベースのテーブルのカラム(col_a~e)+クレンジング後のcol_b、col_eカラム(DataFrame読み込み時はNULLで後の処理で変換リストにより変換後の値を格納想定)、変換後csvは以下の2パターンだった場合、 DataFrame読み込み時にやりたいこととしては、以下の図のようなイメージです。(便宜上、csvファイル内容も表形式で表現しています。)
- 医療機関Aでは変換後csvにcol_a~e全てカラムが存在する
- 医療機関Bでは変換後csvにcol_a、b、eの3カラムのみ存在する
以下は上記の場合のサンプルコードとなります。想定外のカラム数だった場合は例外となるようにしています。
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # 入力csvのカラム数取得 def get_csv_col_count(csv_key): df = spark.read.format("csv").option("header", False).load(csv_key) return len(df.columns) # csvのカラム数に応じたカラムとその後ろに欠損カラム、移行処理で格納するカラムの順でStructFieldを構成 def get_csv_schema(csv_col_count): if csv_col_count == 5: csv_schema = StructType([ StructField('col_a', LongType(), True), StructField('col_b', StringType(), True), StructField('col_c', DecimalType(8, 4), True), StructField('col_d', IntegerType(), True), StructField('col_e', StringType(), True), StructField('cleansing_col_b', StringType(), True), StructField('cleansing_col_e', StringType(), True) ]) elif csv_col_count == 3: csv_schema = StructType([ StructField('col_a', LongType(), True), StructField('col_b', StringType(), True), StructField('col_e', StringType(), True), StructField('col_c', DecimalType(8, 4), True), StructField('col_d', IntegerType(), True), StructField('cleansing_col_b', StringType(), True), StructField('cleansing_col_e', StringType(), True) ]) else: csv_schema = None return csv_schema # カラム数からスキーマ情報を決定 input_csv_key = "s3://input_bucket/data/001/sample.csv" csv_col_count = get_csv_col_count(input_csv_key) csv_schema = get_csv_schema(csv_col_count) if csv_schema == None: raise Exception("想定されていないカラム数のcsvです。") migration_df = spark.read.format("csv").option("header", False).load(input_csv_key, schema=csv_schema)
Redshift Serverlessへの連携もスムーズにできた
昨年にAmazon Redshift でAWS Glueデータカタログの自動マウントができるようになったので、こちらを活用しました。
Glueジョブ(PySpark)では、処理後のデータはParquet形式でS3に出力まで実施し、移行対象の8種類のテーブルのAWS Glueデータカタログを作成しておけば、Redshift Severlessからクエリできる状態となったので、S3にデータ出力後のロード等の作業は不要で済みました。
おわりに
必要なデータを全てDataFrameとして読み込み、簡単に組み合わせて処理、不揃いなcsvの処理も吸収できたので、移行対象の8種類のテーブルに対して、8つのGlueジョブ(PySpark)を作成+そのジョブを医療機関単位で実行していくGlueジョブ(Python Shell)を作成するのみで比較的シンプルに対応できた印象です。当初データクレンジングの変換リストは全医療機関共通だったため、RedshiftでまとめてSQLで処理も考えましたが、それだと、より実装に手間がかかったのではと思います。(使用する各変換リストのテーブル化、変換後csvカラム数パターン分のテーブル作成×データ加工するクエリ実行×8種類のテーブル分・・等々)
今後もGlueジョブ(PySpark)を使えそうな場面があれば使っていきたいと思います。 最後までお読みいただきありがとうございました。
明日8日目は、山岡さんの「AWS Datasync利用時のS3のイベントについて」です。お楽しみに!
JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です! フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。 詳細は下記の募集一覧からご確認ください。 hrmos.co
まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。 hrmos.co
★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください! Tweets by jmdc_tech twitter.com