비밀번호

커뮤니티2

  • 맑음속초13.6맑음북춘천1.6맑음철원1.1맑음동두천3.4맑음파주2.4맑음대관령4.0맑음춘천1.9황사백령도10.9황사북강릉14.0맑음강릉11.5맑음동해13.3황사서울7.2황사인천7.9맑음원주3.9황사울릉도11.0맑음수원4.9맑음영월1.4맑음충주2.2맑음서산1.1맑음울진9.9황사청주5.1황사대전3.9맑음추풍령1.7황사안동4.7맑음상주5.0황사포항8.7맑음군산3.2황사대구8.1황사전주3.0황사울산8.3맑음창원11.4황사광주5.8맑음부산11.1맑음통영9.0황사목포7.1황사여수8.4황사흑산도6.7구름많음완도6.6구름많음고창1.8구름많음순천6.0황사홍성3.4맑음서청주1.8황사제주9.0구름많음고산9.8구름많음성산9.1흐림서귀포13.4구름많음진주7.8맑음강화6.4맑음양평3.6맑음이천3.6구름많음인제2.7맑음홍천2.9맑음태백4.7맑음정선군1.6맑음제천0.2맑음보은0.8맑음천안1.0맑음보령1.9맑음부여1.2맑음금산1.0맑음세종1.9구름많음부안3.0구름많음임실0.2구름많음정읍2.3구름많음남원1.5구름많음장수-0.6구름많음고창군2.5구름많음영광군3.5맑음김해시10.3구름많음순창군1.9구름많음북창원11.2구름많음양산시12.0맑음보성군6.2구름많음강진군6.0구름많음장흥4.3구름많음해남5.8맑음고흥3.1구름많음의령군3.4구름많음함양군1.0맑음광양시5.6구름많음진도군7.9맑음봉화2.2맑음영주4.0맑음문경3.3맑음청송군2.2맑음영덕7.7맑음의성2.3맑음구미5.0맑음영천6.6구름많음경주시8.5구름많음거창1.7구름많음합천4.1구름많음밀양10.5구름많음산청3.2맑음거제10.9맑음남해8.1구름많음북부산12.2
  • 2026.04.21(화)

데이터 엔지니어링데이터 엔지니어링

Airflow task를 ECS Fargate task로 실행하기(Airflow ECSOperator)

Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다.

 

이러한 방법으로 얻는 이점은

1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다.

2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다.

 

또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다.

 

우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다.

클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다.

필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다.

ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다.

 

클러스터를 생성했다면 Airflow를 구성한다.

Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음

 

Airflow에 AWS 커넥션을 구성해야 한다.

이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함

- AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"}
- AWS_DEFAULT_REGION=<AWS 기본 리전>

위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법

 

아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다.

 

import sys

sys.path.append("/opt/airflow")


import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.amazon.aws.operators.ecs import (

    EcsRegisterTaskDefinitionOperator,

    EcsRunTaskOperator,

)


from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE


# Constants

DAG_ID = "ECS-FARGATE-DAG"

AWS_REGION = "ap-northeast-2"

ECS_CLUSTER = "test"

DOCKER_IMAGE = "<Conatiner Image URL>"

AWS_LOG_GROUP = "/ecs/test"


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': pendulum.now(),

    'retries': 0,

}


task_definition_params = {

    "requiresCompatibilities": ["FARGATE"],

    "cpu": "4096",

    "memory": "30720",

    "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"},

    "networkMode": "awsvpc",

    "executionRoleArn": "<AWSRoleARN>",

}


network_configuration = {

    "awsvpcConfiguration": {

        "subnets": [<사용할 subnets>],

    }

}


def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS):

    return EcsRunTaskOperator(

        task_id=task_id,

        cluster=ECS_CLUSTER,

        task_definition=task_definition.output,

        launch_type="FARGATE",

        network_configuration=network_configuration,

        overrides={

            "containerOverrides": [

                {

                    "name": "test",

                    "environment": [

                        {"name": "param", "value": "{{ dag_run.conf['param1'] }}"},

                    ],

                    "command": ["python", "main.py", "--task", task_id],

                    "cpu": cpu,

                    "memory": memory,

                }

            ]

        },

        trigger_rule=trigger_rule,

    )


with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag:

    task_definition = EcsRegisterTaskDefinitionOperator(

        task_id="task_definition",

        family="test",

        container_definitions=[

            {

                "name": "test",

                "image": DOCKER_IMAGE,

                "logConfiguration": {

                    "logDriver": "awslogs",

                    "options": {

                        "awslogs-region": AWS_REGION,

                        "awslogs-group": AWS_LOG_GROUP,

                        "awslogs-stream-prefix": "REPORT",

                        "awslogs-create-group": "true",

                    },

                },

            },

        ],

        **task_definition_params

    )


    task1 = create_ecs_operator("task1", 256, 512)

    task2 = create_ecs_operator("task2", 256, 512)


    # Define dependencies

    task1 >> task2

 

 

- 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.


전체댓글0

검색결과는 총 26건 입니다.    글쓰기
1 2