Skip to content
Open
14 changes: 12 additions & 2 deletions src/tirith/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import yaml
from types import CodeType

from typing import Any, Dict, List, Tuple, Optional
from typing import Any, Dict, List, Tuple, Optional, Union

from tirith.providers.common import ProviderError
from ..providers import PROVIDERS_DICT
Expand Down Expand Up @@ -81,8 +81,18 @@ def generate_evaluator_result(evaluator_obj, input_data, provider_module):
has_evaluation_passed = None
continue

# Run evaluation on the provider's input value
evaluation_result = evaluator_instance.evaluate(evaluator_input["value"], evaluator_data)
evaluation_result["meta"] = evaluator_input.get("meta")

# Copy metadata and addresses if provided by the provider
if "meta" in evaluator_input:
evaluation_result["meta"] = evaluator_input["meta"]

if "addresses" in evaluator_input:
# Add addresses directly
# TODO: We need to make a model class for the `evaluator_input` and move this validation there
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the type assertion mentioned in #262 (comment) here

                if not isinstance(addresses, list):
                    raise Exception("`addresses` should be a list")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the comment to the top of line 94

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

                # TODO: We need to make a model class for the `evaluator_input` and move this validation there
                if not isinstance(addresses, list):
                    raise Exception("`addresses` should be a 
 

evaluation_result["addresses"] = evaluator_input["addresses"]

evaluation_results.append(evaluation_result)
has_valid_evaluation = True

Expand Down
9 changes: 9 additions & 0 deletions src/tirith/prettyprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@

for result_num, result_dict in enumerate(check_dict["result"]):
result_message = result_dict["message"]

# Include addresses in the message if it exists in the result_dict
if "addresses" in result_dict:
addresses = result_dict["addresses"]

Check warning on line 105 in src/tirith/prettyprinter.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/prettyprinter.py#L105

Added line #L105 was not covered by tests
# Format addresses as a comma-separated string
if isinstance(addresses, list) and addresses:
addresses_str = ", ".join(addresses)
result_message = f"{result_message} - (Addresses: `{addresses_str}`)"

Check warning on line 109 in src/tirith/prettyprinter.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/prettyprinter.py#L108-L109

Added lines #L108 - L109 were not covered by tests

if result_dict["passed"]:
print(TermStyle.green(f" {result_num+1}. PASSED: {result_message}"))
elif check_dict["passed"] is None:
Expand Down
8 changes: 7 additions & 1 deletion src/tirith/providers/json/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ def get_value(provider_args: Dict, input_data: Dict) -> List[dict]:
)
]

outputs = [create_result_dict(value=value, meta=None, err=None) for value in values]
# Create result dict with addresses as a separate property, not in meta
outputs = []
for value in values:
result = create_result_dict(value=value, meta=None, err=None)
# Simply use a list directly
result["addresses"] = [key_path]
outputs.append(result)

return outputs

Expand Down
119 changes: 78 additions & 41 deletions src/tirith/providers/terraform_plan/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# input->(list ["a.b","c", "d"],value of resource)
# returns->[any, any, any]
from typing import Iterable, Tuple
from typing import Iterable, Tuple, List, Union, Any
import pydash

from ..common import ProviderError
Expand Down Expand Up @@ -89,6 +89,10 @@
if resource_type in (resource_change["type"], "*"):
is_resource_found = True
input_resource_change_attrs = resource_change["change"]["after"]
# Extract address once for reuse
address = resource_change.get("address")
addresses = [address] if address is not None else []
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a helper method that creates addresses from resource_change dict as I'm seeing this pattern often

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are just repeating it twice or thrice atmost. I dont think we need a helper class for this

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeating it twice is already a good candidate for making a helper method. It doesn't hurt much and improves readability :)


# [local_is_found_attribute] (local scope)
# Used to decide whether to append a None value for each specific resource that's missing the attribute
if input_resource_change_attrs:
Expand All @@ -97,24 +101,36 @@
is_attribute_found = True
local_is_found_attribute = True
attribute_value = input_resource_change_attrs[attribute]
outputs.append(
{
"value": attribute_value,
"meta": resource_change,
"err": None,
}
)
result = {
"value": attribute_value,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
elif "." in attribute or "*" in attribute:
evaluated_outputs = _wrapper_get_exp_attribute(attribute, input_resource_change_attrs)
if evaluated_outputs:
is_attribute_found = True
local_is_found_attribute = True
for evaluated_output in evaluated_outputs:
outputs.append({"value": evaluated_output, "meta": resource_change, "err": None})
result = {
"value": evaluated_output,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)

# If we didn't find the attribute in this resource, add a None value so it still gets evaluated
if not local_is_found_attribute:
outputs.append({"value": None, "meta": resource_change, "err": None})
result = {
"value": None,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
else:
outputs.append(
{
Expand Down Expand Up @@ -153,14 +169,17 @@
continue
if resource_type in (resource_change["type"], "*"):
is_resource_type_found = True
# Extract address once for reuse
address = resource_change.get("address")
addresses = [address] if address is not None else []
for action in resource_change["change"]["actions"]:
outputs.append(
{
"value": action,
"meta": resource_change,
"err": None,
}
)
result = {
"value": action,
"meta": resource_change,
"err": None,
"addresses": addresses,
}
outputs.append(result)
if not is_resource_type_found:
outputs.append(
{
Expand All @@ -175,6 +194,7 @@
elif input_type == "count":
count = 0
resource_meta = {}
addresses = []
resource_type = provider_inputs["terraform_resource_type"]
for resource_change in resource_changes:
# Skip if resource type is in exclude_resource_types when using wildcard
Expand All @@ -184,15 +204,13 @@
# No need to check if the resource is not found
# because the count of a resource can be zero
resource_meta = resource_change
# Add the address to our list of addresses if available
if "address" in resource_change:
addresses.append(resource_change["address"])
count += 1

outputs.append(
{
"value": count,
"meta": resource_meta,
"err": None,
}
)
result = {"value": count, "meta": resource_meta, "err": None, "addresses": addresses}
outputs.append(result)
return outputs
# CASE 4
elif input_type == "direct_dependencies":
Expand Down Expand Up @@ -252,7 +270,6 @@
continue

is_provider_full_name_found = True

attribute_value = None

if attribute_to_get == "version_constraint":
Expand All @@ -271,12 +288,9 @@
}
)
return
outputs.append(
{
"value": attribute_value,
"meta": provider_config_dict,
}
)

result = {"value": attribute_value, "meta": provider_config_dict, "addresses": [terraform_provider_full_name]}
outputs.append(result)

if not is_provider_full_name_found:
outputs.append(
Expand All @@ -297,7 +311,10 @@
:param provider_inputs: The provider inputs
:param outputs: The outputs
"""
outputs.append({"value": input_data.get("terraform_version"), "meta": input_data})
# For terraform_version, there's no specific address as it applies to the entire plan
outputs.append(
{"value": input_data.get("terraform_version"), "meta": input_data, "addresses": ["terraform_version"]}
)


def direct_dependencies_operator(input_data: dict, provider_inputs: dict, outputs: list):
Expand All @@ -316,12 +333,16 @@
is_resource_found = False

for resource in config_resources:

if resource.get("type") != resource_type:
continue
is_resource_found = True
deps_resource_type = {resource_id.split(".")[0] for resource_id in resource.get("depends_on", [])}
outputs.append({"value": list(deps_resource_type), "meta": config_resources})
result = {"value": list(deps_resource_type), "meta": config_resources}
# Add addresses if available
address = resource.get("address")
if address:
result["addresses"] = [address]
outputs.append(result)

if not is_resource_found:
outputs.append(
Expand Down Expand Up @@ -386,13 +407,20 @@
reference_address = f"{module_path}.{relative_reference_address}"
if reference_address in reference_target_addresses:
reference_target_addresses.remove(reference_address)
outputs.append(
{"value": True, "meta": {"address": reference_address, "referenced_by": resource_config}}
)
result = {

Check warning on line 410 in src/tirith/providers/terraform_plan/handler.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/providers/terraform_plan/handler.py#L410

Added line #L410 was not covered by tests
"value": True,
"meta": {"referenced_by": resource_config},
"addresses": [reference_address],
}

outputs.append(result)

Check warning on line 416 in src/tirith/providers/terraform_plan/handler.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/providers/terraform_plan/handler.py#L416

Added line #L416 was not covered by tests

# For all of the reference_target_addresses that don't have a reference
for reference_target_address in reference_target_addresses:
outputs.append({"value": False, "meta": {"address": reference_target_address, "referenced_by": {}}})
for reference_target_addresses_item in reference_target_addresses:
result = {"value": False, "meta": {"referenced_by": {}}}
# Add addresses as a simple list
result["addresses"] = [reference_target_addresses_item]
outputs.append(result)


def get_module_resources_by_type_recursive(module: dict, resource_type: str, current_module_path: str = "") -> iter:
Expand Down Expand Up @@ -478,7 +506,11 @@
return

is_all_resource_type_references_to = resource_type_count == reference_count
outputs.append({"value": is_all_resource_type_references_to, "meta": config_resources})
result = {"value": is_all_resource_type_references_to, "meta": config_resources}

Check warning on line 509 in src/tirith/providers/terraform_plan/handler.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/providers/terraform_plan/handler.py#L509

Added line #L509 was not covered by tests
# Simple list with resource type
# TODO: Use the real specific addresses
result["addresses"] = [resource_type]
outputs.append(result)

Check warning on line 513 in src/tirith/providers/terraform_plan/handler.py

View check run for this annotation

Codecov / codecov/patch

src/tirith/providers/terraform_plan/handler.py#L512-L513

Added lines #L512 - L513 were not covered by tests
Comment thread
refeed marked this conversation as resolved.


def direct_references_operator(input_data: dict, provider_inputs: dict, outputs: list):
Expand Down Expand Up @@ -530,7 +562,12 @@
# Only get the resource type
resource_references.add(reference.split(".")[0])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the address from the reference

addresses.append(reference)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@refeed I am unable to understand why do we need to do this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because that's the real addresses the optype is processing. The resource var that's placed in the meta key was actually not placed correctly, it only contains one resource even though this optype is processing more than one resources


outputs.append({"value": list(resource_references), "meta": resource})
result = {"value": list(resource_references), "meta": resource}
# Add addresses if available as a simple list
address = resource.get("address")
if address:
result["addresses"] = [address]
outputs.append(result)

if not is_resource_found:
outputs.append(
Expand Down