비밀번호

커뮤니티2

  • 맑음속초13.3맑음북춘천3.0맑음철원2.7맑음동두천4.3맑음파주5.3맑음대관령6.4맑음춘천3.1황사백령도10.8황사북강릉11.5맑음강릉13.0맑음동해13.5황사서울7.6황사인천8.1맑음원주4.1황사울릉도12.1맑음수원5.9맑음영월3.0맑음충주3.7맑음서산2.9맑음울진10.3황사청주5.9황사대전5.1맑음추풍령2.5황사안동5.0맑음상주5.4황사포항9.4맑음군산4.6황사대구8.4황사전주4.7황사울산8.7황사창원10.8황사광주6.8구름많음부산11.1구름많음통영8.9황사목포6.8황사여수8.6황사흑산도7.4구름많음완도6.4맑음고창2.8구름많음순천7.0황사홍성5.2맑음서청주3.4황사제주9.7맑음고산10.0구름많음성산10.9구름많음서귀포13.7구름많음진주7.2맑음강화7.5맑음양평4.1맑음이천4.2맑음인제4.5맑음홍천2.9맑음태백9.0맑음정선군3.2맑음제천2.0맑음보은1.4맑음천안2.7맑음보령3.9맑음부여2.3맑음금산1.8맑음세종3.0맑음부안5.0구름많음임실0.9맑음정읍3.8구름많음남원2.6구름많음장수0.0맑음고창군4.5구름많음영광군3.4구름많음김해시10.8구름많음순창군2.7구름많음북창원10.7구름많음양산시11.9구름많음보성군6.4구름많음강진군5.3구름많음장흥4.0흐림해남5.6구름많음고흥4.3구름많음의령군4.3구름많음함양군2.2구름많음광양시6.8흐림진도군7.8맑음봉화2.0맑음영주4.2맑음문경4.3맑음청송군3.1구름많음영덕8.8맑음의성3.4맑음구미6.5맑음영천6.8구름많음경주시8.7구름많음거창2.5구름많음합천4.7구름많음밀양9.3구름많음산청4.7구름많음거제11.8구름많음남해10.0구름많음북부산11.9
  • 2026.04.21(화)

데이터 엔지니어링데이터 엔지니어링

Airflow task를 ECS Fargate task로 실행하기(Airflow ECSOperator)

Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다.

 

이러한 방법으로 얻는 이점은

1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다.

2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다.

 

또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다.

 

우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다.

클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다.

필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다.

ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다.

 

클러스터를 생성했다면 Airflow를 구성한다.

Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음

 

Airflow에 AWS 커넥션을 구성해야 한다.

이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함

- AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"}
- AWS_DEFAULT_REGION=<AWS 기본 리전>

위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법

 

아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다.

 

import sys

sys.path.append("/opt/airflow")


import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.amazon.aws.operators.ecs import (

    EcsRegisterTaskDefinitionOperator,

    EcsRunTaskOperator,

)


from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE


# Constants

DAG_ID = "ECS-FARGATE-DAG"

AWS_REGION = "ap-northeast-2"

ECS_CLUSTER = "test"

DOCKER_IMAGE = "<Conatiner Image URL>"

AWS_LOG_GROUP = "/ecs/test"


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': pendulum.now(),

    'retries': 0,

}


task_definition_params = {

    "requiresCompatibilities": ["FARGATE"],

    "cpu": "4096",

    "memory": "30720",

    "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"},

    "networkMode": "awsvpc",

    "executionRoleArn": "<AWSRoleARN>",

}


network_configuration = {

    "awsvpcConfiguration": {

        "subnets": [<사용할 subnets>],

    }

}


def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS):

    return EcsRunTaskOperator(

        task_id=task_id,

        cluster=ECS_CLUSTER,

        task_definition=task_definition.output,

        launch_type="FARGATE",

        network_configuration=network_configuration,

        overrides={

            "containerOverrides": [

                {

                    "name": "test",

                    "environment": [

                        {"name": "param", "value": "{{ dag_run.conf['param1'] }}"},

                    ],

                    "command": ["python", "main.py", "--task", task_id],

                    "cpu": cpu,

                    "memory": memory,

                }

            ]

        },

        trigger_rule=trigger_rule,

    )


with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag:

    task_definition = EcsRegisterTaskDefinitionOperator(

        task_id="task_definition",

        family="test",

        container_definitions=[

            {

                "name": "test",

                "image": DOCKER_IMAGE,

                "logConfiguration": {

                    "logDriver": "awslogs",

                    "options": {

                        "awslogs-region": AWS_REGION,

                        "awslogs-group": AWS_LOG_GROUP,

                        "awslogs-stream-prefix": "REPORT",

                        "awslogs-create-group": "true",

                    },

                },

            },

        ],

        **task_definition_params

    )


    task1 = create_ecs_operator("task1", 256, 512)

    task2 = create_ecs_operator("task2", 256, 512)


    # Define dependencies

    task1 >> task2

 

 

- 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.


전체댓글0

검색결과는 총 26건 입니다.    글쓰기
1 2