Skip to main content

Trivy

In this example, you will create a trivyVulnerability blueprint that ingests all vulnerabilities in your Trivy result file using a combination of Port's API and webhook functionality.

Recommended installation option

While the script provided in this example facilitates scheduled ingestion of Trivy scan results to Port, we highly recommend that you use our Trivy Kubernetes exporter to continuously scan your kubernetes cluster and ingest vulnerabilities to Port in real time.

To ingest the scan results to Port, use a script that sends information about the vulnerabilities according to the webhook configuration.

Prerequisites​

Create the following blueprint definition and webhook configuration:

Trivy vulnerability blueprint
{
"identifier": "trivyVulnerability",
"description": "This blueprint represents a Trivy vulnerability in our software catalog",
"title": "Trivy Vulnerability",
"icon": "Trivy",
"schema": {
"properties": {
"version": {
"title": "Version",
"type": "string"
},
"package_name": {
"title": "Package Name",
"type": "string"
},
"url": {
"title": "Primary URL",
"type": "string",
"format": "url"
},
"description": {
"title": "Description",
"type": "string"
},
"target": {
"title": "Target",
"type": "string"
},
"severity": {
"title": "Severity",
"type": "string",
"default": "HIGH",
"enum": ["HIGH", "MEDIUM", "LOW", "CRITICAL", "UNKNOWN"],
"enumColors": {
"HIGH": "red",
"MEDIUM": "yellow",
"LOW": "green",
"CRITICAL": "red",
"UNKNOWN": "purple"
}
},
"data_source": {
"title": "Data Source",
"type": "object"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {}
}
Trivy webhook configuration
{
"identifier": "trivyVulnerabilityMapper",
"title": "Trivy Vulnerability Mapper",
"description": "A webhook configuration to ingest Trivy vulnerabilities from a file",
"icon": "Trivy",
"mappings": [
{
"blueprint": "trivyVulnerability",
"itemsToParse": ".body.vulnerabilities",
"entity": {
"identifier": ".item.id",
"title": ".item.title",
"properties": {
"version": ".item.version",
"package_name": ".item.pkgName",
"url": ".item.primaryUrl",
"description": ".item.description",
"target": ".item.target",
"severity": ".item.severity",
"data_source": ".item.dataSource"
}
}
}
],
"enabled": true,
"security": {}
}

Working with Port's API and Python script​

Here is an example snippet showing how to integrate Port's API and webhook with your existing pipelines using Python:

Python script example
## Import the needed libraries
import requests
import os
import json
import re

WEBHOOK_URL = os.environ["WEBHOOK_URL"] # The URL for the webhook endpoint provided by Port
PATH_TO_TRIVY_JSON_FILE = os.environ["PATH_TO_TRIVY_JSON_FILE"] # The path to the Trivy result.json file relative to the project folder


def add_entity_to_port(entity_object):
"""A function to create the passed entity in Port using the webhook URL

Params
--------------
entity_object: dict
The entity to add in your Port catalog

Returns
--------------
response: dict
The response object after calling the webhook
"""
headers = {"Accept": "application/json"}
response = requests.post(WEBHOOK_URL, json=entity_object, headers=headers)
return response.json()

def remove_invalid_chars(input_str):
"""A helper function to remove characters that don't meet Port's entity identifier pattern"""
pattern = r"[^A-Za-z0-9@_.:\\/=-]"
clean_identifier = re.sub(pattern, '', input_str)
return clean_identifier

def extract_trivy_scan_data(trivy_file):
with open(trivy_file, 'r') as file:
data = json.load(file)

trivy_results = data.get("Results", [])
trivy_vulnerabilities = []
for result in trivy_results:
target = result.get("Target")
vulnerabilities = result.get("Vulnerabilities", [])
for vulnerability in vulnerabilities:
vulnerability_data = {
"id": remove_invalid_chars(vulnerability.get("VulnerabilityID", "") + "-" + vulnerability.get("PkgName", "")),
"pkgName": vulnerability.get("PkgName", ""),
"version": vulnerability.get("InstalledVersion", ""),
"title": vulnerability.get("Title", ""),
"description": vulnerability.get("Description", ""),
"severity": vulnerability.get("Severity", ""),
"primaryUrl": vulnerability.get("PrimaryURL"),
"dataSource": vulnerability.get("DataSource", {}),
"target": target
}
trivy_vulnerabilities.append(vulnerability_data)

entity_object = {
"vulnerabilities": trivy_vulnerabilities,
}
webhook_response = add_entity_to_port(entity_object)
return webhook_response


trivy_file = PATH_TO_TRIVY_JSON_FILE
response_data = extract_trivy_scan_data(trivy_file)
print(response_data)