From CloudEvents to Apache Kafka Records, Part II ¶
Published on: 2023-04-03 , Revised on: 2024-01-17
Authors: Daniele Zonca, Senior Principal Software Engineer @ Red Hat, Matthias Weßendorf, Senior Principal Software Engineer @ Red Hat
In this blog post you will learn how to store incoming CloudEvents in an Apache Kafka Topic and how to use the Knative Broker and Trigger APIs for content-based event routing.
The first part of this post explained how Knative helps to ingest CloudEvents into an Apache Kafka topic for further processing. The article showed how to process the CloudEvents Kafka Record with a standard tool from the Apache Kafka ecosystem, the kcat CLI. In addition, the post explained the benefits of the binary content mode of CloudEvents, which Knative defaults to. In this article we show a different approach to processing the ingested CloudEvents: leveraging the Knative Broker and Trigger APIs for event routing.
Setting up Apache Kafka and the Knative Broker¶
In order to use the Knative Broker for Apache Kafka you need to install Apache Kafka first. For this post we are using a local Apache Kafka installation, powered by Strimzi, as described here. The article also discusses how to install the Knative Broker for Apache Kafka for a local development environment.
Note
For a production-ready configuration of the Knative Broker for Apache Kafka see this blog.
Setting up the Knative Broker component¶
The above-mentioned article configures all Knative Brokers to use the Kafka class, so creating a new broker for Apache Kafka is straightforward:
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-demo-kafka-broker
  annotations:
    eventing.knative.dev/broker.class: Kafka
spec: {}
The Broker is an Addressable type, which can receive incoming CloudEvents over HTTP at the address defined in its status.address.url field:
kubectl get brokers.eventing.knative.dev
NAME                   URL                                                                                          AGE   READY   REASON
my-demo-kafka-broker   http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker   7s    True
Note
The Broker is reachable at the URL shown above only from inside the cluster. To reach it from outside, it is possible to create (and also secure) an Ingress. For development, you can also use the kn command line to send events directly; see the Kn Event Plugin section.
However, this output does not show any information about the Apache Kafka Topic. The reason is that the topic used by the Broker implementation is considered an implementation detail. Let us have a look at the actual broker object:
kubectl get brokers.eventing.knative.dev my-demo-kafka-broker -o yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: my-demo-kafka-broker
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
status:
  address:
    url: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker
  annotations:
    bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
    default.topic: knative-broker-default-my-demo-kafka-broker
    default.topic.partitions: "10"
    default.topic.replication.factor: "1"
The above is a simplified version of the YAML representation, but note the spec.config field: it points to the default configuration for all Kafka-enabled Knative Brokers in the cluster. The kafka-broker-config ConfigMap configures the underlying topics by defining knobs like the number of partitions or the replication factor. The status of the broker, in turn, shows the name of the topic: knative-broker-default-my-demo-kafka-broker. The name follows the convention knative-broker-<namespace>-<broker-name>.
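The naming convention above can be sketched in a few lines of Python. This is only an illustration: the function name broker_topic_name is hypothetical, and the authoritative topic name is always the default.topic value in the broker's status.

```python
def broker_topic_name(namespace: str, broker_name: str) -> str:
    """Build a topic name following knative-broker-<namespace>-<broker-name>.

    Hypothetical helper for illustration; read the real name from the
    broker's status annotations (default.topic) instead of computing it.
    """
    return f"knative-broker-{namespace}-{broker_name}"


print(broker_topic_name("default", "my-demo-kafka-broker"))
# prints: knative-broker-default-my-demo-kafka-broker
```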
Note
By default the Knative Kafka Broker creates its own internal topic. However, topic creation might be restricted in some environments. For this and similar use cases, it is possible to bring your own topic.
Setting up the Consumer application¶
Now that we have the Broker, which will act as the HTTP endpoint for ingesting the CloudEvents, it is time to define an application that receives and processes the CloudEvents:
apiVersion: v1
kind: Pod
metadata:
  name: log-receiver
  labels:
    app: log-receiver
spec:
  containers:
  - name: log-receiver
    image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    imagePullPolicy: Always
    ports:
    - containerPort: 8080
      protocol: TCP
      name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
  name: log-receiver
spec:
  selector:
    app: log-receiver
  ports:
    - port: 80
      protocol: TCP
      targetPort: log-receiver
      name: http
Here we define a simple Pod and its Service, which points to an HTTP server that receives the CloudEvents. As you can see, this is not a Kafka-specific consumer: any HTTP web server, in any language, can be used to process the CloudEvents coming from an Apache Kafka Topic.
The developers of the consumer applications do not need to know how to program a Kafka consumer. Knative, with its Broker implementation for Apache Kafka, abstracts this away by acting as an HTTP proxy for the consumer applications. This dramatically simplifies the engineering effort for these focused and self-contained consumer applications.
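To make the "any HTTP web server" point concrete, here is a minimal sketch of such a consumer using only the Python standard library. It assumes the events arrive in the binary content mode, where CloudEvents attributes travel as ce-* HTTP headers and the payload is the request body. The names extract_ce_attributes, CloudEventHandler and serve are hypothetical and not part of Knative; port 8080 mirrors the containerPort used above.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


def extract_ce_attributes(headers) -> dict:
    """Collect CloudEvents attributes from binary-mode HTTP headers (ce-*)."""
    return {
        name.lower()[len("ce-"):]: value
        for name, value in headers.items()
        if name.lower().startswith("ce-")
    }


class CloudEventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the event payload from the request body.
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        attributes = extract_ce_attributes(self.headers)
        print(f"attributes={attributes} data={body.decode('utf-8', 'replace')}")
        # Assumption: a 2xx status signals that the event was accepted.
        self.send_response(202)
        self.end_headers()


def serve(port: int = 8080) -> None:
    """Start the receiver; blocks until the process is stopped."""
    HTTPServer(("", port), CloudEventHandler).serve_forever()
```

Calling serve() starts the receiver. Note that the code contains no Kafka client at all, only plain HTTP handling.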
Defining rules for message routing with Apache Kafka¶
It is quite common for a Topic in Apache Kafka to contain different types of events that may refer to the same Bounded Context (if you are applying Domain-Driven Design principles). This means that each consumer receives all the events, only to filter and process a subset of them.
This is one of the downsides of the Apache Kafka protocol: there is no direct filter API for routing Records. In order to process or filter events and route them to a different destination, such as another Kafka topic, you need to implement a fully fledged Kafka consumer client or use additional libraries like Kafka Streams.
As you can imagine, this is quite a common pattern, and Knative Eventing makes it part of the API. The Trigger API defines a powerful set of filters to route CloudEvents based on their metadata:
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: log-trigger
spec:
  broker: my-demo-kafka-broker
  filter:
    attributes:
      type: <cloud-event-type>
      <ce-extension>: <ce-extension-value>
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: log-receiver
We see a Trigger that defines a set of filter rules. If they match, the CloudEvent from the Kafka topic is routed, using HTTP, to our referenced web server application. There is also an experimental feature in Knative that enables SQL-like filtering using the filters field on the Trigger API, which implements the CloudEvents Subscriptions API.
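The idea behind the attributes filter can be sketched as follows. This is a simplified model, not Knative's implementation: it assumes that an event passes when every attribute listed in the filter exactly equals the attribute of the same name on the event. The function name matches and the sample source value are hypothetical.

```python
def matches(filter_attributes: dict, event_attributes: dict) -> bool:
    """True if every filter attribute equals the event attribute of the same name."""
    return all(
        event_attributes.get(name) == expected
        for name, expected in filter_attributes.items()
    )


# Sample event metadata; the source value is made up for illustration.
event = {"type": "dev.knative.blog.post", "source": "example/sender", "id": "1"}

print(matches({"type": "dev.knative.blog.post"}, event))  # True
print(matches({"type": "dev.knative.other"}, event))      # False
print(matches({}, event))                                 # True: an empty filter matches every event
```

The last call shows why a Trigger without a filter receives every event in the Broker.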
Note
It is highly recommended to apply filter attributes on Triggers for the CloudEvents metadata attributes and extensions. If no filter is provided, all CloudEvents are routed to the referenced subscriber, which is bad application design, except if you explicitly want to have a logger for all events in the broker.
For Triggers that are executed by a Knative Broker for Apache Kafka it is also possible to configure the order of delivered events, using the kafka.eventing.knative.dev/delivery.order annotation on the Trigger.
Kn Event Plugin¶
For sending an event we do not need the Apache Kafka Producer API either, since we are ingesting CloudEvents into the Broker using HTTP. One option would be to use a Pod inside the Kubernetes cluster with the curl program installed and send an event to the URL of the Broker. Instead, we use the kn client CLI with its event plugin for managing CloudEvents from the command line:
kn event send \
  --to Broker:eventing.knative.dev/v1:my-demo-kafka-broker \
  --type=dev.knative.blog.post \
  -f message="Hello"
With the above command we are sending a message as a CloudEvent with the dev.knative.blog.post type to our my-demo-kafka-broker object. The kn event plugin generates a valid CloudEvent from this invocation and sends it directly to the addressable URL of the referenced component, in this example our Broker.
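For readers who prefer to see what such an HTTP request could look like, the following sketch builds the headers and body of a CloudEvent in the binary content mode. It is not a description of what the kn event plugin does internally. The function name build_binary_cloudevent and the source value are hypothetical, and the JSON payload with a single message field is an assumption based on the command above.

```python
import json
import uuid


def build_binary_cloudevent(event_type: str, source: str, data: dict):
    """Return (headers, body) for a CloudEvent in HTTP binary content mode."""
    headers = {
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),  # unique per event
        "ce-type": event_type,
        "ce-source": source,
        "content-type": "application/json",
    }
    body = json.dumps(data).encode("utf-8")
    return headers, body


headers, body = build_binary_cloudevent(
    "dev.knative.blog.post",
    "example/manual-sender",  # hypothetical source value
    {"message": "Hello"},
)
print(headers["ce-type"], body.decode("utf-8"))
```

Posting these headers and this body to the Broker URL from inside the cluster, with any HTTP client, would ingest the event without touching the Kafka Producer API.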
Conclusion¶
The example showed a simple flow from sending events to receiving them. The messages are persisted in the Kafka Topic behind the Knative Broker. From there they can also be consumed with any standard Apache Kafka API. However, the abstraction that Knative offers simplifies the development of event-driven applications. Without much extra configuration it is also possible to filter and route events based on their metadata.
In addition, adopting Trigger filters is not just a way to avoid reimplementing the same pattern in all consumers. It also makes the whole message processing more efficient, because the consumer is only invoked when necessary, and it can even scale to zero if it is a Knative Service!