From 62664d67ec0c917e1b114acddd732f51a4a92438 Mon Sep 17 00:00:00 2001
From: pedrooot
Date: Fri, 5 Jul 2024 14:13:42 +0200
Subject: [PATCH] chore(cis): add class for all the providers

---
 prowler/lib/outputs/compliance/cis_azure.py  | 97 ++++++++++++-------
 prowler/lib/outputs/compliance/cis_gcp.py    | 97 ++++++++++++-------
 .../lib/outputs/compliance/cis_kubernetes.py | 97 ++++++++++++-------
 3 files changed, 192 insertions(+), 99 deletions(-)

diff --git a/prowler/lib/outputs/compliance/cis_azure.py b/prowler/lib/outputs/compliance/cis_azure.py
index a45075a06a5..62be35e1f0a 100644
--- a/prowler/lib/outputs/compliance/cis_azure.py
+++ b/prowler/lib/outputs/compliance/cis_azure.py
@@ -1,37 +1,68 @@
-from prowler.config.config import timestamp
+from csv import DictWriter
+
+from prowler.lib.check.compliance_models import ComplianceBaseModel
+from prowler.lib.logger import logger
+from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
 from prowler.lib.outputs.compliance.models import Azure
-from prowler.lib.outputs.csv.csv import generate_csv_fields
-from prowler.lib.utils.utils import outputs_unix_timestamp
+from prowler.lib.outputs.finding import Finding
 
 
-def generate_compliance_row_cis_azure(
-    finding, compliance, requirement, attribute, output_options
-):
-    compliance_row = Azure(
-        Provider=finding.check_metadata.Provider,
-        Description=compliance.Description,
-        Subscription=finding.subscription,
-        AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp),
-        Requirements_Id=requirement.Id,
-        Requirements_Description=requirement.Description,
-        Requirements_Attributes_Section=attribute.Section,
-        Requirements_Attributes_Profile=attribute.Profile,
-        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
-        Requirements_Attributes_Description=attribute.Description,
-        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
-        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
-        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
-        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
-        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
-        Requirements_Attributes_DefaultValue=attribute.DefaultValue,
-        Requirements_Attributes_References=attribute.References,
-        Status=finding.status,
-        StatusExtended=finding.status_extended,
-        ResourceId=finding.resource_id,
-        ResourceName=finding.resource_name,
-        CheckId=finding.check_metadata.CheckID,
-        Muted=finding.muted,
-    )
-    csv_header = generate_csv_fields(Azure)
+class AzureCIS(ComplianceOutput):
+    def transform(
+        self, findings: list[Finding], compliance: ComplianceBaseModel
+    ) -> None:
+        for finding in findings:
+            for requirement in compliance.Requirements:
+                for attribute in requirement.Attributes:
+                    compliance_row = Azure(
+                        Provider=finding.provider,
+                        Description=compliance.Description,
+                        Subscription=finding.subscription,
+                        AssessmentDate=str(finding.timestamp),
+                        Requirements_Id=requirement.Id,
+                        Requirements_Description=requirement.Description,
+                        Requirements_Attributes_Section=attribute.Section,
+                        Requirements_Attributes_Profile=attribute.Profile,
+                        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
+                        Requirements_Attributes_Description=attribute.Description,
+                        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
+                        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
+                        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
+                        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
+                        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
+                        Requirements_Attributes_DefaultValue=attribute.DefaultValue,
+                        Requirements_Attributes_References=attribute.References,
+                        Status=finding.status,
+                        StatusExtended=finding.status_extended,
+                        ResourceId=finding.resource_id,
+                        ResourceName=finding.resource_name,
+                        CheckId=finding.check_id,
+                        Muted=finding.muted,
+                    )
+                    self._data.append(compliance_row)
 
-    return compliance_row, csv_header
+    def batch_write_data_to_file(self, header: bool) -> None:
+        try:
+            if (
+                getattr(self, "_file_descriptor", None)
+                and not self._file_descriptor.closed
+                and self._data
+            ):
+                csv_writer = DictWriter(
+                    self._file_descriptor,
+                    fieldnames=[
+                        field.upper() for field in self._data[0].__dict__.keys()
+                    ],
+                    delimiter=";",
+                )
+                if header:
+                    csv_writer.writeheader()
+                for finding in self._data:
+                    for key in list(finding.__dict__.keys()):
+                        finding.__dict__[key.upper()] = finding.__dict__.pop(key)
+                    csv_writer.writerow(finding.dict())
+                self._file_descriptor.close()
+        except Exception as error:
+            logger.error(
+                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
+            )
diff --git a/prowler/lib/outputs/compliance/cis_gcp.py b/prowler/lib/outputs/compliance/cis_gcp.py
index 9a4d0cfb539..96507458fc0 100644
--- a/prowler/lib/outputs/compliance/cis_gcp.py
+++ b/prowler/lib/outputs/compliance/cis_gcp.py
@@ -1,37 +1,68 @@
-from prowler.config.config import timestamp
+from csv import DictWriter
+
+from prowler.lib.check.compliance_models import ComplianceBaseModel
+from prowler.lib.logger import logger
+from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
 from prowler.lib.outputs.compliance.models import GCP
-from prowler.lib.outputs.csv.csv import generate_csv_fields
-from prowler.lib.utils.utils import outputs_unix_timestamp
+from prowler.lib.outputs.finding import Finding
 
 
-def generate_compliance_row_cis_gcp(
-    finding, compliance, requirement, attribute, output_options
-):
-    compliance_row = GCP(
-        Provider=finding.check_metadata.Provider,
-        Description=compliance.Description,
-        ProjectId=finding.project_id,
-        Location=finding.location.lower(),
-        AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp),
-        Requirements_Id=requirement.Id,
-        Requirements_Description=requirement.Description,
-        Requirements_Attributes_Section=attribute.Section,
-        Requirements_Attributes_Profile=attribute.Profile,
-        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
-        Requirements_Attributes_Description=attribute.Description,
-        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
-        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
-        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
-        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
-        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
-        Requirements_Attributes_References=attribute.References,
-        Status=finding.status,
-        StatusExtended=finding.status_extended,
-        ResourceId=finding.resource_id,
-        ResourceName=finding.resource_name,
-        CheckId=finding.check_metadata.CheckID,
-        Muted=finding.muted,
-    )
-    csv_header = generate_csv_fields(GCP)
+class GCPCIS(ComplianceOutput):
+    def transform(
+        self, findings: list[Finding], compliance: ComplianceBaseModel
+    ) -> None:
+        for finding in findings:
+            for requirement in compliance.Requirements:
+                for attribute in requirement.Attributes:
+                    compliance_row = GCP(
+                        Provider=finding.provider,
+                        Description=compliance.Description,
+                        ProjectId=finding.project_id,
+                        Location=finding.location.lower(),
+                        AssessmentDate=str(finding.timestamp),
+                        Requirements_Id=requirement.Id,
+                        Requirements_Description=requirement.Description,
+                        Requirements_Attributes_Section=attribute.Section,
+                        Requirements_Attributes_Profile=attribute.Profile,
+                        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
+                        Requirements_Attributes_Description=attribute.Description,
+                        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
+                        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
+                        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
+                        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
+                        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
+                        Requirements_Attributes_References=attribute.References,
+                        Status=finding.status,
+                        StatusExtended=finding.status_extended,
+                        ResourceId=finding.resource_id,
+                        ResourceName=finding.resource_name,
+                        CheckId=finding.check_id,
+                        Muted=finding.muted,
+                    )
+                    self._data.append(compliance_row)
 
-    return compliance_row, csv_header
+    def batch_write_data_to_file(self, header: bool) -> None:
+        try:
+            if (
+                getattr(self, "_file_descriptor", None)
+                and not self._file_descriptor.closed
+                and self._data
+            ):
+                csv_writer = DictWriter(
+                    self._file_descriptor,
+                    fieldnames=[
+                        field.upper() for field in self._data[0].__dict__.keys()
+                    ],
+                    delimiter=";",
+                )
+                if header:
+                    csv_writer.writeheader()
+                for finding in self._data:
+                    for key in list(finding.__dict__.keys()):
+                        finding.__dict__[key.upper()] = finding.__dict__.pop(key)
+                    csv_writer.writerow(finding.dict())
+                self._file_descriptor.close()
+        except Exception as error:
+            logger.error(
+                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
+            )
diff --git a/prowler/lib/outputs/compliance/cis_kubernetes.py b/prowler/lib/outputs/compliance/cis_kubernetes.py
index e711977f566..5926c6382b7 100644
--- a/prowler/lib/outputs/compliance/cis_kubernetes.py
+++ b/prowler/lib/outputs/compliance/cis_kubernetes.py
@@ -1,37 +1,68 @@
-from prowler.config.config import timestamp
+from csv import DictWriter
+
+from prowler.lib.check.compliance_models import ComplianceBaseModel
+from prowler.lib.logger import logger
+from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
 from prowler.lib.outputs.compliance.models import Kubernetes
-from prowler.lib.outputs.csv.csv import generate_csv_fields
-from prowler.lib.utils.utils import outputs_unix_timestamp
+from prowler.lib.outputs.finding import Finding
 
 
-def generate_compliance_row_cis_kubernetes(
-    finding, compliance, requirement, attribute, output_options, provider
-):
-    compliance_row = Kubernetes(
-        Provider=finding.check_metadata.Provider,
-        Description=compliance.Description,
-        Context=provider.identity.context,
-        Namespace=finding.namespace,
-        AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp),
-        Requirements_Id=requirement.Id,
-        Requirements_Description=requirement.Description,
-        Requirements_Attributes_Section=attribute.Section,
-        Requirements_Attributes_Profile=attribute.Profile,
-        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
-        Requirements_Attributes_Description=attribute.Description,
-        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
-        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
-        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
-        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
-        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
-        Requirements_Attributes_References=attribute.References,
-        Requirements_Attributes_DefaultValue=attribute.DefaultValue,
-        Status=finding.status,
-        StatusExtended=finding.status_extended,
-        ResourceId=finding.resource_id,
-        CheckId=finding.check_metadata.CheckID,
-        Muted=finding.muted,
-    )
-    csv_header = generate_csv_fields(Kubernetes)
+class KubernetesCIS(ComplianceOutput):
+    def transform(
+        self, findings: list[Finding], compliance: ComplianceBaseModel
+    ) -> None:
+        for finding in findings:
+            for requirement in compliance.Requirements:
+                for attribute in requirement.Attributes:
+                    compliance_row = Kubernetes(
+                        Provider=finding.provider,
+                        Description=compliance.Description,
+                        Context=finding.context,
+                        Namespace=finding.namespace,
+                        AssessmentDate=str(finding.timestamp),
+                        Requirements_Id=requirement.Id,
+                        Requirements_Description=requirement.Description,
+                        Requirements_Attributes_Section=attribute.Section,
+                        Requirements_Attributes_Profile=attribute.Profile,
+                        Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
+                        Requirements_Attributes_Description=attribute.Description,
+                        Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
+                        Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
+                        Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
+                        Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
+                        Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
+                        Requirements_Attributes_References=attribute.References,
+                        Requirements_Attributes_DefaultValue=attribute.DefaultValue,
+                        Status=finding.status,
+                        StatusExtended=finding.status_extended,
+                        ResourceId=finding.resource_id,
+                        CheckId=finding.check_id,
+                        Muted=finding.muted,
+                    )
+                    self._data.append(compliance_row)
 
-    return compliance_row, csv_header
+    def batch_write_data_to_file(self, header: bool) -> None:
+        try:
+            if (
+                getattr(self, "_file_descriptor", None)
+                and not self._file_descriptor.closed
+                and self._data
+            ):
+                csv_writer = DictWriter(
+                    self._file_descriptor,
+                    fieldnames=[
+                        field.upper() for field in self._data[0].__dict__.keys()
+                    ],
+                    delimiter=";",
+                )
+                if header:
+                    csv_writer.writeheader()
+                for finding in self._data:
+                    for key in list(finding.__dict__.keys()):
+                        finding.__dict__[key.upper()] = finding.__dict__.pop(key)
+                    csv_writer.writerow(finding.dict())
+                self._file_descriptor.close()
+        except Exception as error:
+            logger.error(
+                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
+            )
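
Reviewer note: a minimal usage sketch of one of the new classes follows. It is only an illustration, not part of the patch: the ComplianceOutput constructor arguments, the assumption that the base class runs transform() and opens the file descriptor, and the names `findings`, `cis_2_0_azure`, and the output path are all assumptions made for the example.

    # Hypothetical driver code -- constructor signature is assumed, since the
    # ComplianceOutput base class is not included in this patch.
    from prowler.lib.outputs.compliance.cis_azure import AzureCIS

    # `findings` (list[Finding]) and `cis_2_0_azure` (ComplianceBaseModel) are assumed
    # to come from the scan results and the loaded CIS framework, respectively.
    azure_cis = AzureCIS(
        findings=findings,
        compliance=cis_2_0_azure,
        create_file_descriptor=True,            # assumed flag: base class opens the CSV file
        file_path="output/cis_2.0_azure.csv",   # hypothetical output path
    )

    # Assuming the base class already invoked transform(), _data now holds one Azure row
    # per finding/requirement/attribute combination; write them out with a header row.
    azure_cis.batch_write_data_to_file(header=True)

The same pattern would apply to GCPCIS and KubernetesCIS, since all three classes share the same transform() / batch_write_data_to_file() structure.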