-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(cis): add class for all the providers
- Loading branch information
Showing
3 changed files
with
192 additions
and
99 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,68 @@ | ||
from prowler.config.config import timestamp | ||
from csv import DictWriter | ||
from venv import logger | ||
|
||
from prowler.lib.check.compliance_models import ComplianceBaseModel | ||
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput | ||
from prowler.lib.outputs.compliance.models import Azure | ||
from prowler.lib.outputs.csv.csv import generate_csv_fields | ||
from prowler.lib.utils.utils import outputs_unix_timestamp | ||
from prowler.lib.outputs.finding import Finding | ||
|
||
|
||
def generate_compliance_row_cis_azure( | ||
finding, compliance, requirement, attribute, output_options | ||
): | ||
compliance_row = Azure( | ||
Provider=finding.check_metadata.Provider, | ||
Description=compliance.Description, | ||
Subscription=finding.subscription, | ||
AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_DefaultValue=attribute.DefaultValue, | ||
Requirements_Attributes_References=attribute.References, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
ResourceName=finding.resource_name, | ||
CheckId=finding.check_metadata.CheckID, | ||
Muted=finding.muted, | ||
) | ||
csv_header = generate_csv_fields(Azure) | ||
class AzureCIS(ComplianceOutput): | ||
def transform( | ||
self, findings: list[Finding], compliance: ComplianceBaseModel | ||
) -> None: | ||
for finding in findings: | ||
for requirement in compliance.Requirements: | ||
for attribute in requirement.Attributes: | ||
compliance_row = Azure( | ||
Provider=finding.provider, | ||
Description=compliance.Description, | ||
Subscription=finding.subscription, | ||
AssessmentDate=str(finding.timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_DefaultValue=attribute.DefaultValue, | ||
Requirements_Attributes_References=attribute.References, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
ResourceName=finding.resource_name, | ||
CheckId=finding.check_id, | ||
Muted=finding.muted, | ||
) | ||
self._data.append(compliance_row) | ||
|
||
return compliance_row, csv_header | ||
def batch_write_data_to_file(self, header: bool) -> None: | ||
try: | ||
if ( | ||
getattr(self, "_file_descriptor", None) | ||
and not self._file_descriptor.closed | ||
and self._data | ||
): | ||
csv_writer = DictWriter( | ||
self._file_descriptor, | ||
fieldnames=[ | ||
field.upper() for field in self._data[0].__dict__.keys() | ||
], | ||
delimiter=";", | ||
) | ||
if header: | ||
csv_writer.writeheader() | ||
for finding in self._data: | ||
for key in list(finding.__dict__.keys()): | ||
finding.__dict__[key.upper()] = finding.__dict__.pop(key) | ||
csv_writer.writerow(finding.dict()) | ||
self._file_descriptor.close() | ||
except Exception as error: | ||
logger.error( | ||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,68 @@ | ||
from prowler.config.config import timestamp | ||
from csv import DictWriter | ||
from venv import logger | ||
|
||
from prowler.lib.check.compliance_models import ComplianceBaseModel | ||
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput | ||
from prowler.lib.outputs.compliance.models import GCP | ||
from prowler.lib.outputs.csv.csv import generate_csv_fields | ||
from prowler.lib.utils.utils import outputs_unix_timestamp | ||
from prowler.lib.outputs.finding import Finding | ||
|
||
|
||
def generate_compliance_row_cis_gcp( | ||
finding, compliance, requirement, attribute, output_options | ||
): | ||
compliance_row = GCP( | ||
Provider=finding.check_metadata.Provider, | ||
Description=compliance.Description, | ||
ProjectId=finding.project_id, | ||
Location=finding.location.lower(), | ||
AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_References=attribute.References, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
ResourceName=finding.resource_name, | ||
CheckId=finding.check_metadata.CheckID, | ||
Muted=finding.muted, | ||
) | ||
csv_header = generate_csv_fields(GCP) | ||
class GCPCIS(ComplianceOutput): | ||
def transform( | ||
self, findings: list[Finding], compliance: ComplianceBaseModel | ||
) -> None: | ||
for finding in findings: | ||
for requirement in compliance.Requirements: | ||
for attribute in requirement.Attributes: | ||
compliance_row = GCP( | ||
Provider=finding.provider, | ||
Description=compliance.Description, | ||
ProjectId=finding.project_id, | ||
Location=finding.location.lower(), | ||
AssessmentDate=str(finding.timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_References=attribute.References, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
ResourceName=finding.resource_name, | ||
CheckId=finding.check_id, | ||
Muted=finding.muted, | ||
) | ||
self._data.append(compliance_row) | ||
|
||
return compliance_row, csv_header | ||
def batch_write_data_to_file(self, header: bool) -> None: | ||
try: | ||
if ( | ||
getattr(self, "_file_descriptor", None) | ||
and not self._file_descriptor.closed | ||
and self._data | ||
): | ||
csv_writer = DictWriter( | ||
self._file_descriptor, | ||
fieldnames=[ | ||
field.upper() for field in self._data[0].__dict__.keys() | ||
], | ||
delimiter=";", | ||
) | ||
if header: | ||
csv_writer.writeheader() | ||
for finding in self._data: | ||
for key in list(finding.__dict__.keys()): | ||
finding.__dict__[key.upper()] = finding.__dict__.pop(key) | ||
csv_writer.writerow(finding.dict()) | ||
self._file_descriptor.close() | ||
except Exception as error: | ||
logger.error( | ||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,68 @@ | ||
from prowler.config.config import timestamp | ||
from csv import DictWriter | ||
from venv import logger | ||
|
||
from prowler.lib.check.compliance_models import ComplianceBaseModel | ||
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput | ||
from prowler.lib.outputs.compliance.models import Kubernetes | ||
from prowler.lib.outputs.csv.csv import generate_csv_fields | ||
from prowler.lib.utils.utils import outputs_unix_timestamp | ||
from prowler.lib.outputs.finding import Finding | ||
|
||
|
||
def generate_compliance_row_cis_kubernetes( | ||
finding, compliance, requirement, attribute, output_options, provider | ||
): | ||
compliance_row = Kubernetes( | ||
Provider=finding.check_metadata.Provider, | ||
Description=compliance.Description, | ||
Context=provider.identity.context, | ||
Namespace=finding.namespace, | ||
AssessmentDate=outputs_unix_timestamp(output_options.unix_timestamp, timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_References=attribute.References, | ||
Requirements_Attributes_DefaultValue=attribute.DefaultValue, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
CheckId=finding.check_metadata.CheckID, | ||
Muted=finding.muted, | ||
) | ||
csv_header = generate_csv_fields(Kubernetes) | ||
class KubernetesCIS(ComplianceOutput): | ||
def transform( | ||
self, findings: list[Finding], compliance: ComplianceBaseModel | ||
) -> None: | ||
for finding in findings: | ||
for requirement in compliance.Requirements: | ||
for attribute in requirement.Attributes: | ||
compliance_row = Kubernetes( | ||
Provider=finding.check_metadata.Provider, | ||
Description=compliance.Description, | ||
Context=finding.context, | ||
Namespace=finding.namespace, | ||
AssessmentDate=str(finding.timestamp), | ||
Requirements_Id=requirement.Id, | ||
Requirements_Description=requirement.Description, | ||
Requirements_Attributes_Section=attribute.Section, | ||
Requirements_Attributes_Profile=attribute.Profile, | ||
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus, | ||
Requirements_Attributes_Description=attribute.Description, | ||
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement, | ||
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement, | ||
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure, | ||
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure, | ||
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation, | ||
Requirements_Attributes_References=attribute.References, | ||
Requirements_Attributes_DefaultValue=attribute.DefaultValue, | ||
Status=finding.status, | ||
StatusExtended=finding.status_extended, | ||
ResourceId=finding.resource_id, | ||
CheckId=finding.check_id, | ||
Muted=finding.muted, | ||
) | ||
self._data.append(compliance_row) | ||
|
||
return compliance_row, csv_header | ||
def batch_write_data_to_file(self, header: bool) -> None: | ||
try: | ||
if ( | ||
getattr(self, "_file_descriptor", None) | ||
and not self._file_descriptor.closed | ||
and self._data | ||
): | ||
csv_writer = DictWriter( | ||
self._file_descriptor, | ||
fieldnames=[ | ||
field.upper() for field in self._data[0].__dict__.keys() | ||
], | ||
delimiter=";", | ||
) | ||
if header: | ||
csv_writer.writeheader() | ||
for finding in self._data: | ||
for key in list(finding.__dict__.keys()): | ||
finding.__dict__[key.upper()] = finding.__dict__.pop(key) | ||
csv_writer.writerow(finding.dict()) | ||
self._file_descriptor.close() | ||
except Exception as error: | ||
logger.error( | ||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) |