Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions alert_system/etl/base/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.utils import timezone

from alert_system.helpers import build_stac_search
from alert_system.models import Connector, ExtractionItem, LoadItem
from alert_system.models import Connector, ExtractionItem, ImpactDetailsEnum, LoadItem

from .config import ExtractionConfig
from .loader import BaseLoaderClass
Expand Down Expand Up @@ -267,7 +267,11 @@ def _impact_filter(self, impact_metadata: list[dict]) -> str:
filters = []

for data in impact_metadata or []:
if data.get("category") and data.get("type") and data.get("value") is not None:
if (
data.get("category") == ImpactDetailsEnum.Category.PEOPLE
and data.get("type") == ImpactDetailsEnum.Type.AFFECTED_TOTAL # TODO: Add other possible types here.
and data.get("value") is not None
):
filters.append(
f"monty:impact_detail.category = '{data['category']}' AND "
f"monty:impact_detail.type = '{data['type']}' AND "
Expand Down
48 changes: 28 additions & 20 deletions alert_system/etl/gdacs_flood/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ class GdacsTransformer(BaseTransformerClass):
Extracts and normalizes impact fields, computes derived values, and stores metadata.
"""

# NOTE: This logic might change in future
# NOTE: This logic might change in future. Currently only creating a function for future use.
def compute_people_exposed(self, metadata_list) -> Optional[int]:
for data in metadata_list:
if data["category"] == ImpactDetailsEnum.Category.PEOPLE and data["type"] == ImpactDetailsEnum.Type.AFFECTED_TOTAL:
return data["value"]
return None

# NOTE: This logic might change in future
# NOTE: This logic might change in future. Currently only creating a function for future use.
def compute_buildings_exposed(self, metadata_list) -> Optional[int]:
"""
Compute the 'buildings_exposed' field.
Expand All @@ -30,28 +30,36 @@ def compute_buildings_exposed(self, metadata_list) -> Optional[int]:
return data["value"]
return None

# NOTE: This logic will change with changes in montandon.
def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
metadata = []
meta_hash_map = {}

for item in impact_items:
properties = item.resp_data.get("properties", {})
impact_detail = properties.get("monty:impact_detail", {})
category = impact_detail.get("category")
type_ = impact_detail.get("type")
value = impact_detail.get("value")
if category == ImpactDetailsEnum.Category.PEOPLE and type_ == ImpactDetailsEnum.Type.AFFECTED_TOTAL:
metadata = [
{
"category": category,
"type": type_,
"value": value,
"unit": impact_detail.get("unit", ""),
"estimate_type": impact_detail.get("estimate_type", ""),
}
]
detail = properties.get("monty:impact_detail", {})

category = detail.get("category")
type_ = detail.get("type")
value = detail.get("value")
if value in (None, -1):
value = 0
key = (category, type_)
meta_hash_map[key] = meta_hash_map.get(key, 0) + value

metadata = [
{
"category": category,
"type": type_,
"value": value,
}
for (category, type_), value in meta_hash_map.items()
if category == ImpactDetailsEnum.Category.PEOPLE
]

return {
"people_exposed": self.compute_people_exposed(metadata),
"buildings_exposed": self.compute_buildings_exposed(metadata),
"people_exposed": meta_hash_map.get(
(ImpactDetailsEnum.Category.PEOPLE, ImpactDetailsEnum.Type.AFFECTED_TOTAL) # Taking only people exposed.
),
"buildings_exposed": meta_hash_map.get((ImpactDetailsEnum.Category.BUILDINGS, ImpactDetailsEnum.Type.DAMAGED)),
"impact_metadata": metadata,
}

Expand Down
13 changes: 7 additions & 6 deletions alert_system/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ def build_search_params(
collections: str,
cql_filters: list[str] | None = None,
extra_params: dict | None = None,
start_datetime: str | None = None,
end_datetime: str | None = None,
) -> dict:
params = {
"collections": collections,
Expand All @@ -15,6 +17,9 @@ def build_search_params(
params["filter-lang"] = "cql2-text"
params["filter"] = combined_filter

if start_datetime and end_datetime:
params["datetime"] = f"{start_datetime}/{end_datetime}"

if extra_params:
params.update(extra_params)

Expand All @@ -25,10 +30,6 @@ def build_guid_filter(guid: str) -> str:
return f"monty:guid = '{guid}'"


def build_datetime_filter(start_date: str, end_date: str) -> str:
return f"datetime >= '{start_date}' AND datetime < '{end_date}'"


def build_stac_search(
collections: str,
guid: str | None = None,
Expand All @@ -41,11 +42,11 @@ def build_stac_search(

if guid:
filters.append(build_guid_filter(guid))
if start_datetime and end_datetime:
filters.append(build_datetime_filter(start_datetime, end_datetime))

return build_search_params(
collections=collections,
cql_filters=filters,
extra_params=extra_params,
start_datetime=start_datetime,
end_datetime=end_datetime,
)
Loading