Message Queue là phần không thể thiếu được trong các hệ thống lớn được thiết kế theo hướng Microservice.
Mục lục
Tổng quan về Kafka
Kafka là gì?
Kafka được sử dụng bởi hơn 80% trong số 100 công ty trong danh sách Fortune và là một trong những nền tảng xử lý luồng dữ liệu phổ biến nhất trong ngành hiện nay. Kafka được hàng nghìn tổ chức hàng đầu thế giới sử dụng cho các đường ống dẫn dữ liệu hiệu suất cao, phân tích luồng, tích hợp dữ liệu và nhiều ứng dụng quan trọng khác.
Kafka là một nền tảng message publish/subscribe phân tán (distributed messaging system) mã nguồn mở được xây dựng nhằm mục đích xử lý dữ liệu streaming theo thời gian thực.
LinkedIn đã phát triển Kafka vào năm 2011 như một message broker thông lượng cao để sử dụng cho chính nó. Sau đó Kafka có nguồn mở và được donate cho Software Foundation. Ngày nay, Kafka đã phát triển thành nền tảng stream dữ liệu phân tán được sử dụng rộng rãi nhất, có khả năng nhập và xử lý hàng nghìn tỷ bản ghi mỗi ngày mà không có bất kỳ độ trễ hiệu suất có thể nhận thấy nào theo quy mô khối lượng. Các tổ chức trong danh sách Fortune 500 như Target, Microsoft, AirBnB và Netflix dựa vào Kafka để cung cấp trải nghiệm theo thời gian thực, theo hướng dữ liệu cho khách hàng của họ.
Kafka hiện tại được sử dụng cho các trường hợp dưới đây:
- Publish và subscribe các stream of record (luồng dữ liệu)
- Lưu trữ các stream of record theo thứ tự
- Hỗ trợ xử lý stream of record theo thời gian thực
Chi tiết về Kafka, bạn xem tại: Kafka Documentation
Ưu điểm và nhược điểm của Kafka
Ưu điểm
- Open-source: Mã nguồn mở
- High-throughput: Có khả năng xử lý một lượng lớn thông tin một cách liên tục, gần như không có thời gian chờ
- High-frequency: Có thể xử lý cùng lúc nhiều message và nhiều thể loại topic
- Scalability: Dễ dàng mở rộng khi có nhu cầu
- Tự động lưu trữ message, dễ dàng kiểm tra lại
- Cộng đồng người dùng đông đảo, được hỗ trợ nhanh chóng khi cần
Nhược điểm
- Chưa có bộ công cụ giám sát hoàn chỉnh: Có nhiều tool khác nhau nhưng mỗi tool chỉ đáp ứng một tính năng quản lý nhất định, chẳng hạn như:
- Kafka tool (offset manager)
- GUI tool – quản lý topic và consumer
- Lense – hỗ trợ query message
- Akhq – toolbox quản lý Kafka và view data bên trong Kafka
- Không chọn được topic theo wildcard: Người dùng sẽ cần phải sử dụng chính xác tên topic để xử lý message
- Giảm hiệu suất khi kích thước message lớn: Kích thước message tăng khiến cho consumer và producer phải compress và decompress message, từ đó làm bộ nhớ bị chậm đi, ảnh hưởng đến throughput và hiệu suất.
- Đôi khi số lượng queues trong Kafka cluster tăng đột biến khiến Kafka xử lý chậm hơn.
Một số khái niệm cơ bản trong Kafka
Nếu bạn là người mới, lần đầu tìm hiểu về Kafka và Message Queue thì bạn cần nắm một số khái niệm cơ bản sau:
- Producer: Producer là những ứng dụng tạo hoặc thu thập dữ liệu và gửi dữ liệu tới Kafka Server. Dữ liệu này sẽ là những message có định dạng, được gửi dưới dạng mảng byte tới Kafka server.
- Consumer: Kafka sử dụng consumer để theo dõi (subscribe) vào topic, các consumer được định danh bằng các group name. Nhiều consumer có thể cùng đọc một topic. Sau khi nhận được data, Consumer có thể thêm code để xử lý data theo nhu cầu của mình.
- Cluster: Kafka cluster là một tập các server, mỗi server này được gọi là 1 broker.
- Broker: Broker là Kafka server, là cầu nối giữa Message Publisher và Message Consumer, giúp chúng có thể trao đổi message với nhau.
- Topic: Dữ liệu truyền trong Kafka theo topic, khi cần truyền dữ liệu cho các ứng dụng khác nhau thì sẽ tạo ra các topic khác nhau.
- Partitions: Kafka là một hệ thống phân tán và chúng ta có thể cài đặt Kafka server theo cluster. Trong trường hợp một topic nhận quá nhiều message tại cùng một thời điểm, chúng ta có thể chia topic này thành những partitions được share giữa các Kafka server với nhau trong một cluster. Mỗi một partition sẽ khá nhỏ và độc lập với các partitions khác. Số lượng partition cho mỗi topic thì tuỳ theo nhu cầu của ứng dụng mà chúng ta có thể quyết định.
- Consumer Group: Là một nhóm các Consumer từ Kafka server. Các Consumer trong một Consumer Group sẽ chia sẻ việc xử lý các message.
- ZOOKEEPER: Được dùng để quản lý và bố trí các broker.
Hướng dẫn cài đặt Kafka
Chi tiết bạn xem hướng dẫn tại: How to Install Apache Kafka on Ubuntu 18.04
Các lệnh thực hiện như sau:
// Cài đặt Java
sudo apt update
sudo apt install default-jdk
// Cài đătk Kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzvf kafka_2.13-3.4.0.tgz
sudo mv kafka_2.13-3.4.0 /usr/local/kafka
Tiếp theo chúng ta cấu hình Kafka để ứng dụng chạy như một service trong hệ thống. Đầu tiên tạo tệp /etc/systemd/system/zookeeper.service bằng lệnh dưới:
sudo nano /etc/systemd/system/zookeeper.service
Với nội dung như dưới:
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Tiếp theo ta tạo tệp /etc/systemd/system/kafka.service bằng lệnh:
sudo nano /etc/systemd/system/kafka.service
Với nội dung như dưới (Chú ý biến môi trường JAVA_HOME nhớ cập nhật cho đúng như trên máy bạn):
[Unit] Description=Apache Kafka Server Documentation=http://kafka.apache.org/documentation.html Requires=zookeeper.service [Service] Type=simple Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64" ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh [Install] WantedBy=multi-user.target
Sau đó tải lại systemd để cập nhật các thay đổi mới bằng lệnh:
systemctl daemon-reload
Cuối cùng chúng ta bật service bằng lệnh sau
// Start zookeeper
sudo systemctl start zookeeper
// Start kafka
sudo systemctl start kafka
// Xem thông tin dịch vụ Kafka
sudo systemctl status kafka
Test thử Kafka sử dụng command line
// Chuyển đến thư mục cài đặt Kafka
cd /usr/local/kafka
// Tạo một testTopic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic
// Xem danh sách topic
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
// Mở producer gửi message tới Kafka, sau đó bạn nhận message, mỗi message trên 1 dòng
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
// Mở consumer để nhận dữ liệu từ Kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
Sử dụng Kafka trên NodeJs
Trên NodeJs chúng ta sử dụng thư viện kafkajs. Chi tiết hơn về thư viện này bạn xem tại: KafkaJs Documentation.
Đầu tiên ta tạo thư mục mới kafka-testing và cài thư viện này trước khi chạy bằng lệnh sau:
mkdir kafka-testing
npm i kafkajs
Đầu tiên chúng ta tạo tệp producer.js với tính năng định kỳ gửi message tới Kafka:
const KafkaJs = require("kafkajs");
const clientId = "kafka-app";
const brokers = ["localhost:9092"];
const topic = "message-log";
// initialize a new kafka client and initialize a producer from it
const kafka = new KafkaJs.Kafka({ clientId, brokers });
const producer = kafka.producer({ createPartitioner: KafkaJs.Partitioners.LegacyPartitioner });
// we define an async function that writes a new message each second
const produce = async () => {
await producer.connect();
let i = 0;
// after the produce has connected, we start an interval timer
setInterval(async () => {
try {
// send a message to the configured topic with
// the key and value formed from the current value of `i`
await producer.send({
topic,
messages: [
{
key: String(i),
value: "this is message " + i,
},
],
});
// if the message is written successfully, log it and increment `i`
console.log("writes: ", i)
i++;
} catch (err) {
console.error("could not write message " + err)
}
}, 1000);
}
Tiếp theo chúng ta tạo tệp consumer.js để nhận các message từ Kafka và hiển thị ra console. Thực tế khi nhận message này bạn sẽ phải xử lý nó.
const KafkaJs = require("kafkajs");
const clientId = "kafka-app";
const brokers = ["localhost:9092"];
const topic = "message-log";
const isSupportBatch = false;
// initialize a new kafka client and initialize a producer from it
const kafka = new KafkaJs.Kafka({ clientId, brokers });
const consumer = kafka.consumer({ groupId: clientId });
const consume = async () => {
await consumer.connect();
await consumer.subscribe({ topics: [topic], fromBeginning: true });
let consumerOpt = {};
if (isSupportBatch) {
consumerOpt.eachBatch = async ({ batch }) => {
let topic = batch.topic;
let partition = batch.partition;
let messages = batch.messages;
for (const message of messages) {
const prefix = `[${topic}] [${partition}] [${message.timestamp}]`;
console.log(`${prefix} ${message.key} - ${message.value}`)
}
}
} else {
consumerOpt.eachMessage = async ({ topic, partition, message }) => {
console.log(`received message: ${message.value} - topic=${topic} - partition=${partition}`);
};
}
await consumer.run(consumerOpt);
}
consume().catch((err) => {
console.error("error in consumer: ", err)
});
Bây giờ, bạn hãy mở 2 terminal trên server. Terminal thứ nhất bạn chạy lệnh:
node consumer.js
Terminal thứ 2 bạn chạy lệnh:
node producer.js
Bạn sẽ thấy dữ liệu trao đổi như ảnh dưới.
Consumer trong Kafka cho phép bạn nhận từng message một để xử lý hoặc nhận một lúc nhiều message để xử lý đồng thời. Trong tệp consumer.js, bạn đổi tham số isSupportBatch sang true nếu bạn muốn nhận nhiều message.
Một số lỗi phát sinh khi làm việc với Kafka
KafkaJS v2.0.0 switched default partitioner
Khi lập trình NodeJs sử dụng thư viện KafkaJs, tôi đã nhận được lỗi như sau:
{"level":"WARN","timestamp":"2023-05-04T07:07:52.831Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
(node:19947) UnhandledPromiseRejectionWarning: KafkaJSNonRetriableError: Failed to connect: broker at index 0 is invalid "undefined"
Đây là lỗi warning, do tôi dùng Kafka v2 nên có thông báo này. Để sửa lỗi này, thêm option sau khi tạo Producer:
const producer = kafka.producer({
createPartitioner: KafkaJs.Partitioners.LegacyPartitioner
});
Tham khảo thêm: Migrating to v2.0.0
Connection error: getaddrinfo ENOTFOUND ip-172-26-7-222.ap-northeast-1.compute.internal
Kafka tôi cài trên 1 máy AWS được tạo trên Amazon Lightsail, khi thực hiện chạy trực tiếp example trên server này thì OKIE, nhưng khi chạy example từ máy cá nhân kết nối từ đến Kafka thấy hiện thông báo lỗi:
{"level":"INFO","timestamp":"2023-05-04T07:17:20.864Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2023-05-04T07:17:20.871Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Connection error: getaddrinfo ENOTFOUND ip-172-26-7-222.ap-northeast-1.compute.internal","retryCount":2,"retryTime":874}
{"level":"ERROR","timestamp":"2023-05-04T07:17:20.878Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND ip-172-26-7-222.ap-northeast-1.compute.internal","broker":"ip-172-26-7-222.ap-northeast-1.compute.internal:9092","clientId":"dexscanner-testing","stack":"Error: getaddrinfo ENOTFOUND ip-172-26-7-222.ap-northeast-1.compute.internal\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:69:26)"}
Vấn đề này tôi thấy do server AWS tự động chuyển từ Public IP sang internal domain, do đó các máy khác sẽ không thể phân giải được domain này để ra IP.
Để fix lỗi này, trên máy local (Client), trong file hosts, ta thêm 1 dòng ánh xạ domain này sang public IP là được:
x.x.x.x ip-172-26-7-222.ap-northeast-1.compute.internal
Chú ý tệp hosts trên Ubuntu đường dần đầy đủ là /etc/hosts, còn trên Windows đường dẫn đầy đủ là C:\Windows\System32\Drivers\etc\hosts
Trả lời