Skip to content

Grid Module

gigaspatial.grid

h3

CountryH3Hexagons

Bases: H3Hexagons

H3Hexagons specialized for country-level operations.

This class extends H3Hexagons to work specifically with country boundaries. It can only be instantiated through the create() classmethod.

Source code in gigaspatial/grid/h3.py
class CountryH3Hexagons(H3Hexagons):
    """H3Hexagons specialized for country-level operations.

    This class extends H3Hexagons to work specifically with country boundaries.
    It can only be instantiated through the create() classmethod.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryH3Hexagons cannot be instantiated directly. "
            "Use CountryH3Hexagons.create() instead."
        )

    @classmethod
    def create(
        cls,
        country: str,
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ):
        """Create CountryH3Hexagons for a specific country."""
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryH3Hexagons, instance).__init__(
            resolution=resolution,
            hexagons=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing H3 hexagons for country: {country} at resolution {resolution}"
        )

        country_geom = (
            AdminBoundaries.create(
                country_code=country,
                data_store=data_store,
                path=country_geom_path,
            )
            .boundaries[0]
            .geometry
        )

        hexagons = H3Hexagons.from_geometry(country_geom, resolution, contain=contain)

        instance.hexagons = hexagons.hexagons
        return instance
create(country, resolution, contain='overlap', data_store=None, country_geom_path=None) classmethod

Create CountryH3Hexagons for a specific country.

Source code in gigaspatial/grid/h3.py
@classmethod
def create(
    cls,
    country: str,
    resolution: int,
    contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
):
    """Create CountryH3Hexagons for a specific country."""
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryH3Hexagons, instance).__init__(
        resolution=resolution,
        hexagons=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing H3 hexagons for country: {country} at resolution {resolution}"
    )

    country_geom = (
        AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        )
        .boundaries[0]
        .geometry
    )

    hexagons = H3Hexagons.from_geometry(country_geom, resolution, contain=contain)

    instance.hexagons = hexagons.hexagons
    return instance

H3Hexagons

Bases: BaseModel

Source code in gigaspatial/grid/h3.py
class H3Hexagons(BaseModel):
    resolution: int = Field(..., ge=0, le=15)
    hexagons: List[str] = Field(default_factory=list)
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("H3Hexagons")

    class Config:
        arbitrary_types_allowed = True

    @classmethod
    def from_hexagons(cls, hexagons: List[str]):
        """Create H3Hexagons from list of H3 cell IDs."""
        if not hexagons:
            cls.logger.warning("No hexagons provided to from_hexagons.")
            return cls(resolution=0, hexagons=[])

        cls.logger.info(
            f"Initializing H3Hexagons from {len(hexagons)} provided hexagons."
        )
        # Get resolution from first hexagon
        resolution = h3.get_resolution(hexagons[0])
        return cls(resolution=resolution, hexagons=list(set(hexagons)))

    @classmethod
    def from_bounds(
        cls, xmin: float, ymin: float, xmax: float, ymax: float, resolution: int
    ):
        """Create H3Hexagons from boundary coordinates."""
        cls.logger.info(
            f"Creating H3Hexagons from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at resolution: {resolution}"
        )

        # Create a LatLong bounding box polygon
        latlong_bbox_coords = [
            [ymin, xmin],
            [ymax, xmin],
            [ymax, xmax],
            [ymin, xmax],
            [ymin, xmin],
        ]

        # Get H3 cells that intersect with the bounding box
        poly = h3.LatLngPoly(latlong_bbox_coords)
        hexagons = h3.h3shape_to_cells(poly, res=resolution)

        return cls(resolution=resolution, hexagons=list(hexagons))

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
        ],
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        **kwargs,
    ):
        cls.logger.info(
            f"Creating H3Hexagons from spatial source (type: {type(source)}) at resolution: {resolution} with predicate: {contain}"
        )
        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")

            is_point_series = source.geometry.geom_type == "Point"
            all_are_points = is_point_series.all()

            if all_are_points:
                source = source.geometry.to_list()
            else:
                source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, resolution=resolution, contain=contain, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(points=source, resolution=resolution, **kwargs)
        else:
            raise ValueError("Unsupported source type for H3Hexagons.from_spatial")

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        **kwargs,
    ):
        """Create H3Hexagons from a geometry."""
        cls.logger.info(
            f"Creating H3Hexagons from geometry (bounds: {geometry.bounds}) at resolution: {resolution} with predicate: {contain}"
        )

        if isinstance(geometry, Point):
            return cls.from_points([geometry])

        # Convert shapely geometry to GeoJSON-like format
        if hasattr(geometry, "__geo_interface__"):
            geojson_geom = geometry.__geo_interface__
        else:
            # Fallback for complex geometries
            import json
            from shapely.geometry import mapping

            geojson_geom = mapping(geometry)

        h3_geom = h3.geo_to_h3shape(geojson_geom)

        hexagons = h3.h3shape_to_cells_experimental(
            h3_geom, resolution, contain=contain
        )

        cls.logger.info(
            f"Generated {len(hexagons)} hexagons using `{contain}` spatial predicate."
        )
        return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], resolution: int, **kwargs
    ) -> "H3Hexagons":
        """Create H3Hexagons from a list of points or lat-lon pairs."""
        cls.logger.info(
            f"Creating H3Hexagons from {len(points)} points at resolution: {resolution}"
        )
        hexagons = set(cls.get_hexagons_from_points(points, resolution))
        cls.logger.info(f"Generated {len(hexagons)} unique hexagons from points.")
        return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "H3Hexagons":
        """Load H3Hexagons from a JSON file."""
        cls.logger.info(
            f"Loading H3Hexagons from JSON file: {file} using data store: {type(data_store).__name__}"
        )
        with data_store.open(str(file), "r") as f:
            data = json.load(f)
            if isinstance(data, list):  # If file contains only hexagon IDs
                # Get resolution from first hexagon if available
                resolution = h3.get_resolution(data[0]) if data else 0
                data = {
                    "resolution": resolution,
                    "hexagons": data,
                    **kwargs,
                }
            else:
                data.update(kwargs)
            instance = cls(**data)
            instance.data_store = data_store
            cls.logger.info(
                f"Successfully loaded {len(instance.hexagons)} hexagons from JSON file."
            )
            return instance

    @property
    def average_hexagon_area(self):
        return h3.average_hexagon_area(self.resolution)

    @property
    def average_hexagon_edge_length(self):
        return h3.average_hexagon_edge_length(self.resolution)

    def filter_hexagons(self, hexagons: Iterable[str]) -> "H3Hexagons":
        """Filter hexagons by a given set of hexagon IDs."""
        original_count = len(self.hexagons)
        incoming_count = len(
            list(hexagons)
        )  # Convert to list to get length if it's an iterator

        self.logger.info(
            f"Filtering {original_count} hexagons with an incoming set of {incoming_count} hexagons."
        )
        filtered_hexagons = list(set(self.hexagons) & set(hexagons))
        self.logger.info(f"Resulting in {len(filtered_hexagons)} filtered hexagons.")
        return H3Hexagons(
            resolution=self.resolution,
            hexagons=filtered_hexagons,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with hexagon ID and centroid coordinates."""
        self.logger.info(
            f"Converting {len(self.hexagons)} hexagons to pandas DataFrame."
        )
        if not self.hexagons:
            self.logger.warning(
                "No hexagons to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(columns=["hexagon", "latitude", "longitude"])

        centroids = [h3.cell_to_latlng(hex_id) for hex_id in self.hexagons]

        self.logger.info(f"Successfully converted to DataFrame.")

        return pd.DataFrame(
            {
                "hexagon": self.hexagons,
                "latitude": [c[0] for c in centroids],
                "longitude": [c[1] for c in centroids],
            }
        )

    def to_geoms(self) -> List[Polygon]:
        """Convert hexagons to shapely Polygon geometries."""
        self.logger.info(
            f"Converting {len(self.hexagons)} hexagons to shapely Polygon geometries."
        )
        return [shape(h3.cells_to_geo([hex_id])) for hex_id in self.hexagons]

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert to GeoPandas GeoDataFrame."""
        return gpd.GeoDataFrame(
            {"h3": self.hexagons, "geometry": self.to_geoms()}, crs="EPSG:4326"
        )

    @staticmethod
    def get_hexagons_from_points(
        points: List[Union[Point, Tuple[float, float]]], resolution: int
    ) -> List[str]:
        """Get list of H3 hexagon IDs for the provided points at specified resolution.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            resolution: H3 resolution level

        Returns:
            List of H3 hexagon ID strings
        """
        hexagons = []
        for p in points:
            if isinstance(p, Point):
                # Shapely Point has x=lon, y=lat
                hex_id = h3.latlng_to_cell(p.y, p.x, resolution)
            else:
                # Assume tuple is (lon, lat) - convert to (lat, lon) for h3
                hex_id = h3.latlng_to_cell(p[1], p[0], resolution)
            hexagons.append(hex_id)
        return hexagons

    def get_neighbors(self, k: int = 1) -> "H3Hexagons":
        """Get k-ring neighbors of all hexagons.

        Args:
            k: Distance of neighbors (1 for immediate neighbors, 2 for neighbors of neighbors, etc.)

        Returns:
            New H3Hexagons instance with neighbors included
        """
        self.logger.info(
            f"Getting k-ring neighbors (k={k}) for {len(self.hexagons)} hexagons."
        )

        all_neighbors = set()
        for hex_id in self.hexagons:
            neighbors = h3.grid_ring(hex_id, k)
            all_neighbors.update(neighbors)

        self.logger.info(
            f"Found {len(all_neighbors)} total hexagons including neighbors."
        )
        return H3Hexagons(resolution=self.resolution, hexagons=list(all_neighbors))

    def get_compact_representation(self) -> "H3Hexagons":
        """Get compact representation by merging adjacent hexagons into parent cells where possible."""
        self.logger.info(f"Compacting {len(self.hexagons)} hexagons.")

        # Convert to set for h3.compact
        hex_set = set(self.hexagons)
        compacted = h3.compact_cells(hex_set)

        self.logger.info(f"Compacted to {len(compacted)} hexagons.")

        # Note: compacted representation may have mixed resolutions
        # We'll keep the original resolution as the "target" resolution
        return H3Hexagons(resolution=self.resolution, hexagons=list(compacted))

    def get_children(self, target_resolution: int) -> "H3Hexagons":
        """Get children hexagons at higher resolution.

        Args:
            target_resolution: Target resolution (must be higher than current)

        Returns:
            New H3Hexagons instance with children at target resolution
        """
        if target_resolution <= self.resolution:
            raise ValueError("Target resolution must be higher than current resolution")

        self.logger.info(
            f"Getting children at resolution {target_resolution} for {len(self.hexagons)} hexagons."
        )

        all_children = []
        for hex_id in self.hexagons:
            children = h3.cell_to_children(hex_id, target_resolution)
            all_children.extend(children)

        self.logger.info(f"Generated {len(all_children)} children hexagons.")
        return H3Hexagons(resolution=target_resolution, hexagons=all_children)

    def get_parents(self, target_resolution: int) -> "H3Hexagons":
        """Get parent hexagons at lower resolution.

        Args:
            target_resolution: Target resolution (must be lower than current)

        Returns:
            New H3Hexagons instance with parents at target resolution
        """
        if target_resolution >= self.resolution:
            raise ValueError("Target resolution must be lower than current resolution")

        self.logger.info(
            f"Getting parents at resolution {target_resolution} for {len(self.hexagons)} hexagons."
        )

        parents = set()
        for hex_id in self.hexagons:
            parent = h3.cell_to_parent(hex_id, target_resolution)
            parents.add(parent)

        self.logger.info(f"Generated {len(parents)} parent hexagons.")
        return H3Hexagons(resolution=target_resolution, hexagons=list(parents))

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Save H3Hexagons to file in specified format."""
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.hexagons, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.hexagons)
filter_hexagons(hexagons)

Filter hexagons by a given set of hexagon IDs.

Source code in gigaspatial/grid/h3.py
def filter_hexagons(self, hexagons: Iterable[str]) -> "H3Hexagons":
    """Filter hexagons by a given set of hexagon IDs."""
    original_count = len(self.hexagons)
    incoming_count = len(
        list(hexagons)
    )  # Convert to list to get length if it's an iterator

    self.logger.info(
        f"Filtering {original_count} hexagons with an incoming set of {incoming_count} hexagons."
    )
    filtered_hexagons = list(set(self.hexagons) & set(hexagons))
    self.logger.info(f"Resulting in {len(filtered_hexagons)} filtered hexagons.")
    return H3Hexagons(
        resolution=self.resolution,
        hexagons=filtered_hexagons,
    )
from_bounds(xmin, ymin, xmax, ymax, resolution) classmethod

Create H3Hexagons from boundary coordinates.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_bounds(
    cls, xmin: float, ymin: float, xmax: float, ymax: float, resolution: int
):
    """Create H3Hexagons from boundary coordinates."""
    cls.logger.info(
        f"Creating H3Hexagons from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at resolution: {resolution}"
    )

    # Create a LatLong bounding box polygon
    latlong_bbox_coords = [
        [ymin, xmin],
        [ymax, xmin],
        [ymax, xmax],
        [ymin, xmax],
        [ymin, xmin],
    ]

    # Get H3 cells that intersect with the bounding box
    poly = h3.LatLngPoly(latlong_bbox_coords)
    hexagons = h3.h3shape_to_cells(poly, res=resolution)

    return cls(resolution=resolution, hexagons=list(hexagons))
from_geometry(geometry, resolution, contain='overlap', **kwargs) classmethod

Create H3Hexagons from a geometry.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    resolution: int,
    contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
    **kwargs,
):
    """Create H3Hexagons from a geometry."""
    cls.logger.info(
        f"Creating H3Hexagons from geometry (bounds: {geometry.bounds}) at resolution: {resolution} with predicate: {contain}"
    )

    if isinstance(geometry, Point):
        return cls.from_points([geometry])

    # Convert shapely geometry to GeoJSON-like format
    if hasattr(geometry, "__geo_interface__"):
        geojson_geom = geometry.__geo_interface__
    else:
        # Fallback for complex geometries
        import json
        from shapely.geometry import mapping

        geojson_geom = mapping(geometry)

    h3_geom = h3.geo_to_h3shape(geojson_geom)

    hexagons = h3.h3shape_to_cells_experimental(
        h3_geom, resolution, contain=contain
    )

    cls.logger.info(
        f"Generated {len(hexagons)} hexagons using `{contain}` spatial predicate."
    )
    return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)
from_hexagons(hexagons) classmethod

Create H3Hexagons from list of H3 cell IDs.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_hexagons(cls, hexagons: List[str]):
    """Create H3Hexagons from list of H3 cell IDs."""
    if not hexagons:
        cls.logger.warning("No hexagons provided to from_hexagons.")
        return cls(resolution=0, hexagons=[])

    cls.logger.info(
        f"Initializing H3Hexagons from {len(hexagons)} provided hexagons."
    )
    # Get resolution from first hexagon
    resolution = h3.get_resolution(hexagons[0])
    return cls(resolution=resolution, hexagons=list(set(hexagons)))
from_json(data_store, file, **kwargs) classmethod

Load H3Hexagons from a JSON file.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "H3Hexagons":
    """Load H3Hexagons from a JSON file."""
    cls.logger.info(
        f"Loading H3Hexagons from JSON file: {file} using data store: {type(data_store).__name__}"
    )
    with data_store.open(str(file), "r") as f:
        data = json.load(f)
        if isinstance(data, list):  # If file contains only hexagon IDs
            # Get resolution from first hexagon if available
            resolution = h3.get_resolution(data[0]) if data else 0
            data = {
                "resolution": resolution,
                "hexagons": data,
                **kwargs,
            }
        else:
            data.update(kwargs)
        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.hexagons)} hexagons from JSON file."
        )
        return instance
from_points(points, resolution, **kwargs) classmethod

Create H3Hexagons from a list of points or lat-lon pairs.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], resolution: int, **kwargs
) -> "H3Hexagons":
    """Create H3Hexagons from a list of points or lat-lon pairs."""
    cls.logger.info(
        f"Creating H3Hexagons from {len(points)} points at resolution: {resolution}"
    )
    hexagons = set(cls.get_hexagons_from_points(points, resolution))
    cls.logger.info(f"Generated {len(hexagons)} unique hexagons from points.")
    return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)
get_children(target_resolution)

Get children hexagons at higher resolution.

Parameters:

Name Type Description Default
target_resolution int

Target resolution (must be higher than current)

required

Returns:

Type Description
H3Hexagons

New H3Hexagons instance with children at target resolution

Source code in gigaspatial/grid/h3.py
def get_children(self, target_resolution: int) -> "H3Hexagons":
    """Get children hexagons at higher resolution.

    Args:
        target_resolution: Target resolution (must be higher than current)

    Returns:
        New H3Hexagons instance with children at target resolution
    """
    if target_resolution <= self.resolution:
        raise ValueError("Target resolution must be higher than current resolution")

    self.logger.info(
        f"Getting children at resolution {target_resolution} for {len(self.hexagons)} hexagons."
    )

    all_children = []
    for hex_id in self.hexagons:
        children = h3.cell_to_children(hex_id, target_resolution)
        all_children.extend(children)

    self.logger.info(f"Generated {len(all_children)} children hexagons.")
    return H3Hexagons(resolution=target_resolution, hexagons=all_children)
get_compact_representation()

Get compact representation by merging adjacent hexagons into parent cells where possible.

Source code in gigaspatial/grid/h3.py
def get_compact_representation(self) -> "H3Hexagons":
    """Get compact representation by merging adjacent hexagons into parent cells where possible."""
    self.logger.info(f"Compacting {len(self.hexagons)} hexagons.")

    # Convert to set for h3.compact
    hex_set = set(self.hexagons)
    compacted = h3.compact_cells(hex_set)

    self.logger.info(f"Compacted to {len(compacted)} hexagons.")

    # Note: compacted representation may have mixed resolutions
    # We'll keep the original resolution as the "target" resolution
    return H3Hexagons(resolution=self.resolution, hexagons=list(compacted))
get_hexagons_from_points(points, resolution) staticmethod

Get list of H3 hexagon IDs for the provided points at specified resolution.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
resolution int

H3 resolution level

required

Returns:

Type Description
List[str]

List of H3 hexagon ID strings

Source code in gigaspatial/grid/h3.py
@staticmethod
def get_hexagons_from_points(
    points: List[Union[Point, Tuple[float, float]]], resolution: int
) -> List[str]:
    """Get list of H3 hexagon IDs for the provided points at specified resolution.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        resolution: H3 resolution level

    Returns:
        List of H3 hexagon ID strings
    """
    hexagons = []
    for p in points:
        if isinstance(p, Point):
            # Shapely Point has x=lon, y=lat
            hex_id = h3.latlng_to_cell(p.y, p.x, resolution)
        else:
            # Assume tuple is (lon, lat) - convert to (lat, lon) for h3
            hex_id = h3.latlng_to_cell(p[1], p[0], resolution)
        hexagons.append(hex_id)
    return hexagons
get_neighbors(k=1)

Get k-ring neighbors of all hexagons.

Parameters:

Name Type Description Default
k int

Distance of neighbors (1 for immediate neighbors, 2 for neighbors of neighbors, etc.)

1

Returns:

Type Description
H3Hexagons

New H3Hexagons instance with neighbors included

Source code in gigaspatial/grid/h3.py
def get_neighbors(self, k: int = 1) -> "H3Hexagons":
    """Get k-ring neighbors of all hexagons.

    Args:
        k: Distance of neighbors (1 for immediate neighbors, 2 for neighbors of neighbors, etc.)

    Returns:
        New H3Hexagons instance with neighbors included
    """
    self.logger.info(
        f"Getting k-ring neighbors (k={k}) for {len(self.hexagons)} hexagons."
    )

    all_neighbors = set()
    for hex_id in self.hexagons:
        neighbors = h3.grid_ring(hex_id, k)
        all_neighbors.update(neighbors)

    self.logger.info(
        f"Found {len(all_neighbors)} total hexagons including neighbors."
    )
    return H3Hexagons(resolution=self.resolution, hexagons=list(all_neighbors))
get_parents(target_resolution)

Get parent hexagons at lower resolution.

Parameters:

Name Type Description Default
target_resolution int

Target resolution (must be lower than current)

required

Returns:

Type Description
H3Hexagons

New H3Hexagons instance with parents at target resolution

Source code in gigaspatial/grid/h3.py
def get_parents(self, target_resolution: int) -> "H3Hexagons":
    """Get parent hexagons at lower resolution.

    Args:
        target_resolution: Target resolution (must be lower than current)

    Returns:
        New H3Hexagons instance with parents at target resolution
    """
    if target_resolution >= self.resolution:
        raise ValueError("Target resolution must be lower than current resolution")

    self.logger.info(
        f"Getting parents at resolution {target_resolution} for {len(self.hexagons)} hexagons."
    )

    parents = set()
    for hex_id in self.hexagons:
        parent = h3.cell_to_parent(hex_id, target_resolution)
        parents.add(parent)

    self.logger.info(f"Generated {len(parents)} parent hexagons.")
    return H3Hexagons(resolution=target_resolution, hexagons=list(parents))
save(file, format='json')

Save H3Hexagons to file in specified format.

Source code in gigaspatial/grid/h3.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Save H3Hexagons to file in specified format."""
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.hexagons, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Convert to pandas DataFrame with hexagon ID and centroid coordinates.

Source code in gigaspatial/grid/h3.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert to pandas DataFrame with hexagon ID and centroid coordinates."""
    self.logger.info(
        f"Converting {len(self.hexagons)} hexagons to pandas DataFrame."
    )
    if not self.hexagons:
        self.logger.warning(
            "No hexagons to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(columns=["hexagon", "latitude", "longitude"])

    centroids = [h3.cell_to_latlng(hex_id) for hex_id in self.hexagons]

    self.logger.info(f"Successfully converted to DataFrame.")

    return pd.DataFrame(
        {
            "hexagon": self.hexagons,
            "latitude": [c[0] for c in centroids],
            "longitude": [c[1] for c in centroids],
        }
    )
to_geodataframe()

Convert to GeoPandas GeoDataFrame.

Source code in gigaspatial/grid/h3.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert to GeoPandas GeoDataFrame."""
    return gpd.GeoDataFrame(
        {"h3": self.hexagons, "geometry": self.to_geoms()}, crs="EPSG:4326"
    )
to_geoms()

Convert hexagons to shapely Polygon geometries.

Source code in gigaspatial/grid/h3.py
def to_geoms(self) -> List[Polygon]:
    """Convert hexagons to shapely Polygon geometries."""
    self.logger.info(
        f"Converting {len(self.hexagons)} hexagons to shapely Polygon geometries."
    )
    return [shape(h3.cells_to_geo([hex_id])) for hex_id in self.hexagons]

mercator_tiles

CountryMercatorTiles

Bases: MercatorTiles

MercatorTiles specialized for country-level operations.

This class extends MercatorTiles to work specifically with country boundaries. It can only be instantiated through the create() classmethod.

Source code in gigaspatial/grid/mercator_tiles.py
class CountryMercatorTiles(MercatorTiles):
    """MercatorTiles specialized for country-level operations.

    This class extends MercatorTiles to work specifically with country boundaries.
    It can only be instantiated through the create() classmethod.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryMercatorTiles cannot be instantiated directly. "
            "Use CountryMercatorTiles.create() instead."
        )

    @classmethod
    def create(
        cls,
        country: str,
        zoom_level: int,
        predicate: str = "intersects",
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ):
        """Create CountryMercatorTiles for a specific country."""
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryMercatorTiles, instance).__init__(
            zoom_level=zoom_level,
            quadkeys=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing Mercator zones for country: {country} at zoom level {zoom_level}"
        )

        country_geom = (
            AdminBoundaries.create(
                country_code=country,
                data_store=data_store,
                path=country_geom_path,
            )
            .boundaries[0]
            .geometry
        )

        tiles = MercatorTiles.from_geometry(country_geom, zoom_level, predicate)

        instance.quadkeys = tiles.quadkeys
        return instance
create(country, zoom_level, predicate='intersects', data_store=None, country_geom_path=None) classmethod

Create CountryMercatorTiles for a specific country.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def create(
    cls,
    country: str,
    zoom_level: int,
    predicate: str = "intersects",
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
):
    """Create CountryMercatorTiles for a specific country."""
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryMercatorTiles, instance).__init__(
        zoom_level=zoom_level,
        quadkeys=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing Mercator zones for country: {country} at zoom level {zoom_level}"
    )

    country_geom = (
        AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        )
        .boundaries[0]
        .geometry
    )

    tiles = MercatorTiles.from_geometry(country_geom, zoom_level, predicate)

    instance.quadkeys = tiles.quadkeys
    return instance

MercatorTiles

Bases: BaseModel

Source code in gigaspatial/grid/mercator_tiles.py
class MercatorTiles(BaseModel):
    zoom_level: int = Field(..., ge=0, le=20)
    quadkeys: List[str] = Field(default_factory=list)
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("MercatorTiles")

    class Config:
        arbitrary_types_allowed = True

    @classmethod
    def from_quadkeys(cls, quadkeys: List[str]):
        """Create MercatorTiles from list of quadkeys."""
        if not quadkeys:
            cls.logger.warning("No quadkeys provided to from_quadkeys.")
            return cls(zoom_level=0, quadkeys=[])
        cls.logger.info(
            f"Initializing MercatorTiles from {len(quadkeys)} provided quadkeys."
        )
        return cls(zoom_level=len(quadkeys[0]), quadkeys=set(quadkeys))

    @classmethod
    def from_bounds(
        cls, xmin: float, ymin: float, xmax: float, ymax: float, zoom_level: int
    ):
        """Create MercatorTiles from boundary coordinates."""
        cls.logger.info(
            f"Creating MercatorTiles from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at zoom level: {zoom_level}"
        )
        return cls(
            zoom_level=zoom_level,
            quadkeys=[
                mercantile.quadkey(tile)
                for tile in mercantile.tiles(xmin, ymin, xmax, ymax, zoom_level)
            ],
        )

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
        ],
        zoom_level: int,
        predicate: str = "intersects",
        **kwargs,
    ):
        cls.logger.info(
            f"Creating MercatorTiles from spatial source (type: {type(source)}) at zoom level: {zoom_level} with predicate: {predicate}"
        )
        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")
            source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, zoom_level=zoom_level, predicate=predicate, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(geometry=source, zoom_level=zoom_level, **kwargs)
        else:
            raise

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        zoom_level: int,
        predicate: str = "intersects",
        **kwargs,
    ):
        """Create MercatorTiles from a polygon."""
        cls.logger.info(
            f"Creating MercatorTiles from geometry (bounds: {geometry.bounds}) at zoom level: {zoom_level} with predicate: {predicate}"
        )
        tiles = list(mercantile.tiles(*geometry.bounds, zoom_level))
        quadkeys_boxes = [
            (mercantile.quadkey(t), box(*mercantile.bounds(t))) for t in tiles
        ]
        quadkeys, boxes = zip(*quadkeys_boxes) if quadkeys_boxes else ([], [])

        if not boxes:
            cls.logger.warning(
                "No boxes generated from geometry bounds. Returning empty MercatorTiles."
            )
            return MercatorTiles(zoom_level=zoom_level, quadkeys=[])

        s = STRtree(boxes)
        result_indices = s.query(geometry, predicate=predicate)
        filtered_quadkeys = [quadkeys[i] for i in result_indices]
        cls.logger.info(
            f"Filtered down to {len(filtered_quadkeys)} quadkeys using spatial predicate."
        )
        return cls(zoom_level=zoom_level, quadkeys=filtered_quadkeys, **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], zoom_level: int, **kwargs
    ) -> "MercatorTiles":
        """Create MercatorTiles from a list of points or lat-lon pairs."""
        cls.logger.info(
            f"Creating MercatorTiles from {len(points)} points at zoom level: {zoom_level}"
        )
        quadkeys = set(cls.get_quadkeys_from_points(points, zoom_level))
        cls.logger.info(f"Generated {len(quadkeys)} unique quadkeys from points.")
        return cls(zoom_level=zoom_level, quadkeys=list(quadkeys), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "MercatorTiles":
        """Load MercatorTiles from a JSON file."""
        cls.logger.info(
            f"Loading MercatorTiles from JSON file: {file} using data store: {type(data_store).__name__}"
        )
        with data_store.open(str(file), "r") as f:
            data = json.load(f)
            if isinstance(data, list):  # If file contains only quadkeys
                data = {
                    "zoom_level": len(data[0]) if data else 0,
                    "quadkeys": data,
                    **kwargs,
                }
            else:
                data.update(kwargs)
            instance = cls(**data)
            instance.data_store = data_store
            cls.logger.info(
                f"Successfully loaded {len(instance.quadkeys)} quadkeys from JSON file."
            )
            return instance

    def filter_quadkeys(self, quadkeys: Iterable[str]) -> "MercatorTiles":
        """Filter quadkeys by a given set of quadkeys."""
        original_count = len(self.quadkeys)
        incoming_count = len(
            list(quadkeys)
        )  # Convert to list to get length if it's an iterator

        self.logger.info(
            f"Filtering {original_count} quadkeys with an incoming set of {incoming_count} quadkeys."
        )
        filtered_quadkeys = list(set(self.quadkeys) & set(quadkeys))
        self.logger.info(f"Resulting in {len(filtered_quadkeys)} filtered quadkeys.")
        return MercatorTiles(
            zoom_level=self.zoom_level,
            quadkeys=filtered_quadkeys,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with quadkey and centroid coordinates."""
        self.logger.info(
            f"Converting {len(self.quadkeys)} quadkeys to pandas DataFrame."
        )
        if not self.quadkeys:
            self.logger.warning(
                "No quadkeys to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(columns=["quadkey", "latitude", "longitude"])
        tiles_data = [mercantile.quadkey_to_tile(q) for q in self.quadkeys]
        bounds_data = [mercantile.bounds(tile) for tile in tiles_data]

        centroids = [
            (
                (bounds.south + bounds.north) / 2,  # latitude
                (bounds.west + bounds.east) / 2,  # longitude
            )
            for bounds in bounds_data
        ]

        self.logger.info(f"Successfully converted to DataFrame.")

        return pd.DataFrame(
            {
                "quadkey": self.quadkeys,
                "latitude": [c[0] for c in centroids],
                "longitude": [c[1] for c in centroids],
            }
        )

    def to_geoms(self) -> List[box]:
        self.logger.info(
            f"Converting {len(self.quadkeys)} quadkeys to shapely box geometries."
        )
        return [
            box(*mercantile.bounds(mercantile.quadkey_to_tile(q)))
            for q in self.quadkeys
        ]

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert to GeoPandas GeoDataFrame."""
        return gpd.GeoDataFrame(
            {"quadkey": self.quadkeys, "geometry": self.to_geoms()}, crs="EPSG:4326"
        )

    @staticmethod
    def get_quadkeys_from_points(
        points: List[Union[Point, Tuple[float, float]]], zoom_level: int
    ) -> List[str]:
        """Get list of quadkeys for the provided points at specified zoom level.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            zoom_level: Zoom level for the quadkeys

        Returns:
            List of quadkey strings
        """
        quadkeys = [
            (
                mercantile.quadkey(mercantile.tile(p.x, p.y, zoom_level))
                if isinstance(p, Point)
                else mercantile.quadkey(mercantile.tile(p[1], p[0], zoom_level))
            )
            for p in points
        ]
        return quadkeys

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Save MercatorTiles to file in specified format."""
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.quadkeys, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.quadkeys)
filter_quadkeys(quadkeys)

Filter quadkeys by a given set of quadkeys.

Source code in gigaspatial/grid/mercator_tiles.py
def filter_quadkeys(self, quadkeys: Iterable[str]) -> "MercatorTiles":
    """Filter quadkeys by a given set of quadkeys."""
    original_count = len(self.quadkeys)
    incoming_count = len(
        list(quadkeys)
    )  # Convert to list to get length if it's an iterator

    self.logger.info(
        f"Filtering {original_count} quadkeys with an incoming set of {incoming_count} quadkeys."
    )
    filtered_quadkeys = list(set(self.quadkeys) & set(quadkeys))
    self.logger.info(f"Resulting in {len(filtered_quadkeys)} filtered quadkeys.")
    return MercatorTiles(
        zoom_level=self.zoom_level,
        quadkeys=filtered_quadkeys,
    )
from_bounds(xmin, ymin, xmax, ymax, zoom_level) classmethod

Create MercatorTiles from boundary coordinates.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_bounds(
    cls, xmin: float, ymin: float, xmax: float, ymax: float, zoom_level: int
):
    """Create MercatorTiles from boundary coordinates."""
    cls.logger.info(
        f"Creating MercatorTiles from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at zoom level: {zoom_level}"
    )
    return cls(
        zoom_level=zoom_level,
        quadkeys=[
            mercantile.quadkey(tile)
            for tile in mercantile.tiles(xmin, ymin, xmax, ymax, zoom_level)
        ],
    )
from_geometry(geometry, zoom_level, predicate='intersects', **kwargs) classmethod

Create MercatorTiles from a polygon.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    zoom_level: int,
    predicate: str = "intersects",
    **kwargs,
):
    """Create MercatorTiles from a polygon."""
    cls.logger.info(
        f"Creating MercatorTiles from geometry (bounds: {geometry.bounds}) at zoom level: {zoom_level} with predicate: {predicate}"
    )
    tiles = list(mercantile.tiles(*geometry.bounds, zoom_level))
    quadkeys_boxes = [
        (mercantile.quadkey(t), box(*mercantile.bounds(t))) for t in tiles
    ]
    quadkeys, boxes = zip(*quadkeys_boxes) if quadkeys_boxes else ([], [])

    if not boxes:
        cls.logger.warning(
            "No boxes generated from geometry bounds. Returning empty MercatorTiles."
        )
        return MercatorTiles(zoom_level=zoom_level, quadkeys=[])

    s = STRtree(boxes)
    result_indices = s.query(geometry, predicate=predicate)
    filtered_quadkeys = [quadkeys[i] for i in result_indices]
    cls.logger.info(
        f"Filtered down to {len(filtered_quadkeys)} quadkeys using spatial predicate."
    )
    return cls(zoom_level=zoom_level, quadkeys=filtered_quadkeys, **kwargs)
from_json(data_store, file, **kwargs) classmethod

Load MercatorTiles from a JSON file.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "MercatorTiles":
    """Load MercatorTiles from a JSON file."""
    cls.logger.info(
        f"Loading MercatorTiles from JSON file: {file} using data store: {type(data_store).__name__}"
    )
    with data_store.open(str(file), "r") as f:
        data = json.load(f)
        if isinstance(data, list):  # If file contains only quadkeys
            data = {
                "zoom_level": len(data[0]) if data else 0,
                "quadkeys": data,
                **kwargs,
            }
        else:
            data.update(kwargs)
        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.quadkeys)} quadkeys from JSON file."
        )
        return instance
from_points(points, zoom_level, **kwargs) classmethod

Create MercatorTiles from a list of points or lat-lon pairs.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], zoom_level: int, **kwargs
) -> "MercatorTiles":
    """Create MercatorTiles from a list of points or lat-lon pairs."""
    cls.logger.info(
        f"Creating MercatorTiles from {len(points)} points at zoom level: {zoom_level}"
    )
    quadkeys = set(cls.get_quadkeys_from_points(points, zoom_level))
    cls.logger.info(f"Generated {len(quadkeys)} unique quadkeys from points.")
    return cls(zoom_level=zoom_level, quadkeys=list(quadkeys), **kwargs)
from_quadkeys(quadkeys) classmethod

Create MercatorTiles from list of quadkeys.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_quadkeys(cls, quadkeys: List[str]):
    """Create MercatorTiles from list of quadkeys."""
    if not quadkeys:
        cls.logger.warning("No quadkeys provided to from_quadkeys.")
        return cls(zoom_level=0, quadkeys=[])
    cls.logger.info(
        f"Initializing MercatorTiles from {len(quadkeys)} provided quadkeys."
    )
    return cls(zoom_level=len(quadkeys[0]), quadkeys=set(quadkeys))
get_quadkeys_from_points(points, zoom_level) staticmethod

Get list of quadkeys for the provided points at specified zoom level.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
zoom_level int

Zoom level for the quadkeys

required

Returns:

Type Description
List[str]

List of quadkey strings

Source code in gigaspatial/grid/mercator_tiles.py
@staticmethod
def get_quadkeys_from_points(
    points: List[Union[Point, Tuple[float, float]]], zoom_level: int
) -> List[str]:
    """Get list of quadkeys for the provided points at specified zoom level.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        zoom_level: Zoom level for the quadkeys

    Returns:
        List of quadkey strings
    """
    quadkeys = [
        (
            mercantile.quadkey(mercantile.tile(p.x, p.y, zoom_level))
            if isinstance(p, Point)
            else mercantile.quadkey(mercantile.tile(p[1], p[0], zoom_level))
        )
        for p in points
    ]
    return quadkeys
save(file, format='json')

Save MercatorTiles to file in specified format.

Source code in gigaspatial/grid/mercator_tiles.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Save MercatorTiles to file in specified format."""
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.quadkeys, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Convert to pandas DataFrame with quadkey and centroid coordinates.

Source code in gigaspatial/grid/mercator_tiles.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert to pandas DataFrame with quadkey and centroid coordinates."""
    self.logger.info(
        f"Converting {len(self.quadkeys)} quadkeys to pandas DataFrame."
    )
    if not self.quadkeys:
        self.logger.warning(
            "No quadkeys to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(columns=["quadkey", "latitude", "longitude"])
    tiles_data = [mercantile.quadkey_to_tile(q) for q in self.quadkeys]
    bounds_data = [mercantile.bounds(tile) for tile in tiles_data]

    centroids = [
        (
            (bounds.south + bounds.north) / 2,  # latitude
            (bounds.west + bounds.east) / 2,  # longitude
        )
        for bounds in bounds_data
    ]

    self.logger.info(f"Successfully converted to DataFrame.")

    return pd.DataFrame(
        {
            "quadkey": self.quadkeys,
            "latitude": [c[0] for c in centroids],
            "longitude": [c[1] for c in centroids],
        }
    )
to_geodataframe()

Convert to GeoPandas GeoDataFrame.

Source code in gigaspatial/grid/mercator_tiles.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert to GeoPandas GeoDataFrame."""
    return gpd.GeoDataFrame(
        {"quadkey": self.quadkeys, "geometry": self.to_geoms()}, crs="EPSG:4326"
    )

s2

CountryS2Cells

Bases: S2Cells

S2Cells specialized for country-level operations.

This class extends S2Cells to work specifically with country boundaries. It can only be instantiated through the create() classmethod.

Source code in gigaspatial/grid/s2.py
class CountryS2Cells(S2Cells):
    """S2Cells specialized for country-level operations.

    This class extends S2Cells to work specifically with country boundaries.
    It can only be instantiated through the create() classmethod.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryS2Cells cannot be instantiated directly. "
            "Use CountryS2Cells.create() instead."
        )

    @classmethod
    def create(
        cls,
        country: str,
        level: int,
        max_cells: int = 1000,
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ):
        """Create CountryS2Cells for a specific country."""
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryS2Cells, instance).__init__(
            level=level,
            cells=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing S2 cells for country: {country} at level {level}"
        )

        country_geom = (
            AdminBoundaries.create(
                country_code=country,
                data_store=data_store,
                path=country_geom_path,
            )
            .boundaries[0]
            .geometry
        )

        cells = S2Cells.from_geometry(country_geom, level, max_cells=max_cells)
        instance.cells = cells.cells

        return instance
create(country, level, max_cells=1000, data_store=None, country_geom_path=None) classmethod

Create CountryS2Cells for a specific country.

Source code in gigaspatial/grid/s2.py
@classmethod
def create(
    cls,
    country: str,
    level: int,
    max_cells: int = 1000,
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
):
    """Create CountryS2Cells for a specific country."""
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryS2Cells, instance).__init__(
        level=level,
        cells=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing S2 cells for country: {country} at level {level}"
    )

    country_geom = (
        AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        )
        .boundaries[0]
        .geometry
    )

    cells = S2Cells.from_geometry(country_geom, level, max_cells=max_cells)
    instance.cells = cells.cells

    return instance

S2Cells

Bases: BaseModel

S2Cells class for generating and managing Google S2 cell grids.

S2 uses levels 0-30, where higher levels represent finer resolution. Level 0 covers the largest area and level 30 the smallest.

Source code in gigaspatial/grid/s2.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
class S2Cells(BaseModel):
    """S2Cells class for generating and managing Google S2 cell grids.

    S2 uses levels 0-30, where higher levels represent finer resolution.
    Level 0 covers the largest area and level 30 the smallest.
    """

    level: int = Field(..., ge=0, le=30)
    cells: List[int] = Field(default_factory=list)  # S2 cell IDs as integers
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("S2Cells")

    class Config:
        arbitrary_types_allowed = True

    @classmethod
    def from_cells(cls, cells: List[Union[int, str]]):
        """Create S2Cells from list of S2 cell IDs (integers or tokens)."""
        if not cells:
            cls.logger.warning("No cells provided to from_cells.")
            return cls(level=0, cells=[])

        cls.logger.info(f"Initializing S2Cells from {len(cells)} provided cells.")

        # Convert tokens to integers if needed
        cell_ids = []
        for cell in cells:
            if isinstance(cell, str):
                cell_ids.append(CellId.from_token(cell).id())
            else:
                cell_ids.append(cell)

        # Get level from first cell
        level = CellId(cell_ids[0]).level()
        return cls(level=level, cells=list(set(cell_ids)))

    @classmethod
    def from_bounds(
        cls,
        xmin: float,
        ymin: float,
        xmax: float,
        ymax: float,
        level: int,
        max_cells: int = 100,
    ):
        """Create S2Cells from boundary coordinates.

        Args:
            xmin, ymin, xmax, ymax: Bounding box coordinates in degrees
            level: S2 level (0-30)
            max_cells: Maximum number of cells to generate
        """
        cls.logger.info(
            f"Creating S2Cells from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at level: {level}"
        )

        # Create a LatLngRect for the bounding box
        rect = LatLngRect(
            LatLng.from_degrees(ymin, xmin), LatLng.from_degrees(ymax, xmax)
        )

        # Use RegionCoverer to get cells
        coverer = RegionCoverer()
        coverer.min_level = level
        coverer.max_level = level
        coverer.max_cells = max_cells

        covering = coverer.get_covering(rect)
        cells = [cell.id() for cell in covering]

        cls.logger.info(f"Generated {len(cells)} cells from bounds.")
        return cls(level=level, cells=cells)

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],
        ],
        level: int,
        max_cells: int = 1000,
        **kwargs,
    ):
        """Create S2Cells from various spatial sources."""
        cls.logger.info(
            f"Creating S2Cells from spatial source (type: {type(source)}) at level: {level}"
        )

        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")
            is_point_series = source.geometry.geom_type == "Point"
            all_are_points = is_point_series.all()
            if all_are_points:
                source = source.geometry.to_list()
            else:
                source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, level=level, max_cells=max_cells, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(points=source, level=level, **kwargs)
        else:
            raise ValueError("Unsupported source type for S2Cells.from_spatial")

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        level: int,
        max_cells: int = 1000,
        **kwargs,
    ):
        """Create S2Cells from a geometry.

        Args:
            geometry: Shapely geometry
            level: S2 level (0-30)
            max_cells: Maximum number of cells to generate
        """
        cls.logger.info(
            f"Creating S2Cells from geometry (bounds: {geometry.bounds}) at level: {level}"
        )

        if isinstance(geometry, Point):
            return cls.from_points([geometry], level)

        # For polygons and other shapes, use bounding box with RegionCoverer
        # Then filter to actual intersection
        minx, miny, maxx, maxy = geometry.bounds

        rect = LatLngRect(
            LatLng.from_degrees(miny, minx), LatLng.from_degrees(maxy, maxx)
        )

        coverer = RegionCoverer()
        coverer.min_level = level
        coverer.max_level = level
        coverer.max_cells = max_cells

        covering = coverer.get_covering(rect)

        # Filter cells that actually intersect the geometry
        cells = []
        for cell_id in covering:
            cell = Cell(cell_id)
            # Create polygon from cell vertices
            vertices = []
            for i in range(4):
                vertex = cell.get_vertex(i)
                lat_lng = LatLng.from_point(vertex)
                vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
            vertices.append(vertices[0])  # Close the polygon

            cell_polygon = Polygon(vertices)
            if cell_polygon.intersects(geometry):
                cells.append(cell_id.id())

        cls.logger.info(f"Generated {len(cells)} cells from geometry.")
        return cls(level=level, cells=cells, **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], level: int, **kwargs
    ) -> "S2Cells":
        """Create S2Cells from a list of points or lat-lon pairs."""
        cls.logger.info(f"Creating S2Cells from {len(points)} points at level: {level}")

        cells = set(cls.get_cells_from_points(points, level))
        cls.logger.info(f"Generated {len(cells)} unique cells from points.")
        return cls(level=level, cells=list(cells), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "S2Cells":
        """Load S2Cells from a JSON file."""
        cls.logger.info(
            f"Loading S2Cells from JSON file: {file} using data store: {type(data_store).__name__}"
        )

        with data_store.open(str(file), "r") as f:
            data = json.load(f)

        if isinstance(data, list):  # If file contains only cell IDs
            # Get level from first cell if available
            level = CellId(data[0]).level() if data else 0
            data = {
                "level": level,
                "cells": data,
                **kwargs,
            }
        else:
            data.update(kwargs)

        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.cells)} cells from JSON file."
        )
        return instance

    @property
    def average_cell_area(self):
        """Average area of cells at this level in square meters."""
        # Approximate area calculation based on S2 geometry
        # Earth surface area is ~510 trillion square meters
        # Each level quadruples the number of cells
        earth_area = 510_000_000_000_000  # m^2
        num_cells_at_level = 6 * (4**self.level)  # 6 faces, each subdivided
        return earth_area / num_cells_at_level

    def filter_cells(self, cells: Iterable[int]) -> "S2Cells":
        """Filter cells by a given set of cell IDs."""
        original_count = len(self.cells)
        incoming_count = len(list(cells))

        self.logger.info(
            f"Filtering {original_count} cells with an incoming set of {incoming_count} cells."
        )

        filtered_cells = list(set(self.cells) & set(cells))
        self.logger.info(f"Resulting in {len(filtered_cells)} filtered cells.")

        return S2Cells(
            level=self.level,
            cells=filtered_cells,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with cell ID and centroid coordinates."""
        self.logger.info(f"Converting {len(self.cells)} cells to pandas DataFrame.")

        if not self.cells:
            self.logger.warning(
                "No cells to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(
                columns=["cell_id", "cell_token", "latitude", "longitude"]
            )

        data = []
        for cell_id in self.cells:
            cell = Cell(CellId(cell_id))
            center = LatLng.from_point(cell.get_center())
            data.append(
                {
                    "cell_id": cell_id,
                    "cell_token": CellId(cell_id).to_token(),
                    "latitude": center.lat().degrees,
                    "longitude": center.lng().degrees,
                }
            )

        self.logger.info(f"Successfully converted to DataFrame.")
        return pd.DataFrame(data)

    def to_geoms(self) -> List[Polygon]:
        """Convert cells to shapely Polygon geometries."""
        self.logger.info(
            f"Converting {len(self.cells)} cells to shapely Polygon geometries."
        )

        polygons = []
        for cell_id in self.cells:
            cell = Cell(CellId(cell_id))
            vertices = []
            for i in range(4):
                vertex = cell.get_vertex(i)
                lat_lng = LatLng.from_point(vertex)
                vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
            vertices.append(vertices[0])  # Close the polygon
            polygons.append(Polygon(vertices))

        return polygons

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert to GeoPandas GeoDataFrame."""
        return gpd.GeoDataFrame(
            {
                "cell_id": self.cells,
                "cell_token": [CellId(c).to_token() for c in self.cells],
                "geometry": self.to_geoms(),
            },
            crs="EPSG:4326",
        )

    @staticmethod
    def get_cells_from_points(
        points: List[Union[Point, Tuple[float, float]]], level: int
    ) -> List[int]:
        """Get list of S2 cell IDs for the provided points at specified level.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            level: S2 level

        Returns:
            List of S2 cell IDs as integers
        """
        cells = []
        for p in points:
            if isinstance(p, Point):
                # Shapely Point has x=lon, y=lat
                lat_lng = LatLng.from_degrees(p.y, p.x)
            else:
                # Assume tuple is (lon, lat)
                lat_lng = LatLng.from_degrees(p[1], p[0])

            cell_id = CellId.from_lat_lng(lat_lng).parent(level)
            cells.append(cell_id.id())

        return cells

    def get_neighbors(self, direct_only: bool = True) -> "S2Cells":
        """Get neighbors of all cells.

        Args:
            direct_only: If True, get only direct edge neighbors (4 per cell).
                        If False, get all 8 neighbors including corners.

        Returns:
            New S2Cells instance with neighbors included
        """
        self.logger.info(
            f"Getting neighbors for {len(self.cells)} cells (direct_only={direct_only})."
        )

        all_neighbors = set()
        for cell_id in self.cells:
            cell = CellId(cell_id)
            # Get edge neighbors
            for i in range(4):
                neighbors = cell.get_edge_neighbors()
                all_neighbors.update([n.id() for n in neighbors])

            if not direct_only:
                # Get corner neighbors
                for i in range(4):
                    vertex_neighbors = cell.get_vertex_neighbors(i)
                    all_neighbors.update([n.id() for n in vertex_neighbors])

        self.logger.info(f"Found {len(all_neighbors)} total cells including neighbors.")

        return S2Cells(level=self.level, cells=list(all_neighbors))

    def get_children(self, target_level: int) -> "S2Cells":
        """Get children cells at higher level.

        Args:
            target_level: Target level (must be higher than current)

        Returns:
            New S2Cells instance with children at target level
        """
        if target_level <= self.level:
            raise ValueError("Target level must be higher than current level")

        self.logger.info(
            f"Getting children at level {target_level} for {len(self.cells)} cells."
        )

        all_children = []
        for cell_id in self.cells:
            cell = CellId(cell_id)
            # Get all children at target level
            child = cell.child_begin(target_level)
            end = cell.child_end(target_level)

            while child != end:
                all_children.append(child.id())
                child = child.next()

        self.logger.info(f"Generated {len(all_children)} children cells.")
        return S2Cells(level=target_level, cells=all_children)

    def get_parents(self, target_level: int) -> "S2Cells":
        """Get parent cells at lower level.

        Args:
            target_level: Target level (must be lower than current)

        Returns:
            New S2Cells instance with parents at target level
        """
        if target_level >= self.level:
            raise ValueError("Target level must be lower than current level")

        self.logger.info(
            f"Getting parents at level {target_level} for {len(self.cells)} cells."
        )

        parents = set()
        for cell_id in self.cells:
            parent = CellId(cell_id).parent(target_level)
            parents.add(parent.id())

        self.logger.info(f"Generated {len(parents)} parent cells.")
        return S2Cells(level=target_level, cells=list(parents))

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Save S2Cells to file in specified format."""
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.cells, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.cells)
average_cell_area property

Average area of cells at this level in square meters.

filter_cells(cells)

Filter cells by a given set of cell IDs.

Source code in gigaspatial/grid/s2.py
def filter_cells(self, cells: Iterable[int]) -> "S2Cells":
    """Filter cells by a given set of cell IDs."""
    original_count = len(self.cells)
    incoming_count = len(list(cells))

    self.logger.info(
        f"Filtering {original_count} cells with an incoming set of {incoming_count} cells."
    )

    filtered_cells = list(set(self.cells) & set(cells))
    self.logger.info(f"Resulting in {len(filtered_cells)} filtered cells.")

    return S2Cells(
        level=self.level,
        cells=filtered_cells,
    )
from_bounds(xmin, ymin, xmax, ymax, level, max_cells=100) classmethod

Create S2Cells from boundary coordinates.

Parameters:

Name Type Description Default
xmin, (ymin, xmax, ymax)

Bounding box coordinates in degrees

required
level int

S2 level (0-30)

required
max_cells int

Maximum number of cells to generate

100
Source code in gigaspatial/grid/s2.py
@classmethod
def from_bounds(
    cls,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
    level: int,
    max_cells: int = 100,
):
    """Create S2Cells from boundary coordinates.

    Args:
        xmin, ymin, xmax, ymax: Bounding box coordinates in degrees
        level: S2 level (0-30)
        max_cells: Maximum number of cells to generate
    """
    cls.logger.info(
        f"Creating S2Cells from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at level: {level}"
    )

    # Create a LatLngRect for the bounding box
    rect = LatLngRect(
        LatLng.from_degrees(ymin, xmin), LatLng.from_degrees(ymax, xmax)
    )

    # Use RegionCoverer to get cells
    coverer = RegionCoverer()
    coverer.min_level = level
    coverer.max_level = level
    coverer.max_cells = max_cells

    covering = coverer.get_covering(rect)
    cells = [cell.id() for cell in covering]

    cls.logger.info(f"Generated {len(cells)} cells from bounds.")
    return cls(level=level, cells=cells)
from_cells(cells) classmethod

Create S2Cells from list of S2 cell IDs (integers or tokens).

Source code in gigaspatial/grid/s2.py
@classmethod
def from_cells(cls, cells: List[Union[int, str]]):
    """Create S2Cells from list of S2 cell IDs (integers or tokens)."""
    if not cells:
        cls.logger.warning("No cells provided to from_cells.")
        return cls(level=0, cells=[])

    cls.logger.info(f"Initializing S2Cells from {len(cells)} provided cells.")

    # Convert tokens to integers if needed
    cell_ids = []
    for cell in cells:
        if isinstance(cell, str):
            cell_ids.append(CellId.from_token(cell).id())
        else:
            cell_ids.append(cell)

    # Get level from first cell
    level = CellId(cell_ids[0]).level()
    return cls(level=level, cells=list(set(cell_ids)))
from_geometry(geometry, level, max_cells=1000, **kwargs) classmethod

Create S2Cells from a geometry.

Parameters:

Name Type Description Default
geometry BaseGeometry

Shapely geometry

required
level int

S2 level (0-30)

required
max_cells int

Maximum number of cells to generate

1000
Source code in gigaspatial/grid/s2.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    level: int,
    max_cells: int = 1000,
    **kwargs,
):
    """Create S2Cells from a geometry.

    Args:
        geometry: Shapely geometry
        level: S2 level (0-30)
        max_cells: Maximum number of cells to generate
    """
    cls.logger.info(
        f"Creating S2Cells from geometry (bounds: {geometry.bounds}) at level: {level}"
    )

    if isinstance(geometry, Point):
        return cls.from_points([geometry], level)

    # For polygons and other shapes, use bounding box with RegionCoverer
    # Then filter to actual intersection
    minx, miny, maxx, maxy = geometry.bounds

    rect = LatLngRect(
        LatLng.from_degrees(miny, minx), LatLng.from_degrees(maxy, maxx)
    )

    coverer = RegionCoverer()
    coverer.min_level = level
    coverer.max_level = level
    coverer.max_cells = max_cells

    covering = coverer.get_covering(rect)

    # Filter cells that actually intersect the geometry
    cells = []
    for cell_id in covering:
        cell = Cell(cell_id)
        # Create polygon from cell vertices
        vertices = []
        for i in range(4):
            vertex = cell.get_vertex(i)
            lat_lng = LatLng.from_point(vertex)
            vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
        vertices.append(vertices[0])  # Close the polygon

        cell_polygon = Polygon(vertices)
        if cell_polygon.intersects(geometry):
            cells.append(cell_id.id())

    cls.logger.info(f"Generated {len(cells)} cells from geometry.")
    return cls(level=level, cells=cells, **kwargs)
from_json(data_store, file, **kwargs) classmethod

Load S2Cells from a JSON file.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "S2Cells":
    """Load S2Cells from a JSON file."""
    cls.logger.info(
        f"Loading S2Cells from JSON file: {file} using data store: {type(data_store).__name__}"
    )

    with data_store.open(str(file), "r") as f:
        data = json.load(f)

    if isinstance(data, list):  # If file contains only cell IDs
        # Get level from first cell if available
        level = CellId(data[0]).level() if data else 0
        data = {
            "level": level,
            "cells": data,
            **kwargs,
        }
    else:
        data.update(kwargs)

    instance = cls(**data)
    instance.data_store = data_store
    cls.logger.info(
        f"Successfully loaded {len(instance.cells)} cells from JSON file."
    )
    return instance
from_points(points, level, **kwargs) classmethod

Create S2Cells from a list of points or lat-lon pairs.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], level: int, **kwargs
) -> "S2Cells":
    """Create S2Cells from a list of points or lat-lon pairs."""
    cls.logger.info(f"Creating S2Cells from {len(points)} points at level: {level}")

    cells = set(cls.get_cells_from_points(points, level))
    cls.logger.info(f"Generated {len(cells)} unique cells from points.")
    return cls(level=level, cells=list(cells), **kwargs)
from_spatial(source, level, max_cells=1000, **kwargs) classmethod

Create S2Cells from various spatial sources.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_spatial(
    cls,
    source: Union[
        BaseGeometry,
        gpd.GeoDataFrame,
        List[Union[Point, Tuple[float, float]]],
    ],
    level: int,
    max_cells: int = 1000,
    **kwargs,
):
    """Create S2Cells from various spatial sources."""
    cls.logger.info(
        f"Creating S2Cells from spatial source (type: {type(source)}) at level: {level}"
    )

    if isinstance(source, gpd.GeoDataFrame):
        if source.crs != "EPSG:4326":
            source = source.to_crs("EPSG:4326")
        is_point_series = source.geometry.geom_type == "Point"
        all_are_points = is_point_series.all()
        if all_are_points:
            source = source.geometry.to_list()
        else:
            source = source.geometry.unary_union

    if isinstance(source, BaseGeometry):
        return cls.from_geometry(
            geometry=source, level=level, max_cells=max_cells, **kwargs
        )
    elif isinstance(source, Iterable) and all(
        isinstance(pt, Point) or len(pt) == 2 for pt in source
    ):
        return cls.from_points(points=source, level=level, **kwargs)
    else:
        raise ValueError("Unsupported source type for S2Cells.from_spatial")
get_cells_from_points(points, level) staticmethod

Get list of S2 cell IDs for the provided points at specified level.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
level int

S2 level

required

Returns:

Type Description
List[int]

List of S2 cell IDs as integers

Source code in gigaspatial/grid/s2.py
@staticmethod
def get_cells_from_points(
    points: List[Union[Point, Tuple[float, float]]], level: int
) -> List[int]:
    """Get list of S2 cell IDs for the provided points at specified level.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        level: S2 level

    Returns:
        List of S2 cell IDs as integers
    """
    cells = []
    for p in points:
        if isinstance(p, Point):
            # Shapely Point has x=lon, y=lat
            lat_lng = LatLng.from_degrees(p.y, p.x)
        else:
            # Assume tuple is (lon, lat)
            lat_lng = LatLng.from_degrees(p[1], p[0])

        cell_id = CellId.from_lat_lng(lat_lng).parent(level)
        cells.append(cell_id.id())

    return cells
get_children(target_level)

Get children cells at higher level.

Parameters:

Name Type Description Default
target_level int

Target level (must be higher than current)

required

Returns:

Type Description
S2Cells

New S2Cells instance with children at target level

Source code in gigaspatial/grid/s2.py
def get_children(self, target_level: int) -> "S2Cells":
    """Get children cells at higher level.

    Args:
        target_level: Target level (must be higher than current)

    Returns:
        New S2Cells instance with children at target level
    """
    if target_level <= self.level:
        raise ValueError("Target level must be higher than current level")

    self.logger.info(
        f"Getting children at level {target_level} for {len(self.cells)} cells."
    )

    all_children = []
    for cell_id in self.cells:
        cell = CellId(cell_id)
        # Get all children at target level
        child = cell.child_begin(target_level)
        end = cell.child_end(target_level)

        while child != end:
            all_children.append(child.id())
            child = child.next()

    self.logger.info(f"Generated {len(all_children)} children cells.")
    return S2Cells(level=target_level, cells=all_children)
get_neighbors(direct_only=True)

Get neighbors of all cells.

Parameters:

Name Type Description Default
direct_only bool

If True, get only direct edge neighbors (4 per cell). If False, get all 8 neighbors including corners.

True

Returns:

Type Description
S2Cells

New S2Cells instance with neighbors included

Source code in gigaspatial/grid/s2.py
def get_neighbors(self, direct_only: bool = True) -> "S2Cells":
    """Get neighbors of all cells.

    Args:
        direct_only: If True, get only direct edge neighbors (4 per cell).
                    If False, get all 8 neighbors including corners.

    Returns:
        New S2Cells instance with neighbors included
    """
    self.logger.info(
        f"Getting neighbors for {len(self.cells)} cells (direct_only={direct_only})."
    )

    all_neighbors = set()
    for cell_id in self.cells:
        cell = CellId(cell_id)
        # Get edge neighbors
        for i in range(4):
            neighbors = cell.get_edge_neighbors()
            all_neighbors.update([n.id() for n in neighbors])

        if not direct_only:
            # Get corner neighbors
            for i in range(4):
                vertex_neighbors = cell.get_vertex_neighbors(i)
                all_neighbors.update([n.id() for n in vertex_neighbors])

    self.logger.info(f"Found {len(all_neighbors)} total cells including neighbors.")

    return S2Cells(level=self.level, cells=list(all_neighbors))
get_parents(target_level)

Get parent cells at lower level.

Parameters:

Name Type Description Default
target_level int

Target level (must be lower than current)

required

Returns:

Type Description
S2Cells

New S2Cells instance with parents at target level

Source code in gigaspatial/grid/s2.py
def get_parents(self, target_level: int) -> "S2Cells":
    """Get parent cells at lower level.

    Args:
        target_level: Target level (must be lower than current)

    Returns:
        New S2Cells instance with parents at target level
    """
    if target_level >= self.level:
        raise ValueError("Target level must be lower than current level")

    self.logger.info(
        f"Getting parents at level {target_level} for {len(self.cells)} cells."
    )

    parents = set()
    for cell_id in self.cells:
        parent = CellId(cell_id).parent(target_level)
        parents.add(parent.id())

    self.logger.info(f"Generated {len(parents)} parent cells.")
    return S2Cells(level=target_level, cells=list(parents))
save(file, format='json')

Save S2Cells to file in specified format.

Source code in gigaspatial/grid/s2.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Save S2Cells to file in specified format."""
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.cells, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Convert to pandas DataFrame with cell ID and centroid coordinates.

Source code in gigaspatial/grid/s2.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert to pandas DataFrame with cell ID and centroid coordinates."""
    self.logger.info(f"Converting {len(self.cells)} cells to pandas DataFrame.")

    if not self.cells:
        self.logger.warning(
            "No cells to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(
            columns=["cell_id", "cell_token", "latitude", "longitude"]
        )

    data = []
    for cell_id in self.cells:
        cell = Cell(CellId(cell_id))
        center = LatLng.from_point(cell.get_center())
        data.append(
            {
                "cell_id": cell_id,
                "cell_token": CellId(cell_id).to_token(),
                "latitude": center.lat().degrees,
                "longitude": center.lng().degrees,
            }
        )

    self.logger.info(f"Successfully converted to DataFrame.")
    return pd.DataFrame(data)
to_geodataframe()

Convert to GeoPandas GeoDataFrame.

Source code in gigaspatial/grid/s2.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert to GeoPandas GeoDataFrame."""
    return gpd.GeoDataFrame(
        {
            "cell_id": self.cells,
            "cell_token": [CellId(c).to_token() for c in self.cells],
            "geometry": self.to_geoms(),
        },
        crs="EPSG:4326",
    )
to_geoms()

Convert cells to shapely Polygon geometries.

Source code in gigaspatial/grid/s2.py
def to_geoms(self) -> List[Polygon]:
    """Convert cells to shapely Polygon geometries."""
    self.logger.info(
        f"Converting {len(self.cells)} cells to shapely Polygon geometries."
    )

    polygons = []
    for cell_id in self.cells:
        cell = Cell(CellId(cell_id))
        vertices = []
        for i in range(4):
            vertex = cell.get_vertex(i)
            lat_lng = LatLng.from_point(vertex)
            vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
        vertices.append(vertices[0])  # Close the polygon
        polygons.append(Polygon(vertices))

    return polygons