
Humanitec Integration

Overview

In this example, you will create a GitHub workflow integration that ingests Humanitec applications, environments, workloads, resources, and resource graphs into your Port catalog on a schedule.

Prerequisites
  1. In your GitHub repository, go to Settings > Secrets and add the following secrets:
    - PORT_CLIENT_ID - Your Port client ID.
    - PORT_CLIENT_SECRET - Your Port client secret.
    - HUMANITEC_API_KEY - Your Humanitec API key.
    - HUMANITEC_ORG_ID - Your Humanitec organization ID.

Port blueprints

Create the following blueprint definitions in Port:

Humanitec Application Blueprint
{
  "identifier": "humanitecApplication",
  "description": "Humanitec Application",
  "title": "humanitecApplication",
  "icon": "Apps",
  "schema": {
    "properties": {
      "createdAt": {
        "type": "string",
        "title": "Created At",
        "format": "date-time"
      }
    },
    "required": []
  },
  "mirrorProperties": {},
  "calculationProperties": {},
  "aggregationProperties": {},
  "relations": {}
}
Humanitec Environment Blueprint

{
  "identifier": "humanitecEnvironment",
  "title": "Humanitec Environment",
  "icon": "Environment",
  "schema": {
    "properties": {
      "type": {
        "title": "Type",
        "icon": "DefaultProperty",
        "type": "string"
      },
      "createdAt": {
        "type": "string",
        "format": "date-time",
        "title": "Creation Date",
        "description": "The date and time when the environment was created."
      },
      "lastDeploymentStatus": {
        "type": "string",
        "title": "Last Deployment Status",
        "description": "The status of the last deployment."
      },
      "lastDeploymentDate": {
        "type": "string",
        "format": "date-time",
        "title": "Last Deployment Date",
        "description": "The date and time the environment was last deployed."
      },
      "lastDeploymentComment": {
        "type": "string",
        "title": "Last Deployment Comment",
        "description": "The comment on the last deployment."
      }
    },
    "required": []
  },
  "mirrorProperties": {},
  "calculationProperties": {},
  "aggregationProperties": {},
  "relations": {
    "humanitecApplication": {
      "title": "Application",
      "target": "humanitecApplication",
      "required": false,
      "many": false
    }
  }
}
Humanitec Workload Blueprint

{
  "identifier": "humanitecWorkload",
  "title": "Workload",
  "icon": "Cluster",
  "schema": {
    "properties": {
      "class": {
        "title": "Class",
        "description": "The class of the workload",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "driverType": {
        "title": "Driver Type",
        "description": "The driver type of the workload",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "definitionId": {
        "title": "Definition ID",
        "description": "The definition ID of the workload",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "definitionVersionId": {
        "title": "Definition Version ID",
        "description": "The definition version ID of the workload",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "status": {
        "title": "Status",
        "description": "The status of the workload",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "updatedAt": {
        "title": "Update Date",
        "description": "The date and time when the workload was last updated",
        "type": "string",
        "format": "date-time",
        "icon": "DefaultProperty"
      }
    },
    "required": []
  },
  "mirrorProperties": {},
  "calculationProperties": {},
  "aggregationProperties": {},
  "relations": {
    "humanitecEnvironment": {
      "title": "Environment",
      "target": "humanitecEnvironment",
      "required": false,
      "many": false
    }
  }
}
Humanitec Resource Blueprint

{
  "identifier": "humanitecResource",
  "title": "Humanitec Resource",
  "icon": "Microservice",
  "schema": {
    "properties": {
      "type": {
        "title": "Type",
        "description": "The type of the resource",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "class": {
        "title": "Class",
        "description": "The class of the resource",
        "type": "string",
        "icon": "DefaultProperty"
      },
      "resource": {
        "title": "Resource",
        "description": "The resource",
        "type": "object",
        "icon": "DefaultProperty"
      },
      "resourceSchema": {
        "title": "Resource Schema",
        "description": "The schema of the resource",
        "type": "object",
        "icon": "DefaultProperty"
      },
      "guresid": {
        "title": "GU Resource ID",
        "description": "The GU resource ID",
        "type": "string",
        "icon": "DefaultProperty"
      }
    },
    "required": []
  },
  "mirrorProperties": {},
  "calculationProperties": {},
  "aggregationProperties": {},
  "relations": {
    "humanitecResourceGraph": {
      "title": "Depends On",
      "description": "Resource Graph",
      "target": "humanitecResourceGraph",
      "required": false,
      "many": true
    },
    "humanitecWorkload": {
      "title": "Humanitec Workload",
      "target": "humanitecWorkload",
      "required": false,
      "many": false
    }
  }
}
Humanitec Resource Graph Blueprint

{
  "identifier": "humanitecResourceGraph",
  "description": "Humanitec Resource Graph",
  "title": "Resource Graph",
  "icon": "Microservice",
  "schema": {
    "properties": {},
    "required": []
  },
  "mirrorProperties": {},
  "calculationProperties": {},
  "aggregationProperties": {},
  "relations": {
    "humanitecResourceGraph": {
      "title": "Resource Graph",
      "target": "humanitecResourceGraph",
      "required": false,
      "many": true
    }
  }
}
Blueprint Properties

You may select the blueprints depending on what you want to track in your Humanitec account.
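If you keep these definitions as JSON files, a quick structural check before uploading them can catch missing keys early. Below is a minimal sketch; the required keys are assumptions inferred from the examples above, not Port's authoritative schema:

```python
import json

# Keys every blueprint example in this guide includes; treat this set as an
# assumption based on the samples above, not Port's official schema.
REQUIRED_TOP_LEVEL = {"identifier", "title", "icon", "schema"}


def validate_blueprint(blueprint: dict) -> list:
    """Return a list of problems found in a blueprint definition."""
    problems = [
        f"missing top-level key: {key}"
        for key in REQUIRED_TOP_LEVEL - blueprint.keys()
    ]
    for name, relation in blueprint.get("relations", {}).items():
        for key in ("target", "required", "many"):
            if key not in relation:
                problems.append(f"relation {name} missing key: {key}")
    return problems


blueprint = json.loads("""
{
  "identifier": "humanitecApplication",
  "title": "humanitecApplication",
  "icon": "Apps",
  "schema": {"properties": {}, "required": []},
  "relations": {}
}
""")
print(validate_blueprint(blueprint))  # → []
```

Running this against each file before the first sync makes schema mistakes fail fast locally instead of surfacing as API errors during the workflow run.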

GitHub Workflow

tip

Fork our humanitec integration repository to get started.

  1. Create the following Python files in a folder named integration at the root of your GitHub repository:
    1. main.py - Orchestrates the synchronization of data from Humanitec to Port, ensuring that resource entities are accurately mirrored and updated in your Port catalog.
    2. requirements.txt - Lists the external packages needed to run the integration.
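Putting the steps above together (including the clients folder added later in this guide), the expected repository layout can be sketched with a few commands; file contents are filled in by the following sections:

```shell
# Sketch of the expected layout; run from the repository root.
mkdir -p integration/clients
touch integration/main.py integration/requirements.txt
# Client modules are created later in this guide:
touch integration/clients/__init__.py   # optional on Python 3, makes "clients" an explicit package
touch integration/clients/port_client.py
touch integration/clients/humanitec_client.py
touch integration/clients/cache.py
find integration -type f | sort
```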
Main Executable Script
main.py

import argparse
import asyncio
import datetime
import re
import sys

import httpx
from decouple import config
from loguru import logger

from clients.humanitec_client import HumanitecClient
from clients.port_client import PortClient


class BLUEPRINT:
    APPLICATION = "humanitecApplication"
    ENVIRONMENT = "humanitecEnvironment"
    WORKLOAD = "humanitecWorkload"
    RESOURCE_GRAPH = "humanitecResourceGraph"
    RESOURCE = "humanitecResource"


class HumanitecExporter:
    def __init__(self, port_client, humanitec_client) -> None:
        self.port_client = port_client
        self.humanitec_client = humanitec_client

    @staticmethod
    def convert_to_datetime(timestamp: int) -> str:
        converted_datetime = datetime.datetime.fromtimestamp(
            timestamp / 1000.0, datetime.timezone.utc
        )
        return converted_datetime.strftime("%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def remove_symbols_and_title_case(input_string: str) -> str:
        cleaned_string = re.sub(r"[^A-Za-z0-9\s]", " ", input_string)
        return cleaned_string.title()

    async def sync_applications(self) -> None:
        logger.info(f"Syncing entities for blueprint {BLUEPRINT.APPLICATION}")
        applications = await self.humanitec_client.get_all_applications()

        def create_entity(application):
            return {
                "identifier": application["id"],
                "title": self.remove_symbols_and_title_case(application["name"]),
                "properties": {"createdAt": application["created_at"]},
                "relations": {},
            }

        tasks = [
            self.port_client.upsert_entity(
                blueprint_id=BLUEPRINT.APPLICATION,
                entity_object=create_entity(application),
            )
            for application in applications
        ]
        await asyncio.gather(*tasks)
        logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.APPLICATION}")

    async def sync_environments(self) -> None:
        logger.info(f"Syncing entities for blueprint {BLUEPRINT.ENVIRONMENT}")
        applications = await self.humanitec_client.get_all_applications()

        def create_entity(application, environment):
            return {
                "identifier": environment["id"],
                "title": environment["name"],
                "properties": {
                    "type": environment["type"],
                    "createdAt": environment["created_at"],
                    "lastDeploymentStatus": environment.get("last_deploy", {}).get(
                        "status"
                    ),
                    "lastDeploymentDate": environment.get("last_deploy", {}).get(
                        "created_at"
                    ),
                    "lastDeploymentComment": environment.get("last_deploy", {}).get(
                        "comment"
                    ),
                },
                "relations": {BLUEPRINT.APPLICATION: application["id"]},
            }

        tasks = [
            self.port_client.upsert_entity(
                blueprint_id=BLUEPRINT.ENVIRONMENT,
                entity_object=create_entity(application, environment),
            )
            for application in applications
            for environments in [
                await self.humanitec_client.get_all_environments(application)
            ]
            for environment in environments
        ]
        await asyncio.gather(*tasks)
        logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.ENVIRONMENT}")

    async def sync_workloads(self) -> None:
        logger.info(f"Syncing entities for blueprint {BLUEPRINT.WORKLOAD}")

        def create_workload_entity(resource):
            return {
                "identifier": resource["res_id"].replace("modules.", ""),
                "title": self.remove_symbols_and_title_case(
                    resource["res_id"].replace("modules.", "")
                ),
                "properties": {
                    "status": resource["status"],
                    "class": resource["class"],
                    "driverType": resource["driver_type"],
                    "definitionVersionId": resource["def_version_id"],
                    "definitionId": resource["def_id"],
                    "updatedAt": resource["updated_at"],
                    "graphResourceID": resource["gu_res_id"],
                },
                "relations": {
                    BLUEPRINT.ENVIRONMENT: resource["env_id"],
                },
            }

        applications = await self.humanitec_client.get_all_applications()
        for application in applications:
            environments = await self.humanitec_client.get_all_environments(application)
            for environment in environments:
                resources = await self.humanitec_client.get_all_resources(
                    application, environment
                )
                resource_group = self.humanitec_client.group_resources_by_type(
                    resources
                )
                tasks = [
                    self.port_client.upsert_entity(
                        blueprint_id=BLUEPRINT.WORKLOAD,
                        entity_object=create_workload_entity(resource),
                    )
                    for resource in resource_group.get("modules", [])
                    if resource and resource["type"] == "workload"
                ]
                await asyncio.gather(*tasks)
        logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.WORKLOAD}")

    async def sync_resource_graphs(self) -> None:
        logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}")

        def create_resource_graph_entity(graph_data, include_relations):
            entity = {
                "identifier": graph_data["guresid"],
                "title": self.remove_symbols_and_title_case(graph_data["def_id"]),
                "properties": {
                    "type": graph_data["type"],
                    "class": graph_data["class"],
                    "resourceSchema": graph_data["resource_schema"],
                    "resource": graph_data["resource"],
                },
                "relations": {},
            }
            if include_relations:
                entity["relations"] = {
                    BLUEPRINT.RESOURCE_GRAPH: graph_data["depends_on"]
                }
            return entity

        applications = await self.humanitec_client.get_all_applications()
        for application in applications:
            environments = await self.humanitec_client.get_all_environments(application)
            for environment in environments:
                resources = await self.humanitec_client.get_all_resources(
                    application, environment
                )
                resources = self.humanitec_client.group_resources_by_type(resources)
                modules = resources.get("modules", [])
                if not modules:
                    continue

                resource_graph = await self.humanitec_client.get_all_resource_graphs(
                    modules, application, environment
                )

                # First pass: create entities without relations
                tasks = [
                    self.port_client.upsert_entity(
                        blueprint_id=BLUEPRINT.RESOURCE_GRAPH,
                        entity_object=create_resource_graph_entity(
                            graph_data, include_relations=False
                        ),
                    )
                    for graph_data in resource_graph
                ]
                await asyncio.gather(*tasks)

                # Second pass: update entities with relations
                tasks = [
                    self.port_client.upsert_entity(
                        blueprint_id=BLUEPRINT.RESOURCE_GRAPH,
                        entity_object=create_resource_graph_entity(
                            graph_data, include_relations=True
                        ),
                    )
                    for graph_data in resource_graph
                ]
                await asyncio.gather(*tasks)
        logger.info(
            f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE_GRAPH}"
        )

    async def enrich_resource_with_graph(self, resource, application, environment):
        data = {
            "id": resource["gu_res_id"],
            "type": resource["type"],
            "resource": resource["resource"],
        }
        response = await self.humanitec_client.get_resource_graph(
            application, environment, [data]
        )
        resource.update(
            {"__resourceGraph": i for i in response if i["type"] == data["type"]}
        )
        return resource

    async def sync_resources(self) -> None:
        logger.info(f"Syncing entities for blueprint {BLUEPRINT.RESOURCE}")

        def create_resource_entity(resource):
            workload_id = (
                resource["res_id"].split(".")[1]
                if resource["res_id"].split(".")[0].startswith("modules")
                else ""
            )
            return {
                "identifier": resource["__resourceGraph"]["guresid"],
                "title": self.remove_symbols_and_title_case(resource["def_id"]),
                "properties": {
                    "type": resource["type"],
                    "class": resource["class"],
                    "resource": resource["resource"],
                    "status": resource["status"],
                    "updatedAt": resource["updated_at"],
                    "driverType": resource["driver_type"],
                },
                "relations": {
                    BLUEPRINT.RESOURCE_GRAPH: resource["__resourceGraph"]["depends_on"],
                    BLUEPRINT.WORKLOAD: workload_id,
                },
            }

        async def fetch_resources(application, environment):
            resources = await self.humanitec_client.get_all_resources(
                application, environment
            )
            resources = self.humanitec_client.group_resources_by_type(resources)
            modules = resources.get("modules", [])
            if not modules:
                return []

            tasks = [
                self.enrich_resource_with_graph(resource, application, environment)
                for resource in modules
            ]
            return await asyncio.gather(*tasks)

        applications = await self.humanitec_client.get_all_applications()
        for application in applications:
            environments = await self.humanitec_client.get_all_environments(application)

            resource_tasks = [
                fetch_resources(application, environment)
                for environment in environments
            ]
            all_resources = await asyncio.gather(*resource_tasks)
            all_resources = [
                resource for sublist in all_resources for resource in sublist
            ]  # Flatten the per-environment lists

            entity_tasks = [
                self.port_client.upsert_entity(
                    blueprint_id=BLUEPRINT.RESOURCE,
                    entity_object=create_resource_entity(resource),
                )
                for resource in all_resources
            ]
            await asyncio.gather(*entity_tasks)
        logger.info(f"Finished syncing entities for blueprint {BLUEPRINT.RESOURCE}")

    async def sync_all(self) -> None:
        await self.sync_applications()
        await self.sync_environments()
        await self.sync_workloads()
        await self.sync_resource_graphs()
        await self.sync_resources()
        logger.info("Event Finished")

    async def __call__(self, args) -> None:
        await self.sync_all()


if __name__ == "__main__":

    def validate_args(args):
        required_keys = ["org_id", "api_key", "port_client_id", "port_client_secret"]
        missing_keys = [key for key in required_keys if not getattr(args, key)]
        if missing_keys:
            logger.error(f"The following keys are required: {', '.join(missing_keys)}")
            return False
        return True

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--org-id",
        type=str,
        default=config("ORG_ID", ""),
        help="Humanitec organization ID",
    )
    parser.add_argument(
        "--api-key",
        type=str,
        default=config("API_KEY", ""),
        help="Humanitec API key",
    )
    parser.add_argument(
        "--api-url",
        type=str,
        default=config("API_URL", "https://api.humanitec.io"),
        help="Humanitec API URL",
    )
    parser.add_argument(
        "--port-client-id",
        type=str,
        default=config("PORT_CLIENT_ID", ""),
        help="Port client ID",
    )
    parser.add_argument(
        "--port-client-secret",
        type=str,
        default=config("PORT_CLIENT_SECRET", ""),
        help="Port client secret",
    )
    args = parser.parse_args()
    if not validate_args(args):
        sys.exit(1)

    httpx_async_client = httpx.AsyncClient()
    port_client = PortClient(
        args.port_client_id,
        args.port_client_secret,
        httpx_async_client=httpx_async_client,
    )
    humanitec_client = HumanitecClient(
        args.org_id,
        args.api_key,
        api_url=args.api_url,
        httpx_async_client=httpx_async_client,
    )
    exporter = HumanitecExporter(port_client, humanitec_client)
    asyncio.run(exporter(args))
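The two static helpers in the exporter are pure functions, so their behavior is easy to check in isolation. A standalone sketch (copies of the helpers, renamed as plain functions):

```python
import datetime
import re


def remove_symbols_and_title_case(input_string: str) -> str:
    # Replace every non-alphanumeric character with a space, then title-case
    cleaned = re.sub(r"[^A-Za-z0-9\s]", " ", input_string)
    return cleaned.title()


def convert_to_datetime(timestamp_ms: int) -> str:
    # Timestamps are treated as milliseconds since the Unix epoch, in UTC
    dt = datetime.datetime.fromtimestamp(timestamp_ms / 1000.0, datetime.timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


print(remove_symbols_and_title_case("my-sample_app"))  # → My Sample App
print(convert_to_datetime(1700000000000))  # → 2023-11-14T22:13:20Z
```

This is why entity titles like "modules.my-workload" come out as readable strings such as "Modules My Workload" in the catalog.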

Requirements
requirements.txt

python-decouple==3.8
loguru==0.7.2
httpx==0.27.0
  1. Create the following Python files in a folder named clients (matching the imports in main.py) at the base of the integration folder:
    1. port_client.py – Manages authentication and API requests to Port, facilitating the creation and updating of entities within Port's system.
    2. humanitec_client.py – Handles API interactions with Humanitec, including retrieving data with caching mechanisms to optimize performance.
    3. cache.py - Provides an in-memory caching mechanism with thread-safe operations for setting, retrieving, and deleting cache entries asynchronously.
Port Client
port_client.py

import httpx
from typing import Any, Dict
from loguru import logger

from .cache import InMemoryCache


class PortClient:
    def __init__(self, client_id, client_secret, **kwargs) -> None:
        self.httpx_async_client = kwargs.get("httpx_async_client", httpx.AsyncClient())
        self.client_id = client_id
        self.client_secret = client_secret
        self.cache = InMemoryCache()
        self.base_url = kwargs.get("base_url", "https://api.getport.io/v1")
        self.port_headers = None

    async def get_port_access_token(self) -> str:
        credentials = {"clientId": self.client_id, "clientSecret": self.client_secret}
        endpoint = "/auth/access_token"
        response = await self.send_api_request("POST", endpoint, json=credentials)
        return response["accessToken"]

    async def get_port_headers(self) -> Dict[str, str]:
        access_token = await self.get_port_access_token()
        return {"Authorization": f"Bearer {access_token}"}

    async def send_api_request(
        self,
        method: str,
        endpoint: str,
        headers: Dict[str, str] | None = None,
        json: Dict[str, Any] | None = None,
    ) -> Dict[str, Any]:
        url = f"{self.base_url}{endpoint}"
        try:
            response = await self.httpx_async_client.request(
                method, url, headers=headers, json=json
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred: {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}")
            raise

    async def upsert_entity(
        self, blueprint_id: str, entity_object: Dict[str, Any]
    ) -> Dict[str, Any]:
        endpoint = f"/blueprints/{blueprint_id}/entities?upsert=true&merge=true"
        port_headers = (
            self.port_headers if self.port_headers else await self.get_port_headers()
        )
        response = await self.send_api_request(
            "POST", endpoint, headers=port_headers, json=entity_object
        )
        logger.info(response)
        return response

Humanitec Client
humanitec_client.py

import httpx
from typing import Any, Dict, List
from loguru import logger

from .cache import InMemoryCache


class CACHE_KEYS:
    APPLICATION = "APPLICATION_CACHE_KEY"
    ENVIRONMENT = "ENVIRONMENT_CACHE_KEY"
    WORKLOAD = "WORKLOAD_CACHE_KEY"
    RESOURCE = "RESOURCE_CACHE_KEY"


class HumanitecClient:
    def __init__(self, org_id, api_token, **kwargs) -> None:
        self.client = kwargs.get("httpx_async_client", httpx.AsyncClient())
        # Accept api_url (the keyword main.py passes) and fall back to base_url
        api_url = kwargs.get(
            "api_url", kwargs.get("base_url", "https://api.humanitec.io")
        )
        self.base_url = f"{api_url}/orgs/{org_id}/"
        self.api_token = api_token
        self.cache = InMemoryCache()

    def get_humanitec_headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
        }

    async def send_api_request(
        self,
        method: str,
        endpoint: str,
        headers: Dict[str, str] | None = None,
        json: Any = None,
    ) -> Any:
        url = self.base_url + endpoint
        try:
            logger.debug(f"Requesting Humanitec data for endpoint: {endpoint}")
            response = await self.client.request(
                method, url, headers=headers, json=json
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred: {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"An error occurred: {str(e)}")
            raise

    async def get_all_applications(self) -> List[Dict[str, Any]]:
        if cached_applications := await self.cache.get(CACHE_KEYS.APPLICATION):
            logger.info(f"Retrieved {len(cached_applications)} applications from cache")
            return list(cached_applications.values())

        endpoint = "apps"
        humanitec_headers = self.get_humanitec_headers()
        applications: List[Dict[str, Any]] = await self.send_api_request(
            "GET", endpoint, headers=humanitec_headers
        )
        await self.cache.set(
            CACHE_KEYS.APPLICATION, {app["id"]: app for app in applications}
        )
        logger.info(f"Received {len(applications)} applications from Humanitec")
        return applications

    async def get_all_environments(self, app) -> List[Dict[str, Any]]:
        # Only treat the cache as a hit if this specific application is present
        cached = await self.cache.get(CACHE_KEYS.ENVIRONMENT)
        if app["id"] in cached:
            cached_environments = cached[app["id"]]
            logger.info(
                f"Retrieved {len(cached_environments)} environments for {app['id']} from cache"
            )
            return list(cached_environments.values())

        endpoint = f"apps/{app['id']}/envs"
        humanitec_headers = self.get_humanitec_headers()
        environments: List[Dict[str, Any]] = await self.send_api_request(
            "GET", endpoint, headers=humanitec_headers
        )
        await self.cache.set(
            CACHE_KEYS.ENVIRONMENT,
            {
                app["id"]: {
                    environment["id"]: environment for environment in environments
                }
            },
        )
        logger.info(f"Received {len(environments)} environments from Humanitec")
        return environments

    async def get_all_resources(self, app, env) -> List[Dict[str, Any]]:
        cached = await self.cache.get(CACHE_KEYS.RESOURCE)
        if env["id"] in cached.get(app["id"], {}):
            cached_resources = cached[app["id"]][env["id"]]
            logger.info(
                f"Retrieved {len(cached_resources)} resources from cache for app {app['id']} and env {env['id']}"
            )
            return list(cached_resources.values())

        endpoint = f"apps/{app['id']}/envs/{env['id']}/resources"
        humanitec_headers = self.get_humanitec_headers()
        resources: List[Dict[str, Any]] = await self.send_api_request(
            "GET", endpoint, headers=humanitec_headers
        )
        # Merge with any environments already cached for this app so one
        # environment's resources do not overwrite another's
        app_cache = cached.get(app["id"], {})
        app_cache[env["id"]] = {
            resource["gu_res_id"]: resource for resource in resources
        }
        await self.cache.set(CACHE_KEYS.RESOURCE, {app["id"]: app_cache})
        logger.info(f"Received {len(resources)} resources from Humanitec")
        return resources

    async def get_resource_graph(
        self, app: Dict[str, Any], env: Dict[str, Any], data: List[Dict[str, Any]]
    ) -> Any:
        endpoint = f"apps/{app['id']}/envs/{env['id']}/resources/graph"
        humanitec_headers = self.get_humanitec_headers()
        graph = await self.send_api_request(
            "POST", endpoint, headers=humanitec_headers, json=data
        )
        return graph

    async def get_all_resource_graphs(
        self, modules: List[Dict[str, Any]], app: Dict[str, Any], env: Dict[str, Any]
    ) -> Any:
        def get_resource_graph_request_body(modules):
            return [
                {
                    "id": module["gu_res_id"],
                    "type": module["type"],
                    "resource": module["resource"],
                }
                for module in modules
            ]

        data = get_resource_graph_request_body(modules)
        graph_entities = await self.get_resource_graph(app, env, data)
        logger.info(
            f"Received {len(graph_entities)} resource graph entities from app: {app['id']} and env: {env['id']} using data: {data}"
        )
        return graph_entities

    def group_resources_by_type(
        self, data: List[Dict[str, Any]]
    ) -> Dict[str, List[Dict[str, Any]]]:
        grouped_resources: Dict[str, List[Dict[str, Any]]] = {}
        for resource in data:
            # The first segment of res_id (e.g. "modules") identifies the group
            group_key = resource["res_id"].split(".")[0]
            grouped_resources.setdefault(group_key, []).append(resource)
        return grouped_resources
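group_resources_by_type buckets resources by the first segment of their res_id, which is how the exporter later picks out the "modules" group. A self-contained sketch of the same logic, with made-up resource records shaped like the fields this guide relies on:

```python
from typing import Any, Dict, List


def group_resources_by_type(
    data: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
    grouped: Dict[str, List[Dict[str, Any]]] = {}
    for resource in data:
        # "modules.my-workload" -> "modules", "shared.dns" -> "shared"
        grouped.setdefault(resource["res_id"].split(".")[0], []).append(resource)
    return grouped


# Hypothetical resource records for illustration
resources = [
    {"res_id": "modules.my-workload", "type": "workload"},
    {"res_id": "modules.my-workload.externals.my-db", "type": "postgres"},
    {"res_id": "shared.dns", "type": "dns"},
]
grouped = group_resources_by_type(resources)
print(sorted(grouped))          # → ['modules', 'shared']
print(len(grouped["modules"]))  # → 2
```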
Cache
cache.py

import asyncio
from typing import Any, Dict


class InMemoryCache:
    def __init__(self):
        self.cache: Dict[str, Any] = {}
        self.lock = asyncio.Lock()

    async def set(self, key, data) -> bool:
        """
        Sets or updates a cache entry with the given key.

        Parameters:
        - key (str): The key to use for the cache entry.
        - data (dict): The data to be cached.
        """
        async with self.lock:
            if key in self.cache:
                self.cache[key].update(data)
            else:
                self.cache[key] = data
            return True

    async def get(self, key) -> Dict[str, Any]:
        """
        Retrieves cached data using the given key.

        Parameters:
        - key (str): The key to retrieve from the cache.

        Returns:
        - dict: The cached data associated with the key, or an empty dict if not found.
        """
        async with self.lock:
            return self.cache.get(key, {})

    async def delete(self, key) -> bool:
        """
        Deletes cached data associated with the given key.

        Parameters:
        - key (str): The key to delete from the cache.

        Returns:
        - bool: True if deletion was successful, False otherwise (key not found).
        """
        async with self.lock:
            if key in self.cache:
                del self.cache[key]
                return True
            return False
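Because set merges dictionaries for an existing key, repeated lookups during a sync accumulate entries instead of replacing the whole cache. A runnable sketch of that behavior, with the class inlined from cache.py above so the example is self-contained:

```python
import asyncio
from typing import Any, Dict


class InMemoryCache:
    """Inline copy of the cache.py class above, for a self-contained demo."""

    def __init__(self):
        self.cache: Dict[str, Any] = {}
        self.lock = asyncio.Lock()

    async def set(self, key, data) -> bool:
        async with self.lock:
            if key in self.cache:
                self.cache[key].update(data)  # merge, don't replace
            else:
                self.cache[key] = data
            return True

    async def get(self, key) -> Dict[str, Any]:
        async with self.lock:
            return self.cache.get(key, {})


async def main() -> Dict[str, Any]:
    cache = InMemoryCache()
    await cache.set("APPLICATION_CACHE_KEY", {"app-1": {"id": "app-1"}})
    # A second set with the same key merges into the existing dict
    await cache.set("APPLICATION_CACHE_KEY", {"app-2": {"id": "app-2"}})
    return await cache.get("APPLICATION_CACHE_KEY")


apps = asyncio.run(main())
print(sorted(apps))  # → ['app-1', 'app-2']
```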

  1. Create the file humanitec-exporter.yaml in the .github/workflows folder of your repository.
Cron

Adjust the cron expression to fit your schedule. By default, the workflow is set to run at 2:00 AM every Monday ('0 2 * * 1').
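Field by field, the default five-field cron expression reads:

```
0 2 * * 1
│ │ │ │ └─ day of week (1 = Monday)
│ │ │ └─── month (any)
│ │ └───── day of month (any)
│ └─────── hour (2 AM UTC)
└───────── minute (0)
```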

GitHub Workflow
humanitec-exporter.yaml
name: Ingest Humanitec Integration Resources

on:
  schedule:
    - cron: '0 2 * * 1'
  workflow_dispatch:

jobs:
  ingest-humanitec-resources:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.x'

      - name: Install Python dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r integration/requirements.txt

      - name: Ingest Entities to Port
        env:
          PORT_CLIENT_ID: ${{ secrets.PORT_CLIENT_ID }}
          PORT_CLIENT_SECRET: ${{ secrets.PORT_CLIENT_SECRET }}
          API_KEY: ${{ secrets.HUMANITEC_API_KEY }}
          ORG_ID: ${{ secrets.HUMANITEC_ORG_ID }}
        run: |
          python integration/main.py

Done! Any change to your applications, environments, workloads, or resources in Humanitec will be synced to Port on the schedule defined in the GitHub workflow.