Get In Touch
Kadıköy, İstanbul
mail@oguzerdogan.com
Ph: +90 554 524 0164
Back

Airflow 3.0 Geldi, yenilikler neler?

Bugün heyecanlıyız çünkü kapımızda büyük bir haber var: Airflow 3.0 resmen aramızda! 🥳

Sektörde defacto standart haline gelen orchestration aracı Airflow köklü bir UI değişikliği yaparak aylar öncesinden heyecan uyandırmıştı. Gelin birlikte son güncellemeleri inceleyelim.

Sekmeleri İnceleyelim

Gördüğünüz gibi, bu eski arayüz gitmiş ve yerine modern bir şey gelmiş. Tamamen farklı! Öyle ki, “Acaba yanlış programı mı açtım?” diye düşünebilirsiniz. Ama korkmayın, doğru yerdesiniz. Bu güzellik, sıfırdan React ile yazılmış. Yani artık daha hızlı, daha akıcı.

Home

Açar açmaz karşınıza çıkan ana sayfa bile artık daha anlamlı. Tepede, Airflow bileşenlerinizin (meta database, scheduler, trigger falan filan) sağlık durumunu görüyorsunuz. “Acaba scheduler yine uyuya mı kaldı?” derdine son! (1)

Hemen altında başarısız, çalışan, aktif DAG’lere hızlı linkler var. (2)

Ama asıl olay, bir bakışta DAG çalıştırmalarınızı ve task örneklerinizi izleyebileceğiniz o müthiş metrikler. Hangi task ne durumda, kim koşuyor, kim sürünüyor, hepsi gözünüzün önünde. (3)

Sağdaki “Asset Events” kısmına ise sonra geleceğiz. (4)

DAG’s

Artık DAG’lerinizi ister kutucuk (tiles) ister liste olarak görün. Seçim sizin! Ama o kutucuklar çok tatlı olmuş. Her bir kutucuktaki çubuklar, DAG çalıştırmalarının süresini gösteriyor. Çubuk ne kadar yüksekse, o çalıştırma o kadar uzun sürmüş demek. Durumunu da anında görüyorsunuz. Şahane!

Sıralama seçenekleri (display name, next DAG run, latest run state…) gırla. Arama çubuğu zaten standart. DAG çalıştırmaları sekmesi, task örnekleri sekmesi… hepsi yerli yerinde.

DAG Detayları

Bir DAG’e tıkladığınızda açılan sayfa da elden geçirilmiş.

  • Sol Taraf (Grid View): Kaç çalıştırma görüntülemek istediğinizi seçin (varsayılan 10, ama abartabilirsiniz). Çalıştırmaların ve task’ların durumları net bir şekilde görünüyor. (1)
  • Sağ Taraf (Özet): Son çalıştırma, durumu, sahibi, son DAG versiyonu (evet yanlış duymadınız buna da geleceğiz). “Reparse DAG” butonu da burada! Yani DAG kodunu değiştirdiğinizde Airflow’un keyfini beklemek yerine “Hadi koçum!” diyebiliyorsunuz.(2)
  • Fiyasko Raporu: Son 24 saatte (veya seçtiğiniz aralıkta) kaç task, kaç DAG çalıştırması patlamış, görebilirsiniz.(3)
  • Süre Metrikleri: Son 3 çalıştırmanın ne kadar sürdüğünü, ne kadar süre kuyrukta beklediğini görebiliyoruz.(3)
  • Recent Failed Task Logs! Yani, patlayan bir task’ın loglarına ulaşmak için artık 50 tane tıklama yapmanıza gerek yok! Doğrudan önünüze geliyor. İsterseniz tüm logları açın, isterseniz tek tuşla task’ı yeniden başlatın. Bu özellik debug sırasında hayat kurtarır, benden söylemesi!(5)

Runs Sekmesi

Tasks Sekmesi

Genel DAG görünümündeki Task sekmesi de yenilenmiş. Her task için kullanılan operatör, tetikleme kuralı, son durum gibi bilgiler var. O çubuk grafikler yine burada, task örneklerinin sürelerini gösteriyor.

Bir DAG çalıştırmasına not ekleyebiliyorsunuz (“Abi burası şu yüzden patladı, haberin olsun” demek için birebir). Aynı şekilde tek bir task’a da not düşebilirsiniz.

Task logları artık ÇOK daha okunaklı. Kaynağa veya log seviyesine göre filtreleyebiliyorsunuz. Gözlerimiz bayram etti resmen!

Graph View: Spagetti DAG’lere Görsel Çözüm!

Task’lar arasındaki bağımlılıkları görmek için Graph View hala emrinizde. DAG versiyonu, çalıştırma, bağımlılıklar gibi seçeneklerle oynamak mümkün.

DAG Tetikleme: Gelişmiş Ayarlarla Daha Fazla Kontrol!

“Trigger” butonu artık daha akıllı. Mantıksal tarih (logical_date), çalıştırma ID’si (run ID – genelde kurcalamayın ama isterseniz o da var), parametreler, notlar… ne ararsanız var.

Pools Görünümü: Kim Nerede Koşuyor?

Pools görünümü de biraz değişmiş. Arama, sıralama var. En güzeli, hangi pool‘da hangi task’lar çalışıyor, durumları ne, görebiliyorsunuz.

Arayüzdeki Ufak Tefek Güzellikler

  • Aydınlık mı, Karanlık mı? Seçim sizin! (Ben karanlıkçıyım!)
  • Bir DAG’e tıklayınca varsayılan olarak hangi görünümün (Grid, Graph vs.) açılacağını ayarlayabiliyorsunuz.
  • Docs kısmından REST API referansına ulaşabilirsiniz.

API Cephesinde Neler Yeni?

Artık Airflow FastAPI kullanıyor ve yeni API uç noktaları /api/v2 altında sizleri bekliyor. Şiddetle tavsiye ederim, bir göz atın! Artık API ile yapabilecekleriniz çok daha fazla:

  • Varlıkları (Assets) yönetin.
  • API üzerinden backfill yapın! (Evet, doğru duydunuz!)
  • XCOMs pushlayın.
  • Kullanıcı izinlerine erişin… ve daha neler neler!

Dokümantasyon

Eğer DAG’leriniz için DOC.MD dosyalarıyla dokümantasyon yazıyorsanız, artık arayüzde bunun için özel bir “Docs” butonu var. Tıklayınca mis gibi dokümantasyonunuz karşınızda. “Bu DAG ne işe yarıyordu yahu?” devri bitti!

Arayüz kısmı bu kadar. Sırada daha da heyecan verici şeyler var.

Assets ile Data-Aware ve Event-Driven Akışlar

Hepimiz biliyoruz ki, sadece Cron tabanlı zamanlamalar bazen yetersiz kalıyor. Bir sonraki görevin ne zaman çalışması gerektiğini tahmin etmek yerine, ihtiyaç duyduğu verinin hazır olduğu anı bilmek istemez miydik? İşte Airflow 2.4 ile hayatımıza giren Datasets tam da data-aware olma yönünde atılmış ilk adımdı; DAG’ler arası veri bağımlılıklarını kurmanın basit bir yolunu sunuyordu.

Airflow 3.0 ise bu data-awareness konseptini Assets ile bir sonraki seviyeye taşıyor. Datasets‘in üzerine inşa edilen Assets, temelinde mantıksal olarak ilişkili verilerin bir koleksiyonunu temsil ediyor. Yeni @asset dekoratörünü kullanarak, artık veri hatlarınızı doğrudan bu varlıkları tanımlayan fonksiyonlar etrafında şekillendirebiliyorsunuz.

Bir Asset‘in bir adı, verinin kaynağını veya hedefini belirten bir URI’si ve ek bilgileri olabilir. Bir fonksiyon, birden fazla Asset tanımlayabilir veya güncelleyebilir. Buradaki temel fayda ne mi? Artık DAG’lerimiz verinin hareketi etrafında şekilleniyor. Bu da onları hem daha verimli, hem de yazması ve anlaşılması daha kolay, data-aware DAG’ler haline getiriyor.

İşte basit bir @asset tanımı örneği:

from airflow.sdk import asset

@asset(schedule="@daily")
def poke_catcha():
    """
    Extracts a random set of quotes.
    """
    import requests

    r = requests.get(
        "<https://pokeapi.co/api/v2/pokemon/>"
    )
    pokemons = r.json()

    return pokemons

Olaylara Anında Tepki: Asset Watchers Geldi!

Assets sınıfının üzerine inşa edilen bir diğer güçlü konsept ise Asset Watchers.

Bu izleyiciler, bir mesaj kuyruğunu sürekli (ve asenkron olarak) dinleyerek yeni mesaj gelip gelmediğini kontrol ediyor. İşte bu, Airflow’da bugüne kadarki en güçlü event-driven scheduling yeteneği!

Airflow 3.0, kutudan çıktığı gibi Amazon SQS için tam destek sunuyor. Temeller atıldığına göre, gelecekteki sürümlerde diğer popüler mesaj kuyruklarının (Kafka, RabbitMQ vb.) desteğinin de hızla eklenmesi muhtemel. Artık dışarıdan bir olay (örneğin bir dosyanın yüklenmesi, bir veritabanı kaydının değişmesi) olduğunda Airflow DAG’inizi anında tetikleyebilirsiniz!

Makine Öğrenmesi Çıkarım (Inference Execution) Senaryolarına Destek

Yukarıda bahsettiğimiz event-driven scheduling, özellikle eğitilmiş bir Makine Öğrenmesi (ML) veya Yapay Zeka (AI) modelini kullanarak yeni veriler üzerinde tahminler yapma süreci olan Inference Execution için büyük avantaj sağlıyor. Artık çıkarım görevlerini, önceden tanımlanmış sabit zamanlamalara bağlı kalmadan, talep üzerine (örneğin yeni veri geldiğinde) çalıştırabilirsiniz. Bu event-driven scheduling yaklaşımı, kaynakların daha verimli kullanılmasını sağlar.

Ek olarak, Airflow 3.0’ın bir DAG çalıştırmasının mantıksal tarihi (logical_date) üzerindeki benzersizlik kısıtlamasını kaldırması, eş zamanlı birden fazla DAG çalıştırmasına izin veriyor. Bu event-driven yapı ve eş zamanlı çalıştırma yeteneği, özellikle birden fazla çıkarım isteğinin aynı anda işlenmesi gereken senaryolarda oldukça kullanışlıdır.

Gördüğünüz gibi Assets ve ilgili mekanizmalar, Airflow’u sadece görev zamanlayıcı olmaktan çıkarıp, daha dinamik, data-aware ve event-driven bir iş akışı platformuna dönüştürüyor.

Asset’leri Yönetmek:

Arayüzde yeni bir Assets görünümü var. Burada varlıklarınızı listeleyebilir, arayabilir, hangi gruba ait olduğunu, hangi DAG’in tükettiğini, hangi task’ın ürettiğini görebilirsiniz.

Bir Asset‘i tetikleyebilirsiniz. Bu iki anlama gelebilir:

  1. Materialize: Varlığı üreten yukarı akış DAG’ini/task’ını gerçekten çalıştırır.
  2. Manual Event: Sadece bir “bu varlık oluştu” olayı yaratır. Arkadaki mantığı çalıştırmaz. Aşağı akış bağımlılıklarını test etmek için süper!

Bir Asset‘e tıkladığınızda ise, DAG’lerdeki Graph View gibi, varlıklar arası bağımlılıkları gösteren bir görünüm açılır. Hangi varlık hangisini tetikliyor, hangi DAG’ler bu varlıkları üretiyor, hepsi burada.

Çoklu Asset Üretimi:

Bir fonksiyondan birden fazla Asset döndürmek isterseniz, @asset_multi dekoratörünü kullanıp outlets parametresinde varlıklarınızı tanımlayabilirsiniz.

Eski dost datasets kullanıyorsanız, artık Assets‘e geçme vakti gelmiş demektir. Özelliklerin çoğu burada da var ve datasets artık deprecated (yani ömrünü tamamlamış).

Backfill Çilesi Bitti mi Yoksa? UI ve API ile Geri Doldurma!

Ah backfill… CLI üzerinden yapmaya çalışırken çektiğimiz çileler… Artık BİTTİ! Airflow 3.0 ile DAG’lerinizi hem Arayüzden hem de API üzerinden geri doldurabilirsiniz!

Arayüzde bir DAG’e gidip “Run Backfill” butonuna basmanız yeterli. Karşınıza çıkan ekranda:

  • Hangi tarih aralığını dolduracağınızı seçin.
  • Davranışı belirleyin: Sadece eksikleri mi (missing runs only), eksik ve hatalıları mı (missing and errored runs), yoksa hepsini mi (all runs) çalıştırsın?
  • İsterseniz geriye doğru (run backwards) çalıştırın (en yeniden en eskiye).
  • Aynı anda kaç çalıştırma aktif olsun (number of active runs)?
  • Ve “Run Backfill” deyin!

Çalışan backfill süreçlerini ve geçmişte yaptıklarınızı yeni Backfills sekmesinde görebilirsiniz. Hangi çalıştırmanın backfill ile tetiklendiğini de özel bir ikon ve çalıştırma türü (run type: backfill) ile anlıyorsunuz.

API endpoint’leri de cabası! Sonunda backfill yapmak eziyet olmaktan çıkıyor. Ayrıca, backfill işleri artık scheduler‘ın bir parçası olduğu için ÇOK DAHA GÜVENİLİR çalışıyorlar.

DAG Bundles

Airflow 3.0 öncesini hatırlayın:

  • Task’lar hep en son DAG koduyla çalışırdı (versiyonlama yoktu).
  • Airflow sürekli DAG dosyalarını tarardı (ya güncellemeler gecikirdi ya da kaynak israfı olurdu).
  • Tüm DAG’ler tek bir dags klasöründeydi (organizasyon kabusu!).

Bir DAG Bundle:

  • DAG’ler ve diğer dosyaları içeren bir koleksiyondur.
  • DAG sürümlemesini destekler!
  • Task’ların belirli bir DAG kodu sürümüyle çalışmasını sağlar.
  • Airflow’un dosya tarama davranışını daha iyi kontrol etmenizi sağlar.
  • DAG’lerinizin birden fazla farklı kaynaktan gelmesini sağlar! (Git repo, yerel dosya sistemi, S3… gelecekte daha fazlası?)

Örneğin, bir Git reponuzdaki DAG’leri kullanmak için bir DAG Bundle tanımlayabilirsiniz:

# airflow.yaml veya config dosyanızda (örnektir)
dag_bundles:
  - name: "benim-projem-git"
    type: "git"
    git_connection_id: "my_git_connection"
    tracking_ref: "main" # Hangi branch/tag takip edilsin
    refresh_interval: 5 # Kaç saniyede bir kontrol etsin (opsiyonel)
  - name: "lokal-daglerim"
    type: "local_filesystem"
    path: "/opt/airflow/local_dags"

Yukarıdaki örnekte hem bir Git reposundan hem de yerel bir klasörden DAG’ler alıyoruz. İsterseniz birden fazla Git reposu bile tanımlayabilirsiniz! Bu özellik inanılmaz esneklik sağlıyor.

DAG Versioning: Geçmişi Kaybetmeye Son!

Versiyonlama olmadan yaşadığımız acıları hatırlayalım: Bir task ekleyip çalıştırdınız, sonra o task’ı sildiniz. Geçmiş çalıştırmalara baktığınızda o task sanki hiç var olmamış gibi görünürdü. Ya da DAG’i tamamen değiştirirdiniz, eski çalıştırmaların neye benzediğini anlamak imkansızlaşırdı.

DAG Versioning ile Bu Kabus Bitiyor!

Artık arayüzde bir DAG’e baktığınızda, farklı versiyonları arasında geçiş yapabilirsiniz:

  • V1’de hangi task’lar vardı?
  • V2’de yapı nasıldı?
  • V3’e nasıl geldik?

Her bir çalıştırmanın ve hatta her bir task örneğinin hangi DAG versiyonu ile çalıştığını görebilirsiniz. Kod görünümünde bile versiyonlar arasında geçiş yapıp o anki DAG kodunu inceleyebilirsiniz. Bu özellik MUHTEŞEM!

Önemli Notlar:

  • Bir DAG’in yeni bir versiyonu, ancak siz değişiklik yapıp çalıştırdıktan sonra oluşur. Sadece kod değiştirip çalıştırmazsanız yeni versiyon oluşmaz.
  • Şu an için Grid View’da versiyon bilgisi görünmüyor (sanırım yakında gelir).
  • Bir task’ı eski bir versiyonla çalıştırmak için versiyonlamayı destekleyen bir DAG Bundle kullanmanız gerekir.
  • API’de de versiyonlama ile ilgili yeni endpoint’ler var.

Event Driven Scheduling: Dış Dünya Airflow’u Tetiklesin!

İşte bu! Sonunda veri hatlarımızı harici olaylara göre tetikleyebiliyoruz! Nasıl mı? Asset Watchers ile!

Bir Asset Watcher, bir mesaj kuyruğunu (şimdilik sadece AWS SQS destekleniyor, ama yenileri yolda) sürekli dinler. Yeni bir mesaj geldiğinde, o mesajla ilişkili Asset‘e bağlı olan DAG’leri tetikler.

Senaryo:

  1. Harici bir sistem (veya siz) SQS kuyruğuna bir mesaj gönderir.
  2. Asset Watcher bu mesajı algılar.
  3. Bu Asset‘e göre zamanlanmış olan DAG tetiklenir!

Logical Date karmaşasına son

Eski Airflow’larda bir DAG’i @daily olarak zamanlayıp start_date‘i 1 Ocak verirseniz, ilk çalıştırma 1 Ocak gece yarısı için 2 Ocak gece yarısı başlardı. Yani hep bir sonraki periyodun başında, geçmiş aralık için çalışırdı. Bu kafa karıştırıcıydı.

Artık Değil! Airflow 3.0 ile DAG’ler tam olarak çalışmasını beklediğiniz anda çalışıyor!

  • start_date 1 Ocak, @daily schedule -> İlk çalıştırma 1 Ocak’ta başlar.
  • İkinci çalıştırma 2 Ocak’ta başlar.

Bu şu anlama geliyor: Bir çalıştırma için data_interval_start, data_interval_end ve logical_date artık aynı değere sahip! Yani 1 Ocak çalıştırması için hepsi 1 Ocak’tır. Bu, logical_date kullanımını çok daha sezgisel hale getiriyor.

DİKKAT! Eğer mevcut DAG’leriniz eski data_interval_start ve end mantığına dayanıyorsa, bu bir breaking change olabilir! Kodlarınızı gözden geçirmeniz gerekebilir.

Eski davranışa dönmek isterseniz, Airflow config’inde iki ayarı

  • [scheduler]schedule_after_task_execution
  • [core]catchup_by_default

true yapmanız gerekiyor.

Yeni Mimari

Airflow 2.x mimarisinde tüm bileşenler (scheduler, webserver, workers) doğrudan meta database‘e bağlanırdı. Bunun getirdiği sorunlar vardı:

  • Güvenlik Riskleri: Task’lar (yani potansiyel olarak kullanıcı kodu) veritabanına erişebiliyordu. Pek hoş değil!
  • Bağımsız Yükseltme Yok: Workers‘ı scheduler‘dan ayrı upgrade edemiyordunuz.
  • Ölçeklenebilirlik Sınırları: Veritabanına çok fazla bağlantı yük bindiriyordu (Merhaba PgBouncer!).
  • Uzaktan Çalıştırma Zorluğu: Task’ları farklı ağlarda çalıştırmak zordu.

Airflow 3.0 ile Sahne Değişiyor!

Karşınızda yeni oyuncu: API Server! Bu arkadaşın 3 ana görevi var:

  1. Kullanıcı Arayüzünü (UI) sunmak. (API Server olmadan UI yok!)
  2. Workers ve dolayısıyla task’lar için diğer Airflow bileşenleriyle konuşacak bir arayüz sağlamak.
  3. Genel Airflow REST API‘sini sunmak.

Ayrıca, DAG dosyalarını tarayan DAG file processor artık kendi başına ayrı bir bileşen olarak çalışıyor.

Bu Yeni Mimarinin Faydaları Neler?

  1. Task’lar artık doğrudan veritabanına erişmiyor! Bu hem güvenlik hem de ölçeklenebilirlik için harika. O “acaba worker bir halt karıştırır mı?” endişesiyle içilen acı kahve sayısı azalacak! Huzur geldi!
  2. Bileşenler ayrı ayrı upgrade edilebilir! Sonunda workers‘ı diğerlerinden bağımsız güncelleyebilirsiniz.
  3. Task’ları uzak, güvenilir ağlarda çalıştırmak mümkün! (Bir sonraki konuya bağlanıyoruz…)
  4. Potansiyel olarak farklı dillerde task çalıştırma imkanı! (Şu an sadece deneysel Golang var, ama gelecekte daha fazlası bekleniyor).

Kısacası, Airflow 3.0 ile “Veritabanına dokunan yanar!” dönemi resmen başladı ve bu hepimizin iyiliği için! 😎

EdgeExecutor: Task’ları İstediğiniz Yerden çalıştırın.

Yeni mimarinin getirdiği güzelliklerden biri de EdgeExecutor. Bu executor sayesinde task’larınızı Airflow’un ana kümesinden farklı, uzak ve güvenilir bir ağda/kümede çalıştırabilirsiniz!

Ana kümenizde scheduler, API server vb. çalışırken, task’larınız başka bir lokasyondaki workers üzerinde koşabilir. API server burada merkezi bir köprü görevi görerek ana bileşenlerle uzak workers arasındaki iletişimi sağlıyor. Bu, özellikle hibrit bulut veya farklı güvenlik bölgeleriyle çalışanlar için müthiş bir esneklik!

Çoklu Zamanlama:

Bazen bir DAG’in hem her saat başı hem de her Pazar gece yarısı çalışmasını istersiniz, değil mi? Artık mümkün! Yeni MultipleCronTriggerTimetable sayesinde bir DAG için birden fazla Cron ifadesi tanımlayabilirsiniz.

from airflow.timetables.trigger import MultipleCronTriggerTimetable

# At 1:10 and 2:40 each day.
@dag(schedule=MultipleCronTriggerTimetable(
	"10 1 * * *", "40 2 * * *", 
	timezone="UTC"), ...)
def example_dag():
    pass

İşte bu kadar basit!

Son Sözler

Vay be! Ne kadar çok yenilik var, değil mi? Arayüzden mimariye, DAG yazımından zamanlamaya kadar Airflow resmen çağ atlamış. Ben şahsen ÇOK heyecanlıyım!

Bu devasa güncellemede emeği geçen tüm committer’lara ve katkıda bulunan herkese kocaman bir teşekkür borçluyuz! Bu topluluk gerçekten harika.

Siz ne düşünüyorsunuz? Hangi özellik sizi en çok heyecanlandırdı? Kafanıza takılan bir şey var mı? Yorumlarda belirtin, hep birlikte öğrenelim, tartışalım.

Benim için bu yenilikleri sizinle paylaşmak büyük keyifti. Bir sonraki yazıda görüşmek üzere, kendinize iyi bakın ve kodunuz bug’sız olsun!

Kaynaklar:

https://airflow.apache.org/docs

Oğuz
Oğuz
http://www.oguzerdogan.com
Data Delivery Guy

Leave a Reply

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

We use cookies to give you the best experience. Cookie Policy