Skip to content

Grid Module

gigaspatial.grid

h3

CountryH3Hexagons

Bases: H3Hexagons

H3Hexagons specialized for country-level operations.

Extends H3Hexagons to work specifically with country boundaries retrieved from the Giga administrative boundary dataset.

Note

Instances should be created using the create() factory method.

Attributes:

Name Type Description
country str

ISO 3166-1 alpha-3 country code.

Source code in gigaspatial/grid/h3.py
class CountryH3Hexagons(H3Hexagons):
    """
    H3Hexagons specialized for country-level operations.

    Extends H3Hexagons to work specifically with country boundaries retrieved
    from the Giga administrative boundary dataset.

    Note:
        Instances should be created using the `create()` factory method.

    Attributes:
        country: ISO 3166-1 alpha-3 country code.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryH3Hexagons cannot be instantiated directly. "
            "Use CountryH3Hexagons.create() instead."
        )

    @classmethod
    def create(
        self,
        country: str,
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ) -> "CountryH3Hexagons":
        """Factory method to create H3Hexagons for a specific country's boundary.

        Args:
            country: ISO country code (3-letter alpha-3) or name.
            resolution: Target H3 resolution (0-15).
            contain: Containment logic for mapping the geometry to cells.
                Defaults to 'overlap'.
            data_store: Optional storage interface for boundary lookup.
            country_geom_path: Optional path override for the boundary file.

        Returns:
            A new CountryH3Hexagons instance fully populated for the country.
        """
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryH3Hexagons, instance).__init__(
            resolution=resolution,
            hexagons=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing H3 hexagons for country: {country} at resolution {resolution}"
        )

        country_geom = AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        ).to_geoms()[0]

        hexagons = H3Hexagons.from_geometry(country_geom, resolution, contain=contain)

        instance.hexagons = hexagons.hexagons
        return instance
create(country, resolution, contain='overlap', data_store=None, country_geom_path=None) classmethod

Factory method to create H3Hexagons for a specific country's boundary.

Parameters:

Name Type Description Default
country str

ISO country code (3-letter alpha-3) or name.

required
resolution int

Target H3 resolution (0-15).

required
contain Literal['center', 'full', 'overlap', 'bbox_overlap']

Containment logic for mapping the geometry to cells. Defaults to 'overlap'.

'overlap'
data_store Optional[DataStore]

Optional storage interface for boundary lookup.

None
country_geom_path Optional[Union[str, Path]]

Optional path override for the boundary file.

None

Returns:

Type Description
CountryH3Hexagons

A new CountryH3Hexagons instance fully populated for the country.

Source code in gigaspatial/grid/h3.py
@classmethod
def create(
    self,
    country: str,
    resolution: int,
    contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
) -> "CountryH3Hexagons":
    """Factory method to create H3Hexagons for a specific country's boundary.

    Args:
        country: ISO country code (3-letter alpha-3) or name.
        resolution: Target H3 resolution (0-15).
        contain: Containment logic for mapping the geometry to cells.
            Defaults to 'overlap'.
        data_store: Optional storage interface for boundary lookup.
        country_geom_path: Optional path override for the boundary file.

    Returns:
        A new CountryH3Hexagons instance fully populated for the country.
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryH3Hexagons, instance).__init__(
        resolution=resolution,
        hexagons=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing H3 hexagons for country: {country} at resolution {resolution}"
    )

    country_geom = AdminBoundaries.create(
        country_code=country,
        data_store=data_store,
        path=country_geom_path,
    ).to_geoms()[0]

    hexagons = H3Hexagons.from_geometry(country_geom, resolution, contain=contain)

    instance.hexagons = hexagons.hexagons
    return instance

H3Hexagons

Bases: BaseModel

Representation of a collection of H3 hexagons at a specific resolution.

Provides utility methods to create, filter, manipulate, and export H3 hexagonal grids for spatial analysis. Handles conversion between coordinate pairs, geometries, and H3 cell IDs.

Attributes:

Name Type Description
resolution int

H3 resolution level (0-15).

hexagons List[str]

List of H3 cell ID strings.

data_store DataStore

Storage interface for I/O operations.

logger ClassVar

Class-level logger.

Source code in gigaspatial/grid/h3.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
class H3Hexagons(BaseModel):
    """
    Representation of a collection of H3 hexagons at a specific resolution.

    Provides utility methods to create, filter, manipulate, and export H3
    hexagonal grids for spatial analysis. Handles conversion between
    coordinate pairs, geometries, and H3 cell IDs.

    Attributes:
        resolution: H3 resolution level (0-15).
        hexagons: List of H3 cell ID strings.
        data_store: Storage interface for I/O operations.
        logger: Class-level logger.
    """

    resolution: int = Field(..., ge=0, le=15)
    hexagons: List[str] = Field(default_factory=list)
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("H3Hexagons")

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @classmethod
    def from_hexagons(cls, hexagons: List[str]) -> "H3Hexagons":
        """
        Create H3Hexagons from a list of H3 cell IDs.

        Args:
            hexagons: List of H3 cell hashes.

        Returns:
            A new H3Hexagons instance.
        """
        if not hexagons:
            cls.logger.warning("No hexagons provided to from_hexagons.")
            return cls(resolution=0, hexagons=[])

        cls.logger.info(
            f"Initializing H3Hexagons from {len(hexagons)} provided hexagons."
        )
        # Get resolution from first hexagon
        resolution = h3.get_resolution(hexagons[0])
        return cls(resolution=resolution, hexagons=list(set(hexagons)))

    @classmethod
    def from_bounds(
        cls, xmin: float, ymin: float, xmax: float, ymax: float, resolution: int
    ) -> "H3Hexagons":
        """
        Create H3Hexagons covering the specified geographic bounding box.

        Args:
            xmin: Minimum longitude.
            ymin: Minimum latitude.
            xmax: Maximum longitude.
            ymax: Maximum latitude.
            resolution: H3 resolution level.

        Returns:
            A new H3Hexagons instance covering the bounds.
        """
        cls.logger.info(
            f"Creating H3Hexagons from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at resolution: {resolution}"
        )

        # Create a LatLong bounding box polygon
        latlong_bbox_coords = [
            [ymin, xmin],
            [ymax, xmin],
            [ymax, xmax],
            [ymin, xmax],
            [ymin, xmin],
        ]

        # Get H3 cells that intersect with the bounding box
        poly = h3.LatLngPoly(latlong_bbox_coords)
        hexagons = h3.h3shape_to_cells(poly, res=resolution)

        return cls(resolution=resolution, hexagons=list(hexagons))

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],
        ],
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        **kwargs,
    ) -> "H3Hexagons":
        """
        Factory method to create H3Hexagons from various spatial sources.

        Handles GeoDataFrames, Shapely geometries, and lists of coordinate points.

        Args:
            source: Spatial data source.
            resolution: Target H3 resolution.
            contain: H3 spatial containment logic.
            **kwargs: Forwarded to specific factory methods (e.g., from_geometry).

        Returns:
            A new H3Hexagons instance.
        """
        cls.logger.info(
            f"Creating H3Hexagons from spatial source (type: {type(source)}) at resolution: {resolution} with predicate: {contain}"
        )
        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")

            is_point_series = source.geometry.geom_type == "Point"
            all_are_points = is_point_series.all()

            if all_are_points:
                source = source.geometry.to_list()
            else:
                source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, resolution=resolution, contain=contain, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(points=source, resolution=resolution, **kwargs)
        else:
            raise ValueError("Unsupported source type for H3Hexagons.from_spatial")

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        resolution: int,
        contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
        **kwargs,
    ) -> "H3Hexagons":
        """
        Create H3Hexagons from a Shapely geometry.

        Args:
            geometry: Input geometry (Polygon, MultiPolygon, etc.).
            resolution: H3 resolution level.
            contain: Containment logic for H3 cell generation.
            **kwargs: Additional metadata parameters.

        Returns:
            H3Hexagons instance covering the geometry.
        """
        cls.logger.info(
            f"Creating H3Hexagons from geometry (bounds: {geometry.bounds}) at resolution: {resolution} with predicate: {contain}"
        )

        if isinstance(geometry, Point):
            return cls.from_points([geometry])

        # Convert shapely geometry to GeoJSON-like format
        if hasattr(geometry, "__geo_interface__"):
            geojson_geom = geometry.__geo_interface__
        else:
            # Fallback for complex geometries
            import json
            from shapely.geometry import mapping

            geojson_geom = mapping(geometry)

        h3_geom = h3.geo_to_h3shape(geojson_geom)

        hexagons = h3.h3shape_to_cells_experimental(
            h3_geom, resolution, contain=contain
        )

        cls.logger.info(
            f"Generated {len(hexagons)} hexagons using `{contain}` spatial predicate."
        )
        return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], resolution: int, **kwargs
    ) -> "H3Hexagons":
        """
        Create H3Hexagons from a list of point coordinates.

        Args:
            points: List of points as Shapely Points or (lon, lat) tuples.
            resolution: H3 resolution level.
            **kwargs: Additional metadata parameters.

        Returns:
            H3Hexagons instance containing cells for all points.
        """
        cls.logger.info(
            f"Creating H3Hexagons from {len(points)} points at resolution: {resolution}"
        )
        hexagons = set(cls.get_hexagons_from_points(points, resolution))
        cls.logger.info(f"Generated {len(hexagons)} unique hexagons from points.")
        return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "H3Hexagons":
        """
        Load H3Hexagons from a JSON file in storage.

        Args:
            data_store: DataStore instance for the file.
            file: Path to the JSON file.
            **kwargs: Metadata overrides.

        Returns:
            H3Hexagons instance loaded from file.
        """
        cls.logger.info(
            f"Loading H3Hexagons from JSON file: {file} using data store: {type(data_store).__name__}"
        )
        with data_store.open(str(file), "r") as f:
            data = json.load(f)
            if isinstance(data, list):  # If file contains only hexagon IDs
                # Get resolution from first hexagon if available
                resolution = h3.get_resolution(data[0]) if data else 0
                data = {
                    "resolution": resolution,
                    "hexagons": data,
                    **kwargs,
                }
            else:
                data.update(kwargs)
            instance = cls(**data)
            instance.data_store = data_store
            cls.logger.info(
                f"Successfully loaded {len(instance.hexagons)} hexagons from JSON file."
            )
            return instance

    @property
    def average_hexagon_area(self) -> float:
        """Calculates the average area of hexagons at the current resolution.

        Returns:
            Average area in square kilometers.
        """
        return h3.average_hexagon_area(self.resolution)

    @property
    def average_hexagon_edge_length(self) -> float:
        """Calculates the average edge length of hexagons at the current resolution.

        Returns:
            Average edge length in kilometers.
        """
        return h3.average_hexagon_edge_length(self.resolution)

    def filter_hexagons(self, hexagons: Iterable[str]) -> "H3Hexagons":
        """Filters the current collection against a provided set of hexagon IDs.

        Args:
            hexagons: An iterable of H3 cell ID strings to keep.

        Returns:
            A new H3Hexagons instance containing only the intersecting cells.
        """
        original_count = len(self.hexagons)
        incoming_count = len(
            list(hexagons)
        )  # Convert to list to get length if it's an iterator

        self.logger.info(
            f"Filtering {original_count} hexagons with an incoming set of {incoming_count} hexagons."
        )
        filtered_hexagons = list(set(self.hexagons) & set(hexagons))
        self.logger.info(f"Resulting in {len(filtered_hexagons)} filtered hexagons.")
        return H3Hexagons(
            resolution=self.resolution,
            hexagons=filtered_hexagons,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Converts the hexagon collection to a pandas DataFrame.

        Returns:
            A DataFrame with 'hexagon', 'latitude', and 'longitude' columns.
        """
        self.logger.info(
            f"Converting {len(self.hexagons)} hexagons to pandas DataFrame."
        )
        if not self.hexagons:
            self.logger.warning(
                "No hexagons to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(columns=["hexagon", "latitude", "longitude"])

        centroids = [h3.cell_to_latlng(hex_id) for hex_id in self.hexagons]

        self.logger.info(f"Successfully converted to DataFrame.")

        return pd.DataFrame(
            {
                "hexagon": self.hexagons,
                "latitude": [c[0] for c in centroids],
                "longitude": [c[1] for c in centroids],
            }
        )

    def to_geoms(self) -> List[Polygon]:
        """Converts hexagons to a list of Shapely Polygon geometries.

        Returns:
            A list of Polygons representing the cell boundaries.
        """
        self.logger.info(
            f"Converting {len(self.hexagons)} hexagons to shapely Polygon geometries."
        )
        return [shape(h3.cells_to_geo([hex_id])) for hex_id in self.hexagons]

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Converts the hexagon collection to a GeoPandas GeoDataFrame.

        Returns:
            A GeoDataFrame with 'h3' and 'geometry' columns.
        """
        return gpd.GeoDataFrame(
            {"h3": self.hexagons, "geometry": self.to_geoms()}, crs="EPSG:4326"
        )

    @staticmethod
    def get_hexagons_from_points(
        points: List[Union[Point, Tuple[float, float]]], resolution: int
    ) -> List[str]:
        """Get list of H3 hexagon IDs for the provided points at specified resolution.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            resolution: H3 resolution level

        Returns:
            List of H3 hexagon ID strings
        """
        hexagons = []
        for p in points:
            if isinstance(p, Point):
                # Shapely Point has x=lon, y=lat
                hex_id = h3.latlng_to_cell(p.y, p.x, resolution)
            else:
                # Assume tuple is (lon, lat) - convert to (lat, lon) for h3
                hex_id = h3.latlng_to_cell(p[1], p[0], resolution)
            hexagons.append(hex_id)
        return hexagons

    def get_neighbors(self, k: int = 1) -> "H3Hexagons":
        """Calculates k-ring neighbors for all hexagons in the collection.

        Args:
            k: The distance of neighbors to retrieve (1 for immediate neighbors).
                Defaults to 1.

        Returns:
            A new H3Hexagons instance containing the neighbors.
        """
        self.logger.info(
            f"Getting k-ring neighbors (k={k}) for {len(self.hexagons)} hexagons."
        )

        all_neighbors = set()
        for hex_id in self.hexagons:
            neighbors = h3.grid_ring(hex_id, k)
            all_neighbors.update(neighbors)

        self.logger.info(
            f"Found {len(all_neighbors)} total hexagons including neighbors."
        )
        return H3Hexagons(resolution=self.resolution, hexagons=list(all_neighbors))

    def get_compact_representation(self) -> "H3Hexagons":
        """Merges adjacent hexagons into parent cells where possible (compacting).

        Returns:
            A new H3Hexagons instance in compacted form. Note that this may
            result in a set containing cells of multiple resolutions.
        """
        self.logger.info(f"Compacting {len(self.hexagons)} hexagons.")

        # Convert to set for h3.compact
        hex_set = set(self.hexagons)
        compacted = h3.compact_cells(hex_set)

        self.logger.info(f"Compacted to {len(compacted)} hexagons.")

        # Note: compacted representation may have mixed resolutions
        # We'll keep the original resolution as the "target" resolution
        return H3Hexagons(resolution=self.resolution, hexagons=list(compacted))

    def get_children(self, target_resolution: int) -> "H3Hexagons":
        """Generates all child hexagons at a higher resolution.

        Args:
            target_resolution: The target H3 resolution (must be > current).

        Returns:
            A new H3Hexagons instance with children at the target resolution.

        Raises:
            ValueError: If `target_resolution` is not higher than current resolution.
        """
        if target_resolution <= self.resolution:
            raise ValueError("Target resolution must be higher than current resolution")

        self.logger.info(
            f"Getting children at resolution {target_resolution} for {len(self.hexagons)} hexagons."
        )

        all_children = []
        for hex_id in self.hexagons:
            children = h3.cell_to_children(hex_id, target_resolution)
            all_children.extend(children)

        self.logger.info(f"Generated {len(all_children)} children hexagons.")
        return H3Hexagons(resolution=target_resolution, hexagons=all_children)

    def get_parents(self, target_resolution: int) -> "H3Hexagons":
        """Retrieves parent hexagons at a lower resolution.

        Args:
            target_resolution: The target H3 resolution (must be < current).

        Returns:
            A new H3Hexagons instance with parents at the target resolution.

        Raises:
            ValueError: If `target_resolution` is not lower than current resolution.
        """
        if target_resolution >= self.resolution:
            raise ValueError("Target resolution must be lower than current resolution")

        self.logger.info(
            f"Getting parents at resolution {target_resolution} for {len(self.hexagons)} hexagons."
        )

        parents = set()
        for hex_id in self.hexagons:
            parent = h3.cell_to_parent(hex_id, target_resolution)
            parents.add(parent)

        self.logger.info(f"Generated {len(parents)} parent hexagons.")
        return H3Hexagons(resolution=target_resolution, hexagons=list(parents))

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Saves the H3Hexagons collection to persistent storage.

        Args:
            file: The destination file path.
            format: The output format. Supported: 'json', 'parquet', 'geojson'.
                Defaults to 'json'.

        Raises:
            ValueError: If an unsupported format is provided.
        """
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.hexagons, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.hexagons)
average_hexagon_area: float property

Calculates the average area of hexagons at the current resolution.

Returns:

Type Description
float

Average area in square kilometers.

average_hexagon_edge_length: float property

Calculates the average edge length of hexagons at the current resolution.

Returns:

Type Description
float

Average edge length in kilometers.

filter_hexagons(hexagons)

Filters the current collection against a provided set of hexagon IDs.

Parameters:

Name Type Description Default
hexagons Iterable[str]

An iterable of H3 cell ID strings to keep.

required

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance containing only the intersecting cells.

Source code in gigaspatial/grid/h3.py
def filter_hexagons(self, hexagons: Iterable[str]) -> "H3Hexagons":
    """Filters the current collection against a provided set of hexagon IDs.

    Args:
        hexagons: An iterable of H3 cell ID strings to keep.

    Returns:
        A new H3Hexagons instance containing only the intersecting cells.
    """
    original_count = len(self.hexagons)
    incoming_count = len(
        list(hexagons)
    )  # Convert to list to get length if it's an iterator

    self.logger.info(
        f"Filtering {original_count} hexagons with an incoming set of {incoming_count} hexagons."
    )
    filtered_hexagons = list(set(self.hexagons) & set(hexagons))
    self.logger.info(f"Resulting in {len(filtered_hexagons)} filtered hexagons.")
    return H3Hexagons(
        resolution=self.resolution,
        hexagons=filtered_hexagons,
    )
from_bounds(xmin, ymin, xmax, ymax, resolution) classmethod

Create H3Hexagons covering the specified geographic bounding box.

Parameters:

Name Type Description Default
xmin float

Minimum longitude.

required
ymin float

Minimum latitude.

required
xmax float

Maximum longitude.

required
ymax float

Maximum latitude.

required
resolution int

H3 resolution level.

required

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance covering the bounds.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_bounds(
    cls, xmin: float, ymin: float, xmax: float, ymax: float, resolution: int
) -> "H3Hexagons":
    """
    Create H3Hexagons covering the specified geographic bounding box.

    Args:
        xmin: Minimum longitude.
        ymin: Minimum latitude.
        xmax: Maximum longitude.
        ymax: Maximum latitude.
        resolution: H3 resolution level.

    Returns:
        A new H3Hexagons instance covering the bounds.
    """
    cls.logger.info(
        f"Creating H3Hexagons from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at resolution: {resolution}"
    )

    # Create a LatLong bounding box polygon
    latlong_bbox_coords = [
        [ymin, xmin],
        [ymax, xmin],
        [ymax, xmax],
        [ymin, xmax],
        [ymin, xmin],
    ]

    # Get H3 cells that intersect with the bounding box
    poly = h3.LatLngPoly(latlong_bbox_coords)
    hexagons = h3.h3shape_to_cells(poly, res=resolution)

    return cls(resolution=resolution, hexagons=list(hexagons))
from_geometry(geometry, resolution, contain='overlap', **kwargs) classmethod

Create H3Hexagons from a Shapely geometry.

Parameters:

Name Type Description Default
geometry BaseGeometry

Input geometry (Polygon, MultiPolygon, etc.).

required
resolution int

H3 resolution level.

required
contain Literal['center', 'full', 'overlap', 'bbox_overlap']

Containment logic for H3 cell generation.

'overlap'
**kwargs

Additional metadata parameters.

{}

Returns:

Type Description
H3Hexagons

H3Hexagons instance covering the geometry.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    resolution: int,
    contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
    **kwargs,
) -> "H3Hexagons":
    """
    Create H3Hexagons from a Shapely geometry.

    Args:
        geometry: Input geometry (Polygon, MultiPolygon, etc.).
        resolution: H3 resolution level.
        contain: Containment logic for H3 cell generation.
        **kwargs: Additional metadata parameters.

    Returns:
        H3Hexagons instance covering the geometry.
    """
    cls.logger.info(
        f"Creating H3Hexagons from geometry (bounds: {geometry.bounds}) at resolution: {resolution} with predicate: {contain}"
    )

    if isinstance(geometry, Point):
        return cls.from_points([geometry])

    # Convert shapely geometry to GeoJSON-like format
    if hasattr(geometry, "__geo_interface__"):
        geojson_geom = geometry.__geo_interface__
    else:
        # Fallback for complex geometries
        import json
        from shapely.geometry import mapping

        geojson_geom = mapping(geometry)

    h3_geom = h3.geo_to_h3shape(geojson_geom)

    hexagons = h3.h3shape_to_cells_experimental(
        h3_geom, resolution, contain=contain
    )

    cls.logger.info(
        f"Generated {len(hexagons)} hexagons using `{contain}` spatial predicate."
    )
    return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)
from_hexagons(hexagons) classmethod

Create H3Hexagons from a list of H3 cell IDs.

Parameters:

Name Type Description Default
hexagons List[str]

List of H3 cell hashes.

required

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_hexagons(cls, hexagons: List[str]) -> "H3Hexagons":
    """
    Create H3Hexagons from a list of H3 cell IDs.

    Args:
        hexagons: List of H3 cell hashes.

    Returns:
        A new H3Hexagons instance.
    """
    if not hexagons:
        cls.logger.warning("No hexagons provided to from_hexagons.")
        return cls(resolution=0, hexagons=[])

    cls.logger.info(
        f"Initializing H3Hexagons from {len(hexagons)} provided hexagons."
    )
    # Get resolution from first hexagon
    resolution = h3.get_resolution(hexagons[0])
    return cls(resolution=resolution, hexagons=list(set(hexagons)))
from_json(data_store, file, **kwargs) classmethod

Load H3Hexagons from a JSON file in storage.

Parameters:

Name Type Description Default
data_store DataStore

DataStore instance for the file.

required
file Union[str, Path]

Path to the JSON file.

required
**kwargs

Metadata overrides.

{}

Returns:

Type Description
H3Hexagons

H3Hexagons instance loaded from file.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "H3Hexagons":
    """
    Load H3Hexagons from a JSON file in storage.

    Args:
        data_store: DataStore instance for the file.
        file: Path to the JSON file.
        **kwargs: Metadata overrides.

    Returns:
        H3Hexagons instance loaded from file.
    """
    cls.logger.info(
        f"Loading H3Hexagons from JSON file: {file} using data store: {type(data_store).__name__}"
    )
    with data_store.open(str(file), "r") as f:
        data = json.load(f)
        if isinstance(data, list):  # If file contains only hexagon IDs
            # Get resolution from first hexagon if available
            resolution = h3.get_resolution(data[0]) if data else 0
            data = {
                "resolution": resolution,
                "hexagons": data,
                **kwargs,
            }
        else:
            data.update(kwargs)
        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.hexagons)} hexagons from JSON file."
        )
        return instance
from_points(points, resolution, **kwargs) classmethod

Create H3Hexagons from a list of point coordinates.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as Shapely Points or (lon, lat) tuples.

required
resolution int

H3 resolution level.

required
**kwargs

Additional metadata parameters.

{}

Returns:

Type Description
H3Hexagons

H3Hexagons instance containing cells for all points.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], resolution: int, **kwargs
) -> "H3Hexagons":
    """
    Create H3Hexagons from a list of point coordinates.

    Args:
        points: List of points as Shapely Points or (lon, lat) tuples.
        resolution: H3 resolution level.
        **kwargs: Additional metadata parameters.

    Returns:
        H3Hexagons instance containing cells for all points.
    """
    cls.logger.info(
        f"Creating H3Hexagons from {len(points)} points at resolution: {resolution}"
    )
    hexagons = set(cls.get_hexagons_from_points(points, resolution))
    cls.logger.info(f"Generated {len(hexagons)} unique hexagons from points.")
    return cls(resolution=resolution, hexagons=list(hexagons), **kwargs)
from_spatial(source, resolution, contain='overlap', **kwargs) classmethod

Factory method to create H3Hexagons from various spatial sources.

Handles GeoDataFrames, Shapely geometries, and lists of coordinate points.

Parameters:

Name Type Description Default
source Union[BaseGeometry, GeoDataFrame, List[Union[Point, Tuple[float, float]]]]

Spatial data source.

required
resolution int

Target H3 resolution.

required
contain Literal['center', 'full', 'overlap', 'bbox_overlap']

H3 spatial containment logic.

'overlap'
**kwargs

Forwarded to specific factory methods (e.g., from_geometry).

{}

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance.

Source code in gigaspatial/grid/h3.py
@classmethod
def from_spatial(
    cls,
    source: Union[
        BaseGeometry,
        gpd.GeoDataFrame,
        List[Union[Point, Tuple[float, float]]],
    ],
    resolution: int,
    contain: Literal["center", "full", "overlap", "bbox_overlap"] = "overlap",
    **kwargs,
) -> "H3Hexagons":
    """
    Factory method to create H3Hexagons from various spatial sources.

    Handles GeoDataFrames, Shapely geometries, and lists of coordinate points.

    Args:
        source: Spatial data source.
        resolution: Target H3 resolution.
        contain: H3 spatial containment logic.
        **kwargs: Forwarded to specific factory methods (e.g., from_geometry).

    Returns:
        A new H3Hexagons instance.
    """
    cls.logger.info(
        f"Creating H3Hexagons from spatial source (type: {type(source)}) at resolution: {resolution} with predicate: {contain}"
    )
    if isinstance(source, gpd.GeoDataFrame):
        if source.crs != "EPSG:4326":
            source = source.to_crs("EPSG:4326")

        is_point_series = source.geometry.geom_type == "Point"
        all_are_points = is_point_series.all()

        if all_are_points:
            source = source.geometry.to_list()
        else:
            source = source.geometry.unary_union

    if isinstance(source, BaseGeometry):
        return cls.from_geometry(
            geometry=source, resolution=resolution, contain=contain, **kwargs
        )
    elif isinstance(source, Iterable) and all(
        isinstance(pt, Point) or len(pt) == 2 for pt in source
    ):
        return cls.from_points(points=source, resolution=resolution, **kwargs)
    else:
        raise ValueError("Unsupported source type for H3Hexagons.from_spatial")
get_children(target_resolution)

Generates all child hexagons at a higher resolution.

Parameters:

Name Type Description Default
target_resolution int

The target H3 resolution (must be > current).

required

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance with children at the target resolution.

Raises:

Type Description
ValueError

If target_resolution is not higher than current resolution.

Source code in gigaspatial/grid/h3.py
def get_children(self, target_resolution: int) -> "H3Hexagons":
    """Generates all child hexagons at a higher resolution.

    Args:
        target_resolution: The target H3 resolution (must be > current).

    Returns:
        A new H3Hexagons instance with children at the target resolution.

    Raises:
        ValueError: If `target_resolution` is not higher than current resolution.
    """
    if target_resolution <= self.resolution:
        raise ValueError("Target resolution must be higher than current resolution")

    self.logger.info(
        f"Getting children at resolution {target_resolution} for {len(self.hexagons)} hexagons."
    )

    all_children = []
    for hex_id in self.hexagons:
        children = h3.cell_to_children(hex_id, target_resolution)
        all_children.extend(children)

    self.logger.info(f"Generated {len(all_children)} children hexagons.")
    return H3Hexagons(resolution=target_resolution, hexagons=all_children)
get_compact_representation()

Merges adjacent hexagons into parent cells where possible (compacting).

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance in compacted form. Note that this may

H3Hexagons

result in a set containing cells of multiple resolutions.

Source code in gigaspatial/grid/h3.py
def get_compact_representation(self) -> "H3Hexagons":
    """Merges adjacent hexagons into parent cells where possible (compacting).

    Returns:
        A new H3Hexagons instance in compacted form. Note that this may
        result in a set containing cells of multiple resolutions.
    """
    self.logger.info(f"Compacting {len(self.hexagons)} hexagons.")

    # Convert to set for h3.compact
    hex_set = set(self.hexagons)
    compacted = h3.compact_cells(hex_set)

    self.logger.info(f"Compacted to {len(compacted)} hexagons.")

    # Note: compacted representation may have mixed resolutions
    # We'll keep the original resolution as the "target" resolution
    return H3Hexagons(resolution=self.resolution, hexagons=list(compacted))
get_hexagons_from_points(points, resolution) staticmethod

Get list of H3 hexagon IDs for the provided points at specified resolution.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
resolution int

H3 resolution level

required

Returns:

Type Description
List[str]

List of H3 hexagon ID strings

Source code in gigaspatial/grid/h3.py
@staticmethod
def get_hexagons_from_points(
    points: List[Union[Point, Tuple[float, float]]], resolution: int
) -> List[str]:
    """Get list of H3 hexagon IDs for the provided points at specified resolution.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        resolution: H3 resolution level

    Returns:
        List of H3 hexagon ID strings
    """
    hexagons = []
    for p in points:
        if isinstance(p, Point):
            # Shapely Point has x=lon, y=lat
            hex_id = h3.latlng_to_cell(p.y, p.x, resolution)
        else:
            # Assume tuple is (lon, lat) - convert to (lat, lon) for h3
            hex_id = h3.latlng_to_cell(p[1], p[0], resolution)
        hexagons.append(hex_id)
    return hexagons
get_neighbors(k=1)

Calculates k-ring neighbors for all hexagons in the collection.

Parameters:

Name Type Description Default
k int

The distance of neighbors to retrieve (1 for immediate neighbors). Defaults to 1.

1

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance containing the neighbors.

Source code in gigaspatial/grid/h3.py
def get_neighbors(self, k: int = 1) -> "H3Hexagons":
    """Calculates k-ring neighbors for all hexagons in the collection.

    Args:
        k: The distance of neighbors to retrieve (1 for immediate neighbors).
            Defaults to 1.

    Returns:
        A new H3Hexagons instance containing the neighbors.
    """
    self.logger.info(
        f"Getting k-ring neighbors (k={k}) for {len(self.hexagons)} hexagons."
    )

    all_neighbors = set()
    for hex_id in self.hexagons:
        neighbors = h3.grid_ring(hex_id, k)
        all_neighbors.update(neighbors)

    self.logger.info(
        f"Found {len(all_neighbors)} total hexagons including neighbors."
    )
    return H3Hexagons(resolution=self.resolution, hexagons=list(all_neighbors))
get_parents(target_resolution)

Retrieves parent hexagons at a lower resolution.

Parameters:

Name Type Description Default
target_resolution int

The target H3 resolution (must be < current).

required

Returns:

Type Description
H3Hexagons

A new H3Hexagons instance with parents at the target resolution.

Raises:

Type Description
ValueError

If target_resolution is not lower than current resolution.

Source code in gigaspatial/grid/h3.py
def get_parents(self, target_resolution: int) -> "H3Hexagons":
    """Retrieves parent hexagons at a lower resolution.

    Args:
        target_resolution: The target H3 resolution (must be < current).

    Returns:
        A new H3Hexagons instance with parents at the target resolution.

    Raises:
        ValueError: If `target_resolution` is not lower than current resolution.
    """
    if target_resolution >= self.resolution:
        raise ValueError("Target resolution must be lower than current resolution")

    self.logger.info(
        f"Getting parents at resolution {target_resolution} for {len(self.hexagons)} hexagons."
    )

    parents = set()
    for hex_id in self.hexagons:
        parent = h3.cell_to_parent(hex_id, target_resolution)
        parents.add(parent)

    self.logger.info(f"Generated {len(parents)} parent hexagons.")
    return H3Hexagons(resolution=target_resolution, hexagons=list(parents))
save(file, format='json')

Saves the H3Hexagons collection to persistent storage.

Parameters:

Name Type Description Default
file Union[str, Path]

The destination file path.

required
format str

The output format. Supported: 'json', 'parquet', 'geojson'. Defaults to 'json'.

'json'

Raises:

Type Description
ValueError

If an unsupported format is provided.

Source code in gigaspatial/grid/h3.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Saves the H3Hexagons collection to persistent storage.

    Args:
        file: The destination file path.
        format: The output format. Supported: 'json', 'parquet', 'geojson'.
            Defaults to 'json'.

    Raises:
        ValueError: If an unsupported format is provided.
    """
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.hexagons, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Converts the hexagon collection to a pandas DataFrame.

Returns:

Type Description
DataFrame

A DataFrame with 'hexagon', 'latitude', and 'longitude' columns.

Source code in gigaspatial/grid/h3.py
def to_dataframe(self) -> pd.DataFrame:
    """Converts the hexagon collection to a pandas DataFrame.

    Returns:
        A DataFrame with 'hexagon', 'latitude', and 'longitude' columns.
    """
    self.logger.info(
        f"Converting {len(self.hexagons)} hexagons to pandas DataFrame."
    )
    if not self.hexagons:
        self.logger.warning(
            "No hexagons to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(columns=["hexagon", "latitude", "longitude"])

    centroids = [h3.cell_to_latlng(hex_id) for hex_id in self.hexagons]

    self.logger.info(f"Successfully converted to DataFrame.")

    return pd.DataFrame(
        {
            "hexagon": self.hexagons,
            "latitude": [c[0] for c in centroids],
            "longitude": [c[1] for c in centroids],
        }
    )
to_geodataframe()

Converts the hexagon collection to a GeoPandas GeoDataFrame.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame with 'h3' and 'geometry' columns.

Source code in gigaspatial/grid/h3.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Converts the hexagon collection to a GeoPandas GeoDataFrame.

    Returns:
        A GeoDataFrame with 'h3' and 'geometry' columns.
    """
    return gpd.GeoDataFrame(
        {"h3": self.hexagons, "geometry": self.to_geoms()}, crs="EPSG:4326"
    )
to_geoms()

Converts hexagons to a list of Shapely Polygon geometries.

Returns:

Type Description
List[Polygon]

A list of Polygons representing the cell boundaries.

Source code in gigaspatial/grid/h3.py
def to_geoms(self) -> List[Polygon]:
    """Converts hexagons to a list of Shapely Polygon geometries.

    Returns:
        A list of Polygons representing the cell boundaries.
    """
    self.logger.info(
        f"Converting {len(self.hexagons)} hexagons to shapely Polygon geometries."
    )
    return [shape(h3.cells_to_geo([hex_id])) for hex_id in self.hexagons]

mercator_tiles

CountryMercatorTiles

Bases: MercatorTiles

MercatorTiles specialized for country-level operations.

Extends MercatorTiles to work specifically with country boundaries retrieved from the Giga administrative boundary dataset.

Note

Instances should be created using the create() factory method.

Attributes:

Name Type Description
country str

ISO 3166-1 alpha-3 country code.

Source code in gigaspatial/grid/mercator_tiles.py
class CountryMercatorTiles(MercatorTiles):
    """
    MercatorTiles specialized for country-level operations.

    Extends MercatorTiles to work specifically with country boundaries retrieved
    from the Giga administrative boundary dataset.

    Note:
        Instances should be created using the `create()` factory method.

    Attributes:
        country: ISO 3166-1 alpha-3 country code.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryMercatorTiles cannot be instantiated directly. "
            "Use CountryMercatorTiles.create() instead."
        )

    @classmethod
    def create(
        self,
        country: str,
        zoom_level: int,
        predicate: str = "intersects",
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ) -> "CountryMercatorTiles":
        """Factory method to create MercatorTiles for a specific country's boundary.

        Args:
            country: ISO country code (3-letter alpha-3) or name.
            zoom_level: Target Web Mercator zoom level (0-20).
            predicate: Spatial join predicate for the boundary.
                Defaults to 'intersects'.
            data_store: Optional storage interface for boundary lookup.
            country_geom_path: Optional path override for the boundary file.

        Returns:
            A new CountryMercatorTiles instance fully populated for the country.
        """
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryMercatorTiles, instance).__init__(
            zoom_level=zoom_level,
            quadkeys=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing Mercator zones for country: {country} at zoom level {zoom_level}"
        )

        country_geom = AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        ).to_geoms()[0]

        tiles = MercatorTiles.from_geometry(country_geom, zoom_level, predicate)

        instance.quadkeys = tiles.quadkeys
        return instance
create(country, zoom_level, predicate='intersects', data_store=None, country_geom_path=None) classmethod

Factory method to create MercatorTiles for a specific country's boundary.

Parameters:

Name Type Description Default
country str

ISO country code (3-letter alpha-3) or name.

required
zoom_level int

Target Web Mercator zoom level (0-20).

required
predicate str

Spatial join predicate for the boundary. Defaults to 'intersects'.

'intersects'
data_store Optional[DataStore]

Optional storage interface for boundary lookup.

None
country_geom_path Optional[Union[str, Path]]

Optional path override for the boundary file.

None

Returns:

Type Description
CountryMercatorTiles

A new CountryMercatorTiles instance fully populated for the country.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def create(
    self,
    country: str,
    zoom_level: int,
    predicate: str = "intersects",
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
) -> "CountryMercatorTiles":
    """Factory method to create MercatorTiles for a specific country's boundary.

    Args:
        country: ISO country code (3-letter alpha-3) or name.
        zoom_level: Target Web Mercator zoom level (0-20).
        predicate: Spatial join predicate for the boundary.
            Defaults to 'intersects'.
        data_store: Optional storage interface for boundary lookup.
        country_geom_path: Optional path override for the boundary file.

    Returns:
        A new CountryMercatorTiles instance fully populated for the country.
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryMercatorTiles, instance).__init__(
        zoom_level=zoom_level,
        quadkeys=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing Mercator zones for country: {country} at zoom level {zoom_level}"
    )

    country_geom = AdminBoundaries.create(
        country_code=country,
        data_store=data_store,
        path=country_geom_path,
    ).to_geoms()[0]

    tiles = MercatorTiles.from_geometry(country_geom, zoom_level, predicate)

    instance.quadkeys = tiles.quadkeys
    return instance

MercatorTiles

Bases: BaseModel

Representation of a collection of Web Mercator tiles as quadkeys.

Provides utility methods to create, filter, and manipulate Web Mercator grids for spatial analysis. Handles conversion between coordinate pairs, geometries, and quadkey strings.

Attributes:

Name Type Description
zoom_level int

Web Mercator zoom level (0-20).

quadkeys List[str]

List of quadkey strings.

data_store DataStore

Storage interface for I/O operations.

logger ClassVar

Class-level logger.

Source code in gigaspatial/grid/mercator_tiles.py
class MercatorTiles(BaseModel):
    """
    Representation of a collection of Web Mercator tiles as quadkeys.

    Provides utility methods to create, filter, and manipulate Web Mercator
    grids for spatial analysis. Handles conversion between coordinate pairs,
    geometries, and quadkey strings.

    Attributes:
        zoom_level: Web Mercator zoom level (0-20).
        quadkeys: List of quadkey strings.
        data_store: Storage interface for I/O operations.
        logger: Class-level logger.
    """

    zoom_level: int = Field(..., ge=0, le=20)
    quadkeys: List[str] = Field(default_factory=list)
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("MercatorTiles")

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @classmethod
    def from_quadkeys(cls, quadkeys: Iterable[Union[str, int]]) -> "MercatorTiles":
        """
        Create MercatorTiles from a list of quadkeys.

        Args:
            quadkeys: Iterable of quadkey strings or integers.

        Returns:
            A new MercatorTiles instance.
        """
        quadkeys = list(set(str(q) for q in quadkeys))
        if not quadkeys:
            cls.logger.warning("No quadkeys provided to from_quadkeys.")
            return cls(zoom_level=0, quadkeys=[])
        cls.logger.info(
            f"Initializing MercatorTiles from {len(quadkeys)} provided quadkeys."
        )
        return cls(zoom_level=len(quadkeys[0]), quadkeys=quadkeys)

    @classmethod
    def from_bounds(
        cls, xmin: float, ymin: float, xmax: float, ymax: float, zoom_level: int
    ) -> "MercatorTiles":
        """
        Create MercatorTiles covering the specified geographic bounding box.

        Args:
            xmin: Minimum longitude.
            ymin: Minimum latitude.
            xmax: Maximum longitude.
            ymax: Maximum latitude.
            zoom_level: Web Mercator zoom level.

        Returns:
            A new MercatorTiles instance covering the bounds.
        """
        cls.logger.info(
            f"Creating MercatorTiles from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at zoom level: {zoom_level}"
        )
        return cls(
            zoom_level=zoom_level,
            quadkeys=[
                mercantile.quadkey(tile)
                for tile in mercantile.tiles(xmin, ymin, xmax, ymax, zoom_level)
            ],
        )

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],
        ],
        zoom_level: int,
        predicate: str = "intersects",
        **kwargs,
    ) -> "MercatorTiles":
        """
        Factory method to create MercatorTiles from various spatial sources.

        Args:
            source: Spatial data source (Geometry, GDF, or points).
            zoom_level: Web Mercator zoom level.
            predicate: Spatial predicate for containment ('intersects', 'within').
            **kwargs: Forwarded to specific factory methods.

        Returns:
            A new MercatorTiles instance.
        """
        cls.logger.info(
            f"Creating MercatorTiles from spatial source (type: {type(source)}) at zoom level: {zoom_level} with predicate: {predicate}"
        )
        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")
            source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, zoom_level=zoom_level, predicate=predicate, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(geometry=source, zoom_level=zoom_level, **kwargs)
        else:
            raise

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        zoom_level: int,
        predicate: str = "intersects",
        **kwargs,
    ) -> "MercatorTiles":
        """
        Create MercatorTiles from a Shapely geometry.

        Args:
            geometry: Input geometry.
            zoom_level: Web Mercator zoom level.
            predicate: Spatial join predicate.
            **kwargs: Additional metadata parameters.

        Returns:
            MercatorTiles instance covering the geometry.
        """
        cls.logger.info(
            f"Creating MercatorTiles from geometry (bounds: {geometry.bounds}) at zoom level: {zoom_level} with predicate: {predicate}"
        )
        tiles = list(mercantile.tiles(*geometry.bounds, zoom_level))
        quadkeys_boxes = [
            (mercantile.quadkey(t), box(*mercantile.bounds(t))) for t in tiles
        ]
        quadkeys, boxes = zip(*quadkeys_boxes) if quadkeys_boxes else ([], [])

        if not boxes:
            cls.logger.warning(
                "No boxes generated from geometry bounds. Returning empty MercatorTiles."
            )
            return MercatorTiles(zoom_level=zoom_level, quadkeys=[])

        s = STRtree(boxes)
        result_indices = s.query(geometry, predicate=predicate)
        filtered_quadkeys = [quadkeys[i] for i in result_indices]
        cls.logger.info(
            f"Filtered down to {len(filtered_quadkeys)} quadkeys using spatial predicate."
        )
        return cls(zoom_level=zoom_level, quadkeys=filtered_quadkeys, **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], zoom_level: int, **kwargs
    ) -> "MercatorTiles":
        """Creates a MercatorTiles collection from a list of points.

        Args:
            points: List of points as Shapely Points or (lon, lat) tuples.
            zoom_level: Web Mercator zoom level (0-20).
            **kwargs: Additional metadata parameters.

        Returns:
            A new MercatorTiles instance containing tiles for all points.
        """
        cls.logger.info(
            f"Creating MercatorTiles from {len(points)} points at zoom level: {zoom_level}"
        )
        quadkeys = set(cls.get_quadkeys_from_points(points, zoom_level))
        cls.logger.info(f"Generated {len(quadkeys)} unique quadkeys from points.")
        return cls(zoom_level=zoom_level, quadkeys=list(quadkeys), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "MercatorTiles":
        """
        Load MercatorTiles from a JSON file in storage.

        Args:
            data_store: DataStore instance for the file.
            file: Path to the JSON file.
            **kwargs: Metadata overrides.

        Returns:
            MercatorTiles instance loaded from file.
        """
        cls.logger.info(
            f"Loading MercatorTiles from JSON file: {file} using data store: {type(data_store).__name__}"
        )
        with data_store.open(str(file), "r") as f:
            data = json.load(f)
            if isinstance(data, list):  # If file contains only quadkeys
                data = {
                    "zoom_level": len(data[0]) if data else 0,
                    "quadkeys": data,
                    **kwargs,
                }
            else:
                data.update(kwargs)
            instance = cls(**data)
            instance.data_store = data_store
            cls.logger.info(
                f"Successfully loaded {len(instance.quadkeys)} quadkeys from JSON file."
            )
            return instance

    def filter_quadkeys(self, quadkeys: Iterable[str]) -> "MercatorTiles":
        """Filters the current collection against a provided set of quadkeys.

        Args:
            quadkeys: An iterable of quadkey strings to keep.

        Returns:
            A new MercatorTiles instance containing only the intersecting tiles.
        """
        original_count = len(self.quadkeys)
        incoming_count = len(
            list(quadkeys)
        )  # Convert to list to get length if it's an iterator

        self.logger.info(
            f"Filtering {original_count} quadkeys with an incoming set of {incoming_count} quadkeys."
        )
        filtered_quadkeys = list(set(self.quadkeys) & set(quadkeys))
        self.logger.info(f"Resulting in {len(filtered_quadkeys)} filtered quadkeys.")
        return MercatorTiles(
            zoom_level=self.zoom_level,
            quadkeys=filtered_quadkeys,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Converts the tile collection to a pandas DataFrame.

        Returns:
            A DataFrame with 'quadkey', 'latitude', and 'longitude' columns.
        """
        self.logger.info(
            f"Converting {len(self.quadkeys)} quadkeys to pandas DataFrame."
        )
        if not self.quadkeys:
            self.logger.warning(
                "No quadkeys to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(columns=["quadkey", "latitude", "longitude"])
        tiles_data = [mercantile.quadkey_to_tile(q) for q in self.quadkeys]
        bounds_data = [mercantile.bounds(tile) for tile in tiles_data]

        centroids = [
            (
                (bounds.south + bounds.north) / 2,  # latitude
                (bounds.west + bounds.east) / 2,  # longitude
            )
            for bounds in bounds_data
        ]

        self.logger.info(f"Successfully converted to DataFrame.")

        return pd.DataFrame(
            {
                "quadkey": self.quadkeys,
                "latitude": [c[0] for c in centroids],
                "longitude": [c[1] for c in centroids],
            }
        )

    def to_geoms(self) -> List[box]:
        """Converts quadkeys into a list of Shapely box (polygon) geometries.

        Returns:
            A list of boxes representing the tile boundaries.
        """
        self.logger.info(
            f"Converting {len(self.quadkeys)} quadkeys to shapely box geometries."
        )
        return [
            box(*mercantile.bounds(mercantile.quadkey_to_tile(q)))
            for q in self.quadkeys
        ]

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Converts the tile collection to a GeoPandas GeoDataFrame.

        Returns:
            A GeoDataFrame with 'quadkey' and 'geometry' columns.
        """
        return gpd.GeoDataFrame(
            {"quadkey": self.quadkeys, "geometry": self.to_geoms()}, crs="EPSG:4326"
        )

    @staticmethod
    def get_quadkeys_from_points(
        points: List[Union[Point, Tuple[float, float]]], zoom_level: int
    ) -> List[str]:
        """Get list of quadkeys for the provided points at specified zoom level.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            zoom_level: Zoom level for the quadkeys

        Returns:
            List of quadkey strings
        """
        quadkeys = [
            (
                mercantile.quadkey(mercantile.tile(p.x, p.y, zoom_level))
                if isinstance(p, Point)
                else mercantile.quadkey(mercantile.tile(p[1], p[0], zoom_level))
            )
            for p in points
        ]
        return quadkeys

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Saves the MercatorTiles collection to persistent storage.

        Args:
            file: The destination file path.
            format: The output format. Supported: 'json', 'parquet', 'geojson'.
                Defaults to 'json'.

        Raises:
            ValueError: If an unsupported format is provided.
        """
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.quadkeys, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.quadkeys)
filter_quadkeys(quadkeys)

Filters the current collection against a provided set of quadkeys.

Parameters:

Name Type Description Default
quadkeys Iterable[str]

An iterable of quadkey strings to keep.

required

Returns:

Type Description
MercatorTiles

A new MercatorTiles instance containing only the intersecting tiles.

Source code in gigaspatial/grid/mercator_tiles.py
def filter_quadkeys(self, quadkeys: Iterable[str]) -> "MercatorTiles":
    """Filters the current collection against a provided set of quadkeys.

    Args:
        quadkeys: An iterable of quadkey strings to keep.

    Returns:
        A new MercatorTiles instance containing only the intersecting tiles.
    """
    original_count = len(self.quadkeys)
    incoming_count = len(
        list(quadkeys)
    )  # Convert to list to get length if it's an iterator

    self.logger.info(
        f"Filtering {original_count} quadkeys with an incoming set of {incoming_count} quadkeys."
    )
    filtered_quadkeys = list(set(self.quadkeys) & set(quadkeys))
    self.logger.info(f"Resulting in {len(filtered_quadkeys)} filtered quadkeys.")
    return MercatorTiles(
        zoom_level=self.zoom_level,
        quadkeys=filtered_quadkeys,
    )
from_bounds(xmin, ymin, xmax, ymax, zoom_level) classmethod

Create MercatorTiles covering the specified geographic bounding box.

Parameters:

Name Type Description Default
xmin float

Minimum longitude.

required
ymin float

Minimum latitude.

required
xmax float

Maximum longitude.

required
ymax float

Maximum latitude.

required
zoom_level int

Web Mercator zoom level.

required

Returns:

Type Description
MercatorTiles

A new MercatorTiles instance covering the bounds.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_bounds(
    cls, xmin: float, ymin: float, xmax: float, ymax: float, zoom_level: int
) -> "MercatorTiles":
    """
    Create MercatorTiles covering the specified geographic bounding box.

    Args:
        xmin: Minimum longitude.
        ymin: Minimum latitude.
        xmax: Maximum longitude.
        ymax: Maximum latitude.
        zoom_level: Web Mercator zoom level.

    Returns:
        A new MercatorTiles instance covering the bounds.
    """
    cls.logger.info(
        f"Creating MercatorTiles from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at zoom level: {zoom_level}"
    )
    return cls(
        zoom_level=zoom_level,
        quadkeys=[
            mercantile.quadkey(tile)
            for tile in mercantile.tiles(xmin, ymin, xmax, ymax, zoom_level)
        ],
    )
from_geometry(geometry, zoom_level, predicate='intersects', **kwargs) classmethod

Create MercatorTiles from a Shapely geometry.

Parameters:

Name Type Description Default
geometry BaseGeometry

Input geometry.

required
zoom_level int

Web Mercator zoom level.

required
predicate str

Spatial join predicate.

'intersects'
**kwargs

Additional metadata parameters.

{}

Returns:

Type Description
MercatorTiles

MercatorTiles instance covering the geometry.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    zoom_level: int,
    predicate: str = "intersects",
    **kwargs,
) -> "MercatorTiles":
    """
    Create MercatorTiles from a Shapely geometry.

    Args:
        geometry: Input geometry.
        zoom_level: Web Mercator zoom level.
        predicate: Spatial join predicate.
        **kwargs: Additional metadata parameters.

    Returns:
        MercatorTiles instance covering the geometry.
    """
    cls.logger.info(
        f"Creating MercatorTiles from geometry (bounds: {geometry.bounds}) at zoom level: {zoom_level} with predicate: {predicate}"
    )
    tiles = list(mercantile.tiles(*geometry.bounds, zoom_level))
    quadkeys_boxes = [
        (mercantile.quadkey(t), box(*mercantile.bounds(t))) for t in tiles
    ]
    quadkeys, boxes = zip(*quadkeys_boxes) if quadkeys_boxes else ([], [])

    if not boxes:
        cls.logger.warning(
            "No boxes generated from geometry bounds. Returning empty MercatorTiles."
        )
        return MercatorTiles(zoom_level=zoom_level, quadkeys=[])

    s = STRtree(boxes)
    result_indices = s.query(geometry, predicate=predicate)
    filtered_quadkeys = [quadkeys[i] for i in result_indices]
    cls.logger.info(
        f"Filtered down to {len(filtered_quadkeys)} quadkeys using spatial predicate."
    )
    return cls(zoom_level=zoom_level, quadkeys=filtered_quadkeys, **kwargs)
from_json(data_store, file, **kwargs) classmethod

Load MercatorTiles from a JSON file in storage.

Parameters:

Name Type Description Default
data_store DataStore

DataStore instance for the file.

required
file Union[str, Path]

Path to the JSON file.

required
**kwargs

Metadata overrides.

{}

Returns:

Type Description
MercatorTiles

MercatorTiles instance loaded from file.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "MercatorTiles":
    """
    Load MercatorTiles from a JSON file in storage.

    Args:
        data_store: DataStore instance for the file.
        file: Path to the JSON file.
        **kwargs: Metadata overrides.

    Returns:
        MercatorTiles instance loaded from file.
    """
    cls.logger.info(
        f"Loading MercatorTiles from JSON file: {file} using data store: {type(data_store).__name__}"
    )
    with data_store.open(str(file), "r") as f:
        data = json.load(f)
        if isinstance(data, list):  # If file contains only quadkeys
            data = {
                "zoom_level": len(data[0]) if data else 0,
                "quadkeys": data,
                **kwargs,
            }
        else:
            data.update(kwargs)
        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.quadkeys)} quadkeys from JSON file."
        )
        return instance
from_points(points, zoom_level, **kwargs) classmethod

Creates a MercatorTiles collection from a list of points.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as Shapely Points or (lon, lat) tuples.

required
zoom_level int

Web Mercator zoom level (0-20).

required
**kwargs

Additional metadata parameters.

{}

Returns:

Type Description
MercatorTiles

A new MercatorTiles instance containing tiles for all points.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], zoom_level: int, **kwargs
) -> "MercatorTiles":
    """Creates a MercatorTiles collection from a list of points.

    Args:
        points: List of points as Shapely Points or (lon, lat) tuples.
        zoom_level: Web Mercator zoom level (0-20).
        **kwargs: Additional metadata parameters.

    Returns:
        A new MercatorTiles instance containing tiles for all points.
    """
    cls.logger.info(
        f"Creating MercatorTiles from {len(points)} points at zoom level: {zoom_level}"
    )
    quadkeys = set(cls.get_quadkeys_from_points(points, zoom_level))
    cls.logger.info(f"Generated {len(quadkeys)} unique quadkeys from points.")
    return cls(zoom_level=zoom_level, quadkeys=list(quadkeys), **kwargs)
from_quadkeys(quadkeys) classmethod

Create MercatorTiles from a list of quadkeys.

Parameters:

Name Type Description Default
quadkeys Iterable[Union[str, int]]

Iterable of quadkey strings or integers.

required

Returns:

Type Description
MercatorTiles

A new MercatorTiles instance.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_quadkeys(cls, quadkeys: Iterable[Union[str, int]]) -> "MercatorTiles":
    """
    Create MercatorTiles from a list of quadkeys.

    Args:
        quadkeys: Iterable of quadkey strings or integers.

    Returns:
        A new MercatorTiles instance.
    """
    quadkeys = list(set(str(q) for q in quadkeys))
    if not quadkeys:
        cls.logger.warning("No quadkeys provided to from_quadkeys.")
        return cls(zoom_level=0, quadkeys=[])
    cls.logger.info(
        f"Initializing MercatorTiles from {len(quadkeys)} provided quadkeys."
    )
    return cls(zoom_level=len(quadkeys[0]), quadkeys=quadkeys)
from_spatial(source, zoom_level, predicate='intersects', **kwargs) classmethod

Factory method to create MercatorTiles from various spatial sources.

Parameters:

Name Type Description Default
source Union[BaseGeometry, GeoDataFrame, List[Union[Point, Tuple[float, float]]]]

Spatial data source (Geometry, GDF, or points).

required
zoom_level int

Web Mercator zoom level.

required
predicate str

Spatial predicate for containment ('intersects', 'within').

'intersects'
**kwargs

Forwarded to specific factory methods.

{}

Returns:

Type Description
MercatorTiles

A new MercatorTiles instance.

Source code in gigaspatial/grid/mercator_tiles.py
@classmethod
def from_spatial(
    cls,
    source: Union[
        BaseGeometry,
        gpd.GeoDataFrame,
        List[Union[Point, Tuple[float, float]]],
    ],
    zoom_level: int,
    predicate: str = "intersects",
    **kwargs,
) -> "MercatorTiles":
    """
    Factory method to create MercatorTiles from various spatial sources.

    Args:
        source: Spatial data source (Geometry, GDF, or points).
        zoom_level: Web Mercator zoom level.
        predicate: Spatial predicate for containment ('intersects', 'within').
        **kwargs: Forwarded to specific factory methods.

    Returns:
        A new MercatorTiles instance.
    """
    cls.logger.info(
        f"Creating MercatorTiles from spatial source (type: {type(source)}) at zoom level: {zoom_level} with predicate: {predicate}"
    )
    if isinstance(source, gpd.GeoDataFrame):
        if source.crs != "EPSG:4326":
            source = source.to_crs("EPSG:4326")
        source = source.geometry.unary_union

    if isinstance(source, BaseGeometry):
        return cls.from_geometry(
            geometry=source, zoom_level=zoom_level, predicate=predicate, **kwargs
        )
    elif isinstance(source, Iterable) and all(
        isinstance(pt, Point) or len(pt) == 2 for pt in source
    ):
        return cls.from_points(geometry=source, zoom_level=zoom_level, **kwargs)
    else:
        raise
get_quadkeys_from_points(points, zoom_level) staticmethod

Get list of quadkeys for the provided points at specified zoom level.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
zoom_level int

Zoom level for the quadkeys

required

Returns:

Type Description
List[str]

List of quadkey strings

Source code in gigaspatial/grid/mercator_tiles.py
@staticmethod
def get_quadkeys_from_points(
    points: List[Union[Point, Tuple[float, float]]], zoom_level: int
) -> List[str]:
    """Get list of quadkeys for the provided points at specified zoom level.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        zoom_level: Zoom level for the quadkeys

    Returns:
        List of quadkey strings
    """
    quadkeys = [
        (
            mercantile.quadkey(mercantile.tile(p.x, p.y, zoom_level))
            if isinstance(p, Point)
            else mercantile.quadkey(mercantile.tile(p[1], p[0], zoom_level))
        )
        for p in points
    ]
    return quadkeys
save(file, format='json')

Saves the MercatorTiles collection to persistent storage.

Parameters:

Name Type Description Default
file Union[str, Path]

The destination file path.

required
format str

The output format. Supported: 'json', 'parquet', 'geojson'. Defaults to 'json'.

'json'

Raises:

Type Description
ValueError

If an unsupported format is provided.

Source code in gigaspatial/grid/mercator_tiles.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Saves the MercatorTiles collection to persistent storage.

    Args:
        file: The destination file path.
        format: The output format. Supported: 'json', 'parquet', 'geojson'.
            Defaults to 'json'.

    Raises:
        ValueError: If an unsupported format is provided.
    """
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.quadkeys, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Converts the tile collection to a pandas DataFrame.

Returns:

Type Description
DataFrame

A DataFrame with 'quadkey', 'latitude', and 'longitude' columns.

Source code in gigaspatial/grid/mercator_tiles.py
def to_dataframe(self) -> pd.DataFrame:
    """Converts the tile collection to a pandas DataFrame.

    Returns:
        A DataFrame with 'quadkey', 'latitude', and 'longitude' columns.
    """
    self.logger.info(
        f"Converting {len(self.quadkeys)} quadkeys to pandas DataFrame."
    )
    if not self.quadkeys:
        self.logger.warning(
            "No quadkeys to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(columns=["quadkey", "latitude", "longitude"])
    tiles_data = [mercantile.quadkey_to_tile(q) for q in self.quadkeys]
    bounds_data = [mercantile.bounds(tile) for tile in tiles_data]

    centroids = [
        (
            (bounds.south + bounds.north) / 2,  # latitude
            (bounds.west + bounds.east) / 2,  # longitude
        )
        for bounds in bounds_data
    ]

    self.logger.info(f"Successfully converted to DataFrame.")

    return pd.DataFrame(
        {
            "quadkey": self.quadkeys,
            "latitude": [c[0] for c in centroids],
            "longitude": [c[1] for c in centroids],
        }
    )
to_geodataframe()

Converts the tile collection to a GeoPandas GeoDataFrame.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame with 'quadkey' and 'geometry' columns.

Source code in gigaspatial/grid/mercator_tiles.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Converts the tile collection to a GeoPandas GeoDataFrame.

    Returns:
        A GeoDataFrame with 'quadkey' and 'geometry' columns.
    """
    return gpd.GeoDataFrame(
        {"quadkey": self.quadkeys, "geometry": self.to_geoms()}, crs="EPSG:4326"
    )
to_geoms()

Converts quadkeys into a list of Shapely box (polygon) geometries.

Returns:

Type Description
List[box]

A list of boxes representing the tile boundaries.

Source code in gigaspatial/grid/mercator_tiles.py
def to_geoms(self) -> List[box]:
    """Converts quadkeys into a list of Shapely box (polygon) geometries.

    Returns:
        A list of boxes representing the tile boundaries.
    """
    self.logger.info(
        f"Converting {len(self.quadkeys)} quadkeys to shapely box geometries."
    )
    return [
        box(*mercantile.bounds(mercantile.quadkey_to_tile(q)))
        for q in self.quadkeys
    ]

s2

CountryS2Cells

Bases: S2Cells

S2Cells specialized for country-level operations.

This class extends S2Cells to work specifically with country boundaries. It can only be instantiated through the create() classmethod.

Source code in gigaspatial/grid/s2.py
class CountryS2Cells(S2Cells):
    """S2Cells specialized for country-level operations.

    This class extends S2Cells to work specifically with country boundaries.
    It can only be instantiated through the create() classmethod.
    """

    country: str = Field(..., exclude=True)

    def __init__(self, *args, **kwargs):
        raise TypeError(
            "CountryS2Cells cannot be instantiated directly. "
            "Use CountryS2Cells.create() instead."
        )

    @classmethod
    def create(
        cls,
        country: str,
        level: int,
        max_cells: int = 1000,
        data_store: Optional[DataStore] = None,
        country_geom_path: Optional[Union[str, Path]] = None,
    ):
        """Create CountryS2Cells for a specific country."""
        from gigaspatial.handlers.boundaries import AdminBoundaries

        instance = super().__new__(cls)
        super(CountryS2Cells, instance).__init__(
            level=level,
            cells=[],
            data_store=data_store or LocalDataStore(),
            country=pycountry.countries.lookup(country).alpha_3,
        )

        cls.logger.info(
            f"Initializing S2 cells for country: {country} at level {level}"
        )

        country_geom = AdminBoundaries.create(
            country_code=country,
            data_store=data_store,
            path=country_geom_path,
        ).to_geoms()[0]

        cells = S2Cells.from_geometry(country_geom, level, max_cells=max_cells)
        instance.cells = cells.cells

        return instance
create(country, level, max_cells=1000, data_store=None, country_geom_path=None) classmethod

Create CountryS2Cells for a specific country.

Source code in gigaspatial/grid/s2.py
@classmethod
def create(
    cls,
    country: str,
    level: int,
    max_cells: int = 1000,
    data_store: Optional[DataStore] = None,
    country_geom_path: Optional[Union[str, Path]] = None,
):
    """Create CountryS2Cells for a specific country."""
    from gigaspatial.handlers.boundaries import AdminBoundaries

    instance = super().__new__(cls)
    super(CountryS2Cells, instance).__init__(
        level=level,
        cells=[],
        data_store=data_store or LocalDataStore(),
        country=pycountry.countries.lookup(country).alpha_3,
    )

    cls.logger.info(
        f"Initializing S2 cells for country: {country} at level {level}"
    )

    country_geom = AdminBoundaries.create(
        country_code=country,
        data_store=data_store,
        path=country_geom_path,
    ).to_geoms()[0]

    cells = S2Cells.from_geometry(country_geom, level, max_cells=max_cells)
    instance.cells = cells.cells

    return instance

S2Cells

Bases: BaseModel

S2Cells class for generating and managing Google S2 cell grids.

S2 uses levels 0-30, where higher levels represent finer resolution. Level 0 covers the largest area and level 30 the smallest.

Source code in gigaspatial/grid/s2.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
class S2Cells(BaseModel):
    """S2Cells class for generating and managing Google S2 cell grids.

    S2 uses levels 0-30, where higher levels represent finer resolution.
    Level 0 covers the largest area and level 30 the smallest.
    """

    level: int = Field(..., ge=0, le=30)
    cells: List[int] = Field(default_factory=list)  # S2 cell IDs as integers
    data_store: DataStore = Field(default_factory=LocalDataStore, exclude=True)
    logger: ClassVar = config.get_logger("S2Cells")

    class Config:
        arbitrary_types_allowed = True

    @classmethod
    def from_cells(cls, cells: List[Union[int, str]]):
        """Create S2Cells from list of S2 cell IDs (integers or tokens)."""
        if not cells:
            cls.logger.warning("No cells provided to from_cells.")
            return cls(level=0, cells=[])

        cls.logger.info(f"Initializing S2Cells from {len(cells)} provided cells.")

        # Convert tokens to integers if needed
        cell_ids = []
        for cell in cells:
            if isinstance(cell, str):
                cell_ids.append(CellId.from_token(cell).id())
            else:
                cell_ids.append(cell)

        # Get level from first cell
        level = CellId(cell_ids[0]).level()
        return cls(level=level, cells=list(set(cell_ids)))

    @classmethod
    def from_bounds(
        cls,
        xmin: float,
        ymin: float,
        xmax: float,
        ymax: float,
        level: int,
        max_cells: int = 100,
    ):
        """Create S2Cells from boundary coordinates.

        Args:
            xmin, ymin, xmax, ymax: Bounding box coordinates in degrees
            level: S2 level (0-30)
            max_cells: Maximum number of cells to generate
        """
        cls.logger.info(
            f"Creating S2Cells from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at level: {level}"
        )

        # Create a LatLngRect for the bounding box
        rect = LatLngRect(
            LatLng.from_degrees(ymin, xmin), LatLng.from_degrees(ymax, xmax)
        )

        # Use RegionCoverer to get cells
        coverer = RegionCoverer()
        coverer.min_level = level
        coverer.max_level = level
        coverer.max_cells = max_cells

        covering = coverer.get_covering(rect)
        cells = [cell.id() for cell in covering]

        cls.logger.info(f"Generated {len(cells)} cells from bounds.")
        return cls(level=level, cells=cells)

    @classmethod
    def from_spatial(
        cls,
        source: Union[
            BaseGeometry,
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],
        ],
        level: int,
        max_cells: int = 1000,
        **kwargs,
    ):
        """Create S2Cells from various spatial sources."""
        cls.logger.info(
            f"Creating S2Cells from spatial source (type: {type(source)}) at level: {level}"
        )

        if isinstance(source, gpd.GeoDataFrame):
            if source.crs != "EPSG:4326":
                source = source.to_crs("EPSG:4326")
            is_point_series = source.geometry.geom_type == "Point"
            all_are_points = is_point_series.all()
            if all_are_points:
                source = source.geometry.to_list()
            else:
                source = source.geometry.unary_union

        if isinstance(source, BaseGeometry):
            return cls.from_geometry(
                geometry=source, level=level, max_cells=max_cells, **kwargs
            )
        elif isinstance(source, Iterable) and all(
            isinstance(pt, Point) or len(pt) == 2 for pt in source
        ):
            return cls.from_points(points=source, level=level, **kwargs)
        else:
            raise ValueError("Unsupported source type for S2Cells.from_spatial")

    @classmethod
    def from_geometry(
        cls,
        geometry: BaseGeometry,
        level: int,
        max_cells: int = 1000,
        **kwargs,
    ):
        """Create S2Cells from a geometry.

        Args:
            geometry: Shapely geometry
            level: S2 level (0-30)
            max_cells: Maximum number of cells to generate
        """
        cls.logger.info(
            f"Creating S2Cells from geometry (bounds: {geometry.bounds}) at level: {level}"
        )

        if isinstance(geometry, Point):
            return cls.from_points([geometry], level)

        # For polygons and other shapes, use bounding box with RegionCoverer
        # Then filter to actual intersection
        minx, miny, maxx, maxy = geometry.bounds

        rect = LatLngRect(
            LatLng.from_degrees(miny, minx), LatLng.from_degrees(maxy, maxx)
        )

        coverer = RegionCoverer()
        coverer.min_level = level
        coverer.max_level = level
        coverer.max_cells = max_cells

        covering = coverer.get_covering(rect)

        # Filter cells that actually intersect the geometry
        cells = []
        for cell_id in covering:
            cell = Cell(cell_id)
            # Create polygon from cell vertices
            vertices = []
            for i in range(4):
                vertex = cell.get_vertex(i)
                lat_lng = LatLng.from_point(vertex)
                vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
            vertices.append(vertices[0])  # Close the polygon

            cell_polygon = Polygon(vertices)
            if cell_polygon.intersects(geometry):
                cells.append(cell_id.id())

        cls.logger.info(f"Generated {len(cells)} cells from geometry.")
        return cls(level=level, cells=cells, **kwargs)

    @classmethod
    def from_points(
        cls, points: List[Union[Point, Tuple[float, float]]], level: int, **kwargs
    ) -> "S2Cells":
        """Create S2Cells from a list of points or lat-lon pairs."""
        cls.logger.info(f"Creating S2Cells from {len(points)} points at level: {level}")

        cells = set(cls.get_cells_from_points(points, level))
        cls.logger.info(f"Generated {len(cells)} unique cells from points.")
        return cls(level=level, cells=list(cells), **kwargs)

    @classmethod
    def from_json(
        cls, data_store: DataStore, file: Union[str, Path], **kwargs
    ) -> "S2Cells":
        """Load S2Cells from a JSON file."""
        cls.logger.info(
            f"Loading S2Cells from JSON file: {file} using data store: {type(data_store).__name__}"
        )

        with data_store.open(str(file), "r") as f:
            data = json.load(f)

        if isinstance(data, list):  # If file contains only cell IDs
            # Get level from first cell if available
            level = CellId(data[0]).level() if data else 0
            data = {
                "level": level,
                "cells": data,
                **kwargs,
            }
        else:
            data.update(kwargs)

        instance = cls(**data)
        instance.data_store = data_store
        cls.logger.info(
            f"Successfully loaded {len(instance.cells)} cells from JSON file."
        )
        return instance

    @property
    def average_cell_area(self):
        """Average area of cells at this level in square meters."""
        # Approximate area calculation based on S2 geometry
        # Earth surface area is ~510 trillion square meters
        # Each level quadruples the number of cells
        earth_area = 510_000_000_000_000  # m^2
        num_cells_at_level = 6 * (4**self.level)  # 6 faces, each subdivided
        return earth_area / num_cells_at_level

    def filter_cells(self, cells: Iterable[int]) -> "S2Cells":
        """Filter cells by a given set of cell IDs."""
        original_count = len(self.cells)
        incoming_count = len(list(cells))

        self.logger.info(
            f"Filtering {original_count} cells with an incoming set of {incoming_count} cells."
        )

        filtered_cells = list(set(self.cells) & set(cells))
        self.logger.info(f"Resulting in {len(filtered_cells)} filtered cells.")

        return S2Cells(
            level=self.level,
            cells=filtered_cells,
        )

    def to_dataframe(self) -> pd.DataFrame:
        """Convert to pandas DataFrame with cell ID and centroid coordinates."""
        self.logger.info(f"Converting {len(self.cells)} cells to pandas DataFrame.")

        if not self.cells:
            self.logger.warning(
                "No cells to convert to DataFrame. Returning empty DataFrame."
            )
            return pd.DataFrame(
                columns=["cell_id", "cell_token", "latitude", "longitude"]
            )

        data = []
        for cell_id in self.cells:
            cell = Cell(CellId(cell_id))
            center = LatLng.from_point(cell.get_center())
            data.append(
                {
                    "cell_id": cell_id,
                    "cell_token": CellId(cell_id).to_token(),
                    "latitude": center.lat().degrees,
                    "longitude": center.lng().degrees,
                }
            )

        self.logger.info(f"Successfully converted to DataFrame.")
        return pd.DataFrame(data)

    def to_geoms(self) -> List[Polygon]:
        """Convert cells to shapely Polygon geometries."""
        self.logger.info(f"Converting {len(self.cells)} cells to geometries.")
        polygons = []
        invalid_count = 0

        for cell_id in self.cells:
            c_id = CellId(int(cell_id))
            cell = Cell(c_id)

            vertices = []
            for i in range(4):
                vertex = cell.get_vertex(i)
                lat_lng = LatLng.from_point(vertex)
                vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))

            vertices.append(vertices[0])

            try:
                poly = Polygon(vertices)

                # Enforce CCW orientation
                poly = orient(poly, sign=1.0)

                # Fix projection artifacts
                if not poly.is_valid:
                    invalid_count += 1
                    poly = poly.buffer(0)

                # Verify after repair
                if not poly.is_valid:
                    self.logger.warning(
                        f"Cell {c_id.to_token()} still invalid after buffer(0)"
                    )
                    continue

                polygons.append(poly)

            except Exception as e:
                self.logger.error(f"Failed to convert cell {c_id.to_token()}: {e}")

        if invalid_count > 0:
            self.logger.info(
                f"Repaired {invalid_count} invalid geometries via buffer(0)"
            )

        return polygons

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert to GeoPandas GeoDataFrame."""
        return gpd.GeoDataFrame(
            {
                "cell_id": self.cells,
                "cell_token": [CellId(c).to_token() for c in self.cells],
                "geometry": self.to_geoms(),
            },
            crs="EPSG:4326",
        )

    @staticmethod
    def get_cells_from_points(
        points: List[Union[Point, Tuple[float, float]]], level: int
    ) -> List[int]:
        """Get list of S2 cell IDs for the provided points at specified level.

        Args:
            points: List of points as either shapely Points or (lon, lat) tuples
            level: S2 level

        Returns:
            List of S2 cell IDs as integers
        """
        cells = []
        for p in points:
            if isinstance(p, Point):
                # Shapely Point has x=lon, y=lat
                lat_lng = LatLng.from_degrees(p.y, p.x)
            else:
                # Assume tuple is (lon, lat)
                lat_lng = LatLng.from_degrees(p[1], p[0])

            cell_id = CellId.from_lat_lng(lat_lng).parent(level)
            cells.append(cell_id.id())

        return cells

    def get_neighbors(self, direct_only: bool = True) -> "S2Cells":
        """Get neighbors of all cells.

        Args:
            direct_only: If True, get only direct edge neighbors (4 per cell).
                        If False, get all 8 neighbors including corners.

        Returns:
            New S2Cells instance with neighbors included
        """
        self.logger.info(
            f"Getting neighbors for {len(self.cells)} cells (direct_only={direct_only})."
        )

        all_neighbors = set()
        for cell_id in self.cells:
            cell = CellId(cell_id)
            # Get edge neighbors
            for i in range(4):
                neighbors = cell.get_edge_neighbors()
                all_neighbors.update([n.id() for n in neighbors])

            if not direct_only:
                # Get corner neighbors
                for i in range(4):
                    vertex_neighbors = cell.get_vertex_neighbors(i)
                    all_neighbors.update([n.id() for n in vertex_neighbors])

        self.logger.info(f"Found {len(all_neighbors)} total cells including neighbors.")

        return S2Cells(level=self.level, cells=list(all_neighbors))

    def get_children(self, target_level: int) -> "S2Cells":
        """Get children cells at higher level.

        Args:
            target_level: Target level (must be higher than current)

        Returns:
            New S2Cells instance with children at target level
        """
        if target_level <= self.level:
            raise ValueError("Target level must be higher than current level")

        self.logger.info(
            f"Getting children at level {target_level} for {len(self.cells)} cells."
        )

        all_children = []
        for cell_id in self.cells:
            cell = CellId(cell_id)
            # Get all children at target level
            child = cell.child_begin(target_level)
            end = cell.child_end(target_level)

            while child != end:
                all_children.append(child.id())
                child = child.next()

        self.logger.info(f"Generated {len(all_children)} children cells.")
        return S2Cells(level=target_level, cells=all_children)

    def get_parents(self, target_level: int) -> "S2Cells":
        """Get parent cells at lower level.

        Args:
            target_level: Target level (must be lower than current)

        Returns:
            New S2Cells instance with parents at target level
        """
        if target_level >= self.level:
            raise ValueError("Target level must be lower than current level")

        self.logger.info(
            f"Getting parents at level {target_level} for {len(self.cells)} cells."
        )

        parents = set()
        for cell_id in self.cells:
            parent = CellId(cell_id).parent(target_level)
            parents.add(parent.id())

        self.logger.info(f"Generated {len(parents)} parent cells.")
        return S2Cells(level=target_level, cells=list(parents))

    def save(self, file: Union[str, Path], format: str = "json") -> None:
        """Save S2Cells to file in specified format."""
        with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
            if format == "parquet":
                self.to_geodataframe().to_parquet(f, index=False)
            elif format == "geojson":
                f.write(self.to_geodataframe().to_json(drop_id=True))
            elif format == "json":
                json.dump(self.cells, f)
            else:
                raise ValueError(f"Unsupported format: {format}")

    def __len__(self) -> int:
        return len(self.cells)
average_cell_area property

Average area of cells at this level in square meters.

filter_cells(cells)

Filter cells by a given set of cell IDs.

Source code in gigaspatial/grid/s2.py
def filter_cells(self, cells: Iterable[int]) -> "S2Cells":
    """Filter cells by a given set of cell IDs."""
    original_count = len(self.cells)
    incoming_count = len(list(cells))

    self.logger.info(
        f"Filtering {original_count} cells with an incoming set of {incoming_count} cells."
    )

    filtered_cells = list(set(self.cells) & set(cells))
    self.logger.info(f"Resulting in {len(filtered_cells)} filtered cells.")

    return S2Cells(
        level=self.level,
        cells=filtered_cells,
    )
from_bounds(xmin, ymin, xmax, ymax, level, max_cells=100) classmethod

Create S2Cells from boundary coordinates.

Parameters:

Name Type Description Default
xmin, (ymin, xmax, ymax)

Bounding box coordinates in degrees

required
level int

S2 level (0-30)

required
max_cells int

Maximum number of cells to generate

100
Source code in gigaspatial/grid/s2.py
@classmethod
def from_bounds(
    cls,
    xmin: float,
    ymin: float,
    xmax: float,
    ymax: float,
    level: int,
    max_cells: int = 100,
):
    """Create S2Cells from boundary coordinates.

    Args:
        xmin, ymin, xmax, ymax: Bounding box coordinates in degrees
        level: S2 level (0-30)
        max_cells: Maximum number of cells to generate
    """
    cls.logger.info(
        f"Creating S2Cells from bounds: ({xmin}, {ymin}, {xmax}, {ymax}) at level: {level}"
    )

    # Create a LatLngRect for the bounding box
    rect = LatLngRect(
        LatLng.from_degrees(ymin, xmin), LatLng.from_degrees(ymax, xmax)
    )

    # Use RegionCoverer to get cells
    coverer = RegionCoverer()
    coverer.min_level = level
    coverer.max_level = level
    coverer.max_cells = max_cells

    covering = coverer.get_covering(rect)
    cells = [cell.id() for cell in covering]

    cls.logger.info(f"Generated {len(cells)} cells from bounds.")
    return cls(level=level, cells=cells)
from_cells(cells) classmethod

Create S2Cells from list of S2 cell IDs (integers or tokens).

Source code in gigaspatial/grid/s2.py
@classmethod
def from_cells(cls, cells: List[Union[int, str]]):
    """Create S2Cells from list of S2 cell IDs (integers or tokens)."""
    if not cells:
        cls.logger.warning("No cells provided to from_cells.")
        return cls(level=0, cells=[])

    cls.logger.info(f"Initializing S2Cells from {len(cells)} provided cells.")

    # Convert tokens to integers if needed
    cell_ids = []
    for cell in cells:
        if isinstance(cell, str):
            cell_ids.append(CellId.from_token(cell).id())
        else:
            cell_ids.append(cell)

    # Get level from first cell
    level = CellId(cell_ids[0]).level()
    return cls(level=level, cells=list(set(cell_ids)))
from_geometry(geometry, level, max_cells=1000, **kwargs) classmethod

Create S2Cells from a geometry.

Parameters:

Name Type Description Default
geometry BaseGeometry

Shapely geometry

required
level int

S2 level (0-30)

required
max_cells int

Maximum number of cells to generate

1000
Source code in gigaspatial/grid/s2.py
@classmethod
def from_geometry(
    cls,
    geometry: BaseGeometry,
    level: int,
    max_cells: int = 1000,
    **kwargs,
):
    """Create S2Cells from a geometry.

    Args:
        geometry: Shapely geometry
        level: S2 level (0-30)
        max_cells: Maximum number of cells to generate
    """
    cls.logger.info(
        f"Creating S2Cells from geometry (bounds: {geometry.bounds}) at level: {level}"
    )

    if isinstance(geometry, Point):
        return cls.from_points([geometry], level)

    # For polygons and other shapes, use bounding box with RegionCoverer
    # Then filter to actual intersection
    minx, miny, maxx, maxy = geometry.bounds

    rect = LatLngRect(
        LatLng.from_degrees(miny, minx), LatLng.from_degrees(maxy, maxx)
    )

    coverer = RegionCoverer()
    coverer.min_level = level
    coverer.max_level = level
    coverer.max_cells = max_cells

    covering = coverer.get_covering(rect)

    # Filter cells that actually intersect the geometry
    cells = []
    for cell_id in covering:
        cell = Cell(cell_id)
        # Create polygon from cell vertices
        vertices = []
        for i in range(4):
            vertex = cell.get_vertex(i)
            lat_lng = LatLng.from_point(vertex)
            vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))
        vertices.append(vertices[0])  # Close the polygon

        cell_polygon = Polygon(vertices)
        if cell_polygon.intersects(geometry):
            cells.append(cell_id.id())

    cls.logger.info(f"Generated {len(cells)} cells from geometry.")
    return cls(level=level, cells=cells, **kwargs)
from_json(data_store, file, **kwargs) classmethod

Load S2Cells from a JSON file.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_json(
    cls, data_store: DataStore, file: Union[str, Path], **kwargs
) -> "S2Cells":
    """Load S2Cells from a JSON file."""
    cls.logger.info(
        f"Loading S2Cells from JSON file: {file} using data store: {type(data_store).__name__}"
    )

    with data_store.open(str(file), "r") as f:
        data = json.load(f)

    if isinstance(data, list):  # If file contains only cell IDs
        # Get level from first cell if available
        level = CellId(data[0]).level() if data else 0
        data = {
            "level": level,
            "cells": data,
            **kwargs,
        }
    else:
        data.update(kwargs)

    instance = cls(**data)
    instance.data_store = data_store
    cls.logger.info(
        f"Successfully loaded {len(instance.cells)} cells from JSON file."
    )
    return instance
from_points(points, level, **kwargs) classmethod

Create S2Cells from a list of points or lat-lon pairs.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_points(
    cls, points: List[Union[Point, Tuple[float, float]]], level: int, **kwargs
) -> "S2Cells":
    """Create S2Cells from a list of points or lat-lon pairs."""
    cls.logger.info(f"Creating S2Cells from {len(points)} points at level: {level}")

    cells = set(cls.get_cells_from_points(points, level))
    cls.logger.info(f"Generated {len(cells)} unique cells from points.")
    return cls(level=level, cells=list(cells), **kwargs)
from_spatial(source, level, max_cells=1000, **kwargs) classmethod

Create S2Cells from various spatial sources.

Source code in gigaspatial/grid/s2.py
@classmethod
def from_spatial(
    cls,
    source: Union[
        BaseGeometry,
        gpd.GeoDataFrame,
        List[Union[Point, Tuple[float, float]]],
    ],
    level: int,
    max_cells: int = 1000,
    **kwargs,
):
    """Create S2Cells from various spatial sources."""
    cls.logger.info(
        f"Creating S2Cells from spatial source (type: {type(source)}) at level: {level}"
    )

    if isinstance(source, gpd.GeoDataFrame):
        if source.crs != "EPSG:4326":
            source = source.to_crs("EPSG:4326")
        is_point_series = source.geometry.geom_type == "Point"
        all_are_points = is_point_series.all()
        if all_are_points:
            source = source.geometry.to_list()
        else:
            source = source.geometry.unary_union

    if isinstance(source, BaseGeometry):
        return cls.from_geometry(
            geometry=source, level=level, max_cells=max_cells, **kwargs
        )
    elif isinstance(source, Iterable) and all(
        isinstance(pt, Point) or len(pt) == 2 for pt in source
    ):
        return cls.from_points(points=source, level=level, **kwargs)
    else:
        raise ValueError("Unsupported source type for S2Cells.from_spatial")
get_cells_from_points(points, level) staticmethod

Get list of S2 cell IDs for the provided points at specified level.

Parameters:

Name Type Description Default
points List[Union[Point, Tuple[float, float]]]

List of points as either shapely Points or (lon, lat) tuples

required
level int

S2 level

required

Returns:

Type Description
List[int]

List of S2 cell IDs as integers

Source code in gigaspatial/grid/s2.py
@staticmethod
def get_cells_from_points(
    points: List[Union[Point, Tuple[float, float]]], level: int
) -> List[int]:
    """Get list of S2 cell IDs for the provided points at specified level.

    Args:
        points: List of points as either shapely Points or (lon, lat) tuples
        level: S2 level

    Returns:
        List of S2 cell IDs as integers
    """
    cells = []
    for p in points:
        if isinstance(p, Point):
            # Shapely Point has x=lon, y=lat
            lat_lng = LatLng.from_degrees(p.y, p.x)
        else:
            # Assume tuple is (lon, lat)
            lat_lng = LatLng.from_degrees(p[1], p[0])

        cell_id = CellId.from_lat_lng(lat_lng).parent(level)
        cells.append(cell_id.id())

    return cells
get_children(target_level)

Get children cells at higher level.

Parameters:

Name Type Description Default
target_level int

Target level (must be higher than current)

required

Returns:

Type Description
S2Cells

New S2Cells instance with children at target level

Source code in gigaspatial/grid/s2.py
def get_children(self, target_level: int) -> "S2Cells":
    """Get children cells at higher level.

    Args:
        target_level: Target level (must be higher than current)

    Returns:
        New S2Cells instance with children at target level
    """
    if target_level <= self.level:
        raise ValueError("Target level must be higher than current level")

    self.logger.info(
        f"Getting children at level {target_level} for {len(self.cells)} cells."
    )

    all_children = []
    for cell_id in self.cells:
        cell = CellId(cell_id)
        # Get all children at target level
        child = cell.child_begin(target_level)
        end = cell.child_end(target_level)

        while child != end:
            all_children.append(child.id())
            child = child.next()

    self.logger.info(f"Generated {len(all_children)} children cells.")
    return S2Cells(level=target_level, cells=all_children)
get_neighbors(direct_only=True)

Get neighbors of all cells.

Parameters:

Name Type Description Default
direct_only bool

If True, get only direct edge neighbors (4 per cell). If False, get all 8 neighbors including corners.

True

Returns:

Type Description
S2Cells

New S2Cells instance with neighbors included

Source code in gigaspatial/grid/s2.py
def get_neighbors(self, direct_only: bool = True) -> "S2Cells":
    """Get neighbors of all cells.

    Args:
        direct_only: If True, get only direct edge neighbors (4 per cell).
                    If False, get all 8 neighbors including corners.

    Returns:
        New S2Cells instance with neighbors included
    """
    self.logger.info(
        f"Getting neighbors for {len(self.cells)} cells (direct_only={direct_only})."
    )

    all_neighbors = set()
    for cell_id in self.cells:
        cell = CellId(cell_id)
        # Get edge neighbors
        for i in range(4):
            neighbors = cell.get_edge_neighbors()
            all_neighbors.update([n.id() for n in neighbors])

        if not direct_only:
            # Get corner neighbors
            for i in range(4):
                vertex_neighbors = cell.get_vertex_neighbors(i)
                all_neighbors.update([n.id() for n in vertex_neighbors])

    self.logger.info(f"Found {len(all_neighbors)} total cells including neighbors.")

    return S2Cells(level=self.level, cells=list(all_neighbors))
get_parents(target_level)

Get parent cells at lower level.

Parameters:

Name Type Description Default
target_level int

Target level (must be lower than current)

required

Returns:

Type Description
S2Cells

New S2Cells instance with parents at target level

Source code in gigaspatial/grid/s2.py
def get_parents(self, target_level: int) -> "S2Cells":
    """Get parent cells at lower level.

    Args:
        target_level: Target level (must be lower than current)

    Returns:
        New S2Cells instance with parents at target level
    """
    if target_level >= self.level:
        raise ValueError("Target level must be lower than current level")

    self.logger.info(
        f"Getting parents at level {target_level} for {len(self.cells)} cells."
    )

    parents = set()
    for cell_id in self.cells:
        parent = CellId(cell_id).parent(target_level)
        parents.add(parent.id())

    self.logger.info(f"Generated {len(parents)} parent cells.")
    return S2Cells(level=target_level, cells=list(parents))
save(file, format='json')

Save S2Cells to file in specified format.

Source code in gigaspatial/grid/s2.py
def save(self, file: Union[str, Path], format: str = "json") -> None:
    """Save S2Cells to file in specified format."""
    with self.data_store.open(str(file), "wb" if format == "parquet" else "w") as f:
        if format == "parquet":
            self.to_geodataframe().to_parquet(f, index=False)
        elif format == "geojson":
            f.write(self.to_geodataframe().to_json(drop_id=True))
        elif format == "json":
            json.dump(self.cells, f)
        else:
            raise ValueError(f"Unsupported format: {format}")
to_dataframe()

Convert to pandas DataFrame with cell ID and centroid coordinates.

Source code in gigaspatial/grid/s2.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert to pandas DataFrame with cell ID and centroid coordinates."""
    self.logger.info(f"Converting {len(self.cells)} cells to pandas DataFrame.")

    if not self.cells:
        self.logger.warning(
            "No cells to convert to DataFrame. Returning empty DataFrame."
        )
        return pd.DataFrame(
            columns=["cell_id", "cell_token", "latitude", "longitude"]
        )

    data = []
    for cell_id in self.cells:
        cell = Cell(CellId(cell_id))
        center = LatLng.from_point(cell.get_center())
        data.append(
            {
                "cell_id": cell_id,
                "cell_token": CellId(cell_id).to_token(),
                "latitude": center.lat().degrees,
                "longitude": center.lng().degrees,
            }
        )

    self.logger.info(f"Successfully converted to DataFrame.")
    return pd.DataFrame(data)
to_geodataframe()

Convert to GeoPandas GeoDataFrame.

Source code in gigaspatial/grid/s2.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert to GeoPandas GeoDataFrame."""
    return gpd.GeoDataFrame(
        {
            "cell_id": self.cells,
            "cell_token": [CellId(c).to_token() for c in self.cells],
            "geometry": self.to_geoms(),
        },
        crs="EPSG:4326",
    )
to_geoms()

Convert cells to shapely Polygon geometries.

Source code in gigaspatial/grid/s2.py
def to_geoms(self) -> List[Polygon]:
    """Convert cells to shapely Polygon geometries."""
    self.logger.info(f"Converting {len(self.cells)} cells to geometries.")
    polygons = []
    invalid_count = 0

    for cell_id in self.cells:
        c_id = CellId(int(cell_id))
        cell = Cell(c_id)

        vertices = []
        for i in range(4):
            vertex = cell.get_vertex(i)
            lat_lng = LatLng.from_point(vertex)
            vertices.append((lat_lng.lng().degrees, lat_lng.lat().degrees))

        vertices.append(vertices[0])

        try:
            poly = Polygon(vertices)

            # Enforce CCW orientation
            poly = orient(poly, sign=1.0)

            # Fix projection artifacts
            if not poly.is_valid:
                invalid_count += 1
                poly = poly.buffer(0)

            # Verify after repair
            if not poly.is_valid:
                self.logger.warning(
                    f"Cell {c_id.to_token()} still invalid after buffer(0)"
                )
                continue

            polygons.append(poly)

        except Exception as e:
            self.logger.error(f"Failed to convert cell {c_id.to_token()}: {e}")

    if invalid_count > 0:
        self.logger.info(
            f"Repaired {invalid_count} invalid geometries via buffer(0)"
        )

    return polygons