import azure.functions as func
import logging
import base64
import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerservice import ContainerServiceClient
from kubernetes import client, config
from kubernetes.client import V1Job, V1JobSpec, V1PodSpec, V1Container, V1EnvVar, V1ResourceRequirements, V1ObjectMeta, V1PodTemplateSpec, V1LocalObjectReference
import uuid
import yaml
# Configure the root logger once at import time so every logging.* call in
# this module is emitted, down to DEBUG level.
logging.basicConfig(level=logging.DEBUG)
def kspm_dev(req: func.HttpRequest) -> func.HttpResponse:
    """Handle an HTTP request by launching a scanner Job on the AKS cluster.

    The JSON body selects the flow through its optional ``jobType`` field:
    ``"k8s-scanner"`` builds a cluster-scanner Job, any other value (or no
    value) builds the default VM workload-scanner Job. Each flow has its own
    list of required body fields.

    Args:
        req: Incoming HTTP request whose body must be a JSON object.

    Returns:
        * 400 if the body is not a JSON object or a required field is missing.
        * 500 if the kubeconfig cannot be retrieved or loaded, or if the
          Kubernetes API rejects the Job.
        * 200 with the created Job name on success.
    """
    logging.info('Processing Azure Function request to trigger a Kubernetes job')

    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid request body", status_code=400)
    # get_json() accepts any JSON document (list, string, number, null); only
    # an object supports the field lookups performed below.
    if not isinstance(req_body, dict):
        return func.HttpResponse("Invalid request body", status_code=400)

    job_type = req_body.get("jobType", "default")
    is_k8s_scanner = job_type == "k8s-scanner"
    if is_k8s_scanner:
        required_fields = [
            'scanType', 'scanClusterRegion', 'scanClusterName',
            'cdMasterId', 'cdVersionHistoryId', 'cdSnapshotVersion', 'cdOrgId',
            'scanRequestSecret', 'scanClusterResourceGroup',
        ]
    else:
        required_fields = [
            'AzureVmName', 'InstanceScanInfoID', 'ScanRequestSecret',
            'HostAddress', 'InstancePublicIP', 'InstanceSnapshotInfoID', 'AzureResourceGroup',
        ]
    missing_fields = [field for field in required_fields if field not in req_body]
    if missing_fields:
        return func.HttpResponse(f"Missing required fields: {', '.join(missing_fields)}", status_code=400)

    # Fetch the admin kubeconfig of the cluster that will run the Job.
    credential = DefaultAzureCredential()
    subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID')
    aks_client = ContainerServiceClient(credential, subscription_id)
    kubeconfig_yaml = get_kubeconfig(
        aks_client,
        os.environ.get('AZURE_CLUSTER_NAME'),
        os.environ.get('AZURE_CLUSTER_RESOURCE_GROUP'),
    )
    if not kubeconfig_yaml:
        return func.HttpResponse(f"Failed to retrieve kubeconfig {subscription_id}", status_code=500)

    try:
        logging.info("Loading Kubernetes config from kubeconfig YAML")
        kubeconfig_dict = yaml.safe_load(kubeconfig_yaml)
        # The parsed kubeconfig holds cluster-admin credentials, so its
        # contents are intentionally never written to the log.
        logging.info("Kubeconfig YAML parsed")
        config.load_kube_config_from_dict(kubeconfig_dict)
    except Exception as e:
        logging.error(f"Failed to load Kubernetes config: {str(e)}")
        return func.HttpResponse(f"Failed to load Kubernetes config: {str(e)}", status_code=500)

    if is_k8s_scanner:
        logging.info("Using k8s-scanner job creation flow")
        build_job = create_k8s_job_k8s_scanner
    else:
        logging.info("Using default job creation flow")
        build_job = create_k8s_job
    try:
        job = build_job(
            req_body,
            os.environ.get('AZURE_SUBSCRIPTION_ID'),
            os.environ.get('AZURE_TENANT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_ID_SCAN'),
            os.environ.get('AZURE_CLIENT_SECRET_SCAN'),
        )
    except KeyError as e:
        # A builder may read a body field that is not in the validated list
        # above; report it as a client error instead of an unhandled crash.
        missing = e.args[0] if e.args else e
        logging.error(f"Request body is missing a field needed to build the job: {missing}")
        return func.HttpResponse(f"Missing required fields: {missing}", status_code=400)

    batch_v1 = client.BatchV1Api()
    try:
        api_response = batch_v1.create_namespaced_job(body=job, namespace='default')
        logging.info(f"Job created: {api_response.metadata.name}")
        return func.HttpResponse(f"Job {api_response.metadata.name} created successfully", status_code=200)
    except Exception as e:
        logging.error(f"Failed to create Kubernetes job: {str(e)}")
        return func.HttpResponse(f"Failed to create job: {str(e)}", status_code=500)
def fix_base64_padding(base64_str):
    """Return *base64_str* with enough trailing '=' to make its length a multiple of 4.

    A string whose length is already a multiple of 4 comes back unchanged.
    """
    leftover = len(base64_str) % 4
    pad_count = (4 - leftover) % 4
    padding = '=' * pad_count
    return base64_str + padding
def get_kubeconfig(aks_client, cluster_name, resource_group):
    """Retrieve the admin kubeconfig for an AKS cluster as YAML text.

    The first kubeconfig returned by ``list_cluster_admin_credentials`` is
    used. Its value is decoded from UTF-8 when it arrives as bytes, then
    Base64-decoded; if it is not valid Base64 it is assumed to already be
    plain-text YAML and is returned as received.

    Args:
        aks_client: Client exposing ``managed_clusters.list_cluster_admin_credentials``.
        cluster_name: Name of the AKS cluster.
        resource_group: Resource group that contains the cluster.

    Returns:
        The kubeconfig text, or ``None`` if the response holds no kubeconfig
        or any error occurs (the error is logged, not raised).
    """
    try:
        creds = aks_client.managed_clusters.list_cluster_admin_credentials(resource_group, cluster_name)
        # Neither `creds` nor the kubeconfig payload is logged: both carry
        # cluster-admin credentials.
        if not creds.kubeconfigs:
            logging.error("No kubeconfig found in the response.")
            return None

        kubeconfig_text = creds.kubeconfigs[0].value
        # Accept immutable bytes as well as bytearray; fix_base64_padding
        # appends a str and would fail on either.
        if isinstance(kubeconfig_text, (bytes, bytearray)):
            kubeconfig_text = kubeconfig_text.decode('utf-8')
        logging.info(f"Retrieved kubeconfig payload ({len(kubeconfig_text)} characters)")

        # Drop whitespace (e.g. line-wrapped Base64) so the padding is computed
        # on the real payload length and strict validation can be used.
        compact = "".join(kubeconfig_text.split())
        try:
            # validate=True makes non-Base64 input fail deterministically.
            # Without it, b64decode silently discards foreign characters and
            # a plain-text kubeconfig could decode to garbage.
            decoded = base64.b64decode(fix_base64_padding(compact), validate=True)
            kubeconfig_yaml = decoded.decode('utf-8')
        except ValueError as decode_error:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            logging.info(f"Base64 decoding failed, assuming plain-text kubeconfig: {decode_error}")
            return kubeconfig_text
        logging.info("Successfully decoded Kubeconfig YAML")
        return kubeconfig_yaml
    except Exception as e:
        logging.error(f"Error retrieving kubeconfig: {str(e)}")
        return None
def create_k8s_job(request_body, azuresubscriptionid, azuretenentid, azureclientid, azureclientsecret):
    """Build the Job manifest that scans a single Azure VM.

    The Job is only constructed here; submitting it to the cluster is left to
    the caller.

    Args:
        request_body: Dict with the keys ``AzureVmName``, ``InstanceScanInfoID``,
            ``ScanRequestSecret``, ``HostAddress``, ``InstancePublicIP``,
            ``InstanceSnapshotInfoID`` and ``AzureResourceGroup``. The optional
            ``VmOs`` key selects the Windows image when it equals
            ``"windows"``; any other value, or no value, selects the default image.
        azuresubscriptionid: Passed to the container as ``AZURE_SUBSCRIPTION_ID``.
        azuretenentid: Passed to the container as ``AZURE_TENANT_ID``.
        azureclientid: Passed to the container as ``AZURE_CLIENT_ID``.
        azureclientsecret: Passed to the container as ``AZURE_CLIENT_SECRET``.

    Returns:
        A ``V1Job`` named ``workloadscan-<uuid4>``.

    Raises:
        KeyError: If one of the non-optional ``request_body`` keys is missing.
    """
    logging.info(f"Creating Kubernetes Job for Azure VM: {request_body['AzureVmName']}")

    # "VmOs" is optional: a missing value is handled like any non-"windows"
    # value instead of raising KeyError.
    os_type = request_body.get("VmOs")
    if os_type == "windows":
        image_type = "cdefense/workload-azure-run-command-cli-windows:dev"
    else:
        image_type = "cdefense/workload-azure-run-command-cli:dev"
    logging.info(f"VM Os is {os_type}")
    logging.info(f"Setting Job image as {image_type}")

    container = V1Container(
        name="workload-scanner",
        image=image_type,
        image_pull_policy="Always",
        env=[
            V1EnvVar(name="INSTANCE_SCAN_INFO_ID", value=request_body['InstanceScanInfoID']),
            V1EnvVar(name="SCAN_REQUEST_SECRET", value=request_body['ScanRequestSecret']),
            V1EnvVar(name="HOST_ADDRESS", value=request_body['HostAddress']),
            V1EnvVar(name="INSTANCE_PUBLIC_IP", value=request_body['InstancePublicIP']),
            V1EnvVar(name="INSTANCE_SNAPSHOT_INFO_ID", value=request_body['InstanceSnapshotInfoID']),
            V1EnvVar(name="VM_NAME", value=request_body['AzureVmName']),
            V1EnvVar(name="RESOURCE_GROUP_NAME", value=request_body['AzureResourceGroup']),
            V1EnvVar(name="WORKLOAD-GOLANG-BACKEND", value="https://acs-backend-uae.clouddefenseai.com/"),
            V1EnvVar(name="AZURE_SUBSCRIPTION_ID", value=azuresubscriptionid),
            V1EnvVar(name="AZURE_TENANT_ID", value=azuretenentid),
            V1EnvVar(name="AZURE_CLIENT_ID", value=azureclientid),
            V1EnvVar(name="AZURE_CLIENT_SECRET", value=azureclientsecret),
        ],
        resources=V1ResourceRequirements(
            limits={"cpu": "900m", "memory": "912Mi"},
            requests={"cpu": "250m", "memory": "256Mi"},
        ),
    )
    pod_spec = V1PodSpec(
        containers=[container],
        restart_policy="Never",
        image_pull_secrets=[V1LocalObjectReference(name="acr-secret")],
    )
    template = V1PodTemplateSpec(
        metadata=V1ObjectMeta(labels={"app": "workload-scanner"}),
        spec=pod_spec,
    )
    job_spec = V1JobSpec(
        template=template,
        backoff_limit=0,  # never retry a failed scan pod
        ttl_seconds_after_finished=60,  # finished Job becomes eligible for cleanup after 60 s
    )
    # A random suffix keeps Job names unique across invocations.
    job_name = f"workloadscan-{uuid.uuid4()}".lower()
    job = V1Job(
        metadata=V1ObjectMeta(name=job_name),
        spec=job_spec,
    )
    logging.info("Kubernetes Job created")
    return job
def create_k8s_job_k8s_scanner(request_body, azuresubscriptionid, azuretenentid, azureclientid, azureclientsecret):
    """Build the Job manifest that runs the k8s-scanner image against a cluster.

    The Job is only constructed here; the caller submits it.

    Args:
        request_body: Dict providing ``scanType``, ``scanClusterRegion``,
            ``scanClusterName``, ``cdMasterId``, ``cdVersionHistoryId``,
            ``cdSnapshotVersion`` (stringified), ``cdOrgId``,
            ``scanRequestSecret`` and ``scanClusterResourceGroup``.
        azuresubscriptionid: Passed to the container as ``AZURE_SUBSCRIPTION_ID``.
        azuretenentid: Passed to the container as ``AZURE_TENANT_ID``.
        azureclientid: Passed to the container as ``AZURE_CLIENT_ID``.
        azureclientsecret: Passed to the container as ``AZURE_CLIENT_SECRET``.

    Returns:
        A ``V1Job`` named ``k8s-scanner-<uuid4>``.

    Raises:
        KeyError: If a listed ``request_body`` key is missing.
    """
    logging.info(f"Creating Kubernetes Job for Scanner Cluster: {request_body['scanClusterName']}")

    # (variable name, value) pairs, in the order they appear on the container.
    env_pairs = [
        ("SCAN_TYPE", request_body['scanType']),
        ("SCAN_CLUSTER_REGION", request_body['scanClusterRegion']),
        ("SCAN_CLUSTER_NAME", request_body['scanClusterName']),
        ("CD_MASTER_ID", request_body['cdMasterId']),
        ("CD_VERSION_HISTORY_ID", request_body['cdVersionHistoryId']),
        ("CD_SNAPSHOT_VERSION", str(request_body['cdSnapshotVersion'])),
        ("CD_ORG_ID", request_body['cdOrgId']),
        ("ACS_BACKEND_URL", "https://acs-backend-uae.clouddefenseai.com"),
        ("SCAN_REQUEST_SECRET", request_body['scanRequestSecret']),
        ("SCAN_CLUSTER_RESOURCE_GROUP", request_body['scanClusterResourceGroup']),
        ("AZURE_SUBSCRIPTION_ID", azuresubscriptionid),
        ("AZURE_TENANT_ID", azuretenentid),
        ("AZURE_CLIENT_ID", azureclientid),
        ("AZURE_CLIENT_SECRET", azureclientsecret),
    ]

    scanner_container = V1Container(
        name="k8s-scanner",
        image="azureonpremimage.azurecr.io/k8s-scanner:prod",
        image_pull_policy="Always",
        env=[V1EnvVar(name=var_name, value=var_value) for var_name, var_value in env_pairs],
        resources=V1ResourceRequirements(
            limits={"cpu": "1", "memory": "1Gi"},
            requests={"cpu": "500m", "memory": "512Mi"},
        ),
    )

    # Pod template -> Job spec, assembled inside-out in one expression.
    scanner_spec = V1JobSpec(
        template=V1PodTemplateSpec(
            metadata=V1ObjectMeta(labels={"app": "k8s-scanner"}),
            spec=V1PodSpec(
                containers=[scanner_container],
                restart_policy="Never",
                image_pull_secrets=[V1LocalObjectReference(name="acr-secret")],
            ),
        ),
        backoff_limit=1,
        ttl_seconds_after_finished=60,
    )

    unique_name = f"k8s-scanner-{uuid.uuid4()}".lower()
    scanner_job = V1Job(metadata=V1ObjectMeta(name=unique_name), spec=scanner_spec)
    logging.info("Kubernetes k8s-scanner Job created")
    return scanner_job