
Kafka

Our Kafka integration allows you to import brokers and topics from your Kafka clusters into Port, according to your mapping and definition.

Common use cases

  • Map brokers and topics in your Kafka clusters.
  • Watch for object changes (create/update/delete) on a schedule, and automatically apply the changes to your entities in Port.
  • Create/delete Kafka objects using self-service actions.

Prerequisites

To install the integration, you need a Kubernetes cluster that the integration's container chart will be deployed to.

Please make sure that you have kubectl and helm installed on your machine, and that your kubectl CLI is connected to the Kubernetes cluster where you plan to install the integration.

Installation

Choose one of the following installation methods:

Using this installation option means that the integration will be able to update Port in real time using webhooks.

This table summarizes the available parameters for the installation. Set them as you wish in the script below, then copy it and run it in your terminal:

Parameter | Description | Example | Required
port.clientId | Your Port client ID | | ✅
port.clientSecret | Your Port client secret | | ✅
integration.secrets.clusterConfMapping | The mapping of Kafka cluster names to Kafka client config | | ✅

Advanced configuration

Parameter | Description
integration.eventListener.type | The event listener type.
integration.type | The integration to be installed.
scheduledResyncInterval | The number of minutes between each resync. When not set, the integration will resync for each event listener resync event.
initializePortResources | Defaults to true. When set to true, the integration will create the default blueprints and the Port app config mapping.

To install the integration using Helm, run the following command:

helm repo add --force-update port-labs https://port-labs.github.io/helm-charts
helm upgrade --install kafka port-labs/port-ocean \
--set port.clientId="PORT_CLIENT_ID" \
--set port.clientSecret="PORT_CLIENT_SECRET" \
--set port.baseUrl="https://api.getport.io" \
--set initializePortResources=true \
--set scheduledResyncInterval=60 \
--set integration.identifier="my-kafka-integration" \
--set integration.type="kafka" \
--set integration.eventListener.type="POLLING" \
--set-json integration.secrets.clusterConfMapping='{"local": {"bootstrap.servers": "localhost:9092"}}'
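
The `clusterConfMapping` value has to reach Helm as one JSON string, which is easy to mis-quote by hand. The following is a minimal sketch, assuming Python is available on the machine that runs Helm, of generating that argument from a dictionary. The `build_set_json_arg` helper is hypothetical and not part of the integration; the mapping is the one used in the command above.

```python
import json
import shlex


def build_set_json_arg(cluster_conf: dict) -> str:
    """Return a shell-safe --set-json argument for clusterConfMapping.

    Hypothetical helper for illustration; not part of the integration.
    """
    # Serialize the mapping of cluster names to Kafka client config as compact JSON.
    payload = json.dumps(cluster_conf, separators=(",", ":"))
    # Quote the whole key=value pair so the shell passes it to Helm unchanged.
    return "--set-json " + shlex.quote(
        "integration.secrets.clusterConfMapping=" + payload
    )


if __name__ == "__main__":
    mapping = {"local": {"bootstrap.servers": "localhost:9092"}}
    print(build_set_json_arg(mapping))
```

The printed argument can be pasted in place of the last line of the Helm command.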
Advanced integration configuration

For advanced configuration, such as proxies or self-signed certificates, see the advanced integration configuration documentation.

Ingesting Kafka objects

The Kafka integration uses a YAML configuration to describe the process of loading data into the developer portal.

Here is an example snippet from the config which demonstrates the process for getting cluster data from Kafka:

createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id

The integration makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from Kafka metadata objects.
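
To make the mapping concrete, the sketch below reproduces in plain Python what the JQ expressions in the cluster snippet evaluate to for a single cluster object. It is an emulation for illustration only, not the integration's code: `map_cluster` is a hypothetical name, and the sample object is the one shown under "Cluster example" on this page.

```python
def map_cluster(cluster: dict) -> dict:
    """Emulate the cluster mapping; each value mirrors one JQ expression."""
    return {
        "identifier": cluster.get("name"),  # .name
        "title": cluster.get("name"),       # .name
        "blueprint": "kafkaCluster",        # '"kafkaCluster"' (a JQ string literal)
        "properties": {
            "controllerId": cluster.get("controller_id"),  # .controller_id
        },
    }


if __name__ == "__main__":
    sample = {"name": "local", "controller_id": "1"}
    print(map_cluster(sample))
```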

Configuration structure

The integration configuration determines which resources will be queried from Kafka, and which entities and properties will be created in Port.

Supported resources

The following resources can be used to map data from Kafka. Any field that appears in the examples below can be referenced in the mapping configuration.

Cluster example
{
  "name": "local",
  "controller_id": "1"
}
Broker example
{
  "id": "1",
  "address": "localhost:9092/1",
  "cluster_name": "local",
  "config": {"key": "value", ...}
}
Topic example
{
  "name": "_consumer_offsets",
  "cluster_name": "local",
  "partitions": [
    {
      "id": 0,
      "leader": 2,
      "replicas": [2, 1, 3],
      "isrs": [3, 2, 1]
    }
  ],
  "config": {"key": "value", ...}
}
  • The root key of the integration configuration is the resources key:

    resources:
      - kind: cluster
        selector:
        ...
  • The kind key is a specifier for a Kafka object:

    resources:
      - kind: cluster
        selector:
        ...
  • The selector and the query keys allow you to filter which objects of the specified kind will be ingested into your software catalog:

    resources:
      - kind: cluster
        selector:
          query: "true" # JQ boolean expression. If evaluated to false - this object will be skipped.
        port:
  • The port, entity and mappings keys are used to map the Kafka object fields to Port entities. To create multiple mappings of the same kind, add another item to the resources array:

    resources:
      - kind: cluster
        selector:
          query: "true"
        port:
          entity:
            mappings: # Mapping from one Kafka cluster to a Port entity. Each value is a JQ query.
              identifier: .name
              title: .name
              blueprint: '"kafkaCluster"'
              properties:
                controllerId: .controller_id
      - kind: cluster # In this instance cluster is mapped again with a different filter
        selector:
          query: '.name == "MyClusterName"'
        port:
          entity:
            mappings: ...
    Blueprint key

    Note the value of the blueprint key: to use a hardcoded string, wrap it in two sets of quotes, for example a pair of single quotes (') around a pair of double quotes ("), as in '"kafkaCluster"'.
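
The selector and quoting rules above can be illustrated with a small Python sketch. It is an emulation under stated assumptions, not the integration's code: the JQ queries are replaced by equivalent Python predicates, the second cluster object is invented for illustration, and `ingested_names` and `jq_string_literal` are hypothetical helpers. The quoting helper relies on the fact that a simple JQ string literal (no interpolation) uses the same syntax as a JSON string.

```python
import json

# Sample cluster objects; the second one is made up for illustration.
clusters = [
    {"name": "local", "controller_id": "1"},
    {"name": "MyClusterName", "controller_id": "2"},
]

# Two resources items of the same kind, each with its own filter.
resources = [
    {"kind": "cluster", "query": lambda obj: True},  # query: "true"
    {"kind": "cluster", "query": lambda obj: obj.get("name") == "MyClusterName"},
]


def ingested_names(resource: dict, objects: list) -> list:
    """Return the names of the objects that pass the resource's selector query."""
    return [obj["name"] for obj in objects if resource["query"](obj)]


def jq_string_literal(value_after_yaml: str) -> str:
    """Decode a simple JQ string literal.

    YAML removes the outer single quotes of '"kafkaCluster"', leaving the text
    "kafkaCluster" (with double quotes), which JQ reads as a constant string.
    """
    return json.loads(value_after_yaml)


if __name__ == "__main__":
    for resource in resources:
        print(ingested_names(resource, clusters))
    print(jq_string_literal('"kafkaCluster"'))
```

Without the inner double quotes, the value would be read as a JQ expression rather than as a constant string.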

Ingest data into Port

To ingest Kafka objects using the integration configuration, you can follow the steps below:

  1. Go to the DevPortal Builder page.
  2. Select a blueprint you want to ingest using Kafka.
  3. Choose the Ingest Data option from the menu.
  4. Select Kafka under the Event Processing providers category.
  5. Modify the configuration according to your needs.
  6. Click Resync.

Examples

Examples of blueprints and the relevant integration configurations:

Cluster

Cluster blueprint
{
  "identifier": "kafkaCluster",
  "title": "Cluster",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "controllerId": {
        "title": "Controller ID",
        "type": "string"
      }
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id

Broker

Broker blueprint
{
  "identifier": "kafkaBroker",
  "title": "Broker",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "address": {
        "title": "Address",
        "type": "string"
      },
      "region": {
        "title": "Region",
        "type": "string"
      },
      "version": {
        "title": "Version",
        "type": "string"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: broker
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + (.id | tostring)
          title: .cluster_name + " " + (.id | tostring)
          blueprint: '"kafkaBroker"'
          properties:
            address: .address
            region: .config."broker.rack"
            version: .config."inter.broker.protocol.version"
            config: .config
          relations:
            cluster: .cluster_name
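
As a rough illustration of what the broker mapping produces, the sketch below emulates its JQ expressions in plain Python. It is not the integration's code: `map_broker` is a hypothetical name, and the `config` values in the sample are invented, since the broker example on this page elides them.

```python
def map_broker(broker: dict) -> dict:
    """Emulate the broker mapping; each value mirrors one JQ expression."""
    config = broker.get("config") or {}
    broker_id = str(broker["id"])  # (.id | tostring)
    cluster_name = broker["cluster_name"]
    return {
        "identifier": cluster_name + "_" + broker_id,
        "title": cluster_name + " " + broker_id,
        "blueprint": "kafkaBroker",
        "properties": {
            "address": broker.get("address"),
            "region": config.get("broker.rack"),
            "version": config.get("inter.broker.protocol.version"),
            "config": config,
        },
        # The relation value must match the identifier of a kafkaCluster entity.
        "relations": {"cluster": cluster_name},
    }


if __name__ == "__main__":
    sample = {
        "id": "1",
        "address": "localhost:9092/1",
        "cluster_name": "local",
        # Assumed config values, for illustration only.
        "config": {"broker.rack": "rack-a", "inter.broker.protocol.version": "3.6"},
    }
    print(map_broker(sample))
```

The `cluster` relation resolves because the cluster mapping uses `.name` as its identifier, which is the same value that brokers carry in `cluster_name`.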

Topic

Topic blueprint
{
  "identifier": "kafkaTopic",
  "title": "Topic",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "replicas": {
        "title": "Replicas",
        "type": "number"
      },
      "partitions": {
        "title": "Partitions",
        "type": "number"
      },
      "compaction": {
        "title": "Compaction",
        "type": "boolean"
      },
      "retention": {
        "title": "Retention",
        "type": "boolean"
      },
      "deleteRetentionTime": {
        "title": "Delete Retention Time",
        "type": "number"
      },
      "partitionsMetadata": {
        "title": "Partitions Metadata",
        "type": "array"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    },
    "brokers": {
      "target": "kafkaBroker",
      "required": false,
      "many": true
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: topic
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .name
          title: .cluster_name + " " + .name
          blueprint: '"kafkaTopic"'
          properties:
            replicas: .partitions[0].replicas | length
            partitions: .partitions | length
            compaction: .config."cleanup.policy" | contains("compact")
            retention: .config."cleanup.policy" | contains("delete")
            deleteRetentionTime: .config."delete.retention.ms"
            partitionsMetadata: .partitions
            config: .config
          relations:
            cluster: .cluster_name
            brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
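
The topic mapping derives several values rather than copying fields, so a worked emulation may help. The sketch below mirrors its JQ expressions in plain Python; it is not the integration's code. `map_topic` is a hypothetical name, the `config` values in the sample are assumptions (the topic example on this page elides them), and a missing `cleanup.policy` is treated as an empty string as a simplification.

```python
def map_topic(topic: dict) -> dict:
    """Emulate the topic mapping; each value mirrors one JQ expression."""
    config = topic.get("config") or {}
    partitions = topic.get("partitions") or []
    cluster_name = topic["cluster_name"]
    policy = config.get("cleanup.policy", "")  # simplification for a missing key

    # [.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique
    # JQ's unique deduplicates and sorts, hence the set plus sorted().
    brokers = sorted(
        {cluster_name + "_" + str(r) for p in partitions for r in p["replicas"]}
    )

    return {
        "identifier": cluster_name + "_" + topic["name"],
        "title": cluster_name + " " + topic["name"],
        "blueprint": "kafkaTopic",
        "properties": {
            # Replica count is read from the first partition only.
            "replicas": len(partitions[0]["replicas"]) if partitions else 0,
            "partitions": len(partitions),
            "compaction": "compact" in policy,
            "retention": "delete" in policy,
            "deleteRetentionTime": config.get("delete.retention.ms"),
            "partitionsMetadata": partitions,
            "config": config,
        },
        "relations": {"cluster": cluster_name, "brokers": brokers},
    }


if __name__ == "__main__":
    sample = {
        "name": "_consumer_offsets",
        "cluster_name": "local",
        "partitions": [
            {"id": 0, "leader": 2, "replicas": [2, 1, 3], "isrs": [3, 2, 1]}
        ],
        # Assumed config values, for illustration only.
        "config": {"cleanup.policy": "compact,delete", "delete.retention.ms": "86400000"},
    }
    print(map_topic(sample)["relations"])
```

The generated broker identifiers follow the same `<cluster_name>_<id>` pattern as the broker mapping, which is what allows the `brokers` relation to resolve to existing kafkaBroker entities.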