MWAA とは?
MWAA = Amazon Managed Workflows for Apache Airflow です。AWS がマネージドで Airflow を使用できるようにしてくれたやつです。
GCP 側にも Cloud Composer という、GCP がマネージドで Airflow を用意してくれているものがあります。なので、要件でどうしても AWS と GCP どちらも使わなければいけないという事でない限りは本記事は無用になると思います。
そういえば、余り関係ないですが GCP って名前変わりましたよね。Google Cloud Platform から Google Cloud と変更されたので、ちゃんと略すとするなら GC?初見だと何か分からないですね。ということで筆者は相変わらず GCP と略しています。
あと Apache も名称変更の要請が出ているようで、頭の A の文字が変わったら MWAA も略し方変わってしまうんですかね。
個人での検証用に建ててたら、それはそれはいいお値段がするので、誰かの助けになればと書き残します。
GCP 側リソースの作成
早速 GCP 側に下記のリソースを作成します。
- AWS MWAA 側から使用する サービスアカウント
- 権限は今回 BigQuery 管理者
- Workload Identity Pool
- 上記 Workload Identity Pool に Provider
以下 GCP の CloudShell 上で作業していきます。権限が足りない方などは適宜追加しましょう。
パラメータ設定
POOL_NAME="pl-aws-mwaa"
PROVIDER_NAME="pvdr-aws-mwaa"
SERVICE_ACCOUNT="sa-aws-mwaa"
SERVICE_ACCOUNT_DESC="Access GCP services from the MWAA in AWS."
SA_EMAIL="${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com"
AWS_IAM_ROLE="<MWAA で使用している IAM Role 名。ARN ではないので注意>"
MWAA 用サービスアカウント作成
gcloud iam service-accounts create "${SERVICE_ACCOUNT}" --description="${SERVICE_ACCOUNT_DESC}" --display-name="${SERVICE_ACCOUNT}"
BigQuery 管理者権限をサービスアカウントへ付与
他の権限にしたい人は適宜変更
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member=serviceAccount:${SA_EMAIL} \
--role=roles/bigquery.admin
Workload Identity Pool 作成
gcloud iam workload-identity-pools create "${POOL_NAME}" \
--project="${PROJECT_ID}" \
--location="global" \
--display-name="Access from AWS MWAA"
Workload Identity Pool - Provider 作成
Pool 情報取得
WORKLOAD_IDENTITY_POOL=$(gcloud iam workload-identity-pools describe "${POOL_NAME}" \
--project="${PROJECT_ID}" \
--location="global" --format="value(name)")
Provider 作成
gcloud iam workload-identity-pools providers create-aws $PROVIDER_NAME \
--location="global" \
--workload-identity-pool=$POOL_NAME \
--account-id=$AWS_ACCOUND_ID \
--attribute-mapping='google.subject=assertion.arn,attribute.aws_role=assertion.arn.contains("assumed-role") ? assertion.arn.extract("{account_arn}assumed-role/") + "assumed-role/" + assertion.arn.extract("assumed-role/{role_name}/") : assertion.arn'
Pool のサービスアカウント連携
gcloud iam service-accounts add-iam-policy-binding "${SA_EMAIL}" \
--role="roles/iam.workloadIdentityUser" \
--member="principalSet://iam.googleapis.com/${WORKLOAD_IDENTITY_POOL}/attribute.aws_role/arn:aws:sts::${AWS_ACCOUND_ID}:assumed-role/${AWS_IAM_ROLE}"
AWS 側で使用する接続情報ファイル生成
gcloud iam workload-identity-pools create-cred-config \
"${WORKLOAD_IDENTITY_POOL}/providers/${PROVIDER_NAME}" \
--service-account="${SA_EMAIL}" \
--aws --output-file="/tmp/gcp-wi-config.json"
接続情報ファイル確認
cat /tmp/gcp-wi-config.json
接続情報ファイル(クライアント ライブラリの構成ファイル)の罠
下記のような JSON ファイルができます。ただし、MWAA ではこれはこのまま使えないんです。GCP 側がコンテナのタスクで使用することを想定していないのか、インスタンスメタデータ用の 169.254.169.254 アドレスに取りに行ってます。ここは自力でタスクメタデータを取得しなければいけません。
単純にアドレスをタスクメタデータの 169.254.170.2 に変更してもダメなので、削除した上で、後でひと手間加えます。
gcp-wi-config.json 変更前
{
"type": "external_account",
"audience": "//iam.googleapis.com/projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/pl-aws-mwaa/providers/pvdr-aws-mwaa",
"subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"environment_id": "aws1",
"region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
"regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
},
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/sa-aws-mwaa@<PROJECT_ID>.iam.gserviceaccount.com:generateAccessToken"
}
上記ファイルの region_url と url を削除し、下記の状態にして AWS 側へ持っていきます。
gcp-wi-config.json 変更後
{
"type": "external_account",
"audience": "//iam.googleapis.com/projects/<PROJECT_NUMBER>/locations/global/workloadIdentityPools/pl-aws-mwaa/providers/pvdr-aws-mwaa",
"subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
"token_url": "https://sts.googleapis.com/v1/token",
"credential_source": {
"environment_id": "aws1",
"regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
},
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/sa-aws-mwaa@<PROJECT_ID>.iam.gserviceaccount.com:generateAccessToken"
}
AWS 側でやること
以下作業していきます。Airflow の環境更新が必要です。
config-gcp-wi.json を DAG ディレクトリへ配置
gcp-wi-config.json を DAG ファイル置き場 dags/ へ配置。タスクから読み込めれば他の場所でもOK。筆者は下記の配置にし、dags に入れました。
plugins.zip の作成
Airflow 内で環境変数を設定するための作業です。ローカル環境など、どこかで plugins ディレクトリ配下へ下記ファイル env_var_plugin.py を作成します。この中で、自力でタスクメタデータを取得しています。
plugins/env_var_plugin.py
from airflow.plugins_manager import AirflowPlugin
import os
from urllib.parse import urljoin
import requests
# Get AWS credentials
task_meta_url = 'http://169.254.170.2'
relative_uri_path = os.environ.get('AWS_CONTAINER_CREDENTIALS_RELATIVE_URI')
metadata_url = urljoin(task_meta_url, relative_uri_path)
aws_cred = requests.get(metadata_url).json()
# Set AWS credentials
os.environ["AWS_ACCESS_KEY_ID"] = aws_cred['AccessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_cred['SecretAccessKey']
os.environ["AWS_SESSION_TOKEN"] = aws_cred['Token']
# Set GCP account info
os.environ["GOOGLE_CLOUD_PROJECT"] = "<GCP_PROJECT_ID>"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/usr/local/airflow/dags/gcp-wi-config.json"
class EnvVarPlugin(AirflowPlugin):
name = 'env_var_plugin'
作成後 plugins.zip として圧縮。
zip -r ../plugins.zip ./
requirements.txt へ requests 追加
env_var_plugin.py で requests を使っているので requirements.txt に requests がない場合はここで追加。追加したくない人はそれぞれ違う取得方法で取ってください。
Airflow 設定オプションに core.lazy_load_plugins: False を設定
環境の更新をする際に Airflow 設定オプション内で core.lazy_load_plugins
: False
の設定をします。
ではてきとーに簡単な DAG を作成してクエリを投げてみます。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
now = datetime.now()
args = {
'owner': 'airflow',
'start_date': now,
'schedule_interval': "00 23 * * *",
'retries': 0,
'retry_delay': timedelta(minutes=10),
"is_paused_upon_creation": True,
}
with DAG(dag_id='TestBigQuery', default_args=args) as dag:
start = DummyOperator(
task_id='start'
)
end = DummyOperator(
task_id='end'
)
bq_insert = BigQueryOperator(
task_id='bq_insert',
sql='INSERT `<GCP_PROJECT_ID>.<DATASET_ID>.<TABLE_NAME>` (id, name, ts) VALUES (555, "ydak", TIMESTAMP("{}"));'.format(
now.strftime("%Y-%m-%d %H:%M:%S"),
),
use_legacy_sql=False,
dag=dag,
)
start >> bq_insert >> end
以下が結果です。success してます。
*** Reading remote log from Cloudwatch log_group: airflow-MyAirflowEnvironment-Task log_stream: TestBigQuery/bq_insert/2023-01-21T12_38_34.695211+00_00/1.log.
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1035}} INFO - Dependencies all met for <TaskInstance: TestBigQuery.bq_insert manual__2023-01-21T12:38:34.695211+00:00 [queued]>
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1035}} INFO - Dependencies all met for <TaskInstance: TestBigQuery.bq_insert manual__2023-01-21T12:38:34.695211+00:00 [queued]>
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1241}} INFO -
--------------------------------------------------------------------------------
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1242}} INFO - Starting attempt 1 of 1
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1243}} INFO -
--------------------------------------------------------------------------------
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1262}} INFO - Executing <Task(BigQueryOperator): bq_insert> on 2023-01-21 12:38:34.695211+00:00
[2023-01-21, 12:38:38 UTC] {{standard_task_runner.py:52}} INFO - Started process 172 to run task
[2023-01-21, 12:38:38 UTC] {{standard_task_runner.py:76}} INFO - Running: ['airflow', 'tasks', 'run', 'TestBigQuery', 'bq_insert', 'manual__2023-01-21T12:38:34.695211+00:00', '--job-id', '9', '--raw', '--subdir', 'DAGS_FOLDER/test10.py', '--cfg-path', '/tmp/tmpmucxv_ny', '--error-file', '/tmp/tmpta64tun1']
[2023-01-21, 12:38:38 UTC] {{standard_task_runner.py:77}} INFO - Job 9: Subtask bq_insert
[2023-01-21, 12:38:38 UTC] {{logging_mixin.py:109}} INFO - Running <TaskInstance: TestBigQuery.bq_insert manual__2023-01-21T12:38:34.695211+00:00 [running]> on host ip-10-192-20-51.ap-northeast-1.compute.internal
[2023-01-21, 12:38:38 UTC] {{taskinstance.py:1429}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=TestBigQuery
AIRFLOW_CTX_TASK_ID=bq_insert
AIRFLOW_CTX_EXECUTION_DATE=2023-01-21T12:38:34.695211+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-01-21T12:38:34.695211+00:00
[2023-01-21, 12:38:38 UTC] {{bigquery.py:684}} INFO - Executing: INSERT `<GCP_PROJECT_ID>.<DATASET_ID>.<TABLE_NAME>` (id, name, ts) VALUES (555, "ydak", TIMESTAMP("2023-01-21 12:38:38"));
[2023-01-21, 12:38:38 UTC] {{logging_mixin.py:109}} WARNING - /usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/google/cloud/hooks/bigquery.py:2191 DeprecationWarning: This method is deprecated. Please use `BigQueryHook.insert_job` method.
[2023-01-21, 12:38:38 UTC] {{credentials_provider.py:295}} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2023-01-21, 12:38:40 UTC] {{bigquery.py:1637}} INFO - Inserting job airflow_1674304720084345_f01ade31811c937f05f099a513e71fcc
[2023-01-21, 12:38:42 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=TestBigQuery, task_id=bq_insert, execution_date=20230121T123834, start_date=20230121T123838, end_date=20230121T123842
[2023-01-21, 12:38:42 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2023-01-21, 12:38:42 UTC] {{local_task_job.py:264}} INFO - 1 downstream tasks scheduled from follow-on schedule check
データ確認
BigQuery に直接 SELECT します。無事にいました。
めでたしめでたし。