KSPM On-Prem Scan Setup

Modified on Tue, 21 Jan at 11:17 AM

Steps to install on-prem KSPM scans:



Architecture:

Important Notes :


Select Azure Function plan which allows Vnet Integration and setup Vnet integration between Azure Function and Azure Kubernetes Cluster which is getting used for launching AKS Job for scan


If Private Kubernetes Cluster Which you wanted to scan is present on different Vnet then create Vnet Peering to This Cluster Vnet to Azure Function Vnet 

and also make sure that Subnet IP is not overlapping on this Vnet peering which can cause in connection.



1. Install an Azure Function at the Customer’s End:


Create an Azure Function with permissions to trigger jobs for AKS (Azure Kubernetes Services) in their environment.


Permissions required for the Azure Function


To trigger AKS jobs, the Azure Function requires the following roles:


  • Azure Kubernetes Service Cluster User Role
  • Azure Kubernetes Service Contributor Role

These roles will be added from identity page using system assigned as "On"



2. Allow CloudDefense to invoke the Azure Function with minimum access:


This Azure Function will be triggered by CloudDefense’s backend. Therefore, the customer should provide CloudDefense with the privilege to invoke the Azure Function by adding the following information into our KSPM Cluster Configure page (access to invoke function):



Function Url (with function key)



Azure Function code (python):



import azure.functions as func
import logging
import base64
import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerservice import ContainerServiceClient
from kubernetes import client, config
from kubernetes.client import V1Job, V1JobSpec, V1PodSpec, V1Container, V1EnvVar, V1ResourceRequirements, V1ObjectMeta, V1PodTemplateSpec, V1LocalObjectReference
import uuid
import yaml
logging.basicConfig(level=logging.DEBUG)
def kspm_dev(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Processing Azure Function request to trigger a Kubernetes job')
    # Parse request body
    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid request body", status_code=400)
    job_type = req_body.get("jobType", "default")
    # Define required fields based on the job type
    if job_type == "k8s-scanner":
        required_fields = [
            'scanType', 'scanClusterRegion', 'scanClusterName',
            'cdMasterId', 'cdVersionHistoryId', 'cdSnapshotVersion', 'cdOrgId', 'scanRequestSecret', 'scanClusterResourceGroup'
        ]
    else:
        required_fields = [
            'AzureVmName', 'InstanceScanInfoID', 'ScanRequestSecret',
            'HostAddress', 'InstancePublicIP', 'InstanceSnapshotInfoID', 'AzureResourceGroup'
        ]
    missing_fields = [field for field in required_fields if field not in req_body]
    if missing_fields:
        return func.HttpResponse(f"Missing required fields: {', '.join(missing_fields)}", status_code=400)
    # Use managed identity to authenticate with Azure
    credential = DefaultAzureCredential()
    subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID')
    # Initialize AKS client and retrieve kubeconfig using managed identity
    aks_client = ContainerServiceClient(credential, subscription_id)
    kubeconfig_yaml = get_kubeconfig(aks_client, os.environ.get('AZURE_CLUSTER_NAME'), os.environ.get('AZURE_CLUSTER_RESOURCE_GROUP'))
    if not kubeconfig_yaml:
        return func.HttpResponse(f"Failed to retrieve kubeconfig {subscription_id}", status_code=500)
    # Load Kubernetes config into the client
    try:
        logging.info("Loading Kubernetes config from kubeconfig YAML")
        kubeconfig_dict = yaml.safe_load(kubeconfig_yaml)
        logging.info(f"Kubeconfig dictionary: {kubeconfig_dict}")
        config.load_kube_config_from_dict(kubeconfig_dict)
    except Exception as e:
        return func.HttpResponse(f"Failed to load Kubernetes config: {str(e)}", status_code=500)
    if job_type == "k8s-scanner":
        logging.info("Using k8s-scanner job creation flow")
        job = create_k8s_job_k8s_scanner(
            req_body,
            os.environ.get('AZURE_SUBSCRIPTION_ID'),
            os.environ.get('AZURE_TENANT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_SECRET_SCAN')
        )
    else:
        logging.info("Using default job creation flow")
        job = create_k8s_job(
            req_body,
            os.environ.get('AZURE_SUBSCRIPTION_ID'),
            os.environ.get('AZURE_TENANT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_SECRET_SCAN')
        )
    batch_v1 = client.BatchV1Api()
    try:
        api_response = batch_v1.create_namespaced_job(body=job, namespace='default')
        logging.info(f"Job created: {api_response.metadata.name}")
        return func.HttpResponse(f"Job {api_response.metadata.name} created successfully", status_code=200)
    except Exception as e:
        logging.error(f"Failed to create Kubernetes job: {str(e)}")
        return func.HttpResponse(f"Failed to create job: {str(e)}", status_code=500)
def fix_base64_padding(base64_str):
    return base64_str + '=' * (-len(base64_str) % 4)
def get_kubeconfig(aks_client, cluster_name, resource_group):
    """Retrieve the Kubernetes kubeconfig for the AKS cluster and return it as YAML or plain text."""
    try:
        creds = aks_client.managed_clusters.list_cluster_admin_credentials(resource_group, cluster_name)
        logging.info(f"Kubernetes creds: {creds}")
        if creds.kubeconfigs and len(creds.kubeconfigs) > 0:
            kubeconfig_b64 = creds.kubeconfigs[0].value
            if isinstance(kubeconfig_b64, bytearray):
                kubeconfig_b64 = kubeconfig_b64.decode('utf-8')
            logging.info(f"Kubeconfig Base64 (before decoding): {kubeconfig_b64}")
            try:
                kubeconfig_yaml = base64.b64decode(fix_base64_padding(kubeconfig_b64)).decode('utf-8')
                logging.info(f"Successfully decoded Kubeconfig YAML")
                return kubeconfig_yaml
            except (base64.binascii.Error, UnicodeDecodeError) as decode_error:
                logging.info(f"Base64 decoding failed, assuming plain-text kubeconfig: {decode_error}")
                return kubeconfig_b64
        else:
            logging.error("No kubeconfig found in the response.")
            return None
    except Exception as e:
        logging.error(f"Error retrieving kubeconfig: {str(e)}")
        return None
def create_k8s_job(request_body, azuresubscriptionid, azuretenentid, azureclientid, azureclientsecret):
    logging.info(f"Creating Kubernetes Job for Azure VM: {request_body['AzureVmName']}")
    os_type = request_body["VmOs"]
    if os_type == "windows":
        image_type  = "cdefense/workload-azure-run-command-cli-windows:dev"
    else:
        image_type = "cdefense/workload-azure-run-command-cli:dev"
    logging.info(f"VM Os is {os_type}")
    logging.info(f"Setting Job image as {image_type}")
    # Updated container configuration to use ACR
    container = V1Container(
        name="workload-scanner",
        # Update the image path to use your ACR repository
        image=image_type,
        image_pull_policy="Always",
        env=[
            V1EnvVar(name="INSTANCE_SCAN_INFO_ID", value=request_body['InstanceScanInfoID']),
            V1EnvVar(name="SCAN_REQUEST_SECRET", value=request_body['ScanRequestSecret']),
            V1EnvVar(name="HOST_ADDRESS", value=request_body['HostAddress']),
            V1EnvVar(name="INSTANCE_PUBLIC_IP", value=request_body['InstancePublicIP']),
            V1EnvVar(name="INSTANCE_SNAPSHOT_INFO_ID", value=request_body['InstanceSnapshotInfoID']),
            V1EnvVar(name="VM_NAME", value=request_body['AzureVmName']),
            V1EnvVar(name="RESOURCE_GROUP_NAME", value=request_body['AzureResourceGroup']),
            V1EnvVar(name="WORKLOAD-GOLANG-BACKEND", value="https://acs-backend-uae.clouddefenseai.com/"),
            V1EnvVar(name="AZURE_SUBSCRIPTION_ID", value=azuresubscriptionid),
            V1EnvVar(name="AZURE_TENANT_ID", value=azuretenentid),
            V1EnvVar(name="AZURE_CLIENT_ID", value=azureclientid),
            V1EnvVar(name="AZURE_CLIENT_SECRET", value=azureclientsecret),
        ],
        resources=V1ResourceRequirements(
            limits={"cpu": "900m", "memory": "912Mi"},
            requests={"cpu": "250m", "memory": "256Mi"}
        )
    )
    # Create pod spec with image pull secret
    pod_spec = V1PodSpec(
        containers=[container],
        restart_policy="Never",
        image_pull_secrets=[V1LocalObjectReference(name="acr-secret")]  # Add the image pull secret
    )
    template = V1PodTemplateSpec(
        metadata=V1ObjectMeta(labels={"app": "workload-scanner"}),
        spec=pod_spec
    )
    job_spec = V1JobSpec(
        template=template, 
        backoff_limit=0,
        ttl_seconds_after_finished=60
    )
    job_name = f"workloadscan-{uuid.uuid4()}".lower()
    job = V1Job(
       metadata=V1ObjectMeta(name=job_name),
        spec=job_spec
    )
    logging.info("Kubernetes Job created")
    return job
def create_k8s_job_k8s_scanner(request_body, azuresubscriptionid, azuretenentid, azureclientid, azureclientsecret):
    logging.info(f"Creating Kubernetes Job for Scanner Cluster: {request_body['scanClusterName']}")
    container = V1Container(
        name="k8s-scanner",
        image="azureonpremimage.azurecr.io/k8s-scanner:prod",
        image_pull_policy="Always",
        env=[
            V1EnvVar(name="SCAN_TYPE", value=request_body['scanType']),
            V1EnvVar(name="SCAN_CLUSTER_REGION", value=request_body['scanClusterRegion']),
            V1EnvVar(name="SCAN_CLUSTER_NAME", value=request_body['scanClusterName']),
            V1EnvVar(name="CD_MASTER_ID", value=request_body['cdMasterId']),
            V1EnvVar(name="CD_VERSION_HISTORY_ID", value=request_body['cdVersionHistoryId']),
            V1EnvVar(name="CD_SNAPSHOT_VERSION", value=str(request_body['cdSnapshotVersion'])),
            V1EnvVar(name="CD_ORG_ID", value=request_body['cdOrgId']),
            V1EnvVar(name="ACS_BACKEND_URL", value="https://acs-backend-uae.clouddefenseai.com"),
            V1EnvVar(name="SCAN_REQUEST_SECRET", value=request_body['scanRequestSecret']),
            V1EnvVar(name="SCAN_CLUSTER_RESOURCE_GROUP", value=request_body['scanClusterResourceGroup']),
            V1EnvVar(name="AZURE_SUBSCRIPTION_ID", value=azuresubscriptionid),
            V1EnvVar(name="AZURE_TENANT_ID", value=azuretenentid),
            V1EnvVar(name="AZURE_CLIENT_ID", value=azureclientid),
            V1EnvVar(name="AZURE_CLIENT_SECRET", value=azureclientsecret),
        ],
        resources=V1ResourceRequirements(
            limits={"cpu": "1", "memory": "1Gi"},
            requests={"cpu": "500m", "memory": "512Mi"}
        )
    )
    pod_spec = V1PodSpec(
       containers=[container],
        restart_policy="Never",
        image_pull_secrets=[V1LocalObjectReference(name="acr-secret")]
    )
    template = V1PodTemplateSpec(
        metadata=V1ObjectMeta(labels={"app": "k8s-scanner"}),
        spec=pod_spec
    )
    job_spec = V1JobSpec(
        template=template,
        backoff_limit=1,
        ttl_seconds_after_finished=60
    )
    job_name = f"k8s-scanner-{uuid.uuid4()}".lower()
    job = V1Job(
        metadata=V1ObjectMeta(name=job_name),
        spec=job_spec
    )
    logging.info("Kubernetes k8s-scanner Job created")
    return job


requirement.txt


azure-functions
azure-identity
azure-mgmt-containerservice
kubernetes
pyyaml


Once azure function is ready and deployed. Customer's needs to set few environment keys which would be


  • Name of the resource group for AKS

  • Name of the AKS cluster



Azure client secret, clientid, tenantid, subscriptionid,  which needs to have below role and permission


  • Azure Kubernetes Service RBAC Reader
  • Microsoft.ContainerService/managedClusters/listClusterUserCredential/action  permission 


Note: We need Azure clientid, clientsecret, tenanatid, subsscriptionid set as env variable because these credentials will be used by AKS jobs to run kubernetes scan on target private k8s cluster



3. AKS at the Customer’s End



Once the Azure Function is set up with sufficient permissions, the customer should create a Kubernetes cluster in their Azure environment, which can launch jobs using the following image:


azureonpremimage.azurecr.io/k8s-scanner:prod


The purpose of this job is to launch kubernetes scan on the targeted private kubernetes cluster.



4. WhiteList our load balancer



Once AKS and Azure function setup is done . To send data from customer network to CloudDefense, customers need to whitelist our Ingress DNS. 


Our Ingress DNS is:


k8s-prod-acsingre-2967d5e127-442314495.me-central-1.elb.amazonaws.com 


Once these setups are ready. Customers can directly come to our ACS platform and run the Kubernetes scan from the UI. 





Was this article helpful?

That’s Great!

Thank you for your feedback

Sorry! We couldn't be helpful

Thank you for your feedback

Let us know how can we improve this article!

Select at least one of the reasons

Feedback sent

We appreciate your effort and will try to fix the article