Skip to content

Generators Module

gigaspatial.generators

poi

PoiViewGenerator

POI View Generator for integrating various geospatial datasets such as Google Open Buildings, Microsoft Global Buildings, GHSL Built Surface, and GHSL Settlement Model (SMOD) data with Points of Interest (POIs).

This class provides methods to load, process, and map external geospatial data to a given set of POIs, enriching them with relevant attributes. It leverages handler/reader classes for efficient data access and processing.

The POIs can be initialized from a list of (latitude, longitude) tuples, a list of dictionaries, a pandas DataFrame, or a geopandas GeoDataFrame.

Source code in gigaspatial/generators/poi.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
class PoiViewGenerator:
    """
    POI View Generator for integrating various geospatial datasets
    such as Google Open Buildings, Microsoft Global Buildings, GHSL Built Surface,
    and GHSL Settlement Model (SMOD) data with Points of Interest (POIs).

    This class provides methods to load, process, and map external geospatial
    data to a given set of POIs, enriching them with relevant attributes.
    It leverages handler/reader classes for efficient data access and processing.

    The POIs can be initialized from a list of (latitude, longitude) tuples,
    a list of dictionaries, a pandas DataFrame, or a geopandas GeoDataFrame.
    """

    def __init__(
        self,
        points: Union[
            List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
        ],
        poi_id_column: str = "poi_id",
        config: Optional[PoiViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initializes the PoiViewGenerator with the input points and configurations.

        The input `points` are converted into an internal GeoDataFrame
        (`_points_gdf`) for consistent geospatial operations.

        Args:
            points (Union[List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame]):
                The input points of interest. Can be:
                - A list of (latitude, longitude) tuples.
                - A list of dictionaries, where each dict must contain 'latitude' and 'longitude' keys.
                - A pandas DataFrame with 'latitude' and 'longitude' columns.
                - A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).
            poi_id_column (str):
                Name of the column in `points` holding unique POI identifiers.
                If absent, sequential ids ("poi_0", "poi_1", ...) are generated.
                Defaults to "poi_id".
            config (Optional[PoiViewGeneratorConfig]):
                Configuration for the POI view generation process. If None, a
                default `PoiViewGeneratorConfig` will be used.
            data_store (Optional[DataStore]):
                An instance of a data store for managing data access (e.g., LocalDataStore).
                If None, a default `LocalDataStore` will be used.
            logger (Optional[logging.Logger]):
                Logger instance to use. If None, a logger named after this
                class is obtained from the global configuration.

        Raises:
            ValueError: If `points` is empty.
        """
        if hasattr(points, "__len__") and len(points) == 0:
            raise ValueError("Points input cannot be empty")

        self.config = config or PoiViewGeneratorConfig()
        self.data_store = data_store or LocalDataStore()
        self.logger = logger or global_config.get_logger(self.__class__.__name__)
        self._points_gdf = self._init_points_gdf(points, poi_id_column)
        # The "view" is a plain DataFrame (no geometry) that map_* methods
        # progressively enrich with new columns keyed on 'poi_id'.
        self._view: pd.DataFrame = self._points_gdf.drop(columns=["geometry"])

    @staticmethod
    def _init_points_gdf(
        points: Union[
            List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
        ],
        poi_id_column: str,
    ) -> gpd.GeoDataFrame:
        """
        Internal static method to convert various point input formats into a GeoDataFrame.

        This method standardizes coordinate column names to 'latitude' and 'longitude'
        for consistent internal representation. It also ensures each point has a unique
        identifier in the 'poi_id' column.

        Args:
            points: Input points in various formats:
                - List of (latitude, longitude) tuples
                - List of dictionaries with coordinate keys
                - DataFrame with coordinate columns
                - GeoDataFrame with point geometries

        Returns:
            gpd.GeoDataFrame: Standardized GeoDataFrame with 'latitude', 'longitude',
                             and 'poi_id' columns

        Raises:
            ValueError: If points format is not supported, coordinate columns cannot
                be detected, or the provided poi_id column contains duplicates.
        """

        def _ensure_poi_id(df):
            # Assign sequential ids when the configured column is absent;
            # otherwise rename it to 'poi_id' and reject duplicates.
            if poi_id_column not in df.columns:
                df["poi_id"] = [f"poi_{i}" for i in range(len(df))]
            else:
                df = df.rename(
                    columns={poi_id_column: "poi_id"},
                )
                if df["poi_id"].duplicated().any():
                    raise ValueError(
                        f"Column '{poi_id_column}' provided as 'poi_id_column' contains duplicate values."
                    )
            return df

        if isinstance(points, gpd.GeoDataFrame):
            # Copy up front so the caller's frame is never mutated.
            points = points.copy()
            # Extract lat/lon from the active geometry regardless of the
            # geometry column's name (it is not always literally "geometry").
            # NOTE(review): assumes point geometries — .x/.y raise for polygons.
            points["latitude"] = points.geometry.y
            points["longitude"] = points.geometry.x
            points = _ensure_poi_id(points)

            if points.crs != "EPSG:4326":
                points = points.to_crs("EPSG:4326")
            return points

        elif isinstance(points, pd.DataFrame):
            # Keep the try narrow: only coordinate detection should be
            # re-wrapped; a duplicate-poi_id ValueError must propagate as-is.
            try:
                lat_col, lon_col = detect_coordinate_columns(points)
            except ValueError as e:
                raise ValueError(
                    f"Could not detect coordinate columns in DataFrame: {str(e)}"
                )
            points = points.copy()
            points["latitude"] = points[lat_col]
            points["longitude"] = points[lon_col]
            points = _ensure_poi_id(points)
            return convert_to_geodataframe(points)

        elif isinstance(points, list):
            if len(points) == 0:
                return gpd.GeoDataFrame(
                    columns=["latitude", "longitude", "poi_id", "geometry"],
                    geometry="geometry",
                    crs="EPSG:4326",
                )

            if isinstance(points[0], tuple) and len(points[0]) == 2:
                # List of (lat, lon) tuples
                df = pd.DataFrame(points, columns=["latitude", "longitude"])
                df["poi_id"] = [f"poi_{i}" for i in range(len(points))]
                return convert_to_geodataframe(df)

            elif isinstance(points[0], dict):
                # List of dictionaries
                df = pd.DataFrame(points)
                try:
                    lat_col, lon_col = detect_coordinate_columns(df)
                except ValueError as e:
                    raise ValueError(
                        f"Could not detect coordinate columns in dictionary list: {str(e)}"
                    )
                df["latitude"] = df[lat_col]
                df["longitude"] = df[lon_col]
                df = _ensure_poi_id(df)
                return convert_to_geodataframe(df)

        raise ValueError("Unsupported points input type for PoiViewGenerator.")

    @property
    def points_gdf(self) -> gpd.GeoDataFrame:
        """Read-only access to the standardized GeoDataFrame of POIs."""
        return self._points_gdf

    @property
    def view(self) -> pd.DataFrame:
        """Current enriched POI view as a plain (non-geo) DataFrame."""
        return self._view

    def _update_view(self, new_data: pd.DataFrame) -> None:
        """
        Internal helper to update the main view DataFrame with new columns.
        This method is designed to be called by map_* methods.

        Args:
            new_data (pd.DataFrame): A DataFrame containing 'poi_id' and new columns
                                     to be merged into the main view.

        Raises:
            ValueError: If 'poi_id' is missing from new_data, or if new_data
                contains duplicate 'poi_id' values (which would fan out rows
                of the view during the join).
        """
        if "poi_id" not in new_data.columns:
            available_cols = list(new_data.columns)
            raise ValueError(
                f"new_data DataFrame must contain 'poi_id' column. "
                f"Available columns: {available_cols}"
            )

        # A duplicated key would multiply rows of the view in the left join
        # below, silently corrupting it — fail loudly instead.
        if new_data["poi_id"].duplicated().any():
            raise ValueError("new_data contains duplicate 'poi_id' values.")

        # Check for poi_id mismatches
        original_poi_ids = set(self._view["poi_id"])
        new_poi_ids = set(new_data["poi_id"])
        missing_pois = original_poi_ids - new_poi_ids

        if missing_pois:
            self.logger.warning(
                f"{len(missing_pois)} POIs will have NaN values for new columns"
            )

        # Ensure poi_id is the index for efficient merging
        # Create a copy to avoid SettingWithCopyWarning if new_data is a slice
        new_data_indexed = new_data.set_index("poi_id").copy()

        # Merge on 'poi_id' (which is now the index of self._view and new_data_indexed)
        # Using left join to keep all POIs from the original view
        self._view = (
            self._view.set_index("poi_id")
            .join(new_data_indexed, how="left")
            .reset_index()
        )

        self.logger.debug(
            f"View updated with columns: {list(new_data_indexed.columns)}"
        )

    def map_nearest_points(
        self,
        points_df: Union[pd.DataFrame, gpd.GeoDataFrame],
        id_column: str,
        lat_column: Optional[str] = None,
        lon_column: Optional[str] = None,
        output_prefix: str = "nearest",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps nearest points from a given DataFrame to the POIs.

        Enriches the view with the ID and distance to the nearest point
        from the input DataFrame for each POI.

        Args:
            points_df (Union[pd.DataFrame, gpd.GeoDataFrame]):
                DataFrame containing points to find nearest neighbors from.
                Must have latitude and longitude columns or point geometries.
            id_column (str):
                Name of the column containing unique identifiers for each point.
            lat_column (str, optional):
                Name of the latitude column in points_df. If None, will attempt to detect it
                or extract from geometry if points_df is a GeoDataFrame.
            lon_column (str, optional):
                Name of the longitude column in points_df. If None, will attempt to detect it
                or extract from geometry if points_df is a GeoDataFrame.
            output_prefix (str, optional):
                Prefix for the output column names. Defaults to "nearest".
            **kwargs:
                Additional keyword arguments (currently unused by this method).

        Returns:
            pd.DataFrame: The updated view with new columns:
                          '{output_prefix}_id' and '{output_prefix}_distance'.
                          Returns the unchanged view if points_df is empty.

        Raises:
            ValueError: If required columns are missing from points_df or if coordinate
                       columns cannot be detected or extracted from geometry.
        """
        self.logger.info(
            f"Mapping nearest points from {points_df.__class__.__name__} to POIs"
        )

        # Validate input DataFrame
        if points_df.empty:
            self.logger.info("No points found in the input DataFrame")
            return self.view

        # Handle GeoDataFrame
        if isinstance(points_df, gpd.GeoDataFrame):
            points_df = points_df.copy()
            if points_df.geometry.name == "geometry":
                points_df["latitude"] = points_df.geometry.y
                points_df["longitude"] = points_df.geometry.x
                lat_column = "latitude"
                lon_column = "longitude"
                self.logger.info("Extracted coordinates from geometry")

        # Detect coordinate columns if not provided
        if lat_column is None or lon_column is None:
            try:
                detected_lat, detected_lon = detect_coordinate_columns(points_df)
                lat_column = lat_column or detected_lat
                lon_column = lon_column or detected_lon
                self.logger.info(
                    f"Detected coordinate columns: {lat_column}, {lon_column}"
                )
            except ValueError as e:
                raise ValueError(f"Could not detect coordinate columns: {str(e)}")

        # Validate required columns
        required_columns = [lat_column, lon_column, id_column]
        missing_columns = [
            col for col in required_columns if col not in points_df.columns
        ]
        if missing_columns:
            raise ValueError(
                f"Missing required columns in points_df: {missing_columns}"
            )

        from gigaspatial.processing.geo import calculate_distance

        self.logger.info("Calculating nearest points for each POI")
        # NOTE(review): the KD-tree queries Euclidean distance in raw degrees,
        # which is only an approximation of geographic proximity (it degrades
        # near the poles / antimeridian); the reported distance below is the
        # accurate one from calculate_distance.
        tree = cKDTree(points_df[[lat_column, lon_column]])
        points_df_poi = self.points_gdf.copy()
        _, idx = tree.query(points_df_poi[["latitude", "longitude"]], k=1)
        df_nearest = points_df.iloc[idx]
        dist = calculate_distance(
            lat1=points_df_poi.latitude,
            lon1=points_df_poi.longitude,
            lat2=df_nearest[lat_column],
            lon2=df_nearest[lon_column],
        )
        # Create a temporary DataFrame to hold the results for merging.
        # Reuse df_nearest instead of re-slicing points_df with iloc.
        temp_result_df = pd.DataFrame(
            {
                "poi_id": points_df_poi["poi_id"],
                f"{output_prefix}_id": df_nearest[id_column].values,
                f"{output_prefix}_distance": dist,
            }
        )
        self._update_view(temp_result_df)
        self.logger.info(
            f"Nearest points mapping complete with prefix '{output_prefix}'"
        )
        return self.view

    def map_google_buildings(
        self,
        handler: Optional[GoogleOpenBuildingsHandler] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Enriches the POIs with the nearest Google Open Building.

        For each POI, the ID (plus code) of and distance to the closest
        Google Open Building are added to the view.

        Args:
            handler (Optional[GoogleOpenBuildingsHandler]):
                Handler used to load building points. When omitted, a default
                handler backed by `self.data_store` is created.
            **kwargs:
                Additional keyword arguments forwarded to `map_nearest_points`.

        Returns:
            pd.DataFrame: The updated view with new columns
                'nearest_google_building_id' and 'nearest_google_building_distance',
                or the unchanged view when no buildings are found.
        """
        self.logger.info("Mapping Google Open Buildings data to POIs")
        if handler is None:
            handler = GoogleOpenBuildingsHandler(data_store=self.data_store)

        self.logger.info("Loading Google Buildings point data")
        buildings_df = handler.load_points(
            self.points_gdf, ensure_available=self.config.ensure_available
        )

        no_buildings = buildings_df is None or len(buildings_df) == 0
        if no_buildings:
            self.logger.info("No Google buildings data found for the provided POIs")
            return self.view

        return self.map_nearest_points(
            points_df=buildings_df,
            id_column="full_plus_code",
            output_prefix="nearest_google_building",
            **kwargs,
        )

    def map_ms_buildings(
        self,
        handler: Optional[MSBuildingsHandler] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

        Enriches the view with the ID and distance to the nearest Microsoft
        Global Building for each POI. If buildings don't have an ID column,
        creates a unique ID using the building centroid's coordinates.

        Args:
            handler (Optional[MSBuildingsHandler]):
                Handler used to load building polygons. When omitted, a default
                handler backed by `self.data_store` is created.
            **kwargs:
                Additional keyword arguments forwarded to `map_nearest_points`.

        Returns:
            pd.DataFrame: The updated view with new columns
                'nearest_ms_building_id' and 'nearest_ms_building_distance',
                or the unchanged view when no buildings are found.
        """
        self.logger.info("Mapping Microsoft Global Buildings data to POIs")
        handler = handler or MSBuildingsHandler(data_store=self.data_store)
        self.logger.info("Loading Microsoft Buildings polygon data")
        buildings_gdf = handler.load_data(
            self.points_gdf, ensure_available=self.config.ensure_available
        )
        if buildings_gdf is None or len(buildings_gdf) == 0:
            self.logger.info("No Microsoft buildings data found for the provided POIs")
            # Return the unchanged view for consistency with the other map_*
            # methods and the declared pd.DataFrame return type (the previous
            # behavior returned a GeoDataFrame copy of points_gdf instead).
            return self.view

        building_centroids = get_centroids(buildings_gdf)

        if "building_id" not in buildings_gdf.columns:
            self.logger.info("Creating building IDs from coordinates")
            building_centroids["building_id"] = building_centroids.apply(
                lambda row: f"{row.geometry.y:.6f}_{row.geometry.x:.6f}",
                axis=1,
            )

        return self.map_nearest_points(
            points_df=building_centroids,
            id_column="building_id",
            output_prefix="nearest_ms_building",
            **kwargs,
        )

    def map_zonal_stats(
        self,
        data: Union[List[TifProcessor], gpd.GeoDataFrame],
        stat: str = "mean",
        map_radius_meters: Optional[float] = None,
        output_column: str = "zonal_stat",
        value_column: Optional[str] = None,
        area_weighted: bool = False,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps zonal statistics from raster or polygon data to POIs.

        Can operate in three modes:
        1. Raster point sampling: Directly samples raster values at POI locations
        2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them
        3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

        Args:
            data (Union[List[TifProcessor], gpd.GeoDataFrame]):
                Either a list of TifProcessor objects containing raster data to sample,
                or a GeoDataFrame containing polygon data to aggregate.
            stat (str, optional):
                For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max").
                For polygon data: Aggregation method to use.
                Defaults to "mean".
            map_radius_meters (float, optional):
                If provided, creates circular buffers of this radius around each POI
                and calculates statistics within the buffers. If None, samples directly
                at POI locations (only for raster data).
            output_column (str, optional):
                Name of the output column to store the results (raster data only;
                polygon results keep `value_column` as the column name).
                Defaults to "zonal_stat".
            value_column (str, optional):
                For polygon data: Name of the column to aggregate. Required for polygon data.
                Not used for raster data.
            area_weighted (bool, optional):
                For polygon data: Whether to weight values by fractional area of
                intersection. Defaults to False.
            **kwargs:
                Additional keyword arguments passed to the sampling/aggregation functions.

        Returns:
            pd.DataFrame: The updated view with a new column containing the
                          calculated statistics. Returns the unchanged view
                          if no valid data is found.

        Raises:
            ValueError: If no valid data is provided, if parameters are incompatible,
                      or if required parameters (value_column) are missing for polygon data.
        """

        if isinstance(data, list) and all(isinstance(x, TifProcessor) for x in data):
            # Handle raster data — check for emptiness before doing any work.
            if not data:
                self.logger.info("No valid raster data found for the provided POIs")
                return self.view

            results_df = pd.DataFrame({"poi_id": self.points_gdf["poi_id"]})

            raster_crs = data[0].crs

            if not all(tp.crs == raster_crs for tp in data):
                raise ValueError(
                    "All TifProcessors must have the same CRS for zonal statistics."
                )

            if map_radius_meters is not None:
                self.logger.info(
                    f"Calculating {stat} within {map_radius_meters}m buffers around POIs"
                )
                # Create buffers around POIs
                buffers_gdf = buffer_geodataframe(
                    self.points_gdf,
                    buffer_distance_meters=map_radius_meters,
                    cap_style="round",
                )

                # Calculate zonal statistics
                sampled_values = sample_multiple_tifs_by_polygons(
                    tif_processors=data,
                    polygon_list=buffers_gdf.to_crs(raster_crs).geometry,
                    stat=stat,
                    **kwargs,
                )
            else:
                self.logger.info(f"Sampling {stat} at POI locations")
                # Sample directly at POI locations
                coord_list = (
                    self.points_gdf.to_crs(raster_crs).get_coordinates().to_numpy()
                )
                sampled_values = sample_multiple_tifs_by_coordinates(
                    tif_processors=data, coordinate_list=coord_list, **kwargs
                )

            results_df[output_column] = sampled_values

        elif isinstance(data, gpd.GeoDataFrame):
            # Handle polygon data
            if data.empty:
                self.logger.info("No valid polygon data found for the provided POIs")
                # Return the unchanged view for consistency with the raster
                # branch (previously returned a GeoDataFrame copy of points_gdf).
                return self.view

            if map_radius_meters is None:
                raise ValueError("map_radius_meters must be provided for polygon data")

            if value_column is None:
                raise ValueError("value_column must be provided for polygon data")

            if value_column not in data.columns:
                raise ValueError(
                    f"Value column '{value_column}' not found in input polygon GeoDataFrame."
                )

            self.logger.info(
                f"Aggregating {value_column} within {map_radius_meters}m buffers around POIs"
            )

            # Create buffers around POIs
            buffer_gdf = buffer_geodataframe(
                self.points_gdf,
                buffer_distance_meters=map_radius_meters,
                cap_style="round",
            )

            # Aggregate polygons to buffers
            aggregation_result_gdf = aggregate_polygons_to_zones(
                polygons=data,
                zones=buffer_gdf,
                value_columns=value_column,
                aggregation=stat,
                area_weighted=area_weighted,
                zone_id_column="poi_id",
                **kwargs,
            )

            results_df = aggregation_result_gdf[["poi_id", value_column]].copy()

        else:
            raise ValueError(
                "data must be either a list of TifProcessor objects or a GeoDataFrame"
            )

        self._update_view(results_df)
        self.logger.info(
            f"Zonal statistics mapping complete for column(s) derived from '{output_column}' or '{value_column}'"
        )
        return self.view

    def map_built_s(
        self,
        map_radius_meters: float = 150,
        stat: str = "sum",
        dataset_year=2020,
        dataset_resolution=100,
        output_column="built_surface_m2",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

        Computes the `stat` of built surface within a buffer of
        `map_radius_meters` around each POI and stores it in `output_column`.

        Args:
            map_radius_meters (float):
                Buffer distance in meters around each POI for the zonal
                statistic. Defaults to 150.
            stat (str):
                Statistic to compute within each buffer. Defaults to "sum".
            dataset_year:
                GHSL dataset release year. Defaults to 2020.
            dataset_resolution:
                GHSL dataset resolution in meters. Defaults to 100.
            output_column (str):
                Name of the resulting column. Defaults to "built_surface_m2".
            **kwargs:
                Extra keyword arguments. NOTE(review): these are forwarded to
                BOTH the GHSLDataHandler constructor and map_zonal_stats —
                confirm any kwarg is valid for both call sites.

        Returns:
            pd.DataFrame: The updated view with the new built-surface column.
        """
        self.logger.info("Mapping GHSL Built Surface data to POIs")
        ghsl_handler = GHSLDataHandler(
            product="GHS_BUILT_S",
            year=dataset_year,
            resolution=dataset_resolution,
            data_store=self.data_store,
            **kwargs,
        )

        self.logger.info("Loading GHSL Built Surface raster tiles")
        raster_tiles = ghsl_handler.load_data(
            self.points_gdf.copy(), ensure_available=self.config.ensure_available
        )

        return self.map_zonal_stats(
            data=raster_tiles,
            stat=stat,
            map_radius_meters=map_radius_meters,
            output_column=output_column,
            **kwargs,
        )

    def map_smod(
        self,
        stat="median",
        dataset_year=2020,
        dataset_resolution=1000,
        output_column="smod_class",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps GHSL Settlement Model (SMOD) data to the POIs.

        Samples the SMOD class value at each POI's location and stores it in
        `output_column`.

        Args:
            stat (str):
                Statistic to use when combining raster samples. Defaults to
                "median" (suitable for the categorical SMOD classes).
            dataset_year:
                GHSL dataset release year. Defaults to 2020.
            dataset_resolution:
                GHSL dataset resolution in meters. Defaults to 1000.
            output_column (str):
                Name of the resulting column. Defaults to "smod_class".
            **kwargs:
                Extra keyword arguments. NOTE(review): these are forwarded to
                BOTH the GHSLDataHandler constructor and map_zonal_stats —
                confirm any kwarg is valid for both call sites.

        Returns:
            pd.DataFrame: The updated view with the new SMOD class column.
        """
        self.logger.info("Mapping GHSL Settlement Model (SMOD) data to POIs")
        ghsl_handler = GHSLDataHandler(
            product="GHS_SMOD",
            year=dataset_year,
            resolution=dataset_resolution,
            data_store=self.data_store,
            coord_system=54009,
            **kwargs,
        )

        self.logger.info("Loading GHSL SMOD raster tiles")
        raster_tiles = ghsl_handler.load_data(
            self.points_gdf.copy(), ensure_available=self.config.ensure_available
        )

        return self.map_zonal_stats(
            data=raster_tiles,
            stat=stat,
            output_column=output_column,
            **kwargs,
        )

    def save_view(
        self,
        name: str,
        output_format: Optional[str] = None,
    ) -> Path:
        """
        Saves the current POI view (the enriched DataFrame) to a file.

        The output path and format are determined by the `config`
        or overridden by the `output_format` parameter.

        Args:
            name (str): The base name for the output file (without extension).
            output_format (Optional[str]):
                The desired output format (e.g., "csv", "geojson"). If None,
                the `output_format` from `config` will be used.

        Returns:
            Path: The full path to the saved output file.
        """
        format_to_use = output_format or self.config.output_format
        output_path = self.config.base_path / f"{name}.{format_to_use}"

        self.logger.info(f"Saving POI view to {output_path}")
        # Save the current view, which is a pandas DataFrame, not a GeoDataFrame
        # GeoJSON/Shapefile formats would require converting back to GeoDataFrame first.
        # For CSV, Parquet, Feather, this is fine.
        if format_to_use in ["geojson", "shp", "gpkg"]:
            self.logger.warning(
                f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
            )
            # pd.DataFrame.merge returns a plain DataFrame even when a geometry
            # column is present, so rebuild a GeoDataFrame explicitly to carry
            # geometry/CRS metadata into the geospatial writer.
            merged = self.view.merge(
                self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
            )
            view_to_save_gdf = gpd.GeoDataFrame(
                merged, geometry="geometry", crs=self.points_gdf.crs
            )
            write_dataset(
                data=view_to_save_gdf,
                path=str(output_path),
                data_store=self.data_store,
            )
        else:
            write_dataset(
                data=self.view,  # Use the internal _view DataFrame
                path=str(output_path),
                data_store=self.data_store,
            )

        return output_path

    def to_dataframe(self) -> pd.DataFrame:
        """
        Return the current POI view as a plain DataFrame.

        All variables accumulated through the various `map_*` operations are
        combined in this single table.

        Returns:
            pd.DataFrame: The current view.
        """
        current_view = self.view
        return current_view

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Returns the current POI view merged with the original point geometries as a GeoDataFrame.

        This method combines all accumulated variables in the view with the corresponding
        point geometries, providing a spatially-enabled DataFrame for further analysis or export.

        Returns:
            gpd.GeoDataFrame: The current view merged with point geometries,
                              carrying the CRS of the underlying points.
        """
        merged = self.view.merge(
            self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
        )
        # Bug fix: pd.DataFrame.merge yields a plain DataFrame even when the
        # right-hand side is a GeoDataFrame; rebuild a GeoDataFrame so the
        # declared return type (and the CRS) actually holds.
        return gpd.GeoDataFrame(merged, geometry="geometry", crs=self.points_gdf.crs)

    def chain_operations(self, operations: List[dict]) -> "PoiViewGenerator":
        """
        Chain multiple mapping operations for fluent interface.

        Args:
            operations: List of dicts with 'method' and 'kwargs' keys

        Example:
            generator.chain_operations([
                {'method': 'map_google_buildings', 'kwargs': {}},
                {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}},
            ])
        """
        for op in operations:
            method_name = op["method"]
            kwargs = op.get("kwargs", {})
            if hasattr(self, method_name):
                getattr(self, method_name)(**kwargs)
            else:
                raise AttributeError(f"Method {method_name} not found")
        return self

    def validate_data_coverage(self, data_bounds: gpd.GeoDataFrame) -> dict:
        """
        Validate how many POIs fall within the data coverage area.

        Args:
            data_bounds (gpd.GeoDataFrame): Geometries describing the data
                coverage area; dissolved with `union_all()` before testing.
                NOTE(review): assumes `data_bounds` shares the CRS of
                `points_gdf` — confirm at call sites.

        Returns:
            dict: Coverage statistics with keys 'total_pois', 'covered_pois',
                  'coverage_percentage' and 'uncovered_pois'. Counts are plain
                  Python ints so the dict serializes cleanly (e.g. to JSON).
        """
        poi_within = self.points_gdf.within(data_bounds.union_all())
        total = len(self.points_gdf)
        # Compute the boolean sum once and cast from numpy int to int.
        covered = int(poi_within.sum())
        return {
            "total_pois": total,
            "covered_pois": covered,
            "coverage_percentage": (covered / total) * 100,
            "uncovered_pois": total - covered,
        }
points_gdf: gpd.GeoDataFrame property

Gets the internal GeoDataFrame of points of interest.

view: pd.DataFrame property

The DataFrame representing the current point of interest view.

__init__(points, poi_id_column='poi_id', config=None, data_store=None, logger=None)

Initializes the PoiViewGenerator with the input points and configurations.

The input points are converted into an internal GeoDataFrame (_points_gdf) for consistent geospatial operations.

Parameters:

Name Type Description Default
points Union[List[Tuple[float, float]], List[dict], DataFrame, GeoDataFrame]

The input points of interest. Can be: - A list of (latitude, longitude) tuples. - A list of dictionaries, where each dict must contain 'latitude' and 'longitude' keys. - A pandas DataFrame with 'latitude' and 'longitude' columns. - A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).

required
config Optional[PoiViewGeneratorConfig]

Configuration for the POI view generation process. If None, a default PoiViewGeneratorConfig will be used.

None
data_store Optional[DataStore]

An instance of a data store for managing data access (e.g., LocalDataStore). If None, a default LocalDataStore will be used.

None
Source code in gigaspatial/generators/poi.py
def __init__(
    self,
    points: Union[
        List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
    ],
    poi_id_column: str = "poi_id",
    config: Optional[PoiViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: logging.Logger = None,
):
    """
    Initializes the PoiViewGenerator with the input points and configurations.

    The input `points` are converted into an internal GeoDataFrame
    (`_points_gdf`) for consistent geospatial operations.

    Args:
        points (Union[List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame]):
            The input points of interest. Can be:
            - A list of (latitude, longitude) tuples.
            - A list of dictionaries, where each dict must contain 'latitude' and 'longitude' keys.
            - A pandas DataFrame with 'latitude' and 'longitude' columns.
            - A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).
        generator_config (Optional[PoiViewGeneratorConfig]):
            Configuration for the POI view generation process. If None, a
            default `PoiViewGeneratorConfig` will be used.
        data_store (Optional[DataStore]):
            An instance of a data store for managing data access (e.g., LocalDataStore).
            If None, a default `LocalDataStore` will be used.
    """
    if hasattr(points, "__len__") and len(points) == 0:
        raise ValueError("Points input cannot be empty")

    self.config = config or PoiViewGeneratorConfig()
    self.data_store = data_store or LocalDataStore()
    self.logger = logger or global_config.get_logger(self.__class__.__name__)
    self._points_gdf = self._init_points_gdf(points, poi_id_column)
    self._view: pd.DataFrame = self._points_gdf.drop(columns=["geometry"])
chain_operations(operations)

Chain multiple mapping operations for fluent interface.

Parameters:

Name Type Description Default
operations List[dict]

List of dicts with 'method' and 'kwargs' keys

required
Example

generator.chain_operations([ {'method': 'map_google_buildings', 'kwargs': {}}, {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}}, ])

Source code in gigaspatial/generators/poi.py
def chain_operations(self, operations: List[dict]) -> "PoiViewGenerator":
    """
    Chain multiple mapping operations for fluent interface.

    Args:
        operations: List of dicts with 'method' and 'kwargs' keys

    Example:
        generator.chain_operations([
            {'method': 'map_google_buildings', 'kwargs': {}},
            {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}},
        ])
    """
    for op in operations:
        method_name = op["method"]
        kwargs = op.get("kwargs", {})
        if hasattr(self, method_name):
            getattr(self, method_name)(**kwargs)
        else:
            raise AttributeError(f"Method {method_name} not found")
    return self
map_built_s(map_radius_meters=150, stat='sum', dataset_year=2020, dataset_resolution=100, output_column='built_surface_m2', **kwargs)

Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

Calculates the sum of built surface area within a specified buffer radius around each POI. Enriches points_gdf with the 'built_surface_m2' column.

Parameters:

Name Type Description Default
data_config Optional[GHSLDataConfig]

Configuration for accessing GHSL Built Surface data. If None, a default GHSLDataConfig for 'GHS_BUILT_S' will be used.

required
map_radius_meters float

The buffer distance in meters around each POI to calculate zonal statistics for built surface. Defaults to 150 meters.

150
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with a new column: 'built_surface_m2'. Returns a copy of the current points_gdf if no GHSL Built Surface data is found.

Source code in gigaspatial/generators/poi.py
def map_built_s(
    self,
    map_radius_meters: float = 150,
    stat: str = "sum",
    dataset_year=2020,
    dataset_resolution=100,
    output_column="built_surface_m2",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

    Calculates the sum of built surface area within a specified buffer
    radius around each POI. Enriches `points_gdf` with the 'built_surface_m2' column.

    Args:
        data_config (Optional[GHSLDataConfig]):
            Configuration for accessing GHSL Built Surface data. If None, a
            default `GHSLDataConfig` for 'GHS_BUILT_S' will be used.
        map_radius_meters (float):
            The buffer distance in meters around each POI to calculate
            zonal statistics for built surface. Defaults to 150 meters.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated GeoDataFrame with a new column:
                      'built_surface_m2'. Returns a copy of the current
                      `points_gdf` if no GHSL Built Surface data is found.
    """
    self.logger.info("Mapping GHSL Built Surface data to POIs")
    handler = GHSLDataHandler(
        product="GHS_BUILT_S",
        year=dataset_year,
        resolution=dataset_resolution,
        data_store=self.data_store,
        **kwargs,
    )
    self.logger.info("Loading GHSL Built Surface raster tiles")
    tif_processors = handler.load_data(
        self.points_gdf.copy(), ensure_available=self.config.ensure_available
    )

    return self.map_zonal_stats(
        data=tif_processors,
        stat=stat,
        map_radius_meters=map_radius_meters,
        output_column=output_column,
        **kwargs,
    )
map_google_buildings(handler=None, **kwargs)

Maps Google Open Buildings data to the POIs by finding the nearest building.

Enriches the points_gdf with the ID and distance to the nearest Google Open Building for each POI.

Parameters:

Name Type Description Default
data_config Optional[GoogleOpenBuildingsConfig]

Configuration for accessing Google Open Buildings data. If None, a default GoogleOpenBuildingsConfig will be used.

required
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with new columns: 'nearest_google_building_id' and 'nearest_google_building_distance'. Returns a copy of the current points_gdf if no buildings are found.

Source code in gigaspatial/generators/poi.py
def map_google_buildings(
    self,
    handler: Optional[GoogleOpenBuildingsHandler] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Maps Google Open Buildings data to the POIs by finding the nearest building.

    Enriches the `points_gdf` with the ID and distance to the nearest
    Google Open Building for each POI.

    Args:
        data_config (Optional[GoogleOpenBuildingsConfig]):
            Configuration for accessing Google Open Buildings data. If None, a
            default `GoogleOpenBuildingsConfig` will be used.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated GeoDataFrame with new columns:
                      'nearest_google_building_id' and 'nearest_google_building_distance'.
                      Returns a copy of the current `points_gdf` if no buildings are found.
    """
    self.logger.info("Mapping Google Open Buildings data to POIs")
    handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)

    self.logger.info("Loading Google Buildings point data")
    buildings_df = handler.load_points(
        self.points_gdf, ensure_available=self.config.ensure_available
    )
    if buildings_df is None or len(buildings_df) == 0:
        self.logger.info("No Google buildings data found for the provided POIs")
        return self.view

    return self.map_nearest_points(
        points_df=buildings_df,
        id_column="full_plus_code",
        output_prefix="nearest_google_building",
        **kwargs,
    )
map_ms_buildings(handler=None, **kwargs)

Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

Enriches the points_gdf with the ID and distance to the nearest Microsoft Global Building for each POI. If buildings don't have an ID column, creates a unique ID using the building's coordinates.

Parameters:

Name Type Description Default
data_config Optional[MSBuildingsConfig]

Configuration for accessing Microsoft Global Buildings data. If None, a default MSBuildingsConfig will be used.

required
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with new columns: 'nearest_ms_building_id' and 'nearest_ms_building_distance'. Returns a copy of the current points_gdf if no buildings are found.

Source code in gigaspatial/generators/poi.py
def map_ms_buildings(
    self,
    handler: Optional[MSBuildingsHandler] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

    Enriches the `points_gdf` with the ID and distance to the nearest
    Microsoft Global Building for each POI. If buildings don't have an ID column,
    creates a unique ID using the building's coordinates.

    Args:
        data_config (Optional[MSBuildingsConfig]):
            Configuration for accessing Microsoft Global Buildings data. If None, a
            default `MSBuildingsConfig` will be used.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated GeoDataFrame with new columns:
                      'nearest_ms_building_id' and 'nearest_ms_building_distance'.
                      Returns a copy of the current `points_gdf` if no buildings are found.
    """
    self.logger.info("Mapping Microsoft Global Buildings data to POIs")
    handler = handler or MSBuildingsHandler(data_store=self.data_store)
    self.logger.info("Loading Microsoft Buildings polygon data")
    buildings_gdf = handler.load_data(
        self.points_gdf, ensure_available=self.config.ensure_available
    )
    if buildings_gdf is None or len(buildings_gdf) == 0:
        self.logger.info("No Microsoft buildings data found for the provided POIs")
        return self.points_gdf.copy()

    building_centroids = get_centroids(buildings_gdf)

    if "building_id" not in buildings_gdf:
        self.logger.info("Creating building IDs from coordinates")
        building_centroids["building_id"] = building_centroids.apply(
            lambda row: f"{row.geometry.y:.6f}_{row.geometry.x:.6f}",
            axis=1,
        )

    return self.map_nearest_points(
        points_df=building_centroids,
        id_column="building_id",
        output_prefix="nearest_ms_building",
        **kwargs,
    )
map_nearest_points(points_df, id_column, lat_column=None, lon_column=None, output_prefix='nearest', **kwargs)

Maps nearest points from a given DataFrame to the POIs.

Enriches the points_gdf with the ID and distance to the nearest point from the input DataFrame for each POI.

Parameters:

Name Type Description Default
points_df Union[DataFrame, GeoDataFrame]

DataFrame containing points to find nearest neighbors from. Must have latitude and longitude columns or point geometries.

required
id_column str

Name of the column containing unique identifiers for each point.

required
lat_column str

Name of the latitude column in points_df. If None, will attempt to detect it or extract from geometry if points_df is a GeoDataFrame.

None
lon_column str

Name of the longitude column in points_df. If None, will attempt to detect it or extract from geometry if points_df is a GeoDataFrame.

None
output_prefix str

Prefix for the output column names. Defaults to "nearest".

'nearest'
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with new columns: '{output_prefix}_id' and '{output_prefix}_distance'. Returns a copy of the current points_gdf if no points are found.

Raises:

Type Description
ValueError

If required columns are missing from points_df or if coordinate columns cannot be detected or extracted from geometry.

Source code in gigaspatial/generators/poi.py
def map_nearest_points(
    self,
    points_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    id_column: str,
    lat_column: Optional[str] = None,
    lon_column: Optional[str] = None,
    output_prefix: str = "nearest",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps nearest points from a given DataFrame to the POIs.

    Enriches the `points_gdf` with the ID and distance to the nearest point
    from the input DataFrame for each POI.

    Args:
        points_df (Union[pd.DataFrame, gpd.GeoDataFrame]):
            DataFrame containing points to find nearest neighbors from.
            Must have latitude and longitude columns or point geometries.
        id_column (str):
            Name of the column containing unique identifiers for each point.
        lat_column (str, optional):
            Name of the latitude column in points_df. If None, will attempt to detect it
            or extract from geometry if points_df is a GeoDataFrame.
        lon_column (str, optional):
            Name of the longitude column in points_df. If None, will attempt to detect it
            or extract from geometry if points_df is a GeoDataFrame.
        output_prefix (str, optional):
            Prefix for the output column names. Defaults to "nearest".
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated GeoDataFrame with new columns:
                      '{output_prefix}_id' and '{output_prefix}_distance'.
                      Returns a copy of the current `points_gdf` if no points are found.

    Raises:
        ValueError: If required columns are missing from points_df or if coordinate
                   columns cannot be detected or extracted from geometry.
    """
    self.logger.info(
        f"Mapping nearest points from {points_df.__class__.__name__} to POIs"
    )

    # Validate input DataFrame
    if points_df.empty:
        self.logger.info("No points found in the input DataFrame")
        return self.view

    # Handle GeoDataFrame
    if isinstance(points_df, gpd.GeoDataFrame):
        points_df = points_df.copy()
        if points_df.geometry.name == "geometry":
            points_df["latitude"] = points_df.geometry.y
            points_df["longitude"] = points_df.geometry.x
            lat_column = "latitude"
            lon_column = "longitude"
            self.logger.info("Extracted coordinates from geometry")

    # Detect coordinate columns if not provided
    if lat_column is None or lon_column is None:
        try:
            detected_lat, detected_lon = detect_coordinate_columns(points_df)
            lat_column = lat_column or detected_lat
            lon_column = lon_column or detected_lon
            self.logger.info(
                f"Detected coordinate columns: {lat_column}, {lon_column}"
            )
        except ValueError as e:
            raise ValueError(f"Could not detect coordinate columns: {str(e)}")

    # Validate required columns
    required_columns = [lat_column, lon_column, id_column]
    missing_columns = [
        col for col in required_columns if col not in points_df.columns
    ]
    if missing_columns:
        raise ValueError(
            f"Missing required columns in points_df: {missing_columns}"
        )

    from gigaspatial.processing.geo import calculate_distance

    self.logger.info("Calculating nearest points for each POI")
    tree = cKDTree(points_df[[lat_column, lon_column]])
    points_df_poi = self.points_gdf.copy()
    _, idx = tree.query(points_df_poi[["latitude", "longitude"]], k=1)
    df_nearest = points_df.iloc[idx]
    dist = calculate_distance(
        lat1=points_df_poi.latitude,
        lon1=points_df_poi.longitude,
        lat2=df_nearest[lat_column],
        lon2=df_nearest[lon_column],
    )
    # Create a temporary DataFrame to hold the results for merging
    temp_result_df = pd.DataFrame(
        {
            "poi_id": points_df_poi["poi_id"],
            f"{output_prefix}_id": points_df.iloc[idx][id_column].values,
            f"{output_prefix}_distance": dist,
        }
    )
    self._update_view(temp_result_df)
    self.logger.info(
        f"Nearest points mapping complete with prefix '{output_prefix}'"
    )
    return self.view
map_smod(stat='median', dataset_year=2020, dataset_resolution=1000, output_column='smod_class', **kwargs)

Maps GHSL Settlement Model (SMOD) data to the POIs.

Samples the SMOD class value at each POI's location. Enriches points_gdf with the 'smod_class' column.

Parameters:

Name Type Description Default
data_config Optional[GHSLDataConfig]

Configuration for accessing GHSL SMOD data. If None, a default GHSLDataConfig for 'GHS_SMOD' will be used.

required
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with a new column: 'smod_class'. Returns a copy of the current points_gdf if no GHSL SMOD data is found.

Source code in gigaspatial/generators/poi.py
def map_smod(
    self,
    stat="median",
    dataset_year=2020,
    dataset_resolution=1000,
    output_column="smod_class",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps GHSL Settlement Model (SMOD) data to the POIs.

    Samples the SMOD class value at each POI's location. Enriches `points_gdf`
    with the 'smod_class' column.

    Args:
        data_config (Optional[GHSLDataConfig]):
            Configuration for accessing GHSL SMOD data. If None, a
            default `GHSLDataConfig` for 'GHS_SMOD' will be used.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated GeoDataFrame with a new column:
                      'smod_class'. Returns a copy of the current
                      `points_gdf` if no GHSL SMOD data is found.
    """
    self.logger.info("Mapping GHSL Settlement Model (SMOD) data to POIs")
    handler = GHSLDataHandler(
        product="GHS_SMOD",
        year=dataset_year,
        resolution=dataset_resolution,
        data_store=self.data_store,
        coord_system=54009,
        **kwargs,
    )

    self.logger.info("Loading GHSL SMOD raster tiles")
    tif_processors = handler.load_data(
        self.points_gdf.copy(), ensure_available=self.config.ensure_available
    )

    return self.map_zonal_stats(
        data=tif_processors,
        stat=stat,  # Use median for categorical data
        output_column=output_column,
        **kwargs,
    )
map_zonal_stats(data, stat='mean', map_radius_meters=None, output_column='zonal_stat', value_column=None, area_weighted=False, **kwargs)

Maps zonal statistics from raster or polygon data to POIs.

Can operate in three modes: 1. Raster point sampling: Directly samples raster values at POI locations 2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them 3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

Parameters:

Name Type Description Default
data Union[List[TifProcessor], GeoDataFrame]

Either a list of TifProcessor objects containing raster data to sample, or a GeoDataFrame containing polygon data to aggregate.

required
stat str

For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max"). For polygon data: Aggregation method to use. Defaults to "mean".

'mean'
map_radius_meters float

If provided, creates circular buffers of this radius around each POI and calculates statistics within the buffers. If None, samples directly at POI locations (only for raster data).

None
output_column str

Name of the output column to store the results. Defaults to "zonal_stat".

'zonal_stat'
value_column str

For polygon data: Name of the column to aggregate. Required for polygon data. Not used for raster data.

None
area_weighted bool

For polygon data: Whether to weight values by fractional area of intersection. Defaults to False.

False
**kwargs

Additional keyword arguments passed to the sampling/aggregation functions.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated GeoDataFrame with a new column containing the calculated statistics. Returns a copy of the current points_gdf if no valid data is found.

Raises:

Type Description
ValueError

If no valid data is provided, if parameters are incompatible, or if required parameters (value_column) are missing for polygon data.

Source code in gigaspatial/generators/poi.py
def map_zonal_stats(
    self,
    data: Union[List[TifProcessor], gpd.GeoDataFrame],
    stat: str = "mean",
    map_radius_meters: Optional[float] = None,
    output_column: str = "zonal_stat",
    value_column: Optional[str] = None,
    area_weighted: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Maps zonal statistics from raster or polygon data to POIs.

    Can operate in three modes:
    1. Raster point sampling: Directly samples raster values at POI locations
    2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them
    3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

    Args:
        data (Union[List[TifProcessor], gpd.GeoDataFrame]):
            Either a list of TifProcessor objects containing raster data to sample,
            or a GeoDataFrame containing polygon data to aggregate.
        stat (str, optional):
            For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max").
            For polygon data: Aggregation method to use.
            Defaults to "mean".
        map_radius_meters (float, optional):
            If provided, creates circular buffers of this radius around each POI
            and calculates statistics within the buffers. If None, samples directly
            at POI locations (only for raster data).
        output_column (str, optional):
            Name of the output column to store the results. Defaults to "zonal_stat".
        value_column (str, optional):
            For polygon data: Name of the column to aggregate. Required for polygon data.
            Not used for raster data.
        area_weighted (bool, optional):
            For polygon data: Whether to weight values by fractional area of
            intersection. Defaults to False.
        **kwargs:
            Additional keyword arguments passed to the sampling/aggregation functions.

    Returns:
        pd.DataFrame: The updated GeoDataFrame with a new column containing the
                      calculated statistics. Returns a copy of the current `points_gdf`
                      if no valid data is found.

    Raises:
        ValueError: If no valid data is provided, if parameters are incompatible,
                  or if required parameters (value_column) are missing for polygon data.
    """

    if isinstance(data, list) and all(isinstance(x, TifProcessor) for x in data):
        results_df = pd.DataFrame({"poi_id": self.points_gdf["poi_id"]})

        # Handle raster data
        if not data:
            self.logger.info("No valid raster data found for the provided POIs")
            return self.view

        raster_crs = data[0].crs

        if not all(tp.crs == raster_crs for tp in data):
            raise ValueError(
                "All TifProcessors must have the same CRS for zonal statistics."
            )

        if map_radius_meters is not None:
            self.logger.info(
                f"Calculating {stat} within {map_radius_meters}m buffers around POIs"
            )
            # Create buffers around POIs
            buffers_gdf = buffer_geodataframe(
                self.points_gdf,
                buffer_distance_meters=map_radius_meters,
                cap_style="round",
            )

            # Calculate zonal statistics
            sampled_values = sample_multiple_tifs_by_polygons(
                tif_processors=data,
                polygon_list=buffers_gdf.to_crs(raster_crs).geometry,
                stat=stat,
                **kwargs,
            )
        else:
            self.logger.info(f"Sampling {stat} at POI locations")
            # Sample directly at POI locations
            coord_list = (
                self.points_gdf.to_crs(raster_crs).get_coordinates().to_numpy()
            )
            sampled_values = sample_multiple_tifs_by_coordinates(
                tif_processors=data, coordinate_list=coord_list, **kwargs
            )

        results_df[output_column] = sampled_values

    elif isinstance(data, gpd.GeoDataFrame):
        # Handle polygon data
        if data.empty:
            self.logger.info("No valid polygon data found for the provided POIs")
            return self.points_gdf.copy()

        if map_radius_meters is None:
            raise ValueError("map_radius_meters must be provided for polygon data")

        if value_column is None:
            raise ValueError("value_column must be provided for polygon data")

        if value_column not in data.columns:
            raise ValueError(
                f"Value column '{value_column}' not found in input polygon GeoDataFrame."
            )

        self.logger.info(
            f"Aggregating {value_column} within {map_radius_meters}m buffers around POIs"
        )

        # Create buffers around POIs
        buffer_gdf = buffer_geodataframe(
            self.points_gdf,
            buffer_distance_meters=map_radius_meters,
            cap_style="round",
        )

        # Aggregate polygons to buffers
        aggregation_result_gdf = aggregate_polygons_to_zones(
            polygons=data,
            zones=buffer_gdf,
            value_columns=value_column,
            aggregation=stat,
            area_weighted=area_weighted,
            zone_id_column="poi_id",
            **kwargs,
        )

        results_df = aggregation_result_gdf[["poi_id", value_column]].copy()

    else:
        raise ValueError(
            "data must be either a list of TifProcessor objects or a GeoDataFrame"
        )

    self._update_view(results_df)
    self.logger.info(
        f"Zonal statistics mapping complete for column(s) derived from '{output_column}' or '{value_column}'"
    )
    return self.view
save_view(name, output_format=None)

Saves the current POI view (the enriched DataFrame) to a file.

The output path and format are determined by the config or overridden by the output_format parameter.

Parameters:

Name Type Description Default
name str

The base name for the output file (without extension).

required
output_format Optional[str]

The desired output format (e.g., "csv", "geojson"). If None, the output_format from config will be used.

None

Returns:

Name Type Description
Path Path

The full path to the saved output file.

Source code in gigaspatial/generators/poi.py
def save_view(
    self,
    name: str,
    output_format: Optional[str] = None,
) -> Path:
    """
    Persist the enriched POI view to disk.

    The file format is taken from `output_format` when given, otherwise from
    `self.config.output_format`; the directory comes from
    `self.config.base_path`.

    Args:
        name (str): Base file name, without extension.
        output_format (Optional[str]): Target format such as "csv" or
            "geojson". Falls back to the configured default when None.

    Returns:
        Path: Full path of the written file.
    """
    fmt = output_format or self.config.output_format
    target = self.config.base_path / f"{name}.{fmt}"

    self.logger.info(f"Saving POI view to {target}")

    if fmt in ["geojson", "shp", "gpkg"]:
        # Geospatial formats need a geometry column, which the flat view
        # lacks; re-attach the POI point geometries before writing.
        self.logger.warning(
            f"Saving to {fmt} requires converting back to GeoDataFrame. Geometry column will be re-added."
        )
        gdf_out = self.view.merge(
            self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
        )
        write_dataset(
            data=gdf_out,
            path=str(target),
            data_store=self.data_store,
        )
    else:
        # Tabular formats (csv, parquet, feather) take the plain DataFrame.
        write_dataset(
            data=self.view,
            path=str(target),
            data_store=self.data_store,
        )

    return target
to_dataframe()

Returns the current POI view as a DataFrame.

This method returns the view DataFrame containing all accumulated variables.

Returns:

Type Description
DataFrame

pd.DataFrame: The current view.

Source code in gigaspatial/generators/poi.py
def to_dataframe(self) -> pd.DataFrame:
    """
    Return the accumulated POI view as a plain DataFrame.

    All variables added so far are included; no geometry column is attached
    (use `to_geodataframe` for that).

    Returns:
        pd.DataFrame: The current view.
    """
    return self.view
to_geodataframe()

Returns the current POI view merged with the original point geometries as a GeoDataFrame.

This method combines all accumulated variables in the view with the corresponding point geometries, providing a spatially-enabled DataFrame for further analysis or export.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The current view merged with point geometries.

Source code in gigaspatial/generators/poi.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Return the POI view joined back onto the original point geometries.

    Left-joins the accumulated view with the `poi_id`/`geometry` columns of
    the underlying points, yielding a spatially-enabled frame for further
    analysis or export.

    Returns:
        gpd.GeoDataFrame: The current view merged with point geometries.
    """
    geometry_lookup = self.points_gdf[["poi_id", "geometry"]]
    return self.view.merge(geometry_lookup, on="poi_id", how="left")
validate_data_coverage(data_bounds)

Validate how many POIs fall within the data coverage area.

Returns:

Name Type Description
dict dict

Coverage statistics

Source code in gigaspatial/generators/poi.py
def validate_data_coverage(self, data_bounds: gpd.GeoDataFrame) -> dict:
    """
    Report how many POIs fall inside the given data coverage area.

    Args:
        data_bounds (gpd.GeoDataFrame): Geometries describing the coverage
            area; their union is used for the containment test.

    Returns:
        dict: Coverage statistics (total, covered, percentage, uncovered).
    """
    inside = self.points_gdf.within(data_bounds.union_all())
    total = len(self.points_gdf)
    covered = inside.sum()
    return {
        "total_pois": total,
        "covered_pois": covered,
        "coverage_percentage": (covered / total) * 100,
        "uncovered_pois": (~inside).sum(),
    }

PoiViewGeneratorConfig

Configuration for POI (Point of Interest) view generation.

Attributes:

Name Type Description
base_path Path

The base directory where generated POI views will be saved. Defaults to a path retrieved from config.

output_format str

The default format for saving output files (e.g., "csv", "geojson"). Defaults to "csv".

Source code in gigaspatial/generators/poi.py
@dataclass
class PoiViewGeneratorConfig:
    """
    Configuration for POI (Point of Interest) view generation.

    Attributes:
        base_path (Path): The base directory where generated POI views will be saved.
                          Defaults to a path retrieved from `config`.
        output_format (str): The default format for saving output files (e.g., "csv", "geojson").
                             Defaults to "csv".
        ensure_available (bool): Defaults to True. NOTE(review): presumably
                                 makes data handlers ensure source datasets are
                                 downloaded/available before processing —
                                 confirm against handler usage.
    """

    base_path: Path = Field(default=global_config.get_path("poi", "views"))
    output_format: str = "csv"
    ensure_available: bool = True

zonal

admin

AdminBoundariesViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using administrative boundaries as the zones.

This class specializes in creating zonal views where the zones are defined by administrative boundaries (e.g., countries, states, districts) at a specified administrative level. It extends the GeometryBasedZonalViewGenerator and leverages the AdminBoundaries handler to load the necessary geographical data.

The administrative boundaries serve as the base geometries to which other geospatial data (points, polygons, rasters) can be mapped and aggregated.

Attributes:

Name Type Description
country str

The name or code of the country for which to load administrative boundaries.

admin_level int

The administrative level to load (e.g., 0 for country, 1 for states/provinces).

admin_path Union[str, Path]

Optional path to a local GeoJSON/Shapefile containing the administrative boundaries. If provided, this local file will be used instead of downloading.

config Optional[ZonalViewGeneratorConfig]

Configuration for the zonal view generation process.

data_store Optional[DataStore]

A DataStore instance for accessing data.

logger Optional[Logger]

A logger instance for logging messages.

Source code in gigaspatial/generators/zonal/admin.py
class AdminBoundariesViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Zonal view generator whose zones are administrative boundaries.

    Zones are the administrative units (countries, states, districts, ...) of
    a given country at a chosen administrative level. Boundary geometries are
    loaded through the `AdminBoundaries` handler — downloaded, or read from a
    local file when `admin_path` is given — and then serve as the base
    geometries onto which other geospatial data (points, polygons, rasters)
    can be mapped and aggregated.

    Attributes:
        country (str): Name or code of the country whose boundaries are loaded.
        admin_level (int): Administrative level to load (e.g., 0 for country,
            1 for states/provinces).
        admin_path (Union[str, Path], optional): Local GeoJSON/Shapefile with
            the administrative boundaries, used instead of downloading.
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the
            zonal view generation process.
        data_store (Optional[DataStore]): A DataStore instance for accessing data.
        logger (Optional[logging.Logger]): A logger instance for logging messages.
    """

    def __init__(
        self,
        country: str,
        admin_level: int,
        data_store: Optional[DataStore] = None,
        admin_path: Optional[Union[str, Path]] = None,
        config: Optional[ZonalViewGeneratorConfig] = None,
        logger: logging.Logger = None,
    ):
        """
        Initializes the AdminBoundariesViewGenerator.

        Args:
            country (str): Name or code of the country (e.g., "USA", "Germany").
            admin_level (int): Administrative level to load (0 = country,
                1 = states, 2 = districts, ...).
            data_store (Optional[DataStore]): Data storage interface. If None,
                LocalDataStore is used downstream.
            admin_path (Optional[Union[str, Path]]): Path to a local boundaries
                file (GeoJSON, Shapefile); overrides default data loading.
            config (Optional[ZonalViewGeneratorConfig]): Generator configuration;
                a default is used when None.
            logger (Optional[logging.Logger]): Custom logger; a default one is
                used when None.
        """
        # Load the boundary geometries first, then hand them to the base
        # class as the zone dataset keyed by the "id" column.
        zones = self._init_zone_data(country, admin_level, data_store, admin_path)
        super().__init__(
            zone_data=zones,
            zone_id_column="id",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self.logger.info(
            f"Initialized AdminBoundariesViewGenerator for {country} (level {admin_level})"
        )

    def _init_zone_data(
        self,
        country,
        admin_level,
        data_store: Optional[DataStore] = None,
        admin_path: Optional[Union[str, Path]] = None,
    ):
        # Delegate loading to the AdminBoundaries handler and return the
        # boundaries as a GeoDataFrame.
        return AdminBoundaries.create(
            country, admin_level, data_store, admin_path
        ).to_geodataframe()
__init__(country, admin_level, data_store=None, admin_path=None, config=None, logger=None)

Initializes the AdminBoundariesViewGenerator.

Parameters:

Name Type Description Default
country str

The name or code of the country (e.g., "USA", "Germany").

required
admin_level int

The administrative level to load (e.g., 0 for country, 1 for states, 2 for districts).

required
admin_path Union[str, Path]

Path to a local administrative boundaries file (GeoJSON, Shapefile). If provided, overrides default data loading.

None
config Optional[ZonalViewGeneratorConfig]

Configuration for the zonal view generator. If None, a default config will be used.

None
data_store Optional[DataStore]

Data storage interface. If None, LocalDataStore is used.

None
logger Optional[Logger]

Custom logger instance. If None, a default logger is used.

None
Source code in gigaspatial/generators/zonal/admin.py
def __init__(
    self,
    country: str,
    admin_level: int,
    data_store: Optional[DataStore] = None,
    admin_path: Optional[Union[str, Path]] = None,
    config: Optional[ZonalViewGeneratorConfig] = None,
    logger: logging.Logger = None,
):
    """
    Initializes the AdminBoundariesViewGenerator.

    Args:
        country (str): The name or code of the country (e.g., "USA", "Germany").
        admin_level (int): The administrative level to load (e.g., 0 for country, 1 for states, 2 for districts).
        admin_path (Union[str, Path], optional): Path to a local administrative boundaries file (GeoJSON, Shapefile).
                                                 If provided, overrides default data loading.
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generator.
                                                     If None, a default config will be used.
        data_store (Optional[DataStore]): Data storage interface. If None, LocalDataStore is used.
        logger (Optional[logging.Logger]): Custom logger instance. If None, a default logger is used.
    """

    # Boundary geometries are loaded via _init_zone_data and handed to the
    # base zonal view generator as the zone dataset keyed by the "id" column.
    super().__init__(
        zone_data=self._init_zone_data(
            country, admin_level, data_store, admin_path
        ),
        zone_id_column="id",
        config=config,
        data_store=data_store,
        logger=logger,
    )
    self.logger.info(
        f"Initialized AdminBoundariesViewGenerator for {country} (level {admin_level})"
    )

base

ZonalViewGenerator

Bases: ABC, Generic[T]

Base class for mapping data to zonal datasets.

This class provides the framework for mapping various data sources (points, polygons, rasters) to zonal geometries like grid tiles or catchment areas. It serves as an abstract base class that must be subclassed to implement specific zonal systems.

The class supports three main types of data mapping:

- Point data aggregation to zones
- Polygon data aggregation with optional area weighting
- Raster data sampling and statistics

Attributes:

Name Type Description
data_store DataStore

The data store for accessing input data.

generator_config ZonalViewGeneratorConfig

Configuration for the generator.

logger

Logger instance for this class.

Source code in gigaspatial/generators/zonal/base.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
class ZonalViewGenerator(ABC, Generic[T]):
    """Base class for mapping data to zonal datasets.

    This class provides the framework for mapping various data sources (points, polygons, rasters)
    to zonal geometries like grid tiles or catchment areas. It serves as an abstract base class
    that must be subclassed to implement specific zonal systems.

    The class supports three main types of data mapping:
    - Point data aggregation to zones
    - Polygon data aggregation with optional area weighting
    - Raster data sampling and statistics

    Attributes:
        data_store (DataStore): The data store for accessing input data.
        generator_config (ZonalViewGeneratorConfig): Configuration for the generator.
        logger: Logger instance for this class.
    """

    def __init__(
        self,
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):
        """Initialize the ZonalViewGenerator.

        Args:
            config (ZonalViewGeneratorConfig, optional): Generator configuration;
                a default ZonalViewGeneratorConfig is created when None.
            data_store (DataStore, optional): Data store for accessing input
                data; a LocalDataStore is used when None.
            logger (logging.Logger, optional): Custom logger; when None, one is
                obtained from the global config under this class's name.
        """
        self.config = config if config is not None else ZonalViewGeneratorConfig()
        self.data_store = data_store if data_store is not None else LocalDataStore()
        self.logger = logger or global_config.get_logger(self.__class__.__name__)
        # Lazily-populated zonal view; built on first access to `view`.
        self._view: Optional[pd.DataFrame] = None

    @abstractmethod
    def get_zonal_geometries(self) -> List[Polygon]:
        """Return the polygon geometry of every zone.

        Subclasses implement this to supply the concrete shapes of their
        zonal system (grid tiles, catchment boundaries, administrative
        areas, ...), in one-to-one correspondence with the identifiers
        returned by `get_zone_identifiers`.

        Returns:
            List[Polygon]: Shapely polygons, one per zone.
        """
        ...

    @abstractmethod
    def get_zone_identifiers(self) -> List[T]:
        """Return one unique identifier per zone.

        Implementations must keep these in one-to-one correspondence (same
        order) with the geometries from `get_zonal_geometries`.

        Returns:
            List[T]: Zone identifiers (e.g. quadkeys, H3 indices, tile IDs);
                the type T is fixed by the concrete zonal system.
        """
        ...

    def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
        """Materialize the zones as a GeoDataFrame.

        Pairs each zone identifier with its geometry in WGS84 (EPSG:4326).

        Returns:
            gpd.GeoDataFrame: Columns 'zone_id' (identifiers) and 'geometry'
                (corresponding Polygon objects).
        """
        zone_records = {
            "zone_id": self.get_zone_identifiers(),
            "geometry": self.get_zonal_geometries(),
        }
        return gpd.GeoDataFrame(zone_records, crs="EPSG:4326")

    @property
    def zone_gdf(self) -> gpd.GeoDataFrame:
        """Cached GeoDataFrame of zone geometries and identifiers.

        Built lazily on first access and memoized on the instance.

        Returns:
            gpd.GeoDataFrame: Zone identifiers and geometries.
        """
        try:
            return self._zone_gdf
        except AttributeError:
            self._zone_gdf = self.get_zone_geodataframe()
            return self._zone_gdf

    @property
    def view(self) -> pd.DataFrame:
        """The DataFrame representing the current zonal view.

        Returns:
            pd.DataFrame: Zone IDs plus any variables added so far. Before any
                variable is added, this is the base `zone_gdf` with its
                geometry column dropped.
        """
        current = self._view
        if current is None:
            current = self.zone_gdf.drop(columns="geometry")
            self._view = current
        return current

    def add_variable_to_view(self, data_dict: Dict, column_name: str) -> None:
        """
        Adds a new variable (column) to the zonal view DataFrame.

        This method takes a dictionary (typically the result of map_points,
        map_polygons or map_rasters) keyed by `zone_id` and adds its values as
        a new column to the internal `_view` (initializing it from `zone_gdf`
        if needed).

        Zones present in the view but missing from `data_dict` receive NaN for
        the new column; keys in `data_dict` that are not view zones are
        ignored. Both situations only emit warnings, they do not raise.

        Args:
            data_dict (Dict): A dictionary where keys are `zone_id`s and values are
                              the data to be added.
            column_name (str): The name of the new column to be added.
        Raises:
            ValueError: If the `column_name` already exists in the zonal view.
        """
        # Lazily initialize the view (zone ids without geometry).
        if self._view is None:
            self._view = self.zone_gdf.drop(columns="geometry")

        if column_name in self._view.columns:
            raise ValueError(
                f"Column '{column_name}' already exists in the zonal view."
            )

        # Create a pandas Series from the dictionary, aligning by index (zone_id)
        new_series = pd.Series(data_dict, name=column_name)

        # Before merging, ensure the zone_ids in data_dict match those in _view
        missing_zones_in_data = set(self._view["zone_id"]) - set(new_series.index)
        extra_zones_in_data = set(new_series.index) - set(self._view["zone_id"])

        if missing_zones_in_data:
            self.logger.warning(
                f"Warning: {len(missing_zones_in_data)} zone(s) from the zonal view "
                f"are missing in the provided data_dict for column '{column_name}'. "
                f"These zones will have NaN values for '{column_name}'. Missing: {list(missing_zones_in_data)[:5]}..."
            )
        if extra_zones_in_data:
            self.logger.warning(
                f"Warning: {len(extra_zones_in_data)} zone(s) in the provided data_dict "
                f"are not present in the zonal view for column '{column_name}'. "
                f"These will be ignored. Extra: {list(extra_zones_in_data)[:5]}..."
            )

        # Merge the new series with the _view based on 'zone_id'
        # Using .set_index() for efficient alignment; the left join drops the
        # extra keys and leaves NaN for the missing ones.
        original_index_name = self._view.index.name
        self._view = self._view.set_index("zone_id").join(new_series).reset_index()
        if original_index_name:  # Restore original index name if it existed
            self._view.index.name = original_index_name
        else:  # If it was a default integer index, ensure it's not named 'index'
            self._view.index.name = None

        self.logger.info(f"Added variable '{column_name}' to the zonal view.")

    def map_points(
        self,
        points: Union[pd.DataFrame, gpd.GeoDataFrame],
        value_columns: Optional[Union[str, List[str]]] = None,
        aggregation: Union[str, Dict[str, str]] = "count",
        predicate: str = "within",
        output_suffix: str = "",
        mapping_function: Optional[Callable] = None,
        **mapping_kwargs,
    ) -> Dict:
        """Map point data to zones with spatial aggregation.

        Aggregates point data onto the zones: either counting points per zone
        or aggregating attribute values with the requested statistic(s).

        Args:
            points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to map;
                must carry geometry information if a DataFrame.
            value_columns (Union[str, List[str]], optional): Column name(s) to
                aggregate. When None, only point counts per zone are returned.
            aggregation (Union[str, Dict[str, str]]): Aggregation method(s):
                a single string ("count", "mean", "sum", "min", "max", ...) or
                a mapping of column name to method.
            predicate (str): Spatial predicate relating points to zones
                ("within", "intersects", "contains"). Defaults to "within".
            output_suffix (str): Suffix appended to output column names.
            mapping_function (Callable, optional): Custom mapping function with
                signature mapping_function(self, points, **mapping_kwargs).
                When given, it fully replaces the default behaviour and all
                other parameters except mapping_kwargs are ignored.
            **mapping_kwargs: Extra keyword arguments for the mapping function.

        Returns:
            Dict: {zone_id: value} for a single value column or for counts;
                {column: {zone_id: value}} when several value columns are given.
        """
        # A custom mapping function overrides the default path entirely.
        if mapping_function is not None:
            return mapping_function(self, points, **mapping_kwargs)

        self.logger.warning(
            "Using default points mapping implementation. Consider creating a specialized mapping function."
        )
        aggregated = aggregate_points_to_zones(
            points=points,
            zones=self.zone_gdf,
            value_columns=value_columns,
            aggregation=aggregation,
            point_zone_predicate=predicate,
            zone_id_column="zone_id",
            output_suffix=output_suffix,
        )
        indexed = aggregated.set_index("zone_id")

        if isinstance(value_columns, str):
            return indexed[value_columns].to_dict()
        if isinstance(value_columns, list):
            # One {zone_id: value} mapping per requested column.
            return {col: indexed[col].to_dict() for col in value_columns}
        # No value columns requested: fall back to the point counts.
        return indexed["point_count"].to_dict()

    def map_polygons(
        self,
        polygons,
        value_columns: Optional[Union[str, List[str]]] = None,
        aggregation: Union[str, Dict[str, str]] = "count",
        predicate: str = "intersects",
        **kwargs,
    ) -> Dict:
        """
        Maps polygon data to the instance's zones and aggregates values.

        This method leverages `aggregate_polygons_to_zones` to perform a spatial
        aggregation of polygon data onto the zones stored within this object
        instance. It can count polygons, or aggregate their values, based on
        different spatial relationships defined by the `predicate`.

        Args:
            polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
                The polygon data to map. Must contain geometry information if a
                DataFrame. The input is never mutated.
            value_columns (Union[str, List[str]], optional):
                The column name(s) from the `polygons` data to aggregate. If
                `None`, the number of polygons matching the `predicate` is
                counted per zone.
            aggregation (Union[str, Dict[str, str]], optional):
                The aggregation method(s): a single string (e.g., "sum",
                "mean", "max") or a dictionary mapping column names to methods.
                Ignored and forced to "count" when `value_columns` is `None`.
                Defaults to "count".
            predicate (Literal["intersects", "within", "fractional"], optional):
                The spatial relationship used for aggregation:
                - "intersects": any polygon intersecting a zone contributes.
                - "within": only polygons entirely contained in a zone contribute.
                - "fractional": area-weighted aggregation; a polygon's value is
                  distributed proportionally to its overlap with each zone.
                Defaults to "intersects".
            **kwargs:
                Additional keyword arguments forwarded to the underlying
                `aggregate_polygons_to_zones` function.

        Returns:
            Dict: For a single value column, {zone_id: aggregated_value}.
                For a list of columns, {column_name: {zone_id: value}}.

        Raises:
            ValueError: If `value_columns` is of an unexpected type after processing.

        Example:
            >>> # Count all land parcels that intersect each zone
            >>> parcel_counts = self.map_polygons(landuse_polygons)
            >>>
            >>> # Aggregate total population within zones using area weighting
            >>> population_by_zone = self.map_polygons(
            ...     landuse_polygons,
            ...     value_columns="population",
            ...     predicate="fractional",
            ...     aggregation="sum",
            ... )
        """

        if value_columns is None:
            self.logger.warning(
                f"No value_columns specified. Defaulting to counting polygons with {predicate} predicate."
            )
            temp_value_col = "_temp_polygon_count_dummy"
            # BUGFIX: work on a copy — the previous implementation wrote the
            # dummy column directly into the caller's DataFrame and never
            # removed it.
            polygons = polygons.copy()
            polygons[temp_value_col] = 1
            actual_value_columns = temp_value_col
            aggregation = "count"  # Force count if no value columns
        else:
            actual_value_columns = value_columns

        result = aggregate_polygons_to_zones(
            polygons=polygons,
            zones=self.zone_gdf,
            value_columns=actual_value_columns,
            aggregation=aggregation,
            predicate=predicate,
            zone_id_column="zone_id",
            # BUGFIX: **kwargs was documented as forwarded but silently
            # dropped; it is now passed through (matching the sibling
            # zonal-stats code path).
            **kwargs,
        )

        # Convert the result GeoDataFrame to the expected dictionary format
        if isinstance(actual_value_columns, str):
            return result.set_index("zone_id")[actual_value_columns].to_dict()
        elif isinstance(actual_value_columns, list):
            return {
                col: result.set_index("zone_id")[col].to_dict()
                for col in actual_value_columns
            }
        else:
            raise ValueError("Unexpected type for actual_value_columns.")

    def map_rasters(
        self,
        tif_processors: List[TifProcessor],
        mapping_function: Optional[Callable] = None,
        stat: str = "mean",
        **mapping_kwargs,
    ) -> Union[np.ndarray, Dict]:
        """Map raster data to zones using zonal statistics.

        Samples raster values within each zone and computes a statistic per
        zone. Zone geometries are reprojected to the raster CRS when needed.

        Args:
            tif_processors (List[TifProcessor]): TifProcessor objects for
                accessing raster data. All processors are assumed to share the
                CRS of the first one.
            mapping_function (Callable, optional): Custom mapping function with
                signature mapping_function(self, tif_processors, **mapping_kwargs).
                When provided, `stat` is ignored and the function's return
                value is passed through unchanged (hence the np.ndarray option
                in the return annotation).
            stat (str): Statistic to calculate when aggregating raster values
                within each zone ("mean", "sum", "min", "max", "std", ...).
                Defaults to "mean".
            **mapping_kwargs: Additional keyword arguments passed to the
                mapping function.

        Returns:
            Union[np.ndarray, Dict]: By default, a dict mapping each zone
                identifier to its sampled statistic. (The previous docstring
                wrongly described an ndarray default return.) Custom mapping
                functions may return other structures.

        Note:
            If the coordinate reference system of the rasters differs from the
            zones, the zone geometries are automatically transformed (and
            cached) to match the raster CRS.
        """
        if mapping_function is not None:
            return mapping_function(self, tif_processors, **mapping_kwargs)

        self.logger.warning(
            "Using default raster mapping implementation. Consider creating a specialized mapping function."
        )

        raster_crs = tif_processors[0].crs

        if raster_crs != self.zone_gdf.crs:
            self.logger.info(f"Projecting zones to raster CRS: {raster_crs}")
            zone_geoms = self._get_transformed_geometries(raster_crs)
        else:
            zone_geoms = self.get_zonal_geometries()

        # Sample raster values per zone geometry.
        sampled_values = sample_multiple_tifs_by_polygons(
            tif_processors=tif_processors, polygon_list=zone_geoms, stat=stat
        )

        return dict(zip(self.get_zone_identifiers(), sampled_values))

    @lru_cache(maxsize=32)
    def _get_transformed_geometries(self, target_crs):
        """Get zone geometries transformed to target coordinate reference system.

        This method is cached to avoid repeated coordinate transformations for
        the same target CRS.

        Args:
            target_crs: Target coordinate reference system for transformation.

        Returns:
            List[Polygon]: List of zone geometries transformed to the target CRS.
        """
        return self.zone_gdf.to_crs(target_crs).geometry.tolist()

    def save_view(
        self,
        name: str,
        output_format: Optional[str] = None,
    ) -> Path:
        """Persist the current zonal view to disk.

        Args:
            name (str): Base name for the output file (without extension).
            output_format (str, optional): File format to save in (e.g., "parquet",
                "geojson", "shp"). If None, uses the format specified in config.

        Returns:
            Path: The full path where the view was saved.

        Note:
            The output directory comes from the config.base_path setting, and the
            file extension is derived from the chosen output format. The internal
            `self.view` is what gets written; when no variables have been added
            yet, the base zone_gdf is saved instead.
        """
        if self._view is None:
            self.logger.warning(
                "No variables have been added to the zonal view. Saving the base zone_gdf."
            )
            view_to_save = self.zone_gdf
        else:
            view_to_save = self._view

        chosen_format = output_format or self.config.output_format
        target_path = self.config.base_path / f"{name}.{chosen_format}"

        self.logger.info(f"Saving zonal view to {target_path}")

        if chosen_format in ("geojson", "shp", "gpkg"):
            self.logger.warning(
                f"Saving to {chosen_format} requires converting back to GeoDataFrame. Geometry column will be re-added."
            )
            # Geometry must be restored for geospatial container formats.
            view_to_save = self.view.merge(
                self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
            )

        write_dataset(
            data=view_to_save,
            path=str(target_path),
            data_store=self.data_store,
        )

        return target_path

    def to_dataframe(self) -> pd.DataFrame:
        """
        Return the current zonal view as a DataFrame.

        All variables accumulated so far are combined into a single frame.

        Returns:
            pd.DataFrame: The current view.
        """
        current_view = self.view
        return current_view

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Return the zonal view joined with zone geometries as a GeoDataFrame.

        Combines every variable accumulated in the view with the matching zone
        geometry, yielding a spatially-enabled frame for analysis or export.

        Returns:
            gpd.GeoDataFrame: The current view merged with zone geometries.
        """
        geometry_lookup = self.zone_gdf[["zone_id", "geometry"]]
        return self.view.merge(geometry_lookup, on="zone_id", how="left")
view: pd.DataFrame property

The DataFrame representing the current zonal view.

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame containing zone IDs, and any added variables. If no variables have been added, it returns the base zone_gdf without geometries.

zone_gdf: gpd.GeoDataFrame property

Cached GeoDataFrame of zones.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Lazily-computed and cached GeoDataFrame of zone geometries and identifiers.

__init__(config=None, data_store=None, logger=None)

Initialize the ZonalViewGenerator.

Parameters:

Name Type Description Default
config ZonalViewGeneratorConfig

Configuration for the generator. If None, uses default configuration.

None
data_store DataStore

The data store for accessing input data. If None, uses LocalDataStore.

None
Source code in gigaspatial/generators/zonal/base.py
def __init__(
    self,
    config: Optional[ZonalViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: logging.Logger = None,
):
    """Initialize the ZonalViewGenerator.

    Args:
        generator_config (ZonalViewGeneratorConfig, optional): Configuration for the generator.
            If None, uses default configuration.
        data_store (DataStore, optional): The data store for accessing input data.
            If None, uses LocalDataStore.
    """
    self.config = config or ZonalViewGeneratorConfig()
    self.data_store = data_store or LocalDataStore()
    self.logger = logger or global_config.get_logger(self.__class__.__name__)
    self._view: Optional[pd.DataFrame] = None
add_variable_to_view(data_dict, column_name)

Adds a new variable (column) to the zonal view GeoDataFrame.

This method takes a dictionary (typically the result of map_points or map_polygons) and adds its values as a new column to the internal _view (or zone_gdf if not yet initialized). The dictionary keys are expected to be the zone_id values.

Parameters:

Name Type Description Default
data_dict Dict

A dictionary where keys are zone_ids and values are the data to be added.

required
column_name str

The name of the new column to be added to the GeoDataFrame.

required
Source code in gigaspatial/generators/zonal/base.py
def add_variable_to_view(self, data_dict: Dict, column_name: str) -> None:
    """
    Adds a new variable (column) to the zonal view GeoDataFrame.

    This method takes a dictionary (typically the result of map_points or map_polygons)
    and adds its values as a new column to the internal `_view` (or `zone_gdf` if not yet initialized).
    The dictionary keys are expected to be the `zone_id` values.

    Args:
        data_dict (Dict): A dictionary where keys are `zone_id`s and values are
                          the data to be added.
        column_name (str): The name of the new column to be added to the GeoDataFrame.
    Raises:
        ValueError: If the `data_dict` keys do not match the `zone_id`s in the zonal view.
                    If the `column_name` already exists in the zonal view.
    """
    if self._view is None:
        self._view = self.zone_gdf.drop(columns="geometry")

    if column_name in self._view.columns:
        raise ValueError(
            f"Column '{column_name}' already exists in the zonal view."
        )

    # Create a pandas Series from the dictionary, aligning by index (zone_id)
    new_series = pd.Series(data_dict, name=column_name)

    # Before merging, ensure the zone_ids in data_dict match those in _view
    missing_zones_in_data = set(self._view["zone_id"]) - set(new_series.index)
    extra_zones_in_data = set(new_series.index) - set(self._view["zone_id"])

    if missing_zones_in_data:
        self.logger.warning(
            f"Warning: {len(missing_zones_in_data)} zone(s) from the zonal view "
            f"are missing in the provided data_dict for column '{column_name}'. "
            f"These zones will have NaN values for '{column_name}'. Missing: {list(missing_zones_in_data)[:5]}..."
        )
    if extra_zones_in_data:
        self.logger.warning(
            f"Warning: {len(extra_zones_in_data)} zone(s) in the provided data_dict "
            f"are not present in the zonal view for column '{column_name}'. "
            f"These will be ignored. Extra: {list(extra_zones_in_data)[:5]}..."
        )

    # Merge the new series with the _view based on 'zone_id'
    # Using .set_index() for efficient alignment
    original_index_name = self._view.index.name
    self._view = self._view.set_index("zone_id").join(new_series).reset_index()
    if original_index_name:  # Restore original index name if it existed
        self._view.index.name = original_index_name
    else:  # If it was a default integer index, ensure it's not named 'index'
        self._view.index.name = None

    self.logger.info(f"Added variable '{column_name}' to the zonal view.")
get_zonal_geometries() abstractmethod

Get the geometries of the zones.

This method must be implemented by subclasses to return the actual geometric shapes of the zones (e.g., grid tiles, catchment boundaries, administrative areas).

Returns:

Type Description
List[Polygon]

List[Polygon]: A list of Shapely Polygon objects representing zone geometries.

Source code in gigaspatial/generators/zonal/base.py
@abstractmethod
def get_zonal_geometries(self) -> List[Polygon]:
    """Get the geometries of the zones.

    This method must be implemented by subclasses to return the actual geometric
    shapes of the zones (e.g., grid tiles, catchment boundaries, administrative areas).

    Returns:
        List[Polygon]: A list of Shapely Polygon objects representing zone geometries.
    """
    pass
get_zone_geodataframe()

Convert zones to a GeoDataFrame.

Creates a GeoDataFrame containing zone identifiers and their corresponding geometries in WGS84 (EPSG:4326) coordinate reference system.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns, where zone_id contains the identifiers and geometry contains the corresponding Polygon objects.

Source code in gigaspatial/generators/zonal/base.py
def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert zones to a GeoDataFrame.

    Creates a GeoDataFrame containing zone identifiers and their corresponding
    geometries in WGS84 (EPSG:4326) coordinate reference system.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns,
            where zone_id contains the identifiers and geometry contains the
            corresponding Polygon objects.
    """
    return gpd.GeoDataFrame(
        {
            "zone_id": self.get_zone_identifiers(),
            "geometry": self.get_zonal_geometries(),
        },
        crs="EPSG:4326",
    )
get_zone_identifiers() abstractmethod

Get unique identifiers for each zone.

This method must be implemented by subclasses to return identifiers that correspond one-to-one with the geometries returned by get_zonal_geometries().

Returns:

Type Description
List[T]

List[T]: A list of zone identifiers (e.g., quadkeys, H3 indices, tile IDs). The type T is determined by the specific zonal system implementation.

Source code in gigaspatial/generators/zonal/base.py
@abstractmethod
def get_zone_identifiers(self) -> List[T]:
    """Get unique identifiers for each zone.

    This method must be implemented by subclasses to return identifiers that
    correspond one-to-one with the geometries returned by get_zonal_geometries().

    Returns:
        List[T]: A list of zone identifiers (e.g., quadkeys, H3 indices, tile IDs).
            The type T is determined by the specific zonal system implementation.
    """
    pass
map_points(points, value_columns=None, aggregation='count', predicate='within', output_suffix='', mapping_function=None, **mapping_kwargs)

Map point data to zones with spatial aggregation.

Aggregates point data to zones using spatial relationships. Points can be counted or have their attribute values aggregated using various statistical methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

The point data to map. Must contain geometry information if DataFrame.

required
value_columns Union[str, List[str]]

Column name(s) containing values to aggregate. If None, only point counts are performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use. Can be a single string ("count", "mean", "sum", "min", "max", etc.) or a dictionary mapping column names to aggregation methods.

'count'
predicate str

Spatial predicate for point-to-zone relationship. Options include "within", "intersects", "contains". Defaults to "within".

'within'
output_suffix str

Suffix to add to output column names. Defaults to empty string.

''
mapping_function Callable

Custom function for mapping points to zones. If provided, signature should be mapping_function(self, points, **mapping_kwargs). When used, all other parameters except mapping_kwargs are ignored.

None
**mapping_kwargs

Additional keyword arguments passed to the mapping function.

{}

Returns:

Name Type Description
Dict Dict

Dictionary with zone IDs as keys and aggregated values as values. If value_columns is None, returns point counts per zone. If value_columns is specified, returns aggregated values per zone.

Source code in gigaspatial/generators/zonal/base.py
def map_points(
    self,
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    predicate: str = "within",
    output_suffix: str = "",
    mapping_function: Optional[Callable] = None,
    **mapping_kwargs,
) -> Dict:
    """Map point data to zones with spatial aggregation.

    Aggregates point data to zones using spatial relationships. Points can be
    counted or have their attribute values aggregated using various statistical methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): The point data to map.
            Must contain geometry information if DataFrame.
        value_columns (Union[str, List[str]], optional): Column name(s) containing
            values to aggregate. If None, only point counts are performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use.
            Can be a single string ("count", "mean", "sum", "min", "max", etc.)
            or a dictionary mapping column names to aggregation methods.
        predicate (str): Spatial predicate for point-to-zone relationship.
            Options include "within", "intersects", "contains". Defaults to "within".
        output_suffix (str): Suffix to add to output column names. Defaults to empty string.
        mapping_function (Callable, optional): Custom function for mapping points to zones.
            If provided, signature should be mapping_function(self, points, **mapping_kwargs).
            When used, all other parameters except mapping_kwargs are ignored.
        **mapping_kwargs: Additional keyword arguments passed to the mapping function.

    Returns:
        Dict: Dictionary with zone IDs as keys and aggregated values as values.
            If value_columns is None, returns point counts per zone.
            If value_columns is specified, returns aggregated values per zone.
    """
    if mapping_function is not None:
        return mapping_function(self, points, **mapping_kwargs)

    else:
        self.logger.warning(
            "Using default points mapping implementation. Consider creating a specialized mapping function."
        )
        result = aggregate_points_to_zones(
            points=points,
            zones=self.zone_gdf,
            value_columns=value_columns,
            aggregation=aggregation,
            point_zone_predicate=predicate,
            zone_id_column="zone_id",
            output_suffix=output_suffix,
        )

        if isinstance(value_columns, str):
            return result.set_index("zone_id")[value_columns].to_dict()
        elif isinstance(value_columns, list):
            # If multiple value columns, return a dictionary of dictionaries
            # Or, if preferred, a dictionary where values are lists/tuples of results
            # For now, let's return a dict of series, which is common.
            # The previous version implied a single dictionary result from map_points/polygons
            # but with multiple columns, it's usually {zone_id: {col1: val1, col2: val2}}
            # or {col_name: {zone_id: val}}
            # In this version, it'll return a dictionary for each column.
            return {
                col: result.set_index("zone_id")[col].to_dict()
                for col in value_columns
            }
        else:  # If value_columns is None, it should return point_count
            return result.set_index("zone_id")["point_count"].to_dict()
map_polygons(polygons, value_columns=None, aggregation='count', predicate='intersects', **kwargs)

Maps polygon data to the instance's zones and aggregates values.

This method leverages aggregate_polygons_to_zones to perform a spatial aggregation of polygon data onto the zones stored within this object instance. It can count polygons, or aggregate their values, based on different spatial relationships defined by the predicate.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

The polygon data to map. Must contain geometry information if a DataFrame.

required
value_columns Union[str, List[str]]

The column name(s) from the polygons data to aggregate. If None, the method will automatically count the number of polygons that match the given predicate for each zone.

None
aggregation Union[str, Dict[str, str]]

The aggregation method(s) to use. Can be a single string (e.g., "sum", "mean", "max") or a dictionary mapping column names to specific aggregation methods. This is ignored and set to "count" if value_columns is None. Defaults to "count".

'count'
predicate Literal[intersects, within, fractional]

The spatial relationship to use for aggregation: - "intersects": Counts or aggregates values for any polygon that intersects a zone. - "within": Counts or aggregates values for polygons that are entirely contained within a zone. - "fractional": Performs area-weighted aggregation. The value of a polygon is distributed proportionally to the area of its overlap with each zone. Defaults to "intersects".

'intersects'
**kwargs

Additional keyword arguments to be passed to the underlying aggregate_polygons_to_zones function.

{}

Returns:

Name Type Description
Dict Dict

A dictionary or a nested dictionary containing the aggregated values, with zone IDs as keys. If value_columns is a single string, the return value is a dictionary mapping zone ID to the aggregated value. If value_columns is a list, the return value is a nested dictionary mapping each column name to its own dictionary of aggregated values.

Raises:

Type Description
ValueError

If value_columns is of an unexpected type after processing.

Example
Assuming 'self' is an object with a 'zone_gdf' attribute
Count all land parcels that intersect each zone

parcel_counts = self.map_polygons(landuse_polygons)

Aggregate total population within zones using area weighting

population_by_zone = self.map_polygons( ... landuse_polygons, ... value_columns="population", ... predicate="fractional", ... aggregation="sum" ... )

Get the sum of residential area and count of buildings within each zone

residential_stats = self.map_polygons( ... building_polygons, ... value_columns=["residential_area_sqm", "building_id"], ... aggregation={"residential_area_sqm": "sum", "building_id": "count"}, ... predicate="intersects" ... )

Source code in gigaspatial/generators/zonal/base.py
def map_polygons(
    self,
    polygons,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    predicate: str = "intersects",
    **kwargs,
) -> Dict:
    """
    Maps polygon data to the instance's zones and aggregates values.

    This method leverages `aggregate_polygons_to_zones` to perform a spatial
    aggregation of polygon data onto the zones stored within this object instance.
    It can count polygons, or aggregate their values, based on different spatial
    relationships defined by the `predicate`.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            The polygon data to map. Must contain geometry information if a
            DataFrame.
        value_columns (Union[str, List[str]], optional):
            The column name(s) from the `polygons` data to aggregate. If `None`,
            the method will automatically count the number of polygons that
            match the given `predicate` for each zone.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") or a dictionary mapping column names to specific
            aggregation methods. This is ignored and set to "count" if
            `value_columns` is `None`. Defaults to "count".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Counts or aggregates values for any polygon that
              intersects a zone.
            - "within": Counts or aggregates values for polygons that are
              entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a
              polygon is distributed proportionally to the area of its overlap
              with each zone.
            Defaults to "intersects".
        **kwargs:
            Additional keyword arguments to be passed to the underlying
            `aggregate_polygons_to_zones_new` function.

    Returns:
        Dict:
            A dictionary or a nested dictionary containing the aggregated values,
            with zone IDs as keys. If `value_columns` is a single string, the
            return value is a dictionary mapping zone ID to the aggregated value.
            If `value_columns` is a list, the return value is a nested dictionary
            mapping each column name to its own dictionary of aggregated values.

    Raises:
        ValueError: If `value_columns` is of an unexpected type after processing.

    Example:
        >>> # Assuming 'self' is an object with a 'zone_gdf' attribute
        >>> # Count all land parcels that intersect each zone
        >>> parcel_counts = self.map_polygons(landuse_polygons)
        >>>
        >>> # Aggregate total population within zones using area weighting
        >>> population_by_zone = self.map_polygons(
        ...     landuse_polygons,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum"
        ... )
        >>>
        >>> # Get the sum of residential area and count of buildings within each zone
        >>> residential_stats = self.map_polygons(
        ...     building_polygons,
        ...     value_columns=["residential_area_sqm", "building_id"],
        ...     aggregation={"residential_area_sqm": "sum", "building_id": "count"},
        ...     predicate="intersects"
        ... )
    """

    if value_columns is None:
        self.logger.warning(
            f"No value_columns specified. Defaulting to counting polygons with {predicate} predicate."
        )
        temp_value_col = "_temp_polygon_count_dummy"
        polygons[temp_value_col] = 1
        actual_value_columns = temp_value_col
        aggregation = "count"  # Force count if no value columns
    else:
        actual_value_columns = value_columns

    result = aggregate_polygons_to_zones(
        polygons=polygons,
        zones=self.zone_gdf,
        value_columns=actual_value_columns,
        aggregation=aggregation,
        predicate=predicate,
        zone_id_column="zone_id",
    )

    # Convert the result GeoDataFrame to the expected dictionary format
    if isinstance(actual_value_columns, str):
        return result.set_index("zone_id")[actual_value_columns].to_dict()
    elif isinstance(actual_value_columns, list):
        return {
            col: result.set_index("zone_id")[col].to_dict()
            for col in actual_value_columns
        }
    else:
        raise ValueError("Unexpected type for actual_value_columns.")
map_rasters(tif_processors, mapping_function=None, stat='mean', **mapping_kwargs)

Map raster data to zones using zonal statistics.

Samples raster values within each zone and computes statistics. Automatically handles coordinate reference system transformations between raster and zone data.

Parameters:

Name Type Description Default
tif_processors List[TifProcessor]

List of TifProcessor objects for accessing raster data. All processors should have the same CRS.

required
mapping_function Callable

Custom function for mapping rasters to zones. If provided, signature should be mapping_function(self, tif_processors, **mapping_kwargs). When used, stat and other parameters except mapping_kwargs are ignored.

None
stat str

Statistic to calculate when aggregating raster values within each zone. Options include "mean", "sum", "min", "max", "std", etc. Defaults to "mean".

'mean'
**mapping_kwargs

Additional keyword arguments passed to the mapping function.

{}

Returns:

Type Description
Union[ndarray, Dict]

Union[np.ndarray, Dict]: By default, returns a dictionary mapping each zone identifier to the raster statistic aggregated over that zone. Custom mapping functions may return different data structures.

Note

If the coordinate reference system of the rasters differs from the zones, the zone geometries will be automatically transformed to match the raster CRS.

Source code in gigaspatial/generators/zonal/base.py
def map_rasters(
    self,
    tif_processors: List[TifProcessor],
    mapping_function: Optional[Callable] = None,
    stat: str = "mean",
    **mapping_kwargs,
) -> Union[np.ndarray, Dict]:
    """Map raster data to zones using zonal statistics.

    Samples raster values within each zone and computes statistics. Automatically
    handles coordinate reference system transformations between raster and zone data.

    Args:
        tif_processors (List[TifProcessor]): List of TifProcessor objects for
            accessing raster data. All processors should have the same CRS.
        mapping_function (Callable, optional): Custom function for mapping rasters
            to zones. If provided, signature should be mapping_function(self, tif_processors, **mapping_kwargs).
            When used, stat and other parameters except mapping_kwargs are ignored.
        stat (str): Statistic to calculate when aggregating raster values within
            each zone. Options include "mean", "sum", "min", "max", "std", etc.
            Defaults to "mean".
        **mapping_kwargs: Additional keyword arguments passed to the mapping function.

    Returns:
        Union[np.ndarray, Dict]: By default, returns a NumPy array of sampled values
            with shape (n_zones, 1), taking the first non-nodata value encountered.
            Custom mapping functions may return different data structures.

    Note:
        If the coordinate reference system of the rasters differs from the zones,
        the zone geometries will be automatically transformed to match the raster CRS.
    """
    if mapping_function is not None:
        return mapping_function(self, tif_processors, **mapping_kwargs)

    self.logger.warning(
        "Using default raster mapping implementation. Consider creating a specialized mapping function."
    )

    raster_crs = tif_processors[0].crs

    if raster_crs != self.zone_gdf.crs:
        self.logger.info(f"Projecting zones to raster CRS: {raster_crs}")
        zone_geoms = self._get_transformed_geometries(raster_crs)
    else:
        zone_geoms = self.get_zonal_geometries()

    # Sample raster values
    sampled_values = sample_multiple_tifs_by_polygons(
        tif_processors=tif_processors, polygon_list=zone_geoms, stat=stat
    )

    zone_ids = self.get_zone_identifiers()

    return {zone_id: value for zone_id, value in zip(zone_ids, sampled_values)}
save_view(name, output_format=None)

Save the generated zonal view to disk.

Parameters:

Name Type Description Default
name str

Base name for the output file (without extension).

required
output_format str

File format to save in (e.g., "parquet", "geojson", "shp"). If None, uses the format specified in config.

None

Returns:

Name Type Description
Path Path

The full path where the view was saved.

Note

The output directory is determined by the config.base_path setting. The file extension is automatically added based on the output format. This method now saves the internal self.view.

Source code in gigaspatial/generators/zonal/base.py
def save_view(
    self,
    name: str,
    output_format: Optional[str] = None,
) -> Path:
    """Save the generated zonal view to disk.

    Args:
        name (str): Base name for the output file (without extension).
        output_format (str, optional): File format to save in (e.g., "parquet",
            "geojson", "shp"). If None, uses the format specified in config.

    Returns:
        Path: The full path where the view was saved.

    Note:
        The output directory is determined by the config.base_path setting.
        The file extension is automatically added based on the output format.
        This method now saves the internal `self.view`.
    """
    if self._view is None:
        self.logger.warning(
            "No variables have been added to the zonal view. Saving the base zone_gdf."
        )
        view_to_save = self.zone_gdf
    else:
        view_to_save = self._view

    format_to_use = output_format or self.config.output_format
    output_path = self.config.base_path / f"{name}.{format_to_use}"

    self.logger.info(f"Saving zonal view to {output_path}")

    if format_to_use in ["geojson", "shp", "gpkg"]:
        self.logger.warning(
            f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
        )
        # Re-add geometry for saving to geospatial formats
        view_to_save = self.view.merge(
            self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
        )

    write_dataset(
        data=view_to_save,
        path=str(output_path),
        data_store=self.data_store,
    )

    return output_path
to_dataframe()

Returns the current zonal view as a DataFrame.

This method combines all accumulated variables in the view

Returns:

Type Description
DataFrame

pd.DataFrame: The current view.

Source code in gigaspatial/generators/zonal/base.py
def to_dataframe(self) -> pd.DataFrame:
    """
    Returns the current zonal view as a DataFrame.

    This method combines all accumulated variables in the view

    Returns:
        pd.DataFrame: The current view.
    """
    return self.view
to_geodataframe()

Returns the current zonal view merged with zone geometries as a GeoDataFrame.

This method combines all accumulated variables in the view with the corresponding zone geometries, providing a spatially-enabled DataFrame for further analysis or export.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The current view merged with zone geometries.

Source code in gigaspatial/generators/zonal/base.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Return the zonal view joined with zone geometries as a GeoDataFrame.

    All accumulated variables are combined with the matching zone geometries,
    yielding a spatially-enabled frame ready for further analysis or export.

    Returns:
        gpd.GeoDataFrame: The current view merged with zone geometries.
    """
    geometries = self.zone_gdf[["zone_id", "geometry"]]
    return self.view.merge(geometries, on="zone_id", how="left")
ZonalViewGeneratorConfig

Bases: BaseModel

Configuration for zonal view generation.

Attributes:

Name Type Description
base_path Path

Base directory path for storing zonal views. Defaults to configured zonal views path.

output_format str

Default output format for saved views. Defaults to "parquet".

Source code in gigaspatial/generators/zonal/base.py
class ZonalViewGeneratorConfig(BaseModel):
    """Configuration for zonal view generation.

    Attributes:
        base_path (Path): Base directory path for storing zonal views. Defaults to
            configured zonal views path.
        output_format (str): Default output format for saved views. Defaults to "parquet".
        ensure_available (bool): Forwarded to data-handler load calls as
            ``ensure_available=``; presumably controls whether source data is
            fetched/prepared on demand — confirm against handler implementations.
            Defaults to True.
    """

    base_path: Path = Field(default=global_config.get_path("zonal", "views"))
    output_format: str = "parquet"
    ensure_available: bool = True

geometry

GeometryBasedZonalViewGenerator

Bases: ZonalViewGenerator[T]

Mid-level class for zonal view generation based on geometries with identifiers.

This class serves as an intermediate between the abstract ZonalViewGenerator and specific implementations like MercatorViewGenerator or H3ViewGenerator. It handles the common case where zones are defined by a mapping between zone identifiers and geometries, either provided as a dictionary or as a GeoDataFrame.

The class extends the base functionality with methods for mapping common geospatial datasets including GHSL (Global Human Settlement Layer), Google Open Buildings, and Microsoft Global Buildings data.

Attributes:

Name Type Description
zone_dict Dict[T, Polygon]

Mapping of zone identifiers to geometries.

zone_id_column str

Name of the column containing zone identifiers.

zone_data_crs str

Coordinate reference system of the zone data.

_zone_gdf GeoDataFrame

Cached GeoDataFrame representation of zones.

data_store DataStore

For accessing input data.

config ZonalViewGeneratorConfig

Configuration for view generation.

logger Logger

Logger instance for this class.

Source code in gigaspatial/generators/zonal/geometry.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
class GeometryBasedZonalViewGenerator(ZonalViewGenerator[T]):
    """Mid-level class for zonal view generation based on geometries with identifiers.

    This class serves as an intermediate between the abstract ZonalViewGenerator and specific
    implementations like MercatorViewGenerator or H3ViewGenerator. It handles the common case
    where zones are defined by a mapping between zone identifiers and geometries, either
    provided as a dictionary or as a GeoDataFrame.

    The class extends the base functionality with methods for mapping common geospatial
    datasets including GHSL (Global Human Settlement Layer), Google Open Buildings,
    and Microsoft Global Buildings data.

    Attributes:
        zone_dict (Dict[T, Polygon]): Mapping of zone identifiers to geometries.
        zone_id_column (str): Name of the column containing zone identifiers.
        zone_data_crs (str): Coordinate reference system of the zone data.
        _zone_gdf (gpd.GeoDataFrame): Cached GeoDataFrame representation of zones.
        data_store (DataStore): For accessing input data.
        config (ZonalViewGeneratorConfig): Configuration for view generation.
        logger (logging.Logger): Logger instance for this class.
    """

    def __init__(
        self,
        zone_data: Union[Dict[T, Polygon], gpd.GeoDataFrame],
        zone_id_column: str = "zone_id",
        zone_data_crs: str = "EPSG:4326",
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Initialize with zone geometries and identifiers.

        Args:
            zone_data (Union[Dict[T, Polygon], gpd.GeoDataFrame]): Zone definitions.
                Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries,
                or a GeoDataFrame with geometries and a zone identifier column.
            zone_id_column (str): Name of the column containing zone identifiers.
                Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".
            zone_data_crs (str): Coordinate reference system of the zone data.
                Defaults to "EPSG:4326" (WGS84).
            config (ZonalViewGeneratorConfig, optional): Generator configuration.
                If None, uses default configuration.
            data_store (DataStore, optional): Data store for accessing input data.
                If None, uses LocalDataStore.
            logger (logging.Logger, optional): Logger instance. If None, the base
                class supplies a default.

        Raises:
            TypeError: If zone_data is not a dictionary or GeoDataFrame, or if dictionary
                values are not Polygon/MultiPolygon geometries.
            ValueError: If zone_id_column is not found in GeoDataFrame, or if the provided
                CRS doesn't match the GeoDataFrame's CRS.
        """
        super().__init__(config=config, data_store=data_store, logger=logger)

        self.zone_id_column = zone_id_column
        self.zone_data_crs = zone_data_crs

        # Store zone data based on input type
        if isinstance(zone_data, dict):
            # Validate up front so the error message names the offending zone.
            for zone_id, geom in zone_data.items():
                if not isinstance(geom, (Polygon, MultiPolygon)):
                    raise TypeError(
                        f"Zone {zone_id}: Expected (Multi)Polygon, got {type(geom).__name__}"
                    )

            # Store the original dictionary
            self.zone_dict = zone_data

            # Also create a GeoDataFrame for consistent access
            self._zone_gdf = gpd.GeoDataFrame(
                {
                    "zone_id": list(zone_data.keys()),
                    "geometry": list(zone_data.values()),
                },
                crs=zone_data_crs,
            )
            self.zone_id_column = "zone_id"
        else:
            if not isinstance(zone_data, gpd.GeoDataFrame):
                raise TypeError(
                    "zone_data must be either a Dict[T, Polygon] or a GeoDataFrame"
                )

            if zone_id_column not in zone_data.columns:
                raise ValueError(
                    f"Zone ID column '{zone_id_column}' not found in GeoDataFrame"
                )

            if zone_data_crs != zone_data.crs:
                raise ValueError(
                    f"Provided data crs '{zone_data_crs}' does not match to the crs of the data '{zone_data.crs}'"
                )

            # Store the GeoDataFrame with the identifier column normalized to "zone_id".
            # NOTE(review): self.zone_id_column keeps the caller-supplied name even
            # though the column is renamed here — confirm downstream consumers
            # expect that (get_zone_geodataframe re-renames, which is a no-op).
            self._zone_gdf = zone_data.rename(columns={zone_id_column: "zone_id"})

            # Also create a dictionary for fast lookups
            self.zone_dict = dict(zip(zone_data[zone_id_column], zone_data.geometry))

    def get_zonal_geometries(self) -> List[Polygon]:
        """Get the geometry of each zone.

        Returns:
            List[Polygon]: A list of zone geometries in the order they appear in the
                underlying GeoDataFrame.
        """
        return self._zone_gdf.geometry.tolist()

    def get_zone_identifiers(self) -> List[T]:
        """Get the identifier for each zone.

        Returns:
            List[T]: A list of zone identifiers in the order they appear in the
                underlying GeoDataFrame.
        """
        return self._zone_gdf.zone_id.tolist()

    def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert zones to a GeoDataFrame with standardized column names.

        Returns:
            gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns.
                The zone_id column is renamed from the original zone_id_column if different.
        """
        # If we already have a GeoDataFrame, just rename the ID column if needed
        result = self._zone_gdf.copy()
        if self.zone_id_column != "zone_id":
            result = result.rename(columns={self.zone_id_column: "zone_id"})
        return result

    def map_built_s(
        self,
        year=2020,
        resolution=100,
        stat: str = "sum",
        name_prefix: str = "built_surface_m2_",
        **kwargs,
    ) -> pd.DataFrame:
        """Map GHSL Built-up Surface data to zones.

        Convenience method for mapping Global Human Settlement Layer Built-up Surface
        data using appropriate default parameters for built surface analysis.

        Args:
            year: GHSL epoch to load. Defaults to 2020.
            resolution: Raster resolution in meters. Defaults to 100.
            stat (str): Statistic to calculate for built surface values within each zone.
                Defaults to "sum" which gives total built surface area.
            name_prefix (str): Prefix for the output column name. Defaults to "built_surface_m2_".
            **kwargs: Additional options forwarded to GHSLDataHandler.

        Returns:
            pd.DataFrame: Updated view with zones and built surface metrics.
                Adds a column named "{name_prefix}{stat}" containing the aggregated values.
        """
        handler = GHSLDataHandler(
            product="GHS_BUILT_S",
            year=year,
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        return self.map_ghsl(
            handler=handler, stat=stat, name_prefix=name_prefix, **kwargs
        )

    def map_smod(
        self,
        year=2020,
        resolution=1000,
        stat: str = "median",
        name_prefix: str = "smod_class_",
        **kwargs,
    ) -> pd.DataFrame:
        """Map GHSL Settlement Model data to zones.

        Convenience method for mapping Global Human Settlement Layer Settlement Model
        data using appropriate default parameters for settlement classification analysis.

        Args:
            year: GHSL epoch to load. Defaults to 2020.
            resolution: Raster resolution in meters. Defaults to 1000
                (SMOD is distributed at 1000m in Mollweide projection).
            stat (str): Statistic to calculate for settlement class values within each zone.
                Defaults to "median" which gives the predominant settlement class.
            name_prefix (str): Prefix for the output column name. Defaults to "smod_class_".
            **kwargs: Additional options forwarded to GHSLDataHandler.

        Returns:
            pd.DataFrame: Updated view with zones and settlement classification.
                Adds a column named "{name_prefix}{stat}" containing the aggregated values.
        """
        handler = GHSLDataHandler(
            product="GHS_SMOD",
            year=year,
            resolution=resolution,
            data_store=self.data_store,
            coord_system=54009,
            **kwargs,
        )

        return self.map_ghsl(
            handler=handler, stat=stat, name_prefix=name_prefix, **kwargs
        )

    def map_ghsl(
        self,
        handler: Optional[GHSLDataHandler],
        stat: str,
        name_prefix: Optional[str] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Map Global Human Settlement Layer data to zones.

        Loads and processes GHSL raster data for the intersecting tiles, then samples
        the raster values within each zone using the specified statistic.

        Args:
            handler (GHSLDataHandler, optional): Configured handler specifying which
                GHSL product, year, resolution, and coordinate system to use.
                If None, a handler is constructed from **kwargs.
            stat (str): Statistic to calculate for raster values within each zone.
                Common options: "mean", "sum", "median", "min", "max".
            name_prefix (str, optional): Prefix for the output column name.
                If None, uses the GHSL product name in lowercase followed by underscore.
            **kwargs: Options for building a default GHSLDataHandler when handler is None.

        Returns:
            pd.DataFrame: Updated DataFrame with GHSL metrics.
                Adds a column named "{name_prefix}{stat}" containing the sampled values.

        Note:
            The method automatically determines which GHSL tiles intersect with the zones
            and loads only the necessary data for efficient processing.
        """
        handler = handler or GHSLDataHandler(data_store=self.data_store, **kwargs)
        self.logger.info(
            f"Mapping {handler.config.product} data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
        )
        tif_processors = handler.load_data(
            self.zone_gdf, ensure_available=self.config.ensure_available
        )

        self.logger.info(
            f"Sampling {handler.config.product} data using '{stat}' statistic"
        )
        sampled_values = self.map_rasters(tif_processors=tif_processors, stat=stat)

        name_prefix = (
            name_prefix if name_prefix else handler.config.product.lower() + "_"
        )
        column_name = f"{name_prefix}{stat}"
        self.add_variable_to_view(sampled_values, column_name)

        return self.view

    def map_google_buildings(
        self,
        handler: Optional[GoogleOpenBuildingsHandler] = None,
        use_polygons: bool = False,
    ) -> pd.DataFrame:
        """Map Google Open Buildings data to zones.

        Processes Google Open Buildings dataset to calculate building counts and total
        building area within each zone. Can use either point centroids (faster) or
        polygon geometries (more accurate) for spatial operations.

        Args:
            handler (GoogleOpenBuildingsHandler, optional): Handler for accessing
                Google Open Buildings data. A default handler is created if None.
            use_polygons (bool): Whether to use polygon geometries for buildings.
                If True, uses actual building polygons for more accurate area calculations
                but with slower performance. If False, uses building centroids with
                area values from attributes for faster processing. Defaults to False.

        Returns:
            pd.DataFrame: Updated DataFrame with building metrics.
                Adds columns:
                - 'google_buildings_count': Number of buildings in each zone
                - 'google_buildings_area_in_meters': Total building area in square meters

        Note:
            If no Google Buildings data is found for the zones, returns the original
            GeoDataFrame unchanged with a warning logged.
        """
        self.logger.info(
            f"Mapping Google Open Buildings data (use_polygons={use_polygons})"
        )

        self.logger.info("Loading Google Buildings point data")
        handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)
        buildings_df = handler.load_points(
            self.zone_gdf, ensure_available=self.config.ensure_available
        )

        if buildings_df.empty:
            self.logger.warning("No Google buildings data found for the provided zones")
            return self._zone_gdf.copy()

        if not use_polygons:
            self.logger.info("Aggregating building data using points with attributes")
            result = self.map_points(
                points=buildings_df,
                value_columns=["full_plus_code", "area_in_meters"],
                aggregation={"full_plus_code": "count", "area_in_meters": "sum"},
                predicate="within",
            )

            count_result = result["full_plus_code"]
            area_result = result["area_in_meters"]

        else:
            self.logger.info(
                "Loading Google Buildings polygon data for more accurate mapping"
            )
            buildings_gdf = handler.load_polygons(
                self.zone_gdf, self.config.ensure_available
            )

            self.logger.info(
                "Calculating building areas with area-weighted aggregation"
            )
            area_result = self.map_polygons(
                buildings_gdf,
                value_columns="area_in_meters",
                aggregation="sum",
                predicate="fractional",
            )

            self.logger.info("Counting buildings using points data")
            count_result = self.map_points(points=buildings_df, predicate="within")

        self.add_variable_to_view(count_result, "google_buildings_count")
        self.add_variable_to_view(area_result, "google_buildings_area_in_meters")

        return self.view

    def map_ms_buildings(
        self,
        handler: Optional[MSBuildingsHandler] = None,
        use_polygons: bool = False,
    ) -> gpd.GeoDataFrame:
        """Map Microsoft Global Buildings data to zones.

        Processes Microsoft Global Buildings dataset to calculate building counts and
        total building area within each zone. Can use either centroid points (faster)
        or polygon geometries (more accurate) for spatial operations.

        Args:
            handler (MSBuildingsHandler, optional): Handler for accessing Microsoft
                Global Buildings data. A default handler is created if None.
            use_polygons (bool): Whether to use polygon geometries for buildings.
                If True, uses actual building polygons for more accurate area calculations
                but with slower performance. If False, uses building centroids with
                area values from attributes for faster processing. Defaults to False.

        Returns:
            gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics.
                Adds columns:
                - 'ms_buildings_count': Number of buildings in each zone
                - 'ms_buildings_area_in_meters': Total building area in square meters

        Note:
            If no Microsoft Buildings data is found for the zones, returns the original
            GeoDataFrame unchanged with a warning logged. Building areas are calculated
            in meters using appropriate UTM projections.
        """
        self.logger.info("Mapping Microsoft Global Buildings data")

        self.logger.info("Loading Microsoft Buildings polygon data")
        # Honor a caller-supplied handler; only build a default when none is given
        # (previously the parameter was silently ignored).
        handler = handler or MSBuildingsHandler(data_store=self.data_store)
        buildings_gdf = handler.load_data(
            self.zone_gdf, ensure_available=self.config.ensure_available
        )

        # Check if we found any buildings
        if buildings_gdf.empty:
            self.logger.warning(
                "No Microsoft buildings data found for the provided zones"
            )
            return self._zone_gdf.copy()

        buildings_gdf = add_area_in_meters(
            buildings_gdf, area_column_name="area_in_meters"
        )

        building_centroids = get_centroids(buildings_gdf)

        if not use_polygons:
            self.logger.info("Aggregating building data using points with attributes")

            result = self.map_points(
                points=building_centroids,
                value_columns=["type", "area_in_meters"],
                aggregation={"type": "count", "area_in_meters": "sum"},
                predicate="within",
            )

            count_result = result["type"]
            area_result = result["area_in_meters"]
        else:

            self.logger.info(
                "Calculating building areas with area-weighted aggregation"
            )
            area_result = self.map_polygons(
                buildings_gdf,
                value_columns="area_in_meters",
                aggregation="sum",
                predicate="fractional",
            )

            self.logger.info("Counting Microsoft buildings per zone")

            count_result = self.map_points(
                points=building_centroids, predicate="within"
            )

        self.add_variable_to_view(count_result, "ms_buildings_count")
        self.add_variable_to_view(area_result, "ms_buildings_area_in_meters")

        return self.view

    def map_ghsl_pop(
        self,
        year=2020,
        resolution=100,
        stat: str = "sum",
        name_prefix: str = "ghsl_pop_",
        predicate: Literal["intersects", "fractional"] = "intersects",
        **kwargs,
    ):
        """Map GHSL Population (GHS_POP) data to zones.

        Args:
            year: GHSL epoch to load. Defaults to 2020.
            resolution: Raster resolution in meters. Defaults to 100.
            stat (str): Statistic to calculate for population values within each zone.
                Defaults to "sum".
            name_prefix (str): Prefix for the output column name. Defaults to "ghsl_pop_".
            predicate: "fractional" performs area-weighted polygon aggregation
                (only supported at 1000m resolution; falls back to "intersects"
                at 100m with a warning). "intersects" samples the rasters directly.
            **kwargs: Additional options forwarded to GHSLDataHandler.

        Returns:
            pd.DataFrame: Updated view with a "{name_prefix}{stat}" column.
        """
        handler = GHSLDataHandler(
            product="GHS_POP",
            year=year,
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        if predicate == "fractional":
            if resolution == 100:
                self.logger.warning(
                    "Fractional aggregations only supported for datasets with 1000m resolution. Using `intersects` as predicate"
                )
                predicate = "intersects"
            else:
                gdf_pop = handler.load_into_geodataframe()

                # NOTE(review): aggregation is fixed to "sum" in this branch while
                # the column name still uses `stat` — confirm this is intended.
                result = self.map_polygons(
                    gdf_pop,
                    value_columns="pixel_value",
                    aggregation="sum",
                    predicate="fractional",
                )

                column_name = f"{name_prefix}{stat}"
                self.add_variable_to_view(result, column_name)
                return self.view

        return self.map_ghsl(
            handler=handler, stat=stat, name_prefix=name_prefix, **kwargs
        )
__init__(zone_data, zone_id_column='zone_id', zone_data_crs='EPSG:4326', config=None, data_store=None, logger=None)

Initialize with zone geometries and identifiers.

Parameters:

Name Type Description Default
zone_data Union[Dict[T, Polygon], GeoDataFrame]

Zone definitions. Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries, or a GeoDataFrame with geometries and a zone identifier column.

required
zone_id_column str

Name of the column containing zone identifiers. Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".

'zone_id'
zone_data_crs str

Coordinate reference system of the zone data. Defaults to "EPSG:4326" (WGS84).

'EPSG:4326'
config ZonalViewGeneratorConfig

Generator configuration. If None, uses default configuration.

None
data_store DataStore

Data store for accessing input data. If None, uses LocalDataStore.

None

Raises:

Type Description
TypeError

If zone_data is not a dictionary or GeoDataFrame, or if dictionary values are not Polygon/MultiPolygon geometries.

ValueError

If zone_id_column is not found in GeoDataFrame, or if the provided CRS doesn't match the GeoDataFrame's CRS.

Source code in gigaspatial/generators/zonal/geometry.py
def __init__(
    self,
    zone_data: Union[Dict[T, Polygon], gpd.GeoDataFrame],
    zone_id_column: str = "zone_id",
    zone_data_crs: str = "EPSG:4326",
    config: Optional[ZonalViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """Initialize with zone geometries and identifiers.

    Args:
        zone_data (Union[Dict[T, Polygon], gpd.GeoDataFrame]): Zone definitions.
            Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries,
            or a GeoDataFrame with geometries and a zone identifier column.
        zone_id_column (str): Name of the column containing zone identifiers.
            Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".
        zone_data_crs (str): Coordinate reference system of the zone data.
            Defaults to "EPSG:4326" (WGS84).
        config (ZonalViewGeneratorConfig, optional): Generator configuration.
            If None, uses default configuration.
        data_store (DataStore, optional): Data store for accessing input data.
            If None, uses LocalDataStore.
        logger (logging.Logger, optional): Logger instance. If None, the base
            class supplies a default.

    Raises:
        TypeError: If zone_data is not a dictionary or GeoDataFrame, or if dictionary
            values are not Polygon/MultiPolygon geometries.
        ValueError: If zone_id_column is not found in GeoDataFrame, or if the provided
            CRS doesn't match the GeoDataFrame's CRS.
    """
    super().__init__(config=config, data_store=data_store, logger=logger)

    self.zone_id_column = zone_id_column
    self.zone_data_crs = zone_data_crs

    # Store zone data based on input type
    if isinstance(zone_data, dict):
        # Validate geometry types up front so the error names the offending zone.
        for zone_id, geom in zone_data.items():
            if not isinstance(geom, (Polygon, MultiPolygon)):
                raise TypeError(
                    f"Zone {zone_id}: Expected (Multi)Polygon, got {type(geom).__name__}"
                )

        # Store the original dictionary
        self.zone_dict = zone_data

        # Also create a GeoDataFrame for consistent access
        self._zone_gdf = gpd.GeoDataFrame(
            {
                "zone_id": list(zone_data.keys()),
                "geometry": list(zone_data.values()),
            },
            crs=zone_data_crs,
        )
        self.zone_id_column = "zone_id"
    else:
        if not isinstance(zone_data, gpd.GeoDataFrame):
            raise TypeError(
                "zone_data must be either a Dict[T, Polygon] or a GeoDataFrame"
            )

        if zone_id_column not in zone_data.columns:
            raise ValueError(
                f"Zone ID column '{zone_id_column}' not found in GeoDataFrame"
            )

        if zone_data_crs != zone_data.crs:
            raise ValueError(
                f"Provided data crs '{zone_data_crs}' does not match to the crs of the data '{zone_data.crs}'"
            )

        # Store the GeoDataFrame
        # NOTE(review): the column is renamed to "zone_id" here while
        # self.zone_id_column keeps the caller's name — confirm downstream code
        # expects that.
        self._zone_gdf = zone_data.rename(columns={zone_id_column: "zone_id"})

        # Also create a dictionary for fast lookups
        self.zone_dict = dict(zip(zone_data[zone_id_column], zone_data.geometry))
get_zonal_geometries()

Get the geometry of each zone.

Returns:

Type Description
List[Polygon]

List[Polygon]: A list of zone geometries in the order they appear in the underlying GeoDataFrame.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zonal_geometries(self) -> List[Polygon]:
    """Get the geometry of each zone.

    Returns:
        List[Polygon]: Zone geometries, ordered as in the underlying GeoDataFrame.
    """
    return list(self._zone_gdf.geometry)
get_zone_geodataframe()

Convert zones to a GeoDataFrame with standardized column names.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns. The zone_id column is renamed from the original zone_id_column if different.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert zones to a GeoDataFrame with standardized column names.

    Returns:
        gpd.GeoDataFrame: A frame with 'zone_id' and 'geometry' columns; the
            original zone_id_column is renamed to 'zone_id' when they differ.
    """
    gdf = self._zone_gdf.copy()
    # Normalize the identifier column name if a custom one was supplied.
    if self.zone_id_column == "zone_id":
        return gdf
    return gdf.rename(columns={self.zone_id_column: "zone_id"})
get_zone_identifiers()

Get the identifier for each zone.

Returns:

Type Description
List[T]

List[T]: A list of zone identifiers in the order they appear in the underlying GeoDataFrame.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zone_identifiers(self) -> List[T]:
    """Get the identifier for each zone.

    Returns:
        List[T]: Zone identifiers, ordered as in the underlying GeoDataFrame.
    """
    return list(self._zone_gdf["zone_id"])
map_built_s(year=2020, resolution=100, stat='sum', name_prefix='built_surface_m2_', **kwargs)

Map GHSL Built-up Surface data to zones.

Convenience method for mapping Global Human Settlement Layer Built-up Surface data using appropriate default parameters for built surface analysis.

Parameters:

Name Type Description Default
ghsl_data_config GHSLDataConfig

Configuration for GHSL Built-up Surface data. Defaults to GHS_BUILT_S product for 2020 at 100m resolution.

required
stat str

Statistic to calculate for built surface values within each zone. Defaults to "sum" which gives total built surface area.

'sum'
name_prefix str

Prefix for the output column name. Defaults to "built_surface_m2_".

'built_surface_m2_'

Returns:

Type Description
DataFrame

pd.DataFrame: Updated GeoDataFrame with zones and built surface metrics. Adds a column named "{name_prefix}{stat}" containing the aggregated values.

Source code in gigaspatial/generators/zonal/geometry.py
def map_built_s(
    self,
    year=2020,
    resolution=100,
    stat: str = "sum",
    name_prefix: str = "built_surface_m2_",
    **kwargs,
) -> pd.DataFrame:
    """Map GHSL Built-up Surface data to zones.

    Convenience method for mapping Global Human Settlement Layer Built-up Surface
    data using appropriate default parameters for built surface analysis.

    Args:
        year: GHSL epoch to load. Defaults to 2020.
        resolution: Raster resolution in meters. Defaults to 100.
        stat (str): Statistic to calculate for built surface values within each zone.
            Defaults to "sum" which gives total built surface area.
        name_prefix (str): Prefix for the output column name. Defaults to "built_surface_m2_".
        **kwargs: Additional options forwarded to GHSLDataHandler (and to map_ghsl,
            where they are unused because a handler is always supplied).

    Returns:
        pd.DataFrame: Updated view with zones and built surface metrics.
            Adds a column named "{name_prefix}{stat}" containing the aggregated values.
    """
    handler = GHSLDataHandler(
        product="GHS_BUILT_S",
        year=year,
        resolution=resolution,
        data_store=self.data_store,
        **kwargs,
    )

    return self.map_ghsl(
        handler=handler, stat=stat, name_prefix=name_prefix, **kwargs
    )
map_ghsl(handler, stat, name_prefix=None, **kwargs)

Map Global Human Settlement Layer data to zones.

Loads and processes GHSL raster data for the intersecting tiles, then samples the raster values within each zone using the specified statistic.

Parameters:

Name Type Description Default
ghsl_data_config GHSLDataConfig

Configuration specifying which GHSL product, year, resolution, and coordinate system to use.

required
stat str

Statistic to calculate for raster values within each zone. Common options: "mean", "sum", "median", "min", "max".

required
name_prefix str

Prefix for the output column name. If None, uses the GHSL product name in lowercase followed by underscore.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Updated DataFrame with GHSL metrics. Adds a column named "{name_prefix}{stat}" containing the sampled values.

Note

The method automatically determines which GHSL tiles intersect with the zones and loads only the necessary data for efficient processing.

Source code in gigaspatial/generators/zonal/geometry.py
def map_ghsl(
    self,
    handler: Optional[GHSLDataHandler] = None,
    stat: str = "mean",
    name_prefix: Optional[str] = None,
    **kwargs,
) -> pd.DataFrame:
    """Map Global Human Settlement Layer data to zones.

    Loads and processes GHSL raster data for the intersecting tiles, then samples
    the raster values within each zone using the specified statistic.

    Args:
        handler (GHSLDataHandler, optional): Pre-configured handler specifying
            which GHSL product, year, resolution, and coordinate system to use.
            If None, a default handler is constructed from ``kwargs``.
        stat (str): Statistic to calculate for raster values within each zone.
            Common options: "mean", "sum", "median", "min", "max".
            Defaults to "mean".
        name_prefix (str, optional): Prefix for the output column name.
            If None, uses the GHSL product name in lowercase followed by underscore.
        **kwargs: Forwarded to ``GHSLDataHandler`` when ``handler`` is None.

    Returns:
        pd.DataFrame: Updated DataFrame with GHSL metrics.
            Adds a column named "{name_prefix}{stat}" containing the sampled values.

    Note:
        The method automatically determines which GHSL tiles intersect with the zones
        and loads only the necessary data for efficient processing.
    """
    # The fallback below was unreachable while `handler` was a required
    # positional parameter; making it optional matches the body's intent.
    handler = handler or GHSLDataHandler(data_store=self.data_store, **kwargs)
    self.logger.info(
        f"Mapping {handler.config.product} data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
    )
    tif_processors = handler.load_data(
        self.zone_gdf, ensure_available=self.config.ensure_available
    )

    self.logger.info(
        f"Sampling {handler.config.product} data using '{stat}' statistic"
    )
    sampled_values = self.map_rasters(tif_processors=tif_processors, stat=stat)

    # Derive the output column name from the product unless caller overrode it.
    name_prefix = (
        name_prefix if name_prefix else handler.config.product.lower() + "_"
    )
    column_name = f"{name_prefix}{stat}"
    self.add_variable_to_view(sampled_values, column_name)

    return self.view
map_google_buildings(handler=None, use_polygons=False)

Map Google Open Buildings data to zones.

Processes Google Open Buildings dataset to calculate building counts and total building area within each zone. Can use either point centroids (faster) or polygon geometries (more accurate) for spatial operations.

Parameters:

Name Type Description Default
handler GoogleOpenBuildingsHandler

Handler for accessing Google Open Buildings data. A default handler is created if not provided.

None
use_polygons bool

Whether to use polygon geometries for buildings. If True, uses actual building polygons for more accurate area calculations but with slower performance. If False, uses building centroids with area values from attributes for faster processing. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: Updated DataFrame with building metrics. Adds columns: - 'google_buildings_count': Number of buildings in each zone - 'google_buildings_area_in_meters': Total building area in square meters

Note

If no Google Buildings data is found for the zones, returns the original GeoDataFrame unchanged with a warning logged.

Source code in gigaspatial/generators/zonal/geometry.py
def map_google_buildings(
    self,
    handler: Optional[GoogleOpenBuildingsHandler] = None,
    use_polygons: bool = False,
) -> pd.DataFrame:
    """Map Google Open Buildings data to zones.

    Processes Google Open Buildings dataset to calculate building counts and total
    building area within each zone. Can use either point centroids (faster) or
    polygon geometries (more accurate) for spatial operations.

    Args:
        handler (GoogleOpenBuildingsHandler, optional): Handler for accessing
            Google Open Buildings data. A default handler using this generator's
            data store is created if not provided.
        use_polygons (bool): Whether to use polygon geometries for buildings.
            If True, uses actual building polygons for more accurate area calculations
            but with slower performance. If False, uses building centroids with
            area values from attributes for faster processing. Defaults to False.

    Returns:
        pd.DataFrame: Updated DataFrame with building metrics.
            Adds columns:
            - 'google_buildings_count': Number of buildings in each zone
            - 'google_buildings_area_in_meters': Total building area in square meters

    Note:
        If no Google Buildings data is found for the zones, returns the original
        GeoDataFrame unchanged with a warning logged.
    """
    self.logger.info(
        f"Mapping Google Open Buildings data (use_polygons={use_polygons})"
    )

    # Point data is always loaded: it drives the emptiness check and counting.
    self.logger.info("Loading Google Buildings point data")
    handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)
    buildings_df = handler.load_points(
        self.zone_gdf, ensure_available=self.config.ensure_available
    )

    if buildings_df.empty:
        self.logger.warning("No Google buildings data found for the provided zones")
        return self._zone_gdf.copy()

    if not use_polygons:
        # Fast path: one spatial join over centroids yields both count and area.
        self.logger.info("Aggregating building data using points with attributes")
        result = self.map_points(
            points=buildings_df,
            value_columns=["full_plus_code", "area_in_meters"],
            aggregation={"full_plus_code": "count", "area_in_meters": "sum"},
            predicate="within",
        )

        count_result = result["full_plus_code"]
        area_result = result["area_in_meters"]

    else:
        # Accurate path: area-weighted ("fractional") polygon overlay for area,
        # but counts still come from the cheaper point-in-zone test.
        self.logger.info(
            "Loading Google Buildings polygon data for more accurate mapping"
        )
        buildings_gdf = handler.load_polygons(
            self.zone_gdf, self.config.ensure_available
        )

        self.logger.info(
            "Calculating building areas with area-weighted aggregation"
        )
        area_result = self.map_polygons(
            buildings_gdf,
            value_columns="area_in_meters",
            aggregation="sum",
            predicate="fractional",
        )

        self.logger.info("Counting buildings using points data")
        count_result = self.map_points(points=buildings_df, predicate="within")

    self.add_variable_to_view(count_result, "google_buildings_count")
    self.add_variable_to_view(area_result, "google_buildings_area_in_meters")

    return self.view
map_ms_buildings(handler=None, use_polygons=False)

Map Microsoft Global Buildings data to zones.

Processes Microsoft Global Buildings dataset to calculate building counts and total building area within each zone. Can use either centroid points (faster) or polygon geometries (more accurate) for spatial operations.

Parameters:

Name Type Description Default
handler MSBuildingsHandler

Handler for accessing Microsoft Global Buildings data. A default handler is created if not provided.

None
use_polygons bool

Whether to use polygon geometries for buildings. If True, uses actual building polygons for more accurate area calculations but with slower performance. If False, uses building centroids with area values from attributes for faster processing. Defaults to False.

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics. Adds columns: - 'ms_buildings_count': Number of buildings in each zone - 'ms_buildings_area_in_meters': Total building area in square meters

Note

If no Microsoft Buildings data is found for the zones, returns the original GeoDataFrame unchanged with a warning logged. Building areas are calculated in meters using appropriate UTM projections.

Source code in gigaspatial/generators/zonal/geometry.py
def map_ms_buildings(
    self,
    handler: Optional[MSBuildingsHandler] = None,
    use_polygons: bool = False,
) -> gpd.GeoDataFrame:
    """Map Microsoft Global Buildings data to zones.

    Processes Microsoft Global Buildings dataset to calculate building counts and
    total building area within each zone. Can use either centroid points (faster)
    or polygon geometries (more accurate) for spatial operations.

    Args:
        handler (MSBuildingsHandler, optional): Handler for accessing Microsoft
            Global Buildings data. A default handler using this generator's
            data store is created if not provided.
        use_polygons (bool): Whether to use polygon geometries for buildings.
            If True, uses actual building polygons for more accurate area calculations
            but with slower performance. If False, uses building centroids with
            area values from attributes for faster processing. Defaults to False.

    Returns:
        gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics.
            Adds columns:
            - 'ms_buildings_count': Number of buildings in each zone
            - 'ms_buildings_area_in_meters': Total building area in square meters

    Note:
        If no Microsoft Buildings data is found for the zones, returns the original
        GeoDataFrame unchanged with a warning logged. Building areas are calculated
        in meters using appropriate UTM projections.
    """
    self.logger.info("Mapping Microsoft Global Buildings data")

    self.logger.info("Loading Microsoft Buildings polygon data")
    # Bug fix: previously a new handler was built unconditionally, silently
    # discarding a caller-supplied `handler` (unlike map_google_buildings).
    handler = handler or MSBuildingsHandler(data_store=self.data_store)
    buildings_gdf = handler.load_data(
        self.zone_gdf, ensure_available=self.config.ensure_available
    )

    # Check if we found any buildings
    if buildings_gdf.empty:
        self.logger.warning(
            "No Microsoft buildings data found for the provided zones"
        )
        return self._zone_gdf.copy()

    # MS polygons carry no area attribute, so compute one up front.
    buildings_gdf = add_area_in_meters(
        buildings_gdf, area_column_name="area_in_meters"
    )

    building_centroids = get_centroids(buildings_gdf)

    if not use_polygons:
        # Fast path: a single point join yields both count and summed area.
        self.logger.info("Aggregating building data using points with attributes")

        result = self.map_points(
            points=building_centroids,
            value_columns=["type", "area_in_meters"],
            aggregation={"type": "count", "area_in_meters": "sum"},
            predicate="within",
        )

        count_result = result["type"]
        area_result = result["area_in_meters"]
    else:
        # Accurate path: area-weighted polygon overlay for area; counts still
        # come from the cheaper centroid-in-zone test.
        self.logger.info(
            "Calculating building areas with area-weighted aggregation"
        )
        area_result = self.map_polygons(
            buildings_gdf,
            value_columns="area_in_meters",
            aggregation="sum",
            predicate="fractional",
        )

        self.logger.info("Counting Microsoft buildings per zone")

        count_result = self.map_points(
            points=building_centroids, predicate="within"
        )

    self.add_variable_to_view(count_result, "ms_buildings_count")
    self.add_variable_to_view(area_result, "ms_buildings_area_in_meters")

    return self.view
map_smod(year=2020, resolution=1000, stat='median', name_prefix='smod_class_', **kwargs)

Map GHSL Settlement Model data to zones.

Convenience method for mapping Global Human Settlement Layer Settlement Model data using appropriate default parameters for settlement classification analysis.

Parameters:

Name Type Description Default
year int

Year of the GHS_SMOD dataset to use. Defaults to 2020.

2020
resolution int

Spatial resolution of the GHS_SMOD dataset in meters (Mollweide projection). Defaults to 1000.

1000
stat str

Statistic to calculate for settlement class values within each zone. Defaults to "median" which gives the predominant settlement class.

'median'
name_prefix str

Prefix for the output column name. Defaults to "smod_class_".

'smod_class_'

Returns:

Type Description
DataFrame

pd.DataFrame: Updated DataFrame with zones and settlement classification. Adds a column named "{name_prefix}{stat}" containing the aggregated values.

Source code in gigaspatial/generators/zonal/geometry.py
def map_smod(
    self,
    year: int = 2020,
    resolution: int = 1000,
    stat: str = "median",
    name_prefix: str = "smod_class_",
    **kwargs,
) -> pd.DataFrame:
    """Map GHSL Settlement Model data to zones.

    Convenience method for mapping Global Human Settlement Layer Settlement Model
    data using appropriate default parameters for settlement classification analysis.

    Args:
        year (int): Year of the GHS_SMOD dataset to use. Defaults to 2020.
        resolution (int): Spatial resolution in meters. Defaults to 1000m
            in the Mollweide projection (EPSG:54009).
        stat (str): Statistic to calculate for settlement class values within each zone.
            Defaults to "median" which gives the predominant settlement class.
        name_prefix (str): Prefix for the output column name. Defaults to "smod_class_".
        **kwargs: Additional keyword arguments forwarded to ``GHSLDataHandler``
            and ``map_ghsl``.

    Returns:
        pd.DataFrame: Updated DataFrame with zones and settlement classification.
            Adds a column named "{name_prefix}{stat}" containing the aggregated values.
    """
    # NOTE(review): kwargs are forwarded to both GHSLDataHandler and map_ghsl;
    # a caller passing e.g. coord_system would collide with the explicit
    # coord_system=54009 below — confirm intended.
    handler = GHSLDataHandler(
        product="GHS_SMOD",
        year=year,
        resolution=resolution,
        data_store=self.data_store,
        coord_system=54009,  # GHS_SMOD is distributed in Mollweide (ESRI:54009)
        **kwargs,
    )

    return self.map_ghsl(
        handler=handler, stat=stat, name_prefix=name_prefix, **kwargs
    )

mercator

MercatorViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using Mercator tiles as the zones.

This class specializes in creating zonal views where the zones are defined by Mercator tiles. It extends the GeometryBasedZonalViewGenerator and leverages the MercatorTiles and CountryMercatorTiles classes to generate tiles based on various input sources.

The primary input source defines the geographical area of interest. This can be a country, a specific geometry, a set of points, or even a list of predefined quadkeys. The zoom_level determines the granularity of the Mercator tiles.

Attributes:

Name Type Description
source Union[str, BaseGeometry, GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[str]]

Specifies the geographic area or specific tiles to use. Can be: - A country name (str): Uses CountryMercatorTiles to generate tiles covering the country. - A Shapely geometry (BaseGeometry): Uses MercatorTiles.from_spatial to create tiles intersecting the geometry. - A GeoDataFrame (gpd.GeoDataFrame): Uses MercatorTiles.from_spatial to create tiles intersecting the geometries. - A list of points (List[Union[Point, Tuple[float, float]]]): Uses MercatorTiles.from_spatial to create tiles containing the points. - A list of quadkeys (List[str]): Uses MercatorTiles.from_quadkeys to use the specified tiles directly.

zoom_level int

The zoom level of the Mercator tiles. Higher zoom levels result in smaller, more detailed tiles.

predicate str

The spatial predicate used when filtering tiles based on a spatial source (e.g., "intersects", "contains"). Defaults to "intersects".

config Optional[ZonalViewGeneratorConfig]

Configuration for the zonal view generation process.

data_store Optional[DataStore]

A DataStore instance for accessing data.

logger Optional[Logger]

A logger instance for logging.

Methods:

Name Description
_init_zone_data

Initializes the Mercator tile GeoDataFrame based on the input source.

# Inherits other methods from GeometryBasedZonalViewGenerator, such as
Example
Create a MercatorViewGenerator for tiles covering Germany at zoom level 6

generator = MercatorViewGenerator(source="Germany", zoom_level=6)

Create a MercatorViewGenerator for tiles intersecting a specific polygon

polygon = ... # Define a Shapely Polygon generator = MercatorViewGenerator(source=polygon, zoom_level=8)

Create a MercatorViewGenerator from a list of quadkeys

quadkeys = ["0020023131023032", "0020023131023033"] generator = MercatorViewGenerator(source=quadkeys, zoom_level=12)

Source code in gigaspatial/generators/zonal/mercator.py
class MercatorViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Generates zonal views using Mercator tiles as the zones.

    This class specializes in creating zonal views where the zones are defined by
    Mercator tiles. It extends the `GeometryBasedZonalViewGenerator` and leverages
    the `MercatorTiles` and `CountryMercatorTiles` classes to generate tiles based on
    various input sources.

    The primary input source defines the geographical area of interest. This can be
    a country, a specific geometry, a set of points, or even a list of predefined
    quadkeys. The `zoom_level` determines the granularity of the Mercator tiles.

    Attributes:
        source (Union[str, BaseGeometry, gpd.GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[str]]):
            Specifies the geographic area or specific tiles to use. Can be:
            - A country name (str): Uses `CountryMercatorTiles` to generate tiles covering the country.
            - A Shapely geometry (BaseGeometry): Uses `MercatorTiles.from_spatial` to create tiles intersecting the geometry.
            - A GeoDataFrame (gpd.GeoDataFrame): Uses `MercatorTiles.from_spatial` to create tiles intersecting the geometries.
            - A list of points (List[Union[Point, Tuple[float, float]]]): Uses `MercatorTiles.from_spatial` to create tiles containing the points.
            - A list of quadkeys (List[str]): Uses `MercatorTiles.from_quadkeys` to use the specified tiles directly.
        zoom_level (int): The zoom level of the Mercator tiles. Higher zoom levels result in smaller, more detailed tiles.
        predicate (str): The spatial predicate used when filtering tiles based on a spatial source (e.g., "intersects", "contains"). Defaults to "intersects".
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generation process.
        data_store (Optional[DataStore]): A DataStore instance for accessing data.
        logger (Optional[logging.Logger]): A logger instance for logging.

    Example:
        # Create a MercatorViewGenerator for tiles covering Germany at zoom level 6
        generator = MercatorViewGenerator(source="Germany", zoom_level=6)

        # Create a MercatorViewGenerator for tiles intersecting a specific polygon
        polygon = ... # Define a Shapely Polygon
        generator = MercatorViewGenerator(source=polygon, zoom_level=8)

        # Create a MercatorViewGenerator from a list of quadkeys
        quadkeys = ["0020023131023032", "0020023131023033"]
        generator = MercatorViewGenerator(source=quadkeys, zoom_level=12)
    """

    def __init__(
        self,
        source: Union[
            str,  # country
            BaseGeometry,  # shapely geom
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
            List[str],  # quadkeys
        ],
        zoom_level: int,
        predicate="intersects",
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):
        super().__init__(
            zone_data=self._init_zone_data(source, zoom_level, predicate, data_store),
            zone_id_column="quadkey",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self.logger.info("Initialized MercatorViewGenerator")

    def _init_zone_data(self, source, zoom_level, predicate, data_store=None):
        """Build the Mercator-tile GeoDataFrame for the requested source.

        Args:
            source: Country name, geometry, GeoDataFrame, list of points, or
                list of quadkey strings (see class docstring).
            zoom_level (int): Mercator tile zoom level.
            predicate (str): Spatial predicate for tile filtering.
            data_store: Optional DataStore used for country boundary lookup.

        Returns:
            gpd.GeoDataFrame: One row per tile, keyed by 'quadkey'.

        Raises:
            TypeError: If `source` is not one of the supported types.
        """
        if isinstance(source, str):
            tiles = CountryMercatorTiles.create(
                country=source, zoom_level=zoom_level, data_store=data_store
            )
        elif isinstance(source, (BaseGeometry, gpd.GeoDataFrame)):
            # Bug fix: GeoDataFrame must be handled before the generic Iterable
            # branch — iterating a (Geo)DataFrame yields its column *names*
            # (all strings), which previously mis-routed GeoDataFrames to
            # MercatorTiles.from_quadkeys.
            tiles = MercatorTiles.from_spatial(
                source=source, zoom_level=zoom_level, predicate=predicate
            )
        elif isinstance(source, Iterable):
            source = list(source)
            if all(isinstance(qk, str) for qk in source):
                # A list of strings is interpreted as explicit quadkeys.
                tiles = MercatorTiles.from_quadkeys(source)
            else:
                # Otherwise treat it as a spatial source (points / tuples).
                tiles = MercatorTiles.from_spatial(
                    source=source, zoom_level=zoom_level, predicate=predicate
                )
        else:
            raise TypeError(
                f"Unsupported source type for MercatorViewGenerator. 'source' must be "
                f"a country name (str), a Shapely geometry, a GeoDataFrame, "
                f"a list of quadkeys (str), or a list of (lon, lat) tuples/Shapely Point objects. "
                f"Received type: {type(source)}."
            )

        return tiles.to_geodataframe()