Skip to main content

Trivy

In this example, you will create a trivyVulnerability blueprint that ingests all vulnerabilities in your Trivy result file using a combination of Port's API and webhook functionality.

To ingest the scan results to Port, use a script that sends information about the vulnerabilities according to the webhook configuration.

Prerequisitesโ€‹

Create the following blueprint definition and webhook configuration:

Trivy vulnerability blueprint
{
"identifier": "trivyVulnerability",
"description": "This blueprint represents a Trivy vulnerability in our software catalog",
"title": "Trivy Vulnerability",
"icon": "Trivy",
"schema": {
"properties": {
"version": {
"title": "Version",
"type": "string"
},
"package_name": {
"title": "Package Name",
"type": "string"
},
"url": {
"title": "Primary URL",
"type": "string",
"format": "url"
},
"description": {
"title": "Description",
"type": "string"
},
"target": {
"title": "Target",
"type": "string"
},
"severity": {
"title": "Severity",
"type": "string",
"default": "HIGH",
"enum": ["HIGH", "MEDIUM", "LOW", "CRITICAL", "UNKNOWN"],
"enumColors": {
"HIGH": "red",
"MEDIUM": "yellow",
"LOW": "green",
"CRITICAL": "red",
"UNKNOWN": "purple"
}
},
"data_source": {
"title": "Data Source",
"type": "object"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {}
}
Trivy webhook configuration
{
"identifier": "trivyVulnerabilityMapper",
"title": "Trivy Vulnerability Mapper",
"description": "A webhook configuration to ingest Trivy vulnerabilities from a file",
"icon": "Trivy",
"mappings": [
{
"blueprint": "trivyVulnerability",
"itemsToParse": ".body.vulnerabilities",
"entity": {
"identifier": ".item.id",
"title": ".item.title",
"properties": {
"version": ".item.version",
"package_name": ".item.pkgName",
"url": ".item.primaryUrl",
"description": ".item.description",
"target": ".item.target",
"severity": ".item.severity",
"data_source": ".item.dataSource"
}
}
}
],
"enabled": true,
"security": {}
}

Working with Port's API and Python scriptโ€‹

Here is an example snippet showing how to integrate Port's API and webhook with your existing pipelines using Python:

Python script example
## Import the needed libraries
import requests
import os
import json
import re

WEBHOOK_URL = os.environ["WEBHOOK_URL"] # The URL for the webhook endpoint provided by Port
PATH_TO_TRIVY_JSON_FILE = os.environ["PATH_TO_TRIVY_JSON_FILE"] # The path to the Trivy result.json file relative to the project folder


def add_entity_to_port(entity_object):
"""A function to create the passed entity in Port using the webhook URL

Params
--------------
entity_object: dict
The entity to add in your Port catalog

Returns
--------------
response: dict
The response object after calling the webhook
"""
headers = {"Accept": "application/json"}
response = requests.post(WEBHOOK_URL, json=entity_object, headers=headers)
return response.json()

def remove_invalid_chars(input_str):
"""A helper function to remove characters that don't meet Port's entity identifier pattern"""
pattern = r"[^A-Za-z0-9@_.:\\/=-]"
clean_identifier = re.sub(pattern, '', input_str)
return clean_identifier

def extract_trivy_scan_data(trivy_file):
with open(trivy_file, 'r') as file:
data = json.load(file)

trivy_results = data.get("Results", [])
trivy_vulnerabilities = []
for result in trivy_results:
target = result.get("Target")
vulnerabilities = result.get("Vulnerabilities", [])
for vulnerability in vulnerabilities:
vulnerability_data = {
"id": remove_invalid_chars(vulnerability.get("VulnerabilityID", "") + "-" + vulnerability.get("PkgName", "")),
"pkgName": vulnerability.get("PkgName", ""),
"version": vulnerability.get("InstalledVersion", ""),
"title": vulnerability.get("Title", ""),
"description": vulnerability.get("Description", ""),
"severity": vulnerability.get("Severity", ""),
"primaryUrl": vulnerability.get("PrimaryURL"),
"dataSource": vulnerability.get("DataSource", {}),
"target": target
}
trivy_vulnerabilities.append(vulnerability_data)

entity_object = {
"vulnerabilities": trivy_vulnerabilities,
}
webhook_response = add_entity_to_port(entity_object)
return webhook_response


trivy_file = PATH_TO_TRIVY_JSON_FILE
response_data = extract_trivy_scan_data(trivy_file)
print(response_data)