Apache Flink ile Streaming Veri Transferi

Cansu Candan
4 min readNov 8, 2021

Bugün stream verilerinin işlenmesinde sıklıkla kullanılan Apache Flink’ten bahsedeceğim. Günümüzde mobil cihazlardan, girdiğimiz web sitelerinden, sensörlerden, kullanıcının etkileşime girdiği çoğu teknolojiden veri akışı sağlanmaktadır. “Yeni altın” veya “yeni petrol” hangi anlamda ifade edilirse edilsin veri artık günümüzde değerli bir hazine olarak yerini almış durumda.

Big Data dünyasında kendimi geliştirirken Apache Flink ile karşılaştım. Nedir bu Flink derken basit örneklerle Flink’e bir giriş yaptım. Apache Flink’ten kısaca bahsettikten sonra Kafka’dan veri okuyup HDFS’e veri yazma işlemini değineceğim. Lafı uzatmadan Flink nedir bir göz atalım.

Apache Flink Nedir?

Apache Flink akan verileri işlemeyi sağlayan yani streaming yapan çekirdeği Java ve Scala ile yazılmış dağıtık bir veri akış platformudur. Flink cluster ortamlarında, bellek içi hızda ve herhangi ölçeklendirmede çalışabilir.

Flink dağıtık bir sistemdir ve bağımsız bir cluster olarak çalışabileceği gibi Hadoop YARN, Apache Mesos ve Kubernetes gibi yaygın cluster kaynak yöneticileriyle de çalışabilir. Flink’in özel operatörleri sayesinde her türlü stream verileri işleyebilir. Flink sınırlı, sınırsız, gerçek zamanlı veya kayıtlı verileri işleyebilme kapasitesine sahiptir.

Veri işlemede Flink ile rekabet içinde olan Spark da bulunmaktadır. Spark da streaming işlemlerini kolayca gerçekleştirebilir ancak tam bir stream gerçekleştirdiği söylenemez. Spark, Flink’in aksine verileri batch olarak işler. Ancak Spark’ın Scala ve Python dilleri ile kolayca entegre edilmesi Spark’ın geliştiriciler tarafından biraz daha sevilmesini sağlamıştır.

Apache Flink vs Apache Spark Streaming

  • Her ikisi de Apache Foundation organizasyonu tarafından geliştirilmiş açık kaynaklı araçlardır.
  • Her biri tek başına kullanılabilir ancak genellikle Hadoop YARN, Hadoop HDFS, Apache Kafka gibi büyük veri ortamlarıyla entegre edilirler.
  • Spark hem batch hem de stream olarak veriyi işler, ancak Flink salt bir streaming gerçekleştirir. Yani Flink tam streaming yaparken, gelen verileri anında işleyebilirken, Spark kullanıcının ihtiyacına bağlı olarak saniyelik, dakikalık, saatlik gibi istenilen sürede veriyi alır ve işler.
  • Bir kaynağa göre Spark’ın lanse edilen hız özelliği programları bellekte Hadoop MapReduce’dan 100 kata kadar veya diskte 10 kata kadar daha hızlı çalıştırılabildiği ile öne çıkmaktadır. Flink batch processing işleminde benzer performans sergilerken, streaming işleminde daha düşük gecikme sergiler.
  • Flink ve Spark, daha kolay, daha hızlı ve daha akıllı veri işleme özellikleri sunmak için günden güne geliştirilmeye devam ettiğini görüyoruz.
  • Sonuç olarak en iyi framework sorusuna yanıt ise “ihtiyaçlarınıza hangisi daha uygun?” sorusuna bağlıdır.

Apache Flink ile HDFS’e Veri Transferi

Bu örnekte ilk olarak Kafka topik ortamına veri ekleyip ardından HDFS ortamına transferini gerçekleştireceğiz.

İlk olarak gerekli kütüphaneleri import etmemiz gerekiyor.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.core.fs.Path;

App adında bir class oluşturduk. Ben IntellijIdea IDE’sini kullanarak Java ortamında programı gerçekleştirdim. getExecutionEnvironment() ile programı local ortamda yürütecek bir ortam hazırlar. checkpoint ile ise Kafka’da bulunan son kaldığımız yerden itibaren veriyi almamıza sağlar. Kafka’da veri okumak amacıyla bir consumer oluşturulur. Producer kullanarak oluşturduğumuz topiğe veri atacağız. Son olarak ise belirttiğimiz path’e StreamingFileSink ile veri transfer etme işlemini gerçekleştiriyoruz.

public class App {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// checkpoint
env.enableCheckpointing(10_000);
env.setStateBackend((StateBackend) new FsStateBackend("file:///tmp/flink/checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
//props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ExecutionConfig executionConfig = env.getConfig();
executionConfig.disableForceKryo();
executionConfig.enableForceAvro();
// HDFS E YAZMA KISMI
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my_topic_name", new
SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);stream.map(new MapFunction<String, String>() {
private static final long
serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return
"Stream Value: " + value;
}
}).print();
RollingPolicy<String, String> rollingPolicy =
DefaultRollingPolicy.create().withRolloverInterval(15_000) .build();
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("hdfs://localhost:9000/projects"),
new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build();
stream.addSink(sink);env.execute();// HDFS'E YAZMA KISMI BURADA BİTTİ

İlgili bağımlılıklar aşağıdaki gibidir:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
</dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>1.11.4</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-parquet -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.5.0</version>
</dependency>

Kafka’da topic oluşturma:

./kafka-topics.sh --create --topic my_topic_name --bootstrap-server localhost:9092

Kafka topiğe veri atma işlemi aşağıdaki gibidir. Burada veri üretme işlemi olacağı için producer kullanıyoruz.

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_name

Sonuç:

Apache Flink salt bir stream işlemlerini gerçekleştirebilir. Yani real time verileri işleyebilir. Aynı zamanda verileri başka bir ortama taşıyabilmeyi de sağlar. Hız konusunda oldukça ön plana çıkar. Dağıtık çalışabilmesi sebebiyle performansta ciddi iyileşmeler gözlemlenir. Çoğu e-ticaret firmaları müşterilerine hızlı çıktılar sunabilmek için Apache Flink’i kullanmaktadır.

Bir problem olması durumunda benimle bağlantı kurmaktan veya daha iyi bir çözümünüz var ise benimle paylaşmaktan çekinmeyin. 😉

Kaynaklar:

https://tr.theastrologypage.com/importance-apache-flink-processing-streaming-data

--

--