JMDC TECH BLOG

JMDCのエンジニアブログです

dbtをデータ基盤のデータ品質維持に活用できないかを試してみた

はじめに

データウェアハウス開発部の高野です。
オンプレミスの電子カルテデータ基盤のAWS移行のプロジェクトに参画しておりますが、開発も終盤となり、データ基盤を運用するフェーズも考えるタイミングになってきました。
運用する上で重要な事の1つとしてデータ品質の維持があります。製薬企業様、医療機器メーカー様を中心に幅広くご活用いただいているデータであるため、最重要と言っても良いものです。 そのデータ品質の維持に、近年、多くの企業様でも活用されているdbtを電子カルテデータ基盤独特の要件がある中、活用できないかを試してみました。

dbtとは

dbtとはSQLによるデータ変換や、データ品質チェック、ドキュメンテーションを効率的に行うためのOSSです。 zenn.dev 以下の記事を拝見し、データ品質チェックだけでも活用が出来そうということが今回のきっかけにもなりました。 zenn.dev

dbtのテストには以下の2種類があります。

  • singularテスト
    存在してはいけない行をSELECTするクエリを作り、行がSELECTされたらテスト失敗、SELECTされなければテスト成功となるテストです。既に活用されているクエリがあれば、容易に移行もできそうなテストです。
  • genericテスト
    dbtに用意されているuniquenot null等、汎用的に、テーブル、カラムに対してテストをschema.ymlに定義して実行できるものになります。テストとして生成されるクエリはsingularテストと同様に存在してはいけない行をSELECTするものとなります。また、dbtのPackageも公開されていて、例えばdbt-labs/dbt-utilsを利用して使えるgenericテストが増やせたり、自作のgenericテストを追加したりも可能です。 zenn.dev docs.getdbt.com

前提

電子カルテデータ基盤の構成について

データベース

データベースとしてはAmazon Redshift Serverless(以下Redshift Serverless)を採用しており、dbtにもRedshiftのアダプタがあり、Redshift Serverlessにも接続可能です。 aws.amazon.com docs.getdbt.com

テーブル構成

弊社では締め処理と呼んでいますが、月1回サービス部門向けに最新のデータマートを提供しています。
実際サービス部門がそのデータを使うのは翌月以降なので、ある程度、過去のデータマートに対する問い合わせ等にも対応できるようにしておく必要がありました。
開発メンバーにて検討した結果、以下の例のように同一スキーマ内でテーブル名にprefixを付けてどの締め処理で作られたテーブルかを区別することとなりました。
この前提が今回dbt活用にあたり大きなポイントとなります。

テーブル名の例

pre00001_drug_table
pre00001_disease_table
pre00002_drug_table
pre00002_disease_table
 ・
 ・

使ったdbt Package

以下のパッケージを使っています。
packages.yml

packages:
  - package: dbt-labs/redshift
    version: 0.9.0

dbt-labs/redshiftdbt-labs/dbt-utilsを使っているため、dbt depsコマンド実行時、dbt-labs/dbt-utilsも併せて取得されます。 docs.getdbt.com

実現したいこと

以下の2点となります。

①同じテーブル定義のテーブルに対して同じデータ品質チェックを行いたい

どの締め処理で作られたテーブルかを区別するためにテーブル名にprefixを付加して分けていますが、テーブル定義は同じものとなるので、データ品質チェックは同じものを使えるのが運用上望ましいです。

②抽出結果をcsvでデータ出力したい

データ品質を他部門向けに可視化したり、確認を依頼することがあります。そのような際にある程度集計したデータを抽出し、そのまま提供、もしくはスプレッドシート等で可視化したりしますが、それに使えるデータをcsv出力したいです。

試してみた

①同じテーブル定義のテーブルに対して同じデータ品質チェックを行いたい

modelのaliasとコマンド実行時に指定するvariablesで対応できました。

docs.getdbt.com docs.getdbt.com

具体的には以下のサンプルのようなmodel、dbtコマンドを使います。
drug_table.sql(model)

{{
    config({
        "alias": var("table_prefix") ~ "drug_table"
    })
}}

ポイント

  • dbtで作っていない既にあるテーブルにテストをする場合、以下の記事のとおり、空のmodelファイルがあればテストは出来るが、空ではなくconfigaliasにvaliablesとテーブル名を連結したものを指定することにより、動的にテーブル名を変更できるようにした。 zenn.dev

例えば、以下のサンプルのようにschema.yml(modelファイルと同じディレクトリに配置)でテストを定義しておくこととします。
schema.yml(drug_table(model)のidカラムがuniqueであることのテストを定義する)

version: 2

models:
  - name: drug_table
    columns:
      - name: id
        data_tests:
          - unique

dbtコマンド(model指定、variables指定でtestを実行)

dbt test --model drug_table --vars '{"table_prefix": "pre00001_"}'

dbtコマンド実行結果(テスト成功の場合)

root@eaeb0e8b3e7e:/dbt/ops# dbt test --model drug_table --vars '{"table_prefix": "pre00001_"}'
08:19:36  Running with dbt=1.10.0-b1
08:19:36  target not specified in profile 'dwh', using 'default'
08:19:36  Registered adapter: redshift=1.9.2
08:19:37  Found 25 models, 1 test, 643 macros
08:19:37
08:19:37  Concurrency: 1 threads (target='default')
08:19:37
08:19:39  1 of 1 START test unique_drug_table_id ......................................... [RUN]
08:19:40  1 of 1 PASS unique_drug_table_id ............................................... [PASS in 1.58s]
08:19:42
08:19:42  Finished running 1 test in 0 hours 0 minutes and 4.14 seconds (4.14s).
08:19:42
08:19:42  Completed successfully
08:19:42
08:19:42  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

コンパイル後のSQL

select
    id as unique_field,
    count(*) as n_records

from "dev"."public"."pre00001_drug_table"
where id is not null
group by id
having count(*) > 1

SQL上で、prefixが付加されたテーブル名に置き換わっているので、やりたいことは実現できました。

②抽出結果をcsvでデータ出力したい

Redshiftには、UNLOADコマンド(S3にクエリ結果を出力する)があります。 docs.aws.amazon.com dbtのPackageのdbt-labs/redshiftでUNLOADコマンドを実行するunload_tableマクロがあり、そちらを活用しました。 hub.getdbt.com

具体的には以下のサンプルのような抽出用のmodel、dbtコマンドを使います。
extract_drug_table.sql(model)

{{
    config({
        "materialized": "view",
        "post-hook": [
          "{{ redshift.unload_table(this.schema,
                                    this.table,
                                    s3_path='s3://' ~ env_var('OUTPUT_BACKET') ~ '/' ~ this.database ~ '/' ~ this.schema ~ '/' ~ var('table_prefix') ~ '/' ~ this.table ~ '_',
                                    iam_role=env_var('DB_IAM_ROLE'),
                                    header=True,
                                    delimiter=',',
                                    overwrite=True,
                                    parallel=False) }}"
        ]
    })
}}

select
  drug_id,
  count(*)
from
  {{ ref('drug_table') }}
group by
  drug_id

ポイント

  • modelのmaterializedtableまたはviewにする必要がある。 docs.getdbt.com
    • unload_tableマクロのExample usageにも記載のように、modelのpost-hookを利用してマクロを実行しますが、post-hookがtableviewといったデータベースオブジェクトを作るmodelでないと動作しない仕様(サンプルはviewで作成) github.com
  • クエリのfrom句で①のdrug_tablemodelを指定することによりaliasによりテーブル名が置換される
  • 出力先(unload_tableマクロの引数s3_path)に、環境変数やvaliables、this等のdbt変数を使うことにより、規則性を持ったS3オブジェクトキーとして出力できる。
    • thisは、modelのデータベース接続情報である、databaseschemaなどの情報をmodel内容に埋め込むことが可能なdbt Jinja functionsの1つ。 docs.getdbt.com
  • 1つのファイルで出力したいので、UNLOADコマンドにPARALLEL OFFオプションが指定されるようにマクロの引数を指定。
  • 他、ヘッダを出力する、出力時に使うIAMロール等、UNLOADコマンドで指定可能なものはマクロの引数に揃っているので適宜指定。

dbtコマンド(model指定、variables指定でrunを実行)

dbt run --model extract_drug_table --vars '{"table_prefix": "pre00001_"}'

dbtコマンド実行結果

root@bfc3163620a7:/dbt/ops# dbt run --model extract_drug_table --vars '{"table_prefix": "pre00001_"}'
00:47:09  Running with dbt=1.10.0-b1
00:47:09  target not specified in profile 'dwh', using 'default'
00:47:09  Registered adapter: redshift=1.9.2
00:47:11  Found 28 models, 2 data tests, 643 macros
00:47:11
00:47:11  Concurrency: 1 threads (target='default')
00:47:11
00:47:15  1 of 1 START sql view model public.extract_drug_table .......................... [RUN]
00:47:19  1 of 1 OK created sql view model public.extract_drug_table ..................... [SUCCESS in 3.22s]
00:47:20
00:47:20  Finished running 1 view model in 0 hours 0 minutes and 9.39 seconds (9.39s).
00:47:20
00:47:20  Completed successfully
00:47:20
00:47:20  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1

コンパイル後のSQL

select
  drug_id,
  count(*)
from
  "dev"."public"."pre00001_drug_table"
group by
  drug_id

dbtコマンド実行後にS3にも以下のようなkeyで出力されました。

AWSマネージメントコンソール(S3)での表示

ファイル名はUNLOADコマンドの仕様で末尾が「_000」になってしまうので、扱う際に必要に応じて「.csv」に変更するとか工夫は必要ですが、S3にcsvとして出力できたので、やりたいことを実現できました。

終わりに

実現したいことは一通り可能であり、dbtが汎用性の高いツールであることが改めて良く分かりました。データ品質維持の運用に活用できるイメージが出来たので、必要なテストを定義、作成やdbtのモニタリングツールであるelementaryも活用し、運用に乗せる準備を進めています。 docs.elementary-data.com

数年前にクラウド移行した医療機関のレセプト、DPC調査データを扱うデータ基盤では、この辺りの機能は自前で作りこんでいました。
techblog.jmdc.co.jp 以下のような機能も含めて作りこんでいましたが、これらはdbtとelementaryで実現できそうなので、テスト内容を作ることに集中でき、工数削減効果も期待できそうです。

  • クエリ雛形をメンテできる仕組み
  • 運用上のタイミングで必要なクエリを全て実行できる仕組み
  • 実行したクエリの履歴管理
  • 実行結果やエラーの通知

電子カルテデータ基盤としても、既に約470のテストを用意しており、genericテストを一部自作したり、汎用性の高いテストを多く活用すれば比較的短期間でデータ品質維持の基礎も構築可能でした。

またデータウェアハウス開発部としても、各データ基盤のRedshift Serverlessへの移行が進んでいるので、他、データマート範囲外のデータの抽出対応や開発時のテストへの活用など、活用できそうな機会をさらに模索できればと思っています。
少し特殊な使い方ではあるかもしれませんが、データパイプラインは既にあり、データ品質維持からでもdbtを活用してみたいという方の参考になれば幸いです。
最後までお読みいただきありがとうございました。

JMDCでは、ヘルスケア領域の課題解決に一緒に取り組んでいただける方を積極採用中です! フロントエンド /バックエンド/ データベースエンジニア等、様々なポジションで募集をしています。 詳細は下記の募集一覧からご確認ください。 hrmos.co

まずはカジュアルにJMDCメンバーと話してみたい/経験が活かせそうなポジションの話を聞いてみたい等ございましたら、下記よりエントリーいただけますと幸いです。 hrmos.co

★最新記事のお知らせはぜひ X(Twitter)、またはBlueskyをご覧ください! twitter.com

bsky.app