Integracja Git[Gitea] z Apache Airflow: Publikowanie i Zarządzanie DAG-ami
Apache Airflow przechowuje DAGi w katalogu 'dags'. Dokładną scieżkę do tego katalogu znajdziesz w pliku konfiguracyjnym Airflow.
[core]
dags_folder = /opt/airflow/dags
Może się zdarzyć, że po instalacji ten katalog nie zostanie automatycznie utworzony - musisz go stworzyć. W najprostszym scenariuszu, publikacja nowego workflow (Dag) polega na umieszczeniu pliku w tym katalogu. Niestety ale Apache Airflow nie umożliwia publikacji z poziomu webowego GUI. W środowisku produkcyjnym takie zarządznie plikami DAGów byłoby jednak uciążliwe, dlatego lepszym pomysłem jest wykorzystanie systemu Git. Git jest jednym z przykładów oprogramowania z klasy „rozproszonych systemów kontroli wersji” (Distributed Version Control Systems, DVCS).
Jeśli chcesz korzystać z systemu kontroli wersji, nie musisz ograniczać się do GitHub. Gitea, GitHub czy GitLab to serwery i GUI dla Git-a — korzystają z tych samych protokołów (SSH/HTTPS) i wywołują natywne komendy Git w tle, oferując interfejs webowy, zarządzanie użytkownikami, pull requesty i CI/CD, ale same nie zastępują Git-a. Dla naszych testów użyjemy lokalnego servera Git; Gitea.
Przygotowanie repozytorium w Gitea
W poniższych przykładach używać będziemy lokalnego serwera Gita zainstalowanego w środowisku WSL. Wcześniej jednak musimy zainstalować pakiet Git w środowisku WSL:
sudo apt update
sudo apt install git -y
Następnie tworzymy katalog (i podkatalogi) w którym będzie zainstalowana Gitea i nadajemy uprawnienia:
sudo mkdir -p /opt/gitea/{custom,data,log}
sudo chown -R $USER:$USER /opt/gitea
chmod -R 750 /opt/gitea
Teraz pobieramy binarkę ze strony Gitea, nadajemy plikowi prawo wykonywania i przenosimy go do docelowego katalogu:
wget -O gitea https://dl.gitea.io/gitea/1.24.6/gitea-1.24.6-linux-amd64
chmod +x gitea
sudo mv gitea /opt/gitea/
Tworzymy skrypt startujący Gitea; plik opt/gitea/start_gitea.sh:
#!/bin/bash
# -----------------------------
# Start Gitea in WSL
# -----------------------------
# Ścieżki
GITEA_BIN=/opt/gitea/gitea
GITEA_CUSTOM=/opt/gitea/custom
GITEA_WORK_DIR=/opt/gitea/data
LOG_DIR=/opt/gitea/logs
mkdir -p $LOG_DIR
# Funkcja uruchamia proces w tle jeśli nie działa
run_if_not_running() {
local name=$1
local cmd=$2
if pgrep -f "$cmd" > /dev/null; then
echo "$name is already running."
else
echo "Starting $name..."
nohup $cmd > $LOG_DIR/${name}_$(date +%Y%m%d_%H%M%S).log 2>&1 &
fi
}
# Eksport zmiennych środowiskowych
export GITEA_CUSTOM
export GITEA_WORK_DIR
# Start Gitea webserver
run_if_not_running "gitea-web" "$GITEA_BIN web --port 3000"
echo "Gitea startup script finished. Web UI: http://localhost:3000"
Za każdym razem kiedy będzie startować WSL, będzie uruchamiana Gitea - dodajemy więc komendę wykonującą ten plik do pliku bashrc:
nano ~/.bashrc
/opt/gitea/start_gite.sh
Organizacja struktury katalogów po stronie Airflow
W praktyce często w produkcji nie trzyma się wszystkich DAG-ów w jednym repozytorium. Wiele organizacji stosuje wielorepozytoryjne podejście, szczególnie gdy:
-
Istnieje wiele zespołów odpowiedzialnych za różne workflowy,
-
Niektóre DAG-i są powiązane z różnymi projektami lub systemami,
-
Chcesz oddzielić środowisko produkcyjne od testowego / eksperymentalnego.
W takim podejściu każdy podzespół/projekt miałby swoje repozytorium DAG-ów np. airflow-dags-finance, airflow-dags-marketing, airflow-dags-utilities. Po stronie Airflow istniałby zatem trzy katalogi zasilane z trzech różnych źródeł:
dags/
├── finance/ -> git clone git@gitea:finance-dags.git
├── marketing/ -> git clone git@gitea:marketing-dags.git
└── utilities/ -> git clone git@gitea:utilities-dags.git
W naszych przykładach będziemy kożystać z jednego repozytorium Git.
Przykladowy kod DAG
Potrzebujemy jakiegos flow DAG do naszych testów. Poniżej trywialny skrypt który odczytuje aktualną datę, godzinę i zapisuje ją do pliku tekstowego (zapisz_date_do_pliku_dag.py):
from airflow.decorators import dag, task
from datetime import datetime
import os
from airflow.models import Variable
@dag(
dag_id="zapisz_date_do_pliku_dag",
start_date=datetime(2025, 10, 1),
schedule="@daily",
catchup=False,
tags=["utils", "example"],
)
def zapisz_date_dag():
@task()
def zapisz_date_do_pliku():
try:
output_path = Variable.get("output_path", default_var="/mnt/g/ApacheAirflow/out")
os.makedirs(output_path, exist_ok=True)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(os.path.join(output_path, "data_i_godzina.txt"), "w") as f:
f.write(now + "\n")
except Exception as e:
raise Exception(f"Błąd podczas zapisu do pliku: {str(e)}")
zapisz_date_do_pliku()
zapisz_date_dag()
Sprawdzanie poprawnosci kodu DAG
DAG może zawierać błędy. Przed publikacją DAGa musimy upewnić się że składnia jest poprawna. Oczywiście możesz pominąć ten proces jeśli masz pewność że DAG jest poprawnie napisany.
Na maszynie na której piszemy kod
Zazwyczaj dzielimy ten proces na dwa etapy - rozpoczynamy od poprawności składni. Na maszynie na której piszemy kod wykonujemy polecenie:
python -m py_compile zapisz_date_do_pliku_dag.py
A to oznacza „skompiluj plik źródłowy Pythona do bajtkodu (.pyc), nie uruchamiając go.” W efekcie powstanie katalog '__pycache__' w którym znajdzie się skompilowany plik. Gdyby kod zawierał błędy skłądni, otrzymasz odpowiednie ostrzeżenie. Możesz też sprawdzić wiele plików jednocześnie (cały folder) poleceniem:
python -m compileall .
Na serwerze Airflow
By wykonać drugi test, musimy posiadać moduły Airflow. Skopiuj plik do jakiegokolwiek katalogu na serwerze Airflow a następnie uruchom polecenia:
#WSL - aktywuj wirtualne środowisko Python
source /opt/venvs/airflow_venv/bin/activate
#Test importu
python -c "from zapisz_date_do_pliku_dag import dag"
To tylko test importu - nie zapisuje nic do bazy danych Airflow, nie rejestruje DAG-a i nie uruchamia żadnych zadań. Airflow Scheduler robi potem to samo (import), ale dodatkowo:
-
rejestruje obiekty
DAGw bazie metadanych, -
analizuje ich zależności i harmonogramy,
-
pokazuje je w UI.
Jeśli DAG jest poprawny, powyższe polecenie nie zwróci niczego.
Przygotowanie repozytorium na serwerze Gitea
Utwórz użytkownika który będzie właścicielem repozytorium Git (Gitea nie pozwala tworzyć administratorowi repozytóriów w imieniu użytkowników, których właścicielami będą użytkownicy). Zaloguj się jako użytkownik i wybierz opcje "Repositories => New Repository". W zasadzie wszystko co musisz wpisać w polach okna tworzenia nowego repozytorium to jego nazwa i kliknąć "Create Repository":

Czym jest 'branch'
Branch (gałąź) to po prostu nazwa wskaźnika do konkretnego 'commita' (patrz niżej) w historii repozytorium. Inaczej mówiąc — to „ścieżka rozwoju” Twojego kodu. Repozytorium może mieć dowolną liczbę gałęzi, np.:
| Branch | Cel |
|---|---|
main |
główna, stabilna wersja (wcześniej nazywana master) |
develop |
wersja robocza do integracji funkcji |
feature/login |
nowa funkcja logowania |
bugfix/email |
poprawka błędu w wysyłce e-maili |
release/v1.0 |
przygotowanie wersji 1.0 do wydania |
Jeśli chcesz utworzyć nowy lub wykonać inne operacje na branch'ach, skorzystaj z tego menu:

Czym jest 'commit'
Commit – to migawka kodu, czyli zapis stanu projektu w danym momencie. Każdy commit tworzy kolejną wersję kodu. Dzięki organizacji repozytorium w 'commits' masz możliwość np. powrotu do poprzedniej wersji kodu; każda publikacja nowej wersji powoduje automatycznie przesunięcie obecnej do historii. Nawet powrót nie powoduje usunięcia kodu; przywrócenie kodu powoduje nałożenie obecnego nową ('starą') wersją.
Umieszczenie pliku DAGa w repozytorium Git
Publikacja pliku DAG odbywa się porzez wybranie opcji 'Add File => Upload File". W menu upload'u mamy możliwość wybrania utworzenia katalogu w którym znajdzie się docelowo plik (w ramach wybranego branch), opisania naszego commit (nadanie tych etykiet jest dobrą praktyką byśmy później wiedzieli czego dotyczy ta wersja commit'u):

Klikamy na 'Commit Changes' i gotowe - nasz DAG jest w repozytorium Git.
Klonowanie repozytorium w Apache Airflow
Klonowanie repozytorium w Git oznacza skopiowanie całego repozytorium zdalnego na serwer Airflow. Domyślnie jest to skopiowanie wszystkiego, łącznie ze wszystkimi commitami, branchami i historią zmian. Domyślnie nie potrzebujemy historii - wystarczy nam ostatni commit. Zatem w poleceniu ograniczamy takie kopiowanie opcją '--depth 1':
git clone --depth 1 http://user:haslo@localhost:3000/airflow/airflow.git dags
Gdybyśmy jednak chcieli mimo wszystko pobrać wszystko, usuwamy filtr:
git clone http://user:haslo@localhost:3000/airflow/airflow.git .
W efekcie wykonania tego polecenia na ekranie pojawi się informacja podobna do tej:
Cloning into '.'...
remote: Enumerating objects: 36, done.
remote: Counting objects: 100% (36/36), done.
remote: Compressing objects: 100% (28/28), done.
remote: Total 36 (delta 9), reused 0 (delta 0), pack-reused 0
Receiving objects: 100% (36/36), 4.46 KiB | 4.46 MiB/s, done.
Resolving deltas: 100% (9/9), done.
Sprawdzanie czy katalog nie korzysta już z repozytorium Git (zwraca 'true' jeśli tak):
git rev-parse --is-inside-work-tree
Pobieranie świeżych commit'ów (np. po umieszczeniu kolejnej wersji pliku DAG w repozytorium Git):
git pull origin main
I rezultat wykonania polecenia:
remote: Enumerating objects: 5, done.
remote: Counting objects: 100% (5/5), done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 3 (delta 1), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (3/3), 330 bytes | 165.00 KiB/s, done.
From http://localhost:3000/airflow/airflow
* branch main -> FETCH_HEAD
(airflow_venv) user@HPLO-LsXEPiJEWA:/opt/airflow/dags/test$
Updating c96ddf6..778ed19
Fast-forward
zapisz_date_i_godzine_dag.py | 12 ++----------
1 file changed, 2 insertions(+), 10 deletions(-)
DAG w menu Airflow
Umieszczając kod DAG w repozytorium upewnij się że 'start_date' nie jest przeszłą datą - w takim przypadku import nie powiedzie się. Zakładając że import DAG przebiegł pomyślnie, w GUI Airflow, w menu 'Dags' zobaczysz go na liście. Menu Airflow nie umożliwia wyświetlenia DAGów które są w danym podkatalogu 'dags' (używanym do importu Dagów). Informacji z którego podkatalogu pochodzi dany DAG zobaczysz jedynie w szczegółach Daga (Details => File Location). Dlatego dobrą praktyką jest nadawanie tagów; opcja 'tags'. Nasz Dag ma dwa tagi: tags=["utils", "example"]
Możesz wyświetlać listę Dagów w menu Dag korzystając z opcji wyszukiwarki lub filtrując po tagach:

Dodatkowy filtry to wyświetlanie Dagów aktywnych lub nie, ulubionych lub nielubianych.
Modyfikacje kodu, zmiana harmonogramu wykonania
Apache Airflow nie daje żadnych możliwości zmiany kodu DAGa z poziomu webowego GUI. Jeśli chcesz dokonać modyfikacji, musisz zmienić plik w edytorze i raz jeszcze wgrać go do katalogu 'dags'. Airflow powinien automatycznie pobrać nową wersję pliku i zaktualizować DAG.
Tak samo wygląda zmiana harmonogramu; dokonać tego można jedynie zmieniając plik i importując go ponownie.
Aktywowanie i dezaktywowanie DAGów
Domyślnie DAG jest włączony, chyba że ustawisz paramentr w kodzie by był wyłączony podczas importu (is_paused_upon_creation=True). Możesz go w każdej chwili wyłączyć lub włączyć ponownie po wyłączeniu przesuwając przełącznik po prawej stronie na liście DAGów (po lewej stronie kolumny 'dag ID'). Wyłączony DAG jest usuwany z harmonogramu schedulera - nie wykonuje się gdy pozostaje wyłączony.
Log z każdego wykonania DAGu znajdziesz w szczegółach DAG, zakładka 'Runs'.