비밀번호

커뮤니티2

  • 흐림속초11.2비북춘천6.6흐림철원5.2흐림동두천6.3흐림파주5.0흐림대관령4.5흐림춘천6.9비백령도6.5흐림북강릉10.1흐림강릉12.0흐림동해8.8흐림서울8.4흐림인천7.4흐림원주8.4구름많음울릉도8.3흐림수원7.8흐림영월3.3흐림충주6.2흐림서산7.4구름많음울진8.9흐림청주8.6흐림대전7.9흐림추풍령4.5흐림안동6.5흐림상주6.4흐림포항9.3흐림군산7.1흐림대구8.0박무전주7.4흐림울산7.4흐림창원7.9흐림광주10.6흐림부산10.1흐림통영8.4흐림목포9.6흐림여수9.3흐림흑산도9.2흐림완도10.2흐림고창8.5흐림순천5.1박무홍성7.2흐림서청주7.5흐림제주10.2흐림고산11.9흐림성산10.7흐림서귀포11.9흐림진주6.3흐림강화6.0흐림양평6.9흐림이천7.9흐림인제5.1흐림홍천5.4흐림태백3.0구름많음정선군2.2흐림제천4.2흐림보은6.9흐림천안6.0흐림보령8.3흐림부여6.6흐림금산7.1흐림세종7.0흐림부안7.3흐림임실8.1흐림정읍7.2흐림남원7.3흐림장수5.4흐림고창군7.6흐림영광군8.5흐림김해시8.5흐림순창군7.3흐림북창원10.2흐림양산시6.8흐림보성군6.9흐림강진군10.2흐림장흥10.9흐림해남11.2흐림고흥7.1흐림의령군6.6흐림함양군5.5흐림광양시8.7흐림진도군9.9구름많음봉화0.6구름많음영주3.8흐림문경6.1흐림청송군2.6구름많음영덕5.7구름많음의성4.2흐림구미7.0흐림영천4.6흐림경주시4.3흐림거창4.9흐림합천7.0흐림밀양6.4흐림산청6.4흐림거제7.5흐림남해8.2흐림북부산6.4
  • 2025.04.05(토)

데이터 엔지니어링데이터 엔지니어링

Airflow task를 ECS Fargate task로 실행하기(Airflow ECSOperator)

Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다.

 

이러한 방법으로 얻는 이점은

1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다.

2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다.

 

또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다.

 

우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다.

클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다.

필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다.

ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다.

 

클러스터를 생성했다면 Airflow를 구성한다.

Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음

 

Airflow에 AWS 커넥션을 구성해야 한다.

이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함

- AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"}
- AWS_DEFAULT_REGION=<AWS 기본 리전>

위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법

 

아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다.

 

import sys

sys.path.append("/opt/airflow")


import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.amazon.aws.operators.ecs import (

    EcsRegisterTaskDefinitionOperator,

    EcsRunTaskOperator,

)


from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE


# Constants

DAG_ID = "ECS-FARGATE-DAG"

AWS_REGION = "ap-northeast-2"

ECS_CLUSTER = "test"

DOCKER_IMAGE = "<Conatiner Image URL>"

AWS_LOG_GROUP = "/ecs/test"


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': pendulum.now(),

    'retries': 0,

}


task_definition_params = {

    "requiresCompatibilities": ["FARGATE"],

    "cpu": "4096",

    "memory": "30720",

    "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"},

    "networkMode": "awsvpc",

    "executionRoleArn": "<AWSRoleARN>",

}


network_configuration = {

    "awsvpcConfiguration": {

        "subnets": [<사용할 subnets>],

    }

}


def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS):

    return EcsRunTaskOperator(

        task_id=task_id,

        cluster=ECS_CLUSTER,

        task_definition=task_definition.output,

        launch_type="FARGATE",

        network_configuration=network_configuration,

        overrides={

            "containerOverrides": [

                {

                    "name": "test",

                    "environment": [

                        {"name": "param", "value": "{{ dag_run.conf['param1'] }}"},

                    ],

                    "command": ["python", "main.py", "--task", task_id],

                    "cpu": cpu,

                    "memory": memory,

                }

            ]

        },

        trigger_rule=trigger_rule,

    )


with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag:

    task_definition = EcsRegisterTaskDefinitionOperator(

        task_id="task_definition",

        family="test",

        container_definitions=[

            {

                "name": "test",

                "image": DOCKER_IMAGE,

                "logConfiguration": {

                    "logDriver": "awslogs",

                    "options": {

                        "awslogs-region": AWS_REGION,

                        "awslogs-group": AWS_LOG_GROUP,

                        "awslogs-stream-prefix": "REPORT",

                        "awslogs-create-group": "true",

                    },

                },

            },

        ],

        **task_definition_params

    )


    task1 = create_ecs_operator("task1", 256, 512)

    task2 = create_ecs_operator("task2", 256, 512)


    # Define dependencies

    task1 >> task2

 

 

- 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.


전체댓글0

검색결과는 총 25건 입니다.    글쓰기
1 2