Jutaan catatan data dihasilkan setiap hari dalam sistem komputasi saat ini. Ini termasuk transaksi keuangan Anda, melakukan pemesanan, atau data dari sensor mobil Anda. Untuk memproses peristiwa streaming data ini secara waktu nyata dan untuk memindahkan catatan peristiwa secara andal di antara sistem perusahaan yang berbeda, Anda memerlukannya Apache Kafka.
Apache Kafka adalah solusi streaming data sumber terbuka yang menangani lebih dari 1 juta rekaman per detik. Di samping throughput yang tinggi ini, Apache Kafka menyediakan skalabilitas dan ketersediaan yang tinggi, latensi rendah, dan penyimpanan permanen.
Perusahaan seperti LinkedIn, Uber, dan Netflix mengandalkan Apache Kafka untuk pemrosesan waktu nyata dan streaming data. Cara termudah untuk memulai dengan Apache Kafka adalah mengaktifkan dan menjalankannya di komputer lokal Anda. Ini memungkinkan Anda untuk tidak hanya melihat server Apache Kafka beraksi tetapi juga memungkinkan Anda membuat dan menggunakan pesan.
Dengan pengalaman langsung dalam memulai server, membuat topik, dan menulis kode Java menggunakan klien Kafka, Anda akan siap menggunakan Apache Kafka untuk memenuhi semua kebutuhan pipeline data Anda.
Daftar isi
Cara Mengunduh Apache Kafka di komputer lokal Anda
Anda dapat mengunduh Apache Kafka versi terbaru dari tautan resmi. Konten yang diunduh akan dikompresi dalam format .tgz. Setelah diunduh, Anda harus mengekstraknya.
Jika Anda pengguna Linux, buka terminal Anda. Selanjutnya, navigasikan ke lokasi tempat Anda mengunduh versi terkompresi Apache Kafka. Jalankan perintah berikut:
tar -xzvf kafka_2.13-3.5.0.tgz
Setelah perintah selesai, Anda akan menemukan direktori baru bernama kafka_2.13-3.5.0. Arahkan ke dalam folder menggunakan:
cd kafka_2.13-3.5.0
Anda sekarang dapat membuat daftar isi direktori ini menggunakan perintah ls.
Untuk pengguna Windows, Anda dapat mengikuti langkah yang sama. Jika Anda tidak dapat menemukan perintah tar, Anda dapat menggunakan alat pihak ketiga seperti WinZip untuk membuka arsip.
Cara memulai Apache Kafka di mesin lokal Anda
Setelah Anda mengunduh dan mengekstraksi Apache Kafka, saatnya untuk mulai menjalankannya. Itu tidak memiliki penginstal. Anda dapat langsung mulai menggunakannya melalui baris perintah atau jendela terminal.
Sebelum memulai dengan Apache Kafka, pastikan Anda telah menginstal Java 8+ di sistem Anda. Apache Kafka membutuhkan instalasi Java yang sedang berjalan.
#1. Jalankan server Apache Zookeeper
Langkah pertama adalah menjalankan Apache Zookeeper. Anda mendapatkannya pra-unduh sebagai bagian dari arsip. Ini adalah layanan yang bertanggung jawab untuk memelihara konfigurasi dan menyediakan sinkronisasi untuk layanan lain.
Setelah Anda berada di dalam direktori tempat Anda mengekstrak konten arsip, jalankan perintah berikut:
Untuk pengguna Linux:
bin/zookeeper-server-start.sh config/zookeeper.properties
Untuk pengguna Windows:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
File zookeeper.properties menyediakan konfigurasi untuk menjalankan server Apache Zookeeper. Anda dapat mengonfigurasi properti seperti direktori lokal tempat data akan disimpan dan port tempat server akan dijalankan.
#2. Mulai server Apache Kafka
Sekarang server Apache Zookeeper telah dimulai, saatnya untuk memulai server Apache Kafka.
Buka terminal baru atau jendela prompt perintah dan navigasikan ke direktori tempat file yang diekstrak berada. Kemudian Anda dapat memulai server Apache Kafka menggunakan perintah di bawah ini:
Untuk pengguna Linux:
bin/kafka-server-start.sh config/server.properties
Untuk pengguna Windows:
bin/windows/kafka-server-start.bat config/server.properties
Anda menjalankan server Apache Kafka Anda. Jika Anda ingin mengubah konfigurasi default, Anda dapat melakukannya dengan memodifikasi file server.properties. Nilai-nilai yang berbeda hadir dalam dokumentasi resmi.
Cara Menggunakan Apache Kafka di mesin lokal Anda
Anda sekarang siap untuk mulai menggunakan Apache Kafka di komputer lokal Anda untuk membuat dan menggunakan pesan. Karena server Apache Zookeeper dan Apache Kafka aktif dan berjalan, mari kita lihat bagaimana Anda dapat membuat topik pertama, menghasilkan pesan pertama, dan menggunakan hal yang sama.
Bagaimana langkah-langkah membuat topik di Apache Kafka?
Sebelum Anda membuat topik pertama Anda, mari kita pahami apa sebenarnya topik itu. Di Apache Kafka, topik adalah penyimpanan data logis yang membantu streaming data. Anggap saja sebagai saluran melalui mana data diangkut dari satu komponen ke komponen lainnya.
Topik mendukung multi-produsen dan multi-konsumen – lebih dari satu sistem dapat menulis dan membaca dari suatu topik. Tidak seperti sistem perpesanan lainnya, pesan apa pun dari suatu topik dapat dikonsumsi lebih dari satu kali. Selain itu, Anda juga dapat menyebutkan periode retensi pesan Anda.
Mari kita ambil contoh sebuah sistem (produsen) yang menghasilkan data untuk transaksi bank. Dan sistem lain (konsumen) menggunakan data ini dan mengirimkan notifikasi aplikasi ke pengguna. Untuk memfasilitasi ini, diperlukan topik.
Buka terminal baru atau jendela prompt perintah, dan arahkan ke direktori tempat Anda mengekstrak arsip. Perintah berikut akan membuat topik yang disebut transaksi:
Untuk pengguna Linux:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
Untuk pengguna Windows:
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
Anda sekarang telah membuat topik pertama Anda, dan Anda siap untuk mulai memproduksi dan menggunakan pesan.
Bagaimana cara menghasilkan pesan ke Apache Kafka?
Dengan topik Apache Kafka Anda siap, Anda sekarang dapat membuat pesan pertama Anda. Buka terminal baru atau jendela prompt perintah, atau gunakan terminal yang sama dengan yang Anda gunakan untuk membuat topik. Selanjutnya, pastikan Anda berada di direktori yang tepat tempat Anda mengekstrak konten arsip. Anda dapat menggunakan baris perintah untuk menghasilkan pesan Anda pada topik menggunakan perintah berikut:
Untuk pengguna Linux:
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Untuk pengguna Windows:
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
Setelah Anda menjalankan perintah, Anda akan melihat bahwa jendela terminal atau prompt perintah Anda sedang menunggu input. Tulis pesan pertama Anda dan tekan Enter.
> This is a transactional record for $100
Anda telah membuat pesan pertama Anda ke Apache Kafka di mesin lokal Anda. Selanjutnya, Anda sekarang siap untuk menggunakan pesan ini.
Bagaimana cara mengkonsumsi pesan dari Apache Kafka?
Asalkan topik Anda telah dibuat dan Anda telah membuat pesan ke topik Kafka Anda, Anda sekarang dapat menggunakan pesan itu.
Apache Kafka memungkinkan Anda melampirkan banyak konsumen ke topik yang sama. Setiap konsumen dapat menjadi bagian dari kelompok konsumen – pengidentifikasi logis. Misalnya, jika Anda memiliki dua layanan yang perlu menggunakan data yang sama, mereka dapat memiliki grup konsumen yang berbeda.
Namun, jika Anda memiliki dua instance dari layanan yang sama, Anda ingin menghindari menggunakan dan memproses pesan yang sama dua kali. Dalam hal ini, keduanya akan memiliki kelompok konsumen yang sama.
Di jendela terminal atau command prompt, pastikan Anda berada di direktori yang tepat. Gunakan perintah berikut untuk memulai konsumen:
Untuk pengguna Linux:
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Untuk pengguna Windows:
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Anda akan melihat pesan yang Anda buat sebelumnya muncul di terminal Anda. Anda sekarang telah menggunakan Apache Kafka untuk menggunakan pesan pertama Anda.
Perintah kafka-console-consumer membutuhkan banyak argumen yang diteruskan. Mari kita lihat apa artinya masing-masing:
- The –topic menyebutkan topik dari mana Anda akan mengkonsumsi
- –dari awal memberi tahu konsumen konsol untuk mulai membaca pesan langsung dari pesan pertama yang ada
- Server Apache Kafka Anda disebutkan melalui opsi –bootstrap-server
- Selain itu, Anda dapat menyebutkan grup konsumen dengan meneruskan parameter –group
- Dengan tidak adanya parameter grup konsumen, itu dihasilkan secara otomatis
Dengan konsumen konsol berjalan, Anda dapat mencoba membuat pesan baru. Anda akan melihat bahwa semuanya dikonsumsi dan muncul di terminal Anda.
Sekarang setelah Anda membuat topik dan berhasil membuat dan menggunakan pesan, mari integrasikan ini dengan aplikasi Java.
Cara membuat produsen dan konsumen Apache Kafka menggunakan Java
Sebelum memulai, pastikan Anda telah menginstal Java 8+ di mesin lokal Anda. Apache Kafka menyediakan pustaka kliennya sendiri yang memungkinkan Anda terhubung dengan lancar. Jika Anda menggunakan Maven untuk mengelola dependensi, tambahkan dependensi berikut ke pom.xml Anda
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.0</version> </dependency>
Anda juga dapat mengunduh perpustakaan dari Repositori Maven dan tambahkan ke classpath Java Anda.
Setelah perpustakaan Anda siap, buka editor kode pilihan Anda. Mari kita lihat bagaimana Anda dapat memulai produsen dan konsumen Anda menggunakan Java.
Buat produser Apache Kafka Java
Dengan perpustakaan kafka-clients, Anda sekarang siap untuk mulai membuat produser Kafka Anda.
Mari buat kelas bernama SimpleProducer.java. Ini akan bertanggung jawab untuk menghasilkan pesan tentang topik yang telah Anda buat sebelumnya. Di dalam kelas ini, Anda akan membuat instance org.apache.kafka.clients.producer.KafkaProducer. Selanjutnya, Anda akan menggunakan produser ini untuk mengirim pesan Anda.
Untuk membuat produser Kafka, Anda memerlukan host dan port server Apache Kafka Anda. Karena Anda menjalankannya di komputer lokal, host-nya adalah localhost. Karena Anda tidak mengubah properti default saat memulai server, porta akan menjadi 9092. Perhatikan kode berikut di bawah ini yang akan membantu Anda membuat produser:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } }
Anda akan melihat bahwa ada tiga properti yang disetel. Mari kita telusuri masing-masing dengan cepat:
- BOOTSTRAP_SERVERS_CONFIG memungkinkan Anda menentukan di mana server Apache Kafka berjalan
- KEY_SERIALIZER_CLASS_CONFIG memberi tahu produser format apa yang digunakan untuk mengirim kunci pesan.
- Format pengiriman pesan sebenarnya ditentukan menggunakan properti VALUE_SERIALIZER_CLASS_CONFIG.
Karena Anda akan mengirimkan pesan teks, kedua properti diatur untuk menggunakan StringSerializer.class.
Untuk benar-benar mengirim pesan ke topik Anda, Anda perlu menggunakan metode producer.send() yang menggunakan ProducerRecord. Kode berikut memberi Anda metode yang akan mengirim pesan ke topik dan mencetak respons bersama dengan offset pesan.
public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); }
Dengan seluruh kode terpasang, kini Anda dapat mengirim pesan ke topik Anda. Anda dapat menggunakan metode main untuk mengujinya, seperti yang disajikan dalam kode di bawah ini:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); } public static void main(String[] args) throws Exception{ SimpleProducer producer = new SimpleProducer("localhost", "9092"); producer.produce("transactions", "This is a transactional record of $200"); } }
Dalam kode ini, Anda membuat SimpleProducer yang terhubung ke server Apache Kafka di mesin lokal Anda. Itu secara internal menggunakan KafkaProducer untuk menghasilkan pesan teks tentang topik Anda.
Buat konsumen Apache Kafka Java
Saatnya membuat konsumen Apache Kafka menggunakan klien Java. Buat kelas bernama SimpleConsumer.java. Selanjutnya, Anda akan membuat konstruktor untuk kelas ini, yang menginisialisasi org.apache.kafka.clients.consumer.KafkaConsumer. Untuk membuat konsumen, Anda memerlukan host dan port tempat server Apache Kafka berjalan. Selain itu, Anda memerlukan Grup Konsumen serta topik yang ingin Anda konsumsi. Gunakan cuplikan kode yang diberikan di bawah ini:
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } }
Mirip dengan Produser Kafka, Konsumen Kafka juga menerima objek Properti. Mari kita lihat semua set properti yang berbeda:
- BOOTSTRAP_SERVERS_CONFIG memberi tahu konsumen tempat server Apache Kafka berjalan
- Grup konsumen disebutkan menggunakan GROUP_ID_CONFIG
- Ketika konsumen mulai mengkonsumsi, AUTO_OFFSET_RESET_CONFIG memungkinkan Anda menyebutkan seberapa jauh Anda ingin mulai mengkonsumsi pesan dari
- KEY_DESERIALIZER_CLASS_CONFIG memberi tahu konsumen jenis kunci pesan
- VALUE_DESERIALIZER_CLASS_CONFIG memberi tahu tipe konsumen dari pesan yang sebenarnya
Karena, dalam kasus Anda, Anda akan menggunakan pesan teks, properti deserializer disetel ke StringDeserializer.class.
Anda sekarang akan mengkonsumsi pesan dari topik Anda. Agar semuanya tetap sederhana, setelah pesan dikonsumsi, Anda akan mencetak pesan ke konsol. Mari kita lihat bagaimana Anda bisa mencapainya dengan menggunakan kode di bawah ini:
private boolean keepConsuming = true; public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } }
Kode ini akan terus memilih topik. Saat Anda menerima Data Konsumen apa pun, pesan akan dicetak. Uji tindakan konsumen Anda menggunakan metode utama. Anda akan memulai aplikasi Java yang akan terus menggunakan topik dan mencetak pesan. Hentikan aplikasi Java untuk menghentikan konsumen.
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } } public static void main(String[] args) { SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions"); simpleConsumer.consume(); } }
Ketika Anda menjalankan kode, Anda akan mengamati bahwa itu tidak hanya menggunakan pesan yang dihasilkan oleh produser Java Anda, tetapi juga pesan yang Anda hasilkan melalui Produser Konsol. Ini karena properti AUTO_OFFSET_RESET_CONFIG telah disetel paling awal.
Dengan SimpleConsumer berjalan, Anda dapat menggunakan produsen konsol atau aplikasi Java SimpleProducer untuk menghasilkan pesan lebih lanjut ke topik. Anda akan melihatnya dikonsumsi dan dicetak di konsol.
Penuhi semua kebutuhan pipeline data Anda dengan Apache Kafka
Apache Kafka memungkinkan Anda menangani semua persyaratan saluran data dengan mudah. Dengan pengaturan Apache Kafka di mesin lokal Anda, Anda dapat menjelajahi semua fitur berbeda yang disediakan Kafka. Selain itu, klien Java resmi memungkinkan Anda menulis, menghubungkan, dan berkomunikasi dengan server Apache Kafka Anda secara efisien.
Menjadi sistem streaming data yang serbaguna, dapat diskalakan, dan berperforma tinggi, Apache Kafka benar-benar dapat menjadi pengubah permainan untuk Anda. Anda dapat menggunakannya untuk pengembangan lokal Anda atau bahkan mengintegrasikannya ke dalam sistem produksi Anda. Sama seperti pengaturan lokal yang mudah, pengaturan Apache Kafka untuk aplikasi yang lebih besar bukanlah tugas yang berat.
Jika Anda mencari platform streaming data, Anda dapat melihat platform data streaming terbaik untuk Analisis dan Pemrosesan Waktu Nyata.