Kafka Nedir? .Net Core ile Kullanımı

Apache Kafka, yüksek hacimli gerçek zamanlı (real-time) veri akışlarını güvenli ve hızlı bir şekilde işlemek için kullanılan dağıtık bir message queue sistemidir.

İlk olarak Linkedin tarafından geliştirilmiş ve sonrasında Apache Software Foundation çatısı altında open source haline getirilmiştir.

Ne zaman kullanmalıyız?

  • Gerçek zamanlı veri işleme
  • Mikroservisler arası iletişim
  • Log toplama
  • Event sourcing
  • Stream processing

Kafka Mimarisi

via https://www.cloudduggu.com/kafka/architecture/

Kafka, aşağıdaki bileşenlerden oluşmaktadır:

Producer: Data’yı Kafka’ya gönderen bileşendir.

Consumer: Kafka’daki datayı okuyan bileşendir.

Kafka Broker: Kafka’nın kendisidir, mesajları alır, saklar ve dağıtır. Her broker, Kafka cluster’ının bir parçasıdır.

Topic: Mesajların kategorize edildiği yapıdır. (Örnek: order-created, order-status-changed)

Partition: Topic ‘ler bölünerek dağıtılabilir, bu da consumer lar tarafından datanın paralel işlemesini sağlar. Ayrıca Kafka, bir partition aynı anda sadece bir consumer tarafından okunabilir kuralına uyar. Ama bir consumer, birden fazla partition okuyabilir.

Offset: Her consumer, okuduğu son mesajın “offset” numarasını tutar.

Zookeeper: Dağıtık sistemlerin koordinasyonunu sağlamak için kullanılan bir servis olup, özellikle birden fazla instance içeren sistemlerin yapılandırılması ve senkronize çalışması için gereklidir. Kafka Cluster’da node durumunu izlemek, topic ve mesajların listesini tutmak için kullanılır. Bu nedenle Kafka kurulumundan önce Zookeper ‘ın kurulması zorunludur.

Şimdi .Net Core ile Kafka kullanımıyla ilgili yapacağımız örnek için gerekli kurulumları yapalım.

Kafkayı Docker ‘da Çalıştırma

Zookeeper ve Kafka kurulumu için docker-compose.yml dosyamızın içeriği:

YAML
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Cmd veya git bash ile docker-compose up -d komutunu çalıştırın. Zookeeper ve Kafka imajları indirildikten sonra kurulacaktır. Aşağıdaki gibi Docker Desktop tada 2 servisinde ayağa kalktığını görebilirsiniz.

Producer ve Consumer uygulamalarımızı yazalım.

Hava Durumu Takibi Uygulama Örneği

.NET Core ile Kafka kullanarak basit bir hava durumu takip uygulaması geliştireceğiz. Producer olan uygulamamız, belirli aralıklarla sıcaklık verileri üretecek ve Kafka ‘ya iletecek. Consumer tarafındada bu değerleri okurken eğer sıcaklık belirli bir eşiğin üzerine çıkarsa, ekrana bir uyarı mesajı basacağız. Gerçek dünya projelerinde bu adımda genellikle email / notification gönderimi ya da socket üzerinden Client ‘ları bilgilendirme gibi işlemler gerçekleştirilebilir.

Oluşturduğum test projesinin yapısı aşağıdaki gibidir, basit bir kurgu yaptım.

Öncelikle hava durumu verilerini üretecek Producer uygulamamızı yazalım. Bunun için yeni bir Console App açıyoruz ve aşağıdaki Confluent.Kafka paketini yüklüyoruz. Bu işlemi Consumer Console App ‘imiz için de yapacağız.

Bash
dotnet add package Confluent.Kafka

Producer

C#
using Confluent.Kafka;
using Kafka.Dto;
using System.Text.Json;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
var producer = new ProducerBuilder<Null, string>(config).Build();

var cities = new[] { "Antalya", "Adana", "Amasya" };
var rand = new Random();

while (true)
{
    KafkaWeatherDto data = new KafkaWeatherDto
    {
        City = cities[rand.Next(cities.Length)],
        Temperature = rand.Next(1, 50),
        Humidity = rand.Next(30, 90),
        Date = DateTime.Now
    };

    string jsonData = JsonSerializer.Serialize(data);
    await producer.ProduceAsync("WeatherTopic", new Message<Null, string>
    {
        Value = jsonData
    });

    Console.WriteLine($"Data gönderildi: {jsonData}");
    await Task.Delay(2000);
}

Üst kısımda konfigurasyonları ve tanımlamaları yaptıktan sonra while bloğu içerisinde işlemlerimizi gerçekleştiriyoruz. Burada da KafkaWeatherModel tipindeki modelimizi serialize ediyor ve ProduceAsync ile WeatherTopic ‘ine iletiyoruz. Senaryomuz gereği 2 sn de bir bu datanın topic e iletilmesini sağlıyoruz. Producer projesini ayağa kaldırdığımızda çıktısı şu şekilde olacaktır:

Consumer

C#
using Confluent.Kafka;
using Kafka.Dto;
using System.Text.Json;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "WeatherConsumerGroup",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("WeatherTopic");

while (true)
{
    var consume = consumer.Consume();
    var kafkaWeatherModel = JsonSerializer.Deserialize<KafkaWeatherDto>(consume.Message.Value);

    if (kafkaWeatherModel != null)
    {
        Console.WriteLine($"{kafkaWeatherModel.City} > Sıcaklık : {kafkaWeatherModel.Temperature} , Nem : {kafkaWeatherModel.Humidity}");

        if (kafkaWeatherModel.Temperature > 35)
            Console.WriteLine($"DİKKAT : {kafkaWeatherModel.City} şehrinde sıcaklık çok yüksek : {kafkaWeatherModel.Temperature}");
    }
}

Consumer tarafında yine Kafka ile ilgili yapılandırmamızı yapıyoruz. BootstrapServers ile Kafka adresimizi belirtiyoruz. GroupId, Kafka ‘daki consumer group adıdır. Aynı GroupId ‘ye sahip consumer’lar aynı gruba aittir ve partition ‘ları paylaşarak çalışır. Bu sayede Kafka her mesajı bir grup içinde sadece bir consumer a iletir. Örneğin: 2 partition varsa ve aynı GroupId ‘ye sahip 2 consumer varsa her biri 1 partition dinler.

AutoOffsetReset = AutoOffsetReset.Earliest ayarı consumer daha önce hiç mesaj okumamışsa veya kaldığı yerin kaydı yoksa, nereden başlayacağını belirler. Earliest en eski mesajlardan, Latest ise en son gelen mesajlardan okumaya başla demektir. Biz kendi örneğimizde bu değeri Earliest olarak ayarladık.

Consumer tarafında ekran görüntüsünde de görüldüğü üzere, sıcaklıklar belli bir derecenin üzerine çıktığında uyarı mesajını ekrana bastık. Bu örnek basit olsa da, temelleri öğrendikten sonra senaryo kolaylıkla genişletilebilir. Gerçek hayat uygulamalarında bu aşamada farklı aksiyonlar alınabilir. İlgili veriler veritabanına kaydedilebilir. Kaydedilen bilgiler grafikler ile görselleştirilebilir. Gerisi kendi uygulamanızın ve sizin ihtiyaçlarınıza göre şekillenecektir.

BONUS olarak Kafka tarafında nelerimiz oluşmuş Kafka UI ‘da görmeyelim mi ? (https://www.youtube.com/watch?v=ANnoNIMPWnQ)

Broker ‘larımız:

Topic ‘lerimiz:

Topic Detayı: 1 adet partition ‘ımızın olduğu görünüyor. İstenirse partition sayısı Producer tarafında ayarlanabilir.

Consumer ‘larımızın listesi:

Bir makalenin daha sonuna geldik 🙂 , yeni makalelerde görüşmek üzere.

Faydalandığım linkler:

http://interviewbit.com/blog/kafka-architecture/

https://www.cloudduggu.com/kafka/architecture

1
Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir