Tworzymy pierwszą transformację w Apache Flink
Programy i przepływy danych
Podstawowymi programami w Apache Flink są strumienie i transformacje. Koncepcyjnie strumień jest (potencjalnie niekończącym się) przepływem rekordów danych, a transformacja jest operacją, która pobiera jeden lub więcej strumieni wejściowych (źródło) i w rezultacie generuje jeden lub więcej strumieni wyjściowych. W krokach opisanych poniżej stworzymy transformację - batch który pobierze informacje z jednego źródła i zapisze informacje do innego miejsca (nazywanych we Flink 'sinks').
Język i środowisko
Programy Flink można pisać w trzech językach; Scala, Java, Python. Ponieważ Flink napisany jest w Scala, ten język jest najbardziej naturalny. Kodowanie w Java jest także wygodne. Pisanie programów Flik w Python nie jest zbyt wygodne (wymagany interfejs miedzy programem Python a Flink).
Nasz pierwszy program napiszemy w Java. Środowiskiem w którym go stworzymy będzie Eclipse IDE. Program można pobrać ze strony Eclipse - https://www.eclipse.org/ide/.
Opis instalacji i konfiguracji Flink znajdziesz we artykule "Uruchomienie Apache Flink w Windows"
Tworzymy projekt w Eclipse IDE
Uruchom Eclipse i wybierz New => Other => Java Project
Po wybraniu "Java Project" wciśnij "Next". W następnym oknie, "New Java Project" nadaj nazwę projektowi, np. 'read-and-write' i odhacz "Create module-info.java file'. Kliknij "Next". Program Flink będzie wymagać "Execution environment JRE" w wersji 11 - upewnij się wcześniej że posiadasz taką wersję Java na komputerze.
Wybierz zakładkę "Libraries", wybierz "Classpath" [wstawione JARy muszą być pod 'Classpath'] myszką a następnie kliknij na "Add External JARs..."
Upewnij się raz jeszcze że JARy wkleiły się pod 'Classpath'. W oknie które się otworzy, "JAR selection", wybierz pliki JAR z katalogu Flink [lib] - to może być np. "C:\Flink\flink-1.15.1\lib". Kliknij "Finish".
Tworzymy klasę i kod
Następnie utwórz klasę; ustaw kursor na katalogu "src" projektu i prawym przyciskiem myszy wybierz "New => Class"
Nadajemy nazwę naszej klasie, np. "ReadAndWrite" i klikamy "Finish":
Następnym krokiem jest utworzenie kodu klasy. Co robi ten program? Jest banalny; odczytuje dane z pliku i ładuje je do tabeli Flink (nie mylić z tabelą bazy danych). Następnie dane z tej tabeli zapisuje do pliku. Dane nie są w żaden sposób modyfikowane. Celem jest pokazanie jak odczytać dane i jak je zapisać.
Oto jego zawartość [wklej ten kod do klasy "ReadAndWrite" - zastąp ten który jest tam domyślnie]:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class ReadAndWrite {
public static void main(String[] args) throws Exception {
//Definiuje wszystkie parametry, które inicjują środowisko tabeli.
//Te parametry są używane tylko podczas tworzenia inicjalizacji TableEnvironment i nie można ich później zmienić.
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
//Tworzymy tablele i ladujemy do niej dane z pliku
tEnv.executeSql("CREATE TABLE csv_in_table (csv_column_name1 STRING, csv_column_name2 DOUBLE) WITH ('connector' = 'filesystem', 'path' = 'file:///C:/Temp/flink_source.txt', 'format' = 'csv', 'csv.field-delimiter' = ';')");
tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM csv_in_table")
.execute()
.print();
//Zapisujemy dane do pliku
tEnv.executeSql("CREATE TABLE sink_table ("
+ " sink_column_nameA STRING, "
+ " sink_column_nameB DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/Temp/flink_output', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file'"
+ " )");
tEnv.executeSql("INSERT INTO sink_table SELECT csv_column_name1, csv_column_name2 from csv_in_table");
//Testowy select
tEnv.sqlQuery("SELECT COUNT(*) AS sink_table_result FROM sink_table")
.execute()
.print();
}
}
Plik wejściowy, "flink_source.txt" może mieć przykładową zawartość:
aa; 23
bb; 657.9
cc; 55.1
Test kodu
Zanim uruchomisz ten kod na platformie Flink, możesz testowo wykonać go w środowisku Eclipse. Zacznij od tego gdyż Eclipse zdecydowanie lepiej debuguje kod i otrzymasz jaśniejsze objaśnienie ewentualnych błędów.
Zapisz kod klikając na dyskietkę na pasku programu i uruchom kod: Run => Run
Jeśli wszystko przebiegnie pomyślnie, w konsoli zobaczysz log [dół ekranu Eclipse IDE]:
+----------------------+
| Table1_result |
+----------------------+
| 3 |
+----------------------+
+----------------------+
| fs_table_result |
+----------------------+
| 3 |
+----------------------+
1 row in set
Ewentualne błędy "WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleane" mogą się pojawić jeśli używasz wersji 1.15.1 i niższej [nie mają wpływu na wykonanie programu]. W wyższych wersjach nie powinny się już pojawić.
Eksport do JAR
Następnym krokiem jest zbudowanie JAR który wykonany we Flink. Kliknij prawym przyciskiem myszy na "ReadAndWrite.java" w lewym oknie Eclipse [pod 'default package']. Wybierz "Export" z menu. Upewnij się że w oknie "Export" wybrana jest pozycja "JAR file" pod "Java". Kliknij "Next". W następnym oknie zaznacz pozycje ".classpath" i ".project" oraz wybierz gdzie ma byc zapisany plik. Kliknij "Next". W następnym oknie pozostaw domyślny wybór i kliknij "Next". W następnym oknie, "JAR Manifest specification" kliknij na "Browse" (Main class) i zatwierdz wybor klasy ["ReadAndWrite"] poprzez kliknięcie na "OK". Kliknij na "Finish". Twój program jest gotowy!
Uruchomienie programu we Flink
Programem kóry wykona nasz JAR jest "flink" katalogu "bin". Wykonaj polecenie:
flink run c:/temp/ReadAndWrite.jar
Rezultat będzie podobny do tego który zobaczyłeś w Eclipse. Sprawdź wykonanie programu w GUI Flink [http://localhost:8081/]
Nasz program zapisze plik do katalogu "C:/Temp/flink_output". Może to być więcej plików niż tylko jeden. Zależy to od ilości zadań których użyje Flink. Możesz wymusić wykonanie wszystkiego w jednym task ustawiając parametr 'sink.parallelism' = '1', np:
tEnv.executeSql("CREATE TABLE sink_table ("
+ " sink_column_nameA STRING, "
+ " sink_column_nameB DOUBLE "
+ " ) WITH ( \n"
+ " 'connector'='filesystem', "
+ " 'path'='file:///C:/Temp/flink_output', "
+ " 'format'='csv', "
+ " 'sink.partition-commit.delay'='1 s', "
+ " 'sink.partition-commit.policy.kind'='success-file', "
+ " 'sink.parallelism' = '1'"
+ " )");
Gratulacje - napisałeś swój pierwszy program dla Flink!