Generators Module

gigaspatial.generators

poi

PoiViewGenerator

POI View Generator for integrating various geospatial datasets such as Google Open Buildings, Microsoft Global Buildings, GHSL Built Surface, and GHSL Settlement Model (SMOD) data with Points of Interest (POIs).

This class provides methods to load, process, and map external geospatial data to a given set of POIs, enriching them with relevant attributes. It leverages handler/reader classes for efficient data access and processing.

The POIs can be initialized from a list of (latitude, longitude) tuples, a list of dictionaries, a pandas DataFrame, or a geopandas GeoDataFrame.
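
A minimal initialization sketch (illustrative coordinates; the import path follows the source location shown below):

>>> import pandas as pd
>>> from gigaspatial.generators.poi import PoiViewGenerator
>>> # From a list of (latitude, longitude) tuples
>>> generator = PoiViewGenerator(points=[(0.32, 32.57), (0.35, 32.60)])
>>> # From a DataFrame with coordinate columns and a custom ID column
>>> df = pd.DataFrame(
...     {"school_id": ["s1", "s2"], "latitude": [0.32, 0.35], "longitude": [32.57, 32.60]}
... )
>>> generator = PoiViewGenerator(points=df, poi_id_column="school_id")
>>> generator.view.head()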

Source code in gigaspatial/generators/poi.py
class PoiViewGenerator:
    """
    POI View Generator for integrating various geospatial datasets
    such as Google Open Buildings, Microsoft Global Buildings, GHSL Built Surface,
    and GHSL Settlement Model (SMOD) data with Points of Interest (POIs).

    This class provides methods to load, process, and map external geospatial
    data to a given set of POIs, enriching them with relevant attributes.
    It leverages handler/reader classes for efficient data access and processing.

    The POIs can be initialized from a list of (latitude, longitude) tuples,
    a list of dictionaries, a pandas DataFrame, or a geopandas GeoDataFrame.
    """

    def __init__(
        self,
        points: Union[
            List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
        ],
        poi_id_column: str = "poi_id",
        config: Optional[PoiViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initializes the PoiViewGenerator with the input points and configurations.

        The input `points` are converted into an internal GeoDataFrame
        (`_points_gdf`) for consistent geospatial operations.

        Args:
            points (Union[List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame]):
                The input points of interest. Can be:
                - A list of (latitude, longitude) tuples.
                - A list of dictionaries with coordinate keys (e.g., 'latitude' and 'longitude'; detected automatically).
                - A pandas DataFrame with coordinate columns (detected automatically).
                - A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).
            poi_id_column (str):
                Name of the column containing unique POI identifiers. Defaults to
                "poi_id". If the column is absent, sequential IDs are generated.
            config (Optional[PoiViewGeneratorConfig]):
                Configuration for the POI view generation process. If None, a
                default `PoiViewGeneratorConfig` will be used.
            data_store (Optional[DataStore]):
                An instance of a data store for managing data access (e.g., LocalDataStore).
                If None, a default `LocalDataStore` will be used.
            logger (Optional[logging.Logger]):
                Logger instance. If None, a default logger is created.
        """
        if hasattr(points, "__len__") and len(points) == 0:
            raise ValueError("Points input cannot be empty")

        self.config = config or PoiViewGeneratorConfig()
        self.data_store = data_store or LocalDataStore()
        self.logger = logger or global_config.get_logger(self.__class__.__name__)
        self._points_gdf = self._init_points_gdf(points, poi_id_column)
        self._view: pd.DataFrame = self._points_gdf.drop(columns=["geometry"])

    @staticmethod
    def _init_points_gdf(
        points: Union[
            List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
        ],
        poi_id_column: str,
    ) -> gpd.GeoDataFrame:
        """
        Internal static method to convert various point input formats into a GeoDataFrame.

        This method standardizes coordinate column names to 'latitude' and 'longitude'
        for consistent internal representation. It also ensures each point has a unique
        identifier in the 'poi_id' column.

        Args:
            points: Input points in various formats:
                - List of (latitude, longitude) tuples
                - List of dictionaries with coordinate keys
                - DataFrame with coordinate columns
                - GeoDataFrame with point geometries

        Returns:
            gpd.GeoDataFrame: Standardized GeoDataFrame with 'latitude', 'longitude',
                             and 'poi_id' columns

        Raises:
            ValueError: If points format is not supported or coordinate columns cannot be detected
        """
        if isinstance(points, gpd.GeoDataFrame):
            # Check for duplicate indices and reset if found
            if points.index.duplicated().any():
                warnings.warn(
                    "Duplicate indices detected in GeoDataFrame. Resetting index to ensure unique indices.",
                    UserWarning,
                    stacklevel=2,
                )
                points = points.reset_index(drop=True)
            # Convert geometry to lat/lon if needed
            if points.geometry.name == "geometry":
                points = points.copy()
                points["latitude"] = points.geometry.y
                points["longitude"] = points.geometry.x
            if poi_id_column not in points.columns:
                points["poi_id"] = [f"poi_{i}" for i in range(len(points))]
            else:
                points = points.rename(
                    columns={poi_id_column: "poi_id"},
                )
                if points["poi_id"].duplicated().any():
                    raise ValueError(
                        f"Column '{poi_id_column}' provided as 'poi_id_column' contains duplicate values."
                    )

            if points.crs != "EPSG:4326":
                points = points.to_crs("EPSG:4326")
            return points

        elif isinstance(points, pd.DataFrame):
            # Detect and standardize coordinate columns
            try:
                # Check for duplicate indices and reset if found
                if points.index.duplicated().any():
                    warnings.warn(
                        "Duplicate indices detected in DataFrame. Resetting index to ensure unique indices.",
                        UserWarning,
                        stacklevel=2,
                    )
                    points = points.reset_index(drop=True)
                lat_col, lon_col = detect_coordinate_columns(points)
                points = points.copy()
                points["latitude"] = points[lat_col]
                points["longitude"] = points[lon_col]
                if poi_id_column not in points.columns:
                    points["poi_id"] = [f"poi_{i}" for i in range(len(points))]
                else:
                    points = points.rename(
                        columns={poi_id_column: "poi_id"},
                    )
                    if points["poi_id"].duplicated().any():
                        raise ValueError(
                            f"Column '{poi_id_column}' provided as 'poi_id_column' contains duplicate values."
                        )
                return convert_to_geodataframe(
                    points, lat_col="latitude", lon_col="longitude"
                )
            except ValueError as e:
                raise ValueError(
                    f"Could not detect coordinate columns in DataFrame: {str(e)}"
                )

        elif isinstance(points, list):
            if len(points) == 0:
                return gpd.GeoDataFrame(
                    columns=["latitude", "longitude", "poi_id", "geometry"],
                    geometry="geometry",
                    crs="EPSG:4326",
                )

            if isinstance(points[0], tuple) and len(points[0]) == 2:
                # List of (lat, lon) tuples
                df = pd.DataFrame(points, columns=["latitude", "longitude"])
                df["poi_id"] = [f"poi_{i}" for i in range(len(points))]
                return convert_to_geodataframe(df)

            elif isinstance(points[0], dict):
                # List of dictionaries
                df = pd.DataFrame(points)
                # Check for duplicate indices and reset if found
                if df.index.duplicated().any():
                    warnings.warn(
                        "Duplicate indices detected in DataFrame created from dictionary list. Resetting index to ensure unique indices.",
                        UserWarning,
                        stacklevel=2,
                    )
                    df = df.reset_index(drop=True)
                try:
                    lat_col, lon_col = detect_coordinate_columns(df)
                    df["latitude"] = df[lat_col]
                    df["longitude"] = df[lon_col]
                    if poi_id_column not in df.columns:
                        df["poi_id"] = [f"poi_{i}" for i in range(len(points))]
                    else:
                        df = df.rename(
                            columns={poi_id_column: "poi_id"},
                        )
                        if df["poi_id"].duplicated().any():
                            raise ValueError(
                                f"Column '{poi_id_column}' provided as 'poi_id_column' contains duplicate values."
                            )
                    return convert_to_geodataframe(
                        df, lat_col="latitude", lon_col="longitude"
                    )
                except ValueError as e:
                    raise ValueError(
                        f"Could not detect coordinate columns in dictionary list: {str(e)}"
                    )

        raise ValueError("Unsupported points input type for PoiViewGenerator.")

    @property
    def points_gdf(self) -> gpd.GeoDataFrame:
        """Gets the internal GeoDataFrame of points of interest."""
        return self._points_gdf

    @property
    def view(self) -> pd.DataFrame:
        """The DataFrame representing the current point of interest view."""
        return self._view

    def _update_view(self, new_data: pd.DataFrame) -> None:
        """
        Internal helper to update the main view DataFrame with new columns.
        This method is designed to be called by map_* methods.

        Args:
            new_data (pd.DataFrame): A DataFrame containing 'poi_id' and new columns
                                     to be merged into the main view.
        """
        if "poi_id" not in new_data.columns:
            available_cols = list(new_data.columns)
            raise ValueError(
                f"new_data DataFrame must contain 'poi_id' column. "
                f"Available columns: {available_cols}"
            )

        # Check for poi_id mismatches
        original_poi_ids = set(self._view["poi_id"])
        new_poi_ids = set(new_data["poi_id"])
        missing_pois = original_poi_ids - new_poi_ids

        if missing_pois:
            self.logger.warning(
                f"{len(missing_pois)} POIs will have NaN values for new columns"
            )

        # Ensure poi_id is the index for efficient merging
        # Create a copy to avoid SettingWithCopyWarning if new_data is a slice
        new_data_indexed = new_data.set_index("poi_id").copy()

        # Merge on 'poi_id' (which is now the index of self._view and new_data_indexed)
        # Using left join to keep all POIs from the original view
        self._view = (
            self._view.set_index("poi_id")
            .join(new_data_indexed, how="left")
            .reset_index()
        )

        self.logger.debug(
            f"View updated with columns: {list(new_data_indexed.columns)}"
        )

    def map_nearest_points(
        self,
        points_df: Union[pd.DataFrame, gpd.GeoDataFrame],
        id_column: Optional[str] = None,
        lat_column: Optional[str] = None,
        lon_column: Optional[str] = None,
        output_prefix: str = "nearest",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps nearest points from a given DataFrame to the POIs.

        For each POI, computes the ID of and distance to the nearest point in
        the input DataFrame. The result is returned for callers to merge into the view.

        Args:
            points_df (Union[pd.DataFrame, gpd.GeoDataFrame]):
                DataFrame containing points to find nearest neighbors from.
                Must have latitude and longitude columns or point geometries.
            id_column (str, optional):
                Name of the column containing unique identifiers for each point.
                If None, the index of points_df will be used instead.
            lat_column (str, optional):
                Name of the latitude column in points_df. If None, will attempt to detect it
                or extract from geometry if points_df is a GeoDataFrame.
            lon_column (str, optional):
                Name of the longitude column in points_df. If None, will attempt to detect it
                or extract from geometry if points_df is a GeoDataFrame.
            output_prefix (str, optional):
                Prefix for the output column names. Defaults to "nearest".
            **kwargs:
                Additional keyword arguments passed to the data reader (if applicable).

        Returns:
            pd.DataFrame: A DataFrame with columns 'poi_id',
                          '{output_prefix}_id', and '{output_prefix}_distance'.
                          Returns the current view unchanged if points_df is empty.

        Raises:
            ValueError: If required columns are missing from points_df or if coordinate
                       columns cannot be detected or extracted from geometry.
        """
        self.logger.info(
            f"Mapping nearest points from {points_df.__class__.__name__} to POIs"
        )

        # Validate input DataFrame
        if points_df.empty:
            self.logger.info("No points found in the input DataFrame")
            return self.view

        # Handle GeoDataFrame
        if isinstance(points_df, gpd.GeoDataFrame):
            points_df = points_df.copy()
            if points_df.geometry.name == "geometry":
                points_df["latitude"] = points_df.geometry.y
                points_df["longitude"] = points_df.geometry.x
                lat_column = "latitude"
                lon_column = "longitude"
                self.logger.info("Extracted coordinates from geometry")

        # Detect coordinate columns if not provided
        if lat_column is None or lon_column is None:
            try:
                detected_lat, detected_lon = detect_coordinate_columns(points_df)
                lat_column = lat_column or detected_lat
                lon_column = lon_column or detected_lon
                self.logger.info(
                    f"Detected coordinate columns: {lat_column}, {lon_column}"
                )
            except ValueError as e:
                raise ValueError(f"Could not detect coordinate columns: {str(e)}")

        # Validate required columns
        required_columns = [lat_column, lon_column]
        if id_column is not None:
            required_columns.append(id_column)
        missing_columns = [
            col for col in required_columns if col not in points_df.columns
        ]
        if missing_columns:
            raise ValueError(
                f"Missing required columns in points_df: {missing_columns}"
            )

        from gigaspatial.processing.geo import calculate_distance

        self.logger.info("Calculating nearest points for each POI")
        tree = cKDTree(points_df[[lat_column, lon_column]])
        points_df_poi = self.points_gdf.copy()
        _, idx = tree.query(points_df_poi[["latitude", "longitude"]], k=1)
        df_nearest = points_df.iloc[idx]
        dist = calculate_distance(
            lat1=points_df_poi.latitude,
            lon1=points_df_poi.longitude,
            lat2=df_nearest[lat_column],
            lon2=df_nearest[lon_column],
        )
        # Create a temporary DataFrame to hold the results for merging
        # Use id_column if provided, otherwise use index
        if id_column is not None:
            nearest_ids = points_df.iloc[idx][id_column].values
        else:
            nearest_ids = points_df.index[idx].values

        temp_result_df = pd.DataFrame(
            {
                "poi_id": points_df_poi["poi_id"],
                f"{output_prefix}_id": nearest_ids,
                f"{output_prefix}_distance": dist,
            }
        )

        self.logger.info(
            f"Nearest points mapping complete with prefix '{output_prefix}'"
        )
        return temp_result_df

    def map_google_buildings(
        self,
        handler: Optional[GoogleOpenBuildingsHandler] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps Google Open Buildings data to the POIs by finding the nearest building.

        Enriches the view with the ID and distance to the nearest
        Google Open Building for each POI.

        Args:
            handler (Optional[GoogleOpenBuildingsHandler]):
                Handler for accessing Google Open Buildings data. If None, a
                default `GoogleOpenBuildingsHandler` will be created.
            **kwargs:
                Additional keyword arguments passed to the data reader (if applicable).

        Returns:
            pd.DataFrame: The updated view with new columns:
                          'nearest_google_building_id' and 'nearest_google_building_distance'.
                          Returns the current view unchanged if no buildings are found.
        """
        self.logger.info("Mapping Google Open Buildings data to POIs")
        handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)

        self.logger.info("Loading Google Buildings point data")
        buildings_df = handler.load_points(
            self.points_gdf, ensure_available=self.config.ensure_available, **kwargs
        )
        if buildings_df is None or len(buildings_df) == 0:
            self.logger.info("No Google buildings data found for the provided POIs")
            return self.view

        mapped_data = self.map_nearest_points(
            points_df=buildings_df,
            id_column="full_plus_code",
            output_prefix="nearest_google_building",
            **kwargs,
        )
        self._update_view(mapped_data)
        return self.view

    def map_ms_buildings(
        self,
        handler: Optional[MSBuildingsHandler] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

        Enriches the view with the ID and distance to the nearest
        Microsoft Global Building for each POI. If buildings don't have an ID column,
        creates a unique ID from the building centroid's coordinates.

        Args:
            handler (Optional[MSBuildingsHandler]):
                Handler for accessing Microsoft Global Buildings data. If None, a
                default `MSBuildingsHandler` will be created.
            **kwargs:
                Additional keyword arguments passed to the data reader (if applicable).

        Returns:
            pd.DataFrame: The updated view with new columns:
                          'nearest_ms_building_id' and 'nearest_ms_building_distance'.
                          Returns the current view unchanged if no buildings are found.
        """
        self.logger.info("Mapping Microsoft Global Buildings data to POIs")
        handler = handler or MSBuildingsHandler(data_store=self.data_store)
        self.logger.info("Loading Microsoft Buildings polygon data")
        buildings_gdf = handler.load_data(
            self.points_gdf, ensure_available=self.config.ensure_available
        )
        if buildings_gdf is None or len(buildings_gdf) == 0:
            self.logger.info("No Microsoft buildings data found for the provided POIs")
            return self.view

        building_centroids = get_centroids(buildings_gdf)

        if "building_id" not in buildings_gdf:
            self.logger.info("Creating building IDs from coordinates")
            building_centroids["building_id"] = building_centroids.apply(
                lambda row: f"{row.geometry.y:.6f}_{row.geometry.x:.6f}",
                axis=1,
            )

        mapped_data = self.map_nearest_points(
            points_df=building_centroids,
            id_column="building_id",
            output_prefix="nearest_ms_building",
            **kwargs,
        )
        self._update_view(mapped_data)
        return self.view

    def map_zonal_stats(
        self,
        data: Union[TifProcessor, List[TifProcessor], gpd.GeoDataFrame],
        stat: str = "mean",
        map_radius_meters: Optional[float] = None,
        output_column: str = "zonal_stat",
        value_column: Optional[str] = None,
        predicate: Literal["intersects", "within", "fractional"] = "intersects",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps zonal statistics from raster or polygon data to POIs.

        Can operate in three modes:
        1. Raster point sampling: Directly samples raster values at POI locations
        2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them
        3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

        Args:
            data (Union[TifProcessor, List[TifProcessor], gpd.GeoDataFrame]):
                Either a TifProcessor object, a list of TifProcessor objects (which will be merged
                into a single TifProcessor for processing), or a GeoDataFrame containing polygon
                data to aggregate.
            stat (str, optional):
                For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max").
                For polygon data: Aggregation method to use.
                Defaults to "mean".
            map_radius_meters (float, optional):
                If provided, creates circular buffers of this radius around each POI
                and calculates statistics within the buffers. If None, samples directly
                at POI locations (only for raster data).
            output_column (str, optional):
                Name of the output column to store the results. Defaults to "zonal_stat".
            value_column (str, optional):
                For polygon data: Name of the column to aggregate. Required for polygon data.
                Not used for raster data.
            predicate (Literal["intersects", "within", "fractional"], optional):
                The spatial relationship to use for aggregation. Defaults to "intersects".
            **kwargs:
                Additional keyword arguments passed to the sampling/aggregation functions.

        Returns:
            pd.DataFrame: A DataFrame keyed by 'poi_id' with a new column containing
                          the calculated statistics. Returns the current view
                          unchanged if no valid data is found.

        Raises:
            ValueError: If no valid data is provided, if parameters are incompatible,
                      or if required parameters (value_column) are missing for polygon data.
        """

        raster_processor: Optional[TifProcessor] = None

        if isinstance(data, TifProcessor):
            raster_processor = data
        elif isinstance(data, list) and all(isinstance(x, TifProcessor) for x in data):
            if not data:
                self.logger.info("No valid raster data provided")
                return self.view

            if len(data) > 1:
                all_source_paths = [tp.dataset_path for tp in data]

                self.logger.info(
                    f"Merging {len(all_source_paths)} rasters into a single TifProcessor for zonal statistics."
                )
                raster_processor = TifProcessor(
                    dataset_path=all_source_paths,
                    data_store=self.data_store,
                    **kwargs,
                )
            else:
                raster_processor = data[0]

        if raster_processor:
            results_df = pd.DataFrame({"poi_id": self.points_gdf["poi_id"]})
            raster_crs = raster_processor.crs

            if map_radius_meters is not None:
                self.logger.info(
                    f"Calculating {stat} within {map_radius_meters}m buffers around POIs"
                )
                # Create buffers around POIs
                buffers_gdf = buffer_geodataframe(
                    self.points_gdf,
                    buffer_distance_meters=map_radius_meters,
                    cap_style="round",
                )

                # Calculate zonal statistics
                sampled_values = raster_processor.sample_by_polygons(
                    polygon_list=buffers_gdf.to_crs(raster_crs).geometry,
                    stat=stat,
                )
            else:
                self.logger.info(f"Sampling {stat} at POI locations")
                # Sample directly at POI locations
                coord_list = (
                    self.points_gdf.to_crs(raster_crs).get_coordinates().to_numpy()
                )
                sampled_values = raster_processor.sample_by_coordinates(
                    coordinate_list=coord_list, **kwargs
                )

            results_df[output_column] = sampled_values

        elif isinstance(data, gpd.GeoDataFrame):
            # Handle polygon data
            if data.empty:
                self.logger.info("No valid GeoDataFrame data provided")
                return pd.DataFrame(
                    columns=["poi_id", output_column]
                )  # Return empty DataFrame

            if map_radius_meters is None:
                raise ValueError(
                    "map_radius_meters must be provided for for GeoDataFrame data"
                )

            # Create buffers around POIs
            buffer_gdf = buffer_geodataframe(
                self.points_gdf,
                buffer_distance_meters=map_radius_meters,
                cap_style="round",
            )

            if any(data.geom_type.isin(["MultiPoint", "Point"])):

                self.logger.info(
                    f"Aggregating point data within {map_radius_meters}m buffers around POIs using predicate '{predicate}'"
                )

                # If no value_column, default to 'count'
                if value_column is None:
                    actual_stat = "count"
                    self.logger.warning(
                        "No value_column provided for point data. Defaulting to 'count' aggregation."
                    )
                else:
                    actual_stat = stat
                    if value_column not in data.columns:
                        raise ValueError(
                            f"Value column '{value_column}' not found in input GeoDataFrame."
                        )

                aggregation_result_gdf = aggregate_points_to_zones(
                    points=data,
                    zones=buffer_gdf,
                    value_columns=value_column,
                    aggregation=actual_stat,
                    point_zone_predicate=predicate,  # cannot be `fractional` for point data
                    zone_id_column="poi_id",
                    output_suffix="",
                    drop_geometry=True,
                    **kwargs,
                )

                output_col_from_agg = (
                    f"{value_column}_{actual_stat}" if value_column else "point_count"
                )

            else:
                if value_column is None:
                    raise ValueError(
                        "value_column must be provided for polygon data aggregation."
                    )
                if value_column not in data.columns:
                    raise ValueError(
                        f"Value column '{value_column}' not found in input GeoDataFrame."
                    )
                self.logger.info(
                    f"Aggregating polygon data within {map_radius_meters}m buffers around POIs using predicate '{predicate}'"
                )

                # Aggregate polygons to buffers
                aggregation_result_gdf = aggregate_polygons_to_zones(
                    polygons=data,
                    zones=buffer_gdf,
                    value_columns=value_column,
                    aggregation=stat,
                    predicate=predicate,
                    zone_id_column="poi_id",
                    output_suffix="",
                    drop_geometry=True,
                    **kwargs,
                )

                output_col_from_agg = value_column

            results_df = aggregation_result_gdf[["poi_id", output_col_from_agg]]

            if output_column != "zonal_stat":
                results_df = results_df.rename(
                    columns={output_col_from_agg: output_column}
                )

        else:
            raise ValueError(
                "data must be either a list of TifProcessor objects or a GeoDataFrame"
            )

        self.logger.info(
            f"Zonal statistics mapping complete for column(s) derived from "
            f"'{output_column}' or '{value_column}'"
        )
        return results_df

    def map_built_s(
        self,
        map_radius_meters: float = 150,
        stat: str = "sum",
        dataset_year: int = 2020,
        dataset_resolution: int = 100,
        output_column: str = "built_surface_m2",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

        Calculates the sum of built surface area within a specified buffer
        radius around each POI. Enriches the view with the 'built_surface_m2' column.

        Args:
            map_radius_meters (float):
                The buffer distance in meters around each POI used for the
                zonal statistics. Defaults to 150 meters.
            stat (str):
                Statistic to calculate within each buffer. Defaults to "sum".
            dataset_year (int):
                GHSL dataset year. Defaults to 2020.
            dataset_resolution (int):
                GHSL dataset resolution in meters. Defaults to 100.
            output_column (str):
                Name of the output column. Defaults to "built_surface_m2".
            **kwargs:
                Additional keyword arguments passed to the data handler and reader.

        Returns:
            pd.DataFrame: The updated view with a new column:
                          'built_surface_m2'. Returns the current view
                          unchanged if no GHSL Built Surface data is found.
        """
        self.logger.info("Mapping GHSL Built Surface data to POIs")
        handler = GHSLDataHandler(
            product="GHS_BUILT_S",
            year=dataset_year,
            resolution=dataset_resolution,
            data_store=self.data_store,
            **kwargs,
        )
        self.logger.info("Loading GHSL Built Surface raster tiles")
        tif_processors = handler.load_data(
            self.points_gdf.copy(),
            ensure_available=self.config.ensure_available,
            merge_rasters=True,
            **kwargs,
        )

        mapped_data = self.map_zonal_stats(
            data=tif_processors,
            stat=stat,
            map_radius_meters=map_radius_meters,
            output_column=output_column,
            **kwargs,
        )
        self._update_view(mapped_data)
        return self.view

    def map_smod(
        self,
        stat="median",
        dataset_year=2020,
        dataset_resolution=1000,
        output_column="smod_class",
        **kwargs,
    ) -> pd.DataFrame:
        """
        Maps GHSL Settlement Model (SMOD) data to the POIs.

        Samples the SMOD class value at each POI's location and enriches the
        view with the 'smod_class' column.

        Args:
            stat (str):
                Statistic used when sampling raster values. Defaults to "median".
            dataset_year (int):
                GHSL dataset year. Defaults to 2020.
            dataset_resolution (int):
                GHSL dataset resolution in meters. Defaults to 1000.
            output_column (str):
                Name of the output column. Defaults to "smod_class".
            **kwargs:
                Additional keyword arguments passed to the data handler and reader.

        Returns:
            pd.DataFrame: The updated view with a new column:
                          'smod_class'. Returns the current view
                          unchanged if no GHSL SMOD data is found.
        """
        self.logger.info("Mapping GHSL Settlement Model (SMOD) data to POIs")
        handler = GHSLDataHandler(
            product="GHS_SMOD",
            year=dataset_year,
            resolution=dataset_resolution,
            data_store=self.data_store,
            coord_system=54009,
            **kwargs,
        )

        self.logger.info("Loading GHSL SMOD raster tiles")
        tif_processors = handler.load_data(
            self.points_gdf.copy(),
            ensure_available=self.config.ensure_available,
            merge_rasters=True,
            **kwargs,
        )

        mapped_data = self.map_zonal_stats(
            data=tif_processors,
            stat=stat,
            output_column=output_column,
            **kwargs,
        )
        self._update_view(mapped_data)
        return self.view

    def map_wp_pop(
        self,
        country: Union[str, List[str]],
        map_radius_meters: float,
        resolution: int = 1000,
        predicate: Literal[
            "centroid_within", "intersects", "fractional", "within"
        ] = "intersects",
        output_column: str = "population",
        **kwargs,
    ) -> pd.DataFrame:
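        """
        Maps WorldPop population data to the POIs.

        Aggregates population counts within `map_radius_meters` buffers around
        each POI (using 'sum') and stores the result in `output_column`.

        Args:
            country (Union[str, List[str]]): Country or countries to load data for.
                The 'age_structures' project accepts only a single country.
            map_radius_meters (float): Buffer radius in meters around each POI.
            resolution (int): Dataset resolution in meters. Defaults to 1000.
            predicate (Literal["centroid_within", "intersects", "fractional", "within"]):
                Spatial relationship used for aggregation. "fractional" is only
                supported at 1000m resolution and falls back to "intersects" at 100m.
            output_column (str): Name of the output column. Defaults to "population".
            **kwargs: Additional keyword arguments passed to the data handler.

        Returns:
            pd.DataFrame: The updated view with the new population column.
        """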
        # Ensure country is always a list for consistent handling
        countries_list = [country] if isinstance(country, str) else country

        handler = WPPopulationHandler(
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        # Restrict to single country for age_structures project
        if handler.config.project == "age_structures" and len(countries_list) > 1:
            raise ValueError(
                "For 'age_structures' project, only a single country can be processed at a time."
            )

        self.logger.info(
            f"Mapping WorldPop Population data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
        )

        if predicate == "fractional" and resolution == 100:
            self.logger.warning(
                "Fractional aggregations only supported for datasets with 1000m resolution. Using `intersects` as predicate"
            )
            predicate = "intersects"

        data_to_process: Union[List[TifProcessor], gpd.GeoDataFrame, pd.DataFrame]

        if predicate == "centroid_within":
            if handler.config.project == "age_structures":
                # Load individual tif processors for the single country
                all_tif_processors = handler.load_data(
                    countries_list[0],
                    ensure_available=self.config.ensure_available,
                    **kwargs,
                )

                # Sum results from each tif_processor separately
                summed_results_by_poi = {
                    poi_id: 0.0 for poi_id in self.points_gdf["poi_id"].unique()
                }

                self.logger.info(
                    f"Sampling individual age_structures rasters using 'sum' statistic and summing per POI."
                )
                for tif_processor in all_tif_processors:
                    single_raster_df = self.map_zonal_stats(
                        data=tif_processor,
                        stat="sum",
                        map_radius_meters=map_radius_meters,
                        value_column="pixel_value",
                        predicate=predicate,
                        output_column=output_column,  # This output_column will be in the temporary DF
                        **kwargs,
                    )
                    # Add values from this single raster to the cumulative sum
                    for _, row in single_raster_df.iterrows():
                        summed_results_by_poi[row["poi_id"]] += row[output_column]

                # Convert the summed dictionary back to a DataFrame
                data_to_process = pd.DataFrame(
                    list(summed_results_by_poi.items()),
                    columns=["poi_id", output_column],
                )

            else:
                # Existing behavior for non-age_structures projects or if merging is fine
                # 'data_to_process' will be a list of TifProcessor objects, which map_zonal_stats will merge
                data_to_process = []
                for c in countries_list:
                    data_to_process.extend(
                        handler.load_data(
                            c, ensure_available=self.config.ensure_available, **kwargs
                        )
                    )
        else:
            # 'data_to_process' will be a GeoDataFrame
            data_to_process = pd.concat(
                [
                    handler.load_into_geodataframe(
                        c, ensure_available=self.config.ensure_available, **kwargs
                    )
                    for c in countries_list  # Original iteration over countries_list
                ],
                ignore_index=True,
            )

        self.logger.info(
            f"Mapping WorldPop Population data into {map_radius_meters}m zones around POIs using 'sum' statistic"
        )

        final_mapped_df: pd.DataFrame

        # If 'data_to_process' is already the summed DataFrame (from age_structures/centroid_within branch),
        # use it directly.
        if (
            isinstance(data_to_process, pd.DataFrame)
            and output_column in data_to_process.columns
            and "poi_id" in data_to_process.columns
        ):
            final_mapped_df = data_to_process
        else:
            # For other cases, proceed with the original call to map_zonal_stats
            final_mapped_df = self.map_zonal_stats(
                data=data_to_process,
                stat="sum",
                map_radius_meters=map_radius_meters,
                value_column="pixel_value",
                predicate=predicate,
                output_column=output_column,
                **kwargs,
            )
        self._update_view(
            final_mapped_df
        )  # Update the view with the final mapped DataFrame
        return self.view

    def save_view(
        self,
        name: str,
        output_format: Optional[str] = None,
    ) -> Path:
        """
        Saves the current POI view (the enriched DataFrame) to a file.

        The output path and format are determined by the `config`
        or overridden by the `output_format` parameter.

        Args:
            name (str): The base name for the output file (without extension).
            output_format (Optional[str]):
                The desired output format (e.g., "csv", "geojson"). If None,
                the `output_format` from `config` will be used.

        Returns:
            Path: The full path to the saved output file.
        """
        format_to_use = output_format or self.config.output_format
        output_path = self.config.base_path / f"{name}.{format_to_use}"

        self.logger.info(f"Saving POI view to {output_path}")
        # Save the current view, which is a pandas DataFrame, not a GeoDataFrame
        # GeoJSON/Shapefile formats would require converting back to GeoDataFrame first.
        # For CSV, Parquet, Feather, this is fine.
        if format_to_use in ["geojson", "shp", "gpkg"]:
            self.logger.warning(
                f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
            )
            # Re-add geometry for saving to geospatial formats
            view_to_save_gdf = self.view.merge(
                self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
            )
            write_dataset(
                data=view_to_save_gdf,
                path=str(output_path),
                data_store=self.data_store,
            )
        else:
            write_dataset(
                data=self.view,  # Use the internal _view DataFrame
                path=str(output_path),
                data_store=self.data_store,
            )

        return output_path

    def to_dataframe(self) -> pd.DataFrame:
        """
        Returns the current POI view as a DataFrame.

        The view contains all variables accumulated by previous map_* calls.

        Returns:
            pd.DataFrame: The current view.
        """
        return self.view

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Returns the current POI view merged with the original point geometries as a GeoDataFrame.

        This method combines all accumulated variables in the view with the corresponding
        point geometries, providing a spatially-enabled DataFrame for further analysis or export.

        Returns:
            gpd.GeoDataFrame: The current view merged with point geometries.
        """
        return gpd.GeoDataFrame(
            self.view.merge(
                self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
            ),
            crs="EPSG:4326",
        )

    def chain_operations(self, operations: List[dict]) -> "PoiViewGenerator":
        """
        Chain multiple mapping operations for fluent interface.

        Args:
            operations: List of dicts with 'method' and 'kwargs' keys

        Example:
            generator.chain_operations([
                {'method': 'map_google_buildings', 'kwargs': {}},
                {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}},
            ])
        """
        for op in operations:
            method_name = op["method"]
            kwargs = op.get("kwargs", {})
            if hasattr(self, method_name):
                getattr(self, method_name)(**kwargs)
            else:
                raise AttributeError(f"Method {method_name} not found")
        return self

    def validate_data_coverage(self, data_bounds: gpd.GeoDataFrame) -> dict:
        """
        Validate how many POIs fall within the data coverage area.

        Args:
            data_bounds (gpd.GeoDataFrame): Geometries describing the coverage
                area; their union is used for the containment test.

        Returns:
            dict: Coverage statistics (total, covered, and uncovered POI counts,
                plus coverage percentage)
        """
        poi_within = self.points_gdf.within(data_bounds.union_all())
        coverage_stats = {
            "total_pois": len(self.points_gdf),
            "covered_pois": poi_within.sum(),
            "coverage_percentage": (poi_within.sum() / len(self.points_gdf)) * 100,
            "uncovered_pois": (~poi_within).sum(),
        }
        return coverage_stats

    def find_nearest_buildings(
        self,
        country: str,
        search_radius: float = 1000,
        source_filter: Optional[Literal["google", "microsoft"]] = None,
        find_nearest_globally: bool = False,
        **kwargs,
    ) -> pd.DataFrame:
        """
        Find the nearest building to each POI within a specified search radius.

        This method processes building data by:
        1. Filtering to only building tiles that intersect POI buffers (partitioned datasets)
        2. Finding the nearest building candidate per POI (nearest-neighbor search)
        3. Computing final POI-to-building distances in **meters** using haversine distance

        Parameters
        ----------
        country : str
            Country code for which to load building data.

        search_radius : float, default=1000
            Search radius in meters. Only buildings within this distance from a POI
            will be considered. For better performance, use the smallest radius
            that meets your requirements.

        source_filter : {'google', 'microsoft'}, optional
            Filter buildings by data source. If None, uses buildings from all sources.

        find_nearest_globally : bool, default=False
            If True, finds the true nearest building regardless of distance.
            This overrides search_radius and may be significantly slower.
            When False, uses the efficient radius-limited search.

        **kwargs : dict
            Additional arguments passed to the building data handler.

        Returns
        -------
        pd.DataFrame
            DataFrame with columns:
            - poi_id: Original POI identifier
            - nearest_building_distance_m: Distance to nearest building in meters.
            NaN if no building found within the search constraints.
            - building_within_{search_radius}m: Boolean indicating if a building
            was found within the specified search_radius.

        Notes
        -----
        - Distances are computed in meters using haversine (great-circle) distance via
        `calculate_distance`.
        - Nearest-neighbor candidate selection is performed using coordinates extracted
        from geometries.
        - For countries with a single building file (no partitioning),
        the search is performed globally regardless of search_radius.
        - For partitioned countries, search_radius optimizes performance by
        filtering which tiles to process.

        Examples
        --------
        >>> # Find buildings within 350m of POIs
        >>> result = generator.find_nearest_buildings("USA", search_radius=350)

        >>> # Find nearest building globally (may be slow for partitioned countries)
        >>> result = generator.find_nearest_buildings("USA", find_nearest_globally=True)
        """

        # ---------------------------------------------------------
        # 1. PARAMETER VALIDATION AND LOGGING
        # ---------------------------------------------------------

        if find_nearest_globally:
            self.logger.info(
                "Global nearest building search enabled. "
                "This may be slower than radius-limited search."
            )
            return self._find_nearest_building_globally(
                country=country, source_filter=source_filter, **kwargs
            )

        # Warn for large search radii
        if search_radius > 5000:
            self.logger.warning(
                f"Large search radius ({search_radius}m) may impact performance. "
                "Consider using progressive expansion for better efficiency."
            )

        self.logger.info(
            f"Mapping Google-Microsoft Buildings data to POIs within {search_radius}m"
        )

        # ---------------------------------------------------------
        # 2. LOAD BUILDING FILES
        # ---------------------------------------------------------

        from gigaspatial.handlers.google_ms_combined_buildings import (
            GoogleMSBuildingsHandler,
        )

        handler = GoogleMSBuildingsHandler(partition_strategy="s2_grid")

        if self.config.ensure_available:
            if not handler.ensure_data_available(country, **kwargs):
                raise RuntimeError("Could not ensure data availability for loading")

        building_files = handler.reader.resolve_source_paths(country, **kwargs)

        result = GoogleMSBuildingsEngine.nearest_buildings_to_pois(
            handler=handler,
            building_files=building_files,
            pois_gdf=self.points_gdf,
            source_filter=source_filter,
            search_radius_m=search_radius,
            logger=self.logger,
        )

        distances_clean = result.distances_m.replace(np.inf, np.nan)
        result_df = pd.DataFrame(
            {
                "poi_id": result.distances_m.index,
                "nearest_building_distance_m": distances_clean,
                f"building_within_{search_radius}m": result.distances_m
                <= search_radius,
            }
        )

        # Update the view and return
        self._update_view(result_df)
        return self.view

    def _find_nearest_building_globally(
        self,
        country: str,
        source_filter: Optional[Literal["google", "microsoft"]] = None,
        max_global_search: float = 10000,
        **kwargs,
    ) -> pd.DataFrame:
        """Find the true nearest building for each POI using progressive expansion."""

        self.logger.info("Starting global nearest building search...")

        from gigaspatial.handlers.google_ms_combined_buildings import (
            GoogleMSBuildingsHandler,
        )

        handler = GoogleMSBuildingsHandler(partition_strategy="s2_grid")
        building_files = handler.reader.resolve_source_paths(country, **kwargs)

        # Single file: use optimized path
        if len(building_files) == 1:
            self.logger.info("Single file country: using single-file optimization.")

            result = GoogleMSBuildingsEngine.nearest_buildings_to_pois(
                handler=handler,
                building_files=building_files,
                pois_gdf=self.points_gdf,
                source_filter=source_filter,
                search_radius_m=max_global_search,
                logger=self.logger,
            )

            result_df = pd.DataFrame(
                {
                    "poi_id": result.distances_m.index,
                    "nearest_building_distance_m": result.distances_m.replace(
                        np.inf, np.nan
                    ),
                    f"building_within_{max_global_search}m": result.distances_m
                    <= max_global_search,
                }
            )

            found_count = result_df["nearest_building_distance_m"].notna().sum()
            self.logger.info(
                f"Single file search complete. Found {found_count}/{len(result_df)} POIs."
            )

            self._update_view(result_df)
            return self.view

        # Partitioned case: progressive radius expansion
        self.logger.info(
            f"Partitioned country: progressive expansion up to {max_global_search}m"
        )

        radii = [350, 1000, 2500, 5000, max_global_search]

        # Track POIs that still need searching (use a set for efficiency)
        remaining_poi_ids = set(self.points_gdf.poi_id)
        global_results = pd.Series(np.inf, index=self.points_gdf.poi_id, dtype=float)

        # Track processed tiles so they are not re-scanned at larger radii
        processed_tiles = set()

        for radius in radii:
            if not remaining_poi_ids:
                self.logger.info("All POIs found buildings. Stopping early.")
                break

            self.logger.info(
                f"Searching radius {radius}m for {len(remaining_poi_ids)} POIs"
            )

            # Create jobs for this radius
            jobs = GoogleMSBuildingsEngine.create_partitioned_jobs_for_pois(
                self.points_gdf,
                building_files,
                search_radius_m=radius,
            )

            # Only process tiles that have not been scanned at a smaller radius
            new_jobs = [(f, pois) for f, pois in jobs if f not in processed_tiles]

            if not new_jobs:
                self.logger.debug(f"No new tiles at radius {radius}m")
                continue

            # Process new tiles (don't update the view inside the engine)
            temp_result = GoogleMSBuildingsEngine.nearest_buildings_to_pois(
                handler=handler,
                building_files=[f for f, _ in new_jobs],
                pois_gdf=self.points_gdf,
                source_filter=source_filter,
                search_radius_m=radius,
                logger=self.logger,
            )
            temp_min_dists = temp_result.distances_m

            # Update global results and track found POIs
            found_in_this_iteration = set()

            for poi_id in remaining_poi_ids:
                dist = temp_min_dists[poi_id]
                if dist < global_results[poi_id]:
                    global_results[poi_id] = dist
                    if dist <= radius:  # Found within this radius
                        found_in_this_iteration.add(poi_id)

            # Remove found POIs from the remaining set
            remaining_poi_ids -= found_in_this_iteration

            # Mark these tiles as processed
            processed_tiles.update(f for f, _ in new_jobs)

            self.logger.info(
                f"Found {len(found_in_this_iteration)} POIs at radius {radius}m. "
                f"{len(remaining_poi_ids)} remaining."
            )

        # Create final results
        result_df = pd.DataFrame(
            {
                "poi_id": global_results.index,
                "nearest_building_distance_m": global_results.replace(np.inf, np.nan),
                f"building_within_{max_global_search}m": global_results
                <= max_global_search,
            }
        )

        found_count = result_df["nearest_building_distance_m"].notna().sum()
        self.logger.info(
            f"Global search complete. Found buildings for {found_count}/{len(result_df)} POIs."
        )

        self._update_view(result_df)
        return self.view
points_gdf: gpd.GeoDataFrame property

Gets the internal GeoDataFrame of points of interest.

view: pd.DataFrame property

The DataFrame representing the current point of interest view.

__init__(points, poi_id_column='poi_id', config=None, data_store=None, logger=None)

Initializes the PoiViewGenerator with the input points and configurations.

The input points are converted into an internal GeoDataFrame (_points_gdf) for consistent geospatial operations.

Parameters:

Name Type Description Default
points Union[List[Tuple[float, float]], List[dict], DataFrame, GeoDataFrame]

The input points of interest. Can be:
  • A list of (latitude, longitude) tuples.
  • A list of dictionaries, where each dict must contain 'latitude' and 'longitude' keys.
  • A pandas DataFrame with 'latitude' and 'longitude' columns.
  • A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).

required
config Optional[PoiViewGeneratorConfig]

Configuration for the POI view generation process. If None, a default PoiViewGeneratorConfig will be used.

None
data_store Optional[DataStore]

An instance of a data store for managing data access (e.g., LocalDataStore). If None, a default LocalDataStore will be used.

None
Source code in gigaspatial/generators/poi.py
def __init__(
    self,
    points: Union[
        List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame
    ],
    poi_id_column: str = "poi_id",
    config: Optional[PoiViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: logging.Logger = None,
):
    """
    Initializes the PoiViewGenerator with the input points and configurations.

    The input `points` are converted into an internal GeoDataFrame
    (`_points_gdf`) for consistent geospatial operations.

    Args:
        points (Union[List[Tuple[float, float]], List[dict], pd.DataFrame, gpd.GeoDataFrame]):
            The input points of interest. Can be:
            - A list of (latitude, longitude) tuples.
            - A list of dictionaries, where each dict must contain 'latitude' and 'longitude' keys.
            - A pandas DataFrame with 'latitude' and 'longitude' columns.
            - A geopandas GeoDataFrame (expected to have a 'geometry' column representing points).
        config (Optional[PoiViewGeneratorConfig]):
            Configuration for the POI view generation process. If None, a
            default `PoiViewGeneratorConfig` will be used.
        data_store (Optional[DataStore]):
            An instance of a data store for managing data access (e.g., LocalDataStore).
            If None, a default `LocalDataStore` will be used.
    """
    if hasattr(points, "__len__") and len(points) == 0:
        raise ValueError("Points input cannot be empty")

    self.config = config or PoiViewGeneratorConfig()
    self.data_store = data_store or LocalDataStore()
    self.logger = logger or global_config.get_logger(self.__class__.__name__)
    self._points_gdf = self._init_points_gdf(points, poi_id_column)
    self._view: pd.DataFrame = self._points_gdf.drop(columns=["geometry"])
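
For illustration, the generator accepts any of the supported input types; a minimal sketch (coordinates and identifiers are made up for the example):

    import pandas as pd
    from gigaspatial.generators.poi import PoiViewGenerator

    # From a list of (latitude, longitude) tuples;
    # poi_id values are generated internally when absent
    generator = PoiViewGenerator(points=[(0.35, 32.58), (0.31, 32.60)])

    # From a DataFrame that carries its own identifiers
    schools = pd.DataFrame(
        {
            "poi_id": ["school_1", "school_2"],
            "latitude": [0.35, 0.31],
            "longitude": [32.58, 32.60],
        }
    )
    generator = PoiViewGenerator(points=schools)
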
chain_operations(operations)

Chain multiple mapping operations for fluent interface.

Parameters:

Name Type Description Default
operations List[dict]

List of dicts with 'method' and 'kwargs' keys

required
Example

generator.chain_operations([
    {'method': 'map_google_buildings', 'kwargs': {}},
    {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}},
])

Source code in gigaspatial/generators/poi.py
def chain_operations(self, operations: List[dict]) -> "PoiViewGenerator":
    """
    Chain multiple mapping operations for fluent interface.

    Args:
        operations: List of dicts with 'method' and 'kwargs' keys

    Example:
        generator.chain_operations([
            {'method': 'map_google_buildings', 'kwargs': {}},
            {'method': 'map_built_s', 'kwargs': {'map_radius_meters': 200}},
        ])
    """
    for op in operations:
        method_name = op["method"]
        kwargs = op.get("kwargs", {})
        if hasattr(self, method_name):
            getattr(self, method_name)(**kwargs)
        else:
            raise AttributeError(f"Method {method_name} not found")
    return self
find_nearest_buildings(country, search_radius=1000, source_filter=None, find_nearest_globally=False, **kwargs)

Find the nearest building to each POI within a specified search radius.

This method processes building data by:

  1. Filtering to only building tiles that intersect POI buffers (partitioned datasets)
  2. Finding the nearest building candidate per POI (nearest-neighbor search)
  3. Computing final POI-to-building distances in meters using haversine distance

Parameters

country : str
    Country code for which to load building data.

search_radius : float, default=1000
    Search radius in meters. Only buildings within this distance from a POI will be considered. For better performance, use the smallest radius that meets your requirements.

source_filter : {'google', 'microsoft'}, optional
    Filter buildings by data source. If None, uses buildings from all sources.

find_nearest_globally : bool, default=False
    If True, finds the true nearest building regardless of distance. This overrides search_radius and may be significantly slower. When False, uses the efficient radius-limited search.

**kwargs : dict
    Additional arguments passed to the building data handler.

Returns

pd.DataFrame
    DataFrame with columns:
      • poi_id: Original POI identifier
      • nearest_building_distance_m: Distance to nearest building in meters. NaN if no building found within the search constraints.
      • building_within_{search_radius}m: Boolean indicating if a building was found within the specified search_radius.

Notes
  • Distances are computed in meters using haversine (great-circle) distance via calculate_distance.
  • Nearest-neighbor candidate selection is performed using coordinates extracted from geometries.
  • For countries with a single building file (no partitioning), the search is performed globally regardless of search_radius.
  • For partitioned countries, search_radius optimizes performance by filtering which tiles to process.
Examples
Find buildings within 350m of POIs

result = poi_gdf.find_nearest_buildings("USA", search_radius=350)

Find nearest building globally (may be slow for partitioned countries)

result = poi_gdf.find_nearest_buildings("USA", find_nearest_globally=True)

Source code in gigaspatial/generators/poi.py
def find_nearest_buildings(
    self,
    country: str,
    search_radius: float = 1000,
    source_filter: Optional[Literal["google", "microsoft"]] = None,
    find_nearest_globally: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Find the nearest building to each POI within a specified search radius.

    This method processes building data by:
    1. Filtering to only building tiles that intersect POI buffers (partitioned datasets)
    2. Finding the nearest building candidate per POI (nearest-neighbor search)
    3. Computing final POI-to-building distances in **meters** using haversine distance

    Parameters
    ----------
    country : str
        Country code for which to load building data.

    search_radius : float, default=1000
        Search radius in meters. Only buildings within this distance from a POI
        will be considered. For better performance, use the smallest radius
        that meets your requirements.

    source_filter : {'google', 'microsoft'}, optional
        Filter buildings by data source. If None, uses buildings from all sources.

    find_nearest_globally : bool, default=False
        If True, finds the true nearest building regardless of distance.
        This overrides search_radius and may be significantly slower.
        When False, uses the efficient radius-limited search.

    **kwargs : dict
        Additional arguments passed to the building data handler.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns:
        - poi_id: Original POI identifier
        - nearest_building_distance_m: Distance to nearest building in meters.
        NaN if no building found within the search constraints.
        - building_within_{search_radius}m: Boolean indicating if a building
        was found within the specified search_radius.

    Notes
    -----
    - Distances are computed in meters using haversine (great-circle) distance via
    `calculate_distance`.
    - Nearest-neighbor candidate selection is performed using coordinates extracted
    from geometries.
    - For countries with a single building file (no partitioning),
    the search is performed globally regardless of search_radius.
    - For partitioned countries, search_radius optimizes performance by
    filtering which tiles to process.

    Examples
    --------
    >>> # Find buildings within 350m of POIs
    >>> result = poi_gdf.find_nearest_buildings("USA", search_radius=350)

    >>> # Find nearest building globally (may be slow for partitioned countries)
    >>> result = poi_gdf.find_nearest_buildings("USA", find_nearest_globally=True)
    """

    # ---------------------------------------------------------
    # 1. PARAMETER VALIDATION AND LOGGING
    # ---------------------------------------------------------

    if find_nearest_globally:
        self.logger.info(
            "Global nearest building search enabled. "
            "This may be slower than radius-limited search."
        )
        return self._find_nearest_building_globally(
            country=country, source_filter=source_filter, **kwargs
        )

    # Warn for large search radii
    if search_radius > 5000:
        self.logger.warning(
            f"Large search radius ({search_radius}m) may impact performance. "
            "Consider using progressive expansion for better efficiency."
        )

    self.logger.info(
        f"Mapping Google-Microsoft Buildings data to POIs within {search_radius}m"
    )

    # ---------------------------------------------------------
    # 2. LOAD BUILDING FILES
    # ---------------------------------------------------------

    from gigaspatial.handlers.google_ms_combined_buildings import (
        GoogleMSBuildingsHandler,
    )

    handler = GoogleMSBuildingsHandler(partition_strategy="s2_grid")

    if self.config.ensure_available:
        if not handler.ensure_data_available(country, **kwargs):
            raise RuntimeError("Could not ensure data availability for loading")

    building_files = handler.reader.resolve_source_paths(country, **kwargs)

    result = GoogleMSBuildingsEngine.nearest_buildings_to_pois(
        handler=handler,
        building_files=building_files,
        pois_gdf=self.points_gdf,
        source_filter=source_filter,
        search_radius_m=search_radius,
        logger=self.logger,
    )

    distances_clean = result.distances_m.replace(np.inf, np.nan)
    result_df = pd.DataFrame(
        {
            "poi_id": result.distances_m.index,
            "nearest_building_distance_m": distances_clean,
            f"building_within_{search_radius}m": result.distances_m
            <= search_radius,
        }
    )

    # Update the view and return
    self._update_view(result_df)
    return self.view
map_built_s(map_radius_meters=150, stat='sum', dataset_year=2020, dataset_resolution=100, output_column='built_surface_m2', **kwargs)

Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

Calculates the sum of built surface area within a specified buffer radius around each POI. Enriches points_gdf with the 'built_surface_m2' column.

Parameters:

Name Type Description Default
map_radius_meters float

The buffer distance in meters around each POI to calculate zonal statistics for built surface. Defaults to 150 meters.

150
stat str

The statistic to calculate within each buffer (e.g., "sum", "mean"). Defaults to "sum".

'sum'
dataset_year int

The GHSL dataset year to use. Defaults to 2020.

2020
dataset_resolution int

The GHSL dataset resolution in meters. Defaults to 100.

100
output_column str

Name of the output column. Defaults to 'built_surface_m2'.

'built_surface_m2'
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated DataFrame with a new column: 'built_surface_m2'. Returns the current view if no GHSL Built Surface data is found.

Source code in gigaspatial/generators/poi.py
def map_built_s(
    self,
    map_radius_meters: float = 150,
    stat: str = "sum",
    dataset_year=2020,
    dataset_resolution=100,
    output_column="built_surface_m2",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps GHSL Built Surface (GHS_BUILT_S) data to the POIs.

    Calculates the sum of built surface area within a specified buffer
    radius around each POI. Enriches `points_gdf` with the 'built_surface_m2' column.

    Args:
        map_radius_meters (float):
            The buffer distance in meters around each POI to calculate
            zonal statistics for built surface. Defaults to 150 meters.
        stat (str):
            The statistic to calculate within each buffer. Defaults to "sum".
        dataset_year (int):
            The GHSL dataset year to use. Defaults to 2020.
        dataset_resolution (int):
            The GHSL dataset resolution in meters. Defaults to 100.
        output_column (str):
            Name of the output column. Defaults to "built_surface_m2".
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated DataFrame with a new column:
                      'built_surface_m2'. Returns the current view if no
                      GHSL Built Surface data is found.
    """
    self.logger.info("Mapping GHSL Built Surface data to POIs")
    handler = GHSLDataHandler(
        product="GHS_BUILT_S",
        year=dataset_year,
        resolution=dataset_resolution,
        data_store=self.data_store,
        **kwargs,
    )
    self.logger.info("Loading GHSL Built Surface raster tiles")
    tif_processors = handler.load_data(
        self.points_gdf.copy(),
        ensure_available=self.config.ensure_available,
        merge_rasters=True,
        **kwargs,
    )

    mapped_data = self.map_zonal_stats(
        data=tif_processors,
        stat=stat,
        map_radius_meters=map_radius_meters,
        output_column=output_column,
        **kwargs,
    )
    self._update_view(mapped_data)
    return self.view
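
For example, to attach the total built-up surface within a 300 m buffer of each POI (a usage sketch, assuming a generator initialized as above):

    view = generator.map_built_s(map_radius_meters=300, stat="sum")
    print(view[["poi_id", "built_surface_m2"]].head())
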
map_google_buildings(handler=None, **kwargs)

Maps Google Open Buildings data to the POIs by finding the nearest building.

Enriches the points_gdf with the ID and distance to the nearest Google Open Building for each POI.

Parameters:

Name Type Description Default
handler Optional[GoogleOpenBuildingsHandler]

Handler for accessing Google Open Buildings data. If None, a default GoogleOpenBuildingsHandler will be used.

None
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated DataFrame with new columns: 'nearest_google_building_id' and 'nearest_google_building_distance'. Returns the current view if no buildings are found.

Source code in gigaspatial/generators/poi.py
def map_google_buildings(
    self,
    handler: Optional[GoogleOpenBuildingsHandler] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Maps Google Open Buildings data to the POIs by finding the nearest building.

    Enriches the `points_gdf` with the ID and distance to the nearest
    Google Open Building for each POI.

    Args:
        handler (Optional[GoogleOpenBuildingsHandler]):
            Handler for accessing Google Open Buildings data. If None, a
            default `GoogleOpenBuildingsHandler` will be used.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated DataFrame with new columns:
                      'nearest_google_building_id' and 'nearest_google_building_distance'.
                      Returns the current view if no buildings are found.
    """
    self.logger.info("Mapping Google Open Buildings data to POIs")
    handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)

    self.logger.info("Loading Google Buildings point data")
    buildings_df = handler.load_points(
        self.points_gdf, ensure_available=self.config.ensure_available, **kwargs
    )
    if buildings_df is None or len(buildings_df) == 0:
        self.logger.info("No Google buildings data found for the provided POIs")
        return self.view

    mapped_data = self.map_nearest_points(
        points_df=buildings_df,
        id_column="full_plus_code",
        output_prefix="nearest_google_building",
        **kwargs,
    )
    self._update_view(mapped_data)
    return self.view
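
A typical call needs no arguments beyond an optional pre-configured handler (usage sketch):

    view = generator.map_google_buildings()
    print(view[["poi_id", "nearest_google_building_distance"]].head())
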
map_ms_buildings(handler=None, **kwargs)

Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

Enriches the points_gdf with the ID and distance to the nearest Microsoft Global Building for each POI. If buildings don't have an ID column, creates a unique ID using the building's coordinates.

Parameters:

Name Type Description Default
handler Optional[MSBuildingsHandler]

Handler for accessing Microsoft Global Buildings data. If None, a default MSBuildingsHandler will be used.

None
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated DataFrame with new columns: 'nearest_ms_building_id' and 'nearest_ms_building_distance'. Returns a copy of the current points_gdf if no buildings are found.

Source code in gigaspatial/generators/poi.py
def map_ms_buildings(
    self,
    handler: Optional[MSBuildingsHandler] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Maps Microsoft Global Buildings data to the POIs by finding the nearest building.

    Enriches the `points_gdf` with the ID and distance to the nearest
    Microsoft Global Building for each POI. If buildings don't have an ID column,
    creates a unique ID using the building's coordinates.

    Args:
        handler (Optional[MSBuildingsHandler]):
            Handler for accessing Microsoft Global Buildings data. If None, a
            default `MSBuildingsHandler` will be used.
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated DataFrame with new columns:
                      'nearest_ms_building_id' and 'nearest_ms_building_distance'.
                      Returns a copy of the current `points_gdf` if no buildings are found.
    """
    self.logger.info("Mapping Microsoft Global Buildings data to POIs")
    handler = handler or MSBuildingsHandler(data_store=self.data_store)
    self.logger.info("Loading Microsoft Buildings polygon data")
    buildings_gdf = handler.load_data(
        self.points_gdf, ensure_available=self.config.ensure_available
    )
    if buildings_gdf is None or len(buildings_gdf) == 0:
        self.logger.info("No Microsoft buildings data found for the provided POIs")
        return self.points_gdf.copy()

    building_centroids = get_centroids(buildings_gdf)

    if "building_id" not in buildings_gdf:
        self.logger.info("Creating building IDs from coordinates")
        building_centroids["building_id"] = building_centroids.apply(
            lambda row: f"{row.geometry.y:.6f}_{row.geometry.x:.6f}",
            axis=1,
        )

    mapped_data = self.map_nearest_points(
        points_df=building_centroids,
        id_column="building_id",
        output_prefix="nearest_ms_building",
        **kwargs,
    )
    self._update_view(mapped_data)
    return self.view
map_nearest_points(points_df, id_column=None, lat_column=None, lon_column=None, output_prefix='nearest', **kwargs)

Maps nearest points from a given DataFrame to the POIs.

Enriches the points_gdf with the ID and distance to the nearest point from the input DataFrame for each POI.

Parameters:

Name Type Description Default
points_df Union[DataFrame, GeoDataFrame]

DataFrame containing points to find nearest neighbors from. Must have latitude and longitude columns or point geometries.

required
id_column str

Name of the column containing unique identifiers for each point. If None, the index of points_df will be used instead.

None
lat_column str

Name of the latitude column in points_df. If None, will attempt to detect it or extract from geometry if points_df is a GeoDataFrame.

None
lon_column str

Name of the longitude column in points_df. If None, will attempt to detect it or extract from geometry if points_df is a GeoDataFrame.

None
output_prefix str

Prefix for the output column names. Defaults to "nearest".

'nearest'
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame keyed by 'poi_id' with new columns: '{output_prefix}_id' and '{output_prefix}_distance'. Returns the current view if no points are found.

Raises:

Type Description
ValueError

If required columns are missing from points_df or if coordinate columns cannot be detected or extracted from geometry.

Source code in gigaspatial/generators/poi.py
def map_nearest_points(
    self,
    points_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    id_column: Optional[str] = None,
    lat_column: Optional[str] = None,
    lon_column: Optional[str] = None,
    output_prefix: str = "nearest",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps nearest points from a given DataFrame to the POIs.

    Enriches the `points_gdf` with the ID and distance to the nearest point
    from the input DataFrame for each POI.

    Args:
        points_df (Union[pd.DataFrame, gpd.GeoDataFrame]):
            DataFrame containing points to find nearest neighbors from.
            Must have latitude and longitude columns or point geometries.
        id_column (str, optional):
            Name of the column containing unique identifiers for each point.
            If None, the index of points_df will be used instead.
        lat_column (str, optional):
            Name of the latitude column in points_df. If None, will attempt to detect it
            or extract from geometry if points_df is a GeoDataFrame.
        lon_column (str, optional):
            Name of the longitude column in points_df. If None, will attempt to detect it
            or extract from geometry if points_df is a GeoDataFrame.
        output_prefix (str, optional):
            Prefix for the output column names. Defaults to "nearest".
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: A DataFrame keyed by 'poi_id' with new columns:
                      '{output_prefix}_id' and '{output_prefix}_distance'.
                      Returns the current view if no points are found.

    Raises:
        ValueError: If required columns are missing from points_df or if coordinate
                   columns cannot be detected or extracted from geometry.
    """
    self.logger.info(
        f"Mapping nearest points from {points_df.__class__.__name__} to POIs"
    )

    # Validate input DataFrame
    if points_df.empty:
        self.logger.info("No points found in the input DataFrame")
        return self.view

    # Handle GeoDataFrame
    if isinstance(points_df, gpd.GeoDataFrame):
        points_df = points_df.copy()
        if points_df.geometry.name == "geometry":
            points_df["latitude"] = points_df.geometry.y
            points_df["longitude"] = points_df.geometry.x
            lat_column = "latitude"
            lon_column = "longitude"
            self.logger.info("Extracted coordinates from geometry")

    # Detect coordinate columns if not provided
    if lat_column is None or lon_column is None:
        try:
            detected_lat, detected_lon = detect_coordinate_columns(points_df)
            lat_column = lat_column or detected_lat
            lon_column = lon_column or detected_lon
            self.logger.info(
                f"Detected coordinate columns: {lat_column}, {lon_column}"
            )
        except ValueError as e:
            raise ValueError(f"Could not detect coordinate columns: {str(e)}")

    # Validate required columns
    required_columns = [lat_column, lon_column]
    if id_column is not None:
        required_columns.append(id_column)
    missing_columns = [
        col for col in required_columns if col not in points_df.columns
    ]
    if missing_columns:
        raise ValueError(
            f"Missing required columns in points_df: {missing_columns}"
        )

    from gigaspatial.processing.geo import calculate_distance

    self.logger.info("Calculating nearest points for each POI")
    tree = cKDTree(points_df[[lat_column, lon_column]])
    points_df_poi = self.points_gdf.copy()
    _, idx = tree.query(points_df_poi[["latitude", "longitude"]], k=1)
    df_nearest = points_df.iloc[idx]
    dist = calculate_distance(
        lat1=points_df_poi.latitude,
        lon1=points_df_poi.longitude,
        lat2=df_nearest[lat_column],
        lon2=df_nearest[lon_column],
    )
    # Create a temporary DataFrame to hold the results for merging
    # Use id_column if provided, otherwise use index
    if id_column is not None:
        nearest_ids = points_df.iloc[idx][id_column].values
    else:
        nearest_ids = points_df.index[idx].values

    temp_result_df = pd.DataFrame(
        {
            "poi_id": points_df_poi["poi_id"],
            f"{output_prefix}_id": nearest_ids,
            f"{output_prefix}_distance": dist,
        }
    )

    self.logger.info(
        f"Nearest points mapping complete with prefix '{output_prefix}'"
    )
    return temp_result_df  # Return the DataFrame
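
Note that this method returns the result DataFrame without updating the internal view. As an illustration, POIs can be enriched with the nearest facility from any point dataset (the health_sites frame and its columns are hypothetical):

    health_sites = pd.DataFrame(
        {
            "site_id": ["h1", "h2"],
            "latitude": [0.34, 0.30],
            "longitude": [32.59, 32.61],
        }
    )
    nearest = generator.map_nearest_points(
        points_df=health_sites,
        id_column="site_id",
        output_prefix="nearest_health_site",
    )
    # Result columns: poi_id, nearest_health_site_id, nearest_health_site_distance
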
map_smod(stat='median', dataset_year=2020, dataset_resolution=1000, output_column='smod_class', **kwargs)

Maps GHSL Settlement Model (SMOD) data to the POIs.

Samples the SMOD class value at each POI's location. Enriches points_gdf with the 'smod_class' column.

Parameters:

Name Type Description Default
stat str

The statistic used when sampling SMOD values. Defaults to "median".

'median'
dataset_year int

The GHSL dataset year to use. Defaults to 2020.

2020
dataset_resolution int

The GHSL dataset resolution in meters. Defaults to 1000.

1000
output_column str

Name of the output column. Defaults to 'smod_class'.

'smod_class'
**kwargs

Additional keyword arguments passed to the data reader (if applicable).

{}

Returns:

Type Description
DataFrame

pd.DataFrame: The updated DataFrame with a new column: 'smod_class'. Returns the current view if no GHSL SMOD data is found.

Source code in gigaspatial/generators/poi.py
def map_smod(
    self,
    stat="median",
    dataset_year=2020,
    dataset_resolution=1000,
    output_column="smod_class",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps GHSL Settlement Model (SMOD) data to the POIs.

    Samples the SMOD class value at each POI's location. Enriches `points_gdf`
    with the 'smod_class' column.

    Args:
        stat (str):
            The statistic used when sampling SMOD values. Defaults to "median".
        dataset_year (int):
            The GHSL dataset year to use. Defaults to 2020.
        dataset_resolution (int):
            The GHSL dataset resolution in meters. Defaults to 1000.
        output_column (str):
            Name of the output column. Defaults to "smod_class".
        **kwargs:
            Additional keyword arguments passed to the data reader (if applicable).

    Returns:
        pd.DataFrame: The updated DataFrame with a new column:
                      'smod_class'. Returns the current view if no
                      GHSL SMOD data is found.
    """
    self.logger.info("Mapping GHSL Settlement Model (SMOD) data to POIs")
    handler = GHSLDataHandler(
        product="GHS_SMOD",
        year=dataset_year,
        resolution=dataset_resolution,
        data_store=self.data_store,
        coord_system=54009,
        **kwargs,
    )

    self.logger.info("Loading GHSL SMOD raster tiles")
    tif_processors = handler.load_data(
        self.points_gdf.copy(),
        ensure_available=self.config.ensure_available,
        merge_rasters=True,
        **kwargs,
    )

    mapped_data = self.map_zonal_stats(
        data=tif_processors,
        stat=stat,
        output_column=output_column,
        **kwargs,
    )
    self._update_view(mapped_data)
    return self.view
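
Usage sketch (the filtering threshold assumes the GHS-SMOD degree-of-urbanisation coding, in which class codes of 21 and above denote urban-type settlement cells):

    view = generator.map_smod()
    urban_pois = view[view["smod_class"] >= 21]
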
map_zonal_stats(data, stat='mean', map_radius_meters=None, output_column='zonal_stat', value_column=None, predicate='intersects', **kwargs)

Maps zonal statistics from raster or polygon data to POIs.

Can operate in three modes:

  1. Raster point sampling: Directly samples raster values at POI locations
  2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them
  3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

Parameters:

Name Type Description Default
data Union[TifProcessor, List[TifProcessor], GeoDataFrame]

Either a TifProcessor object, a list of TifProcessor objects (which will be merged into a single TifProcessor for processing), or a GeoDataFrame containing polygon data to aggregate.

required
stat str

For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max"). For polygon data: Aggregation method to use. Defaults to "mean".

'mean'
map_radius_meters float

If provided, creates circular buffers of this radius around each POI and calculates statistics within the buffers. If None, samples directly at POI locations (only for raster data).

None
output_column str

Name of the output column to store the results. Defaults to "zonal_stat".

'zonal_stat'
value_column str

For polygon data: Name of the column to aggregate. Required for polygon data. Not used for raster data.

None
predicate Literal['intersects', 'within', 'fractional']

The spatial relationship to use for aggregation. Defaults to "intersects".

'intersects'
**kwargs

Additional keyword arguments passed to the sampling/aggregation functions.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame keyed by 'poi_id' with a new column containing the calculated statistics. Returns the current view if no raster data is provided, or an empty result if the input GeoDataFrame is empty.

Raises:

Type Description
ValueError

If no valid data is provided, if parameters are incompatible, or if required parameters (value_column) are missing for polygon data.

Source code in gigaspatial/generators/poi.py
def map_zonal_stats(
    self,
    data: Union[TifProcessor, List[TifProcessor], gpd.GeoDataFrame],
    stat: str = "mean",
    map_radius_meters: Optional[float] = None,
    output_column: str = "zonal_stat",
    value_column: Optional[str] = None,
    predicate: Literal["intersects", "within", "fractional"] = "intersects",
    **kwargs,
) -> pd.DataFrame:
    """
    Maps zonal statistics from raster or polygon data to POIs.

    Can operate in three modes:
    1. Raster point sampling: Directly samples raster values at POI locations
    2. Raster zonal statistics: Creates buffers around POIs and calculates statistics within them
    3. Polygon aggregation: Aggregates polygon data to POI buffers with optional area weighting

    Args:
        data (Union[TifProcessor, List[TifProcessor], gpd.GeoDataFrame]):
            Either a TifProcessor object, a list of TifProcessor objects (which will be merged
            into a single TifProcessor for processing), or a GeoDataFrame containing polygon
            data to aggregate.
        stat (str, optional):
            For raster data: Statistic to calculate ("sum", "mean", "median", "min", "max").
            For polygon data: Aggregation method to use.
            Defaults to "mean".
        map_radius_meters (float, optional):
            If provided, creates circular buffers of this radius around each POI
            and calculates statistics within the buffers. If None, samples directly
            at POI locations (only for raster data).
        output_column (str, optional):
            Name of the output column to store the results. Defaults to "zonal_stat".
        value_column (str, optional):
            For polygon data: Name of the column to aggregate. Required for polygon data.
            Not used for raster data.
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation. Defaults to "intersects".
        **kwargs:
            Additional keyword arguments passed to the sampling/aggregation functions.

    Returns:
        pd.DataFrame: A DataFrame keyed by 'poi_id' with a new column containing
                      the calculated statistics. Returns the current view if no
                      raster data is provided, or an empty result if the input
                      GeoDataFrame is empty.

    Raises:
        ValueError: If no valid data is provided, if parameters are incompatible,
                  or if required parameters (value_column) are missing for polygon data.
    """

    raster_processor: Optional[TifProcessor] = None

    if isinstance(data, TifProcessor):
        raster_processor = data
    elif isinstance(data, list) and all(isinstance(x, TifProcessor) for x in data):
        if not data:
            self.logger.info("No valid raster data provided")
            return self.view

        if len(data) > 1:
            all_source_paths = [tp.dataset_path for tp in data]

            self.logger.info(
                f"Merging {len(all_source_paths)} rasters into a single TifProcessor for zonal statistics."
            )
            raster_processor = TifProcessor(
                dataset_path=all_source_paths,
                data_store=self.data_store,
                **kwargs,
            )
        else:
            raster_processor = data[0]

    if raster_processor:
        results_df = pd.DataFrame({"poi_id": self.points_gdf["poi_id"]})
        raster_crs = raster_processor.crs

        if map_radius_meters is not None:
            self.logger.info(
                f"Calculating {stat} within {map_radius_meters}m buffers around POIs"
            )
            # Create buffers around POIs
            buffers_gdf = buffer_geodataframe(
                self.points_gdf,
                buffer_distance_meters=map_radius_meters,
                cap_style="round",
            )

            # Calculate zonal statistics
            sampled_values = raster_processor.sample_by_polygons(
                polygon_list=buffers_gdf.to_crs(raster_crs).geometry,
                stat=stat,
            )
        else:
            self.logger.info(f"Sampling {stat} at POI locations")
            # Sample directly at POI locations
            coord_list = (
                self.points_gdf.to_crs(raster_crs).get_coordinates().to_numpy()
            )
            sampled_values = raster_processor.sample_by_coordinates(
                coordinate_list=coord_list, **kwargs
            )

        results_df[output_column] = sampled_values

    elif isinstance(data, gpd.GeoDataFrame):
        # Handle polygon data
        if data.empty:
            self.logger.info("No valid GeoDataFrame data provided")
            return pd.DataFrame(
                columns=["poi_id", output_column]
            )  # Return empty DataFrame

        if map_radius_meters is None:
            raise ValueError(
                "map_radius_meters must be provided for for GeoDataFrame data"
            )

        # Create buffers around POIs
        buffer_gdf = buffer_geodataframe(
            self.points_gdf,
            buffer_distance_meters=map_radius_meters,
            cap_style="round",
        )

        if any(data.geom_type.isin(["MultiPoint", "Point"])):

            self.logger.info(
                f"Aggregating point data within {map_radius_meters}m buffers around POIs using predicate '{predicate}'"
            )

            # If no value_column, default to 'count'
            if value_column is None:
                actual_stat = "count"
                self.logger.warning(
                    "No value_column provided for point data. Defaulting to 'count' aggregation."
                )
            else:
                actual_stat = stat
                if value_column not in data.columns:
                    raise ValueError(
                        f"Value column '{value_column}' not found in input GeoDataFrame."
                    )

            aggregation_result_gdf = aggregate_points_to_zones(
                points=data,
                zones=buffer_gdf,
                value_columns=value_column,
                aggregation=actual_stat,
                point_zone_predicate=predicate,  # "fractional" is not supported for point data
                zone_id_column="poi_id",
                output_suffix="",
                drop_geometry=True,
                **kwargs,
            )

            output_col_from_agg = (
                f"{value_column}_{actual_stat}" if value_column else "point_count"
            )

        else:
            if value_column is None:
                raise ValueError(
                    "value_column must be provided for polygon data aggregation."
                )
            if value_column not in data.columns:
                raise ValueError(
                    f"Value column '{value_column}' not found in input GeoDataFrame."
                )
            self.logger.info(
                f"Aggregating polygon data within {map_radius_meters}m buffers around POIs using predicate '{predicate}'"
            )

            # Aggregate polygons to buffers
            aggregation_result_gdf = aggregate_polygons_to_zones(
                polygons=data,
                zones=buffer_gdf,
                value_columns=value_column,
                aggregation=stat,
                predicate=predicate,
                zone_id_column="poi_id",
                output_suffix="",
                drop_geometry=True,
                **kwargs,
            )

            output_col_from_agg = value_column

        results_df = aggregation_result_gdf[["poi_id", output_col_from_agg]]

        if output_column != "zonal_stat":
            results_df = results_df.rename(
                columns={output_col_from_agg: output_column}
            )

    else:
        raise ValueError(
            "data must be either a list of TifProcessor objects or a GeoDataFrame"
        )

    # self._update_view(results_df) # Removed direct view update
    self.logger.info(
        f"Zonal statistics mapping complete for column(s) derived from '{output_column}' or '{value_column}'"
    )
    return results_df  # Return the DataFrame
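
The three modes translate into calls like the following (a sketch; tif_processors and landuse_gdf are assumed to exist, and the method returns a DataFrame keyed by poi_id rather than updating the view):

    # 1. Raster point sampling: value of the raster cell under each POI
    stats = generator.map_zonal_stats(data=tif_processors, output_column="elevation")

    # 2. Raster zonal statistics: mean within 500 m buffers
    stats = generator.map_zonal_stats(
        data=tif_processors, stat="mean", map_radius_meters=500,
        output_column="elevation_mean",
    )

    # 3. Polygon aggregation: sum a polygon attribute within 500 m buffers
    stats = generator.map_zonal_stats(
        data=landuse_gdf, value_column="area_ha", stat="sum",
        map_radius_meters=500, output_column="landuse_area_ha",
    )
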
save_view(name, output_format=None)

Saves the current POI view (the enriched DataFrame) to a file.

The output path and format are determined by the config or overridden by the output_format parameter.

Parameters:

Name Type Description Default
name str

The base name for the output file (without extension).

required
output_format Optional[str]

The desired output format (e.g., "csv", "geojson"). If None, the output_format from config will be used.

None

Returns:

Name Type Description
Path Path

The full path to the saved output file.

Source code in gigaspatial/generators/poi.py
def save_view(
    self,
    name: str,
    output_format: Optional[str] = None,
) -> Path:
    """
    Saves the current POI view (the enriched DataFrame) to a file.

    The output path and format are determined by the `config`
    or overridden by the `output_format` parameter.

    Args:
        name (str): The base name for the output file (without extension).
        output_format (Optional[str]):
            The desired output format (e.g., "csv", "geojson"). If None,
            the `output_format` from `config` will be used.

    Returns:
        Path: The full path to the saved output file.
    """
    format_to_use = output_format or self.config.output_format
    output_path = self.config.base_path / f"{name}.{format_to_use}"

    self.logger.info(f"Saving POI view to {output_path}")
    # Save the current view, which is a pandas DataFrame, not a GeoDataFrame
    # GeoJSON/Shapefile formats would require converting back to GeoDataFrame first.
    # For CSV, Parquet, Feather, this is fine.
    if format_to_use in ["geojson", "shp", "gpkg"]:
        self.logger.warning(
            f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
        )
        # Re-add geometry for saving to geospatial formats
        view_to_save_gdf = self.view.merge(
            self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
        )
        write_dataset(
            data=view_to_save_gdf,
            path=str(output_path),
            data_store=self.data_store,
        )
    else:
        write_dataset(
            data=self.view,  # Use the internal _view DataFrame
            path=str(output_path),
            data_store=self.data_store,
        )

    return output_path
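
Usage sketch (the base name is illustrative; the directory comes from config.base_path):

    path = generator.save_view("schools_enriched")  # uses config.output_format
    geo_path = generator.save_view("schools_enriched", output_format="geojson")  # re-adds geometry
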
to_dataframe()

Returns the current POI view as a DataFrame.

This method returns the view with all accumulated enrichment columns.

Returns:

Type Description
DataFrame

pd.DataFrame: The current view.

Source code in gigaspatial/generators/poi.py
def to_dataframe(self) -> pd.DataFrame:
    """
    Returns the current POI view as a DataFrame.

    This method returns the view with all accumulated enrichment columns.

    Returns:
        pd.DataFrame: The current view.
    """
    return self.view
to_geodataframe()

Returns the current POI view merged with the original point geometries as a GeoDataFrame.

This method combines all accumulated variables in the view with the corresponding point geometries, providing a spatially-enabled DataFrame for further analysis or export.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The current view merged with point geometries.

Source code in gigaspatial/generators/poi.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Returns the current POI view merged with the original point geometries as a GeoDataFrame.

    This method combines all accumulated variables in the view with the corresponding
    point geometries, providing a spatially-enabled DataFrame for further analysis or export.

    Returns:
        gpd.GeoDataFrame: The current view merged with point geometries.
    """
    return gpd.GeoDataFrame(
        self.view.merge(
            self.points_gdf[["poi_id", "geometry"]], on="poi_id", how="left"
        ),
        crs="EPSG:4326",
    )
validate_data_coverage(data_bounds)

Validate how many POIs fall within the data coverage area.

Returns:

Name Type Description
dict dict

Coverage statistics

Source code in gigaspatial/generators/poi.py
def validate_data_coverage(self, data_bounds: gpd.GeoDataFrame) -> dict:
    """
    Validate how many POIs fall within the data coverage area.

    Returns:
        dict: Coverage statistics
    """
    poi_within = self.points_gdf.within(data_bounds.union_all())
    coverage_stats = {
        "total_pois": len(self.points_gdf),
        "covered_pois": poi_within.sum(),
        "coverage_percentage": (poi_within.sum() / len(self.points_gdf)) * 100,
        "uncovered_pois": (~poi_within).sum(),
    }
    return coverage_stats
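
Usage sketch (coverage_gdf is a hypothetical GeoDataFrame of data tile footprints):

    stats = generator.validate_data_coverage(coverage_gdf)
    print(f"{stats['coverage_percentage']:.1f}% of POIs fall inside the data bounds")
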

PoiViewGeneratorConfig

Configuration for POI (Point of Interest) view generation.

Attributes:

Name Type Description
base_path Path

The base directory where generated POI views will be saved. Defaults to a path retrieved from config.

output_format str

The default format for saving output files (e.g., "csv", "geojson"). Defaults to "csv".

ensure_available bool

Whether to ensure source data is available (e.g., by downloading it) before processing. Defaults to True.

Source code in gigaspatial/generators/poi.py
@dataclass
class PoiViewGeneratorConfig:
    """
    Configuration for POI (Point of Interest) view generation.

    Attributes:
        base_path (Path): The base directory where generated POI views will be saved.
                          Defaults to a path retrieved from `config`.
        output_format (str): The default format for saving output files (e.g., "csv", "geojson").
                             Defaults to "csv".
        ensure_available (bool): Whether to ensure source data is available (e.g., by
                                 downloading it) before processing. Defaults to True.
    """

    base_path: Path = Field(default=global_config.get_path("poi", "views"))
    output_format: str = "csv"
    ensure_available: bool = True
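
A custom configuration can be passed at construction time, for instance to change the output format or skip availability checks (a sketch; parquet output relies on write_dataset supporting it, as noted in save_view):

    from pathlib import Path

    config = PoiViewGeneratorConfig(
        base_path=Path("./poi_views"),
        output_format="parquet",
        ensure_available=False,
    )
    generator = PoiViewGenerator(points=schools, config=config)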

zonal

admin

AdminBoundariesViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using administrative boundaries as the zones.

This class specializes in creating zonal views where the zones are defined by administrative boundaries (e.g., countries, states, districts) at a specified administrative level. It extends the GeometryBasedZonalViewGenerator and leverages the AdminBoundaries handler to load the necessary geographical data.

The administrative boundaries serve as the base geometries to which other geospatial data (points, polygons, rasters) can be mapped and aggregated.

Attributes:

Name Type Description
country str

The name or code of the country for which to load administrative boundaries.

admin_level int

The administrative level to load (e.g., 0 for country, 1 for states/provinces).

admin_path Union[str, Path]

Optional path to a local GeoJSON/Shapefile containing the administrative boundaries. If provided, this local file will be used instead of downloading.

config Optional[ZonalViewGeneratorConfig]

Configuration for the zonal view generation process.

data_store Optional[DataStore]

A DataStore instance for accessing data.

logger Optional[Logger]

A logger instance for logging messages.

Source code in gigaspatial/generators/zonal/admin.py
class AdminBoundariesViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Generates zonal views using administrative boundaries as the zones.

    This class specializes in creating zonal views where the zones are defined by
    administrative boundaries (e.g., countries, states, districts) at a specified
    administrative level. It extends the `GeometryBasedZonalViewGenerator` and
    leverages the `AdminBoundaries` handler to load the necessary geographical data.

    The administrative boundaries serve as the base geometries to which other
    geospatial data (points, polygons, rasters) can be mapped and aggregated.

    Attributes:
        country (str): The name or code of the country for which to load administrative boundaries.
        admin_level (int): The administrative level to load (e.g., 0 for country, 1 for states/provinces).
        admin_path (Union[str, Path], optional): Optional path to a local GeoJSON/Shapefile
            containing the administrative boundaries. If provided, this local file will be
            used instead of downloading.
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generation process.
        data_store (Optional[DataStore]): A DataStore instance for accessing data.
        logger (Optional[logging.Logger]): A logger instance for logging messages.
    """

    def __init__(
        self,
        country: str,
        admin_level: int,
        data_store: Optional[DataStore] = None,
        admin_path: Optional[Union[str, Path]] = None,
        config: Optional[ZonalViewGeneratorConfig] = None,
        logger: logging.Logger = None,
    ):
        """
        Initializes the AdminBoundariesViewGenerator.

        Args:
            country (str): The name or code of the country (e.g., "USA", "Germany").
            admin_level (int): The administrative level to load (e.g., 0 for country, 1 for states, 2 for districts).
            admin_path (Union[str, Path], optional): Path to a local administrative boundaries file (GeoJSON, Shapefile).
                                                     If provided, overrides default data loading.
            config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generator.
                                                         If None, a default config will be used.
            data_store (Optional[DataStore]): Data storage interface. If None, LocalDataStore is used.
            logger (Optional[logging.Logger]): Custom logger instance. If None, a default logger is used.
        """

        super().__init__(
            zone_data=self._init_zone_data(
                country, admin_level, data_store, admin_path
            ),
            zone_id_column="id",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self._country = country
        self.logger.info(
            f"Initialized AdminBoundariesViewGenerator for {country} (level {admin_level})"
        )

    def _init_zone_data(
        self,
        country,
        admin_level,
        data_store: Optional[DataStore] = None,
        admin_path: Optional[Union[str, Path]] = None,
    ):
        gdf_boundaries = AdminBoundaries.create(
            country, admin_level, data_store, admin_path
        ).to_geodataframe()
        return gdf_boundaries

    def map_wp_pop(
        self,
        country=None,
        resolution=1000,
        predicate="intersects",
        output_column="population",
        **kwargs,
    ):
        country = self._country if country is None else country
        return super().map_wp_pop(
            country, resolution, predicate, output_column, **kwargs
        )
__init__(country, admin_level, data_store=None, admin_path=None, config=None, logger=None)

Initializes the AdminBoundariesViewGenerator.

Parameters:

Name Type Description Default
country str

The name or code of the country (e.g., "USA", "Germany").

required
admin_level int

The administrative level to load (e.g., 0 for country, 1 for states, 2 for districts).

required
admin_path Union[str, Path]

Path to a local administrative boundaries file (GeoJSON, Shapefile). If provided, overrides default data loading.

None
config Optional[ZonalViewGeneratorConfig]

Configuration for the zonal view generator. If None, a default config will be used.

None
data_store Optional[DataStore]

Data storage interface. If None, LocalDataStore is used.

None
logger Optional[Logger]

Custom logger instance. If None, a default logger is used.

None
Source code in gigaspatial/generators/zonal/admin.py
def __init__(
    self,
    country: str,
    admin_level: int,
    data_store: Optional[DataStore] = None,
    admin_path: Optional[Union[str, Path]] = None,
    config: Optional[ZonalViewGeneratorConfig] = None,
    logger: logging.Logger = None,
):
    """
    Initializes the AdminBoundariesViewGenerator.

    Args:
        country (str): The name or code of the country (e.g., "USA", "Germany").
        admin_level (int): The administrative level to load (e.g., 0 for country, 1 for states, 2 for districts).
        admin_path (Union[str, Path], optional): Path to a local administrative boundaries file (GeoJSON, Shapefile).
                                                 If provided, overrides default data loading.
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generator.
                                                     If None, a default config will be used.
        data_store (Optional[DataStore]): Data storage interface. If None, LocalDataStore is used.
        logger (Optional[logging.Logger]): Custom logger instance. If None, a default logger is used.
    """

    super().__init__(
        zone_data=self._init_zone_data(
            country, admin_level, data_store, admin_path
        ),
        zone_id_column="id",
        config=config,
        data_store=data_store,
        logger=logger,
    )
    self._country = country
    self.logger.info(
        f"Initialized AdminBoundariesViewGenerator for {country} (level {admin_level})"
    )
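
Construction sketch (the country code and admin level are illustrative; map_wp_pop is the population-mapping helper defined above):

    generator = AdminBoundariesViewGenerator(country="UGA", admin_level=2)
    view = generator.map_wp_pop(resolution=1000, output_column="population")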

base

ZonalViewGenerator

Bases: ABC, Generic[T]

Base class for mapping data to zonal datasets.

This class provides the framework for mapping various data sources (points, polygons, rasters) to zonal geometries like grid tiles or catchment areas. It serves as an abstract base class that must be subclassed to implement specific zonal systems.

The class supports three main types of data mapping:

  • Point data aggregation to zones
  • Polygon data aggregation with optional area weighting
  • Raster data sampling and statistics

Attributes:

Name Type Description
data_store DataStore

The data store for accessing input data.

config ZonalViewGeneratorConfig

Configuration for the generator.

logger

Logger instance for this class.

Source code in gigaspatial/generators/zonal/base.py
class ZonalViewGenerator(ABC, Generic[T]):
    """Base class for mapping data to zonal datasets.

    This class provides the framework for mapping various data sources (points, polygons, rasters)
    to zonal geometries like grid tiles or catchment areas. It serves as an abstract base class
    that must be subclassed to implement specific zonal systems.

    The class supports three main types of data mapping:
    - Point data aggregation to zones
    - Polygon data aggregation with optional area weighting
    - Raster data sampling and statistics

    Attributes:
        data_store (DataStore): The data store for accessing input data.
        config (ZonalViewGeneratorConfig): Configuration for the generator.
        logger: Logger instance for this class.
    """

    def __init__(
        self,
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):
        """Initialize the ZonalViewGenerator.

        Args:
            config (ZonalViewGeneratorConfig, optional): Configuration for the generator.
                If None, uses default configuration.
            data_store (DataStore, optional): The data store for accessing input data.
                If None, uses LocalDataStore.
            logger (logging.Logger, optional): Custom logger instance. If None, a
                default logger is used.
        """
        self.config = config or ZonalViewGeneratorConfig()
        self.data_store = data_store or LocalDataStore()
        self.logger = logger or global_config.get_logger(self.__class__.__name__)
        self._view: Optional[pd.DataFrame] = None

    @abstractmethod
    def get_zonal_geometries(self) -> List[Polygon]:
        """Get the geometries of the zones.

        This method must be implemented by subclasses to return the actual geometric
        shapes of the zones (e.g., grid tiles, catchment boundaries, administrative areas).

        Returns:
            List[Polygon]: A list of Shapely Polygon objects representing zone geometries.
        """
        pass

    @abstractmethod
    def get_zone_identifiers(self) -> List[T]:
        """Get unique identifiers for each zone.

        This method must be implemented by subclasses to return identifiers that
        correspond one-to-one with the geometries returned by get_zonal_geometries().

        Returns:
            List[T]: A list of zone identifiers (e.g., quadkeys, H3 indices, tile IDs).
                The type T is determined by the specific zonal system implementation.
        """
        pass

    def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert zones to a GeoDataFrame.

        Creates a GeoDataFrame containing zone identifiers and their corresponding
        geometries in WGS84 (EPSG:4326) coordinate reference system.

        Returns:
            gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns,
                where zone_id contains the identifiers and geometry contains the
                corresponding Polygon objects.
        """
        return gpd.GeoDataFrame(
            {
                "zone_id": self.get_zone_identifiers(),
                "geometry": self.get_zonal_geometries(),
            },
            crs="EPSG:4326",
        )

    @property
    def zone_gdf(self) -> gpd.GeoDataFrame:
        """Cached GeoDataFrame of zones.

        Returns:
            gpd.GeoDataFrame: Lazily-computed and cached GeoDataFrame of zone geometries
                and identifiers.
        """
        if not hasattr(self, "_zone_gdf"):
            self._zone_gdf = self.get_zone_geodataframe()
        return self._zone_gdf

    @property
    def view(self) -> pd.DataFrame:
        """The DataFrame representing the current zonal view.

        Returns:
            pd.DataFrame: The DataFrame containing zone IDs and any added
                variables. If no variables have been added, returns the base
                `zone_gdf` without geometries.
        """
        if self._view is None:
            self._view = self.zone_gdf.drop(columns="geometry")
        return self._view

    def add_variable_to_view(self, data_dict: Dict, column_name: str) -> None:
        """
        Adds a new variable (column) to the zonal view GeoDataFrame.

        This method takes a dictionary (typically the result of map_points or map_polygons)
        and adds its values as a new column to the internal `_view` (or `zone_gdf` if not yet initialized).
        The dictionary keys are expected to be the `zone_id` values.

        Args:
            data_dict (Dict): A dictionary where keys are `zone_id`s and values are
                              the data to be added.
            column_name (str): The name of the new column to be added to the GeoDataFrame.
        Raises:
            ValueError: If the `data_dict` keys do not match the `zone_id`s in the zonal view.
                        If the `column_name` already exists in the zonal view.
        """
        if self._view is None:
            self._view = self.zone_gdf.drop(columns="geometry")

        if column_name in self._view.columns:
            raise ValueError(
                f"Column '{column_name}' already exists in the zonal view."
            )

        # Create a pandas Series from the dictionary, aligning by index (zone_id)
        new_series = pd.Series(data_dict, name=column_name)

        # Before merging, ensure the zone_ids in data_dict match those in _view
        missing_zones_in_data = set(self._view["zone_id"]) - set(new_series.index)
        extra_zones_in_data = set(new_series.index) - set(self._view["zone_id"])

        if missing_zones_in_data:
            self.logger.warning(
                f"Warning: {len(missing_zones_in_data)} zone(s) from the zonal view "
                f"are missing in the provided data_dict for column '{column_name}'. "
                f"These zones will have NaN values for '{column_name}'. Missing: {list(missing_zones_in_data)[:5]}..."
            )
        if extra_zones_in_data:
            self.logger.warning(
                f"Warning: {len(extra_zones_in_data)} zone(s) in the provided data_dict "
                f"are not present in the zonal view for column '{column_name}'. "
                f"These will be ignored. Extra: {list(extra_zones_in_data)[:5]}..."
            )

        # Merge the new series with the _view based on 'zone_id'
        # Using .set_index() for efficient alignment
        original_index_name = self._view.index.name
        self._view = self._view.set_index("zone_id").join(new_series).reset_index()
        if original_index_name:  # Restore original index name if it existed
            self._view.index.name = original_index_name
        else:  # If it was a default integer index, ensure it's not named 'index'
            self._view.index.name = None

        self.logger.info(f"Added variable '{column_name}' to the zonal view.")

    def map_points(
        self,
        points: Union[pd.DataFrame, gpd.GeoDataFrame],
        value_columns: Optional[Union[str, List[str]]] = None,
        aggregation: Union[str, Dict[str, str]] = "count",
        predicate: str = "within",
        output_suffix: str = "",
    ) -> Dict:
        """Map point data to zones with spatial aggregation.

        Aggregates point data to zones using spatial relationships. Points can be
        counted or have their attribute values aggregated using various statistical methods.

        Args:
            points (Union[pd.DataFrame, gpd.GeoDataFrame]): The point data to map.
                Must contain geometry information if DataFrame.
            value_columns (Union[str, List[str]], optional): Column name(s) containing
                values to aggregate. If None, only point counts are performed.
            aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use.
                Can be a single string ("count", "mean", "sum", "min", "max", etc.)
                or a dictionary mapping column names to aggregation methods.
            predicate (str): Spatial predicate for point-to-zone relationship.
                Options include "within", "intersects", "contains". Defaults to "within".
            output_suffix (str): Suffix to add to output column names. Defaults to empty string.

        Returns:
            Dict: Dictionary with zone IDs as keys and aggregated values as values.
                If value_columns is None, returns point counts per zone.
                If value_columns is specified, returns aggregated values per zone.
        """

        self.logger.warning(
            "Using default points mapping implementation. Consider creating a specialized mapping function."
        )
        result = aggregate_points_to_zones(
            points=points,
            zones=self.zone_gdf,
            value_columns=value_columns,
            aggregation=aggregation,
            point_zone_predicate=predicate,
            zone_id_column="zone_id",
            output_suffix=output_suffix,
        )

        if isinstance(value_columns, str):
            return result.set_index("zone_id")[value_columns].to_dict()
        elif isinstance(value_columns, list):
            # With multiple value columns, return a nested dict:
            # {column_name: {zone_id: value}}
            return {
                col: result.set_index("zone_id")[col].to_dict() for col in value_columns
            }
        else:  # If value_columns is None, it should return point_count
            self.logger.warning(
                "No `value_columns` provided. Mapping point counts. Consider passing `value_columns` and `aggregation` or `mapping_function`."
            )
            return result.set_index("zone_id")["point_count"].to_dict()

    def map_polygons(
        self,
        polygons,
        value_columns: Optional[Union[str, List[str]]] = None,
        aggregation: Union[str, Dict[str, str]] = "count",
        predicate: str = "intersects",
        **kwargs,
    ) -> Dict:
        """
        Maps polygon data to the instance's zones and aggregates values.

        This method leverages `aggregate_polygons_to_zones` to perform a spatial
        aggregation of polygon data onto the zones stored within this object instance.
        It can count polygons, or aggregate their values, based on different spatial
        relationships defined by the `predicate`.

        Args:
            polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
                The polygon data to map. Must contain geometry information if a
                DataFrame.
            value_columns (Union[str, List[str]], optional):
                The column name(s) from the `polygons` data to aggregate. If `None`,
                the method will automatically count the number of polygons that
                match the given `predicate` for each zone.
            aggregation (Union[str, Dict[str, str]], optional):
                The aggregation method(s) to use. Can be a single string (e.g., "sum",
                "mean", "max") or a dictionary mapping column names to specific
                aggregation methods. This is ignored and set to "count" if
                `value_columns` is `None`. Defaults to "count".
            predicate (Literal["intersects", "within", "fractional"], optional):
                The spatial relationship to use for aggregation:
                - "intersects": Counts or aggregates values for any polygon that
                  intersects a zone.
                - "within": Counts or aggregates values for polygons that are
                  entirely contained within a zone.
                - "fractional": Performs area-weighted aggregation. The value of a
                  polygon is distributed proportionally to the area of its overlap
                  with each zone.
                Defaults to "intersects".
            **kwargs:
                Additional keyword arguments to be passed to the underlying
                `aggregate_polygons_to_zones` function.

        Returns:
            Dict:
                A dictionary or a nested dictionary containing the aggregated values,
                with zone IDs as keys. If `value_columns` is a single string, the
                return value is a dictionary mapping zone ID to the aggregated value.
                If `value_columns` is a list, the return value is a nested dictionary
                mapping each column name to its own dictionary of aggregated values.

        Raises:
            ValueError: If `value_columns` is of an unexpected type after processing.

        Example:
            >>> # Assuming 'self' is an object with a 'zone_gdf' attribute
            >>> # Count all land parcels that intersect each zone
            >>> parcel_counts = self.map_polygons(landuse_polygons)
            >>>
            >>> # Aggregate total population within zones using area weighting
            >>> population_by_zone = self.map_polygons(
            ...     landuse_polygons,
            ...     value_columns="population",
            ...     predicate="fractional",
            ...     aggregation="sum"
            ... )
            >>>
            >>> # Get the sum of residential area and count of buildings within each zone
            >>> residential_stats = self.map_polygons(
            ...     building_polygons,
            ...     value_columns=["residential_area_sqm", "building_id"],
            ...     aggregation={"residential_area_sqm": "sum", "building_id": "count"},
            ...     predicate="intersects"
            ... )
        """

        if value_columns is None:
            self.logger.warning(
                f"No value_columns specified. Defaulting to counting polygons with {predicate} predicate."
            )
            temp_value_col = "_temp_polygon_count_dummy"
            polygons = polygons.copy()  # avoid mutating the caller's data
            polygons[temp_value_col] = 1
            actual_value_columns = temp_value_col
            aggregation = "count"  # Force count if no value columns
        else:
            actual_value_columns = value_columns

        result = aggregate_polygons_to_zones(
            polygons=polygons,
            zones=self.zone_gdf,
            value_columns=actual_value_columns,
            aggregation=aggregation,
            predicate=predicate,
            zone_id_column="zone_id",
        )

        # Convert the result GeoDataFrame to the expected dictionary format
        if isinstance(actual_value_columns, str):
            return result.set_index("zone_id")[actual_value_columns].to_dict()
        elif isinstance(actual_value_columns, list):
            return {
                col: result.set_index("zone_id")[col].to_dict()
                for col in actual_value_columns
            }
        else:
            raise ValueError("Unexpected type for actual_value_columns.")

    def map_rasters(
        self,
        raster_data: Union[TifProcessor, List[TifProcessor]],
        stat: str = "mean",
        **kwargs,
    ) -> Dict:
        """Map raster data to zones using zonal statistics.

        Samples raster values within each zone and computes statistics. Automatically
        handles coordinate reference system transformations between raster and zone data.

        Args:
            raster_data (Union[TifProcessor, List[TifProcessor]]):
                Either a TifProcessor object or a list of TifProcessor objects (which will be merged
                into a single TifProcessor for processing).
            stat (str): Statistic to calculate when aggregating raster values within
                each zone. Options include "mean", "sum", "min", "max", "std", etc.
                Defaults to "mean".
            **kwargs: Additional keyword arguments passed to TifProcessor when
                multiple rasters are merged.

        Returns:
            Dict: By default, returns a dictionary of sampled values
                with zone IDs as keys.

        Note:
            If the coordinate reference system of the rasters differs from the zones,
            the zone geometries will be automatically transformed to match the raster CRS.
        """
        raster_processor: Optional[TifProcessor] = None

        if isinstance(raster_data, TifProcessor):
            raster_processor = raster_data
        elif isinstance(raster_data, list) and all(
            isinstance(x, TifProcessor) for x in raster_data
        ):
            if not raster_data:
                self.logger.info("No valid raster data provided")
                return {}  # nothing to sample; keep the declared Dict return type

            if len(raster_data) > 1:
                all_source_paths = [tp.dataset_path for tp in raster_data]

                self.logger.info(
                    f"Merging {len(all_source_paths)} rasters into a single TifProcessor for zonal statistics."
                )
                raster_processor = TifProcessor(
                    dataset_path=all_source_paths, data_store=self.data_store, **kwargs
                )
            else:
                raster_processor = raster_data[0]
        else:
            raise ValueError(
                "raster_data must be a TifProcessor object or a list of TifProcessor objects."
            )

        raster_crs = raster_processor.crs

        if raster_crs != self.zone_gdf.crs:
            self.logger.info(f"Projecting zones to raster CRS: {raster_crs}")
            zone_geoms = self._get_transformed_geometries(raster_crs)
        else:
            zone_geoms = self.get_zonal_geometries()

        # Sample raster values
        sampled_values = raster_processor.sample_by_polygons(
            polygon_list=zone_geoms, stat=stat
        )

        zone_ids = self.get_zone_identifiers()

        return {zone_id: value for zone_id, value in zip(zone_ids, sampled_values)}

    @lru_cache(maxsize=32)
    def _get_transformed_geometries(self, target_crs):
        """Get zone geometries transformed to target coordinate reference system.

        This method is cached to avoid repeated coordinate transformations for
        the same target CRS.

        Args:
            target_crs: Target coordinate reference system for transformation.

        Returns:
            List[Polygon]: List of zone geometries transformed to the target CRS.
        """
        return self.zone_gdf.to_crs(target_crs).geometry.tolist()

    def save_view(
        self,
        name: str,
        output_format: Optional[str] = None,
    ) -> Path:
        """Save the generated zonal view to disk.

        Args:
            name (str): Base name for the output file (without extension).
            output_format (str, optional): File format to save in (e.g., "parquet",
                "geojson", "shp"). If None, uses the format specified in config.

        Returns:
            Path: The full path where the view was saved.

        Note:
            The output directory is determined by the config.base_path setting.
            The file extension is automatically added based on the output format.
            This method now saves the internal `self.view`.
        """
        if self._view is None:
            self.logger.warning(
                "No variables have been added to the zonal view. Saving the base zone_gdf."
            )
            view_to_save = self.zone_gdf
        else:
            view_to_save = self._view

        format_to_use = output_format or self.config.output_format
        output_path = self.config.base_path / f"{name}.{format_to_use}"

        self.logger.info(f"Saving zonal view to {output_path}")

        if format_to_use in ["geojson", "shp", "gpkg"]:
            self.logger.warning(
                f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
            )
            # Re-add geometry for saving to geospatial formats
            view_to_save = self.view.merge(
                self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
            )

        write_dataset(
            data=view_to_save,
            path=str(output_path),
            data_store=self.data_store,
        )

        return output_path

    def to_dataframe(self) -> pd.DataFrame:
        """
        Returns the current zonal view as a DataFrame.

        This method returns the view DataFrame with all accumulated variables.

        Returns:
            pd.DataFrame: The current view.
        """
        return self.view

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Returns the current zonal view merged with zone geometries as a GeoDataFrame.

        This method combines all accumulated variables in the view with the corresponding
        zone geometries, providing a spatially-enabled DataFrame for further analysis or export.

        Returns:
            gpd.GeoDataFrame: The current view merged with zone geometries.
        """
        return gpd.GeoDataFrame(
            (self.view).merge(
                self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
            ),
            crs=self.zone_gdf.crs,
        )
view: pd.DataFrame property

The DataFrame representing the current zonal view.

Returns:

Type Description
DataFrame

pd.DataFrame: The DataFrame containing zone IDs and any added variables. If no variables have been added, it returns the base zone_gdf without geometries.

zone_gdf: gpd.GeoDataFrame property

Cached GeoDataFrame of zones.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Lazily-computed and cached GeoDataFrame of zone geometries and identifiers.

__init__(config=None, data_store=None, logger=None)

Initialize the ZonalViewGenerator.

Parameters:

Name Type Description Default
config ZonalViewGeneratorConfig

Configuration for the generator. If None, uses default configuration.

None
data_store DataStore

The data store for accessing input data. If None, uses LocalDataStore.

None
logger Logger

Custom logger instance. If None, a default logger is used.

None
Source code in gigaspatial/generators/zonal/base.py
def __init__(
    self,
    config: Optional[ZonalViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: logging.Logger = None,
):
    """Initialize the ZonalViewGenerator.

    Args:
        config (ZonalViewGeneratorConfig, optional): Configuration for the generator.
            If None, uses default configuration.
        data_store (DataStore, optional): The data store for accessing input data.
            If None, uses LocalDataStore.
        logger (logging.Logger, optional): Custom logger instance. If None, a
            default logger is used.
    """
    self.config = config or ZonalViewGeneratorConfig()
    self.data_store = data_store or LocalDataStore()
    self.logger = logger or global_config.get_logger(self.__class__.__name__)
    self._view: Optional[pd.DataFrame] = None
add_variable_to_view(data_dict, column_name)

Adds a new variable (column) to the zonal view GeoDataFrame.

This method takes a dictionary (typically the result of map_points or map_polygons) and adds its values as a new column to the internal _view (or zone_gdf if not yet initialized). The dictionary keys are expected to be the zone_id values.

Parameters:

Name Type Description Default
data_dict Dict

A dictionary where keys are zone_ids and values are the data to be added.

required
column_name str

The name of the new column to be added to the GeoDataFrame.

required
Source code in gigaspatial/generators/zonal/base.py
def add_variable_to_view(self, data_dict: Dict, column_name: str) -> None:
    """
    Adds a new variable (column) to the zonal view GeoDataFrame.

    This method takes a dictionary (typically the result of map_points or map_polygons)
    and adds its values as a new column to the internal `_view` (or `zone_gdf` if not yet initialized).
    The dictionary keys are expected to be the `zone_id` values.

    Args:
        data_dict (Dict): A dictionary where keys are `zone_id`s and values are
                          the data to be added.
        column_name (str): The name of the new column to be added to the GeoDataFrame.
    Raises:
        ValueError: If the `data_dict` keys do not match the `zone_id`s in the zonal view.
                    If the `column_name` already exists in the zonal view.
    """
    if self._view is None:
        self._view = self.zone_gdf.drop(columns="geometry")

    if column_name in self._view.columns:
        raise ValueError(
            f"Column '{column_name}' already exists in the zonal view."
        )

    # Create a pandas Series from the dictionary, aligning by index (zone_id)
    new_series = pd.Series(data_dict, name=column_name)

    # Before merging, ensure the zone_ids in data_dict match those in _view
    missing_zones_in_data = set(self._view["zone_id"]) - set(new_series.index)
    extra_zones_in_data = set(new_series.index) - set(self._view["zone_id"])

    if missing_zones_in_data:
        self.logger.warning(
            f"Warning: {len(missing_zones_in_data)} zone(s) from the zonal view "
            f"are missing in the provided data_dict for column '{column_name}'. "
            f"These zones will have NaN values for '{column_name}'. Missing: {list(missing_zones_in_data)[:5]}..."
        )
    if extra_zones_in_data:
        self.logger.warning(
            f"Warning: {len(extra_zones_in_data)} zone(s) in the provided data_dict "
            f"are not present in the zonal view for column '{column_name}'. "
            f"These will be ignored. Extra: {list(extra_zones_in_data)[:5]}..."
        )

    # Merge the new series with the _view based on 'zone_id'
    # Using .set_index() for efficient alignment
    original_index_name = self._view.index.name
    self._view = self._view.set_index("zone_id").join(new_series).reset_index()
    if original_index_name:  # Restore original index name if it existed
        self._view.index.name = original_index_name
    else:  # If it was a default integer index, ensure it's not named 'index'
        self._view.index.name = None

    self.logger.info(f"Added variable '{column_name}' to the zonal view.")
get_zonal_geometries() abstractmethod

Get the geometries of the zones.

This method must be implemented by subclasses to return the actual geometric shapes of the zones (e.g., grid tiles, catchment boundaries, administrative areas).

Returns:

Type Description
List[Polygon]

List[Polygon]: A list of Shapely Polygon objects representing zone geometries.

Source code in gigaspatial/generators/zonal/base.py
@abstractmethod
def get_zonal_geometries(self) -> List[Polygon]:
    """Get the geometries of the zones.

    This method must be implemented by subclasses to return the actual geometric
    shapes of the zones (e.g., grid tiles, catchment boundaries, administrative areas).

    Returns:
        List[Polygon]: A list of Shapely Polygon objects representing zone geometries.
    """
    pass
get_zone_geodataframe()

Convert zones to a GeoDataFrame.

Creates a GeoDataFrame containing zone identifiers and their corresponding geometries in WGS84 (EPSG:4326) coordinate reference system.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns, where zone_id contains the identifiers and geometry contains the corresponding Polygon objects.

Source code in gigaspatial/generators/zonal/base.py
def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert zones to a GeoDataFrame.

    Creates a GeoDataFrame containing zone identifiers and their corresponding
    geometries in WGS84 (EPSG:4326) coordinate reference system.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns,
            where zone_id contains the identifiers and geometry contains the
            corresponding Polygon objects.
    """
    return gpd.GeoDataFrame(
        {
            "zone_id": self.get_zone_identifiers(),
            "geometry": self.get_zonal_geometries(),
        },
        crs="EPSG:4326",
    )
get_zone_identifiers() abstractmethod

Get unique identifiers for each zone.

This method must be implemented by subclasses to return identifiers that correspond one-to-one with the geometries returned by get_zonal_geometries().

Returns:

Type Description
List[T]

List[T]: A list of zone identifiers (e.g., quadkeys, H3 indices, tile IDs). The type T is determined by the specific zonal system implementation.

Source code in gigaspatial/generators/zonal/base.py
@abstractmethod
def get_zone_identifiers(self) -> List[T]:
    """Get unique identifiers for each zone.

    This method must be implemented by subclasses to return identifiers that
    correspond one-to-one with the geometries returned by get_zonal_geometries().

    Returns:
        List[T]: A list of zone identifiers (e.g., quadkeys, H3 indices, tile IDs).
            The type T is determined by the specific zonal system implementation.
    """
    pass
map_points(points, value_columns=None, aggregation='count', predicate='within', output_suffix='')

Map point data to zones with spatial aggregation.

Aggregates point data to zones using spatial relationships. Points can be counted or have their attribute values aggregated using various statistical methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

The point data to map. Must contain geometry information if DataFrame.

required
value_columns Union[str, List[str]]

Column name(s) containing values to aggregate. If None, only point counts are performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use. Can be a single string ("count", "mean", "sum", "min", "max", etc.) or a dictionary mapping column names to aggregation methods.

'count'
predicate str

Spatial predicate for point-to-zone relationship. Options include "within", "intersects", "contains". Defaults to "within".

'within'
output_suffix str

Suffix to add to output column names. Defaults to empty string.

''

Returns:

Name Type Description
Dict Dict

Dictionary with zone IDs as keys and aggregated values as values. If value_columns is None, returns point counts per zone. If value_columns is specified, returns aggregated values per zone.

Source code in gigaspatial/generators/zonal/base.py
def map_points(
    self,
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    predicate: str = "within",
    output_suffix: str = "",
) -> Dict:
    """Map point data to zones with spatial aggregation.

    Aggregates point data to zones using spatial relationships. Points can be
    counted or have their attribute values aggregated using various statistical methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): The point data to map.
            Must contain geometry information if DataFrame.
        value_columns (Union[str, List[str]], optional): Column name(s) containing
            values to aggregate. If None, only point counts are performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use.
            Can be a single string ("count", "mean", "sum", "min", "max", etc.)
            or a dictionary mapping column names to aggregation methods.
        predicate (str): Spatial predicate for point-to-zone relationship.
            Options include "within", "intersects", "contains". Defaults to "within".
        output_suffix (str): Suffix to add to output column names. Defaults to empty string.

    Returns:
        Dict: Dictionary with zone IDs as keys and aggregated values as values.
            If value_columns is None, returns point counts per zone.
            If value_columns is specified, returns aggregated values per zone.
    """

    self.logger.warning(
        "Using default points mapping implementation. Consider creating a specialized mapping function."
    )
    result = aggregate_points_to_zones(
        points=points,
        zones=self.zone_gdf,
        value_columns=value_columns,
        aggregation=aggregation,
        point_zone_predicate=predicate,
        zone_id_column="zone_id",
        output_suffix=output_suffix,
    )

    if isinstance(value_columns, str):
        return result.set_index("zone_id")[value_columns].to_dict()
    elif isinstance(value_columns, list):
        # With multiple value columns, return a nested dict:
        # {column_name: {zone_id: value}}
        return {
            col: result.set_index("zone_id")[col].to_dict() for col in value_columns
        }
    else:  # If value_columns is None, it should return point_count
        self.logger.warning(
            "No `value_columns` provided. Mapping point counts. Consider passing `value_columns` and `aggregation` or `mapping_function`."
        )
        return result.set_index("zone_id")["point_count"].to_dict()
map_polygons(polygons, value_columns=None, aggregation='count', predicate='intersects', **kwargs)

Maps polygon data to the instance's zones and aggregates values.

This method leverages aggregate_polygons_to_zones to perform a spatial aggregation of polygon data onto the zones stored within this object instance. It can count polygons, or aggregate their values, based on different spatial relationships defined by the predicate.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

The polygon data to map. Must contain geometry information if a DataFrame.

required
value_columns Union[str, List[str]]

The column name(s) from the polygons data to aggregate. If None, the method will automatically count the number of polygons that match the given predicate for each zone.

None
aggregation Union[str, Dict[str, str]]

The aggregation method(s) to use. Can be a single string (e.g., "sum", "mean", "max") or a dictionary mapping column names to specific aggregation methods. This is ignored and set to "count" if value_columns is None. Defaults to "count".

'count'
predicate Literal[intersects, within, fractional]

The spatial relationship to use for aggregation:

- "intersects": Counts or aggregates values for any polygon that intersects a zone.
- "within": Counts or aggregates values for polygons that are entirely contained within a zone.
- "fractional": Performs area-weighted aggregation. The value of a polygon is distributed proportionally to the area of its overlap with each zone.

Defaults to "intersects".

'intersects'
**kwargs

Additional keyword arguments to be passed to the underlying aggregate_polygons_to_zones function.

{}

Returns:

Name Type Description
Dict Dict

A dictionary or a nested dictionary containing the aggregated values, with zone IDs as keys. If value_columns is a single string, the return value is a dictionary mapping zone ID to the aggregated value. If value_columns is a list, the return value is a nested dictionary mapping each column name to its own dictionary of aggregated values.

Raises:

Type Description
ValueError

If value_columns is of an unexpected type after processing.

Example

# Assuming 'self' is an object with a 'zone_gdf' attribute
# Count all land parcels that intersect each zone
parcel_counts = self.map_polygons(landuse_polygons)

# Aggregate total population within zones using area weighting
population_by_zone = self.map_polygons(
    landuse_polygons,
    value_columns="population",
    predicate="fractional",
    aggregation="sum",
)

# Get the sum of residential area and count of buildings within each zone
residential_stats = self.map_polygons(
    building_polygons,
    value_columns=["residential_area_sqm", "building_id"],
    aggregation={"residential_area_sqm": "sum", "building_id": "count"},
    predicate="intersects",
)

Source code in gigaspatial/generators/zonal/base.py
def map_polygons(
    self,
    polygons,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    predicate: str = "intersects",
    **kwargs,
) -> Dict:
    """
    Maps polygon data to the instance's zones and aggregates values.

    This method leverages `aggregate_polygons_to_zones` to perform a spatial
    aggregation of polygon data onto the zones stored within this object instance.
    It can count polygons, or aggregate their values, based on different spatial
    relationships defined by the `predicate`.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            The polygon data to map. Must contain geometry information if a
            DataFrame.
        value_columns (Union[str, List[str]], optional):
            The column name(s) from the `polygons` data to aggregate. If `None`,
            the method will automatically count the number of polygons that
            match the given `predicate` for each zone.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") or a dictionary mapping column names to specific
            aggregation methods. This is ignored and set to "count" if
            `value_columns` is `None`. Defaults to "count".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Counts or aggregates values for any polygon that
              intersects a zone.
            - "within": Counts or aggregates values for polygons that are
              entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a
              polygon is distributed proportionally to the area of its overlap
              with each zone.
            Defaults to "intersects".
        **kwargs:
            Additional keyword arguments to be passed to the underlying
            `aggregate_polygons_to_zones` function.

    Returns:
        Dict:
            A dictionary or a nested dictionary containing the aggregated values,
            with zone IDs as keys. If `value_columns` is a single string, the
            return value is a dictionary mapping zone ID to the aggregated value.
            If `value_columns` is a list, the return value is a nested dictionary
            mapping each column name to its own dictionary of aggregated values.

    Raises:
        ValueError: If `value_columns` is of an unexpected type after processing.

    Example:
        >>> # Assuming 'self' is an object with a 'zone_gdf' attribute
        >>> # Count all land parcels that intersect each zone
        >>> parcel_counts = self.map_polygons(landuse_polygons)
        >>>
        >>> # Aggregate total population within zones using area weighting
        >>> population_by_zone = self.map_polygons(
        ...     landuse_polygons,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum"
        ... )
        >>>
        >>> # Get the sum of residential area and count of buildings within each zone
        >>> residential_stats = self.map_polygons(
        ...     building_polygons,
        ...     value_columns=["residential_area_sqm", "building_id"],
        ...     aggregation={"residential_area_sqm": "sum", "building_id": "count"},
        ...     predicate="intersects"
        ... )
    """

    if value_columns is None:
        self.logger.warning(
            f"No value_columns specified. Defaulting to counting polygons with {predicate} predicate."
        )
        temp_value_col = "_temp_polygon_count_dummy"
        polygons = polygons.copy()  # avoid mutating the caller's data
        polygons[temp_value_col] = 1
        actual_value_columns = temp_value_col
        aggregation = "count"  # Force count if no value columns
    else:
        actual_value_columns = value_columns

    result = aggregate_polygons_to_zones(
        polygons=polygons,
        zones=self.zone_gdf,
        value_columns=actual_value_columns,
        aggregation=aggregation,
        predicate=predicate,
        zone_id_column="zone_id",
    )

    # Convert the result GeoDataFrame to the expected dictionary format
    if isinstance(actual_value_columns, str):
        return result.set_index("zone_id")[actual_value_columns].to_dict()
    elif isinstance(actual_value_columns, list):
        return {
            col: result.set_index("zone_id")[col].to_dict()
            for col in actual_value_columns
        }
    else:
        raise ValueError("Unexpected type for actual_value_columns.")
map_rasters(raster_data, stat='mean', **kwargs)

Map raster data to zones using zonal statistics.

Samples raster values within each zone and computes statistics. Automatically handles coordinate reference system transformations between raster and zone data.

Parameters:

Name Type Description Default
raster_data Union[TifProcessor, List[TifProcessor]]

Either a TifProcessor object or a list of TifProcessor objects (which will be merged into a single TifProcessor for processing).

required
stat str

Statistic to calculate when aggregating raster values within each zone. Options include "mean", "sum", "min", "max", "std", etc. Defaults to "mean".

'mean'
**kwargs

Additional keyword arguments passed to TifProcessor when multiple rasters are merged.

{}

Returns:

Name Type Description
Dict Dict

By default, returns a dictionary of sampled values with zone IDs as keys.

Note

If the coordinate reference system of the rasters differs from the zones, the zone geometries will be automatically transformed to match the raster CRS.

Source code in gigaspatial/generators/zonal/base.py
def map_rasters(
    self,
    raster_data: Union[TifProcessor, List[TifProcessor]],
    stat: str = "mean",
    **kwargs,
) -> Dict:
    """Map raster data to zones using zonal statistics.

    Samples raster values within each zone and computes statistics. Automatically
    handles coordinate reference system transformations between raster and zone data.

    Args:
        raster_data (Union[TifProcessor, List[TifProcessor]]):
            Either a TifProcessor object or a list of TifProcessor objects (which will be merged
            into a single TifProcessor for processing).
        stat (str): Statistic to calculate when aggregating raster values within
            each zone. Options include "mean", "sum", "min", "max", "std", etc.
            Defaults to "mean".
        **kwargs: Additional keyword arguments passed to TifProcessor when
            multiple rasters are merged.

    Returns:
        Dict: By default, returns a dictionary of sampled values
            with zone IDs as keys.

    Note:
        If the coordinate reference system of the rasters differs from the zones,
        the zone geometries will be automatically transformed to match the raster CRS.
    """
    raster_processor: Optional[TifProcessor] = None

    if isinstance(raster_data, TifProcessor):
        raster_processor = raster_data
    elif isinstance(raster_data, list) and all(
        isinstance(x, TifProcessor) for x in raster_data
    ):
        if not raster_data:
            self.logger.info("No valid raster data provided")
            return {}  # nothing to sample; keep the declared Dict return type

        if len(raster_data) > 1:
            all_source_paths = [tp.dataset_path for tp in raster_data]

            self.logger.info(
                f"Merging {len(all_source_paths)} rasters into a single TifProcessor for zonal statistics."
            )
            raster_processor = TifProcessor(
                dataset_path=all_source_paths, data_store=self.data_store, **kwargs
            )
        else:
            raster_processor = raster_data[0]
    else:
        raise ValueError(
            "raster_data must be a TifProcessor object or a list of TifProcessor objects."
        )

    raster_crs = raster_processor.crs

    if raster_crs != self.zone_gdf.crs:
        self.logger.info(f"Projecting zones to raster CRS: {raster_crs}")
        zone_geoms = self._get_transformed_geometries(raster_crs)
    else:
        zone_geoms = self.get_zonal_geometries()

    # Sample raster values
    sampled_values = raster_processor.sample_by_polygons(
        polygon_list=zone_geoms, stat=stat
    )

    zone_ids = self.get_zone_identifiers()

    return {zone_id: value for zone_id, value in zip(zone_ids, sampled_values)}
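
A short sketch of the single-raster case (the import path and file path are assumptions; TifProcessor's dataset_path argument is taken from the source above):

from gigaspatial.processing import TifProcessor  # import path assumed

pop_raster = TifProcessor(dataset_path="data/population.tif")
pop_by_zone = generator.map_rasters(pop_raster, stat="mean")
generator.add_variable_to_view(pop_by_zone, "mean_population")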
save_view(name, output_format=None)

Save the generated zonal view to disk.

Parameters:

Name Type Description Default
name str

Base name for the output file (without extension).

required
output_format str

File format to save in (e.g., "parquet", "geojson", "shp"). If None, uses the format specified in config.

None

Returns:

Name Type Description
Path Path

The full path where the view was saved.

Note

The output directory is determined by the config.base_path setting. The file extension is automatically added based on the output format. This method now saves the internal self.view.

Source code in gigaspatial/generators/zonal/base.py
def save_view(
    self,
    name: str,
    output_format: Optional[str] = None,
) -> Path:
    """Save the generated zonal view to disk.

    Args:
        name (str): Base name for the output file (without extension).
        output_format (str, optional): File format to save in (e.g., "parquet",
            "geojson", "shp"). If None, uses the format specified in config.

    Returns:
        Path: The full path where the view was saved.

    Note:
        The output directory is determined by the config.base_path setting.
        The file extension is automatically added based on the output format.
        This method now saves the internal `self.view`.
    """
    if self._view is None:
        self.logger.warning(
            "No variables have been added to the zonal view. Saving the base zone_gdf."
        )
        view_to_save = self.zone_gdf
    else:
        view_to_save = self._view

    format_to_use = output_format or self.config.output_format
    output_path = self.config.base_path / f"{name}.{format_to_use}"

    self.logger.info(f"Saving zonal view to {output_path}")

    if format_to_use in ["geojson", "shp", "gpkg"]:
        self.logger.warning(
            f"Saving to {format_to_use} requires converting back to GeoDataFrame. Geometry column will be re-added."
        )
        # Re-add geometry for saving to geospatial formats
        view_to_save = self.view.merge(
            self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
        )

    write_dataset(
        data=view_to_save,
        path=str(output_path),
        data_store=self.data_store,
    )

    return output_path
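
For example (the view name is illustrative):

# Save with the configured default format (parquet unless overridden)
path = generator.save_view("ken_admin1_view")

# Or force a geospatial format; geometry is re-attached automatically
path = generator.save_view("ken_admin1_view", output_format="geojson")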
to_dataframe()

Returns the current zonal view as a DataFrame.

This method returns the view DataFrame with all accumulated variables.

Returns:

Type Description
DataFrame

pd.DataFrame: The current view.

Source code in gigaspatial/generators/zonal/base.py
def to_dataframe(self) -> pd.DataFrame:
    """
    Returns the current zonal view as a DataFrame.

    This method returns the view DataFrame with all accumulated variables.

    Returns:
        pd.DataFrame: The current view.
    """
    return self.view
to_geodataframe()

Returns the current zonal view merged with zone geometries as a GeoDataFrame.

This method combines all accumulated variables in the view with the corresponding zone geometries, providing a spatially-enabled DataFrame for further analysis or export.

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The current view merged with zone geometries.

Source code in gigaspatial/generators/zonal/base.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Returns the current zonal view merged with zone geometries as a GeoDataFrame.

    This method combines all accumulated variables in the view with the corresponding
    zone geometries, providing a spatially-enabled DataFrame for further analysis or export.

    Returns:
        gpd.GeoDataFrame: The current view merged with zone geometries.
    """
    return gpd.GeoDataFrame(
        (self.view).merge(
            self.zone_gdf[["zone_id", "geometry"]], on="zone_id", how="left"
        ),
        crs=self.zone_gdf.crs,
    )
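
The two accessors pair naturally with standard pandas/geopandas tooling, e.g.:

df = generator.to_dataframe()       # tabular view, no geometry
gdf = generator.to_geodataframe()   # same rows with zone geometries attached
gdf.to_file("zonal_view.gpkg")      # regular GeoDataFrame export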
ZonalViewGeneratorConfig

Bases: BaseModel

Configuration for zonal view generation.

Attributes:

Name Type Description
base_path Path

Base directory path for storing zonal views. Defaults to configured zonal views path.

output_format str

Default output format for saved views. Defaults to "parquet".

Source code in gigaspatial/generators/zonal/base.py
class ZonalViewGeneratorConfig(BaseModel):
    """Configuration for zonal view generation.

    Attributes:
        base_path (Path): Base directory path for storing zonal views. Defaults to
            configured zonal views path.
        output_format (str): Default output format for saved views. Defaults to "parquet".
    """

    base_path: Path = Field(default=global_config.get_path("zonal", "views"))
    output_format: str = "parquet"
    ensure_available: bool = True
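
A sketch of overriding the defaults (the paths are illustrative):

from pathlib import Path

from gigaspatial.generators.zonal.base import ZonalViewGeneratorConfig

config = ZonalViewGeneratorConfig(
    base_path=Path("./zonal_views"),
    output_format="geojson",
)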

geometry

GeometryBasedZonalViewGenerator

Bases: ZonalViewGenerator[T]

Mid-level class for zonal view generation based on geometries with identifiers.

This class serves as an intermediate between the abstract ZonalViewGenerator and specific implementations like MercatorViewGenerator or H3ViewGenerator. It handles the common case where zones are defined by a mapping between zone identifiers and geometries, either provided as a dictionary or as a GeoDataFrame.

The class extends the base functionality with methods for mapping common geospatial datasets including GHSL (Global Human Settlement Layer), Google Open Buildings, and Microsoft Global Buildings data.

Attributes:

Name Type Description
zone_dict Dict[T, Polygon]

Mapping of zone identifiers to geometries.

zone_id_column str

Name of the column containing zone identifiers.

zone_data_crs str

Coordinate reference system of the zone data.

_zone_gdf GeoDataFrame

Cached GeoDataFrame representation of zones.

data_store DataStore

For accessing input data.

config ZonalViewGeneratorConfig

Configuration for view generation.

logger

Logger instance for this class.
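
A minimal construction sketch using the dictionary form (zone ids and geometries are illustrative; the import path follows the source listing below):

from shapely.geometry import box

from gigaspatial.generators.zonal.geometry import GeometryBasedZonalViewGenerator

zones = {
    "zone_a": box(0.0, 0.0, 1.0, 1.0),
    "zone_b": box(1.0, 0.0, 2.0, 1.0),
}
generator = GeometryBasedZonalViewGenerator(zone_data=zones)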

Source code in gigaspatial/generators/zonal/geometry.py
class GeometryBasedZonalViewGenerator(ZonalViewGenerator[T]):
    """Mid-level class for zonal view generation based on geometries with identifiers.

    This class serves as an intermediate between the abstract ZonalViewGenerator and specific
    implementations like MercatorViewGenerator or H3ViewGenerator. It handles the common case
    where zones are defined by a mapping between zone identifiers and geometries, either
    provided as a dictionary or as a GeoDataFrame.

    The class extends the base functionality with methods for mapping common geospatial
    datasets including GHSL (Global Human Settlement Layer), Google Open Buildings,
    and Microsoft Global Buildings data.

    Attributes:
        zone_dict (Dict[T, Polygon]): Mapping of zone identifiers to geometries.
        zone_id_column (str): Name of the column containing zone identifiers.
        zone_data_crs (str): Coordinate reference system of the zone data.
        _zone_gdf (gpd.GeoDataFrame): Cached GeoDataFrame representation of zones.
        data_store (DataStore): For accessing input data.
        config (ZonalViewGeneratorConfig): Configuration for view generation.
        logger: Logger instance for this class.
    """

    def __init__(
        self,
        zone_data: Union[Dict[T, Polygon], gpd.GeoDataFrame],
        zone_id_column: str = "zone_id",
        zone_data_crs: str = "EPSG:4326",
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):
        """Initialize with zone geometries and identifiers.

        Args:
            zone_data (Union[Dict[T, Polygon], gpd.GeoDataFrame]): Zone definitions.
                Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries,
                or a GeoDataFrame with geometries and a zone identifier column.
            zone_id_column (str): Name of the column containing zone identifiers.
                Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".
            zone_data_crs (str): Coordinate reference system of the zone data.
                Defaults to "EPSG:4326" (WGS84).
            config (ZonalViewGeneratorConfig, optional): Generator configuration.
                If None, uses default configuration.
            data_store (DataStore, optional): Data store for accessing input data.
                If None, uses LocalDataStore.

        Raises:
            TypeError: If zone_data is not a dictionary or GeoDataFrame, or if dictionary
                values are not Polygon/MultiPolygon geometries.
            ValueError: If zone_id_column is not found in GeoDataFrame, or if the provided
                CRS doesn't match the GeoDataFrame's CRS.
        """
        super().__init__(config=config, data_store=data_store, logger=logger)

        self.zone_id_column = zone_id_column
        self.zone_data_crs = zone_data_crs

        # Store zone data based on input type
        if isinstance(zone_data, dict):
            for zone_id, geom in zone_data.items():
                if not isinstance(geom, (Polygon, MultiPolygon)):
                    raise TypeError(
                        f"Zone {zone_id}: Expected (Multi)Polygon, got {type(geom).__name__}"
                    )

            # Store the original dictionary
            self.zone_dict = zone_data

            # Also create a GeoDataFrame for consistent access
            self._zone_gdf = gpd.GeoDataFrame(
                {
                    "zone_id": list(zone_data.keys()),
                    "geometry": list(zone_data.values()),
                },
                crs=zone_data_crs,
            )
            self.zone_id_column = "zone_id"
        else:
            if not isinstance(zone_data, gpd.GeoDataFrame):
                raise TypeError(
                    "zone_data must be either a Dict[T, Polygon] or a GeoDataFrame"
                )

            if zone_id_column not in zone_data.columns:
                raise ValueError(
                    f"Zone ID column '{zone_id_column}' not found in GeoDataFrame"
                )

            if zone_data_crs != zone_data.crs:
                raise ValueError(
                    f"Provided data crs '{zone_data_crs}' does not match to the crs of the data '{zone_data.crs}'"
                )

            # Store the GeoDataFrame
            self._zone_gdf = zone_data.rename(columns={zone_id_column: "zone_id"})

            # Also create a dictionary for fast lookups
            self.zone_dict = dict(zip(zone_data[zone_id_column], zone_data.geometry))

    def get_zonal_geometries(self) -> List[Polygon]:
        """Get the geometry of each zone.

        Returns:
            List[Polygon]: A list of zone geometries in the order they appear in the
                underlying GeoDataFrame.
        """
        return self._zone_gdf.geometry.tolist()

    def get_zone_identifiers(self) -> List[T]:
        """Get the identifier for each zone.

        Returns:
            List[T]: A list of zone identifiers in the order they appear in the
                underlying GeoDataFrame.
        """
        return self._zone_gdf.zone_id.tolist()

    def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert zones to a GeoDataFrame with standardized column names.

        Returns:
            gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns.
                The zone_id column is renamed from the original zone_id_column if different.
        """
        # Since _zone_gdf is already created with 'zone_id' column in the constructor,
        # we just need to return a copy of it
        return self._zone_gdf.copy()

    @property
    def zone_gdf(self) -> gpd.GeoDataFrame:
        """Override the base class zone_gdf property to ensure correct column names.

        Returns:
            gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns.
        """
        return self._zone_gdf.copy()

    def map_built_s(
        self,
        year=2020,
        resolution=100,
        stat: str = "sum",
        output_column: str = "built_surface_m2",
        **kwargs,
    ) -> pd.DataFrame:
        """Map GHSL Built-up Surface data to zones.

        Convenience method for mapping Global Human Settlement Layer Built-up Surface
        data using appropriate default parameters for built surface analysis.

        Args:
            year: The year of the data (default: 2020)
            resolution: The resolution in meters (default: 100)
            stat (str): Statistic to calculate for built surface values within each zone.
                Defaults to "sum" which gives total built surface area.
            output_column (str): The output column name. Defaults to "built_surface_m2".
        Returns:
            pd.DataFrame: Updated view DataFrame with built surface metrics.
                Adds a column named `output_column` containing the aggregated values.
        """
        handler = GHSLDataHandler(
            product="GHS_BUILT_S",
            year=year,
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        return self.map_ghsl(
            handler=handler, stat=stat, output_column=output_column, **kwargs
        )

    def map_smod(
        self,
        year=2020,
        resolution=1000,
        stat: str = "median",
        output_column: str = "smod_class",
        **kwargs,
    ) -> pd.DataFrame:
        """Map GHSL Settlement Model data to zones.

        Convenience method for mapping Global Human Settlement Layer Settlement Model
        data using appropriate default parameters for settlement classification analysis.

        Args:
            year: The year of the data (default: 2020)
            resolution: The resolution in meters (default: 1000)
            stat (str): Statistic to calculate for settlement class values within each zone.
                Defaults to "median" which gives the predominant settlement class.
            output_column (str): The output column name. Defaults to "smod_class".
        Returns:
            pd.DataFrame: Updated view DataFrame with settlement classification.
                Adds a column named `output_column` containing the aggregated values.
        """
        handler = GHSLDataHandler(
            product="GHS_SMOD",
            year=year,
            resolution=resolution,
            data_store=self.data_store,
            coord_system=54009,
            **kwargs,
        )

        return self.map_ghsl(
            handler=handler, stat=stat, output_column=output_column, **kwargs
        )

    def map_ghsl(
        self,
        handler: GHSLDataHandler,
        stat: str,
        output_column: Optional[str] = None,
        **kwargs,
    ) -> pd.DataFrame:
        """Map Global Human Settlement Layer data to zones.

        Loads and processes GHSL raster data for the intersecting tiles, then samples
        the raster values within each zone using the specified statistic.

        Args:
            handler (GHSLDataHandler): Handler for the GHSL data.
            stat (str): Statistic to calculate for raster values within each zone.
                Common options: "mean", "sum", "median", "min", "max".
            output_column (str): The output column name.
                If None, uses the GHSL product name in lowercase followed by an underscore and the statistic name.

        Returns:
            pd.DataFrame: Updated DataFrame with GHSL metrics.
            Adds a column named `output_column` containing the sampled values.

        Note:
            The method automatically determines which GHSL tiles intersect with the zones
            and loads only the necessary data for efficient processing.
        """
        coord_system = kwargs.pop("coord_system", None)
        if not coord_system:
            coord_system = 4326 if self.zone_data_crs == "EPSG:4326" else 54009

        handler = handler or GHSLDataHandler(
            data_store=self.data_store, coord_system=coord_system, **kwargs
        )
        self.logger.info(
            f"Mapping {handler.config.product} data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
        )
        tif_processors = handler.load_data(
            self.zone_gdf,
            ensure_available=self.config.ensure_available,
            merge_rasters=True,
            **kwargs,
        )

        self.logger.info(
            f"Sampling {handler.config.product} data using '{stat}' statistic"
        )
        sampled_values = self.map_rasters(raster_data=tif_processors, stat=stat)

        column_name = (
            output_column
            if output_column
            else f"{handler.config.product.lower()}_{stat}"
        )

        self.add_variable_to_view(sampled_values, column_name)

        return self.view

    def map_google_buildings(
        self,
        handler: Optional[GoogleOpenBuildingsHandler] = None,
        use_polygons: bool = False,
        **kwargs,
    ) -> pd.DataFrame:
        """Map Google Open Buildings data to zones.

        Processes Google Open Buildings dataset to calculate building counts and total
        building area within each zone. Can use either point centroids (faster) or
        polygon geometries (more accurate) for spatial operations.

        Args:
            handler (GoogleOpenBuildingsHandler, optional): Handler for accessing
                Google Open Buildings data. If None, a default handler is created.
            use_polygons (bool): Whether to use polygon geometries for buildings.
                If True, uses actual building polygons for more accurate area calculations
                but with slower performance. If False, uses building centroids with
                area values from attributes for faster processing. Defaults to False.

        Returns:
            pd.DataFrame: Updated DataFrame with building metrics.
                Adds columns:
                - 'google_buildings_count': Number of buildings in each zone
                - 'google_buildings_area_in_meters': Total building area in square meters

        Note:
            If no Google Buildings data is found for the zones, returns the original
            GeoDataFrame unchanged with a warning logged.
        """
        self.logger.info(
            f"Mapping Google Open Buildings data (use_polygons={use_polygons})"
        )

        self.logger.info("Loading Google Buildings point data")
        handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)
        buildings_df = handler.load_points(
            self.zone_gdf, ensure_available=self.config.ensure_available, **kwargs
        )

        if buildings_df.empty:
            self.logger.warning("No Google buildings data found for the provided zones")
            return self._zone_gdf.copy()

        if not use_polygons:
            self.logger.info("Aggregating building data using points with attributes")
            result = self.map_points(
                points=buildings_df,
                value_columns=["full_plus_code", "area_in_meters"],
                aggregation={"full_plus_code": "count", "area_in_meters": "sum"},
                predicate="within",
            )

            count_result = result["full_plus_code"]
            area_result = result["area_in_meters"]

        else:
            self.logger.info(
                "Loading Google Buildings polygon data for more accurate mapping"
            )
            buildings_gdf = handler.load_polygons(
                self.zone_gdf, ensure_available=self.config.ensure_available, **kwargs
            )

            self.logger.info(
                "Calculating building areas with area-weighted aggregation"
            )
            area_result = self.map_polygons(
                buildings_gdf,
                value_columns="area_in_meters",
                aggregation="sum",
                predicate="fractional",
            )

            self.logger.info("Counting buildings using points data")
            count_result = self.map_points(points=buildings_df, predicate="within")

        self.add_variable_to_view(count_result, "google_buildings_count")
        self.add_variable_to_view(area_result, "google_buildings_area_in_meters")

        return self.view

    def map_ms_buildings(
        self,
        handler: Optional[MSBuildingsHandler] = None,
        use_polygons: bool = False,
    ) -> gpd.GeoDataFrame:
        """Map Microsoft Global Buildings data to zones.

        Processes Microsoft Global Buildings dataset to calculate building counts and
        total building area within each zone. Can use either centroid points (faster)
        or polygon geometries (more accurate) for spatial operations.

        Args:
            handler (MSBuildingsHandler, optional): Handler for accessing Microsoft
                Global Buildings data. If None, a default handler is created.
            use_polygons (bool): Whether to use polygon geometries for buildings.
                If True, uses actual building polygons for more accurate area calculations
                but with slower performance. If False, uses building centroids with
                area values from attributes for faster processing. Defaults to False.

        Returns:
            gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics.
                Adds columns:
                - 'ms_buildings_count': Number of buildings in each zone
                - 'ms_buildings_area_in_meters': Total building area in square meters

        Note:
            If no Microsoft Buildings data is found for the zones, returns the original
            GeoDataFrame unchanged with a warning logged. Building areas are calculated
            in meters using appropriate UTM projections.
        """
        self.logger.info("Mapping Microsoft Global Buildings data")

        self.logger.info("Loading Microsoft Buildings polygon data")
        handler = handler or MSBuildingsHandler(data_store=self.data_store)
        buildings_gdf = handler.load_data(
            self.zone_gdf, ensure_available=self.config.ensure_available
        )

        # Check if we found any buildings
        if buildings_gdf.empty:
            self.logger.warning(
                "No Microsoft buildings data found for the provided zones"
            )
            return self._zone_gdf.copy()

        buildings_gdf = add_area_in_meters(
            buildings_gdf, area_column_name="area_in_meters"
        )

        building_centroids = get_centroids(buildings_gdf)

        if not use_polygons:
            self.logger.info("Aggregating building data using points with attributes")

            result = self.map_points(
                points=building_centroids,
                value_columns=["type", "area_in_meters"],
                aggregation={"type": "count", "area_in_meters": "sum"},
                predicate="within",
            )

            count_result = result["type"]
            area_result = result["area_in_meters"]
        else:

            self.logger.info(
                "Calculating building areas with area-weighted aggregation"
            )
            area_result = self.map_polygons(
                buildings_gdf,
                value_columns="area_in_meters",
                aggregation="sum",
                predicate="fractional",
            )

            self.logger.info("Counting Microsoft buildings per zone")

            count_result = self.map_points(
                points=building_centroids, predicate="within"
            )

        self.add_variable_to_view(count_result, "ms_buildings_count")
        self.add_variable_to_view(area_result, "ms_buildings_area_in_meters")

        return self.view

    def map_buildings(
        self,
        country: str,
        source_filter: Optional[Literal["google", "microsoft"]] = None,
        output_column: str = "building_count",
        **kwargs,
    ):
        """Map Google-Microsoft combined buildings data to zones.

        Efficiently counts buildings within each zone by leveraging spatial indexing
        and S2 grid partitioning. For partitioned datasets, only loads building tiles
        that intersect with zones, significantly improving performance for large countries.

        The method uses Shapely's STRtree for fast spatial queries, enabling efficient
        intersection testing between millions of buildings and zone geometries.

        Parameters
        ----------
        country : str
            ISO 3166-1 alpha-3 country code (e.g., 'USA', 'BRA', 'IND').
            Must match the country codes used in the building dataset.

        source_filter : {'google', 'microsoft'}, optional
            Filter buildings by data source. Options:
            - 'google': Only count buildings from Google Open Buildings
            - 'microsoft': Only count buildings from Microsoft Global Buildings
            - None (default): Count buildings from both sources

        output_column : str, default='building_count'
            Name of the column to add to the view containing building counts.
            Must be a valid pandas column name.

        **kwargs : dict
            Additional keyword arguments passed to GoogleMSBuildingsHandler.
            Common options include data versioning or custom data paths.

        Returns
        -------
        pd.DataFrame
            Updated view DataFrame with building counts added.
            The DataFrame includes all original zone columns plus the new
            output_column containing integer counts of buildings per zone.

        Notes
        -----
        Algorithm Overview:
        1. **Single-file countries**: Processes all zones against the entire dataset
        in one pass (optimal for small countries or non-partitioned data).

        2. **Partitioned countries**: Uses S2 grid spatial index to:
        - Identify which S2 cells intersect with zones
        - Load only intersecting building tiles (not entire country)
        - Process each tile independently for memory efficiency
        - Accumulate counts across tiles

        Performance Characteristics:
        - For partitioned data with N zones and M S2 tiles:
        * Only loads ~sqrt(M) tiles that intersect zones (huge savings)
        * Uses STRtree with O(log k) query time per zone
        * Memory usage: O(max_buildings_per_tile + N)

        - For single-file data:
        * Loads entire building dataset once
        * Single STRtree query for all zones (vectorized)

        The spatial query uses the 'intersects' predicate, meaning a building is
        counted if its polygon boundary touches or overlaps with a zone's boundary.
        Buildings on zone borders may be counted in multiple zones.

        Examples
        --------
        Count all buildings in H3 hexagons across the USA:

        >>> h3_zones = H3ViewGenerator(resolution=8, bbox=usa_bbox)
        >>> h3_zones.map_buildings("USA")
        >>> print(h3_zones.view[['zone_id', 'building_count']].head())
        zone_id  building_count
        0  88...01            1250
        1  88...02             890
        2  88...03               0

        Count only Google buildings with custom column name:

        >>> zones.map_buildings(
        ...     "BRA",
        ...     source_filter="google",
        ...     output_column="google_building_count"
        ... )

        Compare building counts from different sources:

        >>> zones.map_buildings("IND", source_filter="google",
        ...                     output_column="google_buildings")
        >>> zones.map_buildings("IND", source_filter="microsoft",
        ...                     output_column="ms_buildings")
        >>> zones.view['total_buildings'] = (
        ...     zones.view['google_buildings'] + zones.view['ms_buildings']
        ... )

        See Also
        --------
        map_google_buildings : Map Google Open Buildings data only
        map_ms_buildings : Map Microsoft Global Buildings data only
        GoogleMSBuildingsHandler : Handler for combined building datasets

        References
        ----------
        .. [1] Google Open Buildings: https://sites.research.google/open-buildings/
        .. [2] Microsoft Global Buildings: https://github.com/microsoft/GlobalMLBuildingFootprints
        """

        self.logger.info(f"Mapping Google-Microsoft Buildings data to zones")

        from gigaspatial.handlers.google_ms_combined_buildings import (
            GoogleMSBuildingsHandler,
        )

        handler = GoogleMSBuildingsHandler(partition_strategy="s2_grid")

        if self.config.ensure_available:
            if not handler.ensure_data_available(country, **kwargs):
                raise RuntimeError("Could not ensure data availability for loading")

        building_files = handler.reader.resolve_source_paths(country, **kwargs)

        result = GoogleMSBuildingsEngine.count_buildings_in_zones(
            handler=handler,
            building_files=building_files,
            zones_gdf=self.zone_gdf,
            source_filter=source_filter,
            logger=self.logger,
        )

        # Log summary
        total_buildings = result.counts.sum()
        zones_with_buildings = (result.counts > 0).sum()
        self.logger.info(
            f"Mapping complete: {total_buildings:,.0f} buildings across "
            f"{zones_with_buildings}/{len(result.counts)} zones"
        )

        # Update the view and return
        self.add_variable_to_view(result.counts, output_column)
        return self.view

    def map_ghsl_pop(
        self,
        resolution=100,
        stat: str = "sum",
        output_column: str = "ghsl_pop",
        predicate: Literal["intersects", "fractional"] = "intersects",
        **kwargs,
    ):
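        """Map GHSL Population (GHS_POP) data to zones.

        Args:
            resolution: The resolution in meters (default: 100).
            stat (str): Statistic to calculate for population values within each zone.
                Defaults to "sum".
            output_column (str): The output column name. Defaults to "ghsl_pop".
            predicate (str): Spatial predicate for aggregation. "fractional" performs
                area-weighted aggregation but is only supported at 1000m resolution;
                otherwise "intersects" is used. Defaults to "intersects".

        Returns:
            pd.DataFrame: Updated view DataFrame with the population column added.
        """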
        handler = GHSLDataHandler(
            product="GHS_POP",
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        if predicate == "fractional":
            if resolution == 100:
                self.logger.warning(
                    "Fractional aggregations only supported for datasets with 1000m resolution. Using `intersects` as predicate"
                )
                predicate = "intersects"
            else:
                gdf_pop = handler.load_into_geodataframe(self.zone_gdf, **kwargs)

                result = self.map_polygons(
                    gdf_pop,
                    value_columns="pixel_value",
                    aggregation="sum",
                    predicate="fractional",
                )

                self.add_variable_to_view(result, output_column)
                return self.view

        return self.map_ghsl(
            handler=handler, stat=stat, output_column=output_column, **kwargs
        )

    def map_wp_pop(
        self,
        country: Union[str, List[str]],
        resolution=1000,
        predicate: Literal[
            "centroid_within", "intersects", "fractional"
        ] = "intersects",
        output_column: str = "population",
        **kwargs,
    ):
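        """Map WorldPop population data to zones.

        Args:
            country (Union[str, List[str]]): Country or list of countries to load.
                The 'age_structures' project accepts only a single country.
            resolution: The resolution in meters (default: 1000).
            predicate (str): Spatial predicate for aggregation. "centroid_within"
                samples the rasters directly; "fractional" performs area-weighted
                aggregation and is only supported at 1000m resolution.
                Defaults to "intersects".
            output_column (str): The output column name. Defaults to "population".

        Returns:
            pd.DataFrame: Updated view DataFrame with the population column added.
        """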

        # Ensure country is always a list for consistent handling
        countries_list = [country] if isinstance(country, str) else country

        handler = WPPopulationHandler(
            resolution=resolution,
            data_store=self.data_store,
            **kwargs,
        )

        # Restrict to single country for age_structures project
        if handler.config.project == "age_structures" and len(countries_list) > 1:
            raise ValueError(
                "For 'age_structures' project, only a single country can be processed at a time."
            )

        self.logger.info(
            f"Mapping WorldPop Population data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
        )

        if predicate == "fractional" and resolution == 100:
            self.logger.warning(
                "Fractional aggregations only supported for datasets with 1000m resolution. Using `intersects` as predicate"
            )
            predicate = "intersects"

        if predicate == "centroid_within":
            if handler.config.project == "age_structures":
                # Load individual tif processors for the single country
                all_tif_processors = handler.load_data(
                    countries_list[0],
                    ensure_available=self.config.ensure_available,
                    **kwargs,
                )

                # Sum results from each tif_processor separately
                all_results_by_zone = {
                    zone_id: 0 for zone_id in self.get_zone_identifiers()
                }
                self.logger.info(
                    f"Sampling individual age_structures rasters using 'sum' statistic and summing per zone."
                )
                for tif_processor in all_tif_processors:
                    single_raster_result = self.map_rasters(
                        raster_data=tif_processor, stat="sum"
                    )
                    for zone_id, value in single_raster_result.items():
                        all_results_by_zone[zone_id] += value
                result = all_results_by_zone
            else:
                # Existing behavior for non-age_structures projects or if merging is fine
                tif_processors = []
                for c in countries_list:
                    tif_processors.extend(
                        handler.load_data(
                            c,
                            ensure_available=self.config.ensure_available,
                            **kwargs,
                        )
                    )
                self.logger.info(
                    f"Sampling WorldPop Population data using 'sum' statistic"
                )
                result = self.map_rasters(raster_data=tif_processors, stat="sum")
        else:
            gdf_pop = pd.concat(
                [
                    handler.load_into_geodataframe(
                        c,
                        ensure_available=self.config.ensure_available,
                        **kwargs,
                    )
                    for c in countries_list
                ],
                ignore_index=True,
            )

            self.logger.info(f"Aggregating WorldPop Population data to the zones.")
            result = self.map_polygons(
                gdf_pop,
                value_columns="pixel_value",
                aggregation="sum",
                predicate=predicate,
            )

        self.add_variable_to_view(result, output_column)

        return self.view
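
As a quick orientation before the member reference below, here is a minimal construction sketch (assuming only the imports shown; the bounding boxes are illustrative placeholders):

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.generators.zonal.geometry import GeometryBasedZonalViewGenerator

# Zones as a dict of identifier -> (Multi)Polygon ...
zones = {
    "zone_a": box(36.80, -1.30, 36.85, -1.25),
    "zone_b": box(36.85, -1.30, 36.90, -1.25),
}
generator = GeometryBasedZonalViewGenerator(zone_data=zones)

# ... or as a GeoDataFrame with an identifier column.
gdf = gpd.GeoDataFrame(
    {"admin_id": ["zone_a", "zone_b"], "geometry": list(zones.values())},
    crs="EPSG:4326",
)
generator = GeometryBasedZonalViewGenerator(gdf, zone_id_column="admin_id")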
zone_gdf: gpd.GeoDataFrame property

Override the base class zone_gdf property to ensure correct column names.

Returns:

    gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns.

__init__(zone_data, zone_id_column='zone_id', zone_data_crs='EPSG:4326', config=None, data_store=None, logger=None)

Initialize with zone geometries and identifiers.

Parameters:

    zone_data (Union[Dict[T, Polygon], GeoDataFrame], required): Zone definitions. Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries, or a GeoDataFrame with geometries and a zone identifier column.
    zone_id_column (str): Name of the column containing zone identifiers. Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".
    zone_data_crs (str): Coordinate reference system of the zone data. Defaults to "EPSG:4326" (WGS84).
    config (ZonalViewGeneratorConfig, optional): Generator configuration. If None, uses default configuration.
    data_store (DataStore, optional): Data store for accessing input data. If None, uses LocalDataStore.

Raises:

    TypeError: If zone_data is not a dictionary or GeoDataFrame, or if dictionary values are not Polygon/MultiPolygon geometries.
    ValueError: If zone_id_column is not found in GeoDataFrame, or if the provided CRS doesn't match the GeoDataFrame's CRS.

Source code in gigaspatial/generators/zonal/geometry.py
def __init__(
    self,
    zone_data: Union[Dict[T, Polygon], gpd.GeoDataFrame],
    zone_id_column: str = "zone_id",
    zone_data_crs: str = "EPSG:4326",
    config: Optional[ZonalViewGeneratorConfig] = None,
    data_store: Optional[DataStore] = None,
    logger: Optional[logging.Logger] = None,
):
    """Initialize with zone geometries and identifiers.

    Args:
        zone_data (Union[Dict[T, Polygon], gpd.GeoDataFrame]): Zone definitions.
            Either a dictionary mapping zone identifiers to Polygon/MultiPolygon geometries,
            or a GeoDataFrame with geometries and a zone identifier column.
        zone_id_column (str): Name of the column containing zone identifiers.
            Only used if zone_data is a GeoDataFrame. Defaults to "zone_id".
        zone_data_crs (str): Coordinate reference system of the zone data.
            Defaults to "EPSG:4326" (WGS84).
        config (ZonalViewGeneratorConfig, optional): Generator configuration.
            If None, uses default configuration.
        data_store (DataStore, optional): Data store for accessing input data.
            If None, uses LocalDataStore.

    Raises:
        TypeError: If zone_data is not a dictionary or GeoDataFrame, or if dictionary
            values are not Polygon/MultiPolygon geometries.
        ValueError: If zone_id_column is not found in GeoDataFrame, or if the provided
            CRS doesn't match the GeoDataFrame's CRS.
    """
    super().__init__(config=config, data_store=data_store, logger=logger)

    self.zone_id_column = zone_id_column
    self.zone_data_crs = zone_data_crs

    # Store zone data based on input type
    if isinstance(zone_data, dict):
        for zone_id, geom in zone_data.items():
            if not isinstance(geom, (Polygon, MultiPolygon)):
                raise TypeError(
                    f"Zone {zone_id}: Expected (Multi)Polygon, got {type(geom).__name__}"
                )

        # Store the original dictionary
        self.zone_dict = zone_data

        # Also create a GeoDataFrame for consistent access
        self._zone_gdf = gpd.GeoDataFrame(
            {
                "zone_id": list(zone_data.keys()),
                "geometry": list(zone_data.values()),
            },
            crs=zone_data_crs,
        )
        self.zone_id_column = "zone_id"
    else:
        if not isinstance(zone_data, gpd.GeoDataFrame):
            raise TypeError(
                "zone_data must be either a Dict[T, Polygon] or a GeoDataFrame"
            )

        if zone_id_column not in zone_data.columns:
            raise ValueError(
                f"Zone ID column '{zone_id_column}' not found in GeoDataFrame"
            )

        if zone_data_crs != zone_data.crs:
            raise ValueError(
                f"Provided data crs '{zone_data_crs}' does not match to the crs of the data '{zone_data.crs}'"
            )

        # Store the GeoDataFrame
        self._zone_gdf = zone_data.rename(columns={zone_id_column: "zone_id"})

        # Also create a dictionary for fast lookups
        self.zone_dict = dict(zip(zone_data[zone_id_column], zone_data.geometry))
get_zonal_geometries()

Get the geometry of each zone.

Returns:

    List[Polygon]: A list of zone geometries in the order they appear in the underlying GeoDataFrame.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zonal_geometries(self) -> List[Polygon]:
    """Get the geometry of each zone.

    Returns:
        List[Polygon]: A list of zone geometries in the order they appear in the
            underlying GeoDataFrame.
    """
    return self._zone_gdf.geometry.tolist()
get_zone_geodataframe()

Convert zones to a GeoDataFrame with standardized column names.

Returns:

    gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns. The zone_id column is renamed from the original zone_id_column if different.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zone_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert zones to a GeoDataFrame with standardized column names.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame with 'zone_id' and 'geometry' columns.
            The zone_id column is renamed from the original zone_id_column if different.
    """
    # Since _zone_gdf is already created with 'zone_id' column in the constructor,
    # we just need to return a copy of it
    return self._zone_gdf.copy()
get_zone_identifiers()

Get the identifier for each zone.

Returns:

    List[T]: A list of zone identifiers in the order they appear in the underlying GeoDataFrame.

Source code in gigaspatial/generators/zonal/geometry.py
def get_zone_identifiers(self) -> List[T]:
    """Get the identifier for each zone.

    Returns:
        List[T]: A list of zone identifiers in the order they appear in the
            underlying GeoDataFrame.
    """
    return self._zone_gdf.zone_id.tolist()
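
A short sketch of these accessors, continuing the `generator` built in the construction example above:

ids = generator.get_zone_identifiers()         # e.g. ["zone_a", "zone_b"]
geoms = generator.get_zonal_geometries()       # shapely Polygons, same order
zones_gdf = generator.get_zone_geodataframe()  # columns: 'zone_id', 'geometry'
assert list(zones_gdf["zone_id"]) == ids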
map_buildings(country, source_filter=None, output_column='building_count', **kwargs)

Map Google-Microsoft combined buildings data to zones.

Efficiently counts buildings within each zone by leveraging spatial indexing and S2 grid partitioning. For partitioned datasets, only loads building tiles that intersect with zones, significantly improving performance for large countries.

The method uses Shapely's STRtree for fast spatial queries, enabling efficient intersection testing between millions of buildings and zone geometries.

Parameters:

    country (str, required): ISO 3166-1 alpha-3 country code (e.g., 'USA', 'BRA', 'IND'). Must match the country codes used in the building dataset.
    source_filter ({'google', 'microsoft'}, optional): Filter buildings by data source. 'google' counts only buildings from Google Open Buildings; 'microsoft' counts only buildings from Microsoft Global Buildings; None (default) counts buildings from both sources.
    output_column (str): Name of the column to add to the view containing building counts. Must be a valid pandas column name. Defaults to 'building_count'.
    **kwargs (dict): Additional keyword arguments passed to GoogleMSBuildingsHandler. Common options include data versioning or custom data paths.

Returns:

    pd.DataFrame: Updated view DataFrame with building counts added. The DataFrame includes all original zone columns plus the new output_column containing integer counts of buildings per zone.

Notes:

Algorithm overview:

1. Single-file countries: processes all zones against the entire dataset in one pass (optimal for small countries or non-partitioned data).
2. Partitioned countries: uses the S2 grid spatial index to identify which S2 cells intersect with zones, load only the intersecting building tiles (not the entire country), process each tile independently for memory efficiency, and accumulate counts across tiles.

Performance characteristics:

- For partitioned data with N zones and M S2 tiles: only the ~sqrt(M) tiles that intersect zones are loaded (a large saving), STRtree queries run in O(log k) per zone, and memory usage is O(max_buildings_per_tile + N).
- For single-file data: the entire building dataset is loaded once and a single vectorized STRtree query covers all zones.

The spatial query uses the 'intersects' predicate, meaning a building is counted if its polygon boundary touches or overlaps with a zone's boundary. Buildings on zone borders may be counted in multiple zones.

Examples:

Count all buildings in H3 hexagons across the USA:

>>> h3_zones = H3ViewGenerator(resolution=8, bbox=usa_bbox)
>>> h3_zones.map_buildings("USA")
>>> print(h3_zones.view[['zone_id', 'building_count']].head())
   zone_id  building_count
0  88...01            1250
1  88...02             890
2  88...03               0

Count only Google buildings with a custom column name:

>>> zones.map_buildings(
...     "BRA",
...     source_filter="google",
...     output_column="google_building_count"
... )

Compare building counts from different sources:

>>> zones.map_buildings("IND", source_filter="google",
...                     output_column="google_buildings")
>>> zones.map_buildings("IND", source_filter="microsoft",
...                     output_column="ms_buildings")
>>> zones.view['total_buildings'] = (
...     zones.view['google_buildings'] + zones.view['ms_buildings']
... )

See Also:

    map_google_buildings : Map Google Open Buildings data only
    map_ms_buildings : Map Microsoft Global Buildings data only
    GoogleMSBuildingsHandler : Handler for combined building datasets

References:

    [1] Google Open Buildings: https://sites.research.google/open-buildings/
    [2] Microsoft Global Buildings: https://github.com/microsoft/GlobalMLBuildingFootprints

Source code in gigaspatial/generators/zonal/geometry.py
def map_buildings(
    self,
    country: str,
    source_filter: Optional[Literal["google", "microsoft"]] = None,
    output_column: str = "building_count",
    **kwargs,
):
    """Map Google-Microsoft combined buildings data to zones.

    Efficiently counts buildings within each zone by leveraging spatial indexing
    and S2 grid partitioning. For partitioned datasets, only loads building tiles
    that intersect with zones, significantly improving performance for large countries.

    The method uses Shapely's STRtree for fast spatial queries, enabling efficient
    intersection testing between millions of buildings and zone geometries.

    Parameters
    ----------
    country : str
        ISO 3166-1 alpha-3 country code (e.g., 'USA', 'BRA', 'IND').
        Must match the country codes used in the building dataset.

    source_filter : {'google', 'microsoft'}, optional
        Filter buildings by data source. Options:
        - 'google': Only count buildings from Google Open Buildings
        - 'microsoft': Only count buildings from Microsoft Global Buildings
        - None (default): Count buildings from both sources

    output_column : str, default='building_count'
        Name of the column to add to the view containing building counts.
        Must be a valid pandas column name.

    **kwargs : dict
        Additional keyword arguments passed to GoogleMSBuildingsHandler.
        Common options include data versioning or custom data paths.

    Returns
    -------
    pd.DataFrame
        Updated view DataFrame with building counts added.
        The DataFrame includes all original zone columns plus the new
        output_column containing integer counts of buildings per zone.

    Notes
    -----
    Algorithm Overview:
    1. **Single-file countries**: Processes all zones against the entire dataset
    in one pass (optimal for small countries or non-partitioned data).

    2. **Partitioned countries**: Uses S2 grid spatial index to:
    - Identify which S2 cells intersect with zones
    - Load only intersecting building tiles (not entire country)
    - Process each tile independently for memory efficiency
    - Accumulate counts across tiles

    Performance Characteristics:
    - For partitioned data with N zones and M S2 tiles:
    * Only loads ~sqrt(M) tiles that intersect zones (huge savings)
    * Uses STRtree with O(log k) query time per zone
    * Memory usage: O(max_buildings_per_tile + N)

    - For single-file data:
    * Loads entire building dataset once
    * Single STRtree query for all zones (vectorized)

    The spatial query uses the 'intersects' predicate, meaning a building is
    counted if its polygon boundary touches or overlaps with a zone's boundary.
    Buildings on zone borders may be counted in multiple zones.

    Examples
    --------
    Count all buildings in H3 hexagons across the USA:

    >>> h3_zones = H3ViewGenerator(resolution=8, bbox=usa_bbox)
    >>> h3_zones.map_buildings("USA")
    >>> print(h3_zones.view[['zone_id', 'building_count']].head())
    zone_id  building_count
    0  88...01            1250
    1  88...02             890
    2  88...03               0

    Count only Google buildings with custom column name:

    >>> zones.map_buildings(
    ...     "BRA",
    ...     source_filter="google",
    ...     output_column="google_building_count"
    ... )

    Compare building counts from different sources:

    >>> zones.map_buildings("IND", source_filter="google",
    ...                     output_column="google_buildings")
    >>> zones.map_buildings("IND", source_filter="microsoft",
    ...                     output_column="ms_buildings")
    >>> zones.view['total_buildings'] = (
    ...     zones.view['google_buildings'] + zones.view['ms_buildings']
    ... )

    See Also
    --------
    map_google_buildings : Map Google Open Buildings data only
    map_ms_buildings : Map Microsoft Global Buildings data only
    GoogleMSBuildingsHandler : Handler for combined building datasets

    References
    ----------
    .. [1] Google Open Buildings: https://sites.research.google/open-buildings/
    .. [2] Microsoft Global Buildings: https://github.com/microsoft/GlobalMLBuildingFootprints
    """

    self.logger.info(f"Mapping Google-Microsoft Buildings data to zones")

    from gigaspatial.handlers.google_ms_combined_buildings import (
        GoogleMSBuildingsHandler,
    )

    handler = GoogleMSBuildingsHandler(partition_strategy="s2_grid")

    if self.config.ensure_available:
        if not handler.ensure_data_available(country, **kwargs):
            raise RuntimeError("Could not ensure data availability for loading")

    building_files = handler.reader.resolve_source_paths(country, **kwargs)

    result = GoogleMSBuildingsEngine.count_buildings_in_zones(
        handler=handler,
        building_files=building_files,
        zones_gdf=self.zone_gdf,
        source_filter=source_filter,
        logger=self.logger,
    )

    # Log summary
    total_buildings = result.counts.sum()
    zones_with_buildings = (result.counts > 0).sum()
    self.logger.info(
        f"Mapping complete: {total_buildings:,.0f} buildings across "
        f"{zones_with_buildings}/{len(result.counts)} zones"
    )

    # Update the view and return
    self.add_variable_to_view(result.counts, output_column)
    return self.view
map_built_s(year=2020, resolution=100, stat='sum', output_column='built_surface_m2', **kwargs)

Map GHSL Built-up Surface data to zones.

Convenience method for mapping Global Human Settlement Layer Built-up Surface data using appropriate default parameters for built surface analysis.

Parameters:

    year: The year of the data. Defaults to 2020.
    resolution: The resolution in meters. Defaults to 100.
    stat (str): Statistic to calculate for built surface values within each zone. Defaults to "sum", which gives total built surface area.
    output_column (str): The output column name. Defaults to "built_surface_m2".
Source code in gigaspatial/generators/zonal/geometry.py
def map_built_s(
    self,
    year=2020,
    resolution=100,
    stat: str = "sum",
    output_column: str = "built_surface_m2",
    **kwargs,
) -> pd.DataFrame:
    """Map GHSL Built-up Surface data to zones.

    Convenience method for mapping Global Human Settlement Layer Built-up Surface
    data using appropriate default parameters for built surface analysis.

    Args:
        year: The year of the data (default: 2020)
        resolution: The resolution in meters (default: 100)
        stat (str): Statistic to calculate for built surface values within each zone.
            Defaults to "sum" which gives total built surface area.
        output_column (str): The output column name. Defaults to "built_surface_m2".
    Returns:
        pd.DataFrame: Updated view DataFrame with built surface metrics.
            Adds a column named `output_column` containing the aggregated values.
    """
    handler = GHSLDataHandler(
        product="GHS_BUILT_S",
        year=year,
        resolution=resolution,
        data_store=self.data_store,
        **kwargs,
    )

    return self.map_ghsl(
        handler=handler, stat=stat, output_column=output_column, **kwargs
    )
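
A minimal call sketch (assuming the `generator` from the construction example above and that GHSL Built-up Surface data is reachable through the configured data store):

# Total built-up surface in m^2 per zone, 2020 data at 100 m resolution.
view = generator.map_built_s(year=2020, resolution=100)
print(view[["zone_id", "built_surface_m2"]].head())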
map_ghsl(handler, stat, output_column=None, **kwargs)

Map Global Human Settlement Layer data to zones.

Loads and processes GHSL raster data for the intersecting tiles, then samples the raster values within each zone using the specified statistic.

Parameters:

    handler (GHSLDataHandler, required): Handler for the GHSL data.
    stat (str, required): Statistic to calculate for raster values within each zone. Common options: "mean", "sum", "median", "min", "max".
    output_column (str): The output column name. If None, uses the GHSL product name in lowercase followed by an underscore and the statistic name. Defaults to None.

Returns:

    pd.DataFrame: Updated DataFrame with GHSL metrics. Adds a column named output_column containing the sampled values.

Note:

The method automatically determines which GHSL tiles intersect with the zones and loads only the necessary data for efficient processing.

Source code in gigaspatial/generators/zonal/geometry.py
def map_ghsl(
    self,
    handler: GHSLDataHandler,
    stat: str,
    output_column: Optional[str] = None,
    **kwargs,
) -> pd.DataFrame:
    """Map Global Human Settlement Layer data to zones.

    Loads and processes GHSL raster data for the intersecting tiles, then samples
    the raster values within each zone using the specified statistic.

    Args:
        handler (GHSLDataHandler): Handler for the GHSL data.
        stat (str): Statistic to calculate for raster values within each zone.
            Common options: "mean", "sum", "median", "min", "max".
        output_column (str): The output column name.
            If None, uses the GHSL product name in lowercase followed by an underscore and the statistic name.

    Returns:
        pd.DataFrame: Updated DataFrame with GHSL metrics.
        Adds a column named `output_column` containing the sampled values.

    Note:
        The method automatically determines which GHSL tiles intersect with the zones
        and loads only the necessary data for efficient processing.
    """
    coord_system = kwargs.pop("coord_system", None)
    if not coord_system:
        coord_system = 4326 if self.zone_data_crs == "EPSG:4326" else 54009

    handler = handler or GHSLDataHandler(
        data_store=self.data_store, coord_system=coord_system, **kwargs
    )
    self.logger.info(
        f"Mapping {handler.config.product} data (year: {handler.config.year}, resolution: {handler.config.resolution}m)"
    )
    tif_processors = handler.load_data(
        self.zone_gdf,
        ensure_available=self.config.ensure_available,
        merge_rasters=True,
        **kwargs,
    )

    self.logger.info(
        f"Sampling {handler.config.product} data using '{stat}' statistic"
    )
    sampled_values = self.map_rasters(raster_data=tif_processors, stat=stat)

    column_name = (
        output_column
        if output_column
        else f"{handler.config.product.lower()}_{stat}"
    )

    self.add_variable_to_view(sampled_values, column_name)

    return self.view
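
The convenience wrappers (map_built_s, map_smod) funnel into map_ghsl; a sketch of calling it directly with an explicit handler (parameter values are illustrative):

handler = GHSLDataHandler(
    product="GHS_BUILT_S",
    year=2020,
    resolution=100,
    data_store=generator.data_store,
)
view = generator.map_ghsl(handler=handler, stat="mean")
# With output_column=None the new column is '<product lowercase>_<stat>',
# here 'ghs_built_s_mean'.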
map_google_buildings(handler=None, use_polygons=False, **kwargs)

Map Google Open Buildings data to zones.

Processes Google Open Buildings dataset to calculate building counts and total building area within each zone. Can use either point centroids (faster) or polygon geometries (more accurate) for spatial operations.

Parameters:

    handler (GoogleOpenBuildingsHandler, optional): Handler for accessing Google Open Buildings data. If None, a default handler is created.
    use_polygons (bool): Whether to use polygon geometries for buildings. If True, uses actual building polygons for more accurate area calculations but with slower performance. If False, uses building centroids with area values from attributes for faster processing. Defaults to False.

Returns:

    pd.DataFrame: Updated DataFrame with building metrics. Adds columns:
    - 'google_buildings_count': Number of buildings in each zone
    - 'google_buildings_area_in_meters': Total building area in square meters

Note:

If no Google Buildings data is found for the zones, returns the original GeoDataFrame unchanged with a warning logged.

Source code in gigaspatial/generators/zonal/geometry.py
def map_google_buildings(
    self,
    handler: Optional[GoogleOpenBuildingsHandler] = None,
    use_polygons: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """Map Google Open Buildings data to zones.

    Processes Google Open Buildings dataset to calculate building counts and total
    building area within each zone. Can use either point centroids (faster) or
    polygon geometries (more accurate) for spatial operations.

    Args:
        handler (GoogleOpenBuildingsHandler, optional): Handler for accessing
            Google Open Buildings data. If None, a default handler is created.
        use_polygons (bool): Whether to use polygon geometries for buildings.
            If True, uses actual building polygons for more accurate area calculations
            but with slower performance. If False, uses building centroids with
            area values from attributes for faster processing. Defaults to False.

    Returns:
        pd.DataFrame: Updated DataFrame with building metrics.
            Adds columns:
            - 'google_buildings_count': Number of buildings in each zone
            - 'google_buildings_area_in_meters': Total building area in square meters

    Note:
        If no Google Buildings data is found for the zones, returns the original
        GeoDataFrame unchanged with a warning logged.
    """
    self.logger.info(
        f"Mapping Google Open Buildings data (use_polygons={use_polygons})"
    )

    self.logger.info("Loading Google Buildings point data")
    handler = handler or GoogleOpenBuildingsHandler(data_store=self.data_store)
    buildings_df = handler.load_points(
        self.zone_gdf, ensure_available=self.config.ensure_available, **kwargs
    )

    if buildings_df.empty:
        self.logger.warning("No Google buildings data found for the provided zones")
        return self._zone_gdf.copy()

    if not use_polygons:
        self.logger.info("Aggregating building data using points with attributes")
        result = self.map_points(
            points=buildings_df,
            value_columns=["full_plus_code", "area_in_meters"],
            aggregation={"full_plus_code": "count", "area_in_meters": "sum"},
            predicate="within",
        )

        count_result = result["full_plus_code"]
        area_result = result["area_in_meters"]

    else:
        self.logger.info(
            "Loading Google Buildings polygon data for more accurate mapping"
        )
        buildings_gdf = handler.load_polygons(
            self.zone_gdf, ensure_available=self.config.ensure_available, **kwargs
        )

        self.logger.info(
            "Calculating building areas with area-weighted aggregation"
        )
        area_result = self.map_polygons(
            buildings_gdf,
            value_columns="area_in_meters",
            aggregation="sum",
            predicate="fractional",
        )

        self.logger.info("Counting buildings using points data")
        count_result = self.map_points(points=buildings_df, predicate="within")

    self.add_variable_to_view(count_result, "google_buildings_count")
    self.add_variable_to_view(area_result, "google_buildings_area_in_meters")

    return self.view
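
A usage sketch contrasting the two modes (column names as added by the method above):

# Fast: centroid points with attribute areas.
view = generator.map_google_buildings(use_polygons=False)

# Slower but more accurate: polygons with area-weighted ('fractional') aggregation.
view = generator.map_google_buildings(use_polygons=True)
print(view[["zone_id", "google_buildings_count",
            "google_buildings_area_in_meters"]].head())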
map_ms_buildings(handler=None, use_polygons=False)

Map Microsoft Global Buildings data to zones.

Processes Microsoft Global Buildings dataset to calculate building counts and total building area within each zone. Can use either centroid points (faster) or polygon geometries (more accurate) for spatial operations.

Parameters:

    handler (MSBuildingsHandler, optional): Handler for accessing Microsoft Global Buildings data. If None, a default handler is created.
    use_polygons (bool): Whether to use polygon geometries for buildings. If True, uses actual building polygons for more accurate area calculations but with slower performance. If False, uses building centroids with area values from attributes for faster processing. Defaults to False.

Returns:

    gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics. Adds columns:
    - 'ms_buildings_count': Number of buildings in each zone
    - 'ms_buildings_area_in_meters': Total building area in square meters

Note:

If no Microsoft Buildings data is found for the zones, returns the original GeoDataFrame unchanged with a warning logged. Building areas are calculated in meters using appropriate UTM projections.

Source code in gigaspatial/generators/zonal/geometry.py
def map_ms_buildings(
    self,
    handler: Optional[MSBuildingsHandler] = None,
    use_polygons: bool = False,
) -> gpd.GeoDataFrame:
    """Map Microsoft Global Buildings data to zones.

    Processes Microsoft Global Buildings dataset to calculate building counts and
    total building area within each zone. Can use either centroid points (faster)
    or polygon geometries (more accurate) for spatial operations.

    Args:
        handler (MSBuildingsHandler, optional): Handler for accessing Microsoft
            Global Buildings data. If None, a default handler is created.
        use_polygons (bool): Whether to use polygon geometries for buildings.
            If True, uses actual building polygons for more accurate area calculations
            but with slower performance. If False, uses building centroids with
            area values from attributes for faster processing. Defaults to False.

    Returns:
        gpd.GeoDataFrame: Updated GeoDataFrame with zones and building metrics.
            Adds columns:
            - 'ms_buildings_count': Number of buildings in each zone
            - 'ms_buildings_area_in_meters': Total building area in square meters

    Note:
        If no Microsoft Buildings data is found for the zones, returns the original
        GeoDataFrame unchanged with a warning logged. Building areas are calculated
        in meters using appropriate UTM projections.
    """
    self.logger.info("Mapping Microsoft Global Buildings data")

    self.logger.info("Loading Microsoft Buildings polygon data")
    handler = handler or MSBuildingsHandler(data_store=self.data_store)
    buildings_gdf = handler.load_data(
        self.zone_gdf, ensure_available=self.config.ensure_available
    )

    # Check if we found any buildings
    if buildings_gdf.empty:
        self.logger.warning(
            "No Microsoft buildings data found for the provided zones"
        )
        return self._zone_gdf.copy()

    buildings_gdf = add_area_in_meters(
        buildings_gdf, area_column_name="area_in_meters"
    )

    building_centroids = get_centroids(buildings_gdf)

    if not use_polygons:
        self.logger.info("Aggregating building data using points with attributes")

        result = self.map_points(
            points=building_centroids,
            value_columns=["type", "area_in_meters"],
            aggregation={"type": "count", "area_in_meters": "sum"},
            predicate="within",
        )

        count_result = result["type"]
        area_result = result["area_in_meters"]
    else:

        self.logger.info(
            "Calculating building areas with area-weighted aggregation"
        )
        area_result = self.map_polygons(
            buildings_gdf,
            value_columns="area_in_meters",
            aggregation="sum",
            predicate="fractional",
        )

        self.logger.info("Counting Microsoft buildings per zone")

        count_result = self.map_points(
            points=building_centroids, predicate="within"
        )

    self.add_variable_to_view(count_result, "ms_buildings_count")
    self.add_variable_to_view(area_result, "ms_buildings_area_in_meters")

    return self.view
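
A matching sketch for the Microsoft dataset (centroid-based counting by default; column names as added by the method above):

view = generator.map_ms_buildings()
print(view[["zone_id", "ms_buildings_count",
            "ms_buildings_area_in_meters"]].head())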
map_smod(year=2020, resolution=1000, stat='median', output_column='smod_class', **kwargs)

Map GHSL Settlement Model data to zones.

Convenience method for mapping Global Human Settlement Layer Settlement Model data using appropriate default parameters for settlement classification analysis.

Parameters:

    year: The year of the data. Defaults to 2020.
    resolution: The resolution in meters. Defaults to 1000.
    stat (str): Statistic to calculate for settlement class values within each zone. Defaults to "median", which gives the predominant settlement class.
    output_column (str): The output column name. Defaults to "smod_class".
Source code in gigaspatial/generators/zonal/geometry.py
def map_smod(
    self,
    year=2020,
    resolution=1000,
    stat: str = "median",
    output_column: str = "smod_class",
    **kwargs,
) -> pd.DataFrame:
    """Map GHSL Settlement Model data to zones.

    Convenience method for mapping Global Human Settlement Layer Settlement Model
    data using appropriate default parameters for settlement classification analysis.

    Args:
        year: The year of the data (default: 2020)
        resolution: The resolution in meters (default: 1000)
        stat (str): Statistic to calculate for settlement class values within each zone.
            Defaults to "median" which gives the predominant settlement class.
        output_column (str): The output column name. Defaults to "smod_class".

    Returns:
        pd.DataFrame: Updated view DataFrame with the settlement classification.
            Adds a column named `output_column` containing the aggregated values.
    """
    handler = GHSLDataHandler(
        product="GHS_SMOD",
        year=year,
        resolution=resolution,
        data_store=self.data_store,
        coord_system=54009,
        **kwargs,
    )

    return self.map_ghsl(
        handler=handler, stat=stat, output_column=output_column, **kwargs
    )
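
A short usage sketch (assuming `generator` is an already-initialized zonal view generator over your zones of interest):

    # Defaults: year 2020, 1000 m resolution, median statistic
    view = generator.map_smod()

    # A different vintage and output column name
    # (year availability depends on the GHSL releases)
    view = generator.map_smod(year=2015, output_column="smod_2015")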

h3

H3ViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using H3 hexagons as the zones.

Mirrors MercatorViewGenerator/S2ViewGenerator but uses H3 cells (resolutions 0-15). The input source defines the area/cells and resolution determines the granularity.

Supported sources:

- Country string → CountryH3Hexagons.create
- Shapely geometry or GeoDataFrame → H3Hexagons.from_spatial
- List of points (shapely Points or (lon, lat) tuples) → from_spatial
- List of H3 indexes (strings) → H3Hexagons.from_hexagons

Source code in gigaspatial/generators/zonal/h3.py
class H3ViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Generates zonal views using H3 hexagons as the zones.

    Mirrors `MercatorViewGenerator`/`S2ViewGenerator` but uses H3 cells
    (resolutions 0-15). The input `source` defines the area/cells and
    `resolution` determines the granularity.

    Supported sources:
    - Country string → `CountryH3Hexagons.create`
    - Shapely geometry or GeoDataFrame → `H3Hexagons.from_spatial`
    - List of points (shapely Points or (lon, lat) tuples) → `from_spatial`
    - List of H3 indexes (strings) → `H3Hexagons.from_hexagons`
    """

    def __init__(
        self,
        source: Union[
            str,  # country
            BaseGeometry,  # shapely geom
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
            List[str],  # h3 indexes
        ],
        resolution: int,
        contain: str = "overlap",
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):

        super().__init__(
            zone_data=self._init_zone_data(
                source, resolution, contain=contain, data_store=data_store
            ),
            zone_id_column="h3",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self.logger.info("Initialized H3ViewGenerator")

    def _init_zone_data(
        self,
        source,
        resolution: int,
        contain: str = "overlap",
        data_store: Optional[DataStore] = None,
    ):
        if isinstance(source, str):
            hexes = CountryH3Hexagons.create(
                country=source,
                resolution=resolution,
                contain=contain,
                data_store=data_store,
            )
            self._country = source
        elif isinstance(source, (BaseGeometry, gpd.GeoDataFrame, Iterable)):
            # A GeoDataFrame is itself iterable (over its string column names),
            # so exclude it explicitly before treating `source` as H3 indexes.
            if (
                isinstance(source, Iterable)
                and not isinstance(source, gpd.GeoDataFrame)
                and all(isinstance(h, str) for h in source)
            ):
                hexes = H3Hexagons.from_hexagons(list(source))
            else:
                hexes = H3Hexagons.from_spatial(
                    source=source, resolution=resolution, contain=contain
                )
        else:
            raise TypeError(
                "Unsupported source type for H3ViewGenerator. 'source' must be "
                "a country name (str), a Shapely geometry, a GeoDataFrame, "
                "a list of H3 indexes (str), or a list of (lon, lat) tuples/Shapely Point objects. "
                f"Received type: {type(source)}."
            )

        return hexes.to_geodataframe()

    def map_wp_pop(
        self,
        country=None,
        resolution=1000,
        predicate="intersects",
        output_column="population",
        **kwargs,
    ):
        if hasattr(self, "_country") and country is None:
            country = self._country

        return super().map_wp_pop(
            country, resolution, predicate, output_column, **kwargs
        )

mercator

MercatorViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using Mercator tiles as the zones.

This class specializes in creating zonal views where the zones are defined by Mercator tiles. It extends the GeometryBasedZonalViewGenerator and leverages the MercatorTiles and CountryMercatorTiles classes to generate tiles based on various input sources.

The primary input source defines the geographical area of interest. This can be a country, a specific geometry, a set of points, or even a list of predefined quadkeys. The zoom_level determines the granularity of the Mercator tiles.

Attributes:

    source (Union[str, BaseGeometry, GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[str]]):
        Specifies the geographic area or specific tiles to use. Can be:
        - A country name (str): Uses CountryMercatorTiles to generate tiles covering the country.
        - A Shapely geometry (BaseGeometry): Uses MercatorTiles.from_spatial to create tiles intersecting the geometry.
        - A GeoDataFrame (gpd.GeoDataFrame): Uses MercatorTiles.from_spatial to create tiles intersecting the geometries.
        - A list of points (List[Union[Point, Tuple[float, float]]]): Uses MercatorTiles.from_spatial to create tiles containing the points.
        - A list of quadkeys (List[str]): Uses MercatorTiles.from_quadkeys to use the specified tiles directly.
    zoom_level (int):
        The zoom level of the Mercator tiles. Higher zoom levels result in smaller, more detailed tiles.
    predicate (str):
        The spatial predicate used when filtering tiles based on a spatial source (e.g., "intersects", "contains"). Defaults to "intersects".
    config (Optional[ZonalViewGeneratorConfig]):
        Configuration for the zonal view generation process.
    data_store (Optional[DataStore]):
        A DataStore instance for accessing data.
    logger (Optional[Logger]):
        A logger instance for logging.

Methods:

    _init_zone_data: Initializes the Mercator tile GeoDataFrame based on the input source.
    (Other methods, such as map_ghsl(), map_google_buildings(), map_ms_buildings(), aggregate_data(), and save_view(), are inherited from GeometryBasedZonalViewGenerator.)

Example:

    # Create a MercatorViewGenerator for tiles covering Germany at zoom level 6
    generator = MercatorViewGenerator(source="Germany", zoom_level=6)

    # Create a MercatorViewGenerator for tiles intersecting a specific polygon
    polygon = ...  # Define a Shapely Polygon
    generator = MercatorViewGenerator(source=polygon, zoom_level=8)

    # Create a MercatorViewGenerator from a list of quadkeys
    quadkeys = ["0020023131023032", "0020023131023033"]
    generator = MercatorViewGenerator(source=quadkeys, zoom_level=12)

Source code in gigaspatial/generators/zonal/mercator.py
class MercatorViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Generates zonal views using Mercator tiles as the zones.

    This class specializes in creating zonal views where the zones are defined by
    Mercator tiles. It extends the `GeometryBasedZonalViewGenerator` and leverages
    the `MercatorTiles` and `CountryMercatorTiles` classes to generate tiles based on
    various input sources.

    The primary input source defines the geographical area of interest. This can be
    a country, a specific geometry, a set of points, or even a list of predefined
    quadkeys. The `zoom_level` determines the granularity of the Mercator tiles.

    Attributes:
        source (Union[str, BaseGeometry, gpd.GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[str]]):
            Specifies the geographic area or specific tiles to use. Can be:
            - A country name (str): Uses `CountryMercatorTiles` to generate tiles covering the country.
            - A Shapely geometry (BaseGeometry):  Uses `MercatorTiles.from_spatial` to create tiles intersecting the geometry.
            - A GeoDataFrame (gpd.GeoDataFrame): Uses `MercatorTiles.from_spatial` to create tiles intersecting the geometries.
            - A list of points (List[Union[Point, Tuple[float, float]]]):  Uses `MercatorTiles.from_spatial` to create tiles containing the points.
            - A list of quadkeys (List[str]): Uses `MercatorTiles.from_quadkeys` to use the specified tiles directly.
        zoom_level (int): The zoom level of the Mercator tiles. Higher zoom levels result in smaller, more detailed tiles.
        predicate (str):  The spatial predicate used when filtering tiles based on a spatial source (e.g., "intersects", "contains"). Defaults to "intersects".
        config (Optional[ZonalViewGeneratorConfig]): Configuration for the zonal view generation process.
        data_store (Optional[DataStore]):  A DataStore instance for accessing data.
        logger (Optional[logging.Logger]):  A logger instance for logging.

    Methods:
        _init_zone_data(source, zoom_level, predicate):  Initializes the Mercator tile GeoDataFrame based on the input source.
        # Inherits other methods from GeometryBasedZonalViewGenerator, such as:
        # map_ghsl(), map_google_buildings(), map_ms_buildings(), aggregate_data(), save_view()

    Example:
        # Create a MercatorViewGenerator for tiles covering Germany at zoom level 6
        generator = MercatorViewGenerator(source="Germany", zoom_level=6)

        # Create a MercatorViewGenerator for tiles intersecting a specific polygon
        polygon = ... # Define a Shapely Polygon
        generator = MercatorViewGenerator(source=polygon, zoom_level=8)

        # Create a MercatorViewGenerator from a list of quadkeys
        quadkeys = ["0020023131023032", "0020023131023033"]
        generator = MercatorViewGenerator(source=quadkeys, zoom_level=12)
    """

    def __init__(
        self,
        source: Union[
            str,  # country
            BaseGeometry,  # shapely geom
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
            List[str],  # quadkeys
        ],
        zoom_level: int,
        predicate="intersects",
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
    ):

        super().__init__(
            zone_data=self._init_zone_data(source, zoom_level, predicate, data_store),
            zone_id_column="quadkey",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self.logger.info(f"Initialized MercatorViewGenerator")

    def _init_zone_data(self, source, zoom_level, predicate, data_store=None):
        if isinstance(source, str):
            tiles = CountryMercatorTiles.create(
                country=source, zoom_level=zoom_level, data_store=data_store
            )
            self._country = source
        elif isinstance(source, (BaseGeometry, gpd.GeoDataFrame, Iterable)):
            # A GeoDataFrame is itself iterable (over its string column names),
            # so exclude it explicitly before treating `source` as quadkeys.
            if (
                isinstance(source, Iterable)
                and not isinstance(source, gpd.GeoDataFrame)
                and all(isinstance(qk, str) for qk in source)
            ):
                tiles = MercatorTiles.from_quadkeys(source)
            else:
                tiles = MercatorTiles.from_spatial(
                    source=source, zoom_level=zoom_level, predicate=predicate
                )
        else:
            raise TypeError(
                f"Unsupported source type for MercatorViewGenerator. 'source' must be "
                f"a country name (str), a Shapely geometry, a GeoDataFrame, "
                f"a list of quadkeys (str), or a list of (lon, lat) tuples/Shapely Point objects. "
                f"Received type: {type(source)}."
            )

        return tiles.to_geodataframe()

    def map_wp_pop(
        self,
        country=None,
        resolution=1000,
        predicate="intersects",
        output_column="population",
        **kwargs,
    ):
        if hasattr(self, "_country") and country is None:
            country = self._country

        return super().map_wp_pop(
            country, resolution, predicate, output_column, **kwargs
        )

s2

S2ViewGenerator

Bases: GeometryBasedZonalViewGenerator[T]

Generates zonal views using Google S2 cells as the zones.

This mirrors the MercatorViewGenerator but uses S2 cells (levels 0-30) as zone units. The primary input source defines the geographic area of interest and the level determines the granularity of S2 cells.

Attributes:

    source (Union[str, BaseGeometry, GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[Union[int, str]]]):
        Specifies the geographic area or specific cells to use. Can be:
        - A country name (str): Uses CountryS2Cells to generate cells covering the country.
        - A Shapely geometry (BaseGeometry): Uses S2Cells.from_spatial to create cells intersecting the geometry.
        - A GeoDataFrame (gpd.GeoDataFrame): Uses S2Cells.from_spatial to create cells intersecting geometries or from points.
        - A list of points (List[Union[Point, Tuple[float, float]]]): Uses S2Cells.from_points via from_spatial.
        - A list of cell identifiers (List[Union[int, str]]): Uses S2Cells.from_cells (accepts integer IDs or token strings).
    level (int):
        The S2 level (0-30). Higher levels produce smaller cells.
    config (Optional[ZonalViewGeneratorConfig]):
        Configuration for generation.
    data_store (Optional[DataStore]):
        Optional data store instance.
    logger (Optional[Logger]):
        Optional logger.

Source code in gigaspatial/generators/zonal/s2.py
class S2ViewGenerator(GeometryBasedZonalViewGenerator[T]):
    """
    Generates zonal views using Google S2 cells as the zones.

    This mirrors the `MercatorViewGenerator` but uses S2 cells (levels 0-30)
    as zone units. The primary input source defines the geographic area of
    interest and the `level` determines the granularity of S2 cells.

    Attributes:
        source (Union[str, BaseGeometry, gpd.GeoDataFrame, List[Union[Point, Tuple[float, float]]], List[Union[int, str]]]):
            Specifies the geographic area or specific cells to use. Can be:
            - A country name (str): Uses `CountryS2Cells` to generate cells covering the country.
            - A Shapely geometry (BaseGeometry): Uses `S2Cells.from_spatial` to create cells intersecting the geometry.
            - A GeoDataFrame (gpd.GeoDataFrame): Uses `S2Cells.from_spatial` to create cells intersecting geometries or from points.
            - A list of points (List[Union[Point, Tuple[float, float]]]): Uses `S2Cells.from_points` via `from_spatial`.
            - A list of cell identifiers (List[Union[int, str]]): Uses `S2Cells.from_cells` (accepts integer IDs or token strings).
        level (int): The S2 level (0-30). Higher levels produce smaller cells.
        config (Optional[ZonalViewGeneratorConfig]): Configuration for generation.
        data_store (Optional[DataStore]): Optional data store instance.
        logger (Optional[logging.Logger]): Optional logger.
    """

    def __init__(
        self,
        source: Union[
            str,  # country
            BaseGeometry,  # shapely geom
            gpd.GeoDataFrame,
            List[Union[Point, Tuple[float, float]]],  # points
            List[Union[int, str]],  # cell ids or tokens
        ],
        level: int,
        config: Optional[ZonalViewGeneratorConfig] = None,
        data_store: Optional[DataStore] = None,
        logger: logging.Logger = None,
        max_cells: int = 1000,
    ):

        super().__init__(
            zone_data=self._init_zone_data(
                source, level, data_store=data_store, max_cells=max_cells
            ),
            zone_id_column="cell_token",
            config=config,
            data_store=data_store,
            logger=logger,
        )
        self.logger.info("Initialized S2ViewGenerator")

    def _init_zone_data(
        self,
        source,
        level: int,
        data_store: Optional[DataStore] = None,
        max_cells: int = 1000,
    ):
        if isinstance(source, str):
            cells = CountryS2Cells.create(
                country=source, level=level, data_store=data_store, max_cells=max_cells
            )
            self._country = source
        elif isinstance(source, (BaseGeometry, gpd.GeoDataFrame, Iterable)):
            # An explicit list of cell ids/tokens. A GeoDataFrame is itself
            # iterable (over its string column names), so exclude it first.
            if (
                isinstance(source, Iterable)
                and not isinstance(source, gpd.GeoDataFrame)
                and all(isinstance(c, (int, str)) for c in source)
            ):
                cells = S2Cells.from_cells(list(source))
            else:
                # Spatial extraction from geometry/points/gdf
                cells = S2Cells.from_spatial(
                    source=source, level=level, max_cells=max_cells
                )
        else:
            raise TypeError(
                "Unsupported source type for S2ViewGenerator. 'source' must be "
                "a country name (str), a Shapely geometry, a GeoDataFrame, "
                "a list of S2 cell ids/tokens (int/str), or a list of (lon, lat) tuples/Shapely Point objects. "
                f"Received type: {type(source)}."
            )

        return cells.to_geodataframe()

    def map_wp_pop(
        self,
        country=None,
        resolution=1000,
        predicate="intersects",
        output_column="population",
        **kwargs,
    ):
        if hasattr(self, "_country") and country is None:
            country = self._country

        return super().map_wp_pop(
            country, resolution, predicate, output_column, **kwargs
        )
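
A closing usage sketch for S2 (import path assumed; the cell tokens shown are hypothetical placeholders):

    from gigaspatial.generators import S2ViewGenerator

    # Cells covering a country at S2 level 10, capped by max_cells
    generator = S2ViewGenerator(source="Rwanda", level=10, max_cells=5000)
    view = generator.map_wp_pop()  # country reused from the constructor

    # Or build the view directly from explicit cell tokens
    generator = S2ViewGenerator(source=["89c25c", "89c264"], level=10)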