BerandaComputers and TechnologyKetergantungan Dag Aliran Udara

Ketergantungan Dag Aliran Udara

Seringkali DAG aliran udara menjadi terlalu besar dan rumit untuk dipahami. Mereka dipisahkan di antara tim yang berbeda di dalam perusahaan untuk implementasi dan dukungan di masa mendatang. Ini mungkin berakhir dengan masalah menggabungkan DAG yang berbeda ke dalam satu pipa.

Saya mengalami masalah ini – saya harus menghubungkan dua DAG yang independen tetapi terhubung secara logis. Dalam posting ini, kita akan membahas opsi apa yang tersedia di Airflow untuk menghubungkan DAG yang bergantung satu sama lain.

Contoh penyiapan

Mari kita mulai dengan membuat contoh. Mari kita bayangkan bahwa kita memiliki proses ETL yang dibagi antara 3 DAG independen – ekstrak, ubah, dan muat.

Agar contoh lebih ilustratif, kita membutuhkan setidaknya satu pelaksana Lokal agar lebih dari satu tugas dapat dijalankan secara paralel. Untuk melakukan ini, saya akan menggunakan file docker-compose ini dengan Airflow, PostgreSQL pra-instal dan LocalExecutor yang telah dikonfigurasi sebelumnya.

  $ docker-compose -f docker-compose.yml up -d   

Ekstrak dag:

  # dags / extract.py    dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.operators mengimpor BashOperator    FREKUENSI='/ 10 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'extract_dag',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  # Mencetak pesan di log dan tidur selama 2 detik  t1=BashOperator (      task_id='task_1',      bash_command='echo "Mengekstrak barang dari s3"; tidur 2;',      dag=dag)  # Mencetak pesan di log dan tidur selama 2 detik  t2=BashOperator (      task_id='task_2',      bash_command='echo "Mengekstrak barang dari jdbc"; tidur 2;',      dag=dag)    t1.set_downstream (t2)   

Transformasi dag:

  # dags / transform.py  dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.operators mengimpor BashOperator    FREKUENSI='/ 10 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'transform_dag',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  # Mencetak pesan di log dan tidur selama 2 detik  t1=BashOperator (      task_id='task_1',      bash_command='echo "Mengubah barang dari s3"; sleep 2;',      dag=dag)  # Mencetak pesan di log dan tidur selama 2 detik  t2=BashOperator (      task_id='task_2',      bash_command='echo "Mengubah barang dari jdbc"; sleep 2;',      dag=dag)    t1.set_downstream (t2)   

Muat dag:

  # dags / load.py  dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.operators mengimpor BashOperator    FREKUENSI='/ 10 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'load_dag',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  t1=BashOperator (      task_id='task_1',      bash_command='echo "Memuat barang ke s3"; tidur 2;',      dag=dag)    t2=BashOperator (      task_id='task_2',      bash_command='echo "Memuat barang ke sarang"; tidur 2;',      dag=dag)    t1.set_downstream (t2)   

Jika DAG tersebut adalah tugas dalam DAG yang sama, kita dapat menambahkan baris tersebut ke file DAG:

  ekstrak.set_downstream (transformasi)  transform.set_downstream (beban)   

Namun, karena mereka tidak berada di DAG yang sama, kami tidak dapat melakukan ini. Tetapi ada cara untuk mencapai hal yang sama di Airflow.

TriggerDagRunOperator

TriggerDagRunOperator adalah operator yang dapat memanggil DAG eksternal. Dengan operator ini dan pengenal DAG eksternal, kami dapat dengan mudah memicunya.

DAG eksternal harus dihidupkan. Jika tidak demikian, maka mereka akan tetap terpicu tetapi tidak akan dijalankan – hanya terjebak dalam status berjalan.

Hal keren tentang operator ini adalah bahwa proses DAG disimpan dalam riwayat DAG yang sama ini serta log. Jadi Anda melihat semua dag berjalan hanya dalam satu halaman alih-alih menggali UI aliran udara yang tampaknya sangat nyaman bagi saya.

Untuk TriggerDagRunOperator kita membutuhkan pengontrol, fungsi yang mengontrol awal dari target DAG berdasarkan beberapa kondisi. Kondisi ini bisa menggunakan konteks eksekusi yang diteruskan ke fungsi dan bisa jadi cukup rumit.

  def conditionally_trigger (konteks, dag_run_obj):      jika konteks ['params'] ['condition']:          return dag_run_obj   

Dalam fungsi pengontrol, jika objek dag_run_obj dikembalikan, dag akan dipicu . Dalam contoh di atas, suatu fungsi hanya mengembalikan objek ini, yaitu selalu dipicu.

Untuk melihat lebih dekat pada objek konteks , kita dapat mencetaknya. Dalam output kita melihat kamus besar dengan banyak informasi tentang proses saat ini:

  {  'conf': ,  'dag': ,  'ds': ..., 'next_ds': ..., 'next_ds_nodash': ..., 'prev_ds': ...,  'prev_ds_nodash': ..., 'ds_nodash': ..., 'ts': ..., 'ts_nodash': ...,  'ts_nodash_with_tz': ..., 'recent_ds': ...,'esterday_ds_nodash ': ...,  'besok_ds': ..., 'besok_ds_nodash': ..., 'END_DATE': ...,  'end_date': ..., 'dag_run': ..., 'run_id': ..., 'execution_date': ...,  'prev_execution_date': ..., 'prev_execution_date_success': ...,  'prev_start_date_success': ..., 'next_execution_date': ...,  'latest_date': ..., 'macros': ...,  'params': {'condition': True},  'tables': ..., 'task': ..., 'task_instance': ..., 'ti': ...,  'task_instance_key_str': ..., 'test_mode': Salah,  'var': {'value': None, 'json': None}, 'inlets': ...,  'outlet': ...  }   

Di bawah ini adalah contoh DAG yang akan berjalan setiap 5 menit dan memicu tiga DAG lagi menggunakan TriggerDagRunOperator .

  dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.operators.dagrun_operator impor TriggerDagRunOperator    FREKUENSI='/ 5 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'etl_with_trigger',      description='DAG dengan pemicu',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  trigger_extract_dag=TriggerDagRunOperator (      task_id='trigger_extract_dag',      trigger_dag_id='extract_dag',      python_callable=conditionally_trigger,      params={'condition': True},      dag=dag,  )  trigger_transform_dag=TriggerDagRunOperator (      task_id='trigger_transform_dag',      trigger_dag_id='transform_dag',      python_callable=conditionally_trigger,      params={'condition': True},      dag=dag,  )  trigger_load_dag=TriggerDagRunOperator (      task_id='trigger_load_dag',      trigger_dag_id='load_dag',      python_callable=conditionally_trigger,      params={'condition': True},      dag=dag,  )    trigger_extract_dag.set_downstream (trigger_transform_dag)  trigger_transform_dag.set_downstream (trigger_load_dag)   

trigger_dag_id di sini hanyalah identifikasi DAG eksternal yang ingin Anda picu.

Kesimpulannya, TriggerDagRunOperator dapat digunakan untuk menjalankan beberapa tugas berat atau mahal yang perlu dijalankan hanya jika kondisi tertentu terpenuhi. Tapi TriggerDagRunOperator bekerja dengan cara api-dan-lupakan. Ini berarti bahwa induk DAG tidak menunggu sampai DAG yang dipicu selesai sebelum memulai tugas berikutnya.

ExternalTaskSensor

Salah satu cara menandakan penyelesaian tugas antara DAG adalah dengan menggunakan sensor. Sensor di Aliran Udara adalah jenis tugas khusus. Ia memeriksa apakah kriteria tertentu dipenuhi sebelum selesai dan membiarkan tugas hilir mereka dijalankan.

Ini adalah cara yang bagus untuk membuat koneksi antara DAG dan sistem eksternal. Sistem eksternal ini dapat menjadi DAG lain saat menggunakan ExternalTaskSensor . ExternalTaskSensor secara teratur mencolek status eksekusi DAG turunan dan menunggu sampai mereka mencapai status yang diinginkan, yang dijelaskan di parameter allow_states . Secara default, status yang diinginkan adalah berhasil .

Ada dua hal yang ExternalTaskSensor mengasumsikan:

  • Anak DAG tidak boleh dipicu secara manual agar sensor berfungsi;
  • DAG turunan harus berjalan pada tanggal eksekusi yang sama dengan DAG induk, yang berarti DAG tersebut harus memiliki interval jadwal yang sama.

Untuk mengkonfigurasi sensor, kita memerlukan pengenal DAG lain, dag_id . Selain itu, kita juga dapat menentukan pengenal external_task_id tugas dalam DAG jika kita ingin menunggu tugas untuk diselesaikan. Jika kita ingin menunggu seluruh DAG kita harus mengaturnya ke Tidak ada

  dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.sensors.external_task_sensor import ExternalTaskSensor    FREKUENSI='/ 5 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'etl_with_sensor',      description='DAG dengan sensor',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  waiting_extract_dag=ExternalTaskSensor (      task_id='waiting_extract_dag', # menunggu seluruh dag dieksekusi      execution_delta=Tidak ada, # Hari yang sama seperti hari ini      external_dag_id='extract_dag', # ini adalah id dari dag      external_task_id=Tidak ada, # menunggu seluruh dag dieksekusi      dag=dag,      batas waktu=60,  )  waiting_transform_dag=ExternalTaskSensor (      task_id='waiting_transform_dag', # menunggu seluruh dag dieksekusi      execution_delta=Tidak ada, # Hari yang sama seperti hari ini      external_dag_id='transform_dag', # ini adalah id dari dag      external_task_id=Tidak ada, # menunggu seluruh dag dieksekusi      dag=dag,      batas waktu=60,  )  waiting_load_dag=ExternalTaskSensor (      task_id='waiting_load_dag', # menunggu seluruh dag dieksekusi      execution_delta=Tidak ada, # Hari yang sama seperti hari ini      external_dag_id='load_dag', # ini adalah id dari dag      external_task_id=Tidak ada, # menunggu seluruh dag dieksekusi      dag=dag,      batas waktu=60,  )    waiting_extract_dag.set_downstream (waiting_transform_dag)  waiting_transform_dag.set_downstream (waiting_load_dag)   

Saya juga telah menyetel parameter batas waktu di sini, karena menurut saya itu perlu. Jika batas waktu tidak disetel dan beberapa dag kami tidak berfungsi, sensor akan macet dalam status berjalan, yang dapat menyebabkan seluruh Aliran Udara macet saat tugas maksimum berjalan.

ExternalTaskSensor mengasumsikan bahwa itu bergantung pada tugas dalam menjalankan DAG dengan tanggal eksekusi yang sama . Jika tidak, Anda perlu menggunakan execution_delta atau execution_date_fn saat Anda membuat instance sensor.

Tapi bisakah kita menggabungkan ExternalTaskSensor dan TriggerDagRunOperator untuk menunggu satu dag selesai sebelum memicu yang berikutnya? Itu yang kita mau kan?

Tentu saja kita bisa. Tetapi kita perlu melakukan beberapa langkah tambahan untuk itu:

  • Buat ketergantungan antara TriggerDagRunOperator dan ExternalTaskSensor
  • Hapus jadwal DAG anak dengan membuat schedule_interval=Tidak ada sehingga dag ini hanya terpicu dari DAG kami.
  • ExternalTaskSensor harus ketahui waktu eksekusi TriggerDagRunOperator spesifik hingga satu detik. Ini perlu diimplementasikan karena Airflow tidak memiliki fungsi ini.

Tetapi semua ini terdengar rumit dan tidak perlu ketika Airflow memiliki SubDagOperator .

SubDagOperator

SubDAG adalah DAG yang dapat dicolokkan yang dapat dimasukkan ke DAG induk. Ini memungkinkan pengembang DAG untuk mengatur definisi DAG kompleks dengan lebih baik dan menggunakan kembali DAG yang ada dengan SubDagOperator .

Ini adalah solusi ideal untuk masalah saya, yang pada dasarnya dapat disajikan sebagai TriggerDagRunOperator + ExternalTaskSensor tanpa menambahkan kompleksitas tambahan dan operator yang tidak perlu.

  dari datetime import datetime, timedelta    dari aliran udara impor DAG  dari airflow.operators.subdag_operator import SubDagOperator  dari ekstrak import dag sebagai extract_dag  dari transform import dag sebagai transform_dag  dari load import dag sebagai load_dag    FREKUENSI='/ 5 '  DEFAULT_ARGS={      'dependent_on_past': Salah,      'start_date': datetime.today () - timedelta (1),      'retries': 1,      'catchup': Salah,  }    dag=DAG (      'etl_with_subdag',      description='DAG dengan subdag',      default_args=DEFAULT_ARGS,      schedule_interval=FREQUENCY,      catchup=False,  )  waiting_extract_dag_id='waiting_extract_dag'  waiting_transform_dag_id='waiting_transform_dag'  waiting_load_dag_id='waiting_load_dag'  extract_dag.dag_id='{}. {}'. format (dag.dag_id, waiting_extract_dag_id)  transform_dag.dag_id='{}. {}'. format (dag.dag_id, waiting_transform_dag_id)  load_dag.dag_id='{}. {}'. format (dag.dag_id, waiting_load_dag_id)    waiting_extract_dag=SubDagOperator (      task_id=waiting_extract_dag_id,      subdag=extract_dag,      dag=dag,  )  waiting_transform_dag=SubDagOperator (      task_id=waiting_transform_dag_id,      subdag=transform_dag,      dag=dag,  )  waiting_load_dag=SubDagOperator (      task_id=waiting_load_dag_id,      subdag=load_dag,      dag=dag,  )    waiting_extract_dag.set_downstream (waiting_transform_dag)  waiting_transform_dag.set_downstream (waiting_load_dag)   

Di sini Anda dapat melihat bahwa alih-alih dag_id SubDAG menggunakan objek DAG nyata yang diimpor dari bagian lain dari kode tersebut.

Zoom into SubDAG

Intinya, semua SubDAG adalah bagian dari DAG induk dalam segala hal – Anda tidak akan melihat prosesnya di riwayat atau log DAG. Di UI Aliran Udara, terdapat tombol “Perbesar ke Sub DAG” untuk melihat internal DAG anak.

Dokumentasi mengatakan bahwa cara terbaik untuk membuat DAG semacam itu adalah dengan menggunakan metode pabrik, tetapi saya telah mengabaikan ini untuk menyederhanakan kode. Perlu diingat bahwa tanpa metode pabrik, Anda dapat melihat SubDAG sebagai DAG normal pada panel admin di Airflow UI, tetapi untuk beberapa alasan, tidak selalu demikian.

Read More

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments