Airflow에는 Dynamic Task Mapping 기능을 통해 DAG 작성 시 필요한 task 개수를 미리 알 필요 없이
워크플로우가 현재 데이터를 기반으로 런타임에 여러 task를 실행할 수 있습니다. 마치 for 루프문 처럼 말이죠.
Dynamic Task의 기본적인 사용은 위와 같습니다. task의 expand 메소드를 통해 입력 데이터 개수만큼 런타임에서 TaskInstance를 생성하게 됩니다.
이렇게 하면 웹서버 UI에서 확인할 경우 다음을 볼 수 있습니다.
맵핑되었던 task별로 map index가 할당되어있습니다. 이는 임의로 0~n까지 맵핑됩니다.
동시에 실행되는 task 수가 많아지고 이를 모니터링하려면 이 화면에서 각 task를 미리 알아볼 수 있는 식별자가 필요할 것입니다. 이때 map index를 직접 지정할 수 있으면 어떨까요?
Airflow 2.9 버전부터는 이러한 기능을 제공합니다.
동적 맵핑되는 task의 데코레이터의 map_index_template 파라미터에 사용할 map index 템플릿을 지정합니다.
이후 task 코드 내에서 get_current_context() 메소드를 통해 컨텍스트를 가져온 후 지정한 템플릿을 작성합니다.
이렇게하여 map index에 템플릿이 적용된 인덱스를 넣을 수 있습니다.
사용 시 각 task를 파악할 수 있는 값을 지정한다면 task 수가 많아져도 모니터링하기 수월해 질 것입니다.
Gitlab에서는 CI/CD 파이프라인을 지원하여 애플리케이션의 배포를 자동화할 수 있습니다. 또한 Pipeline schedules를 통해 예약된 스케쥴을 통해 배포 파이프라인을 실행하는 기능 또한 지원합니다. 프로젝트 내 메뉴에서 Build > Pipeline schedules 로 이동하여 새 스케쥴을 생성합니다. Interval Pattern은 미리 지정되어있는 스케쥴로 설정하거나 cron 표현식을 이용하여 설정할 수도 있습니다. 배포될 타겟 브랜치를 선택하고 스케쥴 파이프라인에서 사용될 변수 또한 지정할 수 있습니다. 예약 파이프라인은 설정한 Interval에 의해 실행되며, Gitlab UI에서 수동으로 실행할 수 있습니다. 스케쥴에 의해 실행된 파이프라인은 기본적으로 scheduled 태그를 가집니다. 이를 이용하여 CI 스크립트 내에서 해당 파이프라인이 예약된 파이프라인인지 분기처리하여 워크로드에 맞는 파이프라인 로직을 작성할 수 있습니다. scheduled pipeline: stage: build rules: - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH && $CI_PIPELINE_SOURCE == "schedule" image: node:lts-alpine script: - echo "Scheduled Pipeline" 위는 예약된 파이프라인에서만 실행되는 스크립트를 정의한 스크립트 블록입니다. rules 블록에서 커밋 브랜치가 스케쥴에서 설정한 타겟 브랜치이고, 파이프라인의 소스가 schedule인 경우에만 해당 스크립트가 실행되게 됩니다. 이를 통해 스케쥴된 특정 브랜치는 매 커밋마다 배포하지 않고, 예약된 시간에만 배포되도록 설정할 수 있습니다.
안녕하세요, GCP cloud function을 이용해 Big Query Export Notification(빅쿼리 내보내기 알림) 설정을 하는 방법에 대해 스터디 한 내용을 정리해보겠습니다. 참고 문헌: https://ayudante.jp/column/2022-07-22/11-00/ [개요] GA4 속성을 빅쿼리에 연결하여 내보내기를 사용할때, 전날의 데이터가 정확히 어느 시점에 빅쿼리로 내보내지는지는 알 수 없습니다. 빅쿼리 내보내기는 정해진 시간에 도착하지 않으며, 알림 메커니즘 또한 따로 존재하지 않습니다. 이를 위한 사용 가능한 대안 사항에는 크게 2가지가 있습니다. 1. 내보내기가 최소 x 시간 이후에는 도착할꺼라는 가정을 바탕으로 다운스트림 작업을 지연시키기 - 내보내기가 예상보다 일찍 도착하면 작업이 더욱 지연되는 문제가 있습니다. 2. 내보내기가 도착 했는지 정기적으로 확인하기 - 데이터 파이프라인이 복잡해지고, 데이터에 대한 접근이 늦어지는 문제가 있습니다. 따라서 GCP에서 제공하는 컴포넌트들을 활용하여 일련의 과정을 통해 솔루션을 구축할 필요가 있습니다. [솔루션 구축] 아래의 3가지 컴포넌트를 사용하여 빅쿼리 내보내기 알림을 위한 Cloud Event Driven 솔루션을 구현할 수 있습니다. 1. Cloud Logging: GCP 프로젝트에서 발생하는 모든 이벤트에 대한 정보를 구조화된 로그 형식으로 저장 (빅쿼리 데이터 세트 / 테이블 생성, 쿼리 실행, 프로젝트 권한 추가 / 제거 등) 2. Pub/Sub: 확장 가능한 이벤트 수집 메시징 대기열 시스템. 이벤트 데이터가 게시되는 지점(publisher)과 이 이벤트 데이터를 독립적으로 수신하는 링크된 구독자(subscriber)로 구성. 사용자 정의 코드를 통해 구독자는 게시 시 원하는 비즈니스 로직을 통해 이벤트 데이터를 처리 가능 3. Cloud Functions: 클라우드 서버리스 실행 환경에서 모든 커스텀 코드를 실행 (현재 Python/Go/Java/Node.js/PHP/Ruby 지원) 단일 목적으로 코드를 작성하고 이 코드가 실행될 때 트리거를 설정. 엔드포인트에 대한 HTTP 요청, Pub/Sub 주제에 게시된 메시지, Cloud Storage에서 생성/수정/삭제된 파일 객체 등의 트리거 지원중 [구축 절차] 1. gcp log explorer에서 아래의 로그 쿼리 실행 resource.type="bigquery_dataset" protoPayload.authenticationInfo.principalEmail="firebase-measurement@system.gserviceaccount.com" protoPayload.serviceName: "bigquery.googleapis.com" protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob" protoPayload.metadata.tableDataChange.reason="JOB" (gcp log explorer에서는 로그의 키를 활용하여 다양한 정보를 필터링해서 확인 할 수 있습니다.) 2. GA4 일일 내보내기가 BigQuery에 도착할 때 Pub/Sub에 알리는 알림 메커니즘을 설정 - Log Explorer 결과 화면에서 "추가 작업" > "싱크 만들기" 클릭 - 로그 라우팅 싱크 만들기 새 창이 열리고, Sink name & Sink description 작성 후 다음 클릭 - 싱크 서비스 선택 > “Cloud Pub/Sub 주제”, Cloud Pub/Sub 주제 선택 > “주제 만들기” 클릭 - 주제 ID 입력 후 만들기 설정 완료 후 클라우드 프로젝트 내 Pub/Sub 서비스로 이동하면 방금 생성한 새로운 게시자 주제를 확인할 수 있습니다. (프로젝트 간 설정 시 싱크를 사용하여 로그를 다른 cloud 프로젝트로 라우팅하는 경우 필요한 권한을 부여하기 위해 설정이 필요 할 수 있습니다.) 3. 게시자 주제에 게시된 새 메시지를 수신하고, 새 메시지가 게시될 때마다 맞춤 코드를 실행하는 Cloud 함수를 배포 - Cloud Functions로 이동하여 '함수 만들기'를 클릭 - 함수 이름 입력 - 게시자 주제와 동일한 지역 선택 (클라우드 프로젝트 조직 정책에 리소스 위치 제한 조직 정책이 없으면 일반적으로 모든 지역이 작동함) - 트리거 유형 Cloud Pub/Sub으로 설정 - Cloud Pub/Sub 주제를 이전 단계에서 생성한 게시자 주제로 선택 - 코드 설정 화면으로 넘어와서, 런타임 > 파이썬 3.11 선택 (Go / JAVA / Node.js / PHP / Ruby 사용 가능) - 진입점에 함수 이름 입력 (사용자 정의 코드의 함수 이름과 동일해야함) - 함수 내용을 지우고 아래 내용을 입력 후 배포 from google.cloud import pubsub import base64 import json from google.cloud import bigquery from datetime import datetime def ga4_exports_arrived(event, context): # 1) receive export notifications - Pub/Sub message log_entry = json.loads(base64.b64decode(event.get("data")).decode('utf-8')) protoPayload = log_entry.get("protoPayload") destinationTable = protoPayload.get("resourceName") # 2) notify downstream job client = bigquery.Client() rows_to_insert = [ { "notify_timestamp": datetime.utcnow().isoformat(), "datasetId": destinationTable.split("/")[3], "tableId": destinationTable.split("/")[5], "event_timestamp": log_entry.get("timestamp"), # str "log_receive_timestamp": log_entry.get("receiveTimestamp") # str }, ] errors = client.insert_rows_json("[Your BQ table ID to store export notificatins], rows_to_insert) # Make an API request. if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors)) - requirements.txt에 아래 텍스트 추가 google-cloud-pubsub google-cloud-bigquery 코드에서는 내보낸 테이블 이름, 시간 등의 이벤트 로그 메타 데이터가 포함된 Pub/Sub 메시지를 수신한 뒤, 메시지를 처리하고 BigQuery에 수집된 새 내보내기 테이블의 다운 스트림 작업을 알립니다. 해당 코드는 BigQuery에 저장하고 있으며 실제 응용 프로그램에서는 HTTP 후크, 다른 Pub/Sub 주제 등, 이메일이나 Slack 알림, BigQuery 예약 쿼리 등을 포함 할 수 있습니다. [결과] 모든 단계 수행시 BigQuery 내보내기 알림 솔루션이 완전히 배포되어 실행됩니다. BigQuery에 새 내보내기가 도착하면 Pub/Sub는 내보내기 로그 알림을 받은 후 클라우드 함수 코드를 트리거 시킵니다. 맞춤 코드가 이러한 알림을 다른 BigQuery 테이블에 저장하고 있으며, 이는 일일 내보내기가 도착할 때 아래 사진과 같이 표시됩니다. 틀린 부분이 있거나 추가 할 내용이 있다면 알려주시면 감사하겠습니다. 감사합니다.
이전 포스트중 AwsEcsExecutor를 사용하는 Airflow의 DAG및 Task 작성법과 무중단 배포를 적었습니다. 이번에는 Airflow worker를 실행할 ECS 내의 태스크 정의를 설정하는 법에 대해 다룹니다. ECS에서 Fargate를 실행하기 위해선 Task definition detail(태스크 정의)를 설정해야 합니다. 이는 Fargate 노드의 리소스 프로비저닝, 실행할 이미지 등을 설정하기위해 필요한 과정입니다. 위는 테스크 정의의 인프라 요구 사항 항목입니다. 테스크의 인프라를 정의하는데 쓰입니다. 여기서 테스크 크기 설정은 테스크 실행 시에 오버라이드가 가능하므로 따로 설정하지 않아도 됩니다. 테스크 역할은 테스크가 실행되는 동안 부여받을 역할입니다. 작업중에 S3 또는 Lambda 등 권한이 필요한 작업이 있을 경우 따로 액세스 키를 사용하지 않아도 역할을 통해 사용할 수 있습니다. 테스크 실행 역할은 테스크 실행 시에 필요한 역할로 이미지를 가져올 ECR의 인증 권한 등이 필요하며, 이는 AWS에서 자동으로 생성됩니다. 위는 작업 실행 시 띄울 컨테이너들의 정의입니다. 컨테이너 이름과 이미지 URI를 지정하며 컨테이너 관련 여러가지 설정을 할 수 있습니다. 워커로 실행 시에는 Airflow가 설치되어있어야 하므로 기본적으로 Airflow 이미지를 베이스로 사용합니다. https://hub.docker.com/r/apache/airflow 컨테이너의 리소스 할당 제한 또한 Executor 설정에서 오버라이드 가능하므로 따로 설정하지 않습니다. 이외 필요한 환경 변수 및 환경 파일 등을 설정하며 로그 설정의 경우 Remote Logging을 사용할 경우 따로 설정하지 않아도 되며 CloudWatch 로그 또한 사용할 수 있습니다. 필요한 경우 임시 스토리지를 추가로 설정할 수 있습니다. 기본적으로 Fargate 노드 당 20GiB의 기본 스토리지가 제공됩니다. { "taskDefinitionArn": "<태스크 정의 ARN>", "containerDefinitions": [ { "name": "worker", "image": "<이미지 URI>", "cpu": 0, "portMappings": [], "essential": true, "environment": [], "mountPoints": [], "volumesFrom": [], "systemControls": [] } ], "family": "airflow-worker", "executionRoleArn": "<워커 테스크 실행 역할 ARN>", "networkMode": "awsvpc", "revision": 3, "volumes": [], "status": "ACTIVE", "requiresAttributes": [ { "name": "com.amazonaws.ecs.capability.ecr-auth" }, { "name": "ecs.capability.execution-role-ecr-pull" }, { "name": "com.amazonaws.ecs.capability.docker-remote-api.1.18" }, { "name": "ecs.capability.task-eni" } ], "placementConstraints": [], "compatibilities": [ "EC2", "FARGATE" ], "requiresCompatibilities": [ "FARGATE" ], "cpu": "256", "memory": "512", "runtimePlatform": { "cpuArchitecture": "ARM64", "operatingSystemFamily": "LINUX" }, "registeredAt": "2024-08-08T02:03:40.669Z", "registeredBy": "<생성 유저 ARN>", "tags": [] } 테스크 정의는 JSON으로도 작성될 수 있으며 콘솔 UI에서 설정할 수 없는 세부적인 설정이 가능하므로 문서를 참고해서 작성하실 수 있습니다. https://docs.aws.amazon.com/ko_kr/AmazonECS/latest/developerguide/task_definitions.html
Transformer 아키텍처는 현재 LLM 에서 사용하는 de facto의 모델 구조입니다. 처음 Vaswani가 제안했을때에는 Encoder와 Decoder를 함께 사용하는 언어번역 테스크를 위해 고안되었으나, 각 Encoder와 Decoder의 특징에 따라서 다양한 모델로 적용되고 있습니다. 여러분들이 많이 사용하고 계신 GPT의 원류 또한, Vaswani의 Attention is All you needs의 Decoder를 사용한 모델입니다. 그런데 현재 LLM을 통한 생성형 AI가 서비스화가 되면서 다양한 문제를 직면하였습니다. Decoder를 사용하는 모델이 문장을 생성할때에 "Auto-Regression" 방식을 사용한다는 것입니다. 이는 Prompt를 모델에게 제공하면, 모델은 이 Prompt를 받아서 다음 단어를 뽑고하고, (Prompt + 다음 단어)를 다시 모델에 제공하여 그 다음 단어를 뽑는 방식입니다. 이런 방식은 우선 문장을 생성하는 시간이 오래 소요됩니다. 이러한 방법은 서비스 형태에서는 사용자에게 큰 이슈로 적용될수 있습니다. 해당 이슈를 해결하기 위해서, 문장 생성까지의 시간을 줄이기 위해 고안된 방법은 다양한데, 그중 오늘은 KV-Cache라는 방식에 대해서 알아보도록 하겠습니다. 우선 KV-Cache에 대해 설명하기 앞서서, Transformer (Decoder) 구조에 대해서 다시한번 살펴뵤고, 어떻게 적용되는지에 대해 설명해보고자 합니다. Transformer는 아래와 같은 구조를 가지고 있습니다. 왼쪽의 Encoder와 오른쪽의 Decoder로 구성된 Transformer 아키텍쳐 입니다. 포스트의 주제인 KV-Cache를 이해하기 위해서는 Muti-Head Attention 블럭의 내부구조를 이해해 봐야 합니다. Multi-Head Attention은 다음과 같이 구성되어있습니다. 조금 더 설명하기 쉽게 예시를 통해 Prompt가 어떻게 변환 되는지 보겠습니다. ```python import torch from transformers import AutoTokenizer input_text = "Hello, how are you?" # Tokenize the input text tokenizer = AutoTokenizer.from_pretrained('gpt2-medium') x = tokenizer(input_text, return_tensors="pt") x = x['input_ids'] print('Shapf of the input tensor:', x.shape) print('Input tensor:', x) ``` ``` Shape of the input tensor: torch.Size([1, 6]) Input tensor: tensor([[15496, 11, 703, 389, 345, 30]]) ``` 위의 예시는 "Hello, how are you"가 토크나이저를 통해 변환된 결과 입니다. 토크나이저에 학습된 Vocab은 다음과 같습니다. ``` Token: 15496 -> Hello Token: 11 -> , Token: 703 -> how Token: 389 -> are Token: 345 -> you Token: 30 -> ? ``` * 예시를 통해 아키텍쳐의 Dimension은 32로 축소하였습니다. ```python embeddings = torch.nn.Embedding(tokenizer.vocab_size, 32) emb_output = embeddings(x) print(emb_output.shape) ``` ``` torch.Size([1, 6, 32]) ``` Batch Size: 1, Sequence_length: 6, Dimension: 32인 결과를 얻을수 있습니다. * Positional Encoding 또는 Rotary Encoding을 적용하는 부분은 이전 블로그를 참고해주세요. * 예시는 Positional Encoding을 가정한 결과로 가정합니다. - Multi-Head Attention 부분을 살펴보면, 아래와 같은 흐름으로 진행 됩니다. ```python head = 2 head_dim = 32//2 wq = torch.nn.Linear(32, head*head_dim) wk = torch.nn.Linear(32, head*head_dim) wv = torch.nn.Linear(32, head*head_dim) wq_output = wq(emb_output) wk_output = wk(emb_output) wv_output = wv(emb_output) print('Query shape:', wq_output.shape) print('Key shape:', wk_output.shape) print('Value shape:', wv_output.shape) ``` ``` Query shape: torch.Size([1, 6, 32]) Key shape: torch.Size([1, 6, 32]) Value shape: torch.Size([1, 6, 32]) ``` - Multi-Head ```python wq_head_output = wq_output.view(1, 6, head, head_dim) wq_head_output = wq_head_output.transpose(1, 2) wk_head_output = wk_output.view(1, 6, head, head_dim) wk_head_output = wk_head_output.transpose(1, 2) wv_head_output = wv_output.view(1, 6, head, head_dim) wv_head_output = wv_head_output.transpose(1, 2) print('Query head shape:', wq_head_output.shape) print('Key head shape:', wk_head_output.shape) print('Value head shape:', wv_head_output.shape) ``` ``` Query head shape: torch.Size([1, 2, 6, 16]) Key head shape: torch.Size([1, 2, 6, 16]) Value head shape: torch.Size([1, 2, 6, 16]) ``` Batch Size: 1, Heads: 2, Sequence Length: 6, Dimension:16 인 텐서를 Q(query), K(key), V(value) 별로 얻을수 있습니다. 다음은 Scaled-Dot Product Attention을 적용시켜 보겠습니다. ```python score = torch.matmul(wq_head_output, wk_head_output.transpose(-2, -1)) score = score / (32**0.5) # scale print('Score shape:', score.shape) attention = torch.nn.Softmax(dim=-1)(score) print('Attention shape:', attention.shape) output = torch.matmul(attention, wv_head_output) print('Output shape:', output.shape) ``` ``` Score shape: torch.Size([1, 2, 6, 6]) Attention shape: torch.Size([1, 2, 6, 6]) Output shape: torch.Size([1, 2, 6, 16]) ``` 마지막으로 Concat과 Linear를 통하면 Multi-Head Attention의 내부 구조가 됩니다. ```python wo = torch.nn.Linear(32, 32) output = output.transpose(1, 2).contiguous().view(1, 6, 32) mha_output = wo(output) print('MHA output shape:', mha_output.shape) ``` ``` MHA output shape: torch.Size([1, 6, 32]) ``` Multi-Head Attention 블럭 이후에는 Feed-Forward Layer를 통해 아키텍처의 블럭결과를 받을수 있지만, KV-Cache를 이해하기 위해서는 FFN에 대해서는 생략합니다. KV-Cache KV-Cache는 이름 그대로 Attention에서 K와 V에 대한 값을 Cache로 저장하는 방식을 얘기합니다. 이런 방식이 어떻게 문장 생성시 시간을 줄일수 있는지 한번 확인해보겠습니다. 문장 생성을 요청 할 시, Prompt 또는 Text를 모델에 제공하면, 모델은 다음과 같이 반복적인 작업을 수행하며, 다음 문장을 추출합니다. ``` Input: Large language models are recent advances in deep learning Step 0 input: Large language models are recent advances in deep learning Step 1 input: Large language models are recent advances in deep learning, Step 2 input: Large language models are recent advances in deep learning, which Step 3 input: Large language models are recent advances in deep learning, which uses Step 4 input: Large language models are recent advances in deep learning, which uses deep Step 5 input: Large language models are recent advances in deep learning, which uses deep neural Step 6 input: Large language models are recent advances in deep learning, which uses deep neural networks Step 7 input: Large language models are recent advances in deep learning, which uses deep neural networks to Step 8 input: Large language models are recent advances in deep learning, which uses deep neural networks to learn Step 9 input: Large language models are recent advances in deep learning, which uses deep neural networks to learn to ``` 예를 들면, Step 0은 초기 Input을 받아서 가장 확률이 높은 토큰을 뽑고 해당 토큰은 , (쉼표)가 되었습니다. Step 1은 Input과 , (쉼표)를 받아서 다음 단어인 which를 추출하였습니다. 이러한 방식으로 문장을 생성하는데, 위의 예시를 보면, 동일한 문장을 반복적으로 사용하는 것을 알수 있습니다. 이러한 식으로 문장을 생성할때 모델은 엄청난 FLOPS (Floating point operations per second)가 필요합니다. Attention Layer에서의 FLOPS의 계산은 아래와 같은 수식을 가지게 됩니다. ``` 2 * batch_size * N_Layers * N_head * head_dim * (sequence_length**2) ``` 여기서 가장 중요하게 보아야 할 점은, Attention Score를 구할때, Scaled Dot Attention의 Q와 K의 Dot Product로 인해서 (Sequence_length ** 2) 처럼 FLOPS가 Quadratic으로 증가한다는 것입니다. 많이 사용하는 모델들의 파라미터는 아래와 같습니다. 출처: https://medium.com/@plienhar/llm-inference-series-4-kv-caching-a-deeper-look-4ba9a77746c8 Attention만을 보았을때도 엄청난 FLOPS가 필요로 하는것을 볼수 있습니다. FLOPS의 증가는 연산량의 증가로, 결국에는 긴 문장 (Context)를 생성하는데에는 연산량이 Quadratic (제곱)으로 증가하는 것을 볼 수 있습니다. 그래서, 문장 생성시, FLOPS을 최소화 하기 위해 KV-Cache가 고안되었습니다. KV-Cache의 큰 틀은, Inference시에, 0-step (Initial Step)이후에, <Inital + step-0-output>을 합친 문장을 Inference하는 1-Step부터 사용됩니다. 예를 들어, ``` 0-Step-Input: Large language models are recent advances in deep learning 0-Step-Output: , 1-Step-Input: Large language models are recent advances in deep learning, ``` 1-Step-Input이 모델에 들어가서 1-Step-Output을 추출할때, KV-Cache가 없을시에는, 1-Step-Input이 그대로 모델의 인풋으로 사용됩니다. 그런데, 0-Step-Input과 1-Step-Input에서 동일한 단어들이 발생하는데, 바로 ``` Large language models are recent advances in deep learning ``` 이 문장이 반복적으로 사용된다는 점 입니다. KV-Cache는 이렇게 반복적인 연산의 결과를 Cache로 저장하는 방식을 채택하여, 불필요한 연산 (FLOPS)를 줄이는 최적화 방법입니다. KV-Cache의 최적화 방법을 조금더 자세히 봐보면 다음과 같습니다. 0-Step-Input을 통해 생성된 인풋은 다음과 같습니다. ``` x = tokenizer.encode('Large language models are recent advances in deep learning', return_tensors='pt') print(x.shape) ``` ``` torch.Size([1, 9]) ``` 0-Step의 Attention연산은 기존의 Vanilla Transformer와 동일합니다. 하지만 KV-Cache를 사용하기 위해 Attention Layer 내부에 K_Cache와 V_Cache에 대한 홀더를 가지고 있도록 합니다. ``` cache_k = torch.zeros((1, 50, 2, 16)) # b, max_seq_len, head, head_dim cache_v = torch.zeros((1, 50, 2, 16)) # b, max_seq_len, head, head_dim ``` - 모델 학습시에도 동일한 레이어를 사용하기에, KV-Cache를 사용하여 Attention Layer를 다시 구성해야 합니다 구현 코드는 아래에서 찾아볼수 있습니다. https://github.com/meta-llama/llama/blob/main/llama/model.py https://github.com/meta-llama/llama/blob/main/llama/generation.py KV-Cache의 구현에서는 0-Step에 대해서, Attention Layer는 cache_k 와 cache_v에 "transpose" 하기 이전의 wk_head_output 와 wv_head_output을 저장합니다. - KV-Cache에 대한 대략적인 내용으로 Llama의 구현코드에서 많은 부분을 생략하였습니다. ```python wq_head_output = wq_output.view(1, 6, head, head_dim) wk_head_output = wk_output.view(1, 6, head, head_dim) cache_k = wq_head_output wv_head_output = wv_output.view(1, 6, head, head_dim) cache_v = wv_head_output ``` Attention 에 사용하는 K와 V는, cache_k와 cache_v에서 파싱 하도록 합니다. ```python bsz, seqlen, _ = x.shape keys = cache_k[:bsz, : start_pos + seqlen] values = cache_v[:bsz, : start_pos + seqlen] ``` 여기서 중요한점은 start_pos라는 파라미터로, 이는 어텐션 레이어의 argument로 받도록 합니다. ```python def forward( self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor], ): ... return output ``` KV-Cache를 사용한 Attention Block은 다음과 같은 그림을 가지게 됩니다. 출처: https://medium.com/@plienhar/llm-inference-series-3-kv-caching-unveiled-048152e461c8 0-Step은 이전 정의가 없기때문에 K와 V는 그대로 사용하도록 합니다. 이때 모델내부에 있는 Multi-Head Attention은 해당 K와 V를 각 cache에 가지고 있게 됩니다. 여기서 중요한점은 1-Step의 Input으로 (0-step-input + 0-step-output)을 모델에 사용하지 않고, 0-step-output만을 모델에 사용하는 것이 KV-Cache의 핵심입니다. 하나의 단어를 통해서 Q, K, V의 인풋으로는 이전 Vanilla Transformer 에서는 Embedding Layer의 아웃풋인 [Batch_size, Sequence_length , Dimension]이었던 [1, 6, 32]가 [1, 1, 32]로 축소되게 됩니다. 이로 인해 FLOPS의 연산이 Quadratic이 아닌 Linear로 증가하게 됩니다. ``` 2 * batch_size * N_Layers * N_head * head_dim * sequence_length * 1 ``` 이로 인해서, 문장 생성시 FLOPS의 최적화로 Vanilla Transformer보다 빠른 Inference를 나타낼 수 있습니다. 하지만, 이로 인한 또 다른 문제가 발생하는데, 저장된 Cache의 크기는 문장의 길이에 따라 엄청난 양의 데이터를 가지고 있게 됩니다. LLM을 사용하는데 있어서 GPU연산을 통해 가속화를 하지만, GPU의 용량은 정해져 있어, 결국에는 GPU 메모리에 Cache를 담을수 없게 됩니다. 이 문제를 해결하기 위해 다양한 방법이 제시되었는데, Llama에서 사용한 방법은 GQA (Grouped Query Attention)을 사용하였습니다. (http://arxiv.org/abs/2305.13245) Llama의 모델의 SelfAttention 클래스에 있는 (self.n_kv_heads) Argument는 이 GQA를 적용한 부분입니다. 추가적인 방법은 Quantization 을 통해서 모델의 파라미터의 용량을 줄이는 방법도 많이 사용됩니다. 이상으로 Llama 등 LLM의 서비스화를 위해 연구된 KV-Cache에 대해 소개드렸습니다. 감사합니다.
Airflow에는 Dynamic Task Mapping 기능을 통해 DAG 작성 시 필요한 task 개수를 미리 알 필요 없이 워크플로우가 현재 데이터를 기반으로 런타임에 여러 task를 실행할 수 있습니다. 마치 for 루프문 처럼 말이죠. Dynamic Task의 기본적인 사용은 위와 같습니다. task의 expand 메소드를 통해 입력 데이터 개수만큼 런타임에서 TaskInstance를 생성하게 됩니다. 이렇게 하면 웹서버 UI에서 확인할 경우 다음을 볼 수 있습니다. 맵핑되었던 task별로 map index가 할당되어있습니다. 이는 임의로 0~n까지 맵핑됩니다. 동시에 실행되는 task 수가 많아지고 이를 모니터링하려면 이 화면에서 각 task를 미리 알아볼 수 있는 식별자가 필요할 것입니다. 이때 map index를 직접 지정할 수 있으면 어떨까요? Airflow 2.9 버전부터는 이러한 기능을 제공합니다. 동적 맵핑되는 task의 데코레이터의 map_index_template 파라미터에 사용할 map index 템플릿을 지정합니다. 이후 task 코드 내에서 get_current_context() 메소드를 통해 컨텍스트를 가져온 후 지정한 템플릿을 작성합니다. 이렇게하여 map index에 템플릿이 적용된 인덱스를 넣을 수 있습니다. 사용 시 각 task를 파악할 수 있는 값을 지정한다면 task 수가 많아져도 모니터링하기 수월해 질 것입니다.
2024년 8월 15일 Airflow의 2.10 버전이 릴리즈되었습니다. 중요한 변경 중 하나는 Hybrid Executor를 구성할 수 있게 된 것입니다. Airflow는 provider 모듈 포함 많은 Executor를 지원합니다. 각 Executor는 각각 컴퓨팅 효율과 실행시간, 격리 등에 장단점이 있으며 기존의 Airflow에서는 이중 하나의 Executor만을 사용할 수 있었습니다. Hybrid Executor를 구성할 수 있음에 따라 DAG 또는 Task는 각각의 워크플로우에 적절한 Executor를 통해 실행될 수 있게 되었습니다. 이번 글에서는 단일 Airflow Scheduler 내에서 DAG또는 Task에 따라 여러 Executor를 사용하는 새로운 기능을 소개합니다. 우선 Airflow 구성의 core 항목에서 사용할 executor들을 콤마로 구분하여 정의합니다. environment: AIRFLOW__CORE__PARALLELISM: 3 AIRFLOW__CORE__EXECUTOR: LocalExecutor,plugins.fargate_executor.AwsFargateExecutor 위 예시에서는 LocalExecutor, AwsFargateExecutor 두 Executor를 사용했습니다. 이중 가장 첫 번째의 Executor가 기본적으로 사용되며 DAG또는 Task 내에 실행될 Executor를 명시하지 않는다면 맨 첫 번째의 Executor로 실행되게 됩니다. @task(executor="plugins.fargate_executor.AwsFargateExecutor") def task_func(**kwargs): ... 데코레이터 Task의 경우 task 데코레이터의 파라미터로 Executor의 이름을 넘겨줍니다. task_op=PythonOperator( task_id="t1", python_callable=task_func, executor="plugins.fargate_executor.AwsFargateExecutor", ) PythonOperator의 경우 생성자 인자로 executor에 실행할 Executor의 이름을 넘겨줍니다. with DAG( dag_id="dag1", start_date=pendulum.datetime(2024, 1, 1, 0, 0, 0, tz="Asia/Seoul"), schedule_interval=None, default_args={ "executor": "plugins.fargate_executor.AwsFargateExecutor", }, ) as dag: DAG default_args의 executor에 Executor이름을 넣어 DAG내 모든 Task들에 적용할 수 있습니다. environment: AIRFLOW__CORE__PARALLELISM: 3 AIRFLOW__CORE__EXECUTOR: LocalExecutor,plugins.fargate_executor.AwsFargateExecutor:FargateExecutor 콜론(:) 뒤에 별칭을 넣어 사용할 수 있습니다. 이외 변경점들은 Airflow 공식 문서의 Release Note에서 확인할 수 있습니다. https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-10-0-2024-08-15
이번 포스트는 Chat-GPT와 같은 Instructive-LLM을 사용 할 때, 더욱 정확한 답변을 받을수 있는 프롬프트 방법을 소개하고자 합니다. 이 포스트에서 소개하는 내용은 "Principled Instructions Are All You Need for Questioning LLaMA-1/2, GPT-3.5/4" (http://arxiv.org/abs/2312.16171) 논문의 내용을 바탕으로 작성되었습니다. LLM의 답변은 프롬프트의 품질에 따라 상당히 큰 차이를 보여줍니다. 다른 논문에서의 예시를 보면, 질문: Was Mother Teresa born in an even month? 답변: No, Mother Teresa was born on August 26, 1910. August is the 8th month of the year. 위와 같이 "짝수 월" 이라는 단어에 대해서는 "August -> 8월 -> 8 -> 짝수" 라는 논리를 완벽하게 이해하지 못한 경우입니다. 보다 좋은 답변을 받기 위해서 프롬프트를 5가지의 카테고리로 분류하였습니다. 1. 구조와 명확성 2. 특징화 및 정보성 3. 사용자와의 상호작용 4. 컨텐츠와 언어 스타일 5. 복잡한 작업 본문에서는 26개의 원칙을 소개하였지만, 이 포스트에서는 간단하게 적용할수 있는 몇가지만 소개하고자 합니다. Principle #1 - 답변의 품질을 올리기 위해 "부탁해", "고마워"와 같은 정중한 표현을 사용할 필요가 없습니다. Principle #2, 5 - 답변을 받는 사람이 누구인지를 포함합니다. (답변을 받는 사람은 이 분야의 전문가이다. 또는 11살인 나에게 설명해.) Principle #3 - 복잡한 작업에 대해서는 간단한 프롬프트를 통해 순서를 정해줍니다. Principle #4 - 부정적인 표현을 가급적 사용하지 않습니다. (답변을 영어로 하지마 -> 답변은 한글로 작성해) Principle #6 - 좋은 답변에 팁을 줄께 라는 문구를 추가합니다 (I'm going to tip $xxx for a better solution!") Principle #7 - 예시를 주고 이를 모방하게끔 하는 Few-shot 프롬프팅을 사용합니다. Principle #8, 17 - 프롬프트를 작성할시 포멧을 정해줍니다 ( ###방법###, ###예시###, ###질문### 등, 구분기호와 함께 구조화 합니다) Principle #9, 16 - 프롬프트에 LLM의 역활을 정해주고, 어떤 작업을 해야하는지 명시해줍니다. (너는 마케팅 전문가야, 너가 해야하는 작업은 ~이다) Principle #10 - 잘못된 답변을 할 시, 처벌을 받을수 있다는 문구를 추가해 줍니다. (You will be penalized) Principle #12 - 차근히 생각해보라는 문구를 추가해 줍니다. (Think step by step) 아래는 해당 논문에서 실험한 결과입니다. 26개의 Principles에 대한 성능 향상은 아래 표에서 확인할수 있습니다. Chat-GPT를 사용하신다면, 녹색 바인 (Avg Large Scale)을 주로 보시면 됩니다. 가장 높은 성능을 보이는 Principle #14는 Few-shot 프롬프트와 비슷하게 LLM이 정확한 답변을 하기 위해 모델과 상호작용을 하는 프롬프트 입니다. 예를 들면, LLM이 답변을 위한 정확한 정보가 없다면, LLM은 사용자에게 필요한 정보를 위해 질문을 하고 사용자가 답변을 하여 정보를 수집 후 최종 답변을 내는 방식입니다. 그 외에 2번과 5번, 26번이 높은 수준을 보여주는데, 26번은 사용자가 LLM에게 글을 쓰게 하였을때, 샘플 글을 주고 "제공한 문장과 같은 언어를 사용해" 라는 문구를 추가하는 방법입니다. "Principled Instructions Are All You Need for Questioning LLaMA-1/2, GPT-3.5/4"논문 외에 직접적으로 사용할 수 있는 추가적인 방법은 "Rephrase and Respond"(RaR) 방법으로, 해당 논문은 (http://arxiv.org/abs/2311.04205) 에서 확인해 보실수 있습니다. 포스트 초기에 예시로 등장했던 마더 테레사의 예시를 Rephrase and Respond를 통해 정확한 답변을 받을수 있습니다. RaR은 사용자의 질문을 LLM이 수정하여 사용자의 질문 + 수정된 질문을 통해 답변을 내는 형식입니다. RaR의 사용은 아래의 프롬프트 형식을 통해 간단하게 사용할 수 있습니다. ``` <사용자의 질문> Rephrase and expand the question, and respond. ``` 아래는 다양한 10개의 작업에 대한 성능 향상 결과 입니다. Chat-Gpt와 같은 LLM모델을 사용하거나, 다른 서비스에 포함된 LLM을 사용할시에 이러한 방법을 사용하여 더 좋은 품질의 결과를 얻어 다양한 방면에 도움이 되셨으면 좋겠습니다. 감사합니다.
Airflow를 도커 컨테이너로 실행할 때 무중단 배포방식에 대해서 소개합니다. 현재 Airflow 운영 환경은 webserver와 scheduler를 각각 컨테이너로 EC2에서 실행하며 EC2에는 nginx가 실행되어 Airflow 웹서버로 proxy pass합니다. AwsEcsExecutor를 사용하여 Fargate 위에서 worker가 실행됩니다. 배포는 Gitlab CI로 자동 배포합니다. Gitlab CI를 통한 배포에서는 아래 두 단계를 거칩니다. 1. dag, 소스코드, 라이브러리를 포함하여 이미지를 빌드한 후 ECR에 푸시하는 단계 2. EC2에 SSH로 접속하여 ECR에서 Airflow 이미지를 받고 무중단으로 컨테이너를 실행하는 단계 1번 단계에서는 프로젝트의 최신 커밋에서 미리 정의된 Dockerfile을 통해 이미지 빌드를 진행합니다. 이미지 빌드 과정에서 환경변수를 통해 Airflow의 설정과 종속성 패키지 설치, DAG 및 소스 코드를 가져옵니다. 2단계에서는 EC2에 SSH 접속하여 미리 정의된 쉘스크립트를 실행하여 무중단 배포를 진행합니다. 아래는 블루그린 배포를 실행하는 쉘스크립트 소스입니다. #!/bin/bash 배포를 위해 blue, green 각각의 docker-compose 파일을 미리 정의해둡니다. (ex: docker-compose.blue.yml, docker-compose.green.yml) 스크립트 실행 과정입니다. - 이미지를 가져오기 전에 system prune을 통해 리소스를 정리합니다. - ECR에 인증 후(사전에 EC2에 IAM역할 설정) 이미지를 가져옵니다. - 현재 실행중인 그룹이 blue이면 green 그룹을, green이면 blue 그룹을 실행합니다. - 웹서버의 포트를 blue, green에 따라 다르게 정의합니다(blue: 8080, green: 8081) - 새 그룹의 컨테이너를 실행합니다. - 새로 실행한 컨테이너 그룹이 정상 작동할때까지 health check를 진행합니다(scheduler 컨테이너의 상태가 healthy일 때 까지) - 그룹 컨테이너 정상 작동이 확인되었다면 nginx -s reload 명령을 통해 웹서버 포트 변경을 반영합니다. - 기존 컨테이너를 종료합니다. 이를 통해 Airflow의 scheduler와 webserver를 중단 없이 배포할 수 있습니다. 위 방식은 한 EC2 인스턴스 내에서 nginx로 proxy pass를 조정하였지만 로드밸런서와 여러 대의 EC2 인스턴스로도 진행할 수 있습니다.
AWS에선 논리적으로 격리된 가상 네트워크인 가상 프라이빗 클라우드(VPC)를 정의할 수 있습니다. 또한 VPC의 리소스가 해당 VPC 외부의 리소스에 연결되도록 허용할 수 있습니다. 예를 들어, 인터넷 게이트웨이를 VPC에 추가하여 인터넷에 액세스할 수 있도록 하거나 VPN 연결을 추가하여 온프레미스 네트워크에 액세스할 수 있도록 합니다. 본 글에서는 AWS PrivateLink를 사용하여 VPC의 리소스가 프라이빗 IP 주소를 사용하여 AWS 서비스에 연결되도록(마치 해당 서비스가 VPC에 직접 호스팅된 것 처럼) 허용하는 방법을 소개합니다. AWS PrivateLink를 사용하게된 계기는 일정 주기마다 실행되어야하는 워크플로우를 Airflow와 ECSExecutor로 구현하여 Fargate 위에서 실행하고 있습니다. 어느날 이렇게 구축된 인프라 비용이 계산보다 높게 과금된 것을 확인했습니다. 발생 원인은 NAT 게이트웨이의 데이터 처리량이 4TB 이상으로, 1분마다 한 번 실행되는 DAG를 단지 일주일동안 실행했을 뿐이었습니다. 해당 지표를 확인 후 Airflow에서 NAT 게이트웨이에 트래픽이 생기는 경우를 추려보았습니다. 1. Airflow 메타 데이터베이스 통신 2. S3에 로그 업로드 (Airflow remote logging으로 S3 사용중) 3. DAG 테스크 내에서의 데이터 통신 4. Fargate의 이미지 Pull 1번의 경우 메타 데이터베이스는 내부망에서만 접근하도록 되어있어서 NAT 게이트웨이를 타지 않습니다. 2번의 경우 리모트 로깅 없이 실행해봤으나 트래픽 발생량은 동일했습니다. 3번의 경우 데이터베이스에 접근하는것 말고는 별다른 로직이 없었습니다. 4번의 경우 NAT 게이트웨이 지표상에서 분당 500MB 이상의 아웃바운드 트래픽이 발생한 것과, Airflow worker의 이미지가 약 500MB 이상인 점을 볼때 해당 경우일 가능성이 높았습니다. 이미지 저장소는 ECR의 Private Repository를 사용하고 있었으며 VPC Interface Endpoint를 등록하지 않아 프라이빗 서브넷에서 실행한 Fargate가 매번 NAT게이트웨이를 통해 이미지를 Pull 받았다고 판단했습니다. 이에 Fargate가 실행되는 VPC 내에 프라이빗 서브넷과 ECR 서비스를 내부망으로 통신시키기 위한 VPC 엔드포인트를 생성했습니다. 이후 NAT 게이트웨이의 트래픽량은 현저히 줄었으며 이는 ECR 연결에 사용하는 VPC Endpoint 트래픽으로 대체되었습니다.
현재 수 많은 LLM 어플리케이션이 개발되고 있습니다. 플러스제로 또한 LLM을 데이터 분석, 마케팅, 챗봇 등에 활용하고 있는데요 어플리케이션을 개발하는데 있어서 보안이 중요해지고 있습니다. OWASP Top 10 for LLM Applications(Version 1.1)를 참고해보면 아래와 같은 총 10가지의 보안 이슈에 대해 이야기하고 있습니다. 이 게시물을 통해 해당 이슈들에 대해 간단히 알아보도록 하겠습니다. LLM01: Prompt Injection -> 이는 우리가 흔히 알고있는 SQL Injection과 닮아있습니다. 보통 어플리케이션에서 우리는 사용자의 입력을 받게 되는데 이 과정에서 기존 필터링을 무시하거나 우회하여 LLM을 조작하는 행위입니다. 따라서 기존 제작된 프롬프트에 영향을 주지 않도록 설계해야합니다. Risk: 데이터 유출, 미인가 접속, 의사 결정에 영향 LLM02: Insecure Output Handling -> 유효성을 검사하지 않고 특정 어플리케이션에서 외부 입력을 처리할 때 발생하는 문제입니다. 가령 사용자의 입력으로 콘텐츠를 생성하는 어플리케이션이라고 했을 때 유해하거나 부적절한 콘텐츠를 생성할 수 있습니다. 혹은 사용자가 입력한 내용을 토대로 HTML을 만들어서 화면에 보여주는 경우 HTML 인젝션을 당해 개인정보 유출등의 사고가 발생할 수도 있습니다. Risk: 데이터 유출, 특권 권한 상승, 원격 코드 실행 등 LLM03: Training Data Poisoning -> 사전 훈련 데이터등을 조작, 손상시켜 취약성을 유발하는 위협입니다. Risk: 모델 효율 저하, 모델 행동 패턴 변화 등 LLM04: Model Denial of Service -> LLM에서 많은 리소스를 사용하도록 유도하여 성능을 저하시키거나 높은 비용을 발생시킵니다. Risk: 성능 저하, 모델 효율성 저하, 자원 사용 증가 등 LLM05: Supply Chain Vulnerabilities -> 학습 과정이나 모델 배포 과정에서 악성 소프트웨어를 포함하여 모델 편향을 유발하고 잘못된 정보를 제공하도록 하는 공격입니다. Risk: 훈련 데이터 및 모델 무결성 침해, 보안 결점, 인프라 장애 등 LLM06: Sensitive Information Disclosure -> 실수로 기밀 데이터를 응답에 공개해 무단 데이터 엑세스, 개인 정보 위반, 보안 침해로 이어집니다. Risk: 개인정보 유출, 개인정보 보호 규정 위반 등 LLM07: Insecure Plugin Design -> LLM의 플러그인이나 확장 기능이 안전하게 설계되지 않았을 때 발생하는 문제입니다. Risk: 데이터 유출, 시스템 손상 등 LLM08: Excessive Agency -> 모델의 자율성이지나치게 커질 때 생기는 문제로 모델이 독립적으로 너무 많은 결정을 내리거나 실행할 수 있을 때 발생하는 문제입니다. Risk: 남용, 잘못된 결정, 개인정보 침해 등 LLM09: Overreliance -> 지나치게 LLM에 의존하는 시스템이나 사람은 부정확하거나 부적절한 콘텐츠를 전달받을 수 있습니다. LLM의 세 가지 문제(할루시네이션, 지식단절(최신 정보), 도메인지식 부재)와도 연관된 이슈로 RAG등을 구성하여 사용하여야 합니다. Risk: 잘못된 의사결정, 법적 문제 등 LLM10: Model Theft -> 독점 LLM 모델에 대한 무단 액세스 문제입니다. Risk: 경제적 손실, 경쟁 우위 훼손, 민간 정보에 대한 접근 등 위 문제들은 어플리케이션을 개발할 때 항상 염두해야 하며 프롬프트 혹은 시스템을 통해 예방할 수 있습니다. OWASP에서 지속적으로 버전이 오를때 마다 업데이트 하고 있기 때문에 수시로 확인하는 것을 추천합니다.
Graph RAG는 RAG [ Retrieval Augmented Generation]의 방법입니다. Graph RAG를 설명하기 전에 RAG가 무엇인지 간단하게 설명을 드리자면, 최근 Chat GPT와 같이 대규모 언어 모델을 통한 서비스가 개발되고 있는데, 이 모델들의 단점은 학습되어 있는 정보를 바탕으로 결과를 나타낸다는 것입니다. 한 예로, 최근 발표된 gpt-4o의 모델은 2023년 10월 까지의 데이터를 사용하였다는 의미입니다. 최근에는 Chat GPT에 인터넷에서의 정보를 가져오는 기능이 포함되어 2023년 10월 이후의 데이터에 대한 정확도를 어느정도 확보 하였지만, 이 기능이 있기전 까지는 정확한 정보를 가져오는것이 불가능했습니다. 그렇기 때문에 각 특정분야에서는 대규모 언어모델을 자신들의 데이터로 학습하여 데이터에 대한 정보를 주입하는 방법을 사용했습니다. 그러나 이런 Fine-Tunning 방식은 데이터를 준비하고, 데이터의 품질 검사 및 학습에 소요되는 시간, 비용을 생각하면 소규모 조직에서는 득보다는 실이 많은 상황이 많습니다. 이런 방식을 최소화 하기 위해서, RAG라는 방법론이 2021년 Facebook AI를 통해 연구되었습니다. https://arxiv.org/pdf/2005.11401 해당 방법은 다양한 문서들에 대해서 검색 및 색인을 할수 있도록 하며, 해당 문서는 대규모 언어모델과도 연동이 되어야 하는 방법입니다. RAG와 관련되어서 더 자세한 글은 여기에서 더 확인해보실수 있습니다. 그렇다면 Graph RAG는 무엇인가라면, 기존의 RAG는 문서의 Embedding Vector를 통해 Query와의 유사성을 연산하였다면, Graph RAG는 Knowledge Graph를 추가적으로 이용한다는 것입니다. Knowledge Graph (지식 그래프)는 " (노드) - [관계] -> (노드) " 의 형태를 가지는 그래프입니다. 한 예시로, "플러스제로는 서울의 강남에 위치한 데이터 마케팅 회사입니다" 라는 문장에서 (플러스제로) - [위치해 있다] - (강남) (플러스제로) - [일을 한다] - (데이터 마케팅) (플러스제로) - [is] - (회사) (강남) - [Entity] - (위치) 등등 다양한 정보를 그래프로 나타낼수 있습니다 소괄호 ( ) 안에 있는 단어는 명사 같은 노드를, 중괄호[ ] 는 노드끼리의 관계를 나타냅니다. [이미지 출처 : Neo4j] 다양한 문서들에 대해서 이러한 Knowledge Graph를 만들게 되면 아래와 같이 데이터들의 관계를 나타낼수 있습니다. 해당 그래프에는 문서의 노드와 관계 뿐만 아니라 문서 자체도 포함되어 있습니다. 이렇게 데이터화가 되었다면, Graph RAG는 Embedding Vector의 유사성 뿐만 아니라, Query에서 중요한 단어에 대해서 Graph Query를 통해 연관된 노드들과 관계를 찾아서 해당 정보를 가져옵니다. 1. Graph Query를 통해 나온 노드들의 연관관계와 2. Query의 embedding vector와 유사도가 높은 문서의 정보 를 바탕으로 대규모 언어모델은 사용자가 질의한 내용에 대해서 답변하도록 합니다. Graph RAG방식은 문서와 질의의 유사성과 질의에 포함되어 있는 중요 단어 및 관계에 대한 정보를 더 잘 유추할수 있기에 기존의 Vector RAG보다 성능이 좋다는 연구가 있습니다. Graph RAG에 대해 더 알고 싶으시다면, 2024년에 마이크로소프트에서 연구한 블로그를 한번 보시는것도 도움이 될수 있습니다. 감사합니다.
지난 글에선 Airflow의 Operator를 통해 Fargate 태스크를 실행했었습니다. 이번 글에서는 AwsEcsExecutor를 사용하여 Fargate에서 airflow worker를 통해 task를 실행시켜보겠습니다. [Airflow ECS Executor Docs] https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/executors/ecs-executor.html 1. ECS Executor를 사용하기 위해선 우선 worker 컨테이너가 실행될 AWS ECS Cluster를 생성합니다. Amazon EC2 인스턴스 인프라는 사용하지 않으므로 제외합니다. 2. Airflow worker의 기본 태스크 정의를 생성합니다. 태스크 정의 생성 시 이미지에는 airflow가 설치되어있어야 하고, DAG및 task 소스를 포함해야 합니다. Dockerfile 예제 3. Airflow 코어 컨테이너(scheduler, webserver)를 정의합니다. 에어플로우 설정에서 AwsEcsExecutor 관련 설정을 추가해줍니다. 4. DAG를 정의합니다. 예제 DAG exec_config를 통해 태스크 정의를 오버라이드 하여 태스크별로 용량 프로비저닝이 가능합니다.
Airflow의 확장 Provider인 apache-airflow-providers-amazon의 ECSOperator를 사용하여 Airflow DAG내 Task들을 컨테이너로써 실행시킬 수 있다. Airflow KubernetesExecutor를 사용할때와 동일하게 이미지로 task 컨테이너를 띄워 Pod로 돌리듯이 ECS에 Airflow 테스크를 정의하여 실행시키면 된다. 이러한 방법으로 얻는 이점은 1. 테스크를 Fargate로 실행시키므로써 task를 실행시킬 컨테이너를 띄울 노드 인스턴스를 항시 켜놓지 않아도 작업이 필요한 순간에만 컴퓨팅을 하기 때문에 비용을 절감할 수 있다. 2. Fargate 테스크가 Airflow task마다 각각 실행되어 독립된 환경에서의 task실행을 보장할 수 있다. 또한 각각의 task마다 필요한만큼 따로 용량 프로비저닝도 가능하여 유연한 작업 실행 환경을 구성할 수도 있다. 우선 테스크 컨테이너를 실행할 ECS 클러스터를 생성한다. 클러스터에는 기본적으로 Fargate, Fargate SPOT 용량공급자가 사용 가능하다. 필요시 EC2 인스턴스, 또는 외부 인스턴스 용량 공급자도 같이 선택할 수 있다. ECS에서 말하는 용량 공급자란 필요한 서비스 또는 테스크 컨테이너를 띄워줄 수 있는 인프라를 말한다. 클러스터를 생성했다면 Airflow를 구성한다. Airflow를 실행하는 방법은 여러가지가 있으나 해당 글에서는 다루지 않음 Airflow에 AWS 커넥션을 구성해야 한다. 이 또한 여러 방법이 있지만 Airflow conn을 구성하지 않고 기본 커넥션으로 airflow.cfg에 구성하는 방법을 사용함 - AIRFLOW_CONN_AWS_DEFAULT={"conn_type":"aws","login":"<AWS 액세스키>","password":"<AWS 비밀키>"} - AWS_DEFAULT_REGION=<AWS 기본 리전> 위와같이 환경변수를 구성하여 컨테이너에서 Airflow를 실행하는 방법 아래는 DAGRun API를 통해 비동기로 실행되는 DAG 예제 코드이다. import sys sys.path.append("/opt/airflow") import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.utils.trigger_rule import TriggerRule from airflow.providers.amazon.aws.operators.ecs import ( EcsRegisterTaskDefinitionOperator, EcsRunTaskOperator, ) from src.config import SUBNET_MAPPED_2A_PRIVATE, SUBNET_MAPPED_2B_PRIVATE # Constants DAG_ID = "ECS-FARGATE-DAG" AWS_REGION = "ap-northeast-2" ECS_CLUSTER = "test" DOCKER_IMAGE = "<Conatiner Image URL>" AWS_LOG_GROUP = "/ecs/test" default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': pendulum.now(), 'retries': 0, } task_definition_params = { "requiresCompatibilities": ["FARGATE"], "cpu": "4096", "memory": "30720", "runtimePlatform": {"operatingSystemFamily": "LINUX", "cpuArchitecture": "X86_64"}, "networkMode": "awsvpc", "executionRoleArn": "<AWSRoleARN>", } network_configuration = { "awsvpcConfiguration": { "subnets": [<사용할 subnets>], } } def create_ecs_operator(task_id, cpu, memory, trigger_rule=TriggerRule.ALL_SUCCESS): return EcsRunTaskOperator( task_id=task_id, cluster=ECS_CLUSTER, task_definition=task_definition.output, launch_type="FARGATE", network_configuration=network_configuration, overrides={ "containerOverrides": [ { "name": "test", "environment": [ {"name": "param", "value": "{{ dag_run.conf['param1'] }}"}, ], "command": ["python", "main.py", "--task", task_id], "cpu": cpu, "memory": memory, } ] }, trigger_rule=trigger_rule, ) with DAG(DAG_ID, schedule=None, max_active_runs=100, default_args=default_args) as dag: task_definition = EcsRegisterTaskDefinitionOperator( task_id="task_definition", family="test", container_definitions=[ { "name": "test", "image": DOCKER_IMAGE, "logConfiguration": { "logDriver": "awslogs", "options": { "awslogs-region": AWS_REGION, "awslogs-group": AWS_LOG_GROUP, "awslogs-stream-prefix": "REPORT", "awslogs-create-group": "true", }, }, }, ], **task_definition_params ) task1 = create_ecs_operator("task1", 256, 512) task2 = create_ecs_operator("task2", 256, 512) # Define dependencies task1 >> task2 - 정의된 task_definition에 각각의 테스크에 추가로 containerOverrides를 정의하여 필요한 용량 프로비저닝 등을 설정한다.
간단한 서비스를 배포하기 위해 AWS를 사용하던 도중, Fast API와 같은 서비스가 아닌 최소한의 로직만을 품은 서비스를 배포해야하는 상황이 자주 발생하였다. 개발 싸이클의 최소화를 위해 AWS Lambda를 사용하여 서비스를 배포하며 겪은 시행착오와 해결 방법을 회고하고자 한다. 데이터 분석을 위한 이 서비스는 로우데이터를 가공하고 변형하여 Graph 형태 (https://en.wikipedia.org/wiki/Graph_theory)로 변환하고, GPT를 활용한 보조를 포함한 서비스이다. 이를 위한 requirements는 아래와 같다 위의 패키지들을 로컬과 개발 서버에서 사용하는데에는 문제가 없지만 AWS Lambda에서는 중요한 문제가 발생했다. 해당 문제는 다음과 같다
GCP(Google Cloud Platform) 서비스 계정 키 JSON을 발급받는 방법을 다섯 가지 단계로 설명해드리겠습니다. 1. Google Cloud Console에 로그인 먼저 Google Cloud Console에 로그인합니다. 계정에 액세스하려면 유효한 Google 계정이 있어야 합니다. 2. 서비스 계정 생성 왼쪽 상단의 네비게이션 메뉴에서 "IAM 및 관리" 섹션으로 이동한 후 "서비스 계정"을 선택합니다. "서비스 계정 만들기"를 클릭하여 새 서비스 계정을 생성합니다. 3. 권한 부여 서비스 계정에 적절한 역할(권한)을 할당합니다. 필요에 따라 프로젝트나 리소스에 대한 권한을 설정할 수 있습니다. 4. 키 생성 생성된 서비스 계정을 선택하고 "키 추가"를 클릭합니다. JSON 형식의 키 파일을 선택하고 키를 만듭니다. 5. JSON 다운로드 키가 성공적으로 생성되면 JSON 파일이 다운로드됩니다. 이 파일은 GCP API와 통신할 때 사용되며 안전하게 보관해야 합니다. 이렇게 하면 GCP 서비스 계정 키 JSON을 성공적으로 발급받을 수 있습니다.
안녕하세요. Airflow 를 helm chart를 이용하여 운영하고 있는데요. 이번에는 Airflow를 운영하면서 쌓이는 log 들을 DB가 아닌 AWS S3에 저장하는 방법을 소개시켜드리려고 해요. 이 글을 읽으시는 분들은 이미 Airflow를 운영하면서 사용하시는 helm chart, 즉 yaml 파일 하나를 가지고 있으실 텐데요. AWS S3에 연결하기 위해서는 yaml 파일에서 설정을 살짝 바꿔야합니다. 위 사진 처럼 remote_logging 부분에 'True', remote_base_log_folder 부분에 사용하실 S3 주소를 넣으면 된답니다. 그리고 remote_log_conn_id 에는 사용하실 id값을 자유롭게 적으셔도 돼요. 이렇게 수정을 하고 다시 helm chart를 업데이트 해줍니다. 이제 남은 것들은 굉장히 간단합니다. Airflow web으로 접속 한 뒤 [Admin > Connections] 메뉴를 클릭합니다. 새로운 항목을 추가할건데 위 사진처럼 Connection_id에 yaml파일에 설정한 remote_log_conn_id를, Connection Type은 Amazon Web Services를 선택해주고, AccessKey와 Secret Access Key는 사용하는 값들을 잘 입력해주면 됩니다. 이렇게하면 Airflow에서 나오는 log들이 S3에 저장됩니다.
AWS ElasticBeanstalk로 배포한 FastAPI 앱에서 1분 이상 걸리는 작업을 배포된 AWS Lambda를 통해 진행시키는데, 어느날 해당 API 호출이 실패하며 ElasticBeanstalk에는 아래와 같은 로그가 남아있었다. 2024/03/22 02:57:45 [error] 1112907#1112907: *88 upstream prematurely closed connection while reading response header from upstream, client: x.x.x.x, server: , request: "POST /cross-selling HTTP/1.1", upstream: "http://127.0.0.1:8000/cross-selling", host: "keywordsearch.pluszero.co.kr", referrer: "https://keywordsearch.pluszero.co.kr/crossselling" FastAPI 앱을 운영 서버에 배포하면 일반적으로 아래와 같은 구성으로 배포한다 즉 nginx에서 error.log에 남긴 로그는 업스트림 서버(WSGI HTTP Server 또는 Python Application)에서 준 응답 헤더를 읽는 도중 커넥션이 닫혀버렸다는 것이다. 위 현상은 응답까지 오래걸리는 lambda를 사용한 API에서만 발생하기 때문에 timeout 이슈라고 판단했다. 그래서 다시 ElasticBeanstalk의 web.stdout.log를 확인해 보니 아래와 같은 로그가 남아있었다. Mar 22 02:57:44 ip-10-0-3-4 web[1112864]: [2024-03-22 02:57:44 +0000] [1112864] [CRITICAL] WORKER TIMEOUT (pid:1112896) Mar 22 02:57:45 ip-10-0-3-4 web[1112864]: [2024-03-22 02:57:45 +0000] [1112864] [WARNING] Worker with pid 1112896 was terminated due to signal 9 해당 요청을 처리하는 gunicorn의 worker가 timeout으로 인해 강제로 종료된 것이다. gunicorn worker의 timeout 설정을 바로하여 문제를 해결할 수 있었다. 아래는 수정된 ElasticBeanstalk에서 서버를 시작하는 Procfile 내 명령어이다. web: gunicorn --bind 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker main:app --timeout=180 api별로 응답 timeout 시간을 설정할 수 있다면 좋겠지만, FastAPI가 아닌 WSGI gunicorn 서버에서 발생하는 오류이므로 전역으로 설정할 수 밖에 없는 문제였다.
안녕하세요 지난 글에 이어서 Adobe Analytics API로 데이터를 추출해볼건데요. 지난 글에서는 가이드에 있는 그대로 기본 지표들을 뽑아봤는데, 이번에는 내가 만든 커스텀 값, 계산된 지표들을 뽑아보려고해요. 먼저 가이드는 아래 가이드를 참고했습니다. https://developer.adobe.com/analytics-apis/docs/1.4/guides/reporting/report-description/dimensions/ https://developer.adobe.com/analytics-apis/docs/1.4/guides/reporting/report-description/metrics/ evar값을 찾아서 넣어볼건데요. 저 같은 경우는 evar5 에다가 CID를 수집을 하고있어요. 그래서 그 값을 추출해보려고 합니다. 최종적으로는 CID별 방문 수에 대해서 알아보려고 해요 지난 글에서는 dateRange였지만 위 사진처럼 내가 추출하고자 하는 변수 명을 넣어줍니다. 저는 evar5를 추출하고싶기 때문에 evar5를 써줬어요. 예시를 dimension에 대해 작성하였지만, metric도 방법이 같아요. 가이드에서 사용하고자 하는 값을 찾고 그 명칭으로 넣으시면 돼요 이상으로 Adobe Analytics API를 사용해서 원하는 디멘션, 메트릭 값 추출하는 방법 마치겠습니다.
전체댓글0