이번 포스팅에서는 slack sdk와 airflow을 활용하여 slack 봇을 만드는 과정을 설명해보겠습니다. 🤖
목차
- Slack Bot 만들기
- Airflow와 slack sdk 를 활용하여 개발 행사 및 공모전 정보 알람 받기
airflow는 배치 스케줄로 돌아가는 task를 관리할 수 있기 때문에 알람 기능에 적용한다면 편리하게 특정 task를 모니터링 할 수 있다.
ETL 작업을 수행하는 DAG에 slack알림 기능을 적용하여, 매일 새로운 개발행사, 공모전 소식을 받을 수 있는 서비스를 만들어보자!
개발환경
OS : ubuntu 22.04
Python 3.11
Airflow 2.3.0
1. 🤖 Slack Bot 만들기
1-1. Slack App install & bot token 부여 받기
slack으로 알람을 받으려면 slack app을 다운받고 bot을 사용할 수 있는 token을 부여받아야한다.
app name과 workspace 를 지정해준다.
그런 다음 OAuth & Permission 탭으로 이동하여 쭉 내리면 Scope 부분이 있다. 여기서 chat:write, im:write OAuth Scope 를 추가한다.
그런다음 Install to workspace 클릭! 해당 창이 뜨면 허용 버튼을 눌러준다.
이렇게 하면 token 번호가 부여된다.
이 token 번호를 활용하여 Slack SDK 와 연동하여 slack에 알림 또는 메신저를 보낼 수 있게 된다.
1-2. EC2 환경 셋업
이제 ec2 환경에 slack sdk 라이브러리를 설치해준다.
pip install slack_sdk
1-3. Slack App 채널에 초대하기
자신이 만든 slack bot App을 채널에 추가해주면 다음과 같은 메세지를 확인할 수 있다.
이걸로 내게 메세지를 보내줄 Slack bot을 만들고 초대해서 준비하는 단계까지 완료!
이제 Python 코드를 통해 slack에 message 가 가는지 확인해보자
1-4. Python 에서 Slack SDK를 이용한 Alarm 테스트
slack에 메세지를 보내는 파이썬 코드는 다음과 같다.
import slack_sdk
slack_token = {발급된 토큰 번호}
client = slack_sdk.WebClient(token=slack_token)
client.chat_postMessage(channel = '#khuda-4th-de', #자신의 채널명
text = 'Hi KHUDA!') # 보낼 메세지
vi 에서 python 파일을 작성한 후 python test.py 명령어로 해당 스크립트를 실행시킬 수 있다.
#cli 로 python 스크립트 실행
python test.py
실행시키면 bot으로부터 메세지를 받는 것을 확인할 수 있다!
이제 Airflow환경에서 DAG 를 실행시켰을 때 그 결과에 대한 메세지를 slack으로 전달해보자
팀원들과 만든 DAG 를 활용하여 task의 결과를 slack 알람으로 받는 서비스를 구현한다.
2. Airflow와 slack sdk 를 활용한 개발 행사 및 공모전 정보 알람 받기
전체적인 파일 디렉토리 구조는 다음과 같다.
주목해야할 점은 slack sdk와 관련된 부분은 slack_alert.py로 따로 구성하여야한다.
// airflow 디렉토리 내 구조
├── airflow
│ └── data
│ ├── contest_20231130.csv
│ ├── event_20231130.csv
│ ├── velog_20231130.csv
├── airflow.cfg
├── airflow.db
├── crawling
│ ├── crawling_contest.py
│ ├── crawling_event.py
│ ├── crawling_velog.py
│ └── requirements.py
├── dags
│ ├── dag_merge.py
├── logs
├── slack
│ ├── slack_alert.py
└── webserver_config.py
2-1. slack_alert.py 스크립트 구성
slack_alert.py는 하나의 class 로 이루어져 있고 DAG가 성공됐을 때 notify_msg 함수를 실행시켜 크롤링된 csv 파일의 내용을 slack 메세지로 전달할 수 있도록했다.
#slack_alert.py
from slack_sdk import WebClient
class SlackAlert:
def __init__(self, channel, token):
self.channel = channel
self.client = WebClient(token=token)
def notify_msg(self, context):
date = datetime.today().strftime('%Y%m%d')
event_filename = f'/home/ubuntu/airflow/airflow/data/event_{date}.csv'
velog_filename = f'/home/ubuntu/airflow/airflow/data/velog_{date}.csv'
contest_filename = f'/home/ubuntu/airflow/airflow/data/contest_{date}.csv'
# csv 파일 읽기
with open(event_filename, 'r', encoding='utf-8') as eventfile:
event_reader = csv.DictReader(eventfile)
event_content = f"""
Today's Date: {date}
*📣개발행사 정보입니다.📣*
"""
for row in event_reader:
event_content += f"""
'행사명' : {row['title']},
'주최사' : {row['host']},
'날짜' : {row['start_date']},
'포스터 링크' : {row['image']},
'관련 링크' : {row['link']} """
self.client.chat_postMessage(channel=self.channel, text=event_content)
with open (contest_filename, 'r', encoding = 'utf-8') as contestfile:
contest_reader = csv.DictReader(contestfile)
contest_text = f"""
*📣공모전 정보입니다.📣*"""
for row in contest_reader :
contest_content += f"""
'대회명' : {row['제목']},
'카테고리' : {row['카테고리']},
'주최사' : {row['주최']},
'접수 시작일' : {row['접수 시작일']},
'접수 마감일' : {row['접수 마감일']},
'심사 시작일' : {row['심사 시작일']},
'심사 종료일' : {row['심사 종료일']},
'심사 마감일' : {row['심사 마감일']},
'D-day' : {row['D-Day']},
'접수 상태' : {row['상태']},
'관련 포스터' : {row['이미지 링크']},
'관련 링크' : {row['링크']}
"""
self.client.chat_postMessage(channel=self.channel, text= contest_content)
with open (velog_filename, 'r', encoding = 'utf-8') as velogfile:
velog_reader = csv.DictReader(velogfile)
velog_text = f"""
*📣IT 트렌드 정보입니다.📣*"""
for row in velog_reader :
velog_content += f"""
'제목' : {row['title']},
'작성자' : {row['writer']},
'하단 링크에서 자세한 내용을 확인해보세요'\n : {row['link']}
"""
self.client.chat_postMessage(channel=self.channel, text= velog_content)
2-2. dag.py 에 slack 인자 부여하기
이제 dag.py 파일에서 DAG가 성공했을 때 slack 메세지로 바로 보낼 수 있게 코드를 수정해줘야한다.
주목해야할 부분은 upload task부분에서 'on_success_callback' : slack.notify_msg으로 설정하여 upload tasksuccess 시 callback함수로 실행될 수 있게 해주었다.
defalut_args 에 DAG success시 실행되는 callback 함수로 등록해도 된다.
(upload task에 callback 함수로 해 둔 이유는 task 로그가 보기 편해서.. )
#dag_merge.py
#airflow DAG, Operator 라이브러리 import 생략
from slack_sdk import WebClient
from crawling.requirements import *
from crawling.crawling_event import *
from crawling.crawling_velog import *
from crawling.crawling_contest_final import *
from slack_alert import SlackAlert
slack = SlackAlert('#채널명', #발급받은토큰번호)
def upload_to_s3() :
date = datetime.now().strftime("%Y%m%d")
hook = S3Hook(# connection ID 입력) # connection ID 입력
#..생략
default_args = {
'owner': 'owner-name',
'depends_on_past': False,
'email': ['your-email@g.com'],
'email_on_failure': False,
'email_on_retry': False,
':retries': 1,
'retry_delay': timedelta(minutes=30),
#'on_success_callback' : slack.notify_msg <- 이 부분에 success 시 callback함수로 지정해도됨
}
dag_args = dict(
dag_id="crawling-upload",
default_args=default_args,
description='KHUDA de-project DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 11, 29),
tags=['de-project'],
)
with DAG( **dag_args ) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "start"'
)
upload = PythonOperator(
task_id = 'upload',
python_callable = upload_to_s3,
on_success_callback=slack.notify_msg
)
# -------- velog -------- #
velog_get_url_task = PythonOperator(
task_id='velog_get_url',
python_callable= velog_get_url
)
velog_get_info_task= PythonOperator(
task_id='velog_get_info',
python_callable= velog_get_info
)
# -------- event -------- #
event_get_data_task = PythonOperator(
task_id="event_get_data",
python_callable= event_get_data
)
# -------- contest -------- #
contest_task = PythonOperator(
task_id='contest_crawling',
python_callable=contest_crawling
)
complete = BashOperator(
task_id='complete_bash',
bash_command='echo "complete"'
)
start >> event_get_data_task >> upload >> complete
start >> velog_get_url_task >> velog_get_info_task >> upload >> complete
start >> contest_task >> upload >> complete
이제 DAG 를 실행시켜보자!
현재 DAG의 workflow는 각 사이트에서 크롤링 후 S3 Hook을 이용해서 S3에 적재한 뒤, slack으로 크롤링 결과를 보내주는 프로세스로 구성했다.
2-3. DAG 실행
몇 번의 시행착오가 있었지만 ^^! 성공 !
당황하지말고,, 인내심을 가지며,, task 로그들을 통해 디버깅 해주면 언젠가 된다.. ✨
정상적으로 s3에 적재된 것 확인!
봇으로부터 메세지 알림이 오는 것도 확인할 수 있다.🔔
참고자료
https://lsjsj92.tistory.com/634
'데이터 > 데이터 엔지니어링' 카테고리의 다른 글
[Apache Airflow] ETL 프로젝트 #3 (Prometheus + Grafana를 이용한 airflow 모니터링) (2) | 2023.12.03 |
---|---|
[Apache Airflow] ETL 프로젝트 #1 (ETL 파이프라인 구축) (1) | 2023.11.25 |
[Apache Airflow] Airflow로 Branch, ML 예제 수행해보기 (1) | 2023.11.19 |
[Chapter07] OpenSearch CRUD 실습 (1) | 2023.10.07 |
[Chapter 06] 분산시스템의 이해, Zookeeper (1) | 2023.10.01 |