비밀번호

커뮤니티2

  • 맑음속초24.7맑음북춘천22.7맑음철원20.7맑음동두천21.6맑음파주20.6맑음대관령17.5맑음춘천22.4맑음백령도19.2맑음북강릉25.2맑음강릉25.0구름조금동해26.2맑음서울21.6구름조금인천17.6맑음원주21.5맑음울릉도17.7구름많음수원21.4맑음영월21.3맑음충주22.6맑음서산18.7맑음울진25.3맑음청주23.7맑음대전22.4맑음추풍령21.5맑음안동23.1맑음상주23.0맑음포항24.3맑음군산20.1맑음대구24.6맑음전주21.6맑음울산20.6맑음창원22.0맑음광주22.4맑음부산19.2맑음통영21.2맑음목포19.7맑음여수19.7맑음흑산도19.6맑음완도22.4맑음고창20.9맑음순천22.3맑음홍성20.4맑음서청주22.0맑음제주20.7맑음고산18.9맑음성산20.5맑음서귀포21.1맑음진주22.6구름조금강화17.5맑음양평22.8맑음이천22.7구름조금인제21.6맑음홍천22.4구름조금태백20.3맑음정선군23.2맑음제천21.1맑음보은21.4구름조금천안22.2맑음보령18.5맑음부여21.5맑음금산21.1맑음세종22.8맑음부안21.3맑음임실21.7맑음정읍22.0맑음남원23.5맑음장수20.4맑음고창군22.2맑음영광군20.7맑음김해시20.5맑음순창군23.0맑음북창원23.8맑음양산시21.7맑음보성군23.5맑음강진군23.2맑음장흥22.6맑음해남21.5맑음고흥23.4맑음의령군24.2맑음함양군23.8맑음광양시23.7맑음진도군19.9맑음봉화21.2구름조금영주21.6구름조금문경22.2맑음청송군22.5맑음영덕23.2맑음의성24.1맑음구미24.4맑음영천24.0맑음경주시24.8맑음거창23.5맑음합천25.6맑음밀양24.7맑음산청24.5맑음거제19.9맑음남해22.5맑음북부산21.0
  • 2024.05.09(목)

데이터 엔지니어링데이터 엔지니어링

Airflow task를 ECS Fargate task로 실행하기(Airflow ECSOperator)

Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다.

 

이러한 방법으로 얻는 이점은

1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다.

2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다.

 

또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다.

 

우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다.

클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다.

필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다.

ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다.

 

클러스터를 생성했다면 Airflow를 구성한다.

Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음

 

Airflow에 AWS 커넥션을 구성해야 한다.

이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함

- AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"}
- AWS_DEFAULT_REGION=<AWS 기본 리전>

위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법

 

아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다.

 

import sys

sys.path.append("/opt/airflow")


import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.amazon.aws.operators.ecs import (

    EcsRegisterTaskDefinitionOperator,

    EcsRunTaskOperator,

)


from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE


# Constants

DAG_ID = "ECS-FARGATE-DAG"

AWS_REGION = "ap-northeast-2"

ECS_CLUSTER = "test"

DOCKER_IMAGE = "<Conatiner Image URL>"

AWS_LOG_GROUP = "/ecs/test"


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': pendulum.now(),

    'retries': 0,

}


task_definition_params = {

    "requiresCompatibilities": ["FARGATE"],

    "cpu": "4096",

    "memory": "30720",

    "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"},

    "networkMode": "awsvpc",

    "executionRoleArn": "<AWSRoleARN>",

}


network_configuration = {

    "awsvpcConfiguration": {

        "subnets": [<사용할 subnets>],

    }

}


def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS):

    return EcsRunTaskOperator(

        task_id=task_id,

        cluster=ECS_CLUSTER,

        task_definition=task_definition.output,

        launch_type="FARGATE",

        network_configuration=network_configuration,

        overrides={

            "containerOverrides": [

                {

                    "name": "test",

                    "environment": [

                        {"name": "param", "value": "{{ dag_run.conf['param1'] }}"},

                    ],

                    "command": ["python", "main.py", "--task", task_id],

                    "cpu": cpu,

                    "memory": memory,

                }

            ]

        },

        trigger_rule=trigger_rule,

    )


with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag:

    task_definition = EcsRegisterTaskDefinitionOperator(

        task_id="task_definition",

        family="test",

        container_definitions=[

            {

                "name": "test",

                "image": DOCKER_IMAGE,

                "logConfiguration": {

                    "logDriver": "awslogs",

                    "options": {

                        "awslogs-region": AWS_REGION,

                        "awslogs-group": AWS_LOG_GROUP,

                        "awslogs-stream-prefix": "REPORT",

                        "awslogs-create-group": "true",

                    },

                },

            },

        ],

        **task_definition_params

    )


    task1 = create_ecs_operator("task1", 256, 512)

    task2 = create_ecs_operator("task2", 256, 512)


    # Define dependencies

    task1 >> task2

 

 

- 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.


전체댓글0

검색결과는 총 12건 입니다.    글쓰기
1