
Handlers Module

gigaspatial.handlers

boundaries

AdminBoundaries

Bases: BaseModel

Base class for administrative boundary data with flexible fields.

Source code in gigaspatial/handlers/boundaries.py
class AdminBoundaries(BaseModel):
    """Base class for administrative boundary data with flexible fields."""

    boundaries: List[AdminBoundary] = Field(default_factory=list)
    level: int = Field(
        ...,
        ge=0,
        le=4,
        description="Administrative level (e.g., 0=country, 1=state, etc.)",
    )

    logger: ClassVar = config.get_logger(__name__)

    _schema_config: ClassVar[Dict[str, Dict[str, str]]] = {
        "gadm": {
            "country_code": "GID_0",
            "id": "GID_{level}",
            "name": "NAME_{level}",
            "parent_id": "GID_{parent_level}",
        },
        "internal": {
            "id": "admin{level}_id_giga",
            "name": "name",
            "name_en": "name_en",
            "country_code": "iso_3166_1_alpha_3",
        },
    }

    @classmethod
    def get_schema_config(cls) -> Dict[str, Dict[str, str]]:
        """Return field mappings for different data sources"""
        return cls._schema_config

    @classmethod
    def from_gadm(
        cls, country_code: str, admin_level: int = 0, **kwargs
    ) -> "AdminBoundaries":
        """Load and create instance from GADM data."""
        url = f"https://geodata.ucdavis.edu/gadm/gadm4.1/json/gadm41_{country_code}_{admin_level}.json"

        try:
            gdf = gpd.read_file(url)

            gdf = cls._map_fields(gdf, "gadm", admin_level)

            if admin_level == 0:
                gdf["country_code"] = gdf["id"]
                gdf["name"] = gdf["COUNTRY"]
            elif admin_level == 1:
                gdf["country_code"] = gdf["parent_id"]

            boundaries = [
                AdminBoundary(**row_dict) for row_dict in gdf.to_dict("records")
            ]
            return cls(
                boundaries=boundaries, level=admin_level, country_code=country_code
            )

        except (ValueError, HTTPError, FileNotFoundError) as e:
            cls.logger.warning(
                f"No data found for {country_code} at admin level {admin_level}: {str(e)}"
            )
            return cls._create_empty_instance(country_code, admin_level, "gadm")

    @classmethod
    def from_data_store(
        cls,
        data_store: DataStore,
        path: Union[str, "Path"],
        admin_level: int = 0,
        **kwargs,
    ) -> "AdminBoundaries":
        """Load and create instance from internal data store."""
        try:
            gdf = read_dataset(data_store, str(path), **kwargs)

            if gdf.empty:
                return cls._create_empty_instance(None, admin_level, "internal")

            gdf = cls._map_fields(gdf, "internal", admin_level)

            if admin_level == 0:
                gdf["id"] = gdf["country_code"]
            else:
                gdf["parent_id"] = gdf["id"].apply(lambda x: x[:-3])

            boundaries = [
                AdminBoundary(**row_dict) for row_dict in gdf.to_dict("records")
            ]
            return cls(boundaries=boundaries, level=admin_level)

        except (FileNotFoundError, KeyError) as e:
            cls.logger.warning(
                f"No data found at {path} for admin level {admin_level}: {str(e)}"
            )
            return cls._create_empty_instance(None, admin_level, "internal")

    @classmethod
    def create(
        cls,
        country_code: Optional[str] = None,
        admin_level: int = 0,
        data_store: Optional[DataStore] = None,
        path: Optional[Union[str, "Path"]] = None,
        **kwargs,
    ) -> "AdminBoundaries":
        """Factory method to create AdminBoundaries instance from either GADM or data store."""
        if data_store is not None:
            if path is None:
                if country_code is None:
                    raise ValueError(
                        "If data_store is provided, path or country_code must also be specified."
                    )
                path = config.get_admin_path(
                    country_code=country_code, admin_level=admin_level
                )
            return cls.from_data_store(data_store, path, admin_level, **kwargs)
        elif country_code is not None:
            return cls.from_gadm(country_code, admin_level, **kwargs)
        else:
            raise ValueError(
                "Either country_code or (data_store, path) must be provided."
            )

    @classmethod
    def _create_empty_instance(
        cls, country_code: Optional[str], admin_level: int, source_type: str
    ) -> "AdminBoundaries":
        """Create an empty instance with the required schema structure."""
        # for to_geodataframe() to use later
        instance = cls(boundaries=[], level=admin_level, country_code=country_code)

        schema_fields = set(cls.get_schema_config()[source_type].keys())
        schema_fields.update(["geometry", "country_code", "id", "name", "name_en"])
        if admin_level > 0:
            schema_fields.add("parent_id")

        instance._empty_schema = list(schema_fields)
        return instance

    @classmethod
    def _map_fields(
        cls,
        gdf: gpd.GeoDataFrame,
        source: str,
        current_level: int,
    ) -> gpd.GeoDataFrame:
        """Map source fields to schema fields"""
        config = cls.get_schema_config().get(source, {})
        parent_level = current_level - 1

        field_mapping = {}
        for k, v in config.items():
            if "{parent_level}" in v:
                field_mapping[v.format(parent_level=parent_level)] = k
            elif "{level}" in v:
                field_mapping[v.format(level=current_level)] = k
            else:
                field_mapping[v] = k

        return gdf.rename(columns=field_mapping)

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert the AdminBoundaries to a GeoDataFrame."""
        if not self.boundaries:
            if hasattr(self, "_empty_schema"):
                columns = self._empty_schema
            else:
                columns = ["id", "name", "country_code", "geometry"]
                if self.level > 0:
                    columns.append("parent_id")

            return gpd.GeoDataFrame(columns=columns, geometry="geometry", crs=4326)

        return gpd.GeoDataFrame(
            [boundary.model_dump() for boundary in self.boundaries],
            geometry="geometry",
            crs=4326,
        )
create(country_code=None, admin_level=0, data_store=None, path=None, **kwargs) classmethod

Factory method to create AdminBoundaries instance from either GADM or data store.

Source code in gigaspatial/handlers/boundaries.py
@classmethod
def create(
    cls,
    country_code: Optional[str] = None,
    admin_level: int = 0,
    data_store: Optional[DataStore] = None,
    path: Optional[Union[str, "Path"]] = None,
    **kwargs,
) -> "AdminBoundaries":
    """Factory method to create AdminBoundaries instance from either GADM or data store."""
    if data_store is not None:
        if path is None:
            if country_code is None:
                raise ValueError(
                    "If data_store is provided, path or country_code must also be specified."
                )
            path = config.get_admin_path(
                country_code=country_code, admin_level=admin_level
            )
        return cls.from_data_store(data_store, path, admin_level, **kwargs)
    elif country_code is not None:
        return cls.from_gadm(country_code, admin_level, **kwargs)
    else:
        raise ValueError(
            "Either country_code or (data_store, path) must be provided."
        )
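
Example: a minimal usage sketch of the factory method, assuming network access to GADM; the country code "KEN" is illustrative.

from gigaspatial.handlers.boundaries import AdminBoundaries

# Load country-level (admin level 0) boundaries straight from GADM
ken_admin0 = AdminBoundaries.create(country_code="KEN", admin_level=0)
print(ken_admin0.level, len(ken_admin0.boundaries))

# With a DataStore, the same factory resolves a stored boundary file instead
# (the data_store and path arguments would point at an internal dataset).
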
from_data_store(data_store, path, admin_level=0, **kwargs) classmethod

Load and create instance from internal data store.

Source code in gigaspatial/handlers/boundaries.py
@classmethod
def from_data_store(
    cls,
    data_store: DataStore,
    path: Union[str, "Path"],
    admin_level: int = 0,
    **kwargs,
) -> "AdminBoundaries":
    """Load and create instance from internal data store."""
    try:
        gdf = read_dataset(data_store, str(path), **kwargs)

        if gdf.empty:
            return cls._create_empty_instance(None, admin_level, "internal")

        gdf = cls._map_fields(gdf, "internal", admin_level)

        if admin_level == 0:
            gdf["id"] = gdf["country_code"]
        else:
            gdf["parent_id"] = gdf["id"].apply(lambda x: x[:-3])

        boundaries = [
            AdminBoundary(**row_dict) for row_dict in gdf.to_dict("records")
        ]
        return cls(boundaries=boundaries, level=admin_level)

    except (FileNotFoundError, KeyError) as e:
        cls.logger.warning(
            f"No data found at {path} for admin level {admin_level}: {str(e)}"
        )
        return cls._create_empty_instance(None, admin_level, "internal")
from_gadm(country_code, admin_level=0, **kwargs) classmethod

Load and create instance from GADM data.

Source code in gigaspatial/handlers/boundaries.py
@classmethod
def from_gadm(
    cls, country_code: str, admin_level: int = 0, **kwargs
) -> "AdminBoundaries":
    """Load and create instance from GADM data."""
    url = f"https://geodata.ucdavis.edu/gadm/gadm4.1/json/gadm41_{country_code}_{admin_level}.json"

    try:
        gdf = gpd.read_file(url)

        gdf = cls._map_fields(gdf, "gadm", admin_level)

        if admin_level == 0:
            gdf["country_code"] = gdf["id"]
            gdf["name"] = gdf["COUNTRY"]
        elif admin_level == 1:
            gdf["country_code"] = gdf["parent_id"]

        boundaries = [
            AdminBoundary(**row_dict) for row_dict in gdf.to_dict("records")
        ]
        return cls(
            boundaries=boundaries, level=admin_level, country_code=country_code
        )

    except (ValueError, HTTPError, FileNotFoundError) as e:
        cls.logger.warning(
            f"No data found for {country_code} at admin level {admin_level}: {str(e)}"
        )
        return cls._create_empty_instance(country_code, admin_level, "gadm")
get_schema_config() classmethod

Return field mappings for different data sources

Source code in gigaspatial/handlers/boundaries.py
@classmethod
def get_schema_config(cls) -> Dict[str, Dict[str, str]]:
    """Return field mappings for different data sources"""
    return cls._schema_config
to_geodataframe()

Convert the AdminBoundaries to a GeoDataFrame.

Source code in gigaspatial/handlers/boundaries.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert the AdminBoundaries to a GeoDataFrame."""
    if not self.boundaries:
        if hasattr(self, "_empty_schema"):
            columns = self._empty_schema
        else:
            columns = ["id", "name", "country_code", "geometry"]
            if self.level > 0:
                columns.append("parent_id")

        return gpd.GeoDataFrame(columns=columns, geometry="geometry", crs=4326)

    return gpd.GeoDataFrame(
        [boundary.model_dump() for boundary in self.boundaries],
        geometry="geometry",
        crs=4326,
    )
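
Example: converting GADM boundaries into a GeoDataFrame for spatial work (a sketch; requires network access, and "KEN" is illustrative).

from gigaspatial.handlers.boundaries import AdminBoundaries

gdf = AdminBoundaries.from_gadm("KEN", admin_level=1).to_geodataframe()
print(gdf.crs)            # EPSG:4326
print(list(gdf.columns))  # id, name, geometry, name_en, parent_id, country_code
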

AdminBoundary

Bases: BaseModel

Base class for administrative boundary data with flexible fields.

Source code in gigaspatial/handlers/boundaries.py
class AdminBoundary(BaseModel):
    """Base class for administrative boundary data with flexible fields."""

    id: str = Field(..., description="Unique identifier for the administrative unit")
    name: str = Field(..., description="Primary local name")
    geometry: Union[Polygon, MultiPolygon] = Field(
        ..., description="Geometry of the administrative boundary"
    )

    name_en: Optional[str] = Field(
        None, description="English name if different from local name"
    )
    parent_id: Optional[str] = Field(
        None, description="ID of parent administrative unit"
    )
    country_code: Optional[str] = Field(
        None, min_length=3, max_length=3, description="ISO 3166-1 alpha-3 country code"
    )

    class Config:
        # extra = "allow"
        arbitrary_types_allowed = True
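
Example: constructing an AdminBoundary directly (a minimal sketch; the geometry and identifiers are made up for illustration).

from shapely.geometry import Polygon
from gigaspatial.handlers.boundaries import AdminBoundary

unit = AdminBoundary(
    id="KEN.1_1",  # illustrative GADM-style identifier
    name="Baringo",
    geometry=Polygon([(35.0, 0.0), (36.0, 0.0), (36.0, 1.0), (35.0, 1.0)]),
    parent_id="KEN",
    country_code="KEN",
)
print(unit.model_dump()["name"])
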

ghsl

CoordSystem

Bases: int, Enum

Enum for coordinate systems used by GHSL datasets.

Source code in gigaspatial/handlers/ghsl.py
class CoordSystem(int, Enum):
    """Enum for coordinate systems used by GHSL datasets."""

    WGS84 = 4326
    Mollweide = 54009

GHSLDataConfig

Source code in gigaspatial/handlers/ghsl.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class GHSLDataConfig:
    # constants
    AVAILABLE_YEARS: List = Field(default=np.append(np.arange(1975, 2031, 5), 2018))
    AVAILABLE_RESOLUTIONS: List = Field(default=[10, 100, 1000])

    # base config
    GHSL_DB_BASE_URL: HttpUrl = Field(
        default="https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/GHSL/"
    )
    TILES_URL: str = "https://ghsl.jrc.ec.europa.eu/download/GHSL_data_{}_shapefile.zip"

    # user config
    base_path: Path = Field(default=global_config.get_path("ghsl", "bronze"))
    coord_system: CoordSystem = CoordSystem.WGS84
    release: str = "R2023A"

    product: Literal[
        "GHS_BUILT_S",
        "GHS_BUILT_H_AGBH",
        "GHS_BUILT_H_ANBH",
        "GHS_BUILT_V",
        "GHS_POP",
        "GHS_SMOD",
    ] = Field(...)
    year: int = 2020
    resolution: int = 100

    logger: logging.Logger = global_config.get_logger(__name__)
    n_workers: int = 4

    def _load_tiles(self):
        """Load GHSL tiles from tiles shapefile."""
        try:
            self.tiles_gdf = gpd.read_file(self.TILES_URL)
        except Exception as e:
            self.logger.error(f"Failed to download tiles shapefile: {e}")
            raise ValueError(
                f"Could not download GHSL tiles from {self.TILES_URL}"
            ) from e

    @field_validator("year")
    def validate_year(cls, value: int) -> int:
        if value in cls.AVAILABLE_YEARS:
            return value
        raise ValueError(
            f"No datasets found for the provided year: {value}\nAvailable years are: {cls.AVAILABLE_YEARS}"
        )

    @field_validator("resolution")
    def validate_resolution(cls, value: int) -> int:
        if value in cls.AVAILABLE_RESOLUTIONS:
            return value
        raise ValueError(
            f"No datasets found for the provided resolution: {value}\nAvailable resolutions are: {cls.AVAILABLE_RESOLUTIONS}"
        )

    @model_validator(mode="after")
    def validate_configuration(self):
        """
        Validate that the configuration is valid based on dataset availability constraints.

        Specific rules:
        - GHS_BUILT_V, GHS_POP and GHS_SMOD products are not available for 2018.
        - 10m resolution is only available for the building height (GHS_BUILT_H_*) products.
        - Building height products are only available for 2018.
        - GHS_BUILT_S for 2018 is only available at 10m resolution in the Mollweide projection.
        - GHS_SMOD is only available at 1000m (1km) resolution in the Mollweide projection.
        """
        if self.year == 2018 and self.product in ["GHS_BUILT_V", "GHS_POP", "GHS_SMOD"]:
            raise ValueError(f"{self.product} product is not available for 2018")

        if self.resolution == 10 and "GHS_BUILT_H" not in self.product:
            raise ValueError(
                f"{self.product} product is not available at 10 (10m) resolution"
            )

        if "GHS_BUILT_H" in self.product:
            if self.year != 2018:
                self.logger.warning(
                    "Building height product is only available for 2018, year is set as 2018"
                )
                self.year = 2018

        if self.product == "GHS_BUILT_S":
            if self.year == 2018 and self.resolution != 10:
                self.logger.warning(
                    "Built-up surface product for 2018 is only available at 10m resolution, resolution is set as 10m"
                )
                self.resolution = 10

            if self.resolution == 10 and self.year != 2018:
                self.logger.warning(
                    "Built-up surface product at resolution 10 is only available for 2018, year is set as 2018"
                )
                self.year = 2018

            if self.resolution == 10 and self.coord_system != CoordSystem.Mollweide:
                self.logger.warning(
                    f"Built-up surface product at resolution 10 is only available with Mollweide ({CoordSystem.Mollweide}) projection, coordinate system is set as Mollweide"
                )
                self.coord_system = CoordSystem.Mollweide

        if self.product == "GHS_SMOD":
            if self.resolution != 1000:
                self.logger.warning(
                    f"Settlement model (SMOD) product is only available at 1000 (1km) resolution, resolution is set as 1000"
                )
                self.resolution = 1000

            if self.coord_system != CoordSystem.Mollweide:
                self.logger.warning(
                    f"Settlement model (SMOD) product is only available with Mollweide ({CoordSystem.Mollweide}) projection, coordinate system is set as Mollweide"
                )
                self.coord_system = CoordSystem.Mollweide

        self.TILES_URL = self.TILES_URL.format(self.coord_system)

        self._load_tiles()

        return self

    @property
    def crs(self) -> str:
        return "EPSG:4326" if self.coord_system == CoordSystem.WGS84 else "ESRI:54009"

    def _get_product_info(self) -> dict:
        """Generate and return common product information used in multiple methods."""
        resolution_str = (
            str(self.resolution)
            if self.coord_system == CoordSystem.Mollweide
            else ("3ss" if self.resolution == 100 else "30ss")
        )
        product_folder = f"{self.product}_GLOBE_{self.release}"
        product_name = f"{self.product}_E{self.year}_GLOBE_{self.release}_{self.coord_system}_{resolution_str}"
        product_version = 2 if self.product == "GHS_SMOD" else 1

        return {
            "resolution_str": resolution_str,
            "product_folder": product_folder,
            "product_name": product_name,
            "product_version": product_version,
        }

    def compute_dataset_url(self, tile_id=None) -> str:
        """Compute the download URL for a GHSL dataset."""
        info = self._get_product_info()

        path_segments = [
            str(self.GHSL_DB_BASE_URL),
            info["product_folder"],
            info["product_name"],
            f"V{info['product_version']}-0",
            "tiles" if tile_id else "",
            f"{info['product_name']}_V{info['product_version']}_0"
            + (f"_{tile_id}" if tile_id else "")
            + ".zip",
        ]

        return "/".join(path_segments)

    def get_country_tiles(
        self,
        country: str,
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[Path, str]] = None,
    ) -> List:

        def _load_country_geometry(
            country: str,
            data_store: Optional[DataStore] = None,
            country_geom_path: Optional[Union[Path, str]] = None,
        ) -> Union[Polygon, MultiPolygon]:
            """Load country boundary geometry from DataStore or GADM."""

            gdf_admin0 = (
                AdminBoundaries.create(
                    country_code=pycountry.countries.lookup(country).alpha_3,
                    admin_level=0,
                    data_store=data_store,
                    path=country_geom_path,
                )
                .to_geodataframe()
                .to_crs(self.tiles_gdf.crs)
            )

            return gdf_admin0.geometry.iloc[0]

        country_geom = _load_country_geometry(country, data_store, country_geom_path)

        s = STRtree(self.tiles_gdf.geometry)
        result = s.query(country_geom, predicate="intersects")

        intersection_tiles = self.tiles_gdf.iloc[result].reset_index(drop=True)

        return [tile for tile in intersection_tiles.tile_id]

    def get_intersecting_tiles(
        self, geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame], crs=4326
    ) -> List[str]:
        """
        Find all GHSL tiles that intersect with the provided geometry.

        Args:
            geometry: A geometry or GeoDataFrame to check for intersection with GHSL tiles
            crs: Coordinate reference system of the given geometry

        Returns:
            List of tile ids for GHSL tiles that intersect with the geometry
        """

        if isinstance(geometry, gpd.GeoDataFrame):
            search_geom = geometry.geometry.unary_union
        else:
            search_geom = geometry

        search_geom = (
            gpd.GeoDataFrame(geometry=[search_geom], crs=crs)
            .to_crs(self.tiles_gdf.crs)
            .geometry.iloc[0]
        )

        s = STRtree(self.tiles_gdf.geometry)
        result = s.query(search_geom, predicate="intersects")

        intersection_tiles = self.tiles_gdf.iloc[result].reset_index(drop=True)

        return [tile for tile in intersection_tiles.tile_id]

    def get_tile_path(self, tile_id=None) -> str:
        """Construct and return the path for the configured dataset."""
        info = self._get_product_info()

        tile_path = (
            self.base_path
            / info["product_folder"]
            / (
                f"{info['product_name']}_V{info['product_version']}_0"
                + (f"_{tile_id}" if tile_id else "")
                + ".zip"
            )
        )

        return tile_path

    def __repr__(self) -> str:
        """Return a string representation of the GHSL dataset configuration."""
        return (
            f"GHSLDataConfig("
            f"product='{self.product}', "
            f"year={self.year}, "
            f"resolution={self.resolution}, "
            f"coord_system={self.coord_system.name}, "
            f"release='{self.release}'"
            f")"
        )
__repr__()

Return a string representation of the GHSL dataset configuration.

Source code in gigaspatial/handlers/ghsl.py
def __repr__(self) -> str:
    """Return a string representation of the GHSL dataset configuration."""
    return (
        f"GHSLDataConfig("
        f"product='{self.product}', "
        f"year={self.year}, "
        f"resolution={self.resolution}, "
        f"coord_system={self.coord_system.name}, "
        f"release='{self.release}'"
        f")"
    )
compute_dataset_url(tile_id=None)

Compute the download URL for a GHSL dataset.

Source code in gigaspatial/handlers/ghsl.py
def compute_dataset_url(self, tile_id=None) -> str:
    """Compute the download URL for a GHSL dataset."""
    info = self._get_product_info()

    path_segments = [
        str(self.GHSL_DB_BASE_URL),
        info["product_folder"],
        info["product_name"],
        f"V{info['product_version']}-0",
        "tiles" if tile_id else "",
        f"{info['product_name']}_V{info['product_version']}_0"
        + (f"_{tile_id}" if tile_id else "")
        + ".zip",
    ]

    return "/".join(path_segments)
get_intersecting_tiles(geometry, crs=4326)

Find all GHSL tiles that intersect with the provided geometry.

Parameters:

- geometry (Union[Polygon, MultiPolygon, GeoDataFrame]): A geometry or GeoDataFrame to check for intersection with GHSL tiles. Required.
- crs: Coordinate reference system of the given geometry. Default: 4326.

Returns:

- List[str]: List of tile ids for GHSL tiles that intersect with the geometry.

Source code in gigaspatial/handlers/ghsl.py
def get_intersecting_tiles(
    self, geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame], crs=4326
) -> List[str]:
    """
    Find all GHSL tiles that intersect with the provided geometry.

    Args:
        geometry: A geometry or GeoDataFrame to check for intersection with GHSL tiles
        crs: Coordinate reference system of the given geometry

    Returns:
        List of tile ids for GHSL tiles that intersect with the geometry
    """

    if isinstance(geometry, gpd.GeoDataFrame):
        search_geom = geometry.geometry.unary_union
    else:
        search_geom = geometry

    search_geom = (
        gpd.GeoDataFrame(geometry=[search_geom], crs=crs)
        .to_crs(self.tiles_gdf.crs)
        .geometry.iloc[0]
    )

    s = STRtree(self.tiles_gdf.geometry)
    result = s.query(search_geom, predicate="intersects")

    intersection_tiles = self.tiles_gdf.iloc[result].reset_index(drop=True)

    return [tile for tile in intersection_tiles.tile_id]
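
Example: listing the tiles that cover an arbitrary bounding box (a sketch; the coordinates are illustrative and constructing the config requires network access).

from shapely.geometry import box
from gigaspatial.handlers.ghsl import GHSLDataConfig

config = GHSLDataConfig(product="GHS_BUILT_S", year=2020, resolution=100)

# Bounding box around Nairobi, expressed in WGS84 (EPSG:4326)
aoi = box(36.6, -1.5, 37.1, -1.1)
print(config.get_intersecting_tiles(aoi, crs=4326))
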
get_tile_path(tile_id=None)

Construct and return the path for the configured dataset.

Source code in gigaspatial/handlers/ghsl.py
def get_tile_path(self, tile_id=None) -> str:
    """Construct and return the path for the configured dataset."""
    info = self._get_product_info()

    tile_path = (
        self.base_path
        / info["product_folder"]
        / (
            f"{info['product_name']}_V{info['product_version']}_0"
            + (f"_{tile_id}" if tile_id else "")
            + ".zip"
        )
    )

    return tile_path
validate_configuration()

Validate that the configuration is valid based on dataset availability constraints.

Specific rules:

- GHS_BUILT_V, GHS_POP and GHS_SMOD products are not available for 2018.
- 10m resolution is only available for the building height (GHS_BUILT_H_*) products.
- Building height products are only available for 2018.
- GHS_BUILT_S for 2018 is only available at 10m resolution in the Mollweide projection.
- GHS_SMOD is only available at 1000m (1km) resolution in the Mollweide projection.

Source code in gigaspatial/handlers/ghsl.py
@model_validator(mode="after")
def validate_configuration(self):
    """
    Validate that the configuration is valid based on dataset availability constraints.

    Specific rules:
    - GHS_BUILT_V, GHS_POP and GHS_SMOD products are not available for 2018.
    - 10m resolution is only available for the building height (GHS_BUILT_H_*) products.
    - Building height products are only available for 2018.
    - GHS_BUILT_S for 2018 is only available at 10m resolution in the Mollweide projection.
    - GHS_SMOD is only available at 1000m (1km) resolution in the Mollweide projection.
    """
    if self.year == 2018 and self.product in ["GHS_BUILT_V", "GHS_POP", "GHS_SMOD"]:
        raise ValueError(f"{self.product} product is not available for 2018")

    if self.resolution == 10 and "GHS_BUILT_H" not in self.product:
        raise ValueError(
            f"{self.product} product is not available at 10 (10m) resolution"
        )

    if "GHS_BUILT_H" in self.product:
        if self.year != 2018:
            self.logger.warning(
                "Building height product is only available for 2018, year is set as 2018"
            )
            self.year = 2018

    if self.product == "GHS_BUILT_S":
        if self.year == 2018 and self.resolution != 10:
            self.logger.warning(
                "Built-up surface product for 2018 is only available at 10m resolution, resolution is set as 10m"
            )
            self.resolution = 10

        if self.resolution == 10 and self.year != 2018:
            self.logger.warning(
                "Built-up surface product at resolution 10 is only available for 2018, year is set as 2018"
            )
            self.year = 2018

        if self.resolution == 10 and self.coord_system != CoordSystem.Mollweide:
            self.logger.warning(
                f"Built-up surface product at resolution 10 is only available with Mollweide ({CoordSystem.Mollweide}) projection, coordinate system is set as Mollweide"
            )
            self.coord_system = CoordSystem.Mollweide

    if self.product == "GHS_SMOD":
        if self.resolution != 1000:
            self.logger.warning(
                f"Settlement model (SMOD) product is only available at 1000 (1km) resolution, resolution is set as 1000"
            )
            self.resolution = 1000

        if self.coord_system != CoordSystem.Mollweide:
            self.logger.warning(
                f"Settlement model (SMOD) product is only available with Mollweide ({CoordSystem.Mollweide}) projection, coordinate system is set as Mollweide"
            )
            self.coord_system = CoordSystem.Mollweide

    self.TILES_URL = self.TILES_URL.format(self.coord_system)

    self._load_tiles()

    return self

GHSLDataDownloader

A class to handle downloads of GHSL (Global Human Settlement Layer) datasets.

Source code in gigaspatial/handlers/ghsl.py
class GHSLDataDownloader:
    """A class to handle downloads of WorldPop datasets."""

    def __init__(
        self,
        config: Union[GHSLDataConfig, dict[str, Union[str, int]]],
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the downloader.

        Args:
            config: Configuration for the GHSL dataset, either as a GHSLDataConfig object or a dictionary of parameters
            data_store: Optional data storage interface. If not provided, uses LocalDataStore.
            logger: Optional custom logger. If not provided, uses default logger.
        """
        self.logger = logger or global_config.get_logger(__name__)
        self.data_store = data_store or LocalDataStore()
        self.config = (
            config if isinstance(config, GHSLDataConfig) else GHSLDataConfig(**config)
        )
        self.config.logger = self.logger

    def _download_tile(self, tile_id: str) -> str:
        """
        Download the configured dataset to the provided output path.

        Args:
            tile_id: tile ID to download

        Returns:
            path to extracted files
        """

        try:
            response = requests.get(
                self.config.compute_dataset_url(tile_id=tile_id), stream=True
            )
            response.raise_for_status()

            output_path = str(self.config.get_tile_path(tile_id=tile_id))

            total_size = int(response.headers.get("content-length", 0))

            with self.data_store.open(output_path, "wb") as file:
                with tqdm(
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    desc=f"Downloading {os.path.basename(output_path)}",
                ) as pbar:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            file.write(chunk)
                            pbar.update(len(chunk))

            self.logger.debug(f"Successfully downloaded dataset: {self.config}")

            return output_path

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to download dataset {self.config}: {str(e)}")
            return None
        except Exception as e:
            self.logger.error(f"Unexpected error downloading dataset: {str(e)}")
            return None

    def download_and_extract_tile(self, tile_id, file_pattern=None):
        """
        Download and extract specific files from GHSL dataset tile zip archives.

        Args:
            tile_id: tile ID to download and extract
            file_pattern: Optional regex pattern to filter which files to extract
                        (e.g., '.*\\.tif$' for only TIF files)

        Returns:
            path to extracted files
        """
        output_path = self.config.get_tile_path(tile_id=tile_id).parents[0]

        extracted_files = []

        url = self.config.compute_dataset_url(tile_id=tile_id)
        self.logger.info(f"Downloading zip from {url}")

        try:
            # download zip to temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file:
                with requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    shutil.copyfileobj(response.raw, temp_file)

            with zipfile.ZipFile(temp_file.name, "r") as zip_ref:
                # get list of files in the zip (filter if pattern provided)
                if file_pattern:
                    import re

                    pattern = re.compile(file_pattern)
                    files_to_extract = [
                        f for f in zip_ref.namelist() if pattern.match(f)
                    ]
                else:
                    files_to_extract = zip_ref.namelist()

                for file in files_to_extract:
                    extracted_path = output_path / Path(file).name
                    with zip_ref.open(file) as source, open(
                        extracted_path, "wb"
                    ) as target:
                        shutil.copyfileobj(source, target)
                    extracted_files.append(extracted_path)
                    self.logger.info(f"Extracted {file} to {extracted_path}")

            Path(temp_file.name).unlink()

        except Exception as e:
            self.logger.error(f"Error downloading/extracting tile {tile_id}: {e}")
            raise

        return extracted_files

    def download_by_country(
        self,
        country: str,
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
        extract: bool = False,
    ) -> List[str]:
        """
        Download GHSL data for a specific country.

        Args:
            country: ISO 3166-1 alpha-3 country code, ISO alpha-2 country code or country name

        Returns:
            List of paths to downloaded files
        """

        # Get intersecting tiles
        country_tiles = self.config.get_country_tiles(
            country=country, data_store=data_store, country_geom_path=country_geom_path
        )

        if not country_tiles:
            self.logger.warning(f"There is no matching data for {country}")
            return []

        # Download tiles in parallel
        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(
                self._download_tile if not extract else self.download_and_extract_tile
            )
            file_paths = list(
                tqdm(
                    pool.imap(download_func, country_tiles),
                    total=len(country_tiles),
                    desc=f"Downloading for {country}",
                )
            )

        # Filter out None values (failed downloads)
        return [path for path in file_paths if path is not None]

    def download_by_points(
        self, points_gdf: gpd.GeoDataFrame, extract: bool = False
    ) -> List[str]:
        """
        Download GHSL data for areas containing specific points.

        Args:
            points_gdf: GeoDataFrame containing points of interest

        Returns:
            List of paths to downloaded files
        """
        # Get intersecting tiles
        int_tiles = self.config.get_intersecting_tiles(points_gdf, points_gdf.crs)

        if not int_tiles:
            self.logger.warning(f"There is no matching data for the points")
            return []

        # Download tiles in parallel
        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(
                self._download_tile if not extract else self.download_and_extract_tile
            )
            file_paths = list(
                tqdm(
                    pool.imap(download_func, int_tiles),
                    total=len(int_tiles),
                    desc=f"Downloading for points dataset",
                )
            )

        # Filter out None values (failed downloads)
        return [path for path in file_paths if path is not None]
__init__(config, data_store=None, logger=None)

Initialize the downloader.

Parameters:

- config (Union[GHSLDataConfig, dict[str, Union[str, int]]]): Configuration for the GHSL dataset, either as a GHSLDataConfig object or a dictionary of parameters. Required.
- data_store (Optional[DataStore]): Optional data storage interface. If not provided, uses LocalDataStore. Default: None.
- logger (Optional[Logger]): Optional custom logger. If not provided, uses default logger. Default: None.
Source code in gigaspatial/handlers/ghsl.py
def __init__(
    self,
    config: Union[GHSLDataConfig, dict[str, Union[str, int]]],
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """
    Initialize the downloader.

    Args:
        config: Configuration for the GHSL dataset, either as a GHSLDataConfig object or a dictionary of parameters
        data_store: Optional data storage interface. If not provided, uses LocalDataStore.
        logger: Optional custom logger. If not provided, uses default logger.
    """
    self.logger = logger or global_config.get_logger(__name__)
    self.data_store = data_store or LocalDataStore()
    self.config = (
        config if isinstance(config, GHSLDataConfig) else GHSLDataConfig(**config)
    )
    self.config.logger = self.logger
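
Example: both ways of constructing the downloader; the dictionary form is forwarded to GHSLDataConfig (a sketch; fetching the tile index requires network access).

from gigaspatial.handlers.ghsl import GHSLDataConfig, GHSLDataDownloader

# From an explicit config object
downloader = GHSLDataDownloader(
    GHSLDataConfig(product="GHS_POP", year=2020, resolution=100)
)

# Or from a plain dict of GHSLDataConfig parameters
downloader = GHSLDataDownloader({"product": "GHS_SMOD", "year": 2020, "resolution": 1000})
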
download_and_extract_tile(tile_id, file_pattern=None)

Download and extract specific files from GHSL dataset tile zip archives.

Parameters:

- tile_id: tile ID to download and extract. Required.
- file_pattern: Optional regex pattern to filter which files to extract (e.g., '.*\.tif$' for only TIF files). Default: None.

Returns:

- Paths to the extracted files.

Source code in gigaspatial/handlers/ghsl.py
def download_and_extract_tile(self, tile_id, file_pattern=None):
    """
    Download and extract specific files from GHSL dataset tile zip archives.

    Args:
        tile_id: tile ID to download and extract
        file_pattern: Optional regex pattern to filter which files to extract
                    (e.g., '.*\\.tif$' for only TIF files)

    Returns:
        path to extracted files
    """
    output_path = self.config.get_tile_path(tile_id=tile_id).parents[0]

    extracted_files = []

    url = self.config.compute_dataset_url(tile_id=tile_id)
    self.logger.info(f"Downloading zip from {url}")

    try:
        # download zip to temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                shutil.copyfileobj(response.raw, temp_file)

        with zipfile.ZipFile(temp_file.name, "r") as zip_ref:
            # get list of files in the zip (filter if pattern provided)
            if file_pattern:
                import re

                pattern = re.compile(file_pattern)
                files_to_extract = [
                    f for f in zip_ref.namelist() if pattern.match(f)
                ]
            else:
                files_to_extract = zip_ref.namelist()

            for file in files_to_extract:
                extracted_path = output_path / Path(file).name
                with zip_ref.open(file) as source, open(
                    extracted_path, "wb"
                ) as target:
                    shutil.copyfileobj(source, target)
                extracted_files.append(extracted_path)
                self.logger.info(f"Extracted {file} to {extracted_path}")

        Path(temp_file.name).unlink()

    except Exception as e:
        self.logger.error(f"Error downloading/extracting tile {tile_id}: {e}")
        raise

    return extracted_files
download_by_country(country, data_store=None, country_geom_path=None, extract=False)

Download GHSL data for a specific country.

Parameters:

- country (str): ISO 3166-1 alpha-3 country code, ISO alpha-2 country code or country name. Required.

Returns:

- List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/ghsl.py
def download_by_country(
    self,
    country: str,
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
    extract: bool = False,
) -> List[str]:
    """
    Download GHSL data for a specific country.

    Args:
        country: ISO 3166-1 alpha-3 country code, ISO alpha-2 country code or country name

    Returns:
        List of paths to downloaded files
    """

    # Get intersecting tiles
    country_tiles = self.config.get_country_tiles(
        country=country, data_store=data_store, country_geom_path=country_geom_path
    )

    if not country_tiles:
        self.logger.warning(f"There is no matching data for {country}")
        return []

    # Download tiles in parallel
    with multiprocessing.Pool(self.config.n_workers) as pool:
        download_func = functools.partial(
            self._download_tile if not extract else self.download_and_extract_tile
        )
        file_paths = list(
            tqdm(
                pool.imap(download_func, country_tiles),
                total=len(country_tiles),
                desc=f"Downloading for {country}",
            )
        )

    # Filter out None values (failed downloads)
    return [path for path in file_paths if path is not None]
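
Example: a usage sketch; the country and configuration are illustrative, and the call downloads data over the network.

from gigaspatial.handlers.ghsl import GHSLDataDownloader

downloader = GHSLDataDownloader({"product": "GHS_POP", "year": 2020, "resolution": 100})

# Download every GHS_POP tile intersecting Kenya; pass extract=True to also unzip
paths = downloader.download_by_country("KEN")
print(paths)
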
download_by_points(points_gdf, extract=False)

Download GHSL data for areas containing specific points.

Parameters:

- points_gdf (GeoDataFrame): GeoDataFrame containing points of interest. Required.

Returns:

- List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/ghsl.py
def download_by_points(
    self, points_gdf: gpd.GeoDataFrame, extract: bool = False
) -> List[str]:
    """
    Download GHSL data for areas containing specific points.

    Args:
        points_gdf: GeoDataFrame containing points of interest

    Returns:
        List of paths to downloaded files
    """
    # Get intersecting tiles
    int_tiles = self.config.get_intersecting_tiles(points_gdf, points_gdf.crs)

    if not int_tiles:
        self.logger.warning(f"There is no matching data for the points")
        return []

    # Download tiles in parallel
    with multiprocessing.Pool(self.config.n_workers) as pool:
        download_func = functools.partial(
            self._download_tile if not extract else self.download_and_extract_tile
        )
        file_paths = list(
            tqdm(
                pool.imap(download_func, int_tiles),
                total=len(int_tiles),
                desc=f"Downloading for points dataset",
            )
        )

    # Filter out None values (failed downloads)
    return [path for path in file_paths if path is not None]
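
Example: a sketch with a small GeoDataFrame of points (the coordinates are illustrative).

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.handlers.ghsl import GHSLDataDownloader

points = gpd.GeoDataFrame(
    geometry=[Point(36.82, -1.29), Point(39.66, -4.05)], crs="EPSG:4326"
)

downloader = GHSLDataDownloader({"product": "GHS_BUILT_S", "year": 2020, "resolution": 100})
paths = downloader.download_by_points(points)
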

google_open_buildings

GoogleOpenBuildingsConfig dataclass

Configuration for Google Open Buildings dataset files.

Source code in gigaspatial/handlers/google_open_buildings.py
@dataclass
class GoogleOpenBuildingsConfig:
    """Configuration for Google Open Buildings dataset files."""

    base_path: Path = global_config.get_path("google_open_buildings", "bronze")
    data_types: tuple = ("polygons", "points")
    n_workers: int = 4  # number of workers for parallel processing

    def get_tile_path(
        self, tile_id: str, data_type: Literal["polygons", "points"]
    ) -> Path:
        """
        Construct the full path for a tile file.

        Args:
            tile_id: S2 tile identifier
            data_type: Type of building data ('polygons' or 'points')

        Returns:
            Full path to the tile file
        """
        if data_type not in self.data_types:
            raise ValueError(f"data_type must be one of {self.data_types}")

        return self.base_path / f"{data_type}_s2_level_4_{tile_id}_buildings.csv.gz"
get_tile_path(tile_id, data_type)

Construct the full path for a tile file.

Parameters:

- tile_id (str): S2 tile identifier. Required.
- data_type (Literal['polygons', 'points']): Type of building data ('polygons' or 'points'). Required.

Returns:

- Path: Full path to the tile file.

Source code in gigaspatial/handlers/google_open_buildings.py
def get_tile_path(
    self, tile_id: str, data_type: Literal["polygons", "points"]
) -> Path:
    """
    Construct the full path for a tile file.

    Args:
        tile_id: S2 tile identifier
        data_type: Type of building data ('polygons' or 'points')

    Returns:
        Full path to the tile file
    """
    if data_type not in self.data_types:
        raise ValueError(f"data_type must be one of {self.data_types}")

    return self.base_path / f"{data_type}_s2_level_4_{tile_id}_buildings.csv.gz"

GoogleOpenBuildingsDownloader

A class to handle downloads of Google's Open Buildings dataset.

Source code in gigaspatial/handlers/google_open_buildings.py
class GoogleOpenBuildingsDownloader:
    """A class to handle downloads of Google's Open Buildings dataset."""

    TILES_URL = "https://openbuildings-public-dot-gweb-research.uw.r.appspot.com/public/tiles.geojson"

    def __init__(
        self,
        config: Optional[GoogleOpenBuildingsConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the downloader.

        Args:
            config: Optional configuration for file paths
            data_store: Instance of DataStore for accessing data storage
            logger: Optional custom logger. If not provided, uses default logger.
        """
        self.data_store = data_store or LocalDataStore()
        self.config = config or GoogleOpenBuildingsConfig()
        self.logger = logger or global_config.get_logger(__name__)

        # Load and cache S2 tiles
        self._load_s2_tiles()

    def _load_s2_tiles(self):
        """Load S2 tiles from GeoJSON file."""
        response = requests.get(self.TILES_URL)
        response.raise_for_status()

        # Convert to GeoDataFrame
        self.tiles_gdf = gpd.GeoDataFrame.from_features(
            response.json()["features"], crs="EPSG:4326"
        )

    def _get_intersecting_tiles(
        self, geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame]
    ) -> pd.DataFrame:
        """Get tiles that intersect with the given geometry."""

        if isinstance(geometry, gpd.GeoDataFrame):
            if geometry.crs != "EPSG:4326":
                geometry = geometry.to_crs("EPSG:4326")
            search_geom = geometry.geometry.unary_union
        elif isinstance(geometry, (Polygon, MultiPolygon)):
            search_geom = geometry
        else:
            raise ValueError(
                f"Expected Polygon, Multipolygon or GeoDataFrame got {geometry.__class__}"
            )

        # Find intersecting tiles
        mask = (
            tile_geom.intersects(search_geom) for tile_geom in self.tiles_gdf.geometry
        )

        return self.tiles_gdf.loc[mask, ["tile_id", "tile_url", "size_mb"]]

    def _download_tile(
        self,
        tile_info: Union[pd.Series, dict],
        data_type: Literal["polygons", "points"],
    ) -> Optional[str]:
        """Download data file for a single tile."""

        tile_url = tile_info["tile_url"]
        if data_type == "points":
            tile_url = tile_url.replace("polygons", "points")

        try:
            response = requests.get(tile_url, stream=True)
            response.raise_for_status()

            file_path = str(self.config.get_tile_path(tile_info["tile_id"], data_type))

            with self.data_store.open(file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

                self.logger.debug(
                    f"Successfully downloaded tile: {tile_info['tile_id']}"
                )
                return file_path

        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Failed to download tile {tile_info['tile_id']}: {str(e)}"
            )
            return None
        except Exception as e:
            self.logger.error(f"Unexpected error downloading dataset: {str(e)}")
            return None

    def get_download_size_estimate(
        self, geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame]
    ) -> float:
        """
        Estimate the download size in MB for a given geometry or GeoDataFrame.

        Args:
            geometry: Shapely Polygon/MultiPolygon or GeoDataFrame with geometries

        Returns:
            Estimated size in megabytes
        """
        gdf_tiles = self._get_intersecting_tiles(geometry)

        return gdf_tiles["size_mb"].sum()

    def download_by_country(
        self,
        country_code: str,
        data_type: Literal["polygons", "points"] = "polygons",
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ) -> List[str]:
        """
        Download Google Open Buildings data for a specific country.

        Args:
            country_code: ISO 3166-1 alpha-3 country code
            data_type: Type of data to download ('polygons' or 'points')

        Returns:
            List of paths to downloaded files
        """

        gdf_admin0 = AdminBoundaries.create(
            country_code=country_code,
            admin_level=0,
            data_store=data_store,
            path=country_geom_path,
        ).to_geodataframe()

        # Get intersecting tiles
        gdf_tiles = self._get_intersecting_tiles(gdf_admin0)

        if gdf_tiles.empty:
            self.logger.warning(f"There is no matching data for {country_code}")
            return []

        # Download tiles in parallel
        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(self._download_tile, data_type=data_type)
            file_paths = list(
                tqdm(
                    pool.imap(download_func, [row for _, row in gdf_tiles.iterrows()]),
                    total=len(gdf_tiles),
                    desc=f"Downloading {data_type} for {country_code}",
                )
            )

        # Filter out None values (failed downloads)
        return [path for path in file_paths if path is not None]

    def download_by_points(
        self,
        points_gdf: gpd.GeoDataFrame,
        data_type: Literal["polygons", "points"] = "polygons",
    ) -> List[str]:
        """
        Download Google Open Buildings data for areas containing specific points.

        Args:
            points_gdf: GeoDataFrame containing points of interest
            data_type: Type of data to download ('polygons' or 'points')

        Returns:
            List of paths to downloaded files
        """
        # Get intersecting tiles
        gdf_tiles = self._get_intersecting_tiles(points_gdf)

        if gdf_tiles.empty:
            self.logger.warning(f"There is no matching data for the points")
            return []

        # Download tiles in parallel
        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(self._download_tile, data_type=data_type)
            file_paths = list(
                tqdm(
                    pool.imap(download_func, [row for _, row in gdf_tiles.iterrows()]),
                    total=len(gdf_tiles),
                    desc=f"Downloading {data_type} for points dataset",
                )
            )

        # Filter out None values (failed downloads)
        return [path for path in file_paths if path is not None]
__init__(config=None, data_store=None, logger=None)

Initialize the downloader.

Parameters:

- config (Optional[GoogleOpenBuildingsConfig]): Optional configuration for file paths. Default: None.
- data_store (Optional[DataStore]): Instance of DataStore for accessing data storage. Default: None.
- logger (Optional[Logger]): Optional custom logger. If not provided, uses default logger. Default: None.
Source code in gigaspatial/handlers/google_open_buildings.py
def __init__(
    self,
    config: Optional[GoogleOpenBuildingsConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """
    Initialize the downloader.

    Args:
        config: Optional configuration for file paths
        data_store: Instance of DataStore for accessing data storage
        logger: Optional custom logger. If not provided, uses default logger.
    """
    self.data_store = data_store or LocalDataStore()
    self.config = config or GoogleOpenBuildingsConfig()
    self.logger = logger or global_config.get_logger(__name__)

    # Load and cache S2 tiles
    self._load_s2_tiles()
download_by_country(country_code, data_type='polygons', data_store=None, country_geom_path=None)

Download Google Open Buildings data for a specific country.

Parameters:

- country_code (str): ISO 3166-1 alpha-3 country code. Required.
- data_type (Literal['polygons', 'points']): Type of data to download ('polygons' or 'points'). Default: 'polygons'.

Returns:

- List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/google_open_buildings.py
def download_by_country(
    self,
    country_code: str,
    data_type: Literal["polygons", "points"] = "polygons",
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
) -> List[str]:
    """
    Download Google Open Buildings data for a specific country.

    Args:
        country_code: ISO 3166-1 alpha-3 country code
        data_type: Type of data to download ('polygons' or 'points')

    Returns:
        List of paths to downloaded files
    """

    gdf_admin0 = AdminBoundaries.create(
        country_code=country_code,
        admin_level=0,
        data_store=data_store,
        path=country_geom_path,
    ).to_geodataframe()

    # Get intersecting tiles
    gdf_tiles = self._get_intersecting_tiles(gdf_admin0)

    if gdf_tiles.empty:
        self.logger.warning(f"There is no matching data for {country_code}")
        return []

    # Download tiles in parallel
    with multiprocessing.Pool(self.config.n_workers) as pool:
        download_func = functools.partial(self._download_tile, data_type=data_type)
        file_paths = list(
            tqdm(
                pool.imap(download_func, [row for _, row in gdf_tiles.iterrows()]),
                total=len(gdf_tiles),
                desc=f"Downloading {data_type} for {country_code}",
            )
        )

    # Filter out None values (failed downloads)
    return [path for path in file_paths if path is not None]
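
Example: a usage sketch; "KEN" is illustrative and the call downloads data over the network (country boundaries come from GADM unless a data_store and path are given).

from gigaspatial.handlers.google_open_buildings import GoogleOpenBuildingsDownloader

downloader = GoogleOpenBuildingsDownloader()
paths = downloader.download_by_country("KEN", data_type="polygons")
print(f"Downloaded {len(paths)} tile files")
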
download_by_points(points_gdf, data_type='polygons')

Download Google Open Buildings data for areas containing specific points.

Parameters:

- points_gdf (GeoDataFrame): GeoDataFrame containing points of interest. Required.
- data_type (Literal['polygons', 'points']): Type of data to download ('polygons' or 'points'). Default: 'polygons'.

Returns:

- List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/google_open_buildings.py
def download_by_points(
    self,
    points_gdf: gpd.GeoDataFrame,
    data_type: Literal["polygons", "points"] = "polygons",
) -> List[str]:
    """
    Download Google Open Buildings data for areas containing specific points.

    Args:
        points_gdf: GeoDataFrame containing points of interest
        data_type: Type of data to download ('polygons' or 'points')

    Returns:
        List of paths to downloaded files
    """
    # Get intersecting tiles
    gdf_tiles = self._get_intersecting_tiles(points_gdf)

    if gdf_tiles.empty:
        self.logger.warning(f"There is no matching data for the points")
        return []

    # Download tiles in parallel
    with multiprocessing.Pool(self.config.n_workers) as pool:
        download_func = functools.partial(self._download_tile, data_type=data_type)
        file_paths = list(
            tqdm(
                pool.imap(download_func, [row for _, row in gdf_tiles.iterrows()]),
                total=len(gdf_tiles),
                desc=f"Downloading {data_type} for points dataset",
            )
        )

    # Filter out None values (failed downloads)
    return [path for path in file_paths if path is not None]
get_download_size_estimate(geometry)

Estimate the download size in MB for a given geometry or GeoDataFrame.

Parameters:

- geometry (Union[Polygon, MultiPolygon, GeoDataFrame]): Shapely Polygon/MultiPolygon or GeoDataFrame with geometries. Required.

Returns:

- float: Estimated size in megabytes.

Source code in gigaspatial/handlers/google_open_buildings.py
def get_download_size_estimate(
    self, geometry: Union[Polygon, MultiPolygon, gpd.GeoDataFrame]
) -> float:
    """
    Estimate the download size in MB for a given geometry or GeoDataFrame.

    Args:
        geometry: Shapely Polygon/MultiPolygon or GeoDataFrame with geometries

    Returns:
        Estimated size in megabytes
    """
    gdf_tiles = self._get_intersecting_tiles(geometry)

    return gdf_tiles["size_mb"].sum()

mapbox_image

MapboxImageDownloader

Class to download images from Mapbox Static Images API using a specific style

Source code in gigaspatial/handlers/mapbox_image.py
class MapboxImageDownloader:
    """Class to download images from Mapbox Static Images API using a specific style"""

    BASE_URL = "https://api.mapbox.com/styles/v1"

    def __init__(
        self,
        access_token: str = config.MAPBOX_ACCESS_TOKEN,
        style_id: Optional[str] = None,
        data_store: Optional[DataStore] = None,
    ):
        """
        Initialize the downloader with Mapbox credentials

        Args:
            access_token: Mapbox access token
            style_id: Mapbox style ID to use for image download
            data_store: Instance of DataStore for accessing data storage
        """
        self.access_token = access_token
        self.style_id = style_id if style_id else "mapbox/satellite-v9"
        self.data_store = data_store or LocalDataStore()
        self.logger = config.get_logger(__name__)

    def _construct_url(self, bounds: Iterable[float], image_size: str) -> str:
        """Construct the Mapbox Static Images API URL"""
        bounds_str = f"[{','.join(map(str, bounds))}]"

        return (
            f"{self.BASE_URL}/{self.style_id}/static/{bounds_str}/{image_size}"
            f"?access_token={self.access_token}&attribution=false&logo=false"
        )

    def _download_single_image(self, url: str, output_path: Path) -> bool:
        """Download a single image from URL"""
        try:
            response = requests.get(url)
            response.raise_for_status()

            with self.data_store.open(str(output_path), "wb") as f:
                f.write(response.content)
            return True
        except Exception as e:
            self.logger.warning(f"Error downloading {output_path.name}: {str(e)}")
            return False

    def download_images_by_tiles(
        self,
        mercator_tiles: "MercatorTiles",
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        max_workers: int = 4,
        image_prefix: str = "image_",
    ) -> None:
        """
        Download images for given mercator tiles using the specified style

        Args:
            mercator_tiles: MercatorTiles instance containing quadkeys
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            max_workers: Maximum number of concurrent downloads
            image_prefix: Prefix for output image names
        """
        output_dir = Path(output_dir)
        # self.data_store.makedirs(str(output_dir), exist_ok=True)

        image_size_str = f"{image_size[0]}x{image_size[1]}"
        total_tiles = len(mercator_tiles.quadkeys)

        self.logger.info(
            f"Downloading {total_tiles} tiles with size {image_size_str}..."
        )

        def _get_tile_bounds(quadkey: str) -> List[float]:
            """Get tile bounds from quadkey"""
            tile = mercantile.quadkey_to_tile(quadkey)
            bounds = mercantile.bounds(tile)
            return [bounds.west, bounds.south, bounds.east, bounds.north]

        def download_image(quadkey: str) -> bool:
            bounds = _get_tile_bounds(quadkey)
            file_name = f"{image_prefix}{quadkey}.png"

            url = self._construct_url(bounds, image_size_str)
            success = self._download_single_image(url, output_dir / file_name)

            return success

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(download_image, quadkey)
                for quadkey in mercator_tiles.quadkeys
            ]

            successful_downloads = 0
            with tqdm(total=total_tiles) as pbar:
                for future in as_completed(futures):
                    if future.result():
                        successful_downloads += 1
                    pbar.update(1)

        self.logger.info(
            f"Successfully downloaded {successful_downloads}/{total_tiles} images!"
        )

    def download_images_by_bounds(
        self,
        gdf: gpd.GeoDataFrame,
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        max_workers: int = 4,
        image_prefix: str = "image_",
    ) -> None:
        """
        Download images for the bounding boxes in a GeoDataFrame using the specified style

        Args:
            gdf: GeoDataFrame containing bounding box polygons
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            max_workers: Maximum number of concurrent downloads
            image_prefix: Prefix for output image names
        """
        output_dir = Path(output_dir)
        # self.data_store.makedirs(str(output_dir), exist_ok=True)

        image_size_str = f"{image_size[0]}x{image_size[1]}"
        total_images = len(gdf)

        self.logger.info(
            f"Downloading {total_images} images with size {image_size_str}..."
        )

        def download_image(idx: Any, bounds: Tuple[float, float, float, float]) -> bool:
            file_name = f"{image_prefix}{idx}.png"
            url = self._construct_url(bounds, image_size_str)
            success = self._download_single_image(url, output_dir / file_name)
            return success

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(download_image, row.Index, row.geometry.bounds)
                for row in gdf.itertuples()
            ]

            successful_downloads = 0
            with tqdm(total=total_images) as pbar:
                for future in as_completed(futures):
                    if future.result():
                        successful_downloads += 1
                    pbar.update(1)

        self.logger.info(
            f"Successfully downloaded {successful_downloads}/{total_images} images!"
        )

    def download_images_by_coordinates(
        self,
        data: Union[pd.DataFrame, List[Tuple[float, float]]],
        res_meters_pixel: float,
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        max_workers: int = 4,
        image_prefix: str = "image_",
    ) -> None:
        """
        Download images for given coordinates by creating bounding boxes around points

        Args:
            data: A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples
            res_meters_pixel: Size of the bounding box in meters (creates a square)
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            max_workers: Maximum number of concurrent downloads
            image_prefix: Prefix for output image names
        """

        if isinstance(data, pd.DataFrame):
            coordinates_df = data
        else:
            coordinates_df = pd.DataFrame(data, columns=["latitude", "longitude"])

        gdf = convert_to_geodataframe(coordinates_df)

        buffered_gdf = buffer_geodataframe(
            gdf, res_meters_pixel / 2, cap_style="square"
        )

        self.download_images_by_bounds(
            buffered_gdf, output_dir, image_size, max_workers, image_prefix
        )
__init__(access_token=config.MAPBOX_ACCESS_TOKEN, style_id=None, data_store=None)

Initialize the downloader with Mapbox credentials

Parameters:

    access_token (str): Mapbox access token. Default: MAPBOX_ACCESS_TOKEN.
    style_id (Optional[str]): Mapbox style ID to use for image download. Default: None.
    data_store (Optional[DataStore]): Instance of DataStore for accessing data storage. Default: None.
Source code in gigaspatial/handlers/mapbox_image.py
def __init__(
    self,
    access_token: str = config.MAPBOX_ACCESS_TOKEN,
    style_id: Optional[str] = None,
    data_store: Optional[DataStore] = None,
):
    """
    Initialize the downloader with Mapbox credentials

    Args:
        access_token: Mapbox access token
        style_id: Mapbox style ID to use for image download
        data_store: Instance of DataStore for accessing data storage
    """
    self.access_token = access_token
    self.style_id = style_id if style_id else "mapbox/satellite-v9"
    self.data_store = data_store or LocalDataStore()
    self.logger = config.get_logger(__name__)
download_images_by_bounds(gdf, output_dir, image_size=(512, 512), max_workers=4, image_prefix='image_')

Download images for the bounding boxes in a GeoDataFrame using the specified style

Parameters:

    gdf (GeoDataFrame): GeoDataFrame containing bounding box polygons. Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    max_workers (int): Maximum number of concurrent downloads. Default: 4.
    image_prefix (str): Prefix for output image names. Default: 'image_'.
Source code in gigaspatial/handlers/mapbox_image.py
def download_images_by_bounds(
    self,
    gdf: gpd.GeoDataFrame,
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    max_workers: int = 4,
    image_prefix: str = "image_",
) -> None:
    """
    Download images for the bounding boxes in a GeoDataFrame using the specified style

    Args:
        gdf: GeoDataFrame containing bounding box polygons
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        max_workers: Maximum number of concurrent downloads
        image_prefix: Prefix for output image names
    """
    output_dir = Path(output_dir)
    # self.data_store.makedirs(str(output_dir), exist_ok=True)

    image_size_str = f"{image_size[0]}x{image_size[1]}"
    total_images = len(gdf)

    self.logger.info(
        f"Downloading {total_images} images with size {image_size_str}..."
    )

    def download_image(idx: Any, bounds: Tuple[float, float, float, float]) -> bool:
        file_name = f"{image_prefix}{idx}.png"
        url = self._construct_url(bounds, image_size_str)
        success = self._download_single_image(url, output_dir / file_name)
        return success

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(download_image, row.Index, row.geometry.bounds)
            for row in gdf.itertuples()
        ]

        successful_downloads = 0
        with tqdm(total=total_images) as pbar:
            for future in as_completed(futures):
                if future.result():
                    successful_downloads += 1
                pbar.update(1)

    self.logger.info(
        f"Successfully downloaded {successful_downloads}/{total_images} images!"
    )
download_images_by_coordinates(data, res_meters_pixel, output_dir, image_size=(512, 512), max_workers=4, image_prefix='image_')

Download images for given coordinates by creating bounding boxes around points

Parameters:

    data (Union[DataFrame, List[Tuple[float, float]]]): A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples. Required.
    res_meters_pixel (float): Size of the bounding box in meters (creates a square). Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    max_workers (int): Maximum number of concurrent downloads. Default: 4.
    image_prefix (str): Prefix for output image names. Default: 'image_'.
Source code in gigaspatial/handlers/mapbox_image.py
def download_images_by_coordinates(
    self,
    data: Union[pd.DataFrame, List[Tuple[float, float]]],
    res_meters_pixel: float,
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    max_workers: int = 4,
    image_prefix: str = "image_",
) -> None:
    """
    Download images for given coordinates by creating bounding boxes around points

    Args:
        data: A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples
        res_meters_pixel: Size of the bounding box in meters (creates a square)
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        max_workers: Maximum number of concurrent downloads
        image_prefix: Prefix for output image names
    """

    if isinstance(data, pd.DataFrame):
        coordinates_df = data
    else:
        coordinates_df = pd.DataFrame(data, columns=["latitude", "longitude"])

    gdf = convert_to_geodataframe(coordinates_df)

    buffered_gdf = buffer_geodataframe(
        gdf, res_meters_pixel / 2, cap_style="square"
    )

    self.download_images_by_bounds(
        buffered_gdf, output_dir, image_size, max_workers, image_prefix
    )
download_images_by_tiles(mercator_tiles, output_dir, image_size=(512, 512), max_workers=4, image_prefix='image_')

Download images for given mercator tiles using the specified style

Parameters:

    mercator_tiles (MercatorTiles): MercatorTiles instance containing quadkeys. Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    max_workers (int): Maximum number of concurrent downloads. Default: 4.
    image_prefix (str): Prefix for output image names. Default: 'image_'.
Source code in gigaspatial/handlers/mapbox_image.py
def download_images_by_tiles(
    self,
    mercator_tiles: "MercatorTiles",
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    max_workers: int = 4,
    image_prefix: str = "image_",
) -> None:
    """
    Download images for given mercator tiles using the specified style

    Args:
        mercator_tiles: MercatorTiles instance containing quadkeys
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        max_workers: Maximum number of concurrent downloads
        image_prefix: Prefix for output image names
    """
    output_dir = Path(output_dir)
    # self.data_store.makedirs(str(output_dir), exist_ok=True)

    image_size_str = f"{image_size[0]}x{image_size[1]}"
    total_tiles = len(mercator_tiles.quadkeys)

    self.logger.info(
        f"Downloading {total_tiles} tiles with size {image_size_str}..."
    )

    def _get_tile_bounds(quadkey: str) -> List[float]:
        """Get tile bounds from quadkey"""
        tile = mercantile.quadkey_to_tile(quadkey)
        bounds = mercantile.bounds(tile)
        return [bounds.west, bounds.south, bounds.east, bounds.north]

    def download_image(quadkey: str) -> bool:
        bounds = _get_tile_bounds(quadkey)
        file_name = f"{image_prefix}{quadkey}.png"

        url = self._construct_url(bounds, image_size_str)
        success = self._download_single_image(url, output_dir / file_name)

        return success

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(download_image, quadkey)
            for quadkey in mercator_tiles.quadkeys
        ]

        successful_downloads = 0
        with tqdm(total=total_tiles) as pbar:
            for future in as_completed(futures):
                if future.result():
                    successful_downloads += 1
                pbar.update(1)

    self.logger.info(
        f"Successfully downloaded {successful_downloads}/{total_tiles} images!"
    )

maxar_image

MaxarConfig

Bases: BaseModel

Configuration for Maxar Image Downloader using Pydantic

Source code in gigaspatial/handlers/maxar_image.py
class MaxarConfig(BaseModel):
    """Configuration for Maxar Image Downloader using Pydantic"""

    username: str = Field(
        default=global_config.MAXAR_USERNAME, description="Maxar API username"
    )
    password: str = Field(
        default=global_config.MAXAR_PASSWORD, description="Maxar API password"
    )
    connection_string: str = Field(
        default=global_config.MAXAR_CONNECTION_STRING,
        description="Maxar WMS connection string",
    )

    base_url: HttpUrl = Field(
        default="https://evwhs.digitalglobe.com/mapservice/wmsaccess?",
        description="Base URL for Maxar WMS service",
    )

    layers: List[Literal["DigitalGlobe:ImageryFootprint", "DigitalGlobe:Imagery"]] = (
        Field(
            default=["DigitalGlobe:Imagery"],
            description="List of layers to request from WMS",
        )
    )

    feature_profile: str = Field(
        default="Most_Aesthetic_Mosaic_Profile",
        description="Feature profile to use for WMS requests",
    )

    coverage_cql_filter: str = Field(
        default="", description="CQL filter for coverage selection"
    )

    exceptions: str = Field(
        default="application/vnd.ogc.se_xml",
        description="Exception handling format for WMS",
    )

    transparent: bool = Field(
        default=True,
        description="Whether the requested images should have transparency",
    )

    image_format: Literal["image/png", "image/jpeg", "image/geotiff"] = Field(
        default="image/png",
    )

    data_crs: Literal["EPSG:4326", "EPSG:3395", "EPSG:3857", "CAR:42004"] = Field(
        default="EPSG:4326"
    )

    max_retries: int = Field(
        default=3, description="Number of retries for failed image downloads"
    )

    retry_delay: int = Field(default=5, description="Delay in seconds between retries")

    @field_validator("username", "password", "connection_string")
    @classmethod
    def validate_non_empty(cls, value: str, field) -> str:
        """Ensure required credentials are provided"""
        if not value or value.strip() == "":
            raise ValueError(
                f"{field.name} cannot be empty. Please provide a valid {field.name}."
            )
        return value

    @property
    def wms_url(self) -> str:
        """Generate the full WMS URL with connection string"""
        return f"{self.base_url}connectid={self.connection_string}"

    @property
    def suffix(self) -> str:
        return f".{self.image_format.split('/')[1]}"
wms_url: str property

Generate the full WMS URL with connection string

validate_non_empty(value, field) classmethod

Ensure required credentials are provided

Source code in gigaspatial/handlers/maxar_image.py
@field_validator("username", "password", "connection_string")
@classmethod
def validate_non_empty(cls, value: str, field) -> str:
    """Ensure required credentials are provided"""
    if not value or value.strip() == "":
        raise ValueError(
            f"{field.name} cannot be empty. Please provide a valid {field.name}."
        )
    return value
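A construction sketch for this configuration. By default the credentials come from the global configuration (MAXAR_USERNAME, MAXAR_PASSWORD, MAXAR_CONNECTION_STRING) and the validator rejects empty values, so pass them explicitly when they are not configured; the values below are placeholders.

from gigaspatial.handlers.maxar_image import MaxarConfig

# Placeholder credentials; in practice these usually come from the
# global configuration rather than being hard-coded.
config = MaxarConfig(
    username="my_user",
    password="my_password",
    connection_string="my-connect-id",
    image_format="image/png",
    max_retries=3,
)

print(config.wms_url)  # base URL with the connectid query parameter appended
print(config.suffix)   # ".png" for image/png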

MaxarImageDownloader

Class to download images from Maxar

Source code in gigaspatial/handlers/maxar_image.py
class MaxarImageDownloader:
    """Class to download images from Maxar"""

    def __init__(
        self,
        config: Optional[MaxarConfig] = None,
        data_store: Optional[DataStore] = None,
    ):
        """
        Initialize the downloader with Maxar config.

        Args:
            config: MaxarConfig instance containing credentials and settings
            data_store: Instance of DataStore for accessing data storage
        """
        self.config = config or MaxarConfig()
        self.wms = WebMapService(
            self.config.wms_url,
            username=self.config.username,
            password=self.config.password,
        )
        self.data_store = data_store or LocalDataStore()
        self.logger = global_config.get_logger(__name__)

    def _download_single_image(self, bbox, output_path: Union[Path, str], size) -> bool:
        """Download a single image from bbox and pixel size"""
        for attempt in range(self.config.max_retries):
            try:
                img_data = self.wms.getmap(
                    bbox=bbox,
                    layers=self.config.layers,
                    srs=self.config.data_crs,
                    size=size,
                    featureProfile=self.config.feature_profile,
                    coverage_cql_filter=self.config.coverage_cql_filter,
                    exceptions=self.config.exceptions,
                    transparent=self.config.transparent,
                    format=self.config.image_format,
                )
                self.data_store.write_file(str(output_path), img_data.read())
                return True
            except Exception as e:
                self.logger.warning(
                    f"Attempt {attempt + 1} of downloading {output_path.name} failed: {str(e)}"
                )
                if attempt < self.config.max_retries - 1:
                    sleep(self.config.retry_delay)
                else:
                    self.logger.warning(
                        f"Failed to download {output_path.name} after {self.config.max_retries} attemps: {str(e)}"
                    )
                    return False

    def download_images_by_tiles(
        self,
        mercator_tiles: "MercatorTiles",
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        image_prefix: str = "maxar_image_",
    ) -> None:
        """
        Download images for given mercator tiles using the configured WMS settings

        Args:
            mercator_tiles: MercatorTiles instance containing quadkeys
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            image_prefix: Prefix for output image names
        """
        output_dir = Path(output_dir)

        image_size_str = f"{image_size[0]}x{image_size[1]}"
        total_tiles = len(mercator_tiles.quadkeys)

        self.logger.info(
            f"Downloading {total_tiles} tiles with size {image_size_str}..."
        )

        def _get_tile_bounds(quadkey: str) -> Tuple[float]:
            """Get tile bounds from quadkey"""
            tile = mercantile.quadkey_to_tile(quadkey)
            bounds = mercantile.bounds(tile)
            return (bounds.west, bounds.south, bounds.east, bounds.north)

        def download_image(
            quadkey: str, image_size: Tuple[int, int], suffix: str = self.config.suffix
        ) -> bool:
            bounds = _get_tile_bounds(quadkey)
            file_name = f"{image_prefix}{quadkey}{suffix}"

            success = self._download_single_image(
                bounds, output_dir / file_name, image_size
            )

            return success

        successful_downloads = 0
        with tqdm(total=total_tiles) as pbar:
            for quadkey in mercator_tiles.quadkeys:
                if download_image(quadkey, image_size):
                    successful_downloads += 1
                pbar.update(1)

        self.logger.info(
            f"Successfully downloaded {successful_downloads}/{total_tiles} images!"
        )

    def download_images_by_bounds(
        self,
        gdf: gpd.GeoDataFrame,
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        image_prefix: str = "maxar_image_",
    ) -> None:
        """
        Download images for the bounding boxes in a GeoDataFrame using the configured WMS settings

        Args:
            gdf: GeoDataFrame containing bounding box polygons
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            image_prefix: Prefix for output image names
        """
        output_dir = Path(output_dir)

        image_size_str = f"{image_size[0]}x{image_size[1]}"
        total_images = len(gdf)

        self.logger.info(
            f"Downloading {total_images} images with size {image_size_str}..."
        )

        def download_image(
            idx: Any,
            bounds: Tuple[float, float, float, float],
            image_size,
            suffix: str = self.config.suffix,
        ) -> bool:
            file_name = f"{image_prefix}{idx}{suffix}"
            success = self._download_single_image(
                bounds, output_dir / file_name, image_size
            )
            return success

        gdf = gdf.to_crs(self.config.data_crs)

        successful_downloads = 0
        with tqdm(total=total_images) as pbar:
            for row in gdf.itertuples():
                if download_image(row.Index, tuple(row.geometry.bounds), image_size):
                    successful_downloads += 1
                pbar.update(1)

        self.logger.info(
            f"Successfully downloaded {successful_downloads}/{total_images} images!"
        )

    def download_images_by_coordinates(
        self,
        data: Union[pd.DataFrame, List[Tuple[float, float]]],
        res_meters_pixel: float,
        output_dir: Union[str, Path],
        image_size: Tuple[int, int] = (512, 512),
        image_prefix: str = "maxar_image_",
    ) -> None:
        """
        Download images for given coordinates by creating bounding boxes around points

        Args:
            data: A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples
            res_meters_pixel: Size of the bounding box in meters (creates a square around each point)
            output_dir: Directory to save images
            image_size: Tuple of (width, height) for output images
            image_prefix: Prefix for output image names
        """

        if isinstance(data, pd.DataFrame):
            coordinates_df = data
        else:
            coordinates_df = pd.DataFrame(data, columns=["latitude", "longitude"])

        gdf = convert_to_geodataframe(coordinates_df)

        buffered_gdf = buffer_geodataframe(
            gdf, res_meters_pixel / 2, cap_style="square"
        )

        buffered_gdf = buffered_gdf.to_crs(self.config.data_crs)

        self.download_images_by_bounds(
            buffered_gdf, output_dir, image_size, image_prefix
        )
__init__(config=None, data_store=None)

Initialize the downloader with Maxar config.

Parameters:

    config (Optional[MaxarConfig]): MaxarConfig instance containing credentials and settings. Default: None.
    data_store (Optional[DataStore]): Instance of DataStore for accessing data storage. Default: None.
Source code in gigaspatial/handlers/maxar_image.py
def __init__(
    self,
    config: Optional[MaxarConfig] = None,
    data_store: Optional[DataStore] = None,
):
    """
    Initialize the downloader with Maxar config.

    Args:
        config: MaxarConfig instance containing credentials and settings
        data_store: Instance of DataStore for accessing data storage
    """
    self.config = config or MaxarConfig()
    self.wms = WebMapService(
        self.config.wms_url,
        username=self.config.username,
        password=self.config.password,
    )
    self.data_store = data_store or LocalDataStore()
    self.logger = global_config.get_logger(__name__)
download_images_by_bounds(gdf, output_dir, image_size=(512, 512), image_prefix='maxar_image_')

Download images for the bounding boxes in a GeoDataFrame using the configured WMS settings

Parameters:

    gdf (GeoDataFrame): GeoDataFrame containing bounding box polygons. Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    image_prefix (str): Prefix for output image names. Default: 'maxar_image_'.
Source code in gigaspatial/handlers/maxar_image.py
def download_images_by_bounds(
    self,
    gdf: gpd.GeoDataFrame,
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    image_prefix: str = "maxar_image_",
) -> None:
    """
    Download images for the bounding boxes in a GeoDataFrame using the configured WMS settings

    Args:
        gdf: GeoDataFrame containing bounding box polygons
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        image_prefix: Prefix for output image names
    """
    output_dir = Path(output_dir)

    image_size_str = f"{image_size[0]}x{image_size[1]}"
    total_images = len(gdf)

    self.logger.info(
        f"Downloading {total_images} images with size {image_size_str}..."
    )

    def download_image(
        idx: Any,
        bounds: Tuple[float, float, float, float],
        image_size,
        suffix: str = self.config.suffix,
    ) -> bool:
        file_name = f"{image_prefix}{idx}{suffix}"
        success = self._download_single_image(
            bounds, output_dir / file_name, image_size
        )
        return success

    gdf = gdf.to_crs(self.config.data_crs)

    successful_downloads = 0
    with tqdm(total=total_images) as pbar:
        for row in gdf.itertuples():
            if download_image(row.Index, tuple(row.geometry.bounds), image_size):
                successful_downloads += 1
            pbar.update(1)

    self.logger.info(
        f"Successfully downloaded {successful_downloads}/{total_images} images!"
    )
download_images_by_coordinates(data, res_meters_pixel, output_dir, image_size=(512, 512), image_prefix='maxar_image_')

Download images for given coordinates by creating bounding boxes around points

Parameters:

    data (Union[DataFrame, List[Tuple[float, float]]]): A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples. Required.
    res_meters_pixel (float): Size of the bounding box in meters (creates a square around each point). Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    image_prefix (str): Prefix for output image names. Default: 'maxar_image_'.
Source code in gigaspatial/handlers/maxar_image.py
def download_images_by_coordinates(
    self,
    data: Union[pd.DataFrame, List[Tuple[float, float]]],
    res_meters_pixel: float,
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    image_prefix: str = "maxar_image_",
) -> None:
    """
    Download images for given coordinates by creating bounding boxes around points

    Args:
        data: A DataFrame with latitude/longitude columns or a geometry column, or a list of (lat, lon) tuples
        res_meters_pixel: Size of the bounding box in meters (creates a square around each point)
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        image_prefix: Prefix for output image names
    """

    if isinstance(data, pd.DataFrame):
        coordinates_df = data
    else:
        coordinates_df = pd.DataFrame(data, columns=["latitude", "longitude"])

    gdf = convert_to_geodataframe(coordinates_df)

    buffered_gdf = buffer_geodataframe(
        gdf, res_meters_pixel / 2, cap_style="square"
    )

    buffered_gdf = buffered_gdf.to_crs(self.config.data_crs)

    self.download_images_by_bounds(
        buffered_gdf, output_dir, image_size, image_prefix
    )
download_images_by_tiles(mercator_tiles, output_dir, image_size=(512, 512), image_prefix='maxar_image_')

Download images for given mercator tiles using the configured WMS settings

Parameters:

    mercator_tiles (MercatorTiles): MercatorTiles instance containing quadkeys. Required.
    output_dir (Union[str, Path]): Directory to save images. Required.
    image_size (Tuple[int, int]): Tuple of (width, height) for output images. Default: (512, 512).
    image_prefix (str): Prefix for output image names. Default: 'maxar_image_'.
Source code in gigaspatial/handlers/maxar_image.py
def download_images_by_tiles(
    self,
    mercator_tiles: "MercatorTiles",
    output_dir: Union[str, Path],
    image_size: Tuple[int, int] = (512, 512),
    image_prefix: str = "maxar_image_",
) -> None:
    """
    Download images for given mercator tiles using the configured WMS settings

    Args:
        mercator_tiles: MercatorTiles instance containing quadkeys
        output_dir: Directory to save images
        image_size: Tuple of (width, height) for output images
        image_prefix: Prefix for output image names
    """
    output_dir = Path(output_dir)

    image_size_str = f"{image_size[0]}x{image_size[1]}"
    total_tiles = len(mercator_tiles.quadkeys)

    self.logger.info(
        f"Downloading {total_tiles} tiles with size {image_size_str}..."
    )

    def _get_tile_bounds(quadkey: str) -> Tuple[float]:
        """Get tile bounds from quadkey"""
        tile = mercantile.quadkey_to_tile(quadkey)
        bounds = mercantile.bounds(tile)
        return (bounds.west, bounds.south, bounds.east, bounds.north)

    def download_image(
        quadkey: str, image_size: Tuple[int, int], suffix: str = self.config.suffix
    ) -> bool:
        bounds = _get_tile_bounds(quadkey)
        file_name = f"{image_prefix}{quadkey}{suffix}"

        success = self._download_single_image(
            bounds, output_dir / file_name, image_size
        )

        return success

    successful_downloads = 0
    with tqdm(total=total_tiles) as pbar:
        for quadkey in mercator_tiles.quadkeys:
            if download_image(quadkey, image_size):
                successful_downloads += 1
            pbar.update(1)

    self.logger.info(
        f"Successfully downloaded {successful_downloads}/{total_tiles} images!"
    )

microsoft_global_buildings

MSBuildingsConfig

Configuration for Microsoft Global Buildings dataset files.

Source code in gigaspatial/handlers/microsoft_global_buildings.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class MSBuildingsConfig:
    """Configuration for Microsoft Global Buildings dataset files."""

    data_store: DataStore = field(
        default_factory=LocalDataStore
    )  # instance of DataStore for accessing data storage
    BASE_PATH: Path = global_config.get_path("microsoft_global_buildings", "bronze")

    TILE_URLS: str = (
        "https://minedbuildings.z5.web.core.windows.net/global-buildings/dataset-links.csv"
    )
    MERCATOR_ZOOM_LEVEL: int = 9

    LOCATION_MAPPING_FILE: Path = BASE_PATH / "location_mapping.json"
    SIMILARITY_SCORE: float = 0.8
    DEFAULT_MAPPING: Dict[str, str] = field(
        default_factory=lambda: {
            "Bonaire": "BES",
            "Brunei": "BRN",
            "IvoryCoast": "CIV",
            "CongoDRC": "COD",
            "DemocraticRepublicoftheCongo": "COD",
            "RepublicoftheCongo": "COG",
            "TheGambia": "GMB",
            "FYROMakedonija": "MKD",
            "SultanateofOman": "OMN",
            "StateofQatar": "QAT",
            "Russia": "RUS",
            "KingdomofSaudiArabia": "SAU",
            "Svalbard": "SJM",
            "Swaziland": "SWZ",
            "StMartin": "SXM",
            "leSaint-Martin": "MAF",
            "Turkey": "TUR",
            "VaticanCity": "VAT",
            "BritishVirginIslands": "VGB",
            "USVirginIslands": "VIR",
            "RepublicofYemen": "YEM",
            "CzechRepublic": "CZE",
            "French-Martinique": "MTQ",
            "French-Guadeloupe": "GLP",
            "UnitedStates": "USA",
        }
    )
    CUSTOM_MAPPING: Optional[Dict[str, str]] = None

    n_workers: int = 4  # number of workers for parallel processing
    logger: logging.Logger = global_config.get_logger(__name__)

    def __post_init__(self):
        """Validate inputs and set location mapping"""

        self._load_tile_urls()

        self.upload_date = self.df_tiles.upload_date[0]

        if self.data_store.file_exists(str(self.LOCATION_MAPPING_FILE)):
            self.location_mapping = read_json(
                self.data_store, str(self.LOCATION_MAPPING_FILE)
            )
        else:
            self.location_mapping = self.create_location_mapping(
                similarity_score_threshold=self.SIMILARITY_SCORE
            )
            self.location_mapping.update(self.DEFAULT_MAPPING)
            write_json(
                self.location_mapping, self.data_store, str(self.LOCATION_MAPPING_FILE)
            )

        self.location_mapping.update(self.CUSTOM_MAPPING or {})

        self._map_locations()

        self.df_tiles.loc[self.df_tiles.country.isnull(), "country"] = None

    def _load_tile_urls(self):
        """Load dataset links from csv file."""
        self.df_tiles = pd.read_csv(
            self.TILE_URLS,
            names=["location", "quadkey", "url", "size", "upload_date"],
            dtype={"quadkey": str},
            header=0,
        )

    def _map_locations(self):
        self.df_tiles["country"] = self.df_tiles.location.map(self.location_mapping)

    def create_location_mapping(self, similarity_score_threshold: float = 0.8):

        def similar(a, b):
            return SequenceMatcher(None, a, b).ratio()

        location_mapping = dict()

        for country in pycountry.countries:
            if country.name not in self.df_tiles.location.unique():
                try:
                    country_quadkey = CountryMercatorTiles.create(
                        country.alpha_3, self.MERCATOR_ZOOM_LEVEL
                    )
                except:
                    self.logger.warning(f"{country.name} is not mapped.")
                    continue
                country_datasets = country_quadkey.filter_quadkeys(
                    self.df_tiles.quadkey
                )
                matching_locations = self.df_tiles[
                    self.df_tiles.quadkey.isin(country_datasets.quadkeys)
                ].location.unique()
                scores = np.array(
                    [
                        (
                            similar(c, country.common_name)
                            if hasattr(country, "common_name")
                            else similar(c, country.name)
                        )
                        for c in matching_locations
                    ]
                )
                if any(scores > similarity_score_threshold):
                    matched = matching_locations[scores > similarity_score_threshold]
                    if len(matched) > 2:
                        self.logger.warning(
                            f"Multiple matches exist for {country.name}. {country.name} is not mapped."
                        )
                    location_mapping[matched[0]] = country.alpha_3
                    self.logger.debug(f"{country.name} matched with {matched[0]}!")
                else:
                    self.logger.warning(
                        f"No direct matches for {country.name}. {country.name} is not mapped."
                    )
                    self.logger.debug("Possible matches are: ")
                    for c, score in zip(matching_locations, scores):
                        self.logger.debug(c, score)
            else:
                location_mapping[country.name] = country.alpha_3

        return location_mapping

    def get_tiles_for_country(self, country_code: str) -> pd.DataFrame:
        country_tiles = self.df_tiles[self.df_tiles["country"] == country_code]

        if not country_tiles.empty:
            return country_tiles

        self.logger.warning(
            f"{country_code} is not in the location mapping. Checking for locations that overlap the country boundary."
        )

        country_mercator = CountryMercatorTiles.create(
            country_code, self.MERCATOR_ZOOM_LEVEL
        )
        country_tiles = country_mercator.filter_quadkeys(self.df_tiles.quadkey)

        if country_tiles:
            filtered_tiles = self.df_tiles[
                self.df_tiles.country.isnull()
                & self.df_tiles.quadkey.isin(country_tiles.quadkeys)
            ]
            return filtered_tiles

        return pd.DataFrame(columns=self.df_tiles.columns)

    def get_tiles_for_geometry(
        self, geometry: Union[Polygon, MultiPolygon]
    ) -> pd.DataFrame:
        if isinstance(geometry, MultiPolygon):
            geom_mercator = MercatorTiles.from_multipolygon(
                geometry, self.MERCATOR_ZOOM_LEVEL
            )
        elif isinstance(geometry, Polygon):
            geom_mercator = MercatorTiles.from_polygon(
                geometry, self.MERCATOR_ZOOM_LEVEL
            )

        geom_tiles = geom_mercator.filter_quadkeys(self.df_tiles.quadkey)

        if geom_tiles:
            return self.df_tiles[self.df_tiles.quadkey.isin(geom_tiles.quadkeys)]

        return pd.DataFrame(columns=self.df_tiles.columns)

    def get_tiles_for_points(
        self, points: List[Union[Point, Tuple[float, float]]]
    ) -> pd.DataFrame:

        points_mercator = MercatorTiles.from_points(points, self.MERCATOR_ZOOM_LEVEL)

        points_tiles = points_mercator.filter_quadkeys(self.df_tiles.quadkey)

        if points_tiles:
            return self.df_tiles[self.df_tiles.quadkey.isin(points_tiles.quadkeys)]

        return pd.DataFrame(columns=self.df_tiles.columns)

    def get_tile_path(self, quadkey: str, location: str) -> Path:
        return self.BASE_PATH / location / self.upload_date / f"{quadkey}.csv.gz"
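A brief sketch of using the configuration directly to inspect which dataset tiles map to a country. Instantiation loads the remote tile index and builds (or reads) the location mapping, so it performs network and storage access; the custom mapping entry below is hypothetical.

from gigaspatial.handlers.microsoft_global_buildings import MSBuildingsConfig

# Builds or reads the location-to-ISO3 mapping on construction.
config = MSBuildingsConfig(CUSTOM_MAPPING={"Turkiye": "TUR"})  # hypothetical extra mapping

tiles = config.get_tiles_for_country("KEN")
print(len(tiles), "tiles mapped to KEN")
print(tiles[["quadkey", "url", "size", "upload_date"]].head())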
__post_init__()

Validate inputs and set location mapping

Source code in gigaspatial/handlers/microsoft_global_buildings.py
def __post_init__(self):
    """Validate inputs and set location mapping"""

    self._load_tile_urls()

    self.upload_date = self.df_tiles.upload_date[0]

    if self.data_store.file_exists(str(self.LOCATION_MAPPING_FILE)):
        self.location_mapping = read_json(
            self.data_store, str(self.LOCATION_MAPPING_FILE)
        )
    else:
        self.location_mapping = self.create_location_mapping(
            similarity_score_threshold=self.SIMILARITY_SCORE
        )
        self.location_mapping.update(self.DEFAULT_MAPPING)
        write_json(
            self.location_mapping, self.data_store, str(self.LOCATION_MAPPING_FILE)
        )

    self.location_mapping.update(self.CUSTOM_MAPPING or {})

    self._map_locations()

    self.df_tiles.loc[self.df_tiles.country.isnull(), "country"] = None

MSBuildingsDownloader

A class to handle downloads of Microsoft's Global ML Building Footprints dataset.

Source code in gigaspatial/handlers/microsoft_global_buildings.py
class MSBuildingsDownloader:
    """A class to handle downloads of Microsoft's Global ML Building Footprints dataset."""

    def __init__(
        self,
        config: Optional[MSBuildingsConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the downloader.

        Args:
            config: Optional configuration for customizing file paths.
            data_store: Instance of DataStore for accessing data storage.
            logger: Optional custom logger. If not provided, uses default logger.
        """
        self.logger = logger or global_config.get_logger(__name__)
        self.data_store = data_store or LocalDataStore()
        self.config = config or MSBuildingsConfig(
            data_store=self.data_store, logger=self.logger
        )

    def _download_tile(
        self,
        tile_info: Union[pd.Series, dict],
    ) -> Optional[str]:
        """Download data file for a single tile."""

        # Modify URL based on data type if needed
        tile_url = tile_info["url"]

        try:
            response = requests.get(tile_url, stream=True)
            response.raise_for_status()

            file_path = str(
                self.config.get_tile_path(
                    quadkey=tile_info["quadkey"],
                    location=(
                        tile_info["country"]
                        if tile_info["country"]
                        else tile_info["location"]
                    ),
                )
            )

            with self.config.data_store.open(file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

                self.logger.debug(
                    f"Successfully downloaded tile: {tile_info['quadkey']}"
                )
                return file_path

        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Failed to download tile {tile_info['quadkey']}: {str(e)}"
            )
            return None

    def download_by_country(self, country_code: str) -> List[str]:
        """
        Download Microsoft Global ML Building Footprints data for a specific country.

        Args:
            country_code: ISO 3166-1 alpha-3 country code

        Returns:
            List of paths to downloaded files
        """

        country_tiles = self.config.get_tiles_for_country(country_code)

        if not country_tiles.empty:

            with multiprocessing.Pool(self.config.n_workers) as pool:
                download_func = functools.partial(self._download_tile)
                file_paths = list(
                    tqdm(
                        pool.imap(
                            download_func, [row for _, row in country_tiles.iterrows()]
                        ),
                        total=len(country_tiles),
                        desc=f"Downloading tiles for {country_code}",
                    )
                )

            return [path for path in file_paths if path is not None]

        return []

    def download_by_geometry(self, geometry: Union[Polygon, MultiPolygon]) -> List[str]:
        """
        Download Microsoft Global ML Building Footprints data for a specific geometry.

        Args:
            geometry: Polygon or MultiPolygon geometry

        Returns:
            List of paths to downloaded files
        """

        geom_tiles = self.config.get_tiles_for_geometry(geometry)

        if not geom_tiles.empty:

            with multiprocessing.Pool(self.config.n_workers) as pool:
                download_func = functools.partial(self._download_tile)
                file_paths = list(
                    tqdm(
                        pool.imap(
                            download_func, [row for _, row in geom_tiles.iterrows()]
                        ),
                        total=len(geom_tiles),
                        desc="Downloading tiles for the geometry",
                    )
                )

            return [path for path in file_paths if path is not None]

        return []

    def download_by_points(
        self,
        points: List[Union[Point, Tuple[float, float]]],
    ) -> List[str]:
        """
        Download Microsoft Global ML Building Footprints data for areas containing specific points.

        Args:
            points: Points of interest as Point geometries or coordinate tuples

        Returns:
            List of paths to downloaded files
        """

        points_tiles = self.config.get_tiles_for_points(points)

        if not points_tiles.empty:

            with multiprocessing.Pool(self.config.n_workers) as pool:
                download_func = functools.partial(self._download_tile)
                file_paths = list(
                    tqdm(
                        pool.imap(
                            download_func, [row for _, row in points_tiles.iterrows()]
                        ),
                        total=len(points_tiles),
                        desc=f"Downloading tiles for the points",
                    )
                )

            return [path for path in file_paths if path is not None]

        return []
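A minimal end-to-end sketch: construct the downloader with its defaults (local data store, default configuration) and fetch all tiles mapped to a country by its ISO 3166-1 alpha-3 code. Building the default configuration downloads the tile index, so the first call takes a moment.

from gigaspatial.handlers.microsoft_global_buildings import MSBuildingsDownloader

downloader = MSBuildingsDownloader()

# Downloads every tile mapped to the country; returns the written file paths
# (gzipped CSVs under the configured base path).
paths = downloader.download_by_country("KEN")
print(f"Downloaded {len(paths)} tiles")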
__init__(config=None, data_store=None, logger=None)

Initialize the downloader.

Parameters:

    config (Optional[MSBuildingsConfig]): Optional configuration for customizing file paths. Default: None.
    data_store (Optional[DataStore]): Instance of DataStore for accessing data storage. Default: None.
    logger (Optional[Logger]): Optional custom logger. If not provided, uses default logger. Default: None.
Source code in gigaspatial/handlers/microsoft_global_buildings.py
def __init__(
    self,
    config: Optional[MSBuildingsConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """
    Initialize the downloader.

    Args:
        config: Optional configuration for customizing file paths.
        data_store: Instance of DataStore for accessing data storage.
        logger: Optional custom logger. If not provided, uses default logger.
    """
    self.logger = logger or global_config.get_logger(__name__)
    self.data_store = data_store or LocalDataStore()
    self.config = config or MSBuildingsConfig(
        data_store=self.data_store, logger=self.logger
    )
download_by_country(country_code)

Download Microsoft Global ML Building Footprints data for a specific country.

Parameters:

    country_code (str): ISO 3166-1 alpha-3 country code. Required.

Returns:

    List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/microsoft_global_buildings.py
def download_by_country(self, country_code: str) -> List[str]:
    """
    Download Microsoft Global ML Building Footprints data for a specific country.

    Args:
        country_code: ISO 3166-1 alpha-3 country code

    Returns:
        List of paths to downloaded files
    """

    country_tiles = self.config.get_tiles_for_country(country_code)

    if not country_tiles.empty:

        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(self._download_tile)
            file_paths = list(
                tqdm(
                    pool.imap(
                        download_func, [row for _, row in country_tiles.iterrows()]
                    ),
                    total=len(country_tiles),
                    desc=f"Downloading tiles for {country_code}",
                )
            )

        return [path for path in file_paths if path is not None]

    return []
download_by_geometry(geometry)

Download Microsoft Global ML Building Footprints data for a specific geometry.

Parameters:

    geometry (Union[Polygon, MultiPolygon]): Polygon or MultiPolygon geometry. Required.

Returns:

    List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/microsoft_global_buildings.py
def download_by_geometry(self, geometry: Union[Polygon, MultiPolygon]) -> List[str]:
    """
    Download Microsoft Global ML Building Footprints data for a specific geometry.

    Args:
        geometry: Polygon or MultiPolygon geometry

    Returns:
        List of paths to downloaded files
    """

    geom_tiles = self.config.get_tiles_for_geometry(geometry)

    if not geom_tiles.empty:

        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(self._download_tile)
            file_paths = list(
                tqdm(
                    pool.imap(
                        download_func, [row for _, row in geom_tiles.iterrows()]
                    ),
                    total=len(geom_tiles),
                    desc="Downloading tiles for the geometry",
                )
            )

        return [path for path in file_paths if path is not None]

    return []
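A short sketch for the geometry variant, reusing the downloader instance from the sketch above; the bounding box is illustrative.

from shapely.geometry import box

# `downloader` is the MSBuildingsDownloader instance from the earlier sketch.
aoi = box(36.70, -1.40, 36.95, -1.20)
paths = downloader.download_by_geometry(aoi)
print(f"Downloaded {len(paths)} tiles overlapping the area of interest")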
download_by_points(points)

Download Microsoft Global ML Building Footprints data for areas containing specific points.

Parameters:

    points (List[Union[Point, Tuple[float, float]]]): Points of interest as Point geometries or coordinate tuples. Required.

Returns:

    List[str]: List of paths to downloaded files.

Source code in gigaspatial/handlers/microsoft_global_buildings.py
def download_by_points(
    self,
    points: List[Union[Point, Tuple[float, float]]],
) -> List[str]:
    """
    Download Microsoft Global ML Building Footprints data for areas containing specific points.

    Args:
        points: Points of interest as Point geometries or coordinate tuples

    Returns:
        List of paths to downloaded files
    """

    points_tiles = self.config.get_tiles_for_points(points)

    if not points_tiles.empty:

        with multiprocessing.Pool(self.config.n_workers) as pool:
            download_func = functools.partial(self._download_tile)
            file_paths = list(
                tqdm(
                    pool.imap(
                        download_func, [row for _, row in points_tiles.iterrows()]
                    ),
                    total=len(points_tiles),
                    desc=f"Downloading tiles for the points",
                )
            )

        return [path for path in file_paths if path is not None]

    return []

read_dataset(data_store, path, compression=None, **kwargs)

Read data from various file formats stored in both local and cloud-based storage.

Parameters:

    data_store (DataStore): Instance of DataStore for accessing data storage.
    path (str or Path): Path to the file in data storage.
    **kwargs (dict): Additional arguments passed to the specific reader function.

Returns:

    pandas.DataFrame or geopandas.GeoDataFrame: The data read from the file.

Raises:

    FileNotFoundError: If the file doesn't exist in blob storage.
    ValueError: If the file type is unsupported or if there's an error reading the file.

Source code in gigaspatial/core/io/readers.py
def read_dataset(data_store: DataStore, path: str, compression: str = None, **kwargs):
    """
    Read data from various file formats stored in both local and cloud-based storage.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str, Path
        Path to the file in data storage.
    **kwargs : dict
        Additional arguments passed to the specific reader function.

    Returns:
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        The data read from the file.

    Raises:
    ------
    FileNotFoundError
        If the file doesn't exist in blob storage.
    ValueError
        If the file type is unsupported or if there's an error reading the file.
    """

    # Define supported file formats and their readers
    BINARY_FORMATS = {
        ".shp",
        ".zip",
        ".parquet",
        ".gpkg",
        ".xlsx",
        ".xls",
        ".kmz",
        ".gz",
    }

    PANDAS_READERS = {
        ".csv": pd.read_csv,
        ".xlsx": lambda f, **kw: pd.read_excel(f, engine="openpyxl", **kw),
        ".xls": lambda f, **kw: pd.read_excel(f, engine="xlrd", **kw),
        ".json": pd.read_json,
        # ".gz": lambda f, **kw: pd.read_csv(f, compression="gzip", **kw),
    }

    GEO_READERS = {
        ".shp": gpd.read_file,
        ".zip": gpd.read_file,
        ".geojson": gpd.read_file,
        ".gpkg": gpd.read_file,
        ".parquet": gpd.read_parquet,
        ".kmz": read_kmz,
    }

    COMPRESSION_FORMATS = {
        ".gz": "gzip",
        ".bz2": "bz2",
        ".zip": "zip",
        ".xz": "xz",
    }

    try:
        # Check if file exists
        if not data_store.file_exists(path):
            raise FileNotFoundError(f"File '{path}' not found in blob storage")

        path_obj = Path(path)
        suffixes = path_obj.suffixes
        file_extension = suffixes[-1].lower() if suffixes else ""

        if compression is None and file_extension in COMPRESSION_FORMATS:
            compression_format = COMPRESSION_FORMATS[file_extension]

            # if file has multiple extensions (e.g., .csv.gz), get the inner format
            if len(suffixes) > 1:
                inner_extension = suffixes[-2].lower()

                if inner_extension == ".tar":
                    raise ValueError(
                        "Tar archives (.tar.gz) are not directly supported"
                    )

                if inner_extension in PANDAS_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            return PANDAS_READERS[inner_extension](
                                f, compression=compression_format, **kwargs
                            )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed file: {str(e)}")
                elif inner_extension in GEO_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            if compression_format == "gzip":
                                import gzip

                                decompressed_data = gzip.decompress(f.read())
                                import io

                                return GEO_READERS[inner_extension](
                                    io.BytesIO(decompressed_data), **kwargs
                                )
                            else:
                                raise ValueError(
                                    f"Compression format {compression_format} not supported for geo data"
                                )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed geo file: {str(e)}")
            else:
                # if just .gz without clear inner type, assume csv
                try:
                    with data_store.open(path, "rb") as f:
                        return pd.read_csv(f, compression=compression_format, **kwargs)
                except Exception as e:
                    raise ValueError(
                        f"Error reading compressed file as CSV: {str(e)}. "
                        f"If not a CSV, specify the format in the filename (e.g., .json.gz)"
                    )

        # Special handling for compressed files
        if file_extension == ".zip":
            # For zip files, we need to use binary mode
            with data_store.open(path, "rb") as f:
                return gpd.read_file(f)

        # Determine if we need binary mode based on file type
        mode = "rb" if file_extension in BINARY_FORMATS else "r"

        # Try reading with appropriate reader
        if file_extension in PANDAS_READERS:
            try:
                with data_store.open(path, mode) as f:
                    return PANDAS_READERS[file_extension](f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error reading file with pandas: {str(e)}")

        if file_extension in GEO_READERS:
            try:
                with data_store.open(path, "rb") as f:
                    return GEO_READERS[file_extension](f, **kwargs)
            except Exception as e:
                # For parquet files, try pandas reader if geopandas fails
                if file_extension == ".parquet":
                    try:
                        with data_store.open(path, "rb") as f:
                            return pd.read_parquet(f, **kwargs)
                    except Exception as e2:
                        raise ValueError(
                            f"Failed to read parquet with both geopandas ({str(e)}) "
                            f"and pandas ({str(e2)})"
                        )
                raise ValueError(f"Error reading file with geopandas: {str(e)}")

        # If we get here, the file type is unsupported
        supported_formats = sorted(set(PANDAS_READERS.keys()) | set(GEO_READERS.keys()))
        supported_compressions = sorted(COMPRESSION_FORMATS.keys())
        raise ValueError(
            f"Unsupported file type: {file_extension}\n"
            f"Supported formats: {', '.join(supported_formats)}"
            f"Supported compressions: {', '.join(supported_compressions)}"
        )

    except Exception as e:
        if isinstance(e, (FileNotFoundError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error reading dataset: {str(e)}")

read_datasets(data_store, paths, **kwargs)

Read multiple datasets from data storage at once.

Parameters:

data_store : DataStore
    Instance of DataStore for accessing data storage.
paths : list of str
    Paths to files in data storage.
**kwargs : dict
    Additional arguments passed to read_dataset.

Returns:

dict Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.

Source code in gigaspatial/core/io/readers.py
def read_datasets(data_store: DataStore, paths, **kwargs):
    """
    Read multiple datasets from data storage at once.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    paths : list of str
        Paths to files in data storage.
    **kwargs : dict
        Additional arguments passed to read_dataset.

    Returns:
    -------
    dict
        Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.
    """
    results = {}
    errors = {}

    for path in paths:
        try:
            results[path] = read_dataset(data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors reading datasets:\n{error_msg}")

    return results
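
A short usage sketch for the readers above. The read_dataset and read_datasets imports follow the gigaspatial/core/io/readers.py source reference; the LocalDataStore import path and the file paths are illustrative assumptions:

from gigaspatial.core.io.readers import read_dataset, read_datasets
from gigaspatial.core.io.data_store import LocalDataStore  # import path is an assumption

data_store = LocalDataStore()

# Single file: the reader is chosen from the file extension (.csv -> pandas)
schools = read_dataset(data_store, "bronze/schools.csv")

# Several files at once; a ValueError listing every failing path is raised on errors
frames = read_datasets(
    data_store,
    ["bronze/schools.csv", "bronze/admin_boundaries.geojson"],
)
print({path: type(df).__name__ for path, df in frames.items()})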

read_gzipped_json_or_csv(file_path, data_store)

Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV.

Source code in gigaspatial/core/io/readers.py
def read_gzipped_json_or_csv(file_path, data_store):
    """Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV."""

    with data_store.open(file_path, "rb") as f:
        g = gzip.GzipFile(fileobj=f)
        text = g.read().decode("utf-8")
        try:
            df = pd.read_json(io.StringIO(text), lines=True)
            return df
        except json.JSONDecodeError:
            try:
                df = pd.read_csv(io.StringIO(text))
                return df
            except pd.errors.ParserError:
                print(f"Error: Could not parse {file_path} as JSON or CSV.")
                return None

read_kmz(file_obj, **kwargs)

Helper function to read KMZ files and return a GeoDataFrame.

Source code in gigaspatial/core/io/readers.py
def read_kmz(file_obj, **kwargs):
    """Helper function to read KMZ files and return a GeoDataFrame."""
    try:
        with zipfile.ZipFile(file_obj) as kmz:
            # Find the KML file in the archive (usually doc.kml)
            kml_filename = next(
                name for name in kmz.namelist() if name.endswith(".kml")
            )

            # Read the KML content
            kml_content = io.BytesIO(kmz.read(kml_filename))

            gdf = gpd.read_file(kml_content)

            # Validate the GeoDataFrame
            if gdf.empty:
                raise ValueError(
                    "The KML file is empty or does not contain valid geospatial data."
                )

        return gdf

    except zipfile.BadZipFile:
        raise ValueError("The provided file is not a valid KMZ file.")
    except StopIteration:
        raise ValueError("No KML file found in the KMZ archive.")
    except Exception as e:
        raise RuntimeError(f"An error occurred: {e}")

write_dataset(data, data_store, path, **kwargs)

Write DataFrame or GeoDataFrame to various file formats in Azure Blob Storage.

Parameters:

data : pandas.DataFrame or geopandas.GeoDataFrame
    The data to write to blob storage.
data_store : DataStore
    Instance of DataStore for accessing data storage.
path : str
    Path where the file will be written in data storage.
**kwargs : dict
    Additional arguments passed to the specific writer function.

Raises:

ValueError
    If the file type is unsupported or if there's an error writing the file.
TypeError
    If input data is not a DataFrame or GeoDataFrame.

Source code in gigaspatial/core/io/writers.py
def write_dataset(data, data_store: DataStore, path, **kwargs):
    """
    Write DataFrame or GeoDataFrame to various file formats in Azure Blob Storage.

    Parameters:
    ----------
    data : pandas.DataFrame or geopandas.GeoDataFrame
        The data to write to blob storage.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str
        Path where the file will be written in data storage.
    **kwargs : dict
        Additional arguments passed to the specific writer function.

    Raises:
    ------
    ValueError
        If the file type is unsupported or if there's an error writing the file.
    TypeError
        If input data is not a DataFrame or GeoDataFrame.
    """

    # Define supported file formats and their writers
    BINARY_FORMATS = {".shp", ".zip", ".parquet", ".gpkg", ".xlsx", ".xls"}

    PANDAS_WRITERS = {
        ".csv": lambda df, buf, **kw: df.to_csv(buf, **kw),
        ".xlsx": lambda df, buf, **kw: df.to_excel(buf, engine="openpyxl", **kw),
        ".json": lambda df, buf, **kw: df.to_json(buf, **kw),
        ".parquet": lambda df, buf, **kw: df.to_parquet(buf, **kw),
    }

    GEO_WRITERS = {
        ".geojson": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GeoJSON", **kw),
        ".gpkg": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GPKG", **kw),
        ".parquet": lambda gdf, buf, **kw: gdf.to_parquet(buf, **kw),
    }

    try:
        # Input validation
        if not isinstance(data, (pd.DataFrame, gpd.GeoDataFrame)):
            raise TypeError("Input data must be a pandas DataFrame or GeoDataFrame")

        # Get file suffix and ensure it's lowercase
        suffix = Path(path).suffix.lower()

        # Determine if we need binary mode based on file type
        mode = "wb" if suffix in BINARY_FORMATS else "w"

        # Handle different data types and formats
        if isinstance(data, gpd.GeoDataFrame):
            if suffix not in GEO_WRITERS:
                supported_formats = sorted(GEO_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for GeoDataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, "wb") as f:
                    GEO_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing GeoDataFrame: {str(e)}")

        else:  # pandas DataFrame
            if suffix not in PANDAS_WRITERS:
                supported_formats = sorted(PANDAS_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for DataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, mode) as f:
                    PANDAS_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing DataFrame: {str(e)}")

    except Exception as e:
        if isinstance(e, (TypeError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error writing dataset: {str(e)}")

write_datasets(data_dict, data_store, **kwargs)

Write multiple datasets to data storage at once.

Parameters:

data_dict : dict
    Dictionary mapping paths to DataFrames/GeoDataFrames.
data_store : DataStore
    Instance of DataStore for accessing data storage.
**kwargs : dict
    Additional arguments passed to write_dataset.

Raises:

ValueError If there are any errors writing the datasets.

Source code in gigaspatial/core/io/writers.py
def write_datasets(data_dict, data_store: DataStore, **kwargs):
    """
    Write multiple datasets to data storage at once.

    Parameters:
    ----------
    data_dict : dict
        Dictionary mapping paths to DataFrames/GeoDataFrames.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    **kwargs : dict
        Additional arguments passed to write_dataset.

    Raises:
    ------
    ValueError
        If there are any errors writing the datasets.
    """
    errors = {}

    for path, data in data_dict.items():
        try:
            write_dataset(data, data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors writing datasets:\n{error_msg}")

osm

OSMLocationFetcher dataclass

A class to fetch and process location data from OpenStreetMap using the Overpass API.

This class supports fetching various OSM location types including amenities, buildings, shops, and other POI categories.

Source code in gigaspatial/handlers/osm.py
@dataclass
class OSMLocationFetcher:
    """
    A class to fetch and process location data from OpenStreetMap using the Overpass API.

    This class supports fetching various OSM location types including amenities, buildings,
    shops, and other POI categories.
    """

    country: str
    location_types: Union[List[str], Dict[str, List[str]]]
    base_url: str = "http://overpass-api.de/api/interpreter"
    timeout: int = 600
    max_retries: int = 3
    retry_delay: int = 5

    def __post_init__(self):
        """Validate inputs, normalize location_types, and set up logging."""
        try:
            self.country = pycountry.countries.lookup(self.country).alpha_2
        except LookupError:
            raise ValueError(f"Invalid country code provided: {self.country}")

        # Normalize location_types to always be a dictionary
        if isinstance(self.location_types, list):
            self.location_types = {"amenity": self.location_types}
        elif not isinstance(self.location_types, dict):
            raise TypeError(
                "location_types must be a list of strings or a dictionary mapping categories to type lists"
            )

        self.logger = config.get_logger(__name__)

    def _build_queries(self, since_year: Optional[int] = None) -> List[str]:
        """
        Construct separate Overpass QL queries for different element types and categories.
        Returns list of [nodes_relations_query, ways_query]
        """
        if since_year:
            date_filter = f'(newer:"{since_year}-01-01T00:00:00Z")'
        else:
            date_filter = ""

        # Query for nodes and relations (with center output)
        nodes_relations_queries = []
        for category, types in self.location_types.items():
            nodes_relations_queries.extend(
                [
                    f"""node["{category}"~"^({"|".join(types)})"]{date_filter}(area.searchArea);""",
                    f"""relation["{category}"~"^({"|".join(types)})"]{date_filter}(area.searchArea);""",
                ]
            )

        nodes_relations_queries = "\n".join(nodes_relations_queries)

        nodes_relations_query = f"""
        [out:json][timeout:{self.timeout}];
        area["ISO3166-1"={self.country}]->.searchArea;
        (
            {nodes_relations_queries}
        );
        out center;
        """

        # Query for ways (with geometry output)
        ways_queries = []
        for category, types in self.location_types.items():
            ways_queries.append(
                f"""way["{category}"~"^({"|".join(types)})"]{date_filter}(area.searchArea);"""
            )

        ways_queries = "\n".join(ways_queries)

        ways_query = f"""
        [out:json][timeout:{self.timeout}];
        area["ISO3166-1"={self.country}]->.searchArea;
        (
            {ways_queries}
        );
        out geom;
        """

        return [nodes_relations_query, ways_query]

    def _make_request(self, query: str) -> Dict:
        """Make HTTP request to Overpass API with retry mechanism."""
        for attempt in range(self.max_retries):
            try:
                self.logger.debug(f"Executing query:\n{query}")
                response = requests.get(
                    self.base_url, params={"data": query}, timeout=self.timeout
                )
                response.raise_for_status()
                return response.json()
            except RequestException as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < self.max_retries - 1:
                    sleep(self.retry_delay)
                else:
                    raise RuntimeError(
                        f"Failed to fetch data after {self.max_retries} attempts"
                    ) from e

    def _extract_matching_categories(self, tags: Dict[str, str]) -> Dict[str, str]:
        """
        Extract all matching categories and their values from the tags.
        Returns:
            Dict mapping each matching category to its value
        """
        matches = {}
        for category, types in self.location_types.items():
            if category in tags and tags[category] in types:
                matches[category] = tags[category]
        return matches

    def _process_node_relation(self, element: Dict) -> List[Dict[str, any]]:
        """
        Process a node or relation element.
        May return multiple processed elements if the element matches multiple categories.
        """
        try:
            tags = element.get("tags", {})
            matching_categories = self._extract_matching_categories(tags)

            if not matching_categories:
                self.logger.warning(
                    f"Element {element['id']} missing or not matching specified category tags"
                )
                return []

            _lat = element.get("lat") or element["center"]["lat"]
            _lon = element.get("lon") or element["center"]["lon"]
            point_geom = Point(_lon, _lat)

            # for each matching category, create a separate element
            results = []
            for category, value in matching_categories.items():
                results.append(
                    {
                        "source_id": element["id"],
                        "category": category,
                        "category_value": value,
                        "name": tags.get("name", ""),
                        "name_en": tags.get("name:en", ""),
                        "type": element["type"],
                        "geometry": point_geom,
                        "latitude": _lat,
                        "longitude": _lon,
                        "matching_categories": list(matching_categories.keys()),
                    }
                )

            return results

        except KeyError as e:
            self.logger.error(f"Corrupt data received for node/relation: {str(e)}")
            return []

    def _process_way(self, element: Dict) -> List[Dict[str, any]]:
        """
        Process a way element with geometry.
        May return multiple processed elements if the element matches multiple categories.
        """
        try:
            tags = element.get("tags", {})
            matching_categories = self._extract_matching_categories(tags)

            if not matching_categories:
                self.logger.warning(
                    f"Element {element['id']} missing or not matching specified category tags"
                )
                return []

            # Create polygon from geometry points
            polygon = Polygon([(p["lon"], p["lat"]) for p in element["geometry"]])
            centroid = polygon.centroid

            # For each matching category, create a separate element
            results = []
            for category, value in matching_categories.items():
                results.append(
                    {
                        "source_id": element["id"],
                        "category": category,
                        "category_value": value,
                        "name": tags.get("name", ""),
                        "name_en": tags.get("name:en", ""),
                        "type": element["type"],
                        "geometry": polygon,
                        "latitude": centroid.y,
                        "longitude": centroid.x,
                        "matching_categories": list(matching_categories.keys()),
                    }
                )

            return results
        except (KeyError, ValueError) as e:
            self.logger.error(f"Error processing way geometry: {str(e)}")
            return []

    def fetch_locations(
        self,
        since_year: Optional[int] = None,
        handle_duplicates: Literal["separate", "combine", "primary"] = "separate",
    ) -> pd.DataFrame:
        """
        Fetch and process OSM locations.

        Args:
            since_year (int, optional): Filter for locations added/modified since this year.
            handle_duplicates (str): How to handle objects matching multiple categories:
                - 'separate': Create separate entries for each category (default)
                - 'combine': Use a single entry with a list of matching categories
                - 'primary': Keep only the first matching category

        Returns:
            pd.DataFrame: Processed OSM locations
        """
        if handle_duplicates not in ("separate", "combine", "primary"):
            raise ValueError(
                "handle_duplicates must be one of: 'separate', 'combine', 'primary'"
            )

        self.logger.info(
            f"Fetching OSM locations from Overpass API for country: {self.country}"
        )
        self.logger.info(f"Location types: {self.location_types}")
        self.logger.info(f"Handling duplicate category matches as: {handle_duplicates}")

        # Get queries for different element types
        nodes_relations_query, ways_query = self._build_queries(since_year)

        # Fetch nodes and relations
        nodes_relations_response = self._make_request(nodes_relations_query)
        nodes_relations = nodes_relations_response.get("elements", [])

        # Fetch ways
        ways_response = self._make_request(ways_query)
        ways = ways_response.get("elements", [])

        if not nodes_relations and not ways:
            self.logger.warning("No locations found for the specified criteria")
            return pd.DataFrame()

        self.logger.info(
            f"Processing {len(nodes_relations)} nodes/relations and {len(ways)} ways..."
        )

        # Process nodes and relations
        with ThreadPoolExecutor() as executor:
            processed_nodes_relations = [
                item
                for sublist in executor.map(
                    self._process_node_relation, nodes_relations
                )
                for item in sublist
            ]

        # Process ways
        with ThreadPoolExecutor() as executor:
            processed_ways = [
                item
                for sublist in executor.map(self._process_way, ways)
                for item in sublist
            ]

        # Combine all processed elements
        all_elements = processed_nodes_relations + processed_ways

        if not all_elements:
            self.logger.warning("No matching elements found after processing")
            return pd.DataFrame()

        # Handle duplicates based on the specified strategy
        if handle_duplicates != "separate":
            # Group by source_id
            grouped_elements = {}
            for elem in all_elements:
                source_id = elem["source_id"]
                if source_id not in grouped_elements:
                    grouped_elements[source_id] = elem
                elif handle_duplicates == "combine":
                    # Combine matching categories
                    if grouped_elements[source_id]["category"] != elem["category"]:
                        if isinstance(grouped_elements[source_id]["category"], str):
                            grouped_elements[source_id]["category"] = [
                                grouped_elements[source_id]["category"]
                            ]
                            grouped_elements[source_id]["category_value"] = [
                                grouped_elements[source_id]["category_value"]
                            ]

                        if (
                            elem["category"]
                            not in grouped_elements[source_id]["category"]
                        ):
                            grouped_elements[source_id]["category"].append(
                                elem["category"]
                            )
                            grouped_elements[source_id]["category_value"].append(
                                elem["category_value"]
                            )
                # For 'primary', just keep the first one we encountered

            all_elements = list(grouped_elements.values())

        locations = pd.DataFrame(all_elements)

        # Log element type distribution
        type_counts = locations["type"].value_counts()
        self.logger.info("\nElement type distribution:")
        for element_type, count in type_counts.items():
            self.logger.info(f"{element_type}: {count}")

        # Log category distribution
        if handle_duplicates == "combine":
            # Count each category separately when they're in lists
            category_counts = {}
            for cats in locations["category"]:
                if isinstance(cats, list):
                    for cat in cats:
                        category_counts[cat] = category_counts.get(cat, 0) + 1
                else:
                    category_counts[cats] = category_counts.get(cats, 0) + 1

            self.logger.info("\nCategory distribution:")
            for category, count in category_counts.items():
                self.logger.info(f"{category}: {count}")
        else:
            category_counts = locations["category"].value_counts()
            self.logger.info("\nCategory distribution:")
            for category, count in category_counts.items():
                self.logger.info(f"{category}: {count}")

        # Log elements with multiple matching categories
        multi_category = [e for e in all_elements if len(e["matching_categories"]) > 1]
        if multi_category:
            self.logger.info(
                f"\n{len(multi_category)} elements matched multiple categories"
            )

        self.logger.info(f"Successfully processed {len(locations)} locations")
        return locations
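
A usage sketch for the fetcher. The import path follows the gigaspatial/handlers/osm.py source reference above, and the category/type values are illustrative:

from gigaspatial.handlers.osm import OSMLocationFetcher  # path inferred from the source reference above

fetcher = OSMLocationFetcher(
    country="KE",  # anything pycountry can resolve: a name, alpha-2 or alpha-3 code
    location_types={
        "amenity": ["school", "hospital"],
        "building": ["school"],
    },
)

# Only elements created or edited since 2020, one row per matching category
locations = fetcher.fetch_locations(since_year=2020, handle_duplicates="separate")
print(locations[["source_id", "category", "category_value", "latitude", "longitude"]].head())
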
__post_init__()

Validate inputs, normalize location_types, and set up logging.

Source code in gigaspatial/handlers/osm.py
def __post_init__(self):
    """Validate inputs, normalize location_types, and set up logging."""
    try:
        self.country = pycountry.countries.lookup(self.country).alpha_2
    except LookupError:
        raise ValueError(f"Invalid country code provided: {self.country}")

    # Normalize location_types to always be a dictionary
    if isinstance(self.location_types, list):
        self.location_types = {"amenity": self.location_types}
    elif not isinstance(self.location_types, dict):
        raise TypeError(
            "location_types must be a list of strings or a dictionary mapping categories to type lists"
        )

    self.logger = config.get_logger(__name__)
fetch_locations(since_year=None, handle_duplicates='separate')

Fetch and process OSM locations.

Parameters:

since_year : int, optional (default: None)
    Filter for locations added/modified since this year.
handle_duplicates : str (default: 'separate')
    How to handle objects matching multiple categories:
    - 'separate': Create separate entries for each category (default)
    - 'combine': Use a single entry with a list of matching categories
    - 'primary': Keep only the first matching category

Returns:

pd.DataFrame
    Processed OSM locations.

Source code in gigaspatial/handlers/osm.py
def fetch_locations(
    self,
    since_year: Optional[int] = None,
    handle_duplicates: Literal["separate", "combine", "primary"] = "separate",
) -> pd.DataFrame:
    """
    Fetch and process OSM locations.

    Args:
        since_year (int, optional): Filter for locations added/modified since this year.
        handle_duplicates (str): How to handle objects matching multiple categories:
            - 'separate': Create separate entries for each category (default)
            - 'combine': Use a single entry with a list of matching categories
            - 'primary': Keep only the first matching category

    Returns:
        pd.DataFrame: Processed OSM locations
    """
    if handle_duplicates not in ("separate", "combine", "primary"):
        raise ValueError(
            "handle_duplicates must be one of: 'separate', 'combine', 'primary'"
        )

    self.logger.info(
        f"Fetching OSM locations from Overpass API for country: {self.country}"
    )
    self.logger.info(f"Location types: {self.location_types}")
    self.logger.info(f"Handling duplicate category matches as: {handle_duplicates}")

    # Get queries for different element types
    nodes_relations_query, ways_query = self._build_queries(since_year)

    # Fetch nodes and relations
    nodes_relations_response = self._make_request(nodes_relations_query)
    nodes_relations = nodes_relations_response.get("elements", [])

    # Fetch ways
    ways_response = self._make_request(ways_query)
    ways = ways_response.get("elements", [])

    if not nodes_relations and not ways:
        self.logger.warning("No locations found for the specified criteria")
        return pd.DataFrame()

    self.logger.info(
        f"Processing {len(nodes_relations)} nodes/relations and {len(ways)} ways..."
    )

    # Process nodes and relations
    with ThreadPoolExecutor() as executor:
        processed_nodes_relations = [
            item
            for sublist in executor.map(
                self._process_node_relation, nodes_relations
            )
            for item in sublist
        ]

    # Process ways
    with ThreadPoolExecutor() as executor:
        processed_ways = [
            item
            for sublist in executor.map(self._process_way, ways)
            for item in sublist
        ]

    # Combine all processed elements
    all_elements = processed_nodes_relations + processed_ways

    if not all_elements:
        self.logger.warning("No matching elements found after processing")
        return pd.DataFrame()

    # Handle duplicates based on the specified strategy
    if handle_duplicates != "separate":
        # Group by source_id
        grouped_elements = {}
        for elem in all_elements:
            source_id = elem["source_id"]
            if source_id not in grouped_elements:
                grouped_elements[source_id] = elem
            elif handle_duplicates == "combine":
                # Combine matching categories
                if grouped_elements[source_id]["category"] != elem["category"]:
                    if isinstance(grouped_elements[source_id]["category"], str):
                        grouped_elements[source_id]["category"] = [
                            grouped_elements[source_id]["category"]
                        ]
                        grouped_elements[source_id]["category_value"] = [
                            grouped_elements[source_id]["category_value"]
                        ]

                    if (
                        elem["category"]
                        not in grouped_elements[source_id]["category"]
                    ):
                        grouped_elements[source_id]["category"].append(
                            elem["category"]
                        )
                        grouped_elements[source_id]["category_value"].append(
                            elem["category_value"]
                        )
            # For 'primary', just keep the first one we encountered

        all_elements = list(grouped_elements.values())

    locations = pd.DataFrame(all_elements)

    # Log element type distribution
    type_counts = locations["type"].value_counts()
    self.logger.info("\nElement type distribution:")
    for element_type, count in type_counts.items():
        self.logger.info(f"{element_type}: {count}")

    # Log category distribution
    if handle_duplicates == "combine":
        # Count each category separately when they're in lists
        category_counts = {}
        for cats in locations["category"]:
            if isinstance(cats, list):
                for cat in cats:
                    category_counts[cat] = category_counts.get(cat, 0) + 1
            else:
                category_counts[cats] = category_counts.get(cats, 0) + 1

        self.logger.info("\nCategory distribution:")
        for category, count in category_counts.items():
            self.logger.info(f"{category}: {count}")
    else:
        category_counts = locations["category"].value_counts()
        self.logger.info("\nCategory distribution:")
        for category, count in category_counts.items():
            self.logger.info(f"{category}: {count}")

    # Log elements with multiple matching categories
    multi_category = [e for e in all_elements if len(e["matching_categories"]) > 1]
    if multi_category:
        self.logger.info(
            f"\n{len(multi_category)} elements matched multiple categories"
        )

    self.logger.info(f"Successfully processed {len(locations)} locations")
    return locations
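
To see what the three handle_duplicates strategies do without calling the Overpass API, the grouping step can be exercised on hand-made records. This is a standalone sketch that mirrors, rather than imports, the logic above:

records = [
    {"source_id": 1, "category": "amenity", "category_value": "school"},
    {"source_id": 1, "category": "building", "category_value": "school"},
    {"source_id": 2, "category": "amenity", "category_value": "hospital"},
]

def dedupe(elements, strategy):
    if strategy == "separate":                     # one row per matching category
        return list(elements)
    grouped = {}
    for elem in elements:
        source_id = elem["source_id"]
        if source_id not in grouped:
            grouped[source_id] = dict(elem)        # first hit wins for 'primary'
        elif strategy == "combine" and grouped[source_id]["category"] != elem["category"]:
            kept = grouped[source_id]
            if isinstance(kept["category"], str):  # promote scalars to lists on first merge
                kept["category"] = [kept["category"]]
                kept["category_value"] = [kept["category_value"]]
            if elem["category"] not in kept["category"]:
                kept["category"].append(elem["category"])
                kept["category_value"].append(elem["category_value"])
    return list(grouped.values())

print(len(dedupe(records, "separate")))           # 3
print(dedupe(records, "combine")[0]["category"])  # ['amenity', 'building']
print(dedupe(records, "primary")[0]["category"])  # 'amenity'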

overture

OvertureAmenityFetcher

A class to fetch and process amenity locations from the Overture Maps places dataset.

Source code in gigaspatial/handlers/overture.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class OvertureAmenityFetcher:
    """
    A class to fetch and process amenity locations from the Overture Maps places dataset.
    """

    # constants
    release: Optional[str] = "2024-12-18.0"
    base_url: Optional[str] = (
        "s3://overturemaps-us-west-2/release/{release}/theme=places/*/*"
    )

    # user config
    country: str = Field(...)
    amenity_types: List[str] = Field(..., description="List of amenity types to fetch")
    geom: Union[Polygon, MultiPolygon] = None

    # config for country boundary access from data storage
    # if None GADM boundaries will be used
    data_store: DataStore = None
    country_geom_path: Optional[Union[str, Path]] = None

    def __post_init__(self):
        """Validate inputs and set up logging."""
        try:
            self.country = pycountry.countries.lookup(self.country).alpha_2
        except LookupError:
            raise ValueError(f"Invalid country code provided: {self.country}")

        self.base_url = self.base_url.format(release=self.release)
        self.logger = config.get_logger(__name__)

        self.connection = self._set_connection()

    def _set_connection(self):
        """Set the connection to the DB"""
        db = duckdb.connect()
        db.install_extension("spatial")
        db.load_extension("spatial")
        return db

    def _load_country_geometry(
        self,
    ) -> Union[Polygon, MultiPolygon]:
        """Load country boundary geometry from DataStore or GADM."""

        gdf_admin0 = AdminBoundaries.create(
            country_code=pycountry.countries.lookup(self.country).alpha_3,
            admin_level=0,
            data_store=self.data_store,
            path=self.country_geom_path,
        ).to_geodataframe()

        return gdf_admin0.geometry.iloc[0]

    def _build_query(self, match_pattern: bool = False, **kwargs) -> str:
        """Constructs and returns the query"""

        if match_pattern:
            amenity_query = " OR ".join(
                [f"category ilike '%{amenity}%'" for amenity in self.amenity_types]
            )
        else:
            amenity_query = " OR ".join(
                [f"category == '{amenity}'" for amenity in self.amenity_types]
            )

        query = """
        SELECT id,
            names.primary AS name,
            ROUND(confidence,2) as confidence,
            categories.primary AS category,
            ST_AsText(geometry) as geometry,
        FROM read_parquet('s3://overturemaps-us-west-2/release/2024-12-18.0/theme=places/type=place/*',
            hive_partitioning=1)
        WHERE bbox.xmin > {}
            AND bbox.ymin > {} 
            AND bbox.xmax <  {}
            AND bbox.ymax < {}
            AND ({})
        """

        if not self.geom:
            self.geom = self._load_country_geometry()

        return query.format(*self.geom.bounds, amenity_query)

    def fetch_locations(
        self, match_pattern: bool = False, **kwargs
    ) -> gpd.GeoDataFrame:
        """Fetch and process amenity locations."""
        self.logger.info("Fetching amenity locations from Overture DB...")

        query = self._build_query(match_pattern=match_pattern, **kwargs)

        df = self.connection.execute(query).df()

        self.logger.info("Processing geometries")
        gdf = gpd.GeoDataFrame(
            df, geometry=gpd.GeoSeries.from_wkt(df["geometry"]), crs="EPSG:4326"
        )

        # filter by geometry boundary
        s = STRtree(gdf.geometry)
        result = s.query(self.geom, predicate="intersects")

        locations = gdf.iloc[result].reset_index(drop=True)

        self.logger.info(f"Successfully processed {len(locations)} amenity locations")
        return locations
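
A usage sketch. The import path follows the gigaspatial/handlers/overture.py source reference above, and the amenity type values are illustrative guesses at Overture place categories rather than a documented list:

from gigaspatial.handlers.overture import OvertureAmenityFetcher  # path inferred from the source reference above

fetcher = OvertureAmenityFetcher(
    country="KE",
    amenity_types=["school", "hospital"],  # illustrative category values
)

# Default is exact category matching; pass match_pattern=True for substring (ILIKE) matching
locations = fetcher.fetch_locations()
print(locations[["name", "category", "confidence"]].head())
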
__post_init__()

Validate inputs and set up logging.

Source code in gigaspatial/handlers/overture.py
def __post_init__(self):
    """Validate inputs and set up logging."""
    try:
        self.country = pycountry.countries.lookup(self.country).alpha_2
    except LookupError:
        raise ValueError(f"Invalid country code provided: {self.country}")

    self.base_url = self.base_url.format(release=self.release)
    self.logger = config.get_logger(__name__)

    self.connection = self._set_connection()
fetch_locations(match_pattern=False, **kwargs)

Fetch and process amenity locations.

Source code in gigaspatial/handlers/overture.py
def fetch_locations(
    self, match_pattern: bool = False, **kwargs
) -> gpd.GeoDataFrame:
    """Fetch and process amenity locations."""
    self.logger.info("Fetching amenity locations from Overture DB...")

    query = self._build_query(match_pattern=match_pattern, **kwargs)

    df = self.connection.execute(query).df()

    self.logger.info("Processing geometries")
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.GeoSeries.from_wkt(df["geometry"]), crs="EPSG:4326"
    )

    # filter by geometry boundary
    s = STRtree(gdf.geometry)
    result = s.query(self.geom, predicate="intersects")

    locations = gdf.iloc[result].reset_index(drop=True)

    self.logger.info(f"Successfully processed {len(locations)} amenity locations")
    return locations

worldpop

WorldPopConfig

Bases: BaseModel

Source code in gigaspatial/handlers/worldpop.py
class WorldPopConfig(BaseModel):
    # class variables
    _metadata_cache: ClassVar[Optional[pd.DataFrame]] = None

    # constants
    CURRENT_MAX_YEAR: int = 2022
    EARLIEST_YEAR: int = 2000
    SCHOOL_AGE_YEAR: int = 2020

    # base config
    WORLDPOP_DB_BASE_URL: HttpUrl = Field(default="https://data.worldpop.org/")
    SCHOOL_AGE_POPULATION_PATH: str = Field(
        default="GIS/AgeSex_structures/school_age_population/v1/2020/"
    )
    PPP_2021_2022_PATH: str = Field(
        default="GIS/Population/Global_2021_2022_1km_UNadj/"
    )
    DATASETS_METADATA_PATH: str = Field(default="assets/wpgpDatasets.csv")

    # user config
    base_path: Path = Field(default=global_config.get_path("worldpop", "bronze"))
    country: str = Field(...)
    year: int = Field(..., ge=EARLIEST_YEAR, le=CURRENT_MAX_YEAR)
    resolution: Literal["HIGH", "LOW"] = Field(
        default="LOW",
        description="Spatial resolution of the population grid: HIGH (100m) or LOW (1km)",
    )
    un_adjusted: bool = True
    constrained: bool = False
    school_age: Optional[Literal["PRIMARY", "SECONDARY"]] = None
    gender: Literal["F", "M", "F_M"] = "F_M"

    @field_validator("country")
    def validate_country(cls, value: str) -> str:
        try:
            return pycountry.countries.lookup(value).alpha_3
        except LookupError:
            raise ValueError(f"Invalid country code provided: {value}")

    @model_validator(mode="after")
    def validate_configuration(self):
        """
        Validate that the configuration is valid based on dataset availability constraints.

        Specific rules:
        - Post-2020 data is only available at 1km resolution with UN adjustment
        - School age population data is only available for 2020 at 1km resolution
        """
        if self.year > self.SCHOOL_AGE_YEAR:
            if self.resolution != "LOW":
                raise ValueError(
                    f"Data for year {self.year} is only available at LOW (1km) resolution"
                )

            if not self.un_adjusted:
                raise ValueError(
                    f"Data for year {self.year} is only available with UN adjustment"
                )

        if self.school_age:
            if self.resolution != "LOW":
                raise ValueError(
                    f"School age data is only available at LOW (1km) resolution"
                )

            if self.year != self.SCHOOL_AGE_YEAR:
                self.year = self.SCHOOL_AGE_YEAR
                raise ValueError(f"School age data is only available for 2020")

        return self

    @property
    def dataset_url(self) -> str:
        """Get the URL for the configured dataset. The URL is computed on first access and then cached for subsequent calls."""
        if not hasattr(self, "_dataset_url"):
            self._dataset_url = self._compute_dataset_url()
        return self._dataset_url

    @property
    def dataset_path(self) -> Path:
        """Construct and return the path for the configured dataset."""
        url_parts = self.dataset_url.split("/")
        file_path = (
            "/".join(
                [url_parts[4], url_parts[5], url_parts[7], self.country, url_parts[-1]]
            )
            if self.school_age
            else "/".join([url_parts[4], url_parts[6], self.country, url_parts[-1]])
        )
        return self.base_path / file_path

    def _load_datasets_metadata(self) -> pd.DataFrame:
        """Load and return the WorldPop datasets metadata, using cache if available."""
        if WorldPopConfig._metadata_cache is not None:
            return WorldPopConfig._metadata_cache

        try:
            WorldPopConfig._metadata_cache = pd.read_csv(
                str(self.WORLDPOP_DB_BASE_URL) + self.DATASETS_METADATA_PATH
            )
            return WorldPopConfig._metadata_cache
        except (URLError, pd.errors.EmptyDataError) as e:
            raise RuntimeError(f"Failed to load WorldPop datasets metadata: {e}")

    def _compute_dataset_url(self) -> str:
        """Construct and return the URL for the configured dataset."""
        # handle post-2020 datasets
        if self.year > self.SCHOOL_AGE_YEAR:
            return (
                str(self.WORLDPOP_DB_BASE_URL)
                + self.PPP_2021_2022_PATH
                + f"{'' if self.constrained else 'un'}constrained/{self.year}/{self.country}/{self.country.lower()}_ppp_{self.year}_1km_UNadj{'_constrained' if self.constrained else ''}.tif"
            )

        # handle school-age population datasets
        if self.school_age:
            return (
                str(self.WORLDPOP_DB_BASE_URL)
                + self.SCHOOL_AGE_POPULATION_PATH
                + f"{self.country}/{self.country}_SAP_1km_2020/{self.country}_{self.gender}_{self.school_age}_2020_1km.tif"
            )

        # handle standard population datasets
        wp_metadata = self._load_datasets_metadata()

        try:
            dataset_url = (
                str(self.WORLDPOP_DB_BASE_URL)
                + wp_metadata[
                    (wp_metadata.ISO3 == self.country)
                    & (
                        wp_metadata.Covariate
                        == "ppp_"
                        + str(self.year)
                        + ("_UNadj" if self.un_adjusted else "")
                    )
                ].PathToRaster.values[0]
            )
        except IndexError:
            raise ValueError(
                f"No dataset found for country={self.country}, year={self.year}, un_adjusted={self.un_adjusted}"
            )

        # handle resolution conversion if needed
        if self.resolution == "HIGH":
            return dataset_url

        url_parts = dataset_url.split("/")
        url_parts[5] = (
            url_parts[5] + "_1km" + ("_UNadj" if self.un_adjusted else "")
        )  # get 1km folder with UNadj specification
        url_parts[8] = url_parts[8].replace(
            str(self.year), str(self.year) + "_1km_Aggregated"
        )  # get filename with 1km res
        dataset_url = "/".join(url_parts)

        return dataset_url

    def __repr__(self) -> str:

        parts = [
            f"WorldpopConfig(",
            f"  country='{self.country}'",
            f"  year={self.year}",
            f"  resolution={self.resolution}",
            f"  un_adjusted={self.un_adjusted}",
            f"  constrained={self.constrained}",
        ]

        if self.school_age:
            parts.append(f"  school_age='{self.school_age}'")
            parts.append(f"  gender='{self.gender}'")

        parts.append(")")

        return "\n".join(parts)
dataset_path: Path property

Construct and return the path for the configured dataset.

dataset_url: str property

Get the URL for the configured dataset. The URL is computed on first access and then cached for subsequent calls.

validate_configuration()

Validate that the configuration is valid based on dataset availability constraints.

Specific rules:
- Post-2020 data is only available at 1km resolution with UN adjustment
- School age population data is only available for 2020 at 1km resolution

Source code in gigaspatial/handlers/worldpop.py
@model_validator(mode="after")
def validate_configuration(self):
    """
    Validate that the configuration is valid based on dataset availability constraints.

    Specific rules:
    - Post-2020 data is only available at 1km resolution with UN adjustment
    - School age population data is only available for 2020 at 1km resolution
    """
    if self.year > self.SCHOOL_AGE_YEAR:
        if self.resolution != "LOW":
            raise ValueError(
                f"Data for year {self.year} is only available at LOW (1km) resolution"
            )

        if not self.un_adjusted:
            raise ValueError(
                f"Data for year {self.year} is only available with UN adjustment"
            )

    if self.school_age:
        if self.resolution != "LOW":
            raise ValueError(
                f"School age data is only available at LOW (1km) resolution"
            )

        if self.year != self.SCHOOL_AGE_YEAR:
            self.year = self.SCHOOL_AGE_YEAR
            raise ValueError(f"School age data is only available for 2020")

    return self

WorldPopDownloader

A class to handle downloads of WorldPop datasets.

Source code in gigaspatial/handlers/worldpop.py
class WorldPopDownloader:
    """A class to handle downloads of WorldPop datasets."""

    def __init__(
        self,
        config: Union[WorldPopConfig, dict[str, Union[str, int]]],
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize the downloader.

        Args:
            config: Configuration for the WorldPop dataset, either as a WorldPopConfig object or a dictionary of parameters
            data_store: Optional data storage interface. If not provided, uses LocalDataStore.
            logger: Optional custom logger. If not provided, uses default logger.
        """
        self.logger = logger or global_config.get_logger(__name__)
        self.data_store = data_store or LocalDataStore()
        self.config = (
            config if isinstance(config, WorldPopConfig) else WorldPopConfig(**config)
        )

    @classmethod
    def from_country_year(cls, country: str, year: int, **kwargs):
        """
        Create a downloader instance from country and year.

        Args:
            country: Country code or name
            year: Year of the dataset
            **kwargs: Additional parameters for WorldPopConfig or the downloader
        """
        return cls({"country": country, "year": year}, **kwargs)

    def download_dataset(self) -> str:
        """
        Download the configured dataset to the provided output path.
        """

        try:
            response = requests.get(self.config.dataset_url, stream=True)
            response.raise_for_status()

            output_path = str(self.config.dataset_path)

            total_size = int(response.headers.get("content-length", 0))

            with self.data_store.open(output_path, "wb") as file:
                with tqdm(
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    desc=f"Downloading {os.path.basename(output_path)}",
                ) as pbar:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            file.write(chunk)
                            pbar.update(len(chunk))

            self.logger.debug(f"Successfully downloaded dataset: {self.config}")

            return output_path

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to download dataset {self.config}: {str(e)}")
            return None
        except Exception as e:
            self.logger.error(f"Unexpected error downloading dataset: {str(e)}")
            return None
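
A download sketch, assuming network access to data.worldpop.org. The import path follows the source reference above, and the country/year are illustrative:

from gigaspatial.handlers.worldpop import WorldPopDownloader  # path inferred from the source reference above

# Equivalent to WorldPopDownloader({"country": "KEN", "year": 2020})
downloader = WorldPopDownloader.from_country_year("KEN", 2020)

path = downloader.download_dataset()  # returns the written path, or None if the download failed
print(path)
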
__init__(config, data_store=None, logger=None)

Initialize the downloader.

Parameters:

config : Union[WorldPopConfig, dict[str, Union[str, int]]] (required)
    Configuration for the WorldPop dataset, either as a WorldPopConfig object or a dictionary of parameters.
data_store : Optional[DataStore] (default: None)
    Optional data storage interface. If not provided, uses LocalDataStore.
logger : Optional[Logger] (default: None)
    Optional custom logger. If not provided, uses default logger.
Source code in gigaspatial/handlers/worldpop.py
def __init__(
    self,
    config: Union[WorldPopConfig, dict[str, Union[str, int]]],
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """
    Initialize the downloader.

    Args:
        config: Configuration for the WorldPop dataset, either as a WorldPopConfig object or a dictionary of parameters
        data_store: Optional data storage interface. If not provided, uses LocalDataStore.
        logger: Optional custom logger. If not provided, uses default logger.
    """
    self.logger = logger or global_config.get_logger(__name__)
    self.data_store = data_store or LocalDataStore()
    self.config = (
        config if isinstance(config, WorldPopConfig) else WorldPopConfig(**config)
    )
download_dataset()

Download the configured dataset to the provided output path.

Source code in gigaspatial/handlers/worldpop.py
def download_dataset(self) -> str:
    """
    Download the configured dataset to the provided output path.
    """

    try:
        response = requests.get(self.config.dataset_url, stream=True)
        response.raise_for_status()

        output_path = str(self.config.dataset_path)

        total_size = int(response.headers.get("content-length", 0))

        with self.data_store.open(output_path, "wb") as file:
            with tqdm(
                total=total_size,
                unit="B",
                unit_scale=True,
                desc=f"Downloading {os.path.basename(output_path)}",
            ) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        pbar.update(len(chunk))

        self.logger.debug(f"Successfully downloaded dataset: {self.config}")

        return output_path

    except requests.exceptions.RequestException as e:
        self.logger.error(f"Failed to download dataset {self.config}: {str(e)}")
        return None
    except Exception as e:
        self.logger.error(f"Unexpected error downloading dataset: {str(e)}")
        return None
from_country_year(country, year, **kwargs) classmethod

Create a downloader instance from country and year.

Parameters:

country : str (required)
    Country code or name.
year : int (required)
    Year of the dataset.
**kwargs (default: {})
    Additional parameters for WorldPopConfig or the downloader.
Source code in gigaspatial/handlers/worldpop.py
@classmethod
def from_country_year(cls, country: str, year: int, **kwargs):
    """
    Create a downloader instance from country and year.

    Args:
        country: Country code or name
        year: Year of the dataset
        **kwargs: Additional parameters for WorldPopConfig or the downloader
    """
    return cls({"country": country, "year": year}, **kwargs)

read_dataset(data_store, path, compression=None, **kwargs)

Read data from various file formats stored in both local and cloud-based storage.

Parameters:

data_store : DataStore Instance of DataStore for accessing data storage. path : str, Path Path to the file in data storage. **kwargs : dict Additional arguments passed to the specific reader function.

Returns:

pandas.DataFrame or geopandas.GeoDataFrame The data read from the file.

Raises:

FileNotFoundError If the file doesn't exist in blob storage. ValueError If the file type is unsupported or if there's an error reading the file.

Source code in gigaspatial/core/io/readers.py
def read_dataset(data_store: DataStore, path: str, compression: str = None, **kwargs):
    """
    Read data from various file formats stored in both local and cloud-based storage.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str, Path
        Path to the file in data storage.
    **kwargs : dict
        Additional arguments passed to the specific reader function.

    Returns:
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        The data read from the file.

    Raises:
    ------
    FileNotFoundError
        If the file doesn't exist in blob storage.
    ValueError
        If the file type is unsupported or if there's an error reading the file.
    """

    # Define supported file formats and their readers
    BINARY_FORMATS = {
        ".shp",
        ".zip",
        ".parquet",
        ".gpkg",
        ".xlsx",
        ".xls",
        ".kmz",
        ".gz",
    }

    PANDAS_READERS = {
        ".csv": pd.read_csv,
        ".xlsx": lambda f, **kw: pd.read_excel(f, engine="openpyxl", **kw),
        ".xls": lambda f, **kw: pd.read_excel(f, engine="xlrd", **kw),
        ".json": pd.read_json,
        # ".gz": lambda f, **kw: pd.read_csv(f, compression="gzip", **kw),
    }

    GEO_READERS = {
        ".shp": gpd.read_file,
        ".zip": gpd.read_file,
        ".geojson": gpd.read_file,
        ".gpkg": gpd.read_file,
        ".parquet": gpd.read_parquet,
        ".kmz": read_kmz,
    }

    COMPRESSION_FORMATS = {
        ".gz": "gzip",
        ".bz2": "bz2",
        ".zip": "zip",
        ".xz": "xz",
    }

    try:
        # Check if file exists
        if not data_store.file_exists(path):
            raise FileNotFoundError(f"File '{path}' not found in blob storage")

        path_obj = Path(path)
        suffixes = path_obj.suffixes
        file_extension = suffixes[-1].lower() if suffixes else ""

        if compression is None and file_extension in COMPRESSION_FORMATS:
            compression_format = COMPRESSION_FORMATS[file_extension]

            # if file has multiple extensions (e.g., .csv.gz), get the inner format
            if len(suffixes) > 1:
                inner_extension = suffixes[-2].lower()

                if inner_extension == ".tar":
                    raise ValueError(
                        "Tar archives (.tar.gz) are not directly supported"
                    )

                if inner_extension in PANDAS_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            return PANDAS_READERS[inner_extension](
                                f, compression=compression_format, **kwargs
                            )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed file: {str(e)}")
                elif inner_extension in GEO_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            if compression_format == "gzip":
                                import gzip

                                decompressed_data = gzip.decompress(f.read())
                                import io

                                return GEO_READERS[inner_extension](
                                    io.BytesIO(decompressed_data), **kwargs
                                )
                            else:
                                raise ValueError(
                                    f"Compression format {compression_format} not supported for geo data"
                                )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed geo file: {str(e)}")
            else:
                # if just .gz without clear inner type, assume csv
                try:
                    with data_store.open(path, "rb") as f:
                        return pd.read_csv(f, compression=compression_format, **kwargs)
                except Exception as e:
                    raise ValueError(
                        f"Error reading compressed file as CSV: {str(e)}. "
                        f"If not a CSV, specify the format in the filename (e.g., .json.gz)"
                    )

        # Special handling for compressed files
        if file_extension == ".zip":
            # For zip files, we need to use binary mode
            with data_store.open(path, "rb") as f:
                return gpd.read_file(f)

        # Determine if we need binary mode based on file type
        mode = "rb" if file_extension in BINARY_FORMATS else "r"

        # Try reading with appropriate reader
        if file_extension in PANDAS_READERS:
            try:
                with data_store.open(path, mode) as f:
                    return PANDAS_READERS[file_extension](f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error reading file with pandas: {str(e)}")

        if file_extension in GEO_READERS:
            try:
                with data_store.open(path, "rb") as f:
                    return GEO_READERS[file_extension](f, **kwargs)
            except Exception as e:
                # For parquet files, try pandas reader if geopandas fails
                if file_extension == ".parquet":
                    try:
                        with data_store.open(path, "rb") as f:
                            return pd.read_parquet(f, **kwargs)
                    except Exception as e2:
                        raise ValueError(
                            f"Failed to read parquet with both geopandas ({str(e)}) "
                            f"and pandas ({str(e2)})"
                        )
                raise ValueError(f"Error reading file with geopandas: {str(e)}")

        # If we get here, the file type is unsupported
        supported_formats = sorted(set(PANDAS_READERS.keys()) | set(GEO_READERS.keys()))
        supported_compressions = sorted(COMPRESSION_FORMATS.keys())
        raise ValueError(
            f"Unsupported file type: {file_extension}\n"
            f"Supported formats: {', '.join(supported_formats)}"
            f"Supported compressions: {', '.join(supported_compressions)}"
        )

    except Exception as e:
        if isinstance(e, (FileNotFoundError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error reading dataset: {str(e)}")

read_datasets(data_store, paths, **kwargs)

Read multiple datasets from data storage at once.

Parameters:

data_store : DataStore
    Instance of DataStore for accessing data storage.
paths : list of str
    Paths to files in data storage.
**kwargs : dict
    Additional arguments passed to read_dataset.

Returns:

dict
    Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.

Source code in gigaspatial/core/io/readers.py
def read_datasets(data_store: DataStore, paths, **kwargs):
    """
    Read multiple datasets from data storage at once.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    paths : list of str
        Paths to files in data storage.
    **kwargs : dict
        Additional arguments passed to read_dataset.

    Returns:
    -------
    dict
        Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.
    """
    results = {}
    errors = {}

    for path in paths:
        try:
            results[path] = read_dataset(data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors reading datasets:\n{error_msg}")

    return results
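
Example: a short sketch of batch reading. Failures are collected per path and surfaced as a single ValueError; data_store and the paths below are placeholders.

from gigaspatial.core.io.readers import read_datasets

paths = ["tables/schools.csv", "boundaries/regions.geojson"]  # hypothetical paths
try:
    datasets = read_datasets(data_store, paths)  # data_store: any configured DataStore
    schools = datasets["tables/schools.csv"]
except ValueError as e:
    print(e)  # one aggregated message listing every path that failed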

read_gzipped_json_or_csv(file_path, data_store)

Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV.

Source code in gigaspatial/core/io/readers.py
def read_gzipped_json_or_csv(file_path, data_store):
    """Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV."""

    with data_store.open(file_path, "rb") as f:
        g = gzip.GzipFile(fileobj=f)
        text = g.read().decode("utf-8")
        try:
            df = pd.read_json(io.StringIO(text), lines=True)
            return df
        except json.JSONDecodeError:
            try:
                df = pd.read_csv(io.StringIO(text))
                return df
            except pd.errors.ParserError:
                print(f"Error: Could not parse {file_path} as JSON or CSV.")
                return None
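
Example: a usage sketch with a hypothetical path and a placeholder data_store. The function returns None when the payload parses as neither JSON lines nor CSV.

from gigaspatial.core.io.readers import read_gzipped_json_or_csv

df = read_gzipped_json_or_csv("exports/records.json.gz", data_store)
if df is None:
    # neither JSON-lines nor CSV parsing succeeded
    ...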

read_kmz(file_obj, **kwargs)

Helper function to read KMZ files and return a GeoDataFrame.

Source code in gigaspatial/core/io/readers.py
def read_kmz(file_obj, **kwargs):
    """Helper function to read KMZ files and return a GeoDataFrame."""
    try:
        with zipfile.ZipFile(file_obj) as kmz:
            # Find the KML file in the archive (usually doc.kml)
            kml_filename = next(
                name for name in kmz.namelist() if name.endswith(".kml")
            )

            # Read the KML content
            kml_content = io.BytesIO(kmz.read(kml_filename))

            gdf = gpd.read_file(kml_content)

            # Validate the GeoDataFrame
            if gdf.empty:
                raise ValueError(
                    "The KML file is empty or does not contain valid geospatial data."
                )

        return gdf

    except zipfile.BadZipFile:
        raise ValueError("The provided file is not a valid KMZ file.")
    except StopIteration:
        raise ValueError("No KML file found in the KMZ archive.")
    except Exception as e:
        raise RuntimeError(f"An error occurred: {e}")

write_dataset(data, data_store, path, **kwargs)

Write DataFrame or GeoDataFrame to various file formats in Azure Blob Storage.

Parameters:

data : pandas.DataFrame or geopandas.GeoDataFrame
    The data to write to blob storage.
data_store : DataStore
    Instance of DataStore for accessing data storage.
path : str
    Path where the file will be written in data storage.
**kwargs : dict
    Additional arguments passed to the specific writer function.

Raises:

ValueError
    If the file type is unsupported or if there's an error writing the file.
TypeError
    If input data is not a DataFrame or GeoDataFrame.

Source code in gigaspatial/core/io/writers.py
def write_dataset(data, data_store: DataStore, path, **kwargs):
    """
    Write DataFrame or GeoDataFrame to various file formats in Azure Blob Storage.

    Parameters:
    ----------
    data : pandas.DataFrame or geopandas.GeoDataFrame
        The data to write to blob storage.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str
        Path where the file will be written in data storage.
    **kwargs : dict
        Additional arguments passed to the specific writer function.

    Raises:
    ------
    ValueError
        If the file type is unsupported or if there's an error writing the file.
    TypeError
        If input data is not a DataFrame or GeoDataFrame.
    """

    # Define supported file formats and their writers
    BINARY_FORMATS = {".shp", ".zip", ".parquet", ".gpkg", ".xlsx", ".xls"}

    PANDAS_WRITERS = {
        ".csv": lambda df, buf, **kw: df.to_csv(buf, **kw),
        ".xlsx": lambda df, buf, **kw: df.to_excel(buf, engine="openpyxl", **kw),
        ".json": lambda df, buf, **kw: df.to_json(buf, **kw),
        ".parquet": lambda df, buf, **kw: df.to_parquet(buf, **kw),
    }

    GEO_WRITERS = {
        ".geojson": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GeoJSON", **kw),
        ".gpkg": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GPKG", **kw),
        ".parquet": lambda gdf, buf, **kw: gdf.to_parquet(buf, **kw),
    }

    try:
        # Input validation
        if not isinstance(data, (pd.DataFrame, gpd.GeoDataFrame)):
            raise TypeError("Input data must be a pandas DataFrame or GeoDataFrame")

        # Get file suffix and ensure it's lowercase
        suffix = Path(path).suffix.lower()

        # Determine if we need binary mode based on file type
        mode = "wb" if suffix in BINARY_FORMATS else "w"

        # Handle different data types and formats
        if isinstance(data, gpd.GeoDataFrame):
            if suffix not in GEO_WRITERS:
                supported_formats = sorted(GEO_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for GeoDataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, "wb") as f:
                    GEO_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing GeoDataFrame: {str(e)}")

        else:  # pandas DataFrame
            if suffix not in PANDAS_WRITERS:
                supported_formats = sorted(PANDAS_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for DataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, mode) as f:
                    PANDAS_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing DataFrame: {str(e)}")

    except Exception as e:
        if isinstance(e, (TypeError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error writing dataset: {str(e)}")

write_datasets(data_dict, data_store, **kwargs)

Write multiple datasets to data storage at once.

Parameters:

data_dict : dict
    Dictionary mapping paths to DataFrames/GeoDataFrames.
data_store : DataStore
    Instance of DataStore for accessing data storage.
**kwargs : dict
    Additional arguments passed to write_dataset.

Raises:

ValueError
    If there are any errors writing the datasets.

Source code in gigaspatial/core/io/writers.py
def write_datasets(data_dict, data_store: DataStore, **kwargs):
    """
    Write multiple datasets to data storage at once.

    Parameters:
    ----------
    data_dict : dict
        Dictionary mapping paths to DataFrames/GeoDataFrames.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    **kwargs : dict
        Additional arguments passed to write_dataset.

    Raises:
    ------
    ValueError
        If there are any errors writing the datasets.
    """
    errors = {}

    for path, data in data_dict.items():
        try:
            write_dataset(data, data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors writing datasets:\n{error_msg}")