ทำไมต้องใช้ Apache Airflow Open Source Platform ที่น่าสนใจปี 2023 !

Apache Airflow คืออะไร?
เป็น Open Source Platform ตัวหนึ่งที่ออกแบบมาเพื่อการทำ schedule, monitor workflow และ data pipelines โดย Airflow มีการเขียน Workflow เป็น Directed Acyclic Graph(DAG) กราฟที่มีหัวลูกศรทิศทางเดียว ซึ่ง DAG ประกอบไปด้วยหลายๆ Task ที่เชื่อมต่อกันและในแต่ละ Task นั้นก็มีความสามารถที่แตกต่างกัน
 
ความสามารถของ Apache Airflow
 1. Task scheduling: airflow สามารถตั้งเวลาในการทำงาน และสามารถ trigger จากภายนอกระบบได้
 2. Dependency management: สามารถตั้งค่าลำดับการทำงานของ tasks ได้
 3. Tasks execution: airflow รอบรับการทำงานหลาย platforms เช่น local machines, cloud environments (เช่น AWS, GCP), and containerized environments (เช่น Docker)
 4. Monitoring and logging: airflow มีหน้าจอสำหรับการ monitor สถานะการทำงาน ของ workflows และสามารถตรวจสอบ logs การทำงานได้
 5. Extensibility: เราสามารถสร้าง operators, sensors, and hooks สำหรับการใช้งานร่วมกับระบบภายนอก airflow ได้
 
หลักการของ Apache Airflow
 1. Scalable: airflow มีสถาปัตยกรรมแบบโมดูลาร์ ที่สามารถปรับขนาด workers ได้ตามที่ต้องการ
 2. Dynamic: เราสามารถสร้าง data pipelines ได้ตามต้องการด้วยภาษา python
 3. Extensible: เราสามารถสร้าง operators ใหม่ๆจาก operators ของ airflow เพื่อให้ตรงกับความต้องการของงานแต่ละประเภทได้
 4. Elegant: airflow มีการออกแบบให้ระบบมีความเรียบง่ายและยืดหยุ่นในการใช้งาน
 
ทำไมการทำ Airflow ถึงเป็นที่นิยม
 ⁃ มีฟังก์ชันการทำงานที่ครบครัน
 ⁃ สามารถที่จะจัดการ workflows ที่ซับซ้อนได้
 ⁃ ระบบมี UI ที่สามารถใช้งานง่าย
 ⁃ มีความยืดหยุ่นในการทำงานเพราะสามารถเขียนโค้ดในการสร้าง tasks ได้เอง
 ⁃ สามารถขยายขนาดของระบบ และ สร้าง operators จาก core operators ของ airflow ได้
 ⁃ สามารถ integrate กับ 3rd party ได้
 ⁃ เป็น open source ที่มี comunity ขนาดใหญ่
Example Scenario:
ลองนึกภาพสถานการณ์ที่เราจำเป็นต้องสร้าง workflow การประมวลผลข้อมูลรายวัน workflow เกี่ยวข้องกับสามงาน: task A task B และ task C
task A ดึงข้อมูลจากแหล่งที่มา task B ดำเนินการแปลงข้อมูล และ task C โหลดข้อมูลที่แปลงแล้วไปยังแหล่งเก็บข้อมูลปลายทาง
 
Setting up the DAG:
อย่างแรกเราต้องสร้าง Directed Acyclic Graph (DAG) ใน airflow
สร้าง python file ชื่อ ‘my_dag.py’ และ import modules ที่จำเป็น
 
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
 
ถัดมาสร้าง DAG พร้อมกับ default arguments:
 
default_args = {
  ‘owner’: ‘airflow’,
  ‘start_date’: datetime(2023, 1, 1),
}
 
dag = DAG(‘daily_data_workflow’, default_args=default_args, schedule_interval=’@daily’)
 
 
Defining Tasks:
ตอนนี้เราจะสร้าง tasks ขึ้นมา 3 tasks ด้วย python functions และ สร้าง operators ที่เกี่ยวข้องของแต่ละ task:
 
# Task A
def task_a_function():
  print(“Executing Task A – Data Retrieval”)
 
task_a = PythonOperator(
  task_id=’task_a’,
  python_callable=task_a_function,
  dag=dag
)
 
# Task B
def task_b_function():
  print(“Executing Task B – Data Transformation”)
 
task_b = PythonOperator(
  task_id=’task_b’,
  python_callable=task_b_function,
  dag=dag
)
 
# Task C
def task_c_function():
  print(“Executing Task C – Data Loading”)
 
task_c = PythonOperator(
  task_id=’task_c’,
  python_callable=task_c_function,
  dag=dag
)
 
 
Task Dependencies:
เพื่อบอกลำดับการทำงานระหว่าง tasks โดยใช้ >> operator:
 
task_a >> task_b >> task_c
 
 
ในตัวอย่างนี้ task A เป็นข้อกำหนดเบื้องต้นสำหรับ task B และ task B เป็นข้อกำหนดเบื้องต้นสำหรับ task C ซึ่งหมายความว่า task C จะดำเนินการต่อเมื่อ task A และ task B เสร็จสมบูรณ์แล้วเท่านั้น
 
Running the Workflow:
ในการเรียกใช้ workflow Scheduler ของ Airflow จะดึง DAG ขึ้นมาทำงานตามช่วงเวลาที่ระบุ (@daily ในกรณีของเราจะทำงานทุกวัน)
 
เมื่อ save file DAG แล้วเข้าไปที่ Airflow จะเห็น DAG ที่เราสร้างขึ้นมาบอกสถานะการทำงาน และ schedule ที่ตั้งไว้
 

การติดตั้ง Apache Airflow ด้วย Docker

Step 1: Fetching docker-compose.yaml

สร้าง directory airflow ของเราขึ้นมา แล้ว download docker-compose.yaml ด้วยคำสั่งข้างล่างนี้

ใน docker-compose.yaml จะประกอบไปด้วย services ดังนี้

airflow-schedulerscheduler จะตรวจสอ tasks และ DAG ทั้งหมดจากนั้นจะ trigger task ถัดไปเมื่อ งานก่อนหน้าทำงานสำเร็จ
airflow-webserver – webserver สามารถเข้าใช้งานที่http://localhost:8080.
airflow-workerworker ที่ประมวลผล tasks ที่ถูกป้อนให้โดยscheduler
airflow-triggerer – The triggerer runs an event loop for deferrable tasks.
airflow-init – initialization service
postgres database
redisตัวกลางที่คอยป้อนงานจาก scheduler ให้กับ worker

Step 2: Initializing Environment

ก่อนการรัน airflow ครั้งแรกต้องสร้าง directories ที่จำเป็น และ initialize database

Directories ที่จำเป็น

./dags – you can put your DAG files here.
./logscontains logs from task execution and scheduler.
./config – you can add custom log parser or add airflow_local_settings.py to configure cluster policy.
./plugins – you can put your custom plugins here.

สร้าง directories ที่จำเป็นด้วยคำสั่ง

mkdirp ./dags ./logs ./plugins ./config

Initialize database

docker compose up airflow-init

หลังจาก initialize สำเร็จจะได้ผลลัพธ์หน้าตาแบบนี้

airflow-init_1       | Upgrades done
airflow-init_1       | Admin user airflow created
airflow-init_1       | 2.6.1
start_airflow-init_1 exited with code 0

จะมี account ถูกสร้างขึ้นมา สามารถ login ด้วย  username airflow และpassword airflow

Step 3: Running Airflow

ตอนนี้สามารถ start airflow ได้แล้วด้วยคำสั่ง

docker compose up

แล้วเราสามารถ login เข้าใช้งาน web ui ของ airflow ได้ที่http://localhost:808


บริการ พัฒนาซอฟต์แวร์ แอปพลิเคชัน ตามความต้องการ!

เราเป็นผู้นำในด้านการให้บริการ พัฒนาซอฟต์แวร์ แอปพลิเคชัน และโซลูชันด้านไอที แบบครบวงจร (Full-stack) ตั้งแต่การให้คำปรึกษาไปจนถึงการ Maintenace ระบบ เรามีความตั้งใจที่จะเปลี่ยนให้ นวัตกรรม และไอเดีย ระดับโลกของคุณให้กลายเป็นซอฟต์แวร์ที่มีคุณภาพ โดยที่บริษัทรับพัฒนาซอฟต์แวร์ เขียนโปรแกรม และ แอปพลิเคชัน ตามความต้องการทางธุรกิจคุณได้ ทุกรูปแบบ ทุกประเภท ทุกความต้องการทางธุรกิจ หากคุณมีไอเดียดีๆ ที่ต้องการพัฒนา Software หรือ พัฒนา Application สามารถปรึกษาเราได้ที่นี่!