비밀번호

커뮤니티2

  • 맑음속초18.9맑음북춘천11.9맑음철원11.8맑음동두천13.5흐림파주14.1맑음대관령10.1맑음춘천13.4맑음백령도14.1맑음북강릉17.7맑음강릉19.7맑음동해19.4맑음서울14.7맑음인천13.9맑음원주14.3맑음울릉도15.2맑음수원13.9맑음영월12.9맑음충주11.0맑음서산13.3맑음울진18.4맑음청주14.8맑음대전13.8맑음추풍령14.2맑음안동13.6맑음상주14.8맑음포항19.2맑음군산12.8맑음대구15.2맑음전주13.1맑음울산15.9맑음창원14.2맑음광주15.1맑음부산16.6맑음통영15.0맑음목포14.3맑음여수16.2맑음흑산도13.6맑음완도14.2맑음고창11.5맑음순천9.6맑음홍성13.5맑음서청주11.5맑음제주14.5맑음고산14.1맑음성산12.6구름조금서귀포14.5맑음진주12.1맑음강화13.0맑음양평13.8맑음이천14.1맑음인제10.8맑음홍천11.8맑음태백12.3맑음정선군9.6맑음제천10.0맑음보은10.2맑음천안10.5맑음보령13.5맑음부여11.8맑음금산11.1맑음세종12.3맑음부안12.7맑음임실9.2맑음정읍10.6맑음남원10.7맑음장수8.5맑음고창군11.0맑음영광군11.1맑음김해시15.0맑음순창군10.9맑음북창원15.6맑음양산시14.7맑음보성군12.6맑음강진군11.0맑음장흥10.3맑음해남10.1맑음고흥11.9맑음의령군13.0맑음함양군12.5맑음광양시14.0맑음진도군10.4맑음봉화9.2맑음영주15.9맑음문경15.8맑음청송군10.2맑음영덕17.1맑음의성10.8맑음구미14.5맑음영천16.6맑음경주시12.5맑음거창10.4맑음합천13.3맑음밀양13.9맑음산청13.0맑음거제13.4맑음남해17.5맑음북부산13.2
  • 2024.05.10(금)

데이터 엔지니어링데이터 엔지니어링

Airflow task를 ECS Fargate task로 실행하기(Airflow ECSOperator)

Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다.

 

이러한 방법으로 얻는 이점은

1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다.

2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다.

 

또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다.

 

우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다.

클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다.

필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다.

ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다.

 

클러스터를 생성했다면 Airflow를 구성한다.

Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음

 

Airflow에 AWS 커넥션을 구성해야 한다.

이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함

- AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"}
- AWS_DEFAULT_REGION=<AWS 기본 리전>

위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법

 

아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다.

 

import sys

sys.path.append("/opt/airflow")


import pendulum

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.amazon.aws.operators.ecs import (

    EcsRegisterTaskDefinitionOperator,

    EcsRunTaskOperator,

)


from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE


# Constants

DAG_ID = "ECS-FARGATE-DAG"

AWS_REGION = "ap-northeast-2"

ECS_CLUSTER = "test"

DOCKER_IMAGE = "<Conatiner Image URL>"

AWS_LOG_GROUP = "/ecs/test"


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': pendulum.now(),

    'retries': 0,

}


task_definition_params = {

    "requiresCompatibilities": ["FARGATE"],

    "cpu": "4096",

    "memory": "30720",

    "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"},

    "networkMode": "awsvpc",

    "executionRoleArn": "<AWSRoleARN>",

}


network_configuration = {

    "awsvpcConfiguration": {

        "subnets": [<사용할 subnets>],

    }

}


def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS):

    return EcsRunTaskOperator(

        task_id=task_id,

        cluster=ECS_CLUSTER,

        task_definition=task_definition.output,

        launch_type="FARGATE",

        network_configuration=network_configuration,

        overrides={

            "containerOverrides": [

                {

                    "name": "test",

                    "environment": [

                        {"name": "param", "value": "{{ dag_run.conf['param1'] }}"},

                    ],

                    "command": ["python", "main.py", "--task", task_id],

                    "cpu": cpu,

                    "memory": memory,

                }

            ]

        },

        trigger_rule=trigger_rule,

    )


with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag:

    task_definition = EcsRegisterTaskDefinitionOperator(

        task_id="task_definition",

        family="test",

        container_definitions=[

            {

                "name": "test",

                "image": DOCKER_IMAGE,

                "logConfiguration": {

                    "logDriver": "awslogs",

                    "options": {

                        "awslogs-region": AWS_REGION,

                        "awslogs-group": AWS_LOG_GROUP,

                        "awslogs-stream-prefix": "REPORT",

                        "awslogs-create-group": "true",

                    },

                },

            },

        ],

        **task_definition_params

    )


    task1 = create_ecs_operator("task1", 256, 512)

    task2 = create_ecs_operator("task2", 256, 512)


    # Define dependencies

    task1 >> task2

 

 

- 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.


전체댓글0

검색결과는 총 12건 입니다.    글쓰기
1