たそらぼ

日頃思ったこととかメモとか。

データメッシュに向けての勉強

今年はデータメッシュアーキテクチャの活用に向けての勉強や検証をしていきたいなと思っています。

データメッシュアーキテクチャは、Thoughtworks社のホームページで紹介されているような原則のもと、AWSはじめ多くのクラウドベンダーがそれを念頭に置いたサービスを出し始めています。

www.thoughtworks.com

dev.classmethod.jp


re:Invent2022に実際行ってみたところ、アナリティクス系のセッションは少なくとも自分が参加したものはほとんどデータメッシュか、データメッシュに関連した前提を伴ったもののように感じたので、データメッシュで使うような考え方や用語を理解しておかないとこれから出てくるサービスだったり活用は理解が甘くなってしまうだろうなーと思いました。

もちろんデータメッシュアーキテクチャが本当に適しているかはケースバイケースだとは思うものの、中央集権型のアーキテクチャしか選択肢にないよりは手札が多い方がいいし、中央集権型を取るにしてもそのアーキテクチャのメリットデメリットが比較することでより分かるので、視野を広く柔軟に捉えることができるはずです。

Thoughtworks社はメンバーがマイクロサービスを提唱した会社でもあり、データメッシュアーキテクチャの考え方も、それに関連性の高いドメイン駆動設計の考え方や言葉がよく使われています。

www.oreilly.co.jp

私は今までマイクロサービスやドメイン駆動設計は気になってはいたものの、まだ勉強はできてないので(エリック・エヴァンスのドメイン駆動設計は持ってるけどずっと積読してた)、遅ればせながらいよいよ準備する時がきたなーと思い、今年はしっかり理解していければなと思っています。

本を読めばいいという訳ではないけれど、インプットは大切なので、とりあえず以下は読んでみたいなと思っています。

データメッシュを採用することで、一般的には中央集権型の時と比べて、中央データレイク・データウェアハウスを担当するデータエンジニアなどの負担やそれに起因するボトルネックを緩和することができ、全体としてデータ活用のアジリティが向上するといわれています。実際のところ、今度はデータエンジニアは各ドメインのデータ製品開発を支援するためのツールを頑張って作り、自分たちでデータを活用できるように支援する必要があるし、非エンジニアも巻き込んでデータを活用していかないといけないので、これはこれで大変なこともあると思うけれど、実現するならみんなでより良いデータ活用ができる方向に向かっていると思うので、頑張ってやっていきたいなーと考えています。

AWS CodeCommitに入門した

Code三銃士のうち、AWS CodeCommitに入門した。

モチベーション

認定Developerアソシエイトの参考書を読んでいるとCodeCommitの設定方法が出てきたが、触ったことがないのでなかなか覚えられなかった。触ったことがないものは仕方ないので、実際に触ってみることにした。

AWS CodeCommitとは

AWSがホストしているプライベートGitレポジトリ。
aws.amazon.com

Gitレポジトリとして使えるほか、CodeCommitを起点に後続のCodeBuildなどに連携することができる。
※CodeBuildはソースとしてCodeCommit以外にもGithub、Bitbucket、S3にも対応しているので、CodeCommitが必須という訳ではない。

レポジトリを作成して、Pushしてみる。

実際に進めていくと分かるが、手順は結構丁寧に書いてあり、特に困ることはないので、安心してほしい。
詳細は、以下のドキュメントに日本語で丁寧に書いてあるのでいつでも参照できる。
Git 認証情報を使用した HTTPS ユーザーのセットアップ - AWS CodeCommit

それでは早速やっていこう。

レポジトリの作成

まず、AWS マネジメントコンソールにログインして、CodeCommitの画面を開く。「レポジトリを作成」からレポジトリを作成する。名前は適当に付けておく。
f:id:tasotasoso:20210612232640j:plain

「作成」を押すと作成される。とても簡単。

次に以下のような手順が表示されるので、これに従って作業を進める。
f:id:tasotasoso:20210612232624j:plain
ステップ1は完了している体で進める。

認証情報の作成

IAMの画面から作業をするIAMロールを選択し、「認証情報」の「AWS CodeCommitのHTTPS Git 認証情報」から「認証情報を生成」を押す。
f:id:tasotasoso:20210612232917j:plain

これでCodeCommitとやり取りする際の認証情報が生成される。

レポジトリのクローン

ステップ3と表示されたgit cloneのコマンドをコピーし、ローカルで実行する。
すると、認証情報を聞かれるので、先ほど取得した情報を入力する。
f:id:tasotasoso:20210612233030j:plain
これでクローンが成功する。

ファイルのPush

後は普通のGitと同じように作業すればよい。
私はVSCodeが好きなので、VSCodeでファイルを追加し、commitしてpushまで行った。
f:id:tasotasoso:20210612233222j:plain
pushした後、作成したレポジトリを確認すると、確かにファイルが上がっていることが分かる。
f:id:tasotasoso:20210612233343j:plain

まとめ

私はCodeCommitはなんとなく敷居が高いイメージがあって手が出せていなかったが、とても簡単にレポジトリを作成し、ファイルを共有することができた。SNSなどと連携すれば作業に伴って通知やLambdaなどの実行も簡単にできるので、それが良いところだろう。

(小ネタ)別ファイルにしたSQLをAthenaで投げたい

Athenaに投げるSQLを別ファイルにしておきたい。
こうしておくと、管理もしやすくなり、SQLFluffでCIしたりできるようになる。

ディレクトリ構造

同じ階層にathena.pyという名前でpythonスニペットと、sample.sqlという名前のDMLを置いておくとする。

C:.
    athena.py
    sample.sql

PythonからのSQLの読み出し方

DMLをopenしてread()で読み出し、そのままstart_query_executionのQueryString引数に渡せばよいことが分かった。

import boto3

athena = boto3.client('athena', region_name='ap-northeast-1')

DATABASE_NAME = ""
S3BUCKET_NAME = ""

with open("./sample.sql", 'r') as sql_f:
    sql =sql_f.read()


result = athena.start_query_execution(
    QueryString = sql,
    QueryExecutionContext = {
        'Database': DATABASE_NAME
    },
    ResultConfiguration = {
        'OutputLocation': 's3://' + S3BUCKET_NAME,
    }
)

DML

SELECT * 
  FROM "sample_db"."sample_table" 
 limit 10;

digdagでpythonコードを呼び出す

久しぶりにdigdagを使うことになったら、完全に全てを忘れていたので、pythonとの連携方法を再確認した。

以下のdigdagドキュメントのLanguage API - Pythonの挙動を抜粋して確認した。
Language API - Python — Digdag 0.10.0 documentation

f:id:tasotasoso:20190731083120p:plain

ディレクトリ構成

windows環境でやったので、以下のような感じ。tree /Fの結果の抜粋。

C:.
│  workflow.dig
└─tasks
    │  __init__.py

digdagのワークフロー定義ファイル

pythonは>pyの行のフォーマットで呼び出す。
.でパッケージ構造を指定する。

  • workflow.dig
+step1:
  py>: tasks.MyWorkflow.step1

+step2:
  py>: tasks.MyWorkflow.step2

pythonスニペット

ドキュメントによると、__init__.pyに定義を書いておくスタイルらしい。

  • __init__.py
import digdag

class MyWorkflow(object):
  def step1(self):
    print("step1")
    digdag.env.store({'my_value': 1})

  def step2(self):
    print("step2: %s" % digdag.env.params["my_value"])

1発動かして挙動を確認するには、workflow.digのあるディレクトリでdigdag runコマンドを使う。

digdag run ./workflow.dig

このスニペットを動かすだけで、pythonスニペットの呼び出しと、digdagとの変数のやり取りが確認できるので便利。


あたり前ではあるが、別のパッケージに処理内容を定義しておいて、__init__.pyからimportするようにしておいても動いた。

  • __init__.py
from .task_contents import *
  • task_contents.py
import digdag

class MyWorkflow(object):
  def step1(self):
    print("step1")
    digdag.env.store({'my_value': 1})

  def step2(self):
    print("step2: %s" % digdag.env.params["my_value"])

感想

前回使おうと思ったのが2年前だが、それから結構本格運用している企業が増えたようで、ブログなどの公開情報で品質が高いものが新しく公開されていた。
特に、Digdagからdockerコンテナを呼んだり、Digdagをコンテナで動かしたりなどのノウハウが貯まってきているのがすごい。しっかり読んで、参考にさせていただきたい。

Athenaでjson文字列から値を取り出したい

入れ子jsonになっていると見せかけて、json文字列が入っている場合に、どのようにすればAthenaでデータを取り出すことができるか確認した。

データを見てみる

以下のようなデータを考える。

{"key1": 1, "key2": "{\"key21\":2, \"key22\":3}"}
{"key1": 4, "key2": "{\"key21\":5, \"key22\":6}"}

key2のバリューが文字列になっている。
似たようなパターンで、json文字列ではなくjson入れ子になっているものがある。

{"key1": 1, "key2": {"key21":2, "key22":3}}
{"key1": 4, "key2": {"key21":5, "key22":6}}

パッと見るとおんなじように見えるが、当然格納されているデータが全然違うので、DDLには大きな影響がある。

例えば、入れ子は以下のようにテーブルを定義すればよい。

CREATE EXTERNAL TABLE test_table ( 
    key1 int, 
    key2 struct<
        key21:int, 
        key22:int
    >
) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://test_bucket/'

SELECTしてみると以下のようになる。
f:id:tasotasoso:20210522232917j:plain

後は、SELECTするときに"."演算子を使えば、jsonの要素も取得できる。

一方、json文字列の中身はこの方法では当然とれない。SELECTしてみると、「JSONObjectちゃうやん、文字列やんけ」と、至極ごもっともなご指摘を受ける。
f:id:tasotasoso:20210522232418j:plain

Presto関数を使ってデータを取り出す

取り出す一つの方法として、Presto関数を使うことができる。
テーブルは以下のように定義する。

CREATE EXTERNAL TABLE test_table ( key1 int, key2 string ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://test_bucket/'

DML側でPresto関数のcastとjson_extractおよびjson_parseを使い、json文字列の中身にアクセスして期待した型にキャストする。

SELECT
 cast(json_extract(k2, '$.key21') as int) as key21,
 cast(json_extract(k2, '$.key22') as int) as key22
FROM
(SELECT key1 AS k1,
       json_parse(key2) AS k2
 FROM "test_db"."test_table")


castは、

json_extractとjson_parseは、

を参照してほしい。

クエリの結果は以下のようになる。
f:id:tasotasoso:20210522225809j:plain

ちゃんと値が取れていることが分かる。

その他、Athenaから使えるPresto関数についてはこちらを参照いただきたい。
docs.aws.amazon.com

感想

jsonのバリューにjsonが入っているか、json文字列が入っているかは、よく考えると大きな違いなのですが、私はぱっと見同じような印象を受けるので、今までその違いをどうやってAthenaで吸収するのか深く考えていませんでした。Athenaの話よりかは、ベースになっているPrestoの話なので、検索してもズバリな情報も少なく、かなり面食らってしまいました。上手くパースする方法が分かったので良かったです。

S3イベント→SNS→SQSの構成を試した。

S3イベント→SNS→SQSの構成を試した。
SQSの先にはLambdaが繋がっているようなイメージ。
意外と設定をズバリ書いてくれいてる文献がなかったのでまとめる。

設定手順

S3バケットの作成

イベントを起こすS3バケットがなければ、バケットを作成する。

SNSの設定

SNSでトピックを作成する。
このとき、アクセスポリシーにS3のイベントを検知するための権限設定をすることを忘れない。
アクセスポリシーは以下の公式ドキュメントを参考にするとよい。
チュートリアル: 通知 (SNS トピックまたは SQS キュー) のバケットを設定する - Amazon Simple Storage Service
以下のjsonは上記ドキュメントの「ステップ 2: Amazon SNS トピックを作成する」の3の引用。
"SNS-topic-ARN"、"bucket-name"、"bucket-owner-account-id"は変更する。

{
 "Version": "2012-10-17",
 "Id": "example-ID",
 "Statement": [
  {
   "Sid": "example-statement-ID",
   "Effect": "Allow",
   "Principal": {
    "AWS":"*"  
   },
   "Action": [
    "SNS:Publish"
   ],
   "Resource": "SNS-topic-ARN",
   "Condition": {
      "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:bucket-name" },
      "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" }
   }
  }
 ]
}

これはIAMのポリシー作成画面から作成するのではなく、SNSの画面から直接設定する。Cloudformationテンプレートだとどうなるんだろう?

S3バケットのイベント設定

再度S3バケットに戻る。「プロパティ」>「イベント通知」で発火させたいイベントを設定する。今回はとりあえずPUTをイベントタイプにした。SNSトピックは先に作成したものを選ぶ。トピックにポリシーを付け損ねていた場合は、ここで権限エラーになる。

SQSキュー作成、SNSサブスクリプション設定

SQSキューを作成し、SNSサブスクリプションを設定する。
SQSの画面から適当にキューを作る。作成後、キューの画面から「SNS サブスクリプション」で先ほど作成したSNSトピックのARNを設定する。

  • デッドレターキューは設定しておく。

なお、少し脱線するが、Lambdaのトリガーにするようなキューを作る場合は、忘れずにデッドレターキューを設定する。
Amazon SQS デッドレターキュー - Amazon Simple Queue Service

デッドレターキューを設定しておくと、検証中にSQSからデータを取ったLambdaでなにかエラーが吐いた際に、可視性リトライで知らないうちにリトライされて続けることを防ぐことができる。

  • サブスクライブ設定の動作確認

SNSからのサブスクライブがちゃんとできているかは、以下の方法で分かる。

  1. SNSトピックの画面から「メッセージの発行」でテストメッセージを送る。
  2. SQSの画面で「メッセージを受信」>「メッセージをポーリング」でメッセージを受け付け、中身を見て送ったメッセージであることを確認する。

動作確認

とりあえずバケットとキーを取り出す。

Lambdaを作成し、動作確認を行う。
だいぶ泥臭いが、以下のような感じのコードでPUTされたS3バケットとオブジェクトを取ることができた。

import json

def lambda_handler(event, context):
    
    for msg in event["Records"]:
        notification = json.loads(msg["body"])
        records = json.loads(notification["Message"])
        for record in records["Records"]:
            print("Backet is: ", record["s3"]["bucket"]["name"])
            print("Object is: ", record["s3"]["object"]["key"])
            
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

メッセージの構造

以下の3つのメッセージ構造が組み合わさる。

S3のイベントメッセージがSNSのHTTP/HTTPS 通知の JSONの"Message"キーのバリューに入り、さらにこの全体がlambdaでSQSからメッセージを取り出した時の形式の"body"キーのバリューに入るような感じになる。むずい。

postgreSQLのパラレルクエリを試してみたけど、上手くいかなかった。

PostgreSQLに興味がでてきたので、PostgreSQL徹底入門(第4版)を読んでいる。

P.111に「パラレルクエリ」が出てきてとても面白そうだったので、試してみた。
結構設定が難しかったのと、プロセスが上手く立ち上げられず、結果的にパフォーマンスが落ちてしまった。
あまり日本語文献がなかったので、調べた資料のまとめとして記事にしておく

パラレルクエリは、PostgreSQLの、複数のCPUを利用して、より速く問い合わせに答えることができるような問い合わせ計画を立てる機能。

環境

準備

データ

以下の手順で作成した。

  1. pgAdminから、testという名前で、DBを作成する。クエリをかけるユーザーも作成し、ログインとSELECTできるように権限を割り当てておく。
  2. 以下のコマンドで、pgbenchを使ってテストデータを作成する。
pgbench -i test -s 50

変数設定

公式ドキュメントから、パラレルクエリを利用するためにはいくつか変数を設定する必要があることが分かる。
PostgreSQL: Documentation: 11: 15.2. When Can Parallel Query Be Used?

  • max_parallel_workers_per_gather: 1以上
  • dynamic_shared_memory_type: none以外

また、今回は普通にpsqlコマンドプロンプトからpostgreSQLにアクセスして、上記2つの設定をしてクエリを試したが、パラレルクエリにならなかった。いろいろ調べて、force_parallel_modeをonにするとパラレルで実行された。

  • force_parallel_mode: on

PostgreSQL: Documentation: 11: 19.7. Query Planning
によると、

Allows the use of parallel queries for testing purposes even in cases where no performance benefit is expected. The allowed values of force_parallel_mode are off (use parallel mode only when it is expected to improve performance), on (force parallel query for all queries for which it is thought to be safe), and regress (like on, but with additional behavior changes as explained below).

とあるので、今回検証したクエリをローカルPC上のpostgreSQLで実行した場合は性能が上がらないのだろう。賢い。

max_parallel_workers_per_gatherとforce_parallel_modeは、psqlでpostgrsサーバーにアクセスし、以下のコマンドを実行すれば設定できる。max_parallel_workers_per_gatherは2とする(最大のワーカープロセスが2つになる)。

SET max_parallel_workers_per_gather=2;
SET force_parallel_mode=on;

確認は以下でできる。

SHOW max_parallel_workers_per_gather;
SHOW force_parallel_mode;
SHOW dynamic_shared_memory_type;

dynamic_shared_memory_typeはwindowsだとデフォルトで"windows"になっているようだった。

パラレルクエリを投げてみる

パラレルクエリを投げてみるといっても、特に特別なことをするわけではなかった。
上記設定をする前とした後で、どのようにクエリの実行時のプロファイルが変わるかEXPLAINを使って見てみる。

max_parallel_workers_per_gatherが0のとき

# パラレルクエリを無効にしておく。
test=> SET max_parallel_workers_per_gather=0;

test=> EXPLAIN ANALYZE VERBOSE SELECT MAX(bid) FROM pgbench_accounts;
                                                             QUERY PLAN                                          
------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=2890.00..2890.01 rows=1 width=4) (actual time=10.692..10.692 rows=1 loops=1)
   Output: max(bid)
   ->  Seq Scan on public.pgbench_accounts  (cost=0.00..2640.00 rows=100000 width=4) (actual time=0.009..4.484 rows=100000 loops=1)
         Output: aid, bid, abalance, filler
 Planning Time: 0.082 ms
 Execution Time: 10.713 ms
(6 行)

普通にクエリが投げられた。実行時間は10ms程度。

max_parallel_workers_per_gatherが2のとき

# パラレルクエリを有効にしておく。
test=> SET max_parallel_workers_per_gather=2;

test=> EXPLAIN ANALYZE VERBOSE SELECT MAX(bid) FROM pgbench_accounts;
                                                                QUERY PLAN                                       
------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=3890.00..3890.11 rows=1 width=4) (actual time=44.050..47.663 rows=1 loops=1)
   Output: (max(bid))
   Workers Planned: 1
   Workers Launched: 1
   Single Copy: true
   ->  Aggregate  (cost=2890.00..2890.01 rows=1 width=4) (actual time=13.309..13.309 rows=1 loops=1)
         Output: max(bid)
         Worker 0: actual time=13.309..13.309 rows=1 loops=1
         ->  Seq Scan on public.pgbench_accounts  (cost=0.00..2640.00 rows=100000 width=4) (actual time=0.205..4.729 rows=100000 loops=1)
               Output: aid, bid, abalance, filler
               Worker 0: actual time=0.205..4.729 rows=100000 loops=1
 Planning Time: 0.321 ms
 Execution Time: 47.690 ms
(13 行)

1個しかワーカーができなかったが、Worker0というプロセスが立ち上がっているようだった。残念ながら、2つ目以降は立ち上げられなかった。実行時間も47.69msに落ちていた。おそらく、司令塔のプロセスとワーカープロセスの間で通信したり、ワーカープロセスを立ち上げたりするのに余分に時間がかかってしまっているのではと思われる。force_parallel_modeをonにしないとパラレルクエリ風に動いてくれないのもこれが原因だろう。

感想

参考にしたサイトとほとんど同じやり方を試してみたが、結果として性能が悪くなってしまった。Postgresのバージョンが違うこともありそうだが、なんでだろう。CPUがインテルじゃないから?RDBMSは奥が深い......。