Tworzymy pierwszą transformację w Apache Flink

Programy i przepływy danych

Podstawowymi programami w Apache Flink są strumienie i transformacje. Koncepcyjnie strumień jest (potencjalnie niekończącym się) przepływem rekordów danych,  a transformacja jest operacją, która pobiera jeden lub więcej strumieni wejściowych (źródło) i w rezultacie generuje jeden lub więcej strumieni wyjściowych. W krokach opisanych poniżej stworzymy transformację - batch który pobierze informacje z jednego źródła i zapisze informacje do innego miejsca (nazywanych we Flink 'sinks').

Język i środowisko

Programy Flink można pisać w trzech językach; Scala, Java, Python. Ponieważ Flink napisany jest w Scala, ten język jest najbardziej naturalny. Kodowanie w Java jest także wygodne. Pisanie programów Flik w Python nie jest zbyt wygodne (wymagany interfejs miedzy programem Python a Flink).

Nasz pierwszy program napiszemy w Java. Środowiskiem w którym go stworzymy będzie Eclipse IDE. Program można pobrać ze strony Eclipse - https://www.eclipse.org/ide/.

Opis instalacji i konfiguracji Flink znajdziesz we artykule "Uruchomienie Apache Flink w Windows"

Tworzymy projekt w Eclipse IDE

Uruchom Eclipse i wybierz New => Other => Java Project

Po wybraniu "Java Project" wciśnij "Next". W następnym oknie, "New Java Project" nadaj nazwę projektowi, np. 'read-and-write' i odhacz "Create module-info.java file'. Kliknij "Next". Program Flink będzie wymagać "Execution environment JRE" w wersji 11 - upewnij się wcześniej że posiadasz taką wersję Java na komputerze.

Wybierz zakładkę "Libraries", wybierz "Classpath" [wstawione JARy muszą być pod 'Classpath'] myszką a następnie kliknij na "Add External JARs..."

Upewnij się raz jeszcze że JARy wkleiły się pod 'Classpath'. W oknie które się otworzy, "JAR selection", wybierz pliki JAR z katalogu Flink [lib] - to może być np. "C:\Flink\flink-1.15.1\lib". Kliknij "Finish".

Tworzymy klasę i kod

Następnie utwórz klasę; ustaw kursor na katalogu "src" projektu i prawym przyciskiem myszy wybierz "New => Class"

Nadajemy nazwę naszej klasie, np. "ReadAndWrite" i klikamy "Finish":

Następnym krokiem jest utworzenie kodu klasy. Co robi ten program? Jest banalny; odczytuje dane z pliku i ładuje je do tabeli Flink (nie mylić z tabelą bazy danych). Następnie dane z tej tabeli zapisuje do pliku. Dane nie są w żaden sposób modyfikowane. Celem jest pokazanie jak odczytać dane i jak je zapisać.

Oto jego zawartość [wklej ten kod do klasy "ReadAndWrite" - zastąp ten który jest tam domyślnie]:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadAndWrite {
    
public static void main(String[] args) throws Exception {
        
        //Definiuje wszystkie parametry, które inicjują środowisko tabeli. 
        //Te parametry są używane tylko podczas tworzenia inicjalizacji TableEnvironment i nie można ich później zmienić.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        
        //Tworzymy tablele i ladujemy do niej dane z pliku
        tEnv.executeSql("CREATE TABLE csv_in_table (csv_column_name1 STRING, csv_column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/Temp/flink_source.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");
        
        tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM csv_in_table")
        .execute()
        .print();
        
        //Zapisujemy dane do pliku
        tEnv.executeSql("CREATE TABLE sink_table ("
                + "  sink_column_nameA STRING, "
                + "  sink_column_nameB DOUBLE "
                + "  ) WITH ( \n"
                + "  'connector'='filesystem', "
                + "  'path'='file:///C:/Temp/flink_output', "
                + "  'format'='csv', "
                + "  'sink.partition-commit.delay'='1 s', "
                + "  'sink.partition-commit.policy.kind'='success-file'"
                + "   )");
        
        tEnv.executeSql("INSERT INTO sink_table SELECT csv_column_name1, csv_column_name2 from csv_in_table");
        
        //Testowy select
        tEnv.sqlQuery("SELECT COUNT(*) AS sink_table_result FROM sink_table")
        .execute()
        .print();
        
     }
}

Plik wejściowy, "flink_source.txt" może mieć przykładową zawartość:

aa; 23 
bb; 657.9
cc; 55.1

Test kodu

Zanim uruchomisz ten kod na platformie Flink, możesz testowo wykonać go w środowisku Eclipse. Zacznij od tego gdyż Eclipse zdecydowanie lepiej debuguje kod i otrzymasz jaśniejsze objaśnienie ewentualnych błędów.

Zapisz kod klikając na dyskietkę na pasku programu i uruchom kod: Run => Run

Jeśli wszystko przebiegnie pomyślnie, w konsoli zobaczysz log [dół ekranu Eclipse IDE]:

+----------------------+
|        Table1_result |
+----------------------+
|                    3 |
+----------------------+
+----------------------+
|      fs_table_result |
+----------------------+
|                    3 |
+----------------------+
1 row in set

Ewentualne błędy "WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleane" mogą się pojawić jeśli używasz wersji 1.15.1 i niższej [nie mają wpływu na wykonanie programu]. W wyższych wersjach nie powinny się już pojawić.

Eksport do JAR

Następnym krokiem jest zbudowanie JAR który wykonany we Flink. Kliknij prawym przyciskiem myszy na "ReadAndWrite.java" w lewym oknie Eclipse [pod 'default package']. Wybierz "Export" z menu. Upewnij się że w oknie "Export" wybrana jest pozycja "JAR file" pod "Java". Kliknij "Next". W następnym oknie zaznacz pozycje ".classpath" i ".project" oraz wybierz gdzie ma byc zapisany plik. Kliknij "Next". W następnym oknie pozostaw domyślny wybór i kliknij "Next". W następnym oknie, "JAR Manifest specification" kliknij na "Browse" (Main class) i zatwierdz wybor klasy ["ReadAndWrite"] poprzez kliknięcie na "OK". Kliknij na "Finish". Twój program jest gotowy!

Uruchomienie programu we Flink

Programem kóry wykona nasz JAR jest "flink" katalogu "bin". Wykonaj polecenie:

flink run c:/temp/ReadAndWrite.jar

Rezultat będzie podobny do tego który zobaczyłeś w Eclipse. Sprawdź wykonanie programu w GUI Flink  [http://localhost:8081/]

Nasz program zapisze plik do katalogu "C:/Temp/flink_output". Może to być więcej plików niż tylko jeden. Zależy to od ilości zadań których użyje Flink. Możesz wymusić wykonanie wszystkiego w jednym task ustawiając parametr 'sink.parallelism' = '1', np:

        tEnv.executeSql("CREATE TABLE sink_table ("
                + "  sink_column_nameA STRING, "
                + "  sink_column_nameB DOUBLE "
                + "  ) WITH ( \n"
                + "  'connector'='filesystem', "
                + "  'path'='file:///C:/Temp/flink_output', "
                + "  'format'='csv', "
                + "  'sink.partition-commit.delay'='1 s', "
                + "  'sink.partition-commit.policy.kind'='success-file', "
              + " 'sink.parallelism' = '1'"
                + "   )");

Gratulacje - napisałeś swój pierwszy program dla Flink!