Kafka

Our Kafka integration allows you to import brokers and topics from your Kafka clusters into Port, according to your mapping and definitions.
Common use cases
- Map brokers and topics in your Kafka clusters.
- Watch for object changes (create/update/delete) on a schedule, and automatically apply the changes to your entities in Port.
- Create/delete Kafka objects using self-service actions.
Prerequisites
To install the integration, you need a Kubernetes cluster that the integration's Helm chart will be deployed to.
Please make sure that you have kubectl and helm installed on your machine, and that your kubectl CLI is connected to the Kubernetes cluster where you plan to install the integration.
If you are having trouble installing this integration, please refer to these troubleshooting steps.
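As a quick sanity check before installing, you can verify the prerequisites with standard commands, for example:

```bash
# Verify the CLI tools are installed
kubectl version --client
helm version

# Verify kubectl is connected to the cluster you plan to install into
kubectl cluster-info
kubectl config current-context
```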
Installation
Choose one of the following installation methods:
- Real Time & Always On
- Scheduled
Using this installation option means that the integration will be able to update Port in real time using webhooks.
This table summarizes the available parameters for the installation. Set them as you wish in the script below, then copy it and run it in your terminal:
Parameter | Description | Example | Required |
---|---|---|---|
port.clientId | Your Port client ID | | ✅ |
port.clientSecret | Your Port client secret | | ✅ |
port.baseUrl | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | | ✅ |
integration.secrets.clusterConfMapping | The mapping of Kafka cluster names to Kafka client config | | ✅ |
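The integration.secrets.clusterConfMapping value is a JSON object mapping a name for each Kafka cluster to the client configuration used to connect to it. As a sketch, assuming your brokers accept standard librdkafka-style client configuration keys (the hostnames and credentials below are placeholders):

```bash
# Cluster config mapping covering a local cluster and a SASL-secured cluster (placeholder values).
CLUSTER_CONF_MAPPING='{
  "local": {"bootstrap.servers": "localhost:9092"},
  "prod": {
    "bootstrap.servers": "broker-1.example.com:9096,broker-2.example.com:9096",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "my-user",
    "sasl.password": "my-password"
  }
}'

# Pass it to the installation command shown below, e.g.:
#   --set-json integration.secrets.clusterConfMapping="$CLUSTER_CONF_MAPPING"
```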
Advanced configuration
Parameter | Description |
---|---|
integration.eventListener.type | The event listener type. Read more about event listeners |
integration.type | The integration to be installed |
scheduledResyncInterval | The number of minutes between each resync. When not set, the integration will resync on each event listener resync event. Read more about scheduledResyncInterval |
initializePortResources | Default: true. When set to true, the integration will create default blueprints and the port App config mapping. Read more about initializePortResources |
sendRawDataExamples | Enable sending raw data examples from the third party API to Port for testing and managing the integration mapping. Default: true |
- Helm
- ArgoCD
To install the integration using Helm, run the following command:
```bash
helm repo add --force-update port-labs https://port-labs.github.io/helm-charts
helm upgrade --install kafka port-labs/port-ocean \
  --set port.clientId="PORT_CLIENT_ID" \
  --set port.clientSecret="PORT_CLIENT_SECRET" \
  --set port.baseUrl="https://api.getport.io" \
  --set initializePortResources=true \
  --set sendRawDataExamples=true \
  --set scheduledResyncInterval=60 \
  --set integration.identifier="my-kafka-integration" \
  --set integration.type="kafka" \
  --set integration.eventListener.type="POLLING" \
  --set-json integration.secrets.clusterConfMapping='{"local": {"bootstrap.servers": "localhost:9092"}}'
```
The baseUrl, port_region, port.baseUrl, portBaseUrl, port_base_url and OCEAN__PORT__BASE_URL parameters are used to select which instance of the Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is https://api.getport.io.
- If you use the US region of Port (https://app.us.getport.io), your API URL is https://api.us.getport.io.
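Once the command completes, you can confirm the integration is running by checking the Helm release and the integration pod (using the release name kafka from the command above; the exact pod name will vary):

```bash
# Confirm the release was deployed
helm status kafka

# Find the integration pod and follow its logs
kubectl get pods | grep kafka
kubectl logs -f <kafka-integration-pod-name>
```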
To install the integration using ArgoCD, follow these steps:
- Create a values.yaml file in argocd/my-ocean-kafka-integration in your git repository with the following content.
Remember to replace the placeholder for KAFKA_CLUSTER_CONFIG_MAPPING.
```yaml
initializePortResources: true
scheduledResyncInterval: 120
integration:
  identifier: my-ocean-kafka-integration
  type: kafka
  eventListener:
    type: POLLING
  secrets:
    clusterConfMapping: KAFKA_CLUSTER_CONFIG_MAPPING
```
- Install the my-ocean-kafka-integration ArgoCD Application by creating the following my-ocean-kafka-integration.yaml manifest.
Remember to replace the placeholders for YOUR_PORT_CLIENT_ID, YOUR_PORT_CLIENT_SECRET and YOUR_GIT_REPO_URL.
The ArgoCD documentation for multiple sources can be found here.
ArgoCD Application
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: my-ocean-kafka-integration
  namespace: argocd
spec:
  destination:
    namespace: my-ocean-kafka-integration
    server: https://kubernetes.default.svc
  project: default
  sources:
    - repoURL: 'https://port-labs.github.io/helm-charts/'
      chart: port-ocean
      targetRevision: 0.1.14
      helm:
        valueFiles:
          - $values/argocd/my-ocean-kafka-integration/values.yaml
        parameters:
          - name: port.clientId
            value: YOUR_PORT_CLIENT_ID
          - name: port.clientSecret
            value: YOUR_PORT_CLIENT_SECRET
          - name: port.baseUrl
            value: https://api.getport.io
    - repoURL: YOUR_GIT_REPO_URL
      targetRevision: main
      ref: values
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
```
The baseUrl, port_region, port.baseUrl, portBaseUrl, port_base_url and OCEAN__PORT__BASE_URL parameters are used to select which instance of the Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is https://api.getport.io.
- If you use the US region of Port (https://app.us.getport.io), your API URL is https://api.us.getport.io.
- Apply your application manifest with kubectl:

```bash
kubectl apply -f my-ocean-kafka-integration.yaml
```
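After applying the manifest you can check that the Application synced and that the integration pod started, for example:

```bash
# Check the ArgoCD Application's sync and health status
kubectl get application my-ocean-kafka-integration -n argocd

# Check that the integration pod is running in its namespace
kubectl get pods -n my-ocean-kafka-integration
```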
- GitHub
- Jenkins
- Azure Devops
- GitLab
This workflow will run the Kafka integration once and then exit; this is useful for scheduled ingestion of data.
If you want the integration to update Port in real time using webhooks, you should use the Real Time & Always On installation option.
Make sure to configure the following GitHub secrets:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | | ✅ |
OCEAN__PORT__CLIENT_ID | Your Port client ID (How to get the credentials) | | ✅ |
OCEAN__PORT__CLIENT_SECRET | Your Port client secret (How to get the credentials) | | ✅ |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | | ✅ |
OCEAN__INITIALIZE_PORT_RESOURCES | Default: true. When set to true, the integration will create default blueprints and the port App config mapping. Read more about initializePortResources | | ❌ |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to Port for testing and managing the integration mapping. Default: true | | ❌ |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | | ❌ |
Here is an example kafka-integration.yml workflow file:
```yaml
name: Kafka Exporter Workflow

on:
  workflow_dispatch:
  schedule:
    - cron: '0 */1 * * *' # Determines the scheduled interval for this workflow. This example runs every hour.

jobs:
  run-integration:
    runs-on: ubuntu-latest
    timeout-minutes: 30 # Set a time limit for the job

    steps:
      - uses: port-labs/ocean-sail@v1
        with:
          type: 'kafka'
          port_client_id: ${{ secrets.OCEAN__PORT__CLIENT_ID }}
          port_client_secret: ${{ secrets.OCEAN__PORT__CLIENT_SECRET }}
          port_base_url: https://api.getport.io
          config: |
            cluster_conf_mapping: ${{ secrets.OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING }}
```
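One way to configure these secrets is with the GitHub CLI; a minimal sketch, run inside the repository with placeholder values:

```bash
# Store the Kafka cluster config mapping and Port credentials as repository secrets
gh secret set OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING \
  --body '{"local": {"bootstrap.servers": "localhost:9092"}}'
gh secret set OCEAN__PORT__CLIENT_ID --body "YOUR_PORT_CLIENT_ID"
gh secret set OCEAN__PORT__CLIENT_SECRET --body "YOUR_PORT_CLIENT_SECRET"
```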
This pipeline will run the Kafka integration once and then exit; this is useful for scheduled ingestion of data.
Your Jenkins agent should be able to run docker commands.
If you want the integration to update Port in real time using webhooks, you should use the Real Time & Always On installation option.
Make sure to configure the following Jenkins Credentials of Secret Text type:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | | ✅ |
OCEAN__PORT__CLIENT_ID | Your Port client ID (How to get the credentials) | | ✅ |
OCEAN__PORT__CLIENT_SECRET | Your Port client secret (How to get the credentials) | | ✅ |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | | ✅ |
OCEAN__INITIALIZE_PORT_RESOURCES | Default: true. When set to true, the integration will create default blueprints and the port App config mapping. Read more about initializePortResources | | ❌ |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to Port for testing and managing the integration mapping. Default: true | | ❌ |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | | ❌ |
Here is an example Jenkinsfile groovy pipeline file:
```groovy
pipeline {
    agent any

    stages {
        stage('Run Kafka Integration') {
            steps {
                script {
                    withCredentials([
                        string(credentialsId: 'OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING', variable: 'OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING'),
                        string(credentialsId: 'OCEAN__PORT__CLIENT_ID', variable: 'OCEAN__PORT__CLIENT_ID'),
                        string(credentialsId: 'OCEAN__PORT__CLIENT_SECRET', variable: 'OCEAN__PORT__CLIENT_SECRET'),
                    ]) {
                        sh('''
                            # Set Docker image and run the container
                            integration_type="kafka"
                            version="latest"
                            image_name="ghcr.io/port-labs/port-ocean-${integration_type}:${version}"

                            docker run -i --rm --platform=linux/amd64 \
                                -e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
                                -e OCEAN__INITIALIZE_PORT_RESOURCES=true \
                                -e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
                                -e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING \
                                -e OCEAN__PORT__CLIENT_ID=$OCEAN__PORT__CLIENT_ID \
                                -e OCEAN__PORT__CLIENT_SECRET=$OCEAN__PORT__CLIENT_SECRET \
                                -e OCEAN__PORT__BASE_URL='https://api.getport.io' \
                                $image_name

                            exit $?
                        ''')
                    }
                }
            }
        }
    }
}
```
This pipeline will run the Kafka integration once and then exit; this is useful for scheduled ingestion of data.
Your Azure DevOps agent should be able to run docker commands. Learn more about agents here.
If you want the integration to update Port in real time using webhooks, you should use the Real Time & Always On installation option.
Variable groups store values and secrets you'll use in your pipelines across your project. Learn more
Setting Up Your Credentials
- Create a Variable Group: Name it port-ocean-credentials. Store the required variables from the table.
- Authorize Your Pipeline:
- Go to "Library" -> "Variable groups."
- Find port-ocean-credentials and click on it.
- Select "Pipeline Permissions" and add your pipeline to the authorized list.
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | | ✅ |
OCEAN__PORT__CLIENT_ID | Your Port client ID (How to get the credentials) | | ✅ |
OCEAN__PORT__CLIENT_SECRET | Your Port client secret (How to get the credentials) | | ✅ |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | | ✅ |
OCEAN__INITIALIZE_PORT_RESOURCES | Default: true. When set to true, the integration will create default blueprints and the port App config mapping. Read more about initializePortResources | | ❌ |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to Port for testing and managing the integration mapping. Default: true | | ❌ |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | | ❌ |
Here is an example kafka-integration.yml pipeline file:
```yaml
trigger:
  - main

pool:
  vmImage: "ubuntu-latest"

variables:
  - group: port-ocean-credentials

steps:
  - script: |
      # Set Docker image and run the container
      integration_type="kafka"
      version="latest"
      image_name="ghcr.io/port-labs/port-ocean-$integration_type:$version"

      docker run -i --rm \
        -e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
        -e OCEAN__INITIALIZE_PORT_RESOURCES=true \
        -e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
        -e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$(OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING) \
        -e OCEAN__PORT__CLIENT_ID=$(OCEAN__PORT__CLIENT_ID) \
        -e OCEAN__PORT__CLIENT_SECRET=$(OCEAN__PORT__CLIENT_SECRET) \
        -e OCEAN__PORT__BASE_URL='https://api.getport.io' \
        $image_name

      exit $?
    displayName: 'Ingest Data into Port'
```
This pipeline will run the Kafka integration once and then exit; this is useful for scheduled ingestion of data.
If you want the integration to update Port in real time using webhooks, you should use the Real Time & Always On installation option.
Make sure to configure the following GitLab variables:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | | ✅ |
OCEAN__PORT__CLIENT_ID | Your Port client ID (How to get the credentials) | | ✅ |
OCEAN__PORT__CLIENT_SECRET | Your Port client secret (How to get the credentials) | | ✅ |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | | ✅ |
OCEAN__INITIALIZE_PORT_RESOURCES | Default: true. When set to true, the integration will create default blueprints and the port App config mapping. Read more about initializePortResources | | ❌ |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to Port for testing and managing the integration mapping. Default: true | | ❌ |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | | ❌ |
Here is an example .gitlab-ci.yml pipeline file:
```yaml
default:
  image: docker:24.0.5
  services:
    - docker:24.0.5-dind
  before_script:
    - docker info

variables:
  INTEGRATION_TYPE: kafka
  VERSION: latest

stages:
  - ingest

ingest_data:
  stage: ingest
  variables:
    IMAGE_NAME: ghcr.io/port-labs/port-ocean-$INTEGRATION_TYPE:$VERSION
  script:
    - |
      docker run -i --rm --platform=linux/amd64 \
        -e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
        -e OCEAN__INITIALIZE_PORT_RESOURCES=true \
        -e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
        -e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING \
        -e OCEAN__PORT__CLIENT_ID=$OCEAN__PORT__CLIENT_ID \
        -e OCEAN__PORT__CLIENT_SECRET=$OCEAN__PORT__CLIENT_SECRET \
        -e OCEAN__PORT__BASE_URL='https://api.getport.io' \
        $IMAGE_NAME
  rules: # Run only when changes are made to the main branch
    - if: '$CI_COMMIT_BRANCH == "main"'
```
The baseUrl, port_region, port.baseUrl, portBaseUrl, port_base_url and OCEAN__PORT__BASE_URL parameters are used to select which instance of the Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is https://api.getport.io.
- If you use the US region of Port (https://app.us.getport.io), your API URL is https://api.us.getport.io.
For advanced configuration such as proxies or self-signed certificates, click here.
Ingesting Kafka objects
The Kafka integration uses a YAML configuration to describe the process of loading data into the developer portal.
Here is an example snippet from the config which demonstrates the process for getting cluster data from Kafka:
```yaml
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id
```
The integration makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from Kafka metadata objects.
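For example, you can try these expressions locally with the jq CLI against the sample cluster payload shown below:

```bash
# Evaluate the mapping expressions against a sample cluster object
echo '{"name": "local", "controller_id": "1"}' | jq -r '.name'           # -> local
echo '{"name": "local", "controller_id": "1"}' | jq -r '.controller_id'  # -> 1
```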
Configuration structure
The integration configuration determines which resources will be queried from Kafka, and which entities and properties will be created in Port.
The following resources can be used to map data from Kafka. It is possible to reference any field that appears in the examples below in the mapping configuration.
Cluster example
```json
{
  "name": "local",
  "controller_id": "1"
}
```
Broker example
```json
{
  "id": "1",
  "address": "localhost:9092/1",
  "cluster_name": "local",
  "config": {"key": "value", ...}
}
```
Topic example
```json
{
  "name": "_consumer_offsets",
  "cluster_name": "local",
  "partitions": [
    {
      "id": 0,
      "leader": 2,
      "replicas": [2, 1, 3],
      "isrs": [3, 2, 1]
    }
  ],
  "config": {"key": "value", ...}
}
```
- The root key of the integration configuration is the resources key:

  ```yaml
  resources:
    - kind: cluster
      selector:
      ...
  ```

- The kind key is a specifier for a Kafka object:

  ```yaml
  resources:
    - kind: cluster
      selector:
      ...
  ```

- The selector and the query keys allow you to filter which objects of the specified kind will be ingested into your software catalog:

  ```yaml
  resources:
    - kind: cluster
      selector:
        query: "true" # JQ boolean expression. If evaluated to false - this object will be skipped.
      port:
  ```

- The port, entity and the mappings keys are used to map the Kafka object fields to Port entities. To create multiple mappings of the same kind, you can add another item to the resources array:

  ```yaml
  resources:
    - kind: cluster
      selector:
        query: "true"
      port:
        entity:
          mappings: # Mappings between one Kafka cluster to a Port entity. Each value is a JQ query.
            identifier: .name
            title: .name
            blueprint: '"kafkaCluster"'
            properties:
              controllerId: .controller_id
    - kind: cluster # In this instance cluster is mapped again with a different filter
      selector:
        query: '.name == "MyClusterName"'
      port:
        entity:
          mappings: ...
  ```

Blueprint key
Note the value of the blueprint key - if you want to use a hardcoded string, you need to encapsulate it in 2 sets of quotes, for example a pair of single-quotes (') and then another pair of double-quotes ("), as shown in the sketch below.
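A minimal sketch of why the double quoting matters, using the jq CLI (the input object is illustrative):

```bash
# With the inner double quotes, jq treats the value as a string literal:
echo '{"name": "local"}' | jq '"kafkaCluster"'   # -> "kafkaCluster"

# Without them, jq tries to interpret the value as an expression;
# a bare word like kafkaCluster is not a defined jq function and fails to compile:
echo '{"name": "local"}' | jq 'kafkaCluster'     # -> compile error
```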
Ingest data into Port
To ingest Kafka objects using the integration configuration, you can follow the steps below:
- Go to the DevPortal Builder page.
- Select a blueprint you want to ingest using Kafka.
- Choose the Ingest Data option from the menu.
- Select Kafka under the Event Processing providers category.
- Modify the configuration according to your needs.
- Click Resync.
Examples
Examples of blueprints and the relevant integration configurations:
Cluster
Cluster blueprint
```json
{
  "identifier": "kafkaCluster",
  "title": "Cluster",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "controllerId": {
        "title": "Controller ID",
        "type": "string"
      }
    }
  }
}
```
Integration configuration
```yaml
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id
```
Broker
Broker blueprint
```json
{
  "identifier": "kafkaBroker",
  "title": "Broker",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "address": {
        "title": "Address",
        "type": "string"
      },
      "region": {
        "title": "Region",
        "type": "string"
      },
      "version": {
        "title": "Version",
        "type": "string"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    }
  }
}
```
Integration configuration
```yaml
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: broker
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + (.id | tostring)
          title: .cluster_name + " " + (.id | tostring)
          blueprint: '"kafkaBroker"'
          properties:
            address: .address
            region: .config."broker.rack"
            version: .config."inter.broker.protocol.version"
            config: .config
          relations:
            cluster: .cluster_name
```
Topic
Topic blueprint
```json
{
  "identifier": "kafkaTopic",
  "title": "Topic",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "replicas": {
        "title": "Replicas",
        "type": "number"
      },
      "partitions": {
        "title": "Partitions",
        "type": "number"
      },
      "compaction": {
        "title": "Compaction",
        "type": "boolean"
      },
      "retention": {
        "title": "Retention",
        "type": "boolean"
      },
      "deleteRetentionTime": {
        "title": "Delete Retention Time",
        "type": "number"
      },
      "partitionsMetadata": {
        "title": "Partitions Metadata",
        "type": "array"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    },
    "brokers": {
      "target": "kafkaBroker",
      "required": false,
      "many": true
    }
  }
}
```
Integration configuration
```yaml
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: topic
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .name
          title: .cluster_name + " " + .name
          blueprint: '"kafkaTopic"'
          properties:
            replicas: .partitions[0].replicas | length
            partitions: .partitions | length
            compaction: .config."cleanup.policy" | contains("compact")
            retention: .config."cleanup.policy" | contains("delete")
            deleteRetentionTime: .config."delete.retention.ms"
            partitionsMetadata: .partitions
            config: .config
          relations:
            cluster: .cluster_name
            brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
```
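As a quick sanity check, the brokers relation expression can be evaluated with the jq CLI against the sample topic payload from earlier (trimmed to the fields the expression uses):

```bash
echo '{"cluster_name": "local", "partitions": [{"replicas": [2, 1, 3]}]}' | \
  jq -c '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
# -> ["local_1","local_2","local_3"]
```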