Processing Module

gigaspatial.processing

DataStore

Bases: ABC

Abstract base class defining the interface for data store implementations. This class serves as a parent for both local and cloud-based storage solutions.

Source code in gigaspatial/core/io/data_store.py
class DataStore(ABC):
    """
    Abstract base class defining the interface for data store implementations.
    This class serves as a parent for both local and cloud-based storage solutions.
    """

    @abstractmethod
    def read_file(self, path: str) -> Any:
        """
        Read contents of a file from the data store.

        Args:
            path: Path to the file to read

        Returns:
            Contents of the file

        Raises:
            IOError: If file cannot be read
        """
        pass

    @abstractmethod
    def write_file(self, path: str, data: Any) -> None:
        """
        Write data to a file in the data store.

        Args:
            path: Path where to write the file
            data: Data to write to the file

        Raises:
            IOError: If file cannot be written
        """
        pass

    @abstractmethod
    def file_exists(self, path: str) -> bool:
        """
        Check if a file exists in the data store.

        Args:
            path: Path to check

        Returns:
            True if file exists, False otherwise
        """
        pass

    @abstractmethod
    def list_files(self, path: str) -> List[str]:
        """
        List all files in a directory.

        Args:
            path: Directory path to list

        Returns:
            List of file paths in the directory
        """
        pass

    @abstractmethod
    def walk(self, top: str) -> Generator:
        """
        Walk through directory tree, similar to os.walk().

        Args:
            top: Starting directory for the walk

        Returns:
            Generator yielding tuples of (dirpath, dirnames, filenames)
        """
        pass

    @abstractmethod
    def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
        """
        Context manager for file operations.

        Args:
            file: Path to the file
            mode: File mode ('r', 'w', 'rb', 'wb')

        Yields:
            File-like object

        Raises:
            IOError: If file cannot be opened
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Check if path points to a file.

        Args:
            path: Path to check

        Returns:
            True if path is a file, False otherwise
        """
        pass

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """
        Check if path points to a directory.

        Args:
            path: Path to check

        Returns:
            True if path is a directory, False otherwise
        """
        pass

    @abstractmethod
    def remove(self, path: str) -> None:
        """
        Remove a file.

        Args:
            path: Path to the file to remove

        Raises:
            IOError: If file cannot be removed
        """
        pass

    @abstractmethod
    def rmdir(self, dir: str) -> None:
        """
        Remove a directory and all its contents.

        Args:
            dir: Path to the directory to remove

        Raises:
            IOError: If directory cannot be removed
        """
        pass
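
Because every backend implements this same interface, downstream code can be written once against DataStore and run unchanged against local or cloud storage. A minimal sketch of such backend-agnostic usage (the helper name, base path, and file paths below are illustrative, not part of the library):

from gigaspatial.core.io.data_store import DataStore
from gigaspatial.core.io.local_data_store import LocalDataStore

def backup_if_present(store: DataStore, src: str, dst: str) -> bool:
    """Copy a file within any DataStore implementation, if it exists."""
    if not store.file_exists(src):
        return False
    data = store.read_file(src)   # raw contents, e.g. bytes for LocalDataStore
    store.write_file(dst, data)   # the backend decides how parent paths are handled
    return True

store = LocalDataStore("/tmp/giga-demo")  # any DataStore subclass would work here
backup_if_present(store, "raw/sites.csv", "backup/sites.csv")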

file_exists(path) abstractmethod

Check if a file exists in the data store.

Parameters:

    path (str, required): Path to check.

Returns:

    bool: True if file exists, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def file_exists(self, path: str) -> bool:
    """
    Check if a file exists in the data store.

    Args:
        path: Path to check

    Returns:
        True if file exists, False otherwise
    """
    pass

is_dir(path) abstractmethod

Check if path points to a directory.

Parameters:

    path (str, required): Path to check.

Returns:

    bool: True if path is a directory, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_dir(self, path: str) -> bool:
    """
    Check if path points to a directory.

    Args:
        path: Path to check

    Returns:
        True if path is a directory, False otherwise
    """
    pass

is_file(path) abstractmethod

Check if path points to a file.

Parameters:

    path (str, required): Path to check.

Returns:

    bool: True if path is a file, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_file(self, path: str) -> bool:
    """
    Check if path points to a file.

    Args:
        path: Path to check

    Returns:
        True if path is a file, False otherwise
    """
    pass

list_files(path) abstractmethod

List all files in a directory.

Parameters:

    path (str, required): Directory path to list.

Returns:

    List[str]: List of file paths in the directory.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def list_files(self, path: str) -> List[str]:
    """
    List all files in a directory.

    Args:
        path: Directory path to list

    Returns:
        List of file paths in the directory
    """
    pass

open(file, mode='r') abstractmethod

Context manager for file operations.

Parameters:

    file (str, required): Path to the file.
    mode (str, default 'r'): File mode ('r', 'w', 'rb', 'wb').

Yields:

    Union[str, bytes]: File-like object.

Raises:

    IOError: If file cannot be opened.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
    """
    Context manager for file operations.

    Args:
        file: Path to the file
        mode: File mode ('r', 'w', 'rb', 'wb')

    Yields:
        File-like object

    Raises:
        IOError: If file cannot be opened
    """
    pass
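
Concrete implementations return a file-like object meant to be used as a context manager. A short sketch with LocalDataStore (the base path and file name are illustrative); note that the local implementation creates missing parent directories when a file is opened:

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore("/tmp/giga-demo")

with store.open("reports/summary.txt", "w") as f:   # parent directory created on demand
    f.write("files processed: 42\n")

with store.open("reports/summary.txt", "rb") as f:  # binary read of the same file
    raw = f.read()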

read_file(path) abstractmethod

Read contents of a file from the data store.

Parameters:

    path (str, required): Path to the file to read.

Returns:

    Any: Contents of the file.

Raises:

    IOError: If file cannot be read.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def read_file(self, path: str) -> Any:
    """
    Read contents of a file from the data store.

    Args:
        path: Path to the file to read

    Returns:
        Contents of the file

    Raises:
        IOError: If file cannot be read
    """
    pass

remove(path) abstractmethod

Remove a file.

Parameters:

    path (str, required): Path to the file to remove.

Raises:

    IOError: If file cannot be removed.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def remove(self, path: str) -> None:
    """
    Remove a file.

    Args:
        path: Path to the file to remove

    Raises:
        IOError: If file cannot be removed
    """
    pass

rmdir(dir) abstractmethod

Remove a directory and all its contents.

Parameters:

    dir (str, required): Path to the directory to remove.

Raises:

    IOError: If directory cannot be removed.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def rmdir(self, dir: str) -> None:
    """
    Remove a directory and all its contents.

    Args:
        dir: Path to the directory to remove

    Raises:
        IOError: If directory cannot be removed
    """
    pass

walk(top) abstractmethod

Walk through directory tree, similar to os.walk().

Parameters:

    top (str, required): Starting directory for the walk.

Returns:

    Generator: Generator yielding tuples of (dirpath, dirnames, filenames).

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def walk(self, top: str) -> Generator:
    """
    Walk through directory tree, similar to os.walk().

    Args:
        top: Starting directory for the walk

    Returns:
        Generator yielding tuples of (dirpath, dirnames, filenames)
    """
    pass
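
Implementations mirror os.walk(), so a directory tree can be traversed without caring which backend is behind the store. A small sketch that collects .tif files under a directory (paths are illustrative; for LocalDataStore the yielded dirpath is relative to the store's base path):

import os

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore("/tmp/giga-demo")

tif_files = []
for dirpath, dirnames, filenames in store.walk("rasters"):
    tif_files.extend(
        os.path.join(dirpath, name) for name in filenames if name.endswith(".tif")
    )
print(tif_files)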

write_file(path, data) abstractmethod

Write data to a file in the data store.

Parameters:

    path (str, required): Path where to write the file.
    data (Any, required): Data to write to the file.

Raises:

    IOError: If file cannot be written.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def write_file(self, path: str, data: Any) -> None:
    """
    Write data to a file in the data store.

    Args:
        path: Path where to write the file
        data: Data to write to the file

    Raises:
        IOError: If file cannot be written
    """
    pass

LocalDataStore

Bases: DataStore

Implementation for local filesystem storage.

Source code in gigaspatial/core/io/local_data_store.py
class LocalDataStore(DataStore):
    """Implementation for local filesystem storage."""

    def __init__(self, base_path: Union[str, Path] = ""):
        super().__init__()
        self.base_path = Path(base_path).resolve()

    def _resolve_path(self, path: str) -> Path:
        """Resolve path relative to base directory."""
        return self.base_path / path

    def read_file(self, path: str) -> bytes:
        full_path = self._resolve_path(path)
        with open(full_path, "rb") as f:
            return f.read()

    def write_file(self, path: str, data: Union[bytes, str]) -> None:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)

        if isinstance(data, str):
            mode = "w"
            encoding = "utf-8"
        else:
            mode = "wb"
            encoding = None

        with open(full_path, mode, encoding=encoding) as f:
            f.write(data)

    def file_exists(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def list_files(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)
        return [
            str(f.relative_to(self.base_path))
            for f in full_path.iterdir()
            if f.is_file()
        ]

    def walk(self, top: str) -> Generator[Tuple[str, List[str], List[str]], None, None]:
        full_path = self._resolve_path(top)
        for root, dirs, files in os.walk(full_path):
            rel_root = str(Path(root).relative_to(self.base_path))
            yield rel_root, dirs, files

    def list_directories(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)

        if not full_path.exists():
            return []

        if not full_path.is_dir():
            return []

        return [d.name for d in full_path.iterdir() if d.is_dir()]

    def open(self, path: str, mode: str = "r") -> IO:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)
        return open(full_path, mode)

    def is_file(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def is_dir(self, path: str) -> bool:
        return self._resolve_path(path).is_dir()

    def remove(self, path: str) -> None:
        full_path = self._resolve_path(path)
        if full_path.is_file():
            os.remove(full_path)

    def copy_file(self, src: str, dst: str) -> None:
        """Copy a file from src to dst."""
        src_path = self._resolve_path(src)
        dst_path = self._resolve_path(dst)
        self.mkdir(str(dst_path.parent), exist_ok=True)
        shutil.copy2(src_path, dst_path)

    def rmdir(self, directory: str) -> None:
        full_path = self._resolve_path(directory)
        if full_path.is_dir():
            os.rmdir(full_path)

    def mkdir(self, path: str, exist_ok: bool = False) -> None:
        full_path = self._resolve_path(path)
        full_path.mkdir(parents=True, exist_ok=exist_ok)

    def exists(self, path: str) -> bool:
        return self._resolve_path(path).exists()
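
All paths passed to a LocalDataStore are resolved relative to base_path, and helpers such as mkdir, list_files, and copy_file (below) operate on those relative paths. A brief sketch (the base path and file names are placeholders):

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore("/tmp/giga-demo")        # resolved to an absolute base path

store.mkdir("inputs", exist_ok=True)
store.write_file("inputs/readme.txt", "hello")  # str data is written as UTF-8 text
store.copy_file("inputs/readme.txt", "archive/readme.txt")

print(store.list_files("inputs"))               # paths reported relative to the base path
print(store.is_dir("archive"), store.exists("archive/readme.txt"))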

copy_file(src, dst)

Copy a file from src to dst.

Source code in gigaspatial/core/io/local_data_store.py
def copy_file(self, src: str, dst: str) -> None:
    """Copy a file from src to dst."""
    src_path = self._resolve_path(src)
    dst_path = self._resolve_path(dst)
    self.mkdir(str(dst_path.parent), exist_ok=True)
    shutil.copy2(src_path, dst_path)

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data. Can merge multiple rasters into one during initialization.
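
The fields listed in the source below are pydantic dataclass fields, so a processor is configured with keyword arguments, and merging plus optional reprojection happen during initialization. A brief usage sketch (file names and coordinates are placeholders, and the import path assumes the module location shown below):

from gigaspatial.processing.tif_processor import TifProcessor

# Single-band raster: per-pixel table (lon/lat plus band values) and summary metadata
proc = TifProcessor(dataset_path="elevation.tif", mode="single")
df = proc.to_dataframe()
info = proc.get_raster_info()   # CRS, bounds, band count, dtype, nodata, ...

# Several tiles merged at construction time, reprojected to a common CRS first
mosaic = TifProcessor(
    dataset_path=["tile_a.tif", "tile_b.tif"],
    mode="single",
    merge_method="mean",
    target_crs="EPSG:4326",
)
values = mosaic.sample_by_coordinates([(29.0, -1.5), (29.1, -1.6)])  # (x, y) pairs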

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    Can merge multiple rasters into one during initialization.
    """

    dataset_path: Union[Path, str, List[Union[Path, str]]]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"
    merge_method: Literal["first", "last", "min", "max", "mean"] = "first"
    target_crs: Optional[str] = None  # For reprojection if needed
    resampling_method: Resampling = Resampling.nearest
    reprojection_resolution: Optional[Tuple[float, float]] = None

    def __post_init__(self):
        """Validate inputs, merge rasters if needed, and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}
        self._temp_dir = tempfile.mkdtemp()
        self._merged_file_path = None
        self._reprojected_file_path = None

        # Handle multiple dataset paths
        if isinstance(self.dataset_path, list):
            if len(self.dataset_path) > 1:
                self.dataset_paths = [Path(p) for p in self.dataset_path]
                self._validate_multiple_datasets()
                self._merge_rasters()
                self.dataset_path = self._merged_file_path
        else:
            self.dataset_paths = [Path(self.dataset_path)]
            if not self.data_store.file_exists(str(self.dataset_path)):
                raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

            # Reproject single raster during initialization if target_crs is set
            if self.target_crs:
                self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
                with self.data_store.open(str(self.dataset_path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            self._reprojected_file_path = self._reproject_to_temp_file(
                                src, self.target_crs
                            )
                self.dataset_path = self._reprojected_file_path

        self._load_metadata()
        self._validate_mode_band_compatibility()

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset, handling temporary reprojected files."""
        if self._merged_file_path:
            with rasterio.open(self._merged_file_path) as src:
                yield src
        elif self._reprojected_file_path:
            with rasterio.open(self._reprojected_file_path) as src:
                yield src
        elif isinstance(self.data_store, LocalDataStore):
            with rasterio.open(str(self.dataset_path)) as src:
                yield src
        else:
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        yield src

    def reproject_to(
        self,
        target_crs: str,
        output_path: Optional[Union[str, Path]] = None,
        resampling_method: Optional[Resampling] = None,
        resolution: Optional[Tuple[float, float]] = None,
    ):
        """
        Reprojects the current raster to a new CRS and optionally saves it.

        Args:
            target_crs: The CRS to reproject to (e.g., "EPSG:4326").
            output_path: The path to save the reprojected raster. If None,
                         it is saved to a temporary file.
            resampling_method: The resampling method to use.
            resolution: The target resolution (pixel size) in the new CRS.
        """
        self.logger.info(f"Reprojecting raster to {target_crs}...")

        # Use provided or default values
        resampling_method = resampling_method or self.resampling_method
        resolution = resolution or self.reprojection_resolution

        with self.open_dataset() as src:
            if src.crs.to_string() == target_crs:
                self.logger.info(
                    "Raster is already in the target CRS. No reprojection needed."
                )
                # If output_path is specified, copy the file
                if output_path:
                    self.data_store.copy_file(str(self.dataset_path), output_path)
                return self.dataset_path

            dst_path = output_path or os.path.join(
                self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
            )

            with rasterio.open(
                dst_path,
                "w",
                **self._get_reprojection_profile(src, target_crs, resolution),
            ) as dst:
                for band_idx in range(1, src.count + 1):
                    reproject(
                        source=rasterio.band(src, band_idx),
                        destination=rasterio.band(dst, band_idx),
                        src_transform=src.transform,
                        src_crs=src.crs,
                        dst_transform=dst.transform,
                        dst_crs=dst.crs,
                        resampling=resampling_method,
                        num_threads=multiprocessing.cpu_count(),
                    )

            self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
            return Path(dst_path)

    def get_raster_info(self) -> Dict[str, Any]:
        """Get comprehensive raster information."""
        return {
            "count": self.count,
            "width": self.width,
            "height": self.height,
            "crs": self.crs,
            "bounds": self.bounds,
            "transform": self.transform,
            "dtypes": self.dtype,
            "nodata": self.nodata,
            "mode": self.mode,
            "is_merged": self.is_merged,
            "source_count": self.source_count,
        }

    def _reproject_to_temp_file(
        self, src: rasterio.DatasetReader, target_crs: str
    ) -> str:
        """Helper to reproject a raster and save it to a temporary file."""
        dst_path = os.path.join(
            self._temp_dir, f"reprojected_temp_{os.urandom(8).hex()}.tif"
        )
        profile = self._get_reprojection_profile(
            src, target_crs, self.reprojection_resolution
        )

        with rasterio.open(dst_path, "w", **profile) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=self.resampling_method,
                )
        return dst_path

    def _validate_multiple_datasets(self):
        """Validate that all datasets exist and have compatible properties."""
        if len(self.dataset_paths) < 2:
            raise ValueError("Multiple dataset paths required for merging")

        with self.data_store.open(str(self.dataset_paths[0]), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as ref_src:
                    ref_count = ref_src.count
                    ref_dtype = ref_src.dtypes[0]
                    ref_crs = ref_src.crs
                    ref_transform = ref_src.transform
                    ref_nodata = ref_src.nodata

        for i, path in enumerate(self.dataset_paths[1:], 1):
            with self.data_store.open(str(path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        if src.count != ref_count:
                            raise ValueError(
                                f"Dataset {i} has {src.count} bands, expected {ref_count}"
                            )
                        if src.dtypes[0] != ref_dtype:
                            raise ValueError(
                                f"Dataset {i} has dtype {src.dtypes[0]}, expected {ref_dtype}"
                            )
                        if not self.target_crs and src.crs != ref_crs:
                            self.logger.warning(
                                f"Dataset {i} has CRS {src.crs}, expected {ref_crs}. "
                                "Consider setting target_crs parameter for reprojection before merging."
                            )
                        if self.target_crs is None and not self._transforms_compatible(
                            src.transform, ref_transform
                        ):
                            self.logger.warning(
                                f"Dataset {i} has different resolution. Resampling may be needed."
                            )
                        if src.nodata != ref_nodata:
                            self.logger.warning(
                                f"Dataset {i} has different nodata value: {src.nodata} vs {ref_nodata}"
                            )

    def _get_reprojection_profile(
        self,
        src: rasterio.DatasetReader,
        target_crs: str,
        resolution: Optional[Tuple[float, float]],
        compression: str = "lzw",
    ):
        """Calculates and returns the profile for a reprojected raster."""
        if resolution:
            src_res = (abs(src.transform.a), abs(src.transform.e))
            self.logger.info(
                f"Using target resolution: {resolution}. Source resolution: {src_res}."
            )
            # Calculate transform and dimensions based on the new resolution
            dst_transform, width, height = calculate_default_transform(
                src.crs,
                target_crs,
                src.width,
                src.height,
                *src.bounds,
                resolution=resolution,
            )
        else:
            # Keep original resolution but reproject
            dst_transform, width, height = calculate_default_transform(
                src.crs, target_crs, src.width, src.height, *src.bounds
            )

        profile = src.profile.copy()
        profile.update(
            {
                "crs": target_crs,
                "transform": dst_transform,
                "width": width,
                "height": height,
                "compress": compression,  # Add compression to save space
            }
        )
        return profile

    def _transforms_compatible(self, transform1, transform2, tolerance=1e-6):
        """Check if two transforms have compatible pixel sizes."""
        return (
            abs(transform1.a - transform2.a) < tolerance
            and abs(transform1.e - transform2.e) < tolerance
        )

    def _merge_rasters(self):
        """Merge multiple rasters into a single raster."""
        self.logger.info(f"Merging {len(self.dataset_paths)} rasters...")

        # Open all datasets and handle reprojection if needed
        datasets_to_merge = []
        temp_reprojected_files = []
        try:
            for path in self.dataset_paths:
                with self.data_store.open(str(path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            if self.target_crs and src.crs != self.target_crs:
                                self.logger.info(
                                    f"Reprojecting {path.name} to {self.target_crs} before merging."
                                )
                                reprojected_path = self._reproject_to_temp_file(
                                    src, self.target_crs
                                )
                                temp_reprojected_files.append(reprojected_path)
                                datasets_to_merge.append(
                                    rasterio.open(reprojected_path)
                                )
                            else:
                                temp_path = os.path.join(
                                    self._temp_dir,
                                    f"temp_{path.stem}_{os.urandom(4).hex()}.tif",
                                )
                                temp_reprojected_files.append(temp_path)

                                profile = src.profile
                                with rasterio.open(temp_path, "w", **profile) as dst:
                                    dst.write(src.read())
                                datasets_to_merge.append(rasterio.open(temp_path))

            self._merged_file_path = os.path.join(self._temp_dir, "merged_raster.tif")

            if self.merge_method == "mean":
                merged_array, merged_transform = self._merge_with_mean(
                    datasets_to_merge
                )
            else:
                merged_array, merged_transform = merge(
                    datasets_to_merge,
                    method=self.merge_method,
                    resampling=self.resampling_method,
                )

            # Get profile from the first file in the list (all should be compatible now)
            ref_src = datasets_to_merge[0]
            profile = ref_src.profile.copy()
            profile.update(
                {
                    "height": merged_array.shape[-2],
                    "width": merged_array.shape[-1],
                    "transform": merged_transform,
                    "crs": self.target_crs if self.target_crs else ref_src.crs,
                }
            )

            with rasterio.open(self._merged_file_path, "w", **profile) as dst:
                dst.write(merged_array)
        finally:
            for dataset in datasets_to_merge:
                if hasattr(dataset, "close"):
                    dataset.close()

            # Clean up temporary files immediately
            for temp_file in temp_reprojected_files:
                try:
                    os.remove(temp_file)
                except OSError:
                    pass

        self.logger.info("Raster merging completed!")

    def _merge_with_mean(self, src_files):
        """Merge rasters using mean aggregation."""
        # Get bounds and resolution for merged raster
        bounds = src_files[0].bounds
        transform = src_files[0].transform

        for src in src_files[1:]:
            bounds = rasterio.coords.BoundingBox(
                min(bounds.left, src.bounds.left),
                min(bounds.bottom, src.bounds.bottom),
                max(bounds.right, src.bounds.right),
                max(bounds.top, src.bounds.top),
            )

        # Calculate dimensions for merged raster
        width = int((bounds.right - bounds.left) / abs(transform.a))
        height = int((bounds.top - bounds.bottom) / abs(transform.e))

        # Create new transform for merged bounds
        merged_transform = rasterio.transform.from_bounds(
            bounds.left, bounds.bottom, bounds.right, bounds.top, width, height
        )

        estimated_memory = height * width * src_files[0].count * 8  # float64
        if estimated_memory > 1e9:  # 1GB threshold
            self.logger.warning(
                f"Large memory usage expected: {estimated_memory/1e9:.1f}GB"
            )

        # Initialize arrays for sum and count
        sum_array = np.zeros((src_files[0].count, height, width), dtype=np.float64)
        count_array = np.zeros((height, width), dtype=np.int32)

        # Process each source file
        for src in src_files:
            # Read data
            data = src.read()

            # Calculate offset in merged raster
            src_bounds = src.bounds
            col_off = int((src_bounds.left - bounds.left) / abs(transform.a))
            row_off = int((bounds.top - src_bounds.top) / abs(transform.e))

            # Get valid data mask
            if src.nodata is not None:
                valid_mask = data[0] != src.nodata
            else:
                valid_mask = np.ones(data[0].shape, dtype=bool)

            # Add to sum and count arrays
            end_row = row_off + data.shape[1]
            end_col = col_off + data.shape[2]

            sum_array[:, row_off:end_row, col_off:end_col] += np.where(
                valid_mask, data, 0
            )
            count_array[row_off:end_row, col_off:end_col] += valid_mask.astype(np.int32)

        # Calculate mean
        mean_array = np.divide(
            sum_array,
            count_array,
            out=np.full_like(
                sum_array, src_files[0].nodata or 0, dtype=sum_array.dtype
            ),
            where=count_array > 0,
        )

        return mean_array.astype(src_files[0].dtypes[0]), merged_transform

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        try:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]
        except (rasterio.errors.RasterioIOError, FileNotFoundError) as e:
            raise FileNotFoundError(f"Could not read raster metadata: {e}")
        except Exception as e:
            raise RuntimeError(f"Unexpected error loading metadata: {e}")

    @property
    def is_merged(self) -> bool:
        """Check if this processor was created from multiple rasters."""
        return len(self.dataset_paths) > 1

    @property
    def source_count(self) -> int:
        """Get the number of source rasters."""
        return len(self.dataset_paths)

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def dtype(self):
        """Get the data types from the TIF file"""
        return self._cache.get("dtype", [])

    @property
    def width(self):
        return self._cache["width"]

    @property
    def height(self):
        return self._cache["height"]

    def to_dataframe(
        self, drop_nodata=True, check_memory=True, **kwargs
    ) -> pd.DataFrame:
        """
        Convert raster to DataFrame.

        Args:
            drop_nodata: Whether to drop nodata values
            check_memory: Whether to check memory before operation (default True)
            **kwargs: Additional arguments

        Returns:
            pd.DataFrame with raster data
        """
        # Memory guard check
        if check_memory:
            self._memory_guard("conversion", threshold_percent=80.0)

        try:
            if self.mode == "single":
                return self._to_dataframe(
                    band_number=kwargs.get("band_number", 1),
                    drop_nodata=drop_nodata,
                    band_names=kwargs.get("band_names", None),
                )
            else:
                return self._to_dataframe(
                    band_number=None,  # All bands
                    drop_nodata=drop_nodata,
                    band_names=kwargs.get("band_names", None),
                )
        except Exception as e:
            raise ValueError(
                f"Failed to process TIF file in mode '{self.mode}'. "
                f"Please ensure the file is valid and matches the selected mode. "
                f"Original error: {str(e)}"
            )

    def to_geodataframe(self, check_memory=True, **kwargs) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.

        Args:
            check_memory: Whether to check memory before operation
            **kwargs: Additional arguments passed to to_dataframe()

        Returns:
            gpd.GeoDataFrame with raster data
        """
        # Memory guard check
        if check_memory:
            self._memory_guard("conversion", threshold_percent=80.0)

        df = self.to_dataframe(check_memory=False, **kwargs)

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        return gdf

    def to_dataframe_chunked(
        self, drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs
    ):
        """
        Convert raster to DataFrame using chunked processing for memory efficiency.

        Automatically routes to the appropriate chunked method based on mode.
        Chunk size is automatically calculated based on target memory usage.

        Args:
            drop_nodata: Whether to drop nodata values
            chunk_size: Number of rows per chunk (auto-calculated if None)
            target_memory_mb: Target memory per chunk in MB (default 500)
            **kwargs: Additional arguments (band_number, band_names, etc.)
        """

        if chunk_size is None:
            chunk_size = self._calculate_optimal_chunk_size(
                "conversion", target_memory_mb
            )

        windows = self._get_chunk_windows(chunk_size)

        # SIMPLE ROUTING
        if self.mode == "single":
            return self._to_dataframe_chunked(
                windows,
                band_number=kwargs.get("band_number", 1),
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
        else:  # rgb, rgba, multi
            return self._to_dataframe_chunked(
                windows,
                band_number=None,
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )

    def clip_to_geometry(
        self,
        geometry: Union[
            Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
        ],
        crop: bool = True,
        all_touched: bool = True,
        invert: bool = False,
        nodata: Optional[Union[int, float]] = None,
        pad: bool = False,
        pad_width: float = 0.5,
        return_clipped_processor: bool = True,
    ) -> Union["TifProcessor", tuple]:
        """
        Clip raster to geometry boundaries.

        Parameters:
        -----------
        geometry : various
            Geometry to clip to. Can be:
            - Shapely Polygon or MultiPolygon
            - GeoDataFrame or GeoSeries
            - List of GeoJSON-like dicts
            - Single GeoJSON-like dict
        crop : bool, default True
            Whether to crop the raster to the extent of the geometry
        all_touched : bool, default True
            Include pixels that touch the geometry boundary
        invert : bool, default False
            If True, mask pixels inside geometry instead of outside
        nodata : int or float, optional
            Value to use for masked pixels. If None, uses raster's nodata value
        pad : bool, default False
            Pad geometry by half pixel before clipping
        pad_width : float, default 0.5
            Width of padding in pixels if pad=True
        return_clipped_processor : bool, default True
            If True, returns new TifProcessor with clipped data
            If False, returns (clipped_array, transform, metadata)

        Returns:
        --------
        TifProcessor or tuple
            Either new TifProcessor instance or (array, transform, metadata) tuple
        """
        # Handle different geometry input types
        shapes = self._prepare_geometry_for_clipping(geometry)

        # Validate CRS compatibility
        self._validate_geometry_crs(geometry)

        # Perform the clipping
        with self.open_dataset() as src:
            try:
                clipped_data, clipped_transform = mask(
                    dataset=src,
                    shapes=shapes,
                    crop=crop,
                    all_touched=all_touched,
                    invert=invert,
                    nodata=nodata,
                    pad=pad,
                    pad_width=pad_width,
                    filled=True,
                )

                # Update metadata for the clipped raster
                clipped_meta = src.meta.copy()
                clipped_meta.update(
                    {
                        "height": clipped_data.shape[1],
                        "width": clipped_data.shape[2],
                        "transform": clipped_transform,
                        "nodata": nodata if nodata is not None else src.nodata,
                    }
                )

            except ValueError as e:
                if "Input shapes do not overlap raster" in str(e):
                    raise ValueError(
                        "The geometry does not overlap with the raster. "
                        "Check that both are in the same coordinate reference system."
                    ) from e
                else:
                    raise e

        if return_clipped_processor:
            # Create a new TifProcessor with the clipped data
            return self._create_clipped_processor(clipped_data, clipped_meta)
        else:
            return clipped_data, clipped_transform, clipped_meta

    def clip_to_bounds(
        self,
        bounds: tuple,
        bounds_crs: Optional[str] = None,
        return_clipped_processor: bool = True,
    ) -> Union["TifProcessor", tuple]:
        """
        Clip raster to rectangular bounds.

        Parameters:
        -----------
        bounds : tuple
            Bounding box as (minx, miny, maxx, maxy)
        bounds_crs : str, optional
            CRS of the bounds. If None, assumes same as raster CRS
        return_clipped_processor : bool, default True
            If True, returns new TifProcessor, else returns (array, transform, metadata)

        Returns:
        --------
        TifProcessor or tuple
            Either new TifProcessor instance or (array, transform, metadata) tuple
        """
        # Create bounding box geometry
        bbox_geom = box(*bounds)

        # If bounds_crs is specified and different from raster CRS, create GeoDataFrame for reprojection
        if bounds_crs is not None:
            raster_crs = self.crs

            if not self.crs == bounds_crs:
                # Create GeoDataFrame with bounds CRS and reproject
                bbox_gdf = gpd.GeoDataFrame([1], geometry=[bbox_geom], crs=bounds_crs)
                bbox_gdf = bbox_gdf.to_crs(raster_crs)
                bbox_geom = bbox_gdf.geometry.iloc[0]

        return self.clip_to_geometry(
            geometry=bbox_geom,
            crop=True,
            return_clipped_processor=return_clipped_processor,
        )

    def to_graph(
        self,
        connectivity: Literal[4, 8] = 4,
        band: Optional[int] = None,
        include_coordinates: bool = False,
        graph_type: Literal["networkx", "sparse"] = "networkx",
        check_memory: bool = True,
    ) -> Union[nx.Graph, sp.csr_matrix]:
        """
        Convert raster to graph based on pixel adjacency.

        Args:
            connectivity: 4 or 8-connectivity
            band: Band number (1-indexed)
            include_coordinates: Include x,y coordinates in nodes
            graph_type: 'networkx' or 'sparse'
            check_memory: Whether to check memory before operation

        Returns:
            Graph representation of raster
        """

        # Memory guard check
        if check_memory:
            self._memory_guard("graph", threshold_percent=80.0)

        with self.open_dataset() as src:
            band_idx = band - 1 if band is not None else 0
            if band_idx < 0 or band_idx >= src.count:
                raise ValueError(
                    f"Band {band} not available. Raster has {src.count} bands"
                )

            data = src.read(band_idx + 1)
            nodata = src.nodata if src.nodata is not None else self.nodata
            valid_mask = (
                data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
            )

            height, width = data.shape

            # Find all valid pixels
            valid_rows, valid_cols = np.where(valid_mask)
            num_valid_pixels = len(valid_rows)

            # Create a sequential mapping from (row, col) to a node ID
            node_map = np.full(data.shape, -1, dtype=int)
            node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

            # Define neighborhood offsets
            if connectivity == 4:
                # von Neumann neighborhood (4-connectivity)
                offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
            else:  # connectivity == 8
                # Moore neighborhood (8-connectivity)
                offsets = [
                    (-1, -1),
                    (-1, 0),
                    (-1, 1),
                    (0, -1),
                    (0, 1),
                    (1, -1),
                    (1, 0),
                    (1, 1),
                ]

            # Collect nodes and edges
            nodes_to_add = []
            edges_to_add = []

            for i in range(num_valid_pixels):
                row, col = valid_rows[i], valid_cols[i]
                current_node_id = node_map[row, col]

                # Prepare node attributes
                node_attrs = {"value": float(data[row, col])}
                if include_coordinates:
                    x, y = src.xy(row, col)
                    node_attrs["x"] = x
                    node_attrs["y"] = y
                nodes_to_add.append((current_node_id, node_attrs))

                # Find neighbors and collect edges
                for dy, dx in offsets:
                    neighbor_row, neighbor_col = row + dy, col + dx

                    # Check if neighbor is within bounds and is a valid pixel
                    if (
                        0 <= neighbor_row < height
                        and 0 <= neighbor_col < width
                        and valid_mask[neighbor_row, neighbor_col]
                    ):
                        neighbor_node_id = node_map[neighbor_row, neighbor_col]

                        # Ensure each edge is added only once
                        if current_node_id < neighbor_node_id:
                            neighbor_value = float(data[neighbor_row, neighbor_col])
                            edges_to_add.append(
                                (current_node_id, neighbor_node_id, neighbor_value)
                            )

            if graph_type == "networkx":
                G = nx.Graph()
                G.add_nodes_from(nodes_to_add)
                G.add_weighted_edges_from(edges_to_add)
                return G
            else:  # sparse matrix
                edges_array = np.array(edges_to_add)
                row_indices = edges_array[:, 0]
                col_indices = edges_array[:, 1]
                weights = edges_array[:, 2]

                # Add reverse edges for symmetric matrix
                from_idx = np.append(row_indices, col_indices)
                to_idx = np.append(col_indices, row_indices)
                weights = np.append(weights, weights)

                return sp.coo_matrix(
                    (weights, (from_idx, to_idx)),
                    shape=(num_valid_pixels, num_valid_pixels),
                ).tocsr()

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]], **kwargs
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            elif self.count > 1:
                return np.array(
                    [vals for vals in src.sample(coordinate_list, **kwargs)]
                )
            else:
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self,
        polygon_list,
        stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
    ):
        """
        Sample raster values by polygons and compute statistic(s) for each polygon.

        Args:
            polygon_list: List of shapely Polygon or MultiPolygon objects.
            stat: Statistic(s) to compute. Can be:
                - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
                - Single callable: custom function that takes array and returns scalar
                - List of strings/callables: multiple statistics to compute

        Returns:
            If single stat: np.ndarray of computed statistics for each polygon
            If multiple stats: List of dictionaries with stat names as keys
        """
        # Determine if single or multiple stats
        single_stat = not isinstance(stat, list)
        stats_list = [stat] if single_stat else stat

        # Prepare stat functions
        stat_funcs = []
        stat_names = []

        for s in stats_list:
            if callable(s):
                stat_funcs.append(s)
                stat_names.append(
                    s.__name__
                    if hasattr(s, "__name__")
                    else f"custom_{len(stat_names)}"
                )
            else:
                # Handle string statistics
                if s == "count":
                    stat_funcs.append(len)
                else:
                    stat_funcs.append(getattr(np, s))
                stat_names.append(s)

        results = []

        with self.open_dataset() as src:
            for polygon in tqdm(polygon_list):
                try:
                    out_image, _ = mask(src, [polygon], crop=True, filled=False)

                    # Use masked arrays for more efficient nodata handling
                    if hasattr(out_image, "mask"):
                        valid_data = out_image.compressed()
                    else:
                        valid_data = (
                            out_image[out_image != self.nodata]
                            if self.nodata
                            else out_image.flatten()
                        )

                    if len(valid_data) == 0:
                        if single_stat:
                            results.append(np.nan)
                        else:
                            results.append({name: np.nan for name in stat_names})
                    else:
                        if single_stat:
                            results.append(stat_funcs[0](valid_data))
                        else:
                            # Compute all statistics for this polygon
                            polygon_stats = {}
                            for func, name in zip(stat_funcs, stat_names):
                                try:
                                    polygon_stats[name] = func(valid_data)
                                except Exception:
                                    polygon_stats[name] = np.nan
                            results.append(polygon_stats)

                except Exception:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})

        return np.array(results) if single_stat else results

    def sample_by_polygons_batched(
        self,
        polygon_list: List[Union[Polygon, MultiPolygon]],
        stat: Union[str, Callable] = "mean",
        batch_size: int = 100,
        n_workers: int = 4,
        show_progress: bool = True,
        check_memory: bool = True,
        **kwargs,
    ) -> np.ndarray:
        """
        Sample raster values by polygons in parallel using batching.

        Args:
            polygon_list: List of Shapely Polygon or MultiPolygon objects
            stat: Statistic to compute
            batch_size: Number of polygons per batch
            n_workers: Number of worker processes
            show_progress: Whether to display progress bar
            check_memory: Whether to check memory before operation
            **kwargs: Additional arguments

        Returns:
            np.ndarray of statistics for each polygon
        """
        import sys

        # Memory guard check with n_workers consideration
        if check_memory:
            is_safe = self._memory_guard(
                "batched_sampling",
                threshold_percent=85.0,
                n_workers=n_workers,
                raise_error=False,
            )

            if not is_safe:
                # Suggest reducing n_workers
                memory_info = self._check_available_memory()
                estimates = self._estimate_memory_usage("batched_sampling", n_workers=1)

                # Calculate optimal workers
                suggested_workers = max(
                    1, int(memory_info["available"] * 0.7 / estimates["per_worker"])
                )

                warnings.warn(
                    f"Consider reducing n_workers from {n_workers} to {suggested_workers} "
                    f"to reduce memory pressure.",
                    ResourceWarning,
                )

        # Platform check
        if sys.platform in ["win32", "darwin"]:
            import warnings
            import multiprocessing as mp

            if mp.get_start_method(allow_none=True) != "fork":
                warnings.warn(
                    "Batched sampling may not work on Windows/macOS. "
                    "Use sample_by_polygons() if you encounter errors.",
                    RuntimeWarning,
                )

        def _chunk_list(data_list, chunk_size):
            """Yield successive chunks from data_list."""
            for i in range(0, len(data_list), chunk_size):
                yield data_list[i : i + chunk_size]

        if len(polygon_list) == 0:
            return np.array([])

        stat_func = stat if callable(stat) else getattr(np, stat)
        polygon_chunks = list(_chunk_list(polygon_list, batch_size))

        with multiprocessing.Pool(
            initializer=self._initializer_worker, processes=n_workers
        ) as pool:
            process_func = partial(self._process_polygon_batch, stat_func=stat_func)
            if show_progress:
                batched_results = list(
                    tqdm(
                        pool.imap(process_func, polygon_chunks),
                        total=len(polygon_chunks),
                        desc=f"Sampling polygons",
                    )
                )
            else:
                batched_results = list(pool.imap(process_func, polygon_chunks))

            results = [item for sublist in batched_results for item in sublist]

        return np.array(results)

    def _initializer_worker(self):
        """
        Initializer function for each worker process.
        Opens the raster dataset and stores it in a process-local variable.
        This function runs once per worker, not for every task.
        """
        global src_handle, memfile_handle

        # Priority: merged > reprojected > original (same as open_dataset)
        local_file_path = None
        if self._merged_file_path:
            # Merged file is a local temp file
            local_file_path = self._merged_file_path
        elif self._reprojected_file_path:
            # Reprojected file is a local temp file
            local_file_path = self._reprojected_file_path
        elif isinstance(self.data_store, LocalDataStore):
            # Local file - can open directly
            local_file_path = str(self.dataset_path)

        if local_file_path:
            # Open local file directly
            with open(local_file_path, "rb") as f:
                memfile_handle = rasterio.MemoryFile(f.read())
                src_handle = memfile_handle.open()
        else:
            # Custom DataStore
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                memfile_handle = rasterio.MemoryFile(f.read())
                src_handle = memfile_handle.open()

    def _get_worker_dataset(self):
        """Get dataset handle for worker process."""
        global src_handle
        if src_handle is None:
            raise RuntimeError("Raster dataset not initialized in this process.")
        return src_handle

    def _process_single_polygon(self, polygon, stat_func):
        """
        Helper function to process a single polygon.
        This will be run in a separate process.
        """
        try:
            src = self._get_worker_dataset()
            out_image, _ = mask(src, [polygon], crop=True, filled=False)

            if hasattr(out_image, "mask"):
                valid_data = out_image.compressed()
            else:
                valid_data = (
                    out_image[out_image != self.nodata]
                    if self.nodata
                    else out_image.flatten()
                )

            return stat_func(valid_data) if len(valid_data) > 0 else np.nan
        except RuntimeError as e:
            self.logger.error(f"Worker not initialized: {e}")
            return np.nan
        except Exception as e:
            self.logger.debug(f"Error processing polygon: {e}")
            return np.nan

    def _process_polygon_batch(self, polygon_batch, stat_func):
        """
        Processes a batch of polygons.
        """
        return [
            self._process_single_polygon(polygon, stat_func)
            for polygon in polygon_batch
        ]

    def _to_dataframe(
        self,
        band_number: Optional[int] = None,
        drop_nodata: bool = True,
        band_names: Optional[Union[str, List[str]]] = None,
    ) -> pd.DataFrame:
        """
        Process TIF to DataFrame - handles both single-band and multi-band.

        Args:
            band_number: Specific band to read (1-indexed). If None, reads all bands.
            drop_nodata: Whether to drop nodata values
            band_names: Custom names for bands (multi-band only)

        Returns:
            pd.DataFrame with lon, lat, and band value(s)
        """
        with self.open_dataset() as src:
            if band_number is not None:
                # SINGLE BAND MODE
                band = src.read(band_number)
                mask = self._build_data_mask(band, drop_nodata, src.nodata)
                lons, lats = self._extract_coordinates_with_mask(mask)
                pixel_values = (
                    np.extract(mask, band) if mask is not None else band.flatten()
                )
                band_name = band_names if isinstance(band_names, str) else "pixel_value"

                return pd.DataFrame({"lon": lons, "lat": lats, band_name: pixel_values})
            else:
                # MULTI-BAND MODE (all bands)
                stack = src.read()

                # Auto-detect band names by mode
                if band_names is None:
                    if self.mode == "rgb":
                        band_names = ["red", "green", "blue"]
                    elif self.mode == "rgba":
                        band_names = ["red", "green", "blue", "alpha"]
                    else:
                        band_names = [
                            src.descriptions[i] or f"band_{i+1}"
                            for i in range(self.count)
                        ]

                # Build mask (checks ALL bands!)
                mask = self._build_multi_band_mask(stack, drop_nodata, src.nodata)

                # Create DataFrame
                data_dict = self._bands_to_dict(stack, self.count, band_names, mask)
                df = pd.DataFrame(data_dict)

                # RGBA: normalize alpha if needed
                if (
                    self.mode == "rgba"
                    and "alpha" in df.columns
                    and df["alpha"].max() > 1
                ):
                    df["alpha"] = df["alpha"] / 255.0

            return df

    def _to_dataframe_chunked(
        self,
        windows: List[rasterio.windows.Window],
        band_number: Optional[int] = None,
        drop_nodata: bool = True,
        band_names: Optional[Union[str, List[str]]] = None,
        show_progress: bool = True,
    ) -> pd.DataFrame:
        """Universal chunked converter for ALL modes."""

        chunks = []
        iterator = tqdm(windows, desc="Processing chunks") if show_progress else windows

        with self.open_dataset() as src:
            # Auto-detect band names ONCE (before loop)
            if band_number is None and band_names is None:
                if self.mode == "rgb":
                    band_names = ["red", "green", "blue"]
                elif self.mode == "rgba":
                    band_names = ["red", "green", "blue", "alpha"]
                else:  # multi
                    band_names = [
                        src.descriptions[i] or f"band_{i+1}" for i in range(self.count)
                    ]

            for window in iterator:
                if band_number is not None:
                    # SINGLE BAND
                    band_chunk = src.read(band_number, window=window)
                    mask = self._build_data_mask(band_chunk, drop_nodata, src.nodata)
                    lons, lats = self._get_chunk_coordinates(window, src)
                    band_name = (
                        band_names if isinstance(band_names, str) else "pixel_value"
                    )

                    # Build chunk DataFrame (could use helper but simple enough)
                    if mask is not None:
                        mask_flat = mask.flatten()
                        chunk_df = pd.DataFrame(
                            {
                                "lon": lons[mask_flat],
                                "lat": lats[mask_flat],
                                band_name: band_chunk.flatten()[mask_flat],
                            }
                        )
                    else:
                        chunk_df = pd.DataFrame(
                            {"lon": lons, "lat": lats, band_name: band_chunk.flatten()}
                        )
                else:
                    # MULTI-BAND (includes RGB/RGBA)
                    stack_chunk = src.read(window=window)
                    mask = self._build_multi_band_mask(
                        stack_chunk, drop_nodata, src.nodata
                    )
                    lons, lats = self._get_chunk_coordinates(window, src)

                    # Build DataFrame using helper
                    band_dict = {
                        band_names[i]: stack_chunk[i] for i in range(self.count)
                    }
                    chunk_df = self._build_chunk_dataframe(lons, lats, band_dict, mask)

                    # RGBA: normalize alpha
                    if self.mode == "rgba" and "alpha" in chunk_df.columns:
                        if chunk_df["alpha"].max() > 1:
                            chunk_df["alpha"] = chunk_df["alpha"] / 255.0

                chunks.append(chunk_df)

        result = pd.concat(chunks, ignore_index=True)
        return result

    def _prepare_geometry_for_clipping(
        self,
        geometry: Union[
            Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
        ],
    ) -> List[dict]:
        """Convert various geometry formats to list of GeoJSON-like dicts for rasterio.mask"""

        if isinstance(geometry, (Polygon, MultiPolygon)):
            # Shapely geometry
            return [geometry.__geo_interface__]

        elif isinstance(geometry, gpd.GeoDataFrame):
            # GeoDataFrame - use all geometries
            return [
                geom.__geo_interface__ for geom in geometry.geometry if geom is not None
            ]

        elif isinstance(geometry, gpd.GeoSeries):
            # GeoSeries
            return [geom.__geo_interface__ for geom in geometry if geom is not None]

        elif isinstance(geometry, dict):
            # Single GeoJSON-like dict
            return [geometry]

        elif isinstance(geometry, list):
            # List of GeoJSON-like dicts
            return geometry

        else:
            raise TypeError(
                f"Unsupported geometry type: {type(geometry)}. "
                "Supported types: Shapely geometries, GeoDataFrame, GeoSeries, "
                "GeoJSON-like dict, or list of GeoJSON-like dicts."
            )

    def _validate_geometry_crs(
        self,
        original_geometry: Any,
    ) -> None:
        """Validate that geometry CRS matches raster CRS"""

        # Get raster CRS
        raster_crs = self.crs

        # Try to get geometry CRS
        geometry_crs = None

        if isinstance(original_geometry, (gpd.GeoDataFrame, gpd.GeoSeries)):
            geometry_crs = original_geometry.crs
        elif hasattr(original_geometry, "crs"):
            geometry_crs = original_geometry.crs

        # Warn if CRS mismatch detected
        if geometry_crs is not None and raster_crs is not None:
            if not raster_crs == geometry_crs:
                self.logger.warning(
                    f"CRS mismatch detected! Raster CRS: {raster_crs}, "
                    f"Geometry CRS: {geometry_crs}. "
                    "Consider reprojecting geometry to match raster CRS for accurate clipping."
                )

    def _create_clipped_processor(
        self, clipped_data: np.ndarray, clipped_meta: dict
    ) -> "TifProcessor":
        """
        Helper to create a new TifProcessor instance from clipped data.
        Saves the clipped data to a temporary file and initializes a new TifProcessor.
        """
        clipped_file_path = os.path.join(
            self._temp_dir, f"clipped_temp_{os.urandom(8).hex()}.tif"
        )
        with rasterio.open(clipped_file_path, "w", **clipped_meta) as dst:
            dst.write(clipped_data)

        self.logger.info(f"Clipped raster saved to temporary file: {clipped_file_path}")

        # Create a new TifProcessor instance with the clipped data
        # Pass relevant parameters from the current instance to maintain consistency
        return TifProcessor(
            dataset_path=clipped_file_path,
            data_store=self.data_store,
            mode=self.mode,
        )

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]

    def _get_chunk_coordinates(self, window, src):
        """Get coordinates for a specific window chunk."""
        transform = src.window_transform(window)
        rows, cols = np.meshgrid(
            np.arange(window.height), np.arange(window.width), indexing="ij"
        )
        xs, ys = rasterio.transform.xy(transform, rows.flatten(), cols.flatten())
        return np.array(xs), np.array(ys)

    def _extract_coordinates_with_mask(self, mask=None):
        """Extract flattened coordinates, optionally applying a mask."""
        x_coords, y_coords = self._get_pixel_coordinates()

        if mask is not None:
            return np.extract(mask, x_coords), np.extract(mask, y_coords)

        return x_coords.flatten(), y_coords.flatten()

    def _build_data_mask(self, data, drop_nodata=True, nodata_value=None):
        """Build a boolean mask for filtering data based on nodata values."""
        if not drop_nodata or nodata_value is None:
            return None

        return data != nodata_value

    def _build_multi_band_mask(
        self,
        bands: np.ndarray,
        drop_nodata: bool = True,
        nodata_value: Optional[float] = None,
    ) -> Optional[np.ndarray]:
        """
        Build mask for multi-band data - drops pixels where ANY band has nodata.

        Args:
            bands: 3D array of shape (n_bands, height, width)
            drop_nodata: Whether to drop nodata values
            nodata_value: The nodata value to check

        Returns:
            Boolean mask or None if no masking needed
        """
        if not drop_nodata or nodata_value is None:
            return None

        # Check if ANY band has nodata at each pixel location
        has_nodata = np.any(bands == nodata_value, axis=0)

        # Return True where ALL bands are valid
        valid_mask = ~has_nodata

        return valid_mask if not valid_mask.all() else None

    def _bands_to_dict(self, bands, band_count, band_names, mask=None):
        """Read specified bands and return as a dictionary with optional masking."""

        lons, lats = self._extract_coordinates_with_mask(mask)
        data_dict = {"lon": lons, "lat": lats}

        for idx, name in enumerate(band_names[:band_count]):
            band_data = bands[idx]
            data_dict[name] = (
                np.extract(mask, band_data) if mask is not None else band_data.flatten()
            )

        return data_dict

    def _calculate_optimal_chunk_size(
        self, operation: str = "conversion", target_memory_mb: int = 500
    ) -> int:
        """
        Calculate optimal chunk size (number of rows) based on target memory usage.

        Args:
            operation: Type of operation ('conversion', 'graph')
            target_memory_mb: Target memory per chunk in megabytes

        Returns:
            Number of rows per chunk
        """
        bytes_per_element = np.dtype(self.dtype).itemsize
        n_bands = self.count
        width = self.width

        # Adjust for operation type
        if operation == "conversion":
            # DataFrame overhead is roughly 2x
            bytes_per_row = width * n_bands * bytes_per_element * 2
        elif operation == "graph":
            # Graph needs additional space for edges
            bytes_per_row = width * bytes_per_element * 4  # Estimate
        else:
            bytes_per_row = width * n_bands * bytes_per_element

        target_bytes = target_memory_mb * 1024 * 1024
        chunk_rows = max(1, int(target_bytes / bytes_per_row))

        # Ensure chunk size doesn't exceed total height
        chunk_rows = min(chunk_rows, self.height)

        self.logger.info(
            f"Calculated chunk size: {chunk_rows} rows "
            f"(~{self._format_bytes(chunk_rows * bytes_per_row)} per chunk)"
        )

        return chunk_rows

    def _get_chunk_windows(self, chunk_size: int) -> List[rasterio.windows.Window]:
        """
        Generate window objects for chunked reading.

        Args:
            chunk_size: Number of rows per chunk

        Returns:
            List of rasterio.windows.Window objects
        """
        windows = []
        for row_start in range(0, self.height, chunk_size):
            row_end = min(row_start + chunk_size, self.height)
            window = rasterio.windows.Window(
                col_off=0,
                row_off=row_start,
                width=self.width,
                height=row_end - row_start,
            )
            windows.append(window)

        return windows

    def _format_bytes(self, bytes_value: int) -> str:
        """Convert bytes to human-readable format."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"

    def _check_available_memory(self) -> dict:
        """
        Check available system memory.

        Returns:
            Dict with total, available, and used memory info
        """
        import psutil

        memory = psutil.virtual_memory()
        return {
            "total": memory.total,
            "available": memory.available,
            "used": memory.used,
            "percent": memory.percent,
            "available_human": self._format_bytes(memory.available),
        }

    def _estimate_memory_usage(
        self, operation: str = "conversion", n_workers: int = 1
    ) -> dict:
        """
        Estimate memory usage for various operations.

        Args:
            operation: Type of operation ('conversion', 'batched_sampling', 'merge', 'graph')
            n_workers: Number of workers (for batched_sampling)

        Returns:
            Dict with estimated memory usage in bytes and human-readable format
        """
        bytes_per_element = np.dtype(self.dtype).itemsize
        n_pixels = self.width * self.height
        n_bands = self.count

        estimates = {}

        if operation == "conversion":
            # to_dataframe/to_geodataframe: full raster + DataFrame overhead
            raster_memory = n_pixels * n_bands * bytes_per_element
            # DataFrame overhead (roughly 2x for storage + processing)
            dataframe_memory = (
                n_pixels * n_bands * 16
            )  # 16 bytes per value in DataFrame
            total = raster_memory + dataframe_memory
            estimates["raster"] = raster_memory
            estimates["dataframe"] = dataframe_memory
            estimates["total"] = total

        elif operation == "batched_sampling":
            # Each worker loads full raster into MemoryFile
            # Need to get file size
            if self._merged_file_path:
                file_path = self._merged_file_path
            elif self._reprojected_file_path:
                file_path = self._reprojected_file_path
            else:
                file_path = str(self.dataset_path)

            try:
                import os

                file_size = os.path.getsize(file_path)
            except:
                # Estimate if can't get file size
                file_size = n_pixels * n_bands * bytes_per_element * 1.2  # Add overhead

            estimates["per_worker"] = file_size
            estimates["total"] = file_size * n_workers

        elif operation == "merge":
            # _merge_with_mean uses float64 arrays
            raster_memory = n_pixels * n_bands * 8  # float64
            estimates["sum_array"] = raster_memory
            estimates["count_array"] = n_pixels * 4  # int32
            estimates["total"] = raster_memory + n_pixels * 4

        elif operation == "graph":
            # to_graph: data + node_map + edges
            data_memory = n_pixels * bytes_per_element
            node_map_memory = n_pixels * 4  # int32
            # Estimate edges (rough: 4-connectivity = 4 edges per pixel)
            edges_memory = n_pixels * 4 * 3 * 8  # 3 values per edge, float64
            total = data_memory + node_map_memory + edges_memory
            estimates["data"] = data_memory
            estimates["node_map"] = node_map_memory
            estimates["edges"] = edges_memory
            estimates["total"] = total

        # Add human-readable format
        estimates["human_readable"] = self._format_bytes(estimates["total"])

        return estimates

    def _memory_guard(
        self,
        operation: str,
        threshold_percent: float = 80.0,
        n_workers: Optional[int] = None,
        raise_error: bool = False,
    ) -> bool:
        """
        Check if operation is safe to perform given memory constraints.

        Args:
            operation: Type of operation to check
            threshold_percent: Maximum % of available memory to use (default 80%)
            n_workers: Number of workers (for batched operations)
            raise_error: If True, raise MemoryError instead of warning

        Returns:
            True if operation is safe, False otherwise

        Raises:
            MemoryError: If raise_error=True and memory insufficient
        """
        import warnings

        estimates = self._estimate_memory_usage(operation, n_workers=n_workers or 1)
        memory_info = self._check_available_memory()

        estimated_usage = estimates["total"]
        available = memory_info["available"]
        threshold = available * (threshold_percent / 100.0)

        is_safe = estimated_usage <= threshold

        if not is_safe:
            usage_str = self._format_bytes(estimated_usage)
            available_str = memory_info["available_human"]

            message = (
                f"Memory warning: {operation} operation may require {usage_str} "
                f"but only {available_str} is available. "
                f"Current memory usage: {memory_info['percent']:.1f}%"
            )

            if raise_error:
                raise MemoryError(message)
            else:
                warnings.warn(message, ResourceWarning)
                if hasattr(self, "logger"):
                    self.logger.warning(message)

        return is_safe

    def _validate_mode_band_compatibility(self):
        """Validate that mode matches band count."""
        mode_requirements = {
            "single": (1, "1-band"),
            "rgb": (3, "3-band"),
            "rgba": (4, "4-band"),
        }

        if self.mode in mode_requirements:
            required_count, description = mode_requirements[self.mode]
            if self.count != required_count:
                raise ValueError(
                    f"{self.mode.upper()} mode requires a {description} TIF file"
                )
        elif self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    def __enter__(self):
        return self

    def __del__(self):
        """Clean up temporary files and directories."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    def cleanup(self):
        """Explicit cleanup method for better control."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir)
            self.logger.info("Cleaned up temporary files")

    def __exit__(self, exc_type, exc_value, traceback):
        """Proper context manager exit with cleanup."""
        self.cleanup()
        return False

bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

dtype property

Get the data types from the TIF file

is_merged: bool property

Check if this processor was created from multiple rasters.

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height, i.e. the pixel size) from the TIF file

source_count: int property

Get the number of source rasters.

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__del__()

Clean up temporary files and directories.

Source code in gigaspatial/processing/tif_processor.py
def __del__(self):
    """Clean up temporary files and directories."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir, ignore_errors=True)

__exit__(exc_type, exc_value, traceback)

Proper context manager exit with cleanup.

Source code in gigaspatial/processing/tif_processor.py
def __exit__(self, exc_type, exc_value, traceback):
    """Proper context manager exit with cleanup."""
    self.cleanup()
    return False
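
Example (a minimal usage sketch; the import path gigaspatial.processing.tif_processor and the file name are assumptions for illustration). Because __enter__ and __exit__ are implemented, the processor can be used as a context manager so temporary merged or reprojected files are removed automatically:

from gigaspatial.processing.tif_processor import TifProcessor

# Temporary files created during merging/reprojection are cleaned up on exit.
with TifProcessor(dataset_path="data/example.tif", mode="single") as tp:
    df = tp.to_dataframe()
# cleanup() has been called here via __exit__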

__post_init__()

Validate inputs, merge rasters if needed, and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs, merge rasters if needed, and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}
    self._temp_dir = tempfile.mkdtemp()
    self._merged_file_path = None
    self._reprojected_file_path = None

    # Handle multiple dataset paths
    if isinstance(self.dataset_path, list):
        if len(self.dataset_path) > 1:
            self.dataset_paths = [Path(p) for p in self.dataset_path]
            self._validate_multiple_datasets()
            self._merge_rasters()
            self.dataset_path = self._merged_file_path
    else:
        self.dataset_paths = [Path(self.dataset_path)]
        if not self.data_store.file_exists(str(self.dataset_path)):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        # Reproject single raster during initialization if target_crs is set
        if self.target_crs:
            self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        self._reprojected_file_path = self._reproject_to_temp_file(
                            src, self.target_crs
                        )
            self.dataset_path = self._reprojected_file_path

    self._load_metadata()
    self._validate_mode_band_compatibility()
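
The two initialization paths handled above can be exercised as in the following sketch; the tile paths, file name, and CRS are placeholders, and the import path is assumed to be gigaspatial.processing.tif_processor.

from gigaspatial.processing.tif_processor import TifProcessor

# Passing a list of rasters merges them into one temporary raster at init time.
merged = TifProcessor(
    dataset_path=["tiles/tile_a.tif", "tiles/tile_b.tif"],
    mode="single",
)

# Setting target_crs reprojects a single raster during initialization.
reprojected = TifProcessor(
    dataset_path="data/example.tif",
    mode="single",
    target_crs="EPSG:4326",
)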

cleanup()

Explicit cleanup method for better control.

Source code in gigaspatial/processing/tif_processor.py
def cleanup(self):
    """Explicit cleanup method for better control."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir)
        self.logger.info("Cleaned up temporary files")

clip_to_bounds(bounds, bounds_crs=None, return_clipped_processor=True)

Clip raster to rectangular bounds.

Parameters:

bounds : tuple
    Bounding box as (minx, miny, maxx, maxy)
bounds_crs : str, optional
    CRS of the bounds. If None, assumes same as raster CRS
return_clipped_processor : bool, default True
    If True, returns new TifProcessor, else returns (array, transform, metadata)

Returns:

TifProcessor or tuple
    Either new TifProcessor instance or (array, transform, metadata) tuple

Source code in gigaspatial/processing/tif_processor.py
def clip_to_bounds(
    self,
    bounds: tuple,
    bounds_crs: Optional[str] = None,
    return_clipped_processor: bool = True,
) -> Union["TifProcessor", tuple]:
    """
    Clip raster to rectangular bounds.

    Parameters:
    -----------
    bounds : tuple
        Bounding box as (minx, miny, maxx, maxy)
    bounds_crs : str, optional
        CRS of the bounds. If None, assumes same as raster CRS
    return_clipped_processor : bool, default True
        If True, returns new TifProcessor, else returns (array, transform, metadata)

    Returns:
    --------
    TifProcessor or tuple
        Either new TifProcessor instance or (array, transform, metadata) tuple
    """
    # Create bounding box geometry
    bbox_geom = box(*bounds)

    # If bounds_crs is specified and different from raster CRS, create GeoDataFrame for reprojection
    if bounds_crs is not None:
        raster_crs = self.crs

        if not self.crs == bounds_crs:
            # Create GeoDataFrame with bounds CRS and reproject
            bbox_gdf = gpd.GeoDataFrame([1], geometry=[bbox_geom], crs=bounds_crs)
            bbox_gdf = bbox_gdf.to_crs(raster_crs)
            bbox_geom = bbox_gdf.geometry.iloc[0]

    return self.clip_to_geometry(
        geometry=bbox_geom,
        crop=True,
        return_clipped_processor=return_clipped_processor,
    )
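
A hedged usage sketch for clip_to_bounds; the raster path and bounding box are placeholders, and bounds supplied in EPSG:4326 are reprojected to the raster CRS before clipping.

from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")

# Returns a new TifProcessor backed by a temporary clipped file.
clipped = tp.clip_to_bounds(
    bounds=(29.0, -3.0, 31.0, -1.0),
    bounds_crs="EPSG:4326",
)
print(clipped.get_raster_info()["bounds"])

# Or request the raw array, transform and metadata instead.
data, transform, meta = tp.clip_to_bounds(
    bounds=(29.0, -3.0, 31.0, -1.0),
    bounds_crs="EPSG:4326",
    return_clipped_processor=False,
)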

clip_to_geometry(geometry, crop=True, all_touched=True, invert=False, nodata=None, pad=False, pad_width=0.5, return_clipped_processor=True)

Clip raster to geometry boundaries.

Parameters:

geometry : various
    Geometry to clip to. Can be:
    - Shapely Polygon or MultiPolygon
    - GeoDataFrame or GeoSeries
    - List of GeoJSON-like dicts
    - Single GeoJSON-like dict
crop : bool, default True
    Whether to crop the raster to the extent of the geometry
all_touched : bool, default True
    Include pixels that touch the geometry boundary
invert : bool, default False
    If True, mask pixels inside geometry instead of outside
nodata : int or float, optional
    Value to use for masked pixels. If None, uses raster's nodata value
pad : bool, default False
    Pad geometry by half pixel before clipping
pad_width : float, default 0.5
    Width of padding in pixels if pad=True
return_clipped_processor : bool, default True
    If True, returns new TifProcessor with clipped data
    If False, returns (clipped_array, transform, metadata)

Returns:

TifProcessor or tuple
    Either new TifProcessor instance or (array, transform, metadata) tuple

Source code in gigaspatial/processing/tif_processor.py
def clip_to_geometry(
    self,
    geometry: Union[
        Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
    ],
    crop: bool = True,
    all_touched: bool = True,
    invert: bool = False,
    nodata: Optional[Union[int, float]] = None,
    pad: bool = False,
    pad_width: float = 0.5,
    return_clipped_processor: bool = True,
) -> Union["TifProcessor", tuple]:
    """
    Clip raster to geometry boundaries.

    Parameters:
    -----------
    geometry : various
        Geometry to clip to. Can be:
        - Shapely Polygon or MultiPolygon
        - GeoDataFrame or GeoSeries
        - List of GeoJSON-like dicts
        - Single GeoJSON-like dict
    crop : bool, default True
        Whether to crop the raster to the extent of the geometry
    all_touched : bool, default True
        Include pixels that touch the geometry boundary
    invert : bool, default False
        If True, mask pixels inside geometry instead of outside
    nodata : int or float, optional
        Value to use for masked pixels. If None, uses raster's nodata value
    pad : bool, default False
        Pad geometry by half pixel before clipping
    pad_width : float, default 0.5
        Width of padding in pixels if pad=True
    return_clipped_processor : bool, default True
        If True, returns new TifProcessor with clipped data
        If False, returns (clipped_array, transform, metadata)

    Returns:
    --------
    TifProcessor or tuple
        Either new TifProcessor instance or (array, transform, metadata) tuple
    """
    # Handle different geometry input types
    shapes = self._prepare_geometry_for_clipping(geometry)

    # Validate CRS compatibility
    self._validate_geometry_crs(geometry)

    # Perform the clipping
    with self.open_dataset() as src:
        try:
            clipped_data, clipped_transform = mask(
                dataset=src,
                shapes=shapes,
                crop=crop,
                all_touched=all_touched,
                invert=invert,
                nodata=nodata,
                pad=pad,
                pad_width=pad_width,
                filled=True,
            )

            # Update metadata for the clipped raster
            clipped_meta = src.meta.copy()
            clipped_meta.update(
                {
                    "height": clipped_data.shape[1],
                    "width": clipped_data.shape[2],
                    "transform": clipped_transform,
                    "nodata": nodata if nodata is not None else src.nodata,
                }
            )

        except ValueError as e:
            if "Input shapes do not overlap raster" in str(e):
                raise ValueError(
                    "The geometry does not overlap with the raster. "
                    "Check that both are in the same coordinate reference system."
                ) from e
            else:
                raise e

    if return_clipped_processor:
        # Create a new TifProcessor with the clipped data
        return self._create_clipped_processor(clipped_data, clipped_meta)
    else:
        return clipped_data, clipped_transform, clipped_meta
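
A sketch of clipping to polygon geometries taken from a GeoDataFrame; the file names are placeholders, and the geometries are reprojected to the raster CRS first so the CRS validation above does not warn.

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")
admin = gpd.read_file("data/admin_boundaries.geojson").to_crs(tp.crs)

# Accepts shapely geometries, GeoDataFrame/GeoSeries, or GeoJSON-like dicts.
clipped = tp.clip_to_geometry(
    geometry=admin,
    crop=True,
    all_touched=True,
)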

get_raster_info()

Get comprehensive raster information.

Source code in gigaspatial/processing/tif_processor.py
def get_raster_info(self) -> Dict[str, Any]:
    """Get comprehensive raster information."""
    return {
        "count": self.count,
        "width": self.width,
        "height": self.height,
        "crs": self.crs,
        "bounds": self.bounds,
        "transform": self.transform,
        "dtypes": self.dtype,
        "nodata": self.nodata,
        "mode": self.mode,
        "is_merged": self.is_merged,
        "source_count": self.source_count,
    }

open_dataset()

Context manager for accessing the dataset, handling temporary reprojected files.

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset, handling temporary reprojected files."""
    if self._merged_file_path:
        with rasterio.open(self._merged_file_path) as src:
            yield src
    elif self._reprojected_file_path:
        with rasterio.open(self._reprojected_file_path) as src:
            yield src
    elif isinstance(self.data_store, LocalDataStore):
        with rasterio.open(str(self.dataset_path)) as src:
            yield src
    else:
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src
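
open_dataset yields a rasterio dataset whether the file lives on local disk, in a temporary merged/reprojected file, or behind a custom DataStore, so ordinary rasterio calls work inside the context. A minimal sketch with a placeholder path:

from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")

with tp.open_dataset() as src:
    print(src.profile)   # rasterio metadata
    band = src.read(1)   # first band as a numpy array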

reproject_to(target_crs, output_path=None, resampling_method=None, resolution=None)

Reprojects the current raster to a new CRS and optionally saves it.

Parameters:

Name Type Description Default
target_crs str

The CRS to reproject to (e.g., "EPSG:4326").

required
output_path Optional[Union[str, Path]]

The path to save the reprojected raster. If None, it is saved to a temporary file.

None
resampling_method Optional[Resampling]

The resampling method to use.

None
resolution Optional[Tuple[float, float]]

The target resolution (pixel size) in the new CRS.

None
Source code in gigaspatial/processing/tif_processor.py
def reproject_to(
    self,
    target_crs: str,
    output_path: Optional[Union[str, Path]] = None,
    resampling_method: Optional[Resampling] = None,
    resolution: Optional[Tuple[float, float]] = None,
):
    """
    Reprojects the current raster to a new CRS and optionally saves it.

    Args:
        target_crs: The CRS to reproject to (e.g., "EPSG:4326").
        output_path: The path to save the reprojected raster. If None,
                     it is saved to a temporary file.
        resampling_method: The resampling method to use.
        resolution: The target resolution (pixel size) in the new CRS.
    """
    self.logger.info(f"Reprojecting raster to {target_crs}...")

    # Use provided or default values
    resampling_method = resampling_method or self.resampling_method
    resolution = resolution or self.reprojection_resolution

    with self.open_dataset() as src:
        if src.crs.to_string() == target_crs:
            self.logger.info(
                "Raster is already in the target CRS. No reprojection needed."
            )
            # If output_path is specified, copy the file
            if output_path:
                self.data_store.copy_file(str(self.dataset_path), output_path)
            return self.dataset_path

        dst_path = output_path or os.path.join(
            self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
        )

        with rasterio.open(
            dst_path,
            "w",
            **self._get_reprojection_profile(src, target_crs, resolution),
        ) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=resampling_method,
                    num_threads=multiprocessing.cpu_count(),
                )

        self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
        return Path(dst_path)
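
A usage sketch for reproject_to; the output path and target CRS are illustrative, and if the raster is already in the target CRS the original dataset path is returned unchanged.

from rasterio.enums import Resampling
from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")

out_path = tp.reproject_to(
    target_crs="EPSG:3857",
    output_path="data/example_3857.tif",
    resampling_method=Resampling.bilinear,
)
print(out_path)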

sample_by_polygons(polygon_list, stat='mean')

Sample raster values by polygons and compute statistic(s) for each polygon.

Parameters:

Name Type Description Default
polygon_list

List of shapely Polygon or MultiPolygon objects.

required
stat Union[str, Callable, List[Union[str, Callable]]]

Statistic(s) to compute. Can be:
- Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
- Single callable: custom function that takes array and returns scalar
- List of strings/callables: multiple statistics to compute

'mean'

Returns:

Type Description

If single stat: np.ndarray of computed statistics for each polygon

If multiple stats: List of dictionaries with stat names as keys

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self,
    polygon_list,
    stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
):
    """
    Sample raster values by polygons and compute statistic(s) for each polygon.

    Args:
        polygon_list: List of shapely Polygon or MultiPolygon objects.
        stat: Statistic(s) to compute. Can be:
            - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
            - Single callable: custom function that takes array and returns scalar
            - List of strings/callables: multiple statistics to compute

    Returns:
        If single stat: np.ndarray of computed statistics for each polygon
        If multiple stats: List of dictionaries with stat names as keys
    """
    # Determine if single or multiple stats
    single_stat = not isinstance(stat, list)
    stats_list = [stat] if single_stat else stat

    # Prepare stat functions
    stat_funcs = []
    stat_names = []

    for s in stats_list:
        if callable(s):
            stat_funcs.append(s)
            stat_names.append(
                s.__name__
                if hasattr(s, "__name__")
                else f"custom_{len(stat_names)}"
            )
        else:
            # Handle string statistics
            if s == "count":
                stat_funcs.append(len)
            else:
                stat_funcs.append(getattr(np, s))
            stat_names.append(s)

    results = []

    with self.open_dataset() as src:
        for polygon in tqdm(polygon_list):
            try:
                out_image, _ = mask(src, [polygon], crop=True, filled=False)

                # Use masked arrays for more efficient nodata handling
                if hasattr(out_image, "mask"):
                    valid_data = out_image.compressed()
                else:
                    valid_data = (
                        out_image[out_image != self.nodata]
                        if self.nodata
                        else out_image.flatten()
                    )

                if len(valid_data) == 0:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})
                else:
                    if single_stat:
                        results.append(stat_funcs[0](valid_data))
                    else:
                        # Compute all statistics for this polygon
                        polygon_stats = {}
                        for func, name in zip(stat_funcs, stat_names):
                            try:
                                polygon_stats[name] = func(valid_data)
                            except Exception:
                                polygon_stats[name] = np.nan
                        results.append(polygon_stats)

            except Exception:
                if single_stat:
                    results.append(np.nan)
                else:
                    results.append({name: np.nan for name in stat_names})

    return np.array(results) if single_stat else results
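
A sketch of computing zonal statistics with a single statistic and with several at once; the paths are placeholders, and the polygons are reprojected to the raster CRS before sampling.

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/population.tif", mode="single")
zones = gpd.read_file("data/zones.geojson").to_crs(tp.crs)

# Single statistic -> np.ndarray with one value per polygon
means = tp.sample_by_polygons(zones.geometry, stat="mean")

# Multiple statistics -> list of dicts keyed by statistic name
stats = tp.sample_by_polygons(zones.geometry, stat=["mean", "sum", "count"])
zones["pop_mean"] = means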

sample_by_polygons_batched(polygon_list, stat='mean', batch_size=100, n_workers=4, show_progress=True, check_memory=True, **kwargs)

Sample raster values by polygons in parallel using batching.

Parameters:

Name Type Description Default
polygon_list List[Union[Polygon, MultiPolygon]]

List of Shapely Polygon or MultiPolygon objects

required
stat Union[str, Callable]

Statistic to compute

'mean'
batch_size int

Number of polygons per batch

100
n_workers int

Number of worker processes

4
show_progress bool

Whether to display progress bar

True
check_memory bool

Whether to check memory before operation

True
**kwargs

Additional arguments

{}

Returns:

Type Description
ndarray

np.ndarray of statistics for each polygon

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons_batched(
    self,
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: Union[str, Callable] = "mean",
    batch_size: int = 100,
    n_workers: int = 4,
    show_progress: bool = True,
    check_memory: bool = True,
    **kwargs,
) -> np.ndarray:
    """
    Sample raster values by polygons in parallel using batching.

    Args:
        polygon_list: List of Shapely Polygon or MultiPolygon objects
        stat: Statistic to compute
        batch_size: Number of polygons per batch
        n_workers: Number of worker processes
        show_progress: Whether to display progress bar
        check_memory: Whether to check memory before operation
        **kwargs: Additional arguments

    Returns:
        np.ndarray of statistics for each polygon
    """
    import sys

    # Memory guard check with n_workers consideration
    if check_memory:
        is_safe = self._memory_guard(
            "batched_sampling",
            threshold_percent=85.0,
            n_workers=n_workers,
            raise_error=False,
        )

        if not is_safe:
            # Suggest reducing n_workers
            memory_info = self._check_available_memory()
            estimates = self._estimate_memory_usage("batched_sampling", n_workers=1)

            # Calculate optimal workers
            suggested_workers = max(
                1, int(memory_info["available"] * 0.7 / estimates["per_worker"])
            )

            warnings.warn(
                f"Consider reducing n_workers from {n_workers} to {suggested_workers} "
                f"to reduce memory pressure.",
                ResourceWarning,
            )

    # Platform check
    if sys.platform in ["win32", "darwin"]:
        import warnings
        import multiprocessing as mp

        if mp.get_start_method(allow_none=True) != "fork":
            warnings.warn(
                "Batched sampling may not work on Windows/macOS. "
                "Use sample_by_polygons() if you encounter errors.",
                RuntimeWarning,
            )

    def _chunk_list(data_list, chunk_size):
        """Yield successive chunks from data_list."""
        for i in range(0, len(data_list), chunk_size):
            yield data_list[i : i + chunk_size]

    if len(polygon_list) == 0:
        return np.array([])

    stat_func = stat if callable(stat) else getattr(np, stat)
    polygon_chunks = list(_chunk_list(polygon_list, batch_size))

    with multiprocessing.Pool(
        initializer=self._initializer_worker, processes=n_workers
    ) as pool:
        process_func = partial(self._process_polygon_batch, stat_func=stat_func)
        if show_progress:
            batched_results = list(
                tqdm(
                    pool.imap(process_func, polygon_chunks),
                    total=len(polygon_chunks),
                    desc=f"Sampling polygons",
                )
            )
        else:
            batched_results = list(pool.imap(process_func, polygon_chunks))

        results = [item for sublist in batched_results for item in sublist]

    return np.array(results)
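
A sketch of the parallel variant; it relies on fork-based multiprocessing, so on Windows and macOS the sequential sample_by_polygons() may be the safer choice. Paths are placeholders.

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/population.tif", mode="single")
zones = gpd.read_file("data/zones.geojson").to_crs(tp.crs)

sums = tp.sample_by_polygons_batched(
    polygon_list=list(zones.geometry),
    stat="sum",
    batch_size=200,
    n_workers=4,
)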

to_dataframe(drop_nodata=True, check_memory=True, **kwargs)

Convert raster to DataFrame.

Parameters:

Name Type Description Default
drop_nodata

Whether to drop nodata values

True
check_memory

Whether to check memory before operation (default True)

True
**kwargs

Additional arguments

{}

Returns:

Type Description
DataFrame

pd.DataFrame with raster data

Source code in gigaspatial/processing/tif_processor.py
def to_dataframe(
    self, drop_nodata=True, check_memory=True, **kwargs
) -> pd.DataFrame:
    """
    Convert raster to DataFrame.

    Args:
        drop_nodata: Whether to drop nodata values
        check_memory: Whether to check memory before operation (default True)
        **kwargs: Additional arguments

    Returns:
        pd.DataFrame with raster data
    """
    # Memory guard check
    if check_memory:
        self._memory_guard("conversion", threshold_percent=80.0)

    try:
        if self.mode == "single":
            return self._to_dataframe(
                band_number=kwargs.get("band_number", 1),
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
        else:
            return self._to_dataframe(
                band_number=None,  # All bands
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
    except Exception as e:
        raise ValueError(
            f"Failed to process TIF file in mode '{self.mode}'. "
            f"Please ensure the file is valid and matches the selected mode. "
            f"Original error: {str(e)}"
        )

    return df
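
A sketch converting a single-band raster to a DataFrame of pixel centers; the path is a placeholder, and in 'rgb', 'rgba', or 'multi' mode one column per band is returned instead of pixel_value.

from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")

df = tp.to_dataframe(drop_nodata=True)
print(df.columns.tolist())      # ['lon', 'lat', 'pixel_value']
print(len(df), "valid pixels")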

to_dataframe_chunked(drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs)

Convert raster to DataFrame using chunked processing for memory efficiency.

Automatically routes to the appropriate chunked method based on mode. Chunk size is automatically calculated based on target memory usage.

Parameters:

Name Type Description Default
drop_nodata

Whether to drop nodata values

True
chunk_size

Number of rows per chunk (auto-calculated if None)

None
target_memory_mb

Target memory per chunk in MB (default 500)

500
**kwargs

Additional arguments (band_number, band_names, etc.)

{}
Source code in gigaspatial/processing/tif_processor.py
def to_dataframe_chunked(
    self, drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs
):
    """
    Convert raster to DataFrame using chunked processing for memory efficiency.

    Automatically routes to the appropriate chunked method based on mode.
    Chunk size is automatically calculated based on target memory usage.

    Args:
        drop_nodata: Whether to drop nodata values
        chunk_size: Number of rows per chunk (auto-calculated if None)
        target_memory_mb: Target memory per chunk in MB (default 500)
        **kwargs: Additional arguments (band_number, band_names, etc.)
    """

    if chunk_size is None:
        chunk_size = self._calculate_optimal_chunk_size(
            "conversion", target_memory_mb
        )

    windows = self._get_chunk_windows(chunk_size)

    # SIMPLE ROUTING
    if self.mode == "single":
        return self._to_dataframe_chunked(
            windows,
            band_number=kwargs.get("band_number", 1),
            drop_nodata=drop_nodata,
            band_names=kwargs.get("band_names", None),
        )
    else:  # rgb, rgba, multi
        return self._to_dataframe_chunked(
            windows,
            band_number=None,
            drop_nodata=drop_nodata,
            band_names=kwargs.get("band_names", None),
        )
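
A sketch of the chunked conversion, useful when the full-raster DataFrame would not fit in memory; the path is a placeholder and the chunk size is derived from the memory target when not given.

from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/large_raster.tif", mode="single")

# Rows are processed in windows sized for roughly 250 MB each.
df = tp.to_dataframe_chunked(drop_nodata=True, target_memory_mb=250)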

to_geodataframe(check_memory=True, **kwargs)

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Parameters:

Name Type Description Default
check_memory

Whether to check memory before operation

True
**kwargs

Additional arguments passed to to_dataframe()

{}

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame with raster data

Source code in gigaspatial/processing/tif_processor.py
def to_geodataframe(self, check_memory=True, **kwargs) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.

    Args:
        check_memory: Whether to check memory before operation
        **kwargs: Additional arguments passed to to_dataframe()

    Returns:
        gpd.GeoDataFrame with raster data
    """
    # Memory guard check
    if check_memory:
        self._memory_guard("conversion", threshold_percent=80.0)

    df = self.to_dataframe(check_memory=False, **kwargs)

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    return gdf
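
A sketch turning pixels into per-pixel polygon zones and writing them out; the file names are placeholders.

from gigaspatial.processing.tif_processor import TifProcessor

tp = TifProcessor(dataset_path="data/example.tif", mode="single")

gdf = tp.to_geodataframe(drop_nodata=True)   # one box geometry per pixel
gdf.to_file("data/pixels.gpkg", driver="GPKG")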

to_graph(connectivity=4, band=None, include_coordinates=False, graph_type='networkx', check_memory=True)

Convert raster to graph based on pixel adjacency.

Parameters:

    connectivity (Literal[4, 8]): 4- or 8-connectivity. Default: 4
    band (Optional[int]): Band number (1-indexed). Default: None
    include_coordinates (bool): Include x, y coordinates in node attributes. Default: False
    graph_type (Literal['networkx', 'sparse']): 'networkx' or 'sparse'. Default: 'networkx'
    check_memory (bool): Whether to check memory before the operation. Default: True

Returns:

    Union[Graph, csr_matrix]: Graph representation of the raster.

Source code in gigaspatial/processing/tif_processor.py
def to_graph(
    self,
    connectivity: Literal[4, 8] = 4,
    band: Optional[int] = None,
    include_coordinates: bool = False,
    graph_type: Literal["networkx", "sparse"] = "networkx",
    check_memory: bool = True,
) -> Union[nx.Graph, sp.csr_matrix]:
    """
    Convert raster to graph based on pixel adjacency.

    Args:
        connectivity: 4 or 8-connectivity
        band: Band number (1-indexed)
        include_coordinates: Include x,y coordinates in nodes
        graph_type: 'networkx' or 'sparse'
        check_memory: Whether to check memory before operation

    Returns:
        Graph representation of raster
    """

    # Memory guard check
    if check_memory:
        self._memory_guard("graph", threshold_percent=80.0)

    with self.open_dataset() as src:
        band_idx = band - 1 if band is not None else 0
        if band_idx < 0 or band_idx >= src.count:
            raise ValueError(
                f"Band {band} not available. Raster has {src.count} bands"
            )

        data = src.read(band_idx + 1)
        nodata = src.nodata if src.nodata is not None else self.nodata
        valid_mask = (
            data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
        )

        height, width = data.shape

        # Find all valid pixels
        valid_rows, valid_cols = np.where(valid_mask)
        num_valid_pixels = len(valid_rows)

        # Create a sequential mapping from (row, col) to a node ID
        node_map = np.full(data.shape, -1, dtype=int)
        node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

        # Define neighborhood offsets
        if connectivity == 4:
            # von Neumann neighborhood (4-connectivity)
            offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        else:  # connectivity == 8
            # Moore neighborhood (8-connectivity)
            offsets = [
                (-1, -1),
                (-1, 0),
                (-1, 1),
                (0, -1),
                (0, 1),
                (1, -1),
                (1, 0),
                (1, 1),
            ]

        # Collect nodes and edges
        nodes_to_add = []
        edges_to_add = []

        for i in range(num_valid_pixels):
            row, col = valid_rows[i], valid_cols[i]
            current_node_id = node_map[row, col]

            # Prepare node attributes
            node_attrs = {"value": float(data[row, col])}
            if include_coordinates:
                x, y = src.xy(row, col)
                node_attrs["x"] = x
                node_attrs["y"] = y
            nodes_to_add.append((current_node_id, node_attrs))

            # Find neighbors and collect edges
            for dy, dx in offsets:
                neighbor_row, neighbor_col = row + dy, col + dx

                # Check if neighbor is within bounds and is a valid pixel
                if (
                    0 <= neighbor_row < height
                    and 0 <= neighbor_col < width
                    and valid_mask[neighbor_row, neighbor_col]
                ):
                    neighbor_node_id = node_map[neighbor_row, neighbor_col]

                    # Ensure each edge is added only once
                    if current_node_id < neighbor_node_id:
                        neighbor_value = float(data[neighbor_row, neighbor_col])
                        edges_to_add.append(
                            (current_node_id, neighbor_node_id, neighbor_value)
                        )

        if graph_type == "networkx":
            G = nx.Graph()
            G.add_nodes_from(nodes_to_add)
            G.add_weighted_edges_from(edges_to_add)
            return G
        else:  # sparse matrix
            edges_array = np.array(edges_to_add)
            row_indices = edges_array[:, 0]
            col_indices = edges_array[:, 1]
            weights = edges_array[:, 2]

            # Add reverse edges for symmetric matrix
            from_idx = np.append(row_indices, col_indices)
            to_idx = np.append(col_indices, row_indices)
            weights = np.append(weights, weights)

            return sp.coo_matrix(
                (weights, (from_idx, to_idx)),
                shape=(num_valid_pixels, num_valid_pixels),
            ).tocsr()
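
For intuition, the following self-contained sketch reproduces the same 4-connectivity adjacency construction on a toy array, outside the raster class. It is illustrative only: the nodata value is hard-coded and no file I/O is involved.

# Self-contained sketch of the 4-connectivity graph construction on a toy array.
import numpy as np
import networkx as nx

data = np.array([[1.0, 2.0, -9999.0],
                 [3.0, 4.0, 5.0]])
nodata = -9999.0
valid = data != nodata

rows, cols = np.where(valid)
node_map = np.full(data.shape, -1, dtype=int)
node_map[rows, cols] = np.arange(len(rows))

G = nx.Graph()
for r, c in zip(rows, cols):
    G.add_node(node_map[r, c], value=float(data[r, c]))
    for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:  # von Neumann neighborhood
        nr, nc = r + dr, c + dc
        if 0 <= nr < data.shape[0] and 0 <= nc < data.shape[1] and valid[nr, nc]:
            if node_map[r, c] < node_map[nr, nc]:  # add each undirected edge once
                # Edge weight mirrors the method above: the neighbor's pixel value.
                G.add_edge(node_map[r, c], node_map[nr, nc], weight=float(data[nr, nc]))

print(G.number_of_nodes(), G.number_of_edges())  # 5 valid pixels, 5 edges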

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

Returns:

    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

Raises:

    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    try:
        utm_crs = gdf_with_area.estimate_utm_crs()
    except Exception as e:
        LOGGER.warning(
            f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
        )
        utm_crs = "EPSG:3857"  # Fallback to Web Mercator

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area
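
A small usage sketch; the import path follows the source note above, and the polygons are illustrative toy geometries.

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import add_area_in_meters

gdf = gpd.GeoDataFrame(
    {"name": ["a", "b"]},
    geometry=[box(0, 0, 0.01, 0.01), box(0.02, 0.02, 0.04, 0.04)],
    crs="EPSG:4326",
)
# Areas are computed in an estimated UTM CRS, so values come back in square meters.
print(add_area_in_meters(gdf)[["name", "area_in_meters"]])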

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, the same amount is used for all columns.
        If dict, specify the amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

Returns:

    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

Raises:

    ValueError
        If columns don't exist or the jitter amount is invalid.
    TypeError
        If input types are incorrect.

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")
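
An illustrative sketch: separate two records that share identical coordinates.

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter  # path per the source note above

df = pd.DataFrame({
    "latitude": [10.0, 10.0, 11.5],
    "longitude": [20.0, 20.0, 21.0],
})
jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
# Previously duplicated rows now differ by a few meters.
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # False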

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

    points (Union[DataFrame, GeoDataFrame]): Point data to aggregate. Required.
    zones (GeoDataFrame): Zones to aggregate points to. Required.
    value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate.
        If None, only counts are performed. Default: None
    aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
        - Single string: use the same method for all columns ("count", "mean", "sum", "min", "max")
        - Dict: map column names to aggregation methods
        Default: 'count'
    point_zone_predicate (str): Spatial predicate for the point-to-zone relationship.
        Options: "within", "intersects". Default: 'within'
    zone_id_column (str): Column in zones containing zone identifiers. Default: 'zone_id'
    output_suffix (str): Suffix to add to output column names. Default: ''
    drop_geometry (bool): Whether to drop the geometry column from the output. Default: False

Returns:

    gpd.GeoDataFrame: Zones with aggregated point values.

Example:

    poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
    poi_value_mean = aggregate_points_to_zones(
        pois, zones, value_columns="score", aggregation="mean"
    )
    poi_multiple = aggregate_points_to_zones(
        pois, zones,
        value_columns=["score", "visits"],
        aggregation={"score": "mean", "visits": "sum"}
    )

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result
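
A self-contained sketch with toy data; the zone and point values are illustrative.

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import aggregate_points_to_zones

zones = gpd.GeoDataFrame(
    {"zone_id": ["A", "B"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)
pois = gpd.GeoDataFrame(
    {"score": [1.0, 3.0, 5.0]},
    geometry=[Point(0.5, 0.5), Point(0.6, 0.4), Point(1.5, 0.5)],
    crs="EPSG:4326",
)
counts = aggregate_points_to_zones(pois, zones, aggregation="count")
print(counts[["zone_id", "point_count"]])  # A: 2 points, B: 1 point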

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', predicate='intersects', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregates polygon data to zones based on a specified spatial relationship.

This function performs a spatial join between polygons and zones and then aggregates values from the polygons to their corresponding zones. The aggregation method depends on the predicate parameter, which determines the nature of the spatial relationship.

Parameters:

    polygons (Union[DataFrame, GeoDataFrame]): Polygon data to aggregate. Must be a GeoDataFrame
        or convertible to one. Required.
    zones (GeoDataFrame): The target zones to which the polygon data will be aggregated. Required.
    value_columns (Union[str, List[str]]): The column(s) in polygons containing the numeric values
        to aggregate. Required.
    aggregation (Union[str, Dict[str, str]]): The aggregation method(s) to use. Can be a single
        string (e.g., "sum", "mean", "max") to apply the same method to all columns, or a
        dictionary mapping column names to aggregation methods (e.g., {'population': 'sum'}).
        Default: 'sum'
    predicate (Literal['intersects', 'within', 'fractional']): The spatial relationship to use
        for aggregation:
        - "intersects": aggregates values for any polygon that intersects a zone.
        - "within": aggregates values for polygons entirely contained within a zone.
        - "fractional": performs area-weighted aggregation; the value of a polygon is distributed
          proportionally to the area of its overlap with each zone. This requires calculating a
          UTM CRS for accurate area measurements.
        Default: 'intersects'
    zone_id_column (str): The name of the column in zones that contains the unique zone
        identifiers. Default: 'zone_id'
    output_suffix (str): A suffix to add to the names of the new aggregated columns in the output
        GeoDataFrame. Default: ''
    drop_geometry (bool): If True, the geometry column is dropped from the output GeoDataFrame.
        Default: False

Returns:

    gpd.GeoDataFrame: The zones GeoDataFrame with new columns containing the aggregated values.
    Zones with no intersecting or contained polygons will have 0 values.

Raises:

    TypeError: If zones is not a GeoDataFrame or polygons cannot be converted.
    ValueError: If zone_id_column or any value_columns are not found, or if the geometry types
        in polygons are not polygons.
    RuntimeError: If an error occurs during the area-weighted aggregation process.

Example:

    import geopandas as gpd

    # Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames

    # Aggregate total population within each grid zone using area-weighting
    pop_by_zone = aggregate_polygons_to_zones(
        landuse_polygons,
        grid_zones,
        value_columns="population",
        predicate="fractional",
        aggregation="sum",
        output_suffix="_pop"
    )

    # Aggregate the count of landuse parcels intersecting each zone
    count_by_zone = aggregate_polygons_to_zones(
        landuse_polygons,
        grid_zones,
        value_columns="parcel_id",
        predicate="intersects",
        aggregation="count"
    )

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    predicate: Literal["intersects", "within", "fractional"] = "intersects",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregates polygon data to zones based on a specified spatial relationship.

    This function performs a spatial join between polygons and zones and then
    aggregates values from the polygons to their corresponding zones. The aggregation
    method depends on the `predicate` parameter, which determines the nature of the
    spatial relationship.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.
        zones (gpd.GeoDataFrame):
            The target zones to which the polygon data will be aggregated.
        value_columns (Union[str, List[str]]):
            The column(s) in `polygons` containing the numeric values to aggregate.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") to apply the same method to all columns, or a dictionary
            mapping column names to aggregation methods (e.g., `{'population': 'sum'}`).
            Defaults to "sum".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Aggregates values for any polygon that intersects a zone.
            - "within": Aggregates values for polygons entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a polygon
              is distributed proportionally to the area of its overlap with each zone.
              This requires calculating a UTM CRS for accurate area measurements.
            Defaults to "intersects".
        zone_id_column (str, optional):
            The name of the column in `zones` that contains the unique zone identifiers.
            Defaults to "zone_id".
        output_suffix (str, optional):
            A suffix to add to the names of the new aggregated columns in the output
            GeoDataFrame. Defaults to "".
        drop_geometry (bool, optional):
            If True, the geometry column will be dropped from the output GeoDataFrame.
            Defaults to False.

    Returns:
        gpd.GeoDataFrame:
            The `zones` GeoDataFrame with new columns containing the aggregated values.
            Zones with no intersecting or contained polygons will have `0` values.

    Raises:
        TypeError: If `zones` is not a GeoDataFrame or `polygons` cannot be converted.
        ValueError: If `zone_id_column` or any `value_columns` are not found, or
                    if the geometry types in `polygons` are not polygons.
        RuntimeError: If an error occurs during the area-weighted aggregation process.

    Example:
        >>> import geopandas as gpd
        >>> # Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
        >>> # Aggregate total population within each grid zone using area-weighting
        >>> pop_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum",
        ...     output_suffix="_pop"
        ... )
        >>> # Aggregate the count of landuse parcels intersecting each zone
        >>> count_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="parcel_id",
        ...     predicate="intersects",
        ...     aggregation="count"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zones.empty:
        raise ValueError("zones GeoDataFrame is empty")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    if predicate not in ["intersects", "within", "fractional"]:
        raise ValueError(
            f"Unsupported predicate: {predicate}. Predicate can be one of `intersects`, `within`, `fractional`"
        )

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except Exception as e:
            raise TypeError(
                f"polygons must be a GeoDataFrame or convertible to one: {e}"
            )
    else:
        polygons_gdf = polygons.copy()

    if polygons_gdf.empty:
        LOGGER.warning("Empty polygons GeoDataFrame provided")
        return zones

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Check for column name conflicts with zone_id_column
    if zone_id_column in polygons_gdf.columns:
        raise ValueError(
            f"Column name conflict: polygons DataFrame contains column '{zone_id_column}' "
            f"which conflicts with the zone identifier column. Please rename this column "
            f"in the polygons data to avoid confusion."
        )

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    agg_funcs = _process_aggregation_methods(aggregation, value_columns)

    # Prepare minimal zones for spatial operations (only zone_id_column and geometry)
    minimal_zones = zones[[zone_id_column, "geometry"]].copy()

    if predicate == "fractional":
        aggregated_data = _fractional_aggregation(
            polygons_gdf, minimal_zones, value_columns, agg_funcs, zone_id_column
        )
    else:
        aggregated_data = _simple_aggregation(
            polygons_gdf,
            minimal_zones,
            value_columns,
            agg_funcs,
            zone_id_column,
            predicate,
        )

    # Merge aggregated results back to complete zones data
    result = zones.merge(
        aggregated_data[[col for col in aggregated_data.columns if col != "geometry"]],
        on=zone_id_column,
        how="left",
    )

    # Fill NaN values with zeros for the newly aggregated columns only
    aggregated_cols = [col for col in result.columns if col not in zones.columns]
    for col in aggregated_cols:
        if pd.api.types.is_numeric_dtype(result[col]):
            result[col] = result[col].fillna(0)

    # Apply output suffix consistently to result columns only
    if output_suffix:
        rename_dict = {col: f"{col}{output_suffix}" for col in aggregated_cols}
        result = result.rename(columns=rename_dict)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result
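
A self-contained sketch of the area-weighted ("fractional") predicate on toy data. The exact output column naming comes from internal helpers not shown on this page, so the result is printed wholesale; per the documented semantics, roughly half of the polygon's value should land in each zone.

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import aggregate_polygons_to_zones

zones = gpd.GeoDataFrame(
    {"zone_id": ["A", "B"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)
# A single parcel straddling both zones, half in each.
parcels = gpd.GeoDataFrame(
    {"population": [1000.0]},
    geometry=[box(0.5, 0.0, 1.5, 1.0)],
    crs="EPSG:4326",
)
result = aggregate_polygons_to_zones(
    parcels, zones, value_columns="population", predicate="fractional"
)
print(result.drop(columns="geometry"))  # expect ~500 per zone, subject to projection effects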

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

    gdf (GeoDataFrame): GeoDataFrame containing points to annotate. Required.
    country_code (str): Country code for administrative boundaries. Required.
    data_store (Optional[DataStore]): Optional DataStore for loading admin boundary data.
        Default: None
    admin_id_column_suffix (str): Suffix appended to the generated admin ID column names
        (e.g., "admin1_id_giga"). Default: '_giga'

Returns:

    GeoDataFrame: GeoDataFrame with added administrative region columns.

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins
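
A hedged usage sketch. It assumes admin boundary data can be resolved for the given country code (via network access or a configured DataStore), and the ISO3-style code "BDI" and the sample coordinates are illustrative assumptions.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

schools = gpd.GeoDataFrame(
    {"school_id": [1, 2]},
    geometry=[Point(29.36, -3.38), Point(29.92, -3.43)],
    crs="EPSG:4326",
)
annotated = annotate_with_admin_regions(schools, country_code="BDI")  # assumed code format
print(annotated[["school_id", "admin1", "admin2"]])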

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

    df (DataFrame): Input DataFrame containing entity data. Required.
    required_columns (List[str]): List of column names required for ID generation. Required.
    id_column (str): Name of the ID column that will be generated. Default: 'id'

Returns:

    pd.DataFrame: DataFrame with the generated ID column.

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
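
An illustrative sketch: deterministic IDs derived from name plus coordinates.

import pandas as pd
from gigaspatial.processing.utils import assign_id  # path per the source note above

df = pd.DataFrame({
    "name": ["School A", "School B"],
    "latitude": [10.0, 11.0],
    "longitude": [20.0, 21.0],
})
df = assign_id(df, required_columns=["name", "latitude", "longitude"])
print(df["id"])  # the same inputs always yield the same uuid3-based IDs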

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

    gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    buffer_distance_meters : float
        The buffer distance in meters.
    cap_style : str, optional
        The style of caps: round, flat, or square. Default is round.

Returns:

    geopandas.GeoDataFrame
        The buffered GeoDataFrame.
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: Union[float, np.array, pd.Series],
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        try:
            utm_crs = gdf_work.estimate_utm_crs()
        except Exception as e:
            LOGGER.warning(
                f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
            )
            utm_crs = "EPSG:3857"  # Fallback to Web Mercator

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            distance=buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")
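
An illustrative sketch: 500 m round buffers around two points; the buffering happens in an estimated UTM CRS and the result is returned in the input CRS.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

pts = gpd.GeoDataFrame(
    {"site": ["a", "b"]},
    geometry=[Point(29.36, -3.38), Point(29.37, -3.39)],
    crs="EPSG:4326",
)
buffered = buffer_geodataframe(pts, buffer_distance_meters=500, cap_style="round")
print(buffered.geometry.geom_type.unique())  # ['Polygon']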

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

    gdf: A GeoDataFrame with Point geometries in geographic coordinates. Required.
    resolution (float): Desired resolution (meters per pixel). Required.
    bbox_size (float): Bounding box size in meters. Default: 300 (i.e., 300 m x 300 m)
    crs (str): Target projection. Default: 'EPSG:3857'

Returns:

    int: Number of pixels per side (width and height).

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
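
A worked sketch of the arithmetic the function performs at a latitude of about 45 degrees.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

gdf = gpd.GeoDataFrame(geometry=[Point(7.0, 45.0)], crs="EPSG:4326")
# Effective resolution = 10 m/px * cos(45 deg) ~= 7.07 m of ground per pixel,
# so a 300 m box needs about 300 / 7.07 ~= 42 pixels per side.
print(calculate_pixels_at_location(gdf, resolution=10, bbox_size=300))  # 42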

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. Default is None (auto-detected if possible).
    lon_col : str, optional
        Name of the longitude column. Default is None (auto-detected if possible).
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

Raises:

    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. If None, the column is auto-detected. Default is None.
    lon_col : str, optional
        Name of the longitude column. If None, the column is auto-detected. Default is None.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")
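
An illustrative sketch: build a GeoDataFrame from plain lat/lon columns, relying on automatic column detection.

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

df = pd.DataFrame({"latitude": [10.0, 11.0], "longitude": [20.0, 21.0], "value": [1, 2]})
gdf = convert_to_geodataframe(df)  # lat/lon columns auto-detected
print(gdf.crs, gdf.geometry.iloc[0])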

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

Returns:

    tuple[str, str]
        Names of the detected (latitude, longitude) columns.

Raises:

    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")
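
An illustrative sketch: keyword-based detection is case-insensitive by default.

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

df = pd.DataFrame({"Latitude": [1.0], "Longitude": [2.0], "name": ["x"]})
print(detect_coordinate_columns(df))  # ('Latitude', 'Longitude')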

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

Returns:

    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

Raises:

    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids
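
A minimal sketch; note that geopandas may warn that centroids computed in a geographic CRS are approximate.

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import get_centroids

polys = gpd.GeoDataFrame(geometry=[box(0, 0, 2, 2)], crs="EPSG:4326")
print(get_centroids(polys).geometry.iloc[0])  # POINT (1 1)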

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

Returns:

    geopandas.GeoDataFrame
        The base_points_gdf with an additional column is_within (True/False).

Raises:

    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf
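
An illustrative sketch: flag which points fall inside a polygon.

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5), Point(5, 5)], crs="EPSG:4326")
polygons = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")
print(map_points_within_polygons(points, polygons)["is_within"].tolist())  # [True, False]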

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters:

    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid
        geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

Returns:

    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

Raises:

    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

Examples:

    Simplify geometries in a GeoDataFrame:

    simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified

algorithms

build_distance_graph(left_df, right_df, distance_threshold, max_k=100, return_dataframe=False, verbose=True, exclude_same_index=None)

Build a graph of spatial matches between two dataframes using KD-tree.

Parameters:

Name Type Description Default
left_df Union[DataFrame, GeoDataFrame]

Left dataframe to match from

required
right_df Union[DataFrame, GeoDataFrame]

Right dataframe to match to

required
distance_threshold float

Maximum distance for matching (in meters)

required
max_k int

Maximum number of neighbors to consider per point (default: 100)

100
return_dataframe bool

If True, also return the matches DataFrame

False
verbose bool

If True, print statistics about the graph

True
exclude_same_index Optional[bool]

If True, exclude self-matches. If None, auto-detect based on df equality

None

Returns:

Type Description
Union[Graph, Tuple[Graph, DataFrame]]

NetworkX Graph, or tuple of (Graph, DataFrame) if return_dataframe=True

Raises:

Type Description
ValueError

If distance_threshold is negative or max_k is not positive
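
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.algorithms (the module named under "Source code in" below) and using two small hypothetical point tables with latitude/longitude columns:

import pandas as pd
from gigaspatial.processing.algorithms import build_distance_graph

# Hypothetical input points in WGS84 lat/lon
schools_df = pd.DataFrame(
    {"latitude": [0.100, 0.200, 0.300], "longitude": [32.500, 32.600, 32.700]}
)
facilities_df = pd.DataFrame(
    {"latitude": [0.1001, 0.250, 0.350], "longitude": [32.5001, 32.650, 32.750]},
    index=[10, 11, 12],  # distinct indices so graph nodes don't collide with schools_df
)

# Graph of all school-facility pairs within 500 m, plus the matches table
G, matches = build_distance_graph(
    schools_df, facilities_df, distance_threshold=500, return_dataframe=True
)
print(matches)  # columns: left_idx, right_idx, distance (meters, in the estimated UTM projection)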

Source code in gigaspatial/processing/algorithms.py
def build_distance_graph(
    left_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    right_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    distance_threshold: float,
    max_k: int = 100,
    return_dataframe: bool = False,
    verbose: bool = True,
    exclude_same_index: Optional[bool] = None,
) -> Union[nx.Graph, Tuple[nx.Graph, pd.DataFrame]]:
    """
    Build a graph of spatial matches between two dataframes using KD-tree.

    Args:
        left_df: Left dataframe to match from
        right_df: Right dataframe to match to
        distance_threshold: Maximum distance for matching (in meters)
        max_k: Maximum number of neighbors to consider per point (default: 100)
        return_dataframe: If True, also return the matches DataFrame
        verbose: If True, print statistics about the graph
        exclude_same_index: If True, exclude self-matches. If None, auto-detect based on df equality

    Returns:
        NetworkX Graph, or tuple of (Graph, DataFrame) if return_dataframe=True

    Raises:
        ValueError: If distance_threshold is negative or max_k is not positive
    """

    # Input validation
    if distance_threshold < 0:
        raise ValueError("distance_threshold must be non-negative")

    if max_k <= 0:
        raise ValueError("max_k must be positive")

    if left_df.empty or right_df.empty:
        if verbose:
            LOGGER.warning("Warning: One or both dataframes are empty")
        G = nx.Graph()
        return (G, pd.DataFrame()) if return_dataframe else G

    def get_utm_coordinates(df: Union[pd.DataFrame, gpd.GeoDataFrame]) -> np.ndarray:
        """Extract coordinates as numpy array in UTM projection."""
        if isinstance(df, pd.DataFrame):
            gdf = convert_to_geodataframe(df)
        else:
            gdf = df.copy()

        # More robust UTM CRS estimation
        try:
            gdf_utm = gdf.to_crs(gdf.estimate_utm_crs())
        except Exception as e:
            if verbose:
                LOGGER.warning(
                    f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
                )
            gdf_utm = gdf.to_crs("EPSG:3857")  # Fallback to Web Mercator

        return gdf_utm.get_coordinates().to_numpy()

    # Auto-detect same dataframe case
    if exclude_same_index is None:
        exclude_same_index = left_df.equals(right_df)
        if verbose and exclude_same_index:
            LOGGER.info("Auto-detected same dataframe - excluding self-matches")

    # Get coordinates
    left_coords = get_utm_coordinates(left_df)
    right_coords = (
        get_utm_coordinates(right_df) if not exclude_same_index else left_coords
    )

    # Build KD-tree and query
    kdtree = cKDTree(right_coords)

    # Use the provided max_k parameter, but don't exceed available points
    k_to_use = min(max_k, len(right_coords))

    if verbose and k_to_use < max_k:
        LOGGER.info(
            f"Note: max_k ({max_k}) reduced to {k_to_use} (number of available points)"
        )

    # Note: Distance calculations here are based on Euclidean distance in UTM projection.
    # This can introduce errors up to ~50 cm for a 50 meter threshold, especially near the poles where distortion increases.
    distances, indices = kdtree.query(
        left_coords, k=k_to_use, distance_upper_bound=distance_threshold
    )

    # Handle single k case (when k_to_use = 1, results are 1D)
    if distances.ndim == 1:
        distances = distances.reshape(-1, 1)
        indices = indices.reshape(-1, 1)

    # Extract valid pairs using vectorized operations
    left_indices = np.arange(len(distances))[:, np.newaxis]
    left_indices = np.broadcast_to(left_indices, distances.shape)
    valid_mask = np.isfinite(distances)

    if exclude_same_index:
        same_index_mask = left_indices == indices
        valid_mask = valid_mask & ~same_index_mask

    valid_left = left_indices[valid_mask]
    valid_right = indices[valid_mask]
    valid_distances = distances[valid_mask]

    # Map back to original indices
    valid_left_indices = left_df.index.values[valid_left]
    valid_right_indices = right_df.index.values[valid_right]

    # Create matches DataFrame
    matches_df = pd.DataFrame(
        {
            "left_idx": valid_left_indices,
            "right_idx": valid_right_indices,
            "distance": valid_distances,
        }
    )

    # Build graph more efficiently
    G = nx.from_pandas_edgelist(
        matches_df,
        source="left_idx",
        target="right_idx",
        edge_attr="distance",
        create_using=nx.Graph(),
    )

    # Add isolated nodes (nodes without any matches within threshold)
    # This ensures all original indices are represented in the graph
    all_left_nodes = set(left_df.index.values)
    all_right_nodes = set(right_df.index.values)

    if not exclude_same_index:
        all_nodes = all_left_nodes | all_right_nodes
    else:
        all_nodes = all_left_nodes  # Same dataframe, so same node set

    # Add nodes that don't have edges
    existing_nodes = set(G.nodes())
    isolated_nodes = all_nodes - existing_nodes
    G.add_nodes_from(isolated_nodes)

    # Print statistics
    if verbose:
        print(
            f"Total potential matches: {len(left_df)} × {len(right_df)} = {len(left_df) * len(right_df):,}"
        )
        print(f"Matches found within {distance_threshold}m: {len(matches_df):,}")
        print(f"Graph nodes: {G.number_of_nodes():,}")
        print(f"Graph edges: {G.number_of_edges():,}")

        components = list(nx.connected_components(G))
        print(f"Connected components: {len(components):,}")

        if len(components) > 1:
            component_sizes = [len(c) for c in components]
            print(f"Largest component size: {max(component_sizes):,}")
            print(
                f"Isolated nodes: {sum(1 for size in component_sizes if size == 1):,}"
            )

        if len(matches_df) > 0:
            print(
                f"Distance stats - min: {matches_df['distance'].min():.1f}m, "
                f"max: {matches_df['distance'].max():.1f}m, "
                f"mean: {matches_df['distance'].mean():.1f}m"
            )

    return (G, matches_df) if return_dataframe else G

geo

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

gdf : geopandas.GeoDataFrame
    GeoDataFrame containing (Multi)Polygon geometries.
area_column_name : str, optional
    Name of the new column to store the area values. Default is "area_in_meters".

Returns:

geopandas.GeoDataFrame The input GeoDataFrame with an additional column for the area in square meters.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.
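
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo and using a hypothetical polygon layer in EPSG:4326:

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import add_area_in_meters

# One hypothetical ~1.1 km x 1.1 km tile near the equator
tiles = gpd.GeoDataFrame(
    {"tile_id": ["a"]},
    geometry=[box(32.50, 0.10, 32.51, 0.11)],
    crs="EPSG:4326",
)
tiles_with_area = add_area_in_meters(tiles)
print(tiles_with_area["area_in_meters"].iloc[0])  # roughly 1.2e6 square meters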

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    try:
        utm_crs = gdf_with_area.estimate_utm_crs()
    except Exception as e:
        LOGGER.warning(
            f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
        )
        utm_crs = "EPSG:3857"  # Fallback to Web Mercator

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

df : pandas.DataFrame
    DataFrame containing geographic coordinates.
columns : list of str, optional
    Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
amount : float or dict, optional
    Amount of jitter to add. If float, same amount used for all columns. If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}. Default is 0.0001 (approximately 11 meters at the equator).
seed : int, optional
    Random seed for reproducibility. Default is None.
copy : bool, optional
    Whether to create a copy of the input DataFrame. Default is True.

Returns:

pandas.DataFrame DataFrame with jittered coordinates for previously duplicated points.

Raises:

ValueError
    If columns don't exist or the jitter amount is invalid.
TypeError
    If input types are incorrect.
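
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo and using a hypothetical table with duplicated coordinates:

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter

# Two hypothetical records share identical coordinates; one is unique
df = pd.DataFrame(
    {"latitude": [0.25, 0.25, 0.30], "longitude": [32.60, 32.60, 32.70]}
)
jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
# The duplicated pair is nudged apart; the unique point is left unchanged
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # expected: False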

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

Point data to aggregate

required
zones GeoDataFrame

Zones to aggregate points to

required
value_columns Optional[Union[str, List[str]]]

Column(s) containing values to aggregate. If None, only counts will be performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use:
- Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
- Dict: Map column names to aggregation methods

'count'
point_zone_predicate str

Spatial predicate for the point-to-zone relationship. Options: "within", "intersects"

'within'
zone_id_column str

Column in zones containing zone identifiers

'zone_id'
output_suffix str

Suffix to add to output column names

''
drop_geometry bool

Whether to drop the geometry column from output

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Zones with aggregated point values

Example

poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
poi_value_mean = aggregate_points_to_zones(
    pois, zones, value_columns="score", aggregation="mean"
)
poi_multiple = aggregate_points_to_zones(
    pois, zones,
    value_columns=["score", "visits"],
    aggregation={"score": "mean", "visits": "sum"}
)

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', predicate='intersects', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregates polygon data to zones based on a specified spatial relationship.

This function performs a spatial join between polygons and zones and then aggregates values from the polygons to their corresponding zones. The aggregation method depends on the predicate parameter, which determines the nature of the spatial relationship.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.

required
zones GeoDataFrame

The target zones to which the polygon data will be aggregated.

required
value_columns Union[str, List[str]]

The column(s) in polygons containing the numeric values to aggregate.

required
aggregation Union[str, Dict[str, str]]

The aggregation method(s) to use. Can be a single string (e.g., "sum", "mean", "max") to apply the same method to all columns, or a dictionary mapping column names to aggregation methods (e.g., {'population': 'sum'}). Defaults to "sum".

'sum'
predicate Literal['intersects', 'within', 'fractional']

The spatial relationship to use for aggregation:
- "intersects": Aggregates values for any polygon that intersects a zone.
- "within": Aggregates values for polygons entirely contained within a zone.
- "fractional": Performs area-weighted aggregation. The value of a polygon is distributed proportionally to the area of its overlap with each zone. This requires calculating a UTM CRS for accurate area measurements.
Defaults to "intersects".

'intersects'
zone_id_column str

The name of the column in zones that contains the unique zone identifiers. Defaults to "zone_id".

'zone_id'
output_suffix str

A suffix to add to the names of the new aggregated columns in the output GeoDataFrame. Defaults to "".

''
drop_geometry bool

If True, the geometry column will be dropped from the output GeoDataFrame. Defaults to False.

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The zones GeoDataFrame with new columns containing the aggregated values. Zones with no intersecting or contained polygons will have 0 values.

Raises:

Type Description
TypeError

If zones is not a GeoDataFrame or polygons cannot be converted.

ValueError

If zone_id_column or any value_columns are not found, or if the geometry types in polygons are not polygons.

RuntimeError

If an error occurs during the area-weighted aggregation process.

Example

import geopandas as gpd

# Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
# Aggregate total population within each grid zone using area-weighting
pop_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="population",
    predicate="fractional",
    aggregation="sum",
    output_suffix="_pop"
)

# Aggregate the count of landuse parcels intersecting each zone
count_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="parcel_id",
    predicate="intersects",
    aggregation="count"
)

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    predicate: Literal["intersects", "within", "fractional"] = "intersects",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregates polygon data to zones based on a specified spatial relationship.

    This function performs a spatial join between polygons and zones and then
    aggregates values from the polygons to their corresponding zones. The aggregation
    method depends on the `predicate` parameter, which determines the nature of the
    spatial relationship.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.
        zones (gpd.GeoDataFrame):
            The target zones to which the polygon data will be aggregated.
        value_columns (Union[str, List[str]]):
            The column(s) in `polygons` containing the numeric values to aggregate.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") to apply the same method to all columns, or a dictionary
            mapping column names to aggregation methods (e.g., `{'population': 'sum'}`).
            Defaults to "sum".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Aggregates values for any polygon that intersects a zone.
            - "within": Aggregates values for polygons entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a polygon
              is distributed proportionally to the area of its overlap with each zone.
              This requires calculating a UTM CRS for accurate area measurements.
            Defaults to "intersects".
        zone_id_column (str, optional):
            The name of the column in `zones` that contains the unique zone identifiers.
            Defaults to "zone_id".
        output_suffix (str, optional):
            A suffix to add to the names of the new aggregated columns in the output
            GeoDataFrame. Defaults to "".
        drop_geometry (bool, optional):
            If True, the geometry column will be dropped from the output GeoDataFrame.
            Defaults to False.

    Returns:
        gpd.GeoDataFrame:
            The `zones` GeoDataFrame with new columns containing the aggregated values.
            Zones with no intersecting or contained polygons will have `0` values.

    Raises:
        TypeError: If `zones` is not a GeoDataFrame or `polygons` cannot be converted.
        ValueError: If `zone_id_column` or any `value_columns` are not found, or
                    if the geometry types in `polygons` are not polygons.
        RuntimeError: If an error occurs during the area-weighted aggregation process.

    Example:
        >>> import geopandas as gpd
        >>> # Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
        >>> # Aggregate total population within each grid zone using area-weighting
        >>> pop_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum",
        ...     output_suffix="_pop"
        ... )
        >>> # Aggregate the count of landuse parcels intersecting each zone
        >>> count_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="parcel_id",
        ...     predicate="intersects",
        ...     aggregation="count"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zones.empty:
        raise ValueError("zones GeoDataFrame is empty")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    if predicate not in ["intersects", "within", "fractional"]:
        raise ValueError(
            f"Unsupported predicate: {predicate}. Predicate can be one of `intersects`, `within`, `fractional`"
        )

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except Exception as e:
            raise TypeError(
                f"polygons must be a GeoDataFrame or convertible to one: {e}"
            )
    else:
        polygons_gdf = polygons.copy()

    if polygons_gdf.empty:
        LOGGER.warning("Empty polygons GeoDataFrame provided")
        return zones

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Check for column name conflicts with zone_id_column
    if zone_id_column in polygons_gdf.columns:
        raise ValueError(
            f"Column name conflict: polygons DataFrame contains column '{zone_id_column}' "
            f"which conflicts with the zone identifier column. Please rename this column "
            f"in the polygons data to avoid confusion."
        )

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    agg_funcs = _process_aggregation_methods(aggregation, value_columns)

    # Prepare minimal zones for spatial operations (only zone_id_column and geometry)
    minimal_zones = zones[[zone_id_column, "geometry"]].copy()

    if predicate == "fractional":
        aggregated_data = _fractional_aggregation(
            polygons_gdf, minimal_zones, value_columns, agg_funcs, zone_id_column
        )
    else:
        aggregated_data = _simple_aggregation(
            polygons_gdf,
            minimal_zones,
            value_columns,
            agg_funcs,
            zone_id_column,
            predicate,
        )

    # Merge aggregated results back to complete zones data
    result = zones.merge(
        aggregated_data[[col for col in aggregated_data.columns if col != "geometry"]],
        on=zone_id_column,
        how="left",
    )

    # Fill NaN values with zeros for the newly aggregated columns only
    aggregated_cols = [col for col in result.columns if col not in zones.columns]
    for col in aggregated_cols:
        if pd.api.types.is_numeric_dtype(result[col]):
            result[col] = result[col].fillna(0)

    # Apply output suffix consistently to result columns only
    if output_suffix:
        rename_dict = {col: f"{col}{output_suffix}" for col in aggregated_cols}
        result = result.rename(columns=rename_dict)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

Name Type Description Default
gdf GeoDataFrame

GeoDataFrame containing points to annotate

required
country_code str

Country code for administrative boundaries

required
data_store Optional[DataStore]

Optional DataStore for loading admin boundary data

None
admin_id_column_suffix

Suffix appended to the admin ID column names added to the output (e.g., "admin1_id_giga", "admin2_id_giga")

'_giga'

Returns:

Type Description
GeoDataFrame

GeoDataFrame with added administrative region columns
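
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo, that "KEN" is an accepted country code, and that admin boundary data can be fetched for it (a data_store or network access may be required):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

# Hypothetical point locations in Kenya (WGS84)
sites = gpd.GeoDataFrame(
    {"site_id": [1, 2]},
    geometry=[Point(36.82, -1.29), Point(39.66, -4.04)],
    crs="EPSG:4326",
)
annotated = annotate_with_admin_regions(sites, country_code="KEN")
print(annotated[["site_id", "admin1", "admin2"]])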

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

gdf : geopandas.GeoDataFrame
    The GeoDataFrame to be buffered.
buffer_distance_meters : float
    The buffer distance in meters.
cap_style : str, optional
    The style of caps: "round", "flat", or "square". Default is "round".
copy : bool, optional
    Whether to create a copy of the input GeoDataFrame. Default is True.

Returns:

geopandas.GeoDataFrame
    The buffered GeoDataFrame.
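
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo and using hypothetical point locations in EPSG:4326:

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

# Hypothetical school locations
schools = gpd.GeoDataFrame(
    {"school_id": ["a", "b"]},
    geometry=[Point(32.58, 0.31), Point(32.61, 0.33)],
    crs="EPSG:4326",
)
# 1 km circular buffer around each point, returned in the input CRS
catchments = buffer_geodataframe(schools, buffer_distance_meters=1000, cap_style="round")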
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: Union[float, np.array, pd.Series],
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        try:
            utm_crs = gdf_work.estimate_utm_crs()
        except Exception as e:
            LOGGER.warning(
                f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
            )
            utm_crs = "EPSG:3857"  # Fallback to Web Mercator

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            distance=buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

data : pandas.DataFrame
    Input DataFrame containing either lat/lon columns or a geometry column.
lat_col : str, optional
    Name of the latitude column. If None, it is auto-detected. Default is None.
lon_col : str, optional
    Name of the longitude column. If None, it is auto-detected. Default is None.
crs : str or pyproj.CRS, optional
    Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

geopandas.GeoDataFrame A GeoDataFrame containing the input data with a geometry column.

Raises:

TypeError
    If input is not a pandas DataFrame.
ValueError
    If required columns are missing or contain invalid data.
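
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo and using a hypothetical table whose coordinate columns can be auto-detected:

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

df = pd.DataFrame({"site": ["a", "b"], "lat": [0.31, 0.33], "lon": [32.58, 32.61]})
gdf = convert_to_geodataframe(df)  # 'lat'/'lon' are detected automatically
# Or name the columns explicitly
gdf_explicit = convert_to_geodataframe(df, lat_col="lat", lon_col="lon", crs="EPSG:4326")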

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. If None, it is auto-detected. Default is None.
    lon_col : str, optional
        Name of the longitude column. If None, it is auto-detected. Default is None.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

data : pandas.DataFrame
    DataFrame to search for coordinate columns.
lat_keywords : list of str, optional
    Keywords for identifying latitude columns. If None, uses default keywords.
lon_keywords : list of str, optional
    Keywords for identifying longitude columns. If None, uses default keywords.
case_sensitive : bool, optional
    Whether to perform case-sensitive matching. Default is False.

Returns:

tuple[str, str] Names of detected (latitude, longitude) columns.

Raises:

ValueError
    If no unique pair of latitude/longitude columns can be found.
TypeError
    If input data is not a pandas DataFrame.
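
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo:

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

# Hypothetical table with mixed-case coordinate column names
df = pd.DataFrame({"Lat": [0.31], "Long": [32.58], "value": [7]})
lat_col, lon_col = detect_coordinate_columns(df)
print(lat_col, lon_col)  # expected: Lat Long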

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame containing (Multi)Polygon geometries.

Returns:

geopandas.GeoDataFrame A new GeoDataFrame with Point geometries representing the centroids.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.
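
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo; note that GeoPandas may warn that centroids computed in a geographic CRS are approximate:

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import get_centroids

# Hypothetical polygon tiles in EPSG:4326
tiles = gpd.GeoDataFrame(
    {"tile_id": [1, 2]},
    geometry=[box(32.50, 0.10, 32.51, 0.11), box(32.51, 0.10, 32.52, 0.11)],
    crs="EPSG:4326",
)
tile_centroids = get_centroids(tiles)  # same attributes, Point geometries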

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

base_points_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing point geometries to check.
polygon_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing polygon geometries.

Returns:

geopandas.GeoDataFrame The base_points_gdf with an additional column is_within (True/False).

Raises:

ValueError If the geometries in either GeoDataFrame are invalid or not of the expected type.
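
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.geo and using hypothetical layers that share the same CRS:

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5), Point(2.0, 2.0)], crs="EPSG:4326")
polygons = gpd.GeoDataFrame(geometry=[box(0.0, 0.0, 1.0, 1.0)], crs="EPSG:4326")
result = map_points_within_polygons(points, polygons)
print(result["is_within"].tolist())  # expected: [True, False]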

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters

gdf : geopandas.GeoDataFrame
    GeoDataFrame containing geometries to simplify.
tolerance : float, optional
    Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
preserve_topology : bool, optional
    Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
geometry_column : str, optional
    Name of the column containing geometries (default is "geometry").

Returns

geopandas.GeoDataFrame A new GeoDataFrame with simplified geometries.

Raises

ValueError
    If the specified geometry column does not exist in the GeoDataFrame.
TypeError
    If the geometry column contains invalid geometries.

Examples

Simplify geometries in a GeoDataFrame:

simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified

sat_images

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

Name Type Description Default
gdf

A GeoDataFrame with Point geometries in geographic coordinates.

required
resolution float

Desired resolution (meters per pixel).

required
bbox_size float

Bounding box size in meters (default 300m x 300m).

300
crs str

Target projection (default is EPSG:3857).

'EPSG:3857'

Returns:

Name Type Description
int

Number of pixels per side (width and height).
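
A minimal usage sketch (illustrative, not from the library docs), assuming the function is importable from gigaspatial.processing.sat_images and using a hypothetical point at 45 degrees latitude:

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

# 0.6 m/pixel imagery over a 300 m x 300 m box centred on the point
gdf = gpd.GeoDataFrame(geometry=[Point(2.35, 45.0)], crs="EPSG:4326")
pixels_per_side = calculate_pixels_at_location(gdf, resolution=0.6, bbox_size=300)
print(pixels_per_side)  # about 707 at 45 degrees latitude: 300 / (0.6 * cos(45 deg))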

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
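
A short sketch (illustrative, not from the reference), assuming the function is imported from gigaspatial.processing.sat_images; the single point below is hypothetical:

import geopandas as gpd
from shapely.geometry import Point

points = gpd.GeoDataFrame(
    geometry=[Point(36.82, -1.29)],  # hypothetical location (lon, lat)
    crs="EPSG:4326",
)

# For a 0.6 m/pixel source and a 300 m box: 300 / (0.6 * cos(lat)) ≈ 500 pixels per side
n_pixels = calculate_pixels_at_location(points, resolution=0.6, bbox_size=300)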

tif_processor

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data. Can merge multiple rasters into one during initialization.
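
A minimal end-to-end sketch (illustrative, not from the reference), assuming TifProcessor is imported from gigaspatial.processing.tif_processor and that tile_a.tif and tile_b.tif are hypothetical single-band rasters on local disk:

from gigaspatial.processing.tif_processor import TifProcessor

# Merge two tiles into one raster, reprojecting them to WGS84 first,
# then flatten the merged raster into a DataFrame of pixel values.
processor = TifProcessor(
    dataset_path=["tile_a.tif", "tile_b.tif"],  # hypothetical paths
    mode="single",
    merge_method="mean",
    target_crs="EPSG:4326",
)
df = processor.to_dataframe(drop_nodata=True)
print(processor.get_raster_info())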

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    Can merge multiple rasters into one during initialization.
    """

    dataset_path: Union[Path, str, List[Union[Path, str]]]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"
    merge_method: Literal["first", "last", "min", "max", "mean"] = "first"
    target_crs: Optional[str] = None  # For reprojection if needed
    resampling_method: Resampling = Resampling.nearest
    reprojection_resolution: Optional[Tuple[float, float]] = None

    def __post_init__(self):
        """Validate inputs, merge rasters if needed, and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}
        self._temp_dir = tempfile.mkdtemp()
        self._merged_file_path = None
        self._reprojected_file_path = None

        # Handle multiple dataset paths
        if isinstance(self.dataset_path, list):
            if len(self.dataset_path) > 1:
                self.dataset_paths = [Path(p) for p in self.dataset_path]
                self._validate_multiple_datasets()
                self._merge_rasters()
                self.dataset_path = self._merged_file_path
        else:
            self.dataset_paths = [Path(self.dataset_path)]
            if not self.data_store.file_exists(str(self.dataset_path)):
                raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

            # Reproject single raster during initialization if target_crs is set
            if self.target_crs:
                self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
                with self.data_store.open(str(self.dataset_path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            self._reprojected_file_path = self._reproject_to_temp_file(
                                src, self.target_crs
                            )
                self.dataset_path = self._reprojected_file_path

        self._load_metadata()
        self._validate_mode_band_compatibility()

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset, handling temporary reprojected files."""
        if self._merged_file_path:
            with rasterio.open(self._merged_file_path) as src:
                yield src
        elif self._reprojected_file_path:
            with rasterio.open(self._reprojected_file_path) as src:
                yield src
        elif isinstance(self.data_store, LocalDataStore):
            with rasterio.open(str(self.dataset_path)) as src:
                yield src
        else:
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        yield src

    def reproject_to(
        self,
        target_crs: str,
        output_path: Optional[Union[str, Path]] = None,
        resampling_method: Optional[Resampling] = None,
        resolution: Optional[Tuple[float, float]] = None,
    ):
        """
        Reprojects the current raster to a new CRS and optionally saves it.

        Args:
            target_crs: The CRS to reproject to (e.g., "EPSG:4326").
            output_path: The path to save the reprojected raster. If None,
                         it is saved to a temporary file.
            resampling_method: The resampling method to use.
            resolution: The target resolution (pixel size) in the new CRS.
        """
        self.logger.info(f"Reprojecting raster to {target_crs}...")

        # Use provided or default values
        resampling_method = resampling_method or self.resampling_method
        resolution = resolution or self.reprojection_resolution

        with self.open_dataset() as src:
            if src.crs.to_string() == target_crs:
                self.logger.info(
                    "Raster is already in the target CRS. No reprojection needed."
                )
                # If output_path is specified, copy the file
                if output_path:
                    self.data_store.copy_file(str(self.dataset_path), output_path)
                return self.dataset_path

            dst_path = output_path or os.path.join(
                self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
            )

            with rasterio.open(
                dst_path,
                "w",
                **self._get_reprojection_profile(src, target_crs, resolution),
            ) as dst:
                for band_idx in range(1, src.count + 1):
                    reproject(
                        source=rasterio.band(src, band_idx),
                        destination=rasterio.band(dst, band_idx),
                        src_transform=src.transform,
                        src_crs=src.crs,
                        dst_transform=dst.transform,
                        dst_crs=dst.crs,
                        resampling=resampling_method,
                        num_threads=multiprocessing.cpu_count(),
                    )

            self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
            return Path(dst_path)
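
    # Illustrative usage (comment only, not part of the library source):
    # reproject to Web Mercator at a 100 m pixel size, writing to a
    # hypothetical output path.
    #
    #   processor.reproject_to(
    #       target_crs="EPSG:3857",
    #       output_path="reprojected.tif",
    #       resolution=(100.0, 100.0),
    #   )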

    def get_raster_info(self) -> Dict[str, Any]:
        """Get comprehensive raster information."""
        return {
            "count": self.count,
            "width": self.width,
            "height": self.height,
            "crs": self.crs,
            "bounds": self.bounds,
            "transform": self.transform,
            "dtypes": self.dtype,
            "nodata": self.nodata,
            "mode": self.mode,
            "is_merged": self.is_merged,
            "source_count": self.source_count,
        }

    def _reproject_to_temp_file(
        self, src: rasterio.DatasetReader, target_crs: str
    ) -> str:
        """Helper to reproject a raster and save it to a temporary file."""
        dst_path = os.path.join(
            self._temp_dir, f"reprojected_temp_{os.urandom(8).hex()}.tif"
        )
        profile = self._get_reprojection_profile(
            src, target_crs, self.reprojection_resolution
        )

        with rasterio.open(dst_path, "w", **profile) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=self.resampling_method,
                )
        return dst_path

    def _validate_multiple_datasets(self):
        """Validate that all datasets exist and have compatible properties."""
        if len(self.dataset_paths) < 2:
            raise ValueError("Multiple dataset paths required for merging")

        with self.data_store.open(str(self.dataset_paths[0]), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as ref_src:
                    ref_count = ref_src.count
                    ref_dtype = ref_src.dtypes[0]
                    ref_crs = ref_src.crs
                    ref_transform = ref_src.transform
                    ref_nodata = ref_src.nodata

        for i, path in enumerate(self.dataset_paths[1:], 1):
            with self.data_store.open(str(path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        if src.count != ref_count:
                            raise ValueError(
                                f"Dataset {i} has {src.count} bands, expected {ref_count}"
                            )
                        if src.dtypes[0] != ref_dtype:
                            raise ValueError(
                                f"Dataset {i} has dtype {src.dtypes[0]}, expected {ref_dtype}"
                            )
                        if not self.target_crs and src.crs != ref_crs:
                            self.logger.warning(
                                f"Dataset {i} has CRS {src.crs}, expected {ref_crs}. "
                                "Consider setting target_crs parameter for reprojection before merging."
                            )
                        if self.target_crs is None and not self._transforms_compatible(
                            src.transform, ref_transform
                        ):
                            self.logger.warning(
                                f"Dataset {i} has different resolution. Resampling may be needed."
                            )
                        if src.nodata != ref_nodata:
                            self.logger.warning(
                                f"Dataset {i} has different nodata value: {src.nodata} vs {ref_nodata}"
                            )

    def _get_reprojection_profile(
        self,
        src: rasterio.DatasetReader,
        target_crs: str,
        resolution: Optional[Tuple[float, float]],
        compression: str = "lzw",
    ):
        """Calculates and returns the profile for a reprojected raster."""
        if resolution:
            src_res = (abs(src.transform.a), abs(src.transform.e))
            self.logger.info(
                f"Using target resolution: {resolution}. Source resolution: {src_res}."
            )
            # Calculate transform and dimensions based on the new resolution
            dst_transform, width, height = calculate_default_transform(
                src.crs,
                target_crs,
                src.width,
                src.height,
                *src.bounds,
                resolution=resolution,
            )
        else:
            # Keep original resolution but reproject
            dst_transform, width, height = calculate_default_transform(
                src.crs, target_crs, src.width, src.height, *src.bounds
            )

        profile = src.profile.copy()
        profile.update(
            {
                "crs": target_crs,
                "transform": dst_transform,
                "width": width,
                "height": height,
                "compress": compression,  # Add compression to save space
            }
        )
        return profile

    def _transforms_compatible(self, transform1, transform2, tolerance=1e-6):
        """Check if two transforms have compatible pixel sizes."""
        return (
            abs(transform1.a - transform2.a) < tolerance
            and abs(transform1.e - transform2.e) < tolerance
        )

    def _merge_rasters(self):
        """Merge multiple rasters into a single raster."""
        self.logger.info(f"Merging {len(self.dataset_paths)} rasters...")

        # Open all datasets and handle reprojection if needed
        datasets_to_merge = []
        temp_reprojected_files = []
        try:
            for path in self.dataset_paths:
                with self.data_store.open(str(path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            if self.target_crs and src.crs != self.target_crs:
                                self.logger.info(
                                    f"Reprojecting {path.name} to {self.target_crs} before merging."
                                )
                                reprojected_path = self._reproject_to_temp_file(
                                    src, self.target_crs
                                )
                                temp_reprojected_files.append(reprojected_path)
                                datasets_to_merge.append(
                                    rasterio.open(reprojected_path)
                                )
                            else:
                                temp_path = os.path.join(
                                    self._temp_dir,
                                    f"temp_{path.stem}_{os.urandom(4).hex()}.tif",
                                )
                                temp_reprojected_files.append(temp_path)

                                profile = src.profile
                                with rasterio.open(temp_path, "w", **profile) as dst:
                                    dst.write(src.read())
                                datasets_to_merge.append(rasterio.open(temp_path))

            self._merged_file_path = os.path.join(self._temp_dir, "merged_raster.tif")

            if self.merge_method == "mean":
                merged_array, merged_transform = self._merge_with_mean(
                    datasets_to_merge
                )
            else:
                merged_array, merged_transform = merge(
                    datasets_to_merge,
                    method=self.merge_method,
                    resampling=self.resampling_method,
                )

            # Get profile from the first file in the list (all should be compatible now)
            ref_src = datasets_to_merge[0]
            profile = ref_src.profile.copy()
            profile.update(
                {
                    "height": merged_array.shape[-2],
                    "width": merged_array.shape[-1],
                    "transform": merged_transform,
                    "crs": self.target_crs if self.target_crs else ref_src.crs,
                }
            )

            with rasterio.open(self._merged_file_path, "w", **profile) as dst:
                dst.write(merged_array)
        finally:
            for dataset in datasets_to_merge:
                if hasattr(dataset, "close"):
                    dataset.close()

            # Clean up temporary files immediately
            for temp_file in temp_reprojected_files:
                try:
                    os.remove(temp_file)
                except OSError:
                    pass

        self.logger.info("Raster merging completed!")

    def _merge_with_mean(self, src_files):
        """Merge rasters using mean aggregation."""
        # Get bounds and resolution for merged raster
        bounds = src_files[0].bounds
        transform = src_files[0].transform

        for src in src_files[1:]:
            bounds = rasterio.coords.BoundingBox(
                min(bounds.left, src.bounds.left),
                min(bounds.bottom, src.bounds.bottom),
                max(bounds.right, src.bounds.right),
                max(bounds.top, src.bounds.top),
            )

        # Calculate dimensions for merged raster
        width = int((bounds.right - bounds.left) / abs(transform.a))
        height = int((bounds.top - bounds.bottom) / abs(transform.e))

        # Create new transform for merged bounds
        merged_transform = rasterio.transform.from_bounds(
            bounds.left, bounds.bottom, bounds.right, bounds.top, width, height
        )

        estimated_memory = height * width * src_files[0].count * 8  # float64
        if estimated_memory > 1e9:  # 1GB threshold
            self.logger.warning(
                f"Large memory usage expected: {estimated_memory/1e9:.1f}GB"
            )

        # Initialize arrays for sum and count
        sum_array = np.zeros((src_files[0].count, height, width), dtype=np.float64)
        count_array = np.zeros((height, width), dtype=np.int32)

        # Process each source file
        for src in src_files:
            # Read data
            data = src.read()

            # Calculate offset in merged raster
            src_bounds = src.bounds
            col_off = int((src_bounds.left - bounds.left) / abs(transform.a))
            row_off = int((bounds.top - src_bounds.top) / abs(transform.e))

            # Get valid data mask
            if src.nodata is not None:
                valid_mask = data[0] != src.nodata
            else:
                valid_mask = np.ones(data[0].shape, dtype=bool)

            # Add to sum and count arrays
            end_row = row_off + data.shape[1]
            end_col = col_off + data.shape[2]

            sum_array[:, row_off:end_row, col_off:end_col] += np.where(
                valid_mask, data, 0
            )
            count_array[row_off:end_row, col_off:end_col] += valid_mask.astype(np.int32)

        # Calculate mean
        mean_array = np.divide(
            sum_array,
            count_array,
            out=np.full_like(
                sum_array, src_files[0].nodata or 0, dtype=sum_array.dtype
            ),
            where=count_array > 0,
        )

        return mean_array.astype(src_files[0].dtypes[0]), merged_transform

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        try:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]
        except (rasterio.errors.RasterioIOError, FileNotFoundError) as e:
            raise FileNotFoundError(f"Could not read raster metadata: {e}")
        except Exception as e:
            raise RuntimeError(f"Unexpected error loading metadata: {e}")

    @property
    def is_merged(self) -> bool:
        """Check if this processor was created from multiple rasters."""
        return len(self.dataset_paths) > 1

    @property
    def source_count(self) -> int:
        """Get the number of source rasters."""
        return len(self.dataset_paths)

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def dtype(self):
        """Get the data types from the TIF file"""
        return self._cache.get("dtype", [])

    @property
    def width(self):
        return self._cache["width"]

    @property
    def height(self):
        return self._cache["height"]

    def to_dataframe(
        self, drop_nodata=True, check_memory=True, **kwargs
    ) -> pd.DataFrame:
        """
        Convert raster to DataFrame.

        Args:
            drop_nodata: Whether to drop nodata values
            check_memory: Whether to check memory before operation (default True)
            **kwargs: Additional arguments

        Returns:
            pd.DataFrame with raster data
        """
        # Memory guard check
        if check_memory:
            self._memory_guard("conversion", threshold_percent=80.0)

        try:
            if self.mode == "single":
                return self._to_dataframe(
                    band_number=kwargs.get("band_number", 1),
                    drop_nodata=drop_nodata,
                    band_names=kwargs.get("band_names", None),
                )
            else:
                return self._to_dataframe(
                    band_number=None,  # All bands
                    drop_nodata=drop_nodata,
                    band_names=kwargs.get("band_names", None),
                )
        except Exception as e:
            raise ValueError(
                f"Failed to process TIF file in mode '{self.mode}'. "
                f"Please ensure the file is valid and matches the selected mode. "
                f"Original error: {str(e)}"
            )

    def to_geodataframe(self, check_memory=True, **kwargs) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.

        Args:
            check_memory: Whether to check memory before operation
            **kwargs: Additional arguments passed to to_dataframe()

        Returns:
            gpd.GeoDataFrame with raster data
        """
        # Memory guard check
        if check_memory:
            self._memory_guard("conversion", threshold_percent=80.0)

        df = self.to_dataframe(check_memory=False, **kwargs)

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        return gdf
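
    # Illustrative usage (comment only, not part of the library source):
    #
    #   df = processor.to_dataframe(drop_nodata=True)            # lon, lat, pixel_value
    #   gdf = processor.to_geodataframe(band_names="elevation")  # one box geometry per pixel
    #
    # "elevation" is a hypothetical band name forwarded to to_dataframe().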

    def to_dataframe_chunked(
        self, drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs
    ):
        """
        Convert raster to DataFrame using chunked processing for memory efficiency.

        Automatically routes to the appropriate chunked method based on mode.
        Chunk size is automatically calculated based on target memory usage.

        Args:
            drop_nodata: Whether to drop nodata values
            chunk_size: Number of rows per chunk (auto-calculated if None)
            target_memory_mb: Target memory per chunk in MB (default 500)
            **kwargs: Additional arguments (band_number, band_names, etc.)
        """

        if chunk_size is None:
            chunk_size = self._calculate_optimal_chunk_size(
                "conversion", target_memory_mb
            )

        windows = self._get_chunk_windows(chunk_size)

        # SIMPLE ROUTING
        if self.mode == "single":
            return self._to_dataframe_chunked(
                windows,
                band_number=kwargs.get("band_number", 1),
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
        else:  # rgb, rgba, multi
            return self._to_dataframe_chunked(
                windows,
                band_number=None,
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )

    def clip_to_geometry(
        self,
        geometry: Union[
            Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
        ],
        crop: bool = True,
        all_touched: bool = True,
        invert: bool = False,
        nodata: Optional[Union[int, float]] = None,
        pad: bool = False,
        pad_width: float = 0.5,
        return_clipped_processor: bool = True,
    ) -> Union["TifProcessor", tuple]:
        """
        Clip raster to geometry boundaries.

        Parameters:
        -----------
        geometry : various
            Geometry to clip to. Can be:
            - Shapely Polygon or MultiPolygon
            - GeoDataFrame or GeoSeries
            - List of GeoJSON-like dicts
            - Single GeoJSON-like dict
        crop : bool, default True
            Whether to crop the raster to the extent of the geometry
        all_touched : bool, default True
            Include pixels that touch the geometry boundary
        invert : bool, default False
            If True, mask pixels inside geometry instead of outside
        nodata : int or float, optional
            Value to use for masked pixels. If None, uses raster's nodata value
        pad : bool, default False
            Pad geometry by half pixel before clipping
        pad_width : float, default 0.5
            Width of padding in pixels if pad=True
        return_clipped_processor : bool, default True
            If True, returns new TifProcessor with clipped data
            If False, returns (clipped_array, transform, metadata)

        Returns:
        --------
        TifProcessor or tuple
            Either new TifProcessor instance or (array, transform, metadata) tuple
        """
        # Handle different geometry input types
        shapes = self._prepare_geometry_for_clipping(geometry)

        # Validate CRS compatibility
        self._validate_geometry_crs(geometry)

        # Perform the clipping
        with self.open_dataset() as src:
            try:
                clipped_data, clipped_transform = mask(
                    dataset=src,
                    shapes=shapes,
                    crop=crop,
                    all_touched=all_touched,
                    invert=invert,
                    nodata=nodata,
                    pad=pad,
                    pad_width=pad_width,
                    filled=True,
                )

                # Update metadata for the clipped raster
                clipped_meta = src.meta.copy()
                clipped_meta.update(
                    {
                        "height": clipped_data.shape[1],
                        "width": clipped_data.shape[2],
                        "transform": clipped_transform,
                        "nodata": nodata if nodata is not None else src.nodata,
                    }
                )

            except ValueError as e:
                if "Input shapes do not overlap raster" in str(e):
                    raise ValueError(
                        "The geometry does not overlap with the raster. "
                        "Check that both are in the same coordinate reference system."
                    ) from e
                else:
                    raise e

        if return_clipped_processor:
            # Create a new TifProcessor with the clipped data
            return self._create_clipped_processor(clipped_data, clipped_meta)
        else:
            return clipped_data, clipped_transform, clipped_meta

    def clip_to_bounds(
        self,
        bounds: tuple,
        bounds_crs: Optional[str] = None,
        return_clipped_processor: bool = True,
    ) -> Union["TifProcessor", tuple]:
        """
        Clip raster to rectangular bounds.

        Parameters:
        -----------
        bounds : tuple
            Bounding box as (minx, miny, maxx, maxy)
        bounds_crs : str, optional
            CRS of the bounds. If None, assumes same as raster CRS
        return_clipped_processor : bool, default True
            If True, returns new TifProcessor, else returns (array, transform, metadata)

        Returns:
        --------
        TifProcessor or tuple
            Either new TifProcessor instance or (array, transform, metadata) tuple
        """
        # Create bounding box geometry
        bbox_geom = box(*bounds)

        # If bounds_crs is specified and different from raster CRS, create GeoDataFrame for reprojection
        if bounds_crs is not None:
            raster_crs = self.crs

            if not self.crs == bounds_crs:
                # Create GeoDataFrame with bounds CRS and reproject
                bbox_gdf = gpd.GeoDataFrame([1], geometry=[bbox_geom], crs=bounds_crs)
                bbox_gdf = bbox_gdf.to_crs(raster_crs)
                bbox_geom = bbox_gdf.geometry.iloc[0]

        return self.clip_to_geometry(
            geometry=bbox_geom,
            crop=True,
            return_clipped_processor=return_clipped_processor,
        )
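
    # Illustrative usage (comment only, not part of the library source):
    # clip to a bounding box given in WGS84, whatever the raster's own CRS.
    #
    #   clipped = processor.clip_to_bounds(
    #       bounds=(-1.5, 51.0, -1.0, 51.5),  # hypothetical (minx, miny, maxx, maxy)
    #       bounds_crs="EPSG:4326",
    #   )
    #
    # With return_clipped_processor=False the call returns
    # (clipped_array, transform, metadata) instead of a new TifProcessor.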

    def to_graph(
        self,
        connectivity: Literal[4, 8] = 4,
        band: Optional[int] = None,
        include_coordinates: bool = False,
        graph_type: Literal["networkx", "sparse"] = "networkx",
        check_memory: bool = True,
    ) -> Union[nx.Graph, sp.csr_matrix]:
        """
        Convert raster to graph based on pixel adjacency.

        Args:
            connectivity: 4 or 8-connectivity
            band: Band number (1-indexed)
            include_coordinates: Include x,y coordinates in nodes
            graph_type: 'networkx' or 'sparse'
            check_memory: Whether to check memory before operation

        Returns:
            Graph representation of raster
        """

        # Memory guard check
        if check_memory:
            self._memory_guard("graph", threshold_percent=80.0)

        with self.open_dataset() as src:
            band_idx = band - 1 if band is not None else 0
            if band_idx < 0 or band_idx >= src.count:
                raise ValueError(
                    f"Band {band} not available. Raster has {src.count} bands"
                )

            data = src.read(band_idx + 1)
            nodata = src.nodata if src.nodata is not None else self.nodata
            valid_mask = (
                data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
            )

            height, width = data.shape

            # Find all valid pixels
            valid_rows, valid_cols = np.where(valid_mask)
            num_valid_pixels = len(valid_rows)

            # Create a sequential mapping from (row, col) to a node ID
            node_map = np.full(data.shape, -1, dtype=int)
            node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

            # Define neighborhood offsets
            if connectivity == 4:
                # von Neumann neighborhood (4-connectivity)
                offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
            else:  # connectivity == 8
                # Moore neighborhood (8-connectivity)
                offsets = [
                    (-1, -1),
                    (-1, 0),
                    (-1, 1),
                    (0, -1),
                    (0, 1),
                    (1, -1),
                    (1, 0),
                    (1, 1),
                ]

            # Collect nodes and edges
            nodes_to_add = []
            edges_to_add = []

            for i in range(num_valid_pixels):
                row, col = valid_rows[i], valid_cols[i]
                current_node_id = node_map[row, col]

                # Prepare node attributes
                node_attrs = {"value": float(data[row, col])}
                if include_coordinates:
                    x, y = src.xy(row, col)
                    node_attrs["x"] = x
                    node_attrs["y"] = y
                nodes_to_add.append((current_node_id, node_attrs))

                # Find neighbors and collect edges
                for dy, dx in offsets:
                    neighbor_row, neighbor_col = row + dy, col + dx

                    # Check if neighbor is within bounds and is a valid pixel
                    if (
                        0 <= neighbor_row < height
                        and 0 <= neighbor_col < width
                        and valid_mask[neighbor_row, neighbor_col]
                    ):
                        neighbor_node_id = node_map[neighbor_row, neighbor_col]

                        # Ensure each edge is added only once
                        if current_node_id < neighbor_node_id:
                            neighbor_value = float(data[neighbor_row, neighbor_col])
                            edges_to_add.append(
                                (current_node_id, neighbor_node_id, neighbor_value)
                            )

            if graph_type == "networkx":
                G = nx.Graph()
                G.add_nodes_from(nodes_to_add)
                G.add_weighted_edges_from(edges_to_add)
                return G
            else:  # sparse matrix
                edges_array = np.array(edges_to_add)
                row_indices = edges_array[:, 0]
                col_indices = edges_array[:, 1]
                weights = edges_array[:, 2]

                # Add reverse edges for symmetric matrix
                from_idx = np.append(row_indices, col_indices)
                to_idx = np.append(col_indices, row_indices)
                weights = np.append(weights, weights)

                return sp.coo_matrix(
                    (weights, (from_idx, to_idx)),
                    shape=(num_valid_pixels, num_valid_pixels),
                ).tocsr()
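
    # Illustrative usage (comment only, not part of the library source):
    #
    #   G = processor.to_graph(connectivity=8, include_coordinates=True)  # networkx.Graph
    #   A = processor.to_graph(graph_type="sparse")                       # scipy CSR matrix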

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]], **kwargs
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            elif self.count > 1:
                return np.array(
                    [vals for vals in src.sample(coordinate_list, **kwargs)]
                )
            else:
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self,
        polygon_list,
        stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
    ):
        """
        Sample raster values by polygons and compute statistic(s) for each polygon.

        Args:
            polygon_list: List of shapely Polygon or MultiPolygon objects.
            stat: Statistic(s) to compute. Can be:
                - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
                - Single callable: custom function that takes array and returns scalar
                - List of strings/callables: multiple statistics to compute

        Returns:
            If single stat: np.ndarray of computed statistics for each polygon
            If multiple stats: List of dictionaries with stat names as keys
        """
        # Determine if single or multiple stats
        single_stat = not isinstance(stat, list)
        stats_list = [stat] if single_stat else stat

        # Prepare stat functions
        stat_funcs = []
        stat_names = []

        for s in stats_list:
            if callable(s):
                stat_funcs.append(s)
                stat_names.append(
                    s.__name__
                    if hasattr(s, "__name__")
                    else f"custom_{len(stat_names)}"
                )
            else:
                # Handle string statistics
                if s == "count":
                    stat_funcs.append(len)
                else:
                    stat_funcs.append(getattr(np, s))
                stat_names.append(s)

        results = []

        with self.open_dataset() as src:
            for polygon in tqdm(polygon_list):
                try:
                    out_image, _ = mask(src, [polygon], crop=True, filled=False)

                    # Use masked arrays for more efficient nodata handling
                    if hasattr(out_image, "mask"):
                        valid_data = out_image.compressed()
                    else:
                        valid_data = (
                            out_image[out_image != self.nodata]
                            if self.nodata
                            else out_image.flatten()
                        )

                    if len(valid_data) == 0:
                        if single_stat:
                            results.append(np.nan)
                        else:
                            results.append({name: np.nan for name in stat_names})
                    else:
                        if single_stat:
                            results.append(stat_funcs[0](valid_data))
                        else:
                            # Compute all statistics for this polygon
                            polygon_stats = {}
                            for func, name in zip(stat_funcs, stat_names):
                                try:
                                    polygon_stats[name] = func(valid_data)
                                except Exception:
                                    polygon_stats[name] = np.nan
                            results.append(polygon_stats)

                except Exception:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})

        return np.array(results) if single_stat else results
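
    # Illustrative usage (comment only, not part of the library source);
    # "polygons" is a hypothetical list of shapely Polygon/MultiPolygon objects:
    #
    #   means = processor.sample_by_polygons(polygons, stat="mean")
    #   stats = processor.sample_by_polygons(polygons, stat=["mean", "max", "count"])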

    def sample_by_polygons_batched(
        self,
        polygon_list: List[Union[Polygon, MultiPolygon]],
        stat: Union[str, Callable] = "mean",
        batch_size: int = 100,
        n_workers: int = 4,
        show_progress: bool = True,
        check_memory: bool = True,
        **kwargs,
    ) -> np.ndarray:
        """
        Sample raster values by polygons in parallel using batching.

        Args:
            polygon_list: List of Shapely Polygon or MultiPolygon objects
            stat: Statistic to compute
            batch_size: Number of polygons per batch
            n_workers: Number of worker processes
            show_progress: Whether to display progress bar
            check_memory: Whether to check memory before operation
            **kwargs: Additional arguments

        Returns:
            np.ndarray of statistics for each polygon
        """
        import sys

        # Memory guard check with n_workers consideration
        if check_memory:
            is_safe = self._memory_guard(
                "batched_sampling",
                threshold_percent=85.0,
                n_workers=n_workers,
                raise_error=False,
            )

            if not is_safe:
                # Suggest reducing n_workers
                memory_info = self._check_available_memory()
                estimates = self._estimate_memory_usage("batched_sampling", n_workers=1)

                # Calculate optimal workers
                suggested_workers = max(
                    1, int(memory_info["available"] * 0.7 / estimates["per_worker"])
                )

                warnings.warn(
                    f"Consider reducing n_workers from {n_workers} to {suggested_workers} "
                    f"to reduce memory pressure.",
                    ResourceWarning,
                )

        # Platform check
        if sys.platform in ["win32", "darwin"]:
            import warnings
            import multiprocessing as mp

            if mp.get_start_method(allow_none=True) != "fork":
                warnings.warn(
                    "Batched sampling may not work on Windows/macOS. "
                    "Use sample_by_polygons() if you encounter errors.",
                    RuntimeWarning,
                )

        def _chunk_list(data_list, chunk_size):
            """Yield successive chunks from data_list."""
            for i in range(0, len(data_list), chunk_size):
                yield data_list[i : i + chunk_size]

        if len(polygon_list) == 0:
            return np.array([])

        stat_func = stat if callable(stat) else getattr(np, stat)
        polygon_chunks = list(_chunk_list(polygon_list, batch_size))

        with multiprocessing.Pool(
            initializer=self._initializer_worker, processes=n_workers
        ) as pool:
            process_func = partial(self._process_polygon_batch, stat_func=stat_func)
            if show_progress:
                batched_results = list(
                    tqdm(
                        pool.imap(process_func, polygon_chunks),
                        total=len(polygon_chunks),
                        desc=f"Sampling polygons",
                    )
                )
            else:
                batched_results = list(pool.imap(process_func, polygon_chunks))

            results = [item for sublist in batched_results for item in sublist]

        return np.array(results)

    def _initializer_worker(self):
        """
        Initializer function for each worker process.
        Opens the raster dataset and stores it in a process-local variable.
        This function runs once per worker, not for every task.
        """
        global src_handle, memfile_handle

        # Priority: merged > reprojected > original (same as open_dataset)
        local_file_path = None
        if self._merged_file_path:
            # Merged file is a local temp file
            local_file_path = self._merged_file_path
        elif self._reprojected_file_path:
            # Reprojected file is a local temp file
            local_file_path = self._reprojected_file_path
        elif isinstance(self.data_store, LocalDataStore):
            # Local file - can open directly
            local_file_path = str(self.dataset_path)

        if local_file_path:
            # Open local file directly
            with open(local_file_path, "rb") as f:
                memfile_handle = rasterio.MemoryFile(f.read())
                src_handle = memfile_handle.open()
        else:
            # Custom DataStore
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                memfile_handle = rasterio.MemoryFile(f.read())
                src_handle = memfile_handle.open()

    def _get_worker_dataset(self):
        """Get dataset handle for worker process."""
        global src_handle
        if src_handle is None:
            raise RuntimeError("Raster dataset not initialized in this process.")
        return src_handle

    def _process_single_polygon(self, polygon, stat_func):
        """
        Helper function to process a single polygon.
        This will be run in a separate process.
        """
        try:
            src = self._get_worker_dataset()
            out_image, _ = mask(src, [polygon], crop=True, filled=False)

            if hasattr(out_image, "mask"):
                valid_data = out_image.compressed()
            else:
                valid_data = (
                    out_image[out_image != self.nodata]
                    if self.nodata
                    else out_image.flatten()
                )

            return stat_func(valid_data) if len(valid_data) > 0 else np.nan
        except RuntimeError as e:
            self.logger.error(f"Worker not initialized: {e}")
            return np.nan
        except Exception as e:
            self.logger.debug(f"Error processing polygon: {e}")
            return np.nan

    def _process_polygon_batch(self, polygon_batch, stat_func):
        """
        Processes a batch of polygons.
        """
        return [
            self._process_single_polygon(polygon, stat_func)
            for polygon in polygon_batch
        ]

    def _to_dataframe(
        self,
        band_number: Optional[int] = None,
        drop_nodata: bool = True,
        band_names: Optional[Union[str, List[str]]] = None,
    ) -> pd.DataFrame:
        """
        Process TIF to DataFrame - handles both single-band and multi-band.

        Args:
            band_number: Specific band to read (1-indexed). If None, reads all bands.
            drop_nodata: Whether to drop nodata values
            band_names: Custom names for bands (multi-band only)

        Returns:
            pd.DataFrame with lon, lat, and band value(s)
        """
        with self.open_dataset() as src:
            if band_number is not None:
                # SINGLE BAND MODE
                band = src.read(band_number)
                mask = self._build_data_mask(band, drop_nodata, src.nodata)
                lons, lats = self._extract_coordinates_with_mask(mask)
                pixel_values = (
                    np.extract(mask, band) if mask is not None else band.flatten()
                )
                band_name = band_names if isinstance(band_names, str) else "pixel_value"

                return pd.DataFrame({"lon": lons, "lat": lats, band_name: pixel_values})
            else:
                # MULTI-BAND MODE (all bands)
                stack = src.read()

                # Auto-detect band names by mode
                if band_names is None:
                    if self.mode == "rgb":
                        band_names = ["red", "green", "blue"]
                    elif self.mode == "rgba":
                        band_names = ["red", "green", "blue", "alpha"]
                    else:
                        band_names = [
                            src.descriptions[i] or f"band_{i+1}"
                            for i in range(self.count)
                        ]

                # Build mask (checks ALL bands!)
                mask = self._build_multi_band_mask(stack, drop_nodata, src.nodata)

                # Create DataFrame
                data_dict = self._bands_to_dict(stack, self.count, band_names, mask)
                df = pd.DataFrame(data_dict)

                # RGBA: normalize alpha if needed
                if (
                    self.mode == "rgba"
                    and "alpha" in df.columns
                    and df["alpha"].max() > 1
                ):
                    df["alpha"] = df["alpha"] / 255.0

            return df

    def _to_dataframe_chunked(
        self,
        windows: List[rasterio.windows.Window],
        band_number: Optional[int] = None,
        drop_nodata: bool = True,
        band_names: Optional[Union[str, List[str]]] = None,
        show_progress: bool = True,
    ) -> pd.DataFrame:
        """Universal chunked converter for ALL modes."""

        chunks = []
        iterator = tqdm(windows, desc="Processing chunks") if show_progress else windows

        with self.open_dataset() as src:
            # Auto-detect band names ONCE (before loop)
            if band_number is None and band_names is None:
                if self.mode == "rgb":
                    band_names = ["red", "green", "blue"]
                elif self.mode == "rgba":
                    band_names = ["red", "green", "blue", "alpha"]
                else:  # multi
                    band_names = [
                        src.descriptions[i] or f"band_{i+1}" for i in range(self.count)
                    ]

            for window in iterator:
                if band_number is not None:
                    # SINGLE BAND
                    band_chunk = src.read(band_number, window=window)
                    mask = self._build_data_mask(band_chunk, drop_nodata, src.nodata)
                    lons, lats = self._get_chunk_coordinates(window, src)
                    band_name = (
                        band_names if isinstance(band_names, str) else "pixel_value"
                    )

                    # Build chunk DataFrame (could use helper but simple enough)
                    if mask is not None:
                        mask_flat = mask.flatten()
                        chunk_df = pd.DataFrame(
                            {
                                "lon": lons[mask_flat],
                                "lat": lats[mask_flat],
                                band_name: band_chunk.flatten()[mask_flat],
                            }
                        )
                    else:
                        chunk_df = pd.DataFrame(
                            {"lon": lons, "lat": lats, band_name: band_chunk.flatten()}
                        )
                else:
                    # MULTI-BAND (includes RGB/RGBA)
                    stack_chunk = src.read(window=window)
                    mask = self._build_multi_band_mask(
                        stack_chunk, drop_nodata, src.nodata
                    )
                    lons, lats = self._get_chunk_coordinates(window, src)

                    # Build DataFrame using helper
                    band_dict = {
                        band_names[i]: stack_chunk[i] for i in range(self.count)
                    }
                    chunk_df = self._build_chunk_dataframe(lons, lats, band_dict, mask)

                    # RGBA: normalize alpha
                    if self.mode == "rgba" and "alpha" in chunk_df.columns:
                        if chunk_df["alpha"].max() > 1:
                            chunk_df["alpha"] = chunk_df["alpha"] / 255.0

                chunks.append(chunk_df)

        result = pd.concat(chunks, ignore_index=True)
        return result

    def _prepare_geometry_for_clipping(
        self,
        geometry: Union[
            Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
        ],
    ) -> List[dict]:
        """Convert various geometry formats to list of GeoJSON-like dicts for rasterio.mask"""

        if isinstance(geometry, (Polygon, MultiPolygon)):
            # Shapely geometry
            return [geometry.__geo_interface__]

        elif isinstance(geometry, gpd.GeoDataFrame):
            # GeoDataFrame - use all geometries
            return [
                geom.__geo_interface__ for geom in geometry.geometry if geom is not None
            ]

        elif isinstance(geometry, gpd.GeoSeries):
            # GeoSeries
            return [geom.__geo_interface__ for geom in geometry if geom is not None]

        elif isinstance(geometry, dict):
            # Single GeoJSON-like dict
            return [geometry]

        elif isinstance(geometry, list):
            # List of GeoJSON-like dicts
            return geometry

        else:
            raise TypeError(
                f"Unsupported geometry type: {type(geometry)}. "
                "Supported types: Shapely geometries, GeoDataFrame, GeoSeries, "
                "GeoJSON-like dict, or list of GeoJSON-like dicts."
            )

    def _validate_geometry_crs(
        self,
        original_geometry: Any,
    ) -> None:
        """Validate that geometry CRS matches raster CRS"""

        # Get raster CRS
        raster_crs = self.crs

        # Try to get geometry CRS
        geometry_crs = None

        if isinstance(original_geometry, (gpd.GeoDataFrame, gpd.GeoSeries)):
            geometry_crs = original_geometry.crs
        elif hasattr(original_geometry, "crs"):
            geometry_crs = original_geometry.crs

        # Warn if CRS mismatch detected
        if geometry_crs is not None and raster_crs is not None:
            if raster_crs != geometry_crs:
                self.logger.warning(
                    f"CRS mismatch detected! Raster CRS: {raster_crs}, "
                    f"Geometry CRS: {geometry_crs}. "
                    "Consider reprojecting geometry to match raster CRS for accurate clipping."
                )

    def _create_clipped_processor(
        self, clipped_data: np.ndarray, clipped_meta: dict
    ) -> "TifProcessor":
        """
        Helper to create a new TifProcessor instance from clipped data.
        Saves the clipped data to a temporary file and initializes a new TifProcessor.
        """
        clipped_file_path = os.path.join(
            self._temp_dir, f"clipped_temp_{os.urandom(8).hex()}.tif"
        )
        with rasterio.open(clipped_file_path, "w", **clipped_meta) as dst:
            dst.write(clipped_data)

        self.logger.info(f"Clipped raster saved to temporary file: {clipped_file_path}")

        # Create a new TifProcessor instance with the clipped data
        # Pass relevant parameters from the current instance to maintain consistency
        return TifProcessor(
            dataset_path=clipped_file_path,
            data_store=self.data_store,
            mode=self.mode,
        )

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]

    def _get_chunk_coordinates(self, window, src):
        """Get coordinates for a specific window chunk."""
        transform = src.window_transform(window)
        rows, cols = np.meshgrid(
            np.arange(window.height), np.arange(window.width), indexing="ij"
        )
        xs, ys = rasterio.transform.xy(transform, rows.flatten(), cols.flatten())
        return np.array(xs), np.array(ys)

    def _extract_coordinates_with_mask(self, mask=None):
        """Extract flattened coordinates, optionally applying a mask."""
        x_coords, y_coords = self._get_pixel_coordinates()

        if mask is not None:
            return np.extract(mask, x_coords), np.extract(mask, y_coords)

        return x_coords.flatten(), y_coords.flatten()

    def _build_data_mask(self, data, drop_nodata=True, nodata_value=None):
        """Build a boolean mask for filtering data based on nodata values."""
        if not drop_nodata or nodata_value is None:
            return None

        return data != nodata_value

    def _build_multi_band_mask(
        self,
        bands: np.ndarray,
        drop_nodata: bool = True,
        nodata_value: Optional[float] = None,
    ) -> Optional[np.ndarray]:
        """
        Build mask for multi-band data - drops pixels where ANY band has nodata.

        Args:
            bands: 3D array of shape (n_bands, height, width)
            drop_nodata: Whether to drop nodata values
            nodata_value: The nodata value to check

        Returns:
            Boolean mask or None if no masking needed
        """
        if not drop_nodata or nodata_value is None:
            return None

        # Check if ANY band has nodata at each pixel location
        has_nodata = np.any(bands == nodata_value, axis=0)

        # Return True where ALL bands are valid
        valid_mask = ~has_nodata

        return valid_mask if not valid_mask.all() else None

    def _bands_to_dict(self, bands, band_count, band_names, mask=None):
        """Read specified bands and return as a dictionary with optional masking."""

        lons, lats = self._extract_coordinates_with_mask(mask)
        data_dict = {"lon": lons, "lat": lats}

        for idx, name in enumerate(band_names[:band_count]):
            band_data = bands[idx]
            data_dict[name] = (
                np.extract(mask, band_data) if mask is not None else band_data.flatten()
            )

        return data_dict

    def _calculate_optimal_chunk_size(
        self, operation: str = "conversion", target_memory_mb: int = 500
    ) -> int:
        """
        Calculate optimal chunk size (number of rows) based on target memory usage.

        Args:
            operation: Type of operation ('conversion', 'graph')
            target_memory_mb: Target memory per chunk in megabytes

        Returns:
            Number of rows per chunk
        """
        bytes_per_element = np.dtype(self.dtype).itemsize
        n_bands = self.count
        width = self.width

        # Adjust for operation type
        if operation == "conversion":
            # DataFrame overhead is roughly 2x
            bytes_per_row = width * n_bands * bytes_per_element * 2
        elif operation == "graph":
            # Graph needs additional space for edges
            bytes_per_row = width * bytes_per_element * 4  # Estimate
        else:
            bytes_per_row = width * n_bands * bytes_per_element

        target_bytes = target_memory_mb * 1024 * 1024
        chunk_rows = max(1, int(target_bytes / bytes_per_row))

        # Ensure chunk size doesn't exceed total height
        chunk_rows = min(chunk_rows, self.height)

        self.logger.info(
            f"Calculated chunk size: {chunk_rows} rows "
            f"(~{self._format_bytes(chunk_rows * bytes_per_row)} per chunk)"
        )

        return chunk_rows

    def _get_chunk_windows(self, chunk_size: int) -> List[rasterio.windows.Window]:
        """
        Generate window objects for chunked reading.

        Args:
            chunk_size: Number of rows per chunk

        Returns:
            List of rasterio.windows.Window objects
        """
        windows = []
        for row_start in range(0, self.height, chunk_size):
            row_end = min(row_start + chunk_size, self.height)
            window = rasterio.windows.Window(
                col_off=0,
                row_off=row_start,
                width=self.width,
                height=row_end - row_start,
            )
            windows.append(window)

        return windows

    def _format_bytes(self, bytes_value: int) -> str:
        """Convert bytes to human-readable format."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"

    def _check_available_memory(self) -> dict:
        """
        Check available system memory.

        Returns:
            Dict with total, available, and used memory info
        """
        import psutil

        memory = psutil.virtual_memory()
        return {
            "total": memory.total,
            "available": memory.available,
            "used": memory.used,
            "percent": memory.percent,
            "available_human": self._format_bytes(memory.available),
        }

    def _estimate_memory_usage(
        self, operation: str = "conversion", n_workers: int = 1
    ) -> dict:
        """
        Estimate memory usage for various operations.

        Args:
            operation: Type of operation ('conversion', 'batched_sampling', 'merge', 'graph')
            n_workers: Number of workers (for batched_sampling)

        Returns:
            Dict with estimated memory usage in bytes and human-readable format
        """
        bytes_per_element = np.dtype(self.dtype).itemsize
        n_pixels = self.width * self.height
        n_bands = self.count

        estimates = {}

        if operation == "conversion":
            # to_dataframe/to_geodataframe: full raster + DataFrame overhead
            raster_memory = n_pixels * n_bands * bytes_per_element
            # DataFrame overhead (roughly 2x for storage + processing)
            dataframe_memory = (
                n_pixels * n_bands * 16
            )  # 16 bytes per value in DataFrame
            total = raster_memory + dataframe_memory
            estimates["raster"] = raster_memory
            estimates["dataframe"] = dataframe_memory
            estimates["total"] = total

        elif operation == "batched_sampling":
            # Each worker loads full raster into MemoryFile
            # Need to get file size
            if self._merged_file_path:
                file_path = self._merged_file_path
            elif self._reprojected_file_path:
                file_path = self._reprojected_file_path
            else:
                file_path = str(self.dataset_path)

            try:
                file_size = os.path.getsize(file_path)
            except OSError:
                # Fall back to an estimate if the file size cannot be determined
                file_size = n_pixels * n_bands * bytes_per_element * 1.2  # add overhead

            estimates["per_worker"] = file_size
            estimates["total"] = file_size * n_workers

        elif operation == "merge":
            # _merge_with_mean uses float64 arrays
            raster_memory = n_pixels * n_bands * 8  # float64
            estimates["sum_array"] = raster_memory
            estimates["count_array"] = n_pixels * 4  # int32
            estimates["total"] = raster_memory + n_pixels * 4

        elif operation == "graph":
            # to_graph: data + node_map + edges
            data_memory = n_pixels * bytes_per_element
            node_map_memory = n_pixels * 4  # int32
            # Estimate edges (rough: 4-connectivity = 4 edges per pixel)
            edges_memory = n_pixels * 4 * 3 * 8  # 3 values per edge, float64
            total = data_memory + node_map_memory + edges_memory
            estimates["data"] = data_memory
            estimates["node_map"] = node_map_memory
            estimates["edges"] = edges_memory
            estimates["total"] = total

        # Add human-readable format
        estimates["human_readable"] = self._format_bytes(estimates["total"])

        return estimates

    def _memory_guard(
        self,
        operation: str,
        threshold_percent: float = 80.0,
        n_workers: Optional[int] = None,
        raise_error: bool = False,
    ) -> bool:
        """
        Check if operation is safe to perform given memory constraints.

        Args:
            operation: Type of operation to check
            threshold_percent: Maximum % of available memory to use (default 80%)
            n_workers: Number of workers (for batched operations)
            raise_error: If True, raise MemoryError instead of warning

        Returns:
            True if operation is safe, False otherwise

        Raises:
            MemoryError: If raise_error=True and memory insufficient
        """
        import warnings

        estimates = self._estimate_memory_usage(operation, n_workers=n_workers or 1)
        memory_info = self._check_available_memory()

        estimated_usage = estimates["total"]
        available = memory_info["available"]
        threshold = available * (threshold_percent / 100.0)

        is_safe = estimated_usage <= threshold

        if not is_safe:
            usage_str = self._format_bytes(estimated_usage)
            available_str = memory_info["available_human"]

            message = (
                f"Memory warning: {operation} operation may require {usage_str} "
                f"but only {available_str} is available. "
                f"Current memory usage: {memory_info['percent']:.1f}%"
            )

            if raise_error:
                raise MemoryError(message)
            else:
                warnings.warn(message, ResourceWarning)
                if hasattr(self, "logger"):
                    self.logger.warning(message)

        return is_safe

    def _validate_mode_band_compatibility(self):
        """Validate that mode matches band count."""
        mode_requirements = {
            "single": (1, "1-band"),
            "rgb": (3, "3-band"),
            "rgba": (4, "4-band"),
        }

        if self.mode in mode_requirements:
            required_count, description = mode_requirements[self.mode]
            if self.count != required_count:
                raise ValueError(
                    f"{self.mode.upper()} mode requires a {description} TIF file"
                )
        elif self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    def __enter__(self):
        return self

    def __del__(self):
        """Clean up temporary files and directories."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    def cleanup(self):
        """Explicit cleanup method for better control."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir)
            self.logger.info("Cleaned up temporary files")

    def __exit__(self, exc_type, exc_value, traceback):
        """Proper context manager exit with cleanup."""
        self.cleanup()
        return False
bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

dtype property

Get the data types from the TIF file

is_merged: bool property

Check if this processor was created from multiple rasters.

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel size) from the TIF file

source_count: int property

Get the number of source rasters.

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__del__()

Clean up temporary files and directories.

Source code in gigaspatial/processing/tif_processor.py
def __del__(self):
    """Clean up temporary files and directories."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir, ignore_errors=True)
__exit__(exc_type, exc_value, traceback)

Proper context manager exit with cleanup.

Source code in gigaspatial/processing/tif_processor.py
def __exit__(self, exc_type, exc_value, traceback):
    """Proper context manager exit with cleanup."""
    self.cleanup()
    return False
__post_init__()

Validate inputs, merge rasters if needed, and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs, merge rasters if needed, and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}
    self._temp_dir = tempfile.mkdtemp()
    self._merged_file_path = None
    self._reprojected_file_path = None

    # Handle multiple dataset paths
    if isinstance(self.dataset_path, list):
        if len(self.dataset_path) > 1:
            self.dataset_paths = [Path(p) for p in self.dataset_path]
            self._validate_multiple_datasets()
            self._merge_rasters()
            self.dataset_path = self._merged_file_path
    else:
        self.dataset_paths = [Path(self.dataset_path)]
        if not self.data_store.file_exists(str(self.dataset_path)):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        # Reproject single raster during initialization if target_crs is set
        if self.target_crs:
            self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        self._reprojected_file_path = self._reproject_to_temp_file(
                            src, self.target_crs
                        )
            self.dataset_path = self._reprojected_file_path

    self._load_metadata()
    self._validate_mode_band_compatibility()
cleanup()

Explicit cleanup method for better control.

Source code in gigaspatial/processing/tif_processor.py
def cleanup(self):
    """Explicit cleanup method for better control."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir)
        self.logger.info("Cleaned up temporary files")
clip_to_bounds(bounds, bounds_crs=None, return_clipped_processor=True)

Clip raster to rectangular bounds.

Parameters:

bounds : tuple
    Bounding box as (minx, miny, maxx, maxy)
bounds_crs : str, optional
    CRS of the bounds. If None, assumes same as raster CRS
return_clipped_processor : bool, default True
    If True, returns new TifProcessor, else returns (array, transform, metadata)

Returns:

TifProcessor or tuple
    Either new TifProcessor instance or (array, transform, metadata) tuple

Source code in gigaspatial/processing/tif_processor.py
def clip_to_bounds(
    self,
    bounds: tuple,
    bounds_crs: Optional[str] = None,
    return_clipped_processor: bool = True,
) -> Union["TifProcessor", tuple]:
    """
    Clip raster to rectangular bounds.

    Parameters:
    -----------
    bounds : tuple
        Bounding box as (minx, miny, maxx, maxy)
    bounds_crs : str, optional
        CRS of the bounds. If None, assumes same as raster CRS
    return_clipped_processor : bool, default True
        If True, returns new TifProcessor, else returns (array, transform, metadata)

    Returns:
    --------
    TifProcessor or tuple
        Either new TifProcessor instance or (array, transform, metadata) tuple
    """
    # Create bounding box geometry
    bbox_geom = box(*bounds)

    # If bounds_crs is specified and different from raster CRS, create GeoDataFrame for reprojection
    if bounds_crs is not None:
        raster_crs = self.crs

        if self.crs != bounds_crs:
            # Create GeoDataFrame with bounds CRS and reproject
            bbox_gdf = gpd.GeoDataFrame([1], geometry=[bbox_geom], crs=bounds_crs)
            bbox_gdf = bbox_gdf.to_crs(raster_crs)
            bbox_geom = bbox_gdf.geometry.iloc[0]

    return self.clip_to_geometry(
        geometry=bbox_geom,
        crop=True,
        return_clipped_processor=return_clipped_processor,
    )
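A minimal usage sketch (illustrative only: the raster path and bounds are placeholders, and the import path for TifProcessor is an assumption based on the source location shown above):

from gigaspatial.processing.tif_processor import TifProcessor  # assumed import path

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
# Bounds given in a different CRS are reprojected to the raster CRS before clipping
clipped = tp.clip_to_bounds(bounds=(-1.0, 5.0, 1.0, 7.0), bounds_crs="EPSG:4326")
print(clipped.get_raster_info()["bounds"])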
clip_to_geometry(geometry, crop=True, all_touched=True, invert=False, nodata=None, pad=False, pad_width=0.5, return_clipped_processor=True)

Clip raster to geometry boundaries.

Parameters:

geometry : various
    Geometry to clip to. Can be:
    - Shapely Polygon or MultiPolygon
    - GeoDataFrame or GeoSeries
    - List of GeoJSON-like dicts
    - Single GeoJSON-like dict
crop : bool, default True
    Whether to crop the raster to the extent of the geometry
all_touched : bool, default True
    Include pixels that touch the geometry boundary
invert : bool, default False
    If True, mask pixels inside geometry instead of outside
nodata : int or float, optional
    Value to use for masked pixels. If None, uses raster's nodata value
pad : bool, default False
    Pad geometry by half pixel before clipping
pad_width : float, default 0.5
    Width of padding in pixels if pad=True
return_clipped_processor : bool, default True
    If True, returns new TifProcessor with clipped data; if False, returns (clipped_array, transform, metadata)

Returns:

TifProcessor or tuple
    Either new TifProcessor instance or (array, transform, metadata) tuple

Source code in gigaspatial/processing/tif_processor.py
def clip_to_geometry(
    self,
    geometry: Union[
        Polygon, MultiPolygon, gpd.GeoDataFrame, gpd.GeoSeries, List[dict], dict
    ],
    crop: bool = True,
    all_touched: bool = True,
    invert: bool = False,
    nodata: Optional[Union[int, float]] = None,
    pad: bool = False,
    pad_width: float = 0.5,
    return_clipped_processor: bool = True,
) -> Union["TifProcessor", tuple]:
    """
    Clip raster to geometry boundaries.

    Parameters:
    -----------
    geometry : various
        Geometry to clip to. Can be:
        - Shapely Polygon or MultiPolygon
        - GeoDataFrame or GeoSeries
        - List of GeoJSON-like dicts
        - Single GeoJSON-like dict
    crop : bool, default True
        Whether to crop the raster to the extent of the geometry
    all_touched : bool, default True
        Include pixels that touch the geometry boundary
    invert : bool, default False
        If True, mask pixels inside geometry instead of outside
    nodata : int or float, optional
        Value to use for masked pixels. If None, uses raster's nodata value
    pad : bool, default False
        Pad geometry by half pixel before clipping
    pad_width : float, default 0.5
        Width of padding in pixels if pad=True
    return_clipped_processor : bool, default True
        If True, returns new TifProcessor with clipped data
        If False, returns (clipped_array, transform, metadata)

    Returns:
    --------
    TifProcessor or tuple
        Either new TifProcessor instance or (array, transform, metadata) tuple
    """
    # Handle different geometry input types
    shapes = self._prepare_geometry_for_clipping(geometry)

    # Validate CRS compatibility
    self._validate_geometry_crs(geometry)

    # Perform the clipping
    with self.open_dataset() as src:
        try:
            clipped_data, clipped_transform = mask(
                dataset=src,
                shapes=shapes,
                crop=crop,
                all_touched=all_touched,
                invert=invert,
                nodata=nodata,
                pad=pad,
                pad_width=pad_width,
                filled=True,
            )

            # Update metadata for the clipped raster
            clipped_meta = src.meta.copy()
            clipped_meta.update(
                {
                    "height": clipped_data.shape[1],
                    "width": clipped_data.shape[2],
                    "transform": clipped_transform,
                    "nodata": nodata if nodata is not None else src.nodata,
                }
            )

        except ValueError as e:
            if "Input shapes do not overlap raster" in str(e):
                raise ValueError(
                    "The geometry does not overlap with the raster. "
                    "Check that both are in the same coordinate reference system."
                ) from e
            else:
                raise e

    if return_clipped_processor:
        # Create a new TifProcessor with the clipped data
        return self._create_clipped_processor(clipped_data, clipped_meta)
    else:
        return clipped_data, clipped_transform, clipped_meta
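Illustrative sketch of clipping to vector features (the file names are placeholders; reprojecting the geometry first avoids the CRS-mismatch warning):

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor  # assumed import path

tp = TifProcessor(dataset_path="example.tif")   # hypothetical raster
aoi = gpd.read_file("districts.geojson")        # hypothetical polygons
aoi = aoi.to_crs(tp.crs)                        # match the raster CRS
clipped = tp.clip_to_geometry(aoi, crop=True, all_touched=True)
df = clipped.to_dataframe()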
get_raster_info()

Get comprehensive raster information.

Source code in gigaspatial/processing/tif_processor.py
def get_raster_info(self) -> Dict[str, Any]:
    """Get comprehensive raster information."""
    return {
        "count": self.count,
        "width": self.width,
        "height": self.height,
        "crs": self.crs,
        "bounds": self.bounds,
        "transform": self.transform,
        "dtypes": self.dtype,
        "nodata": self.nodata,
        "mode": self.mode,
        "is_merged": self.is_merged,
        "source_count": self.source_count,
    }
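For example (the path is a placeholder):

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
info = tp.get_raster_info()
print(info["crs"], info["count"], info["nodata"])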
open_dataset()

Context manager for accessing the dataset, handling temporary reprojected files.

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset, handling temporary reprojected files."""
    if self._merged_file_path:
        with rasterio.open(self._merged_file_path) as src:
            yield src
    elif self._reprojected_file_path:
        with rasterio.open(self._reprojected_file_path) as src:
            yield src
    elif isinstance(self.data_store, LocalDataStore):
        with rasterio.open(str(self.dataset_path)) as src:
            yield src
    else:
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src
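The yielded object is a regular rasterio dataset reader, so standard rasterio calls apply (illustrative, with a placeholder path):

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
with tp.open_dataset() as src:
    print(src.profile)        # standard rasterio metadata
    first_band = src.read(1)  # read band 1 as a numpy array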
reproject_to(target_crs, output_path=None, resampling_method=None, resolution=None)

Reprojects the current raster to a new CRS and optionally saves it.

Parameters:

Name Type Description Default
target_crs str

The CRS to reproject to (e.g., "EPSG:4326").

required
output_path Optional[Union[str, Path]]

The path to save the reprojected raster. If None, it is saved to a temporary file.

None
resampling_method Optional[Resampling]

The resampling method to use.

None
resolution Optional[Tuple[float, float]]

The target resolution (pixel size) in the new CRS.

None
Source code in gigaspatial/processing/tif_processor.py
def reproject_to(
    self,
    target_crs: str,
    output_path: Optional[Union[str, Path]] = None,
    resampling_method: Optional[Resampling] = None,
    resolution: Optional[Tuple[float, float]] = None,
):
    """
    Reprojects the current raster to a new CRS and optionally saves it.

    Args:
        target_crs: The CRS to reproject to (e.g., "EPSG:4326").
        output_path: The path to save the reprojected raster. If None,
                     it is saved to a temporary file.
        resampling_method: The resampling method to use.
        resolution: The target resolution (pixel size) in the new CRS.
    """
    self.logger.info(f"Reprojecting raster to {target_crs}...")

    # Use provided or default values
    resampling_method = resampling_method or self.resampling_method
    resolution = resolution or self.reprojection_resolution

    with self.open_dataset() as src:
        if src.crs.to_string() == target_crs:
            self.logger.info(
                "Raster is already in the target CRS. No reprojection needed."
            )
            # If output_path is specified, copy the file
            if output_path:
                self.data_store.copy_file(str(self.dataset_path), output_path)
            return self.dataset_path

        dst_path = output_path or os.path.join(
            self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
        )

        with rasterio.open(
            dst_path,
            "w",
            **self._get_reprojection_profile(src, target_crs, resolution),
        ) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=resampling_method,
                    num_threads=multiprocessing.cpu_count(),
                )

        self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
        return Path(dst_path)
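A sketch of reprojecting to Web Mercator (paths are placeholders; Resampling comes from rasterio.enums):

from rasterio.enums import Resampling

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
out_path = tp.reproject_to(
    "EPSG:3857",
    output_path="example_3857.tif",            # hypothetical output path
    resampling_method=Resampling.bilinear,
)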
sample_by_polygons(polygon_list, stat='mean')

Sample raster values by polygons and compute statistic(s) for each polygon.

Parameters:

Name Type Description Default
polygon_list

List of shapely Polygon or MultiPolygon objects.

required
stat Union[str, Callable, List[Union[str, Callable]]]

Statistic(s) to compute. Can be:
- Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
- Single callable: custom function that takes array and returns scalar
- List of strings/callables: multiple statistics to compute

'mean'

Returns:

Type Description

If single stat: np.ndarray of computed statistics for each polygon

If multiple stats: List of dictionaries with stat names as keys

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self,
    polygon_list,
    stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
):
    """
    Sample raster values by polygons and compute statistic(s) for each polygon.

    Args:
        polygon_list: List of shapely Polygon or MultiPolygon objects.
        stat: Statistic(s) to compute. Can be:
            - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
            - Single callable: custom function that takes array and returns scalar
            - List of strings/callables: multiple statistics to compute

    Returns:
        If single stat: np.ndarray of computed statistics for each polygon
        If multiple stats: List of dictionaries with stat names as keys
    """
    # Determine if single or multiple stats
    single_stat = not isinstance(stat, list)
    stats_list = [stat] if single_stat else stat

    # Prepare stat functions
    stat_funcs = []
    stat_names = []

    for s in stats_list:
        if callable(s):
            stat_funcs.append(s)
            stat_names.append(
                s.__name__
                if hasattr(s, "__name__")
                else f"custom_{len(stat_names)}"
            )
        else:
            # Handle string statistics
            if s == "count":
                stat_funcs.append(len)
            else:
                stat_funcs.append(getattr(np, s))
            stat_names.append(s)

    results = []

    with self.open_dataset() as src:
        for polygon in tqdm(polygon_list):
            try:
                out_image, _ = mask(src, [polygon], crop=True, filled=False)

                # Use masked arrays for more efficient nodata handling
                if hasattr(out_image, "mask"):
                    valid_data = out_image.compressed()
                else:
                    valid_data = (
                        out_image[out_image != self.nodata]
                        if self.nodata is not None
                        else out_image.flatten()
                    )

                if len(valid_data) == 0:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})
                else:
                    if single_stat:
                        results.append(stat_funcs[0](valid_data))
                    else:
                        # Compute all statistics for this polygon
                        polygon_stats = {}
                        for func, name in zip(stat_funcs, stat_names):
                            try:
                                polygon_stats[name] = func(valid_data)
                            except Exception:
                                polygon_stats[name] = np.nan
                        results.append(polygon_stats)

            except Exception:
                if single_stat:
                    results.append(np.nan)
                else:
                    results.append({name: np.nan for name in stat_names})

    return np.array(results) if single_stat else results
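Illustrative sketch (file names are placeholders):

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor  # assumed import path

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
zones = gpd.read_file("zones.geojson")         # hypothetical polygons
means = tp.sample_by_polygons(list(zones.geometry), stat="mean")
stats = tp.sample_by_polygons(list(zones.geometry), stat=["mean", "std", "count"])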
sample_by_polygons_batched(polygon_list, stat='mean', batch_size=100, n_workers=4, show_progress=True, check_memory=True, **kwargs)

Sample raster values by polygons in parallel using batching.

Parameters:

Name Type Description Default
polygon_list List[Union[Polygon, MultiPolygon]]

List of Shapely Polygon or MultiPolygon objects

required
stat Union[str, Callable]

Statistic to compute

'mean'
batch_size int

Number of polygons per batch

100
n_workers int

Number of worker processes

4
show_progress bool

Whether to display progress bar

True
check_memory bool

Whether to check memory before operation

True
**kwargs

Additional arguments

{}

Returns:

Type Description
ndarray

np.ndarray of statistics for each polygon

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons_batched(
    self,
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: Union[str, Callable] = "mean",
    batch_size: int = 100,
    n_workers: int = 4,
    show_progress: bool = True,
    check_memory: bool = True,
    **kwargs,
) -> np.ndarray:
    """
    Sample raster values by polygons in parallel using batching.

    Args:
        polygon_list: List of Shapely Polygon or MultiPolygon objects
        stat: Statistic to compute
        batch_size: Number of polygons per batch
        n_workers: Number of worker processes
        show_progress: Whether to display progress bar
        check_memory: Whether to check memory before operation
        **kwargs: Additional arguments

    Returns:
        np.ndarray of statistics for each polygon
    """
    import sys
    import warnings

    # Memory guard check with n_workers consideration
    if check_memory:
        is_safe = self._memory_guard(
            "batched_sampling",
            threshold_percent=85.0,
            n_workers=n_workers,
            raise_error=False,
        )

        if not is_safe:
            # Suggest reducing n_workers
            memory_info = self._check_available_memory()
            estimates = self._estimate_memory_usage("batched_sampling", n_workers=1)

            # Calculate optimal workers
            suggested_workers = max(
                1, int(memory_info["available"] * 0.7 / estimates["per_worker"])
            )

            warnings.warn(
                f"Consider reducing n_workers from {n_workers} to {suggested_workers} "
                f"to reduce memory pressure.",
                ResourceWarning,
            )

    # Platform check
    if sys.platform in ["win32", "darwin"]:
        import multiprocessing as mp

        if mp.get_start_method(allow_none=True) != "fork":
            warnings.warn(
                "Batched sampling may not work on Windows/macOS. "
                "Use sample_by_polygons() if you encounter errors.",
                RuntimeWarning,
            )

    def _chunk_list(data_list, chunk_size):
        """Yield successive chunks from data_list."""
        for i in range(0, len(data_list), chunk_size):
            yield data_list[i : i + chunk_size]

    if len(polygon_list) == 0:
        return np.array([])

    stat_func = stat if callable(stat) else getattr(np, stat)
    polygon_chunks = list(_chunk_list(polygon_list, batch_size))

    with multiprocessing.Pool(
        initializer=self._initializer_worker, processes=n_workers
    ) as pool:
        process_func = partial(self._process_polygon_batch, stat_func=stat_func)
        if show_progress:
            batched_results = list(
                tqdm(
                    pool.imap(process_func, polygon_chunks),
                    total=len(polygon_chunks),
                    desc=f"Sampling polygons",
                )
            )
        else:
            batched_results = list(pool.imap(process_func, polygon_chunks))

        results = [item for sublist in batched_results for item in sublist]

    return np.array(results)
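A sketch of the batched variant (placeholders as before; on Windows/macOS the fork-based pool may not be available, as noted above):

import geopandas as gpd
from gigaspatial.processing.tif_processor import TifProcessor  # assumed import path

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
zones = gpd.read_file("zones.geojson")         # hypothetical polygons
means = tp.sample_by_polygons_batched(
    list(zones.geometry),
    stat="mean",
    batch_size=200,
    n_workers=4,
)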
to_dataframe(drop_nodata=True, check_memory=True, **kwargs)

Convert raster to DataFrame.

Parameters:

Name Type Description Default
drop_nodata

Whether to drop nodata values

True
check_memory

Whether to check memory before operation (default True)

True
**kwargs

Additional arguments

{}

Returns:

Type Description
DataFrame

pd.DataFrame with raster data

Source code in gigaspatial/processing/tif_processor.py
def to_dataframe(
    self, drop_nodata=True, check_memory=True, **kwargs
) -> pd.DataFrame:
    """
    Convert raster to DataFrame.

    Args:
        drop_nodata: Whether to drop nodata values
        check_memory: Whether to check memory before operation (default True)
        **kwargs: Additional arguments

    Returns:
        pd.DataFrame with raster data
    """
    # Memory guard check
    if check_memory:
        self._memory_guard("conversion", threshold_percent=80.0)

    try:
        if self.mode == "single":
            return self._to_dataframe(
                band_number=kwargs.get("band_number", 1),
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
        else:
            return self._to_dataframe(
                band_number=None,  # All bands
                drop_nodata=drop_nodata,
                band_names=kwargs.get("band_names", None),
            )
    except Exception as e:
        raise ValueError(
            f"Failed to process TIF file in mode '{self.mode}'. "
            f"Please ensure the file is valid and matches the selected mode. "
            f"Original error: {str(e)}"
        )

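For a single-band raster (placeholder path; band_names here simply renames the value column):

tp = TifProcessor(dataset_path="example.tif")  # hypothetical single-band raster
df = tp.to_dataframe(drop_nodata=True, band_names="elevation")
# df has columns: lon, lat, elevation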
to_dataframe_chunked(drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs)

Convert raster to DataFrame using chunked processing for memory efficiency.

Automatically routes to the appropriate chunked method based on mode. Chunk size is automatically calculated based on target memory usage.

Parameters:

Name Type Description Default
drop_nodata

Whether to drop nodata values

True
chunk_size

Number of rows per chunk (auto-calculated if None)

None
target_memory_mb

Target memory per chunk in MB (default 500)

500
**kwargs

Additional arguments (band_number, band_names, etc.)

{}
Source code in gigaspatial/processing/tif_processor.py
def to_dataframe_chunked(
    self, drop_nodata=True, chunk_size=None, target_memory_mb=500, **kwargs
):
    """
    Convert raster to DataFrame using chunked processing for memory efficiency.

    Automatically routes to the appropriate chunked method based on mode.
    Chunk size is automatically calculated based on target memory usage.

    Args:
        drop_nodata: Whether to drop nodata values
        chunk_size: Number of rows per chunk (auto-calculated if None)
        target_memory_mb: Target memory per chunk in MB (default 500)
        **kwargs: Additional arguments (band_number, band_names, etc.)
    """

    if chunk_size is None:
        chunk_size = self._calculate_optimal_chunk_size(
            "conversion", target_memory_mb
        )

    windows = self._get_chunk_windows(chunk_size)

    # SIMPLE ROUTING
    if self.mode == "single":
        return self._to_dataframe_chunked(
            windows,
            band_number=kwargs.get("band_number", 1),
            drop_nodata=drop_nodata,
            band_names=kwargs.get("band_names", None),
        )
    else:  # rgb, rgba, multi
        return self._to_dataframe_chunked(
            windows,
            band_number=None,
            drop_nodata=drop_nodata,
            band_names=kwargs.get("band_names", None),
        )
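Illustrative sketch for a raster too large to convert in one pass (placeholder path):

tp = TifProcessor(dataset_path="large_example.tif")  # hypothetical large raster
df = tp.to_dataframe_chunked(drop_nodata=True, target_memory_mb=250)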
to_geodataframe(check_memory=True, **kwargs)

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Parameters:

Name Type Description Default
check_memory

Whether to check memory before operation

True
**kwargs

Additional arguments passed to to_dataframe()

{}

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame with raster data

Source code in gigaspatial/processing/tif_processor.py
def to_geodataframe(self, check_memory=True, **kwargs) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.

    Args:
        check_memory: Whether to check memory before operation
        **kwargs: Additional arguments passed to to_dataframe()

    Returns:
        gpd.GeoDataFrame with raster data
    """
    # Memory guard check
    if check_memory:
        self._memory_guard("conversion", threshold_percent=80.0)

    df = self.to_dataframe(check_memory=False, **kwargs)

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    return gdf
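For example (placeholder paths; writing with GeoPandas is optional):

tp = TifProcessor(dataset_path="example.tif")  # hypothetical raster
gdf = tp.to_geodataframe(drop_nodata=True)
gdf.to_file("pixels.gpkg", driver="GPKG")      # hypothetical output file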
to_graph(connectivity=4, band=None, include_coordinates=False, graph_type='networkx', check_memory=True)

Convert raster to graph based on pixel adjacency.

Parameters:

Name Type Description Default
connectivity Literal[4, 8]

4 or 8-connectivity

4
band Optional[int]

Band number (1-indexed)

None
include_coordinates bool

Include x,y coordinates in nodes

False
graph_type Literal['networkx', 'sparse']

'networkx' or 'sparse'

'networkx'
check_memory bool

Whether to check memory before operation

True

Returns:

Type Description
Union[Graph, csr_matrix]

Graph representation of raster

Source code in gigaspatial/processing/tif_processor.py
def to_graph(
    self,
    connectivity: Literal[4, 8] = 4,
    band: Optional[int] = None,
    include_coordinates: bool = False,
    graph_type: Literal["networkx", "sparse"] = "networkx",
    check_memory: bool = True,
) -> Union[nx.Graph, sp.csr_matrix]:
    """
    Convert raster to graph based on pixel adjacency.

    Args:
        connectivity: 4 or 8-connectivity
        band: Band number (1-indexed)
        include_coordinates: Include x,y coordinates in nodes
        graph_type: 'networkx' or 'sparse'
        check_memory: Whether to check memory before operation

    Returns:
        Graph representation of raster
    """

    # Memory guard check
    if check_memory:
        self._memory_guard("graph", threshold_percent=80.0)

    with self.open_dataset() as src:
        band_idx = band - 1 if band is not None else 0
        if band_idx < 0 or band_idx >= src.count:
            raise ValueError(
                f"Band {band} not available. Raster has {src.count} bands"
            )

        data = src.read(band_idx + 1)
        nodata = src.nodata if src.nodata is not None else self.nodata
        valid_mask = (
            data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
        )

        height, width = data.shape

        # Find all valid pixels
        valid_rows, valid_cols = np.where(valid_mask)
        num_valid_pixels = len(valid_rows)

        # Create a sequential mapping from (row, col) to a node ID
        node_map = np.full(data.shape, -1, dtype=int)
        node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

        # Define neighborhood offsets
        if connectivity == 4:
            # von Neumann neighborhood (4-connectivity)
            offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        else:  # connectivity == 8
            # Moore neighborhood (8-connectivity)
            offsets = [
                (-1, -1),
                (-1, 0),
                (-1, 1),
                (0, -1),
                (0, 1),
                (1, -1),
                (1, 0),
                (1, 1),
            ]

        # Collect nodes and edges
        nodes_to_add = []
        edges_to_add = []

        for i in range(num_valid_pixels):
            row, col = valid_rows[i], valid_cols[i]
            current_node_id = node_map[row, col]

            # Prepare node attributes
            node_attrs = {"value": float(data[row, col])}
            if include_coordinates:
                x, y = src.xy(row, col)
                node_attrs["x"] = x
                node_attrs["y"] = y
            nodes_to_add.append((current_node_id, node_attrs))

            # Find neighbors and collect edges
            for dy, dx in offsets:
                neighbor_row, neighbor_col = row + dy, col + dx

                # Check if neighbor is within bounds and is a valid pixel
                if (
                    0 <= neighbor_row < height
                    and 0 <= neighbor_col < width
                    and valid_mask[neighbor_row, neighbor_col]
                ):
                    neighbor_node_id = node_map[neighbor_row, neighbor_col]

                    # Ensure each edge is added only once
                    if current_node_id < neighbor_node_id:
                        neighbor_value = float(data[neighbor_row, neighbor_col])
                        edges_to_add.append(
                            (current_node_id, neighbor_node_id, neighbor_value)
                        )

        if graph_type == "networkx":
            G = nx.Graph()
            G.add_nodes_from(nodes_to_add)
            G.add_weighted_edges_from(edges_to_add)
            return G
        else:  # sparse matrix
            edges_array = np.array(edges_to_add)
            row_indices = edges_array[:, 0]
            col_indices = edges_array[:, 1]
            weights = edges_array[:, 2]

            # Add reverse edges for symmetric matrix
            from_idx = np.append(row_indices, col_indices)
            to_idx = np.append(col_indices, row_indices)
            weights = np.append(weights, weights)

            return sp.coo_matrix(
                (weights, (from_idx, to_idx)),
                shape=(num_valid_pixels, num_valid_pixels),
            ).tocsr()

utils

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing entity data

required
required_columns List[str]

List of column names required for ID generation

required
id_column str

Name for the id column that will be generated

'id'

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with generated id column

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
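A small sketch (column names and the import path are illustrative assumptions):

import pandas as pd
from gigaspatial.processing.utils import assign_id  # assumed import path

schools = pd.DataFrame(
    {"name": ["School A", "School B"], "lat": [5.60, 5.70], "lon": [-0.20, -0.30]}
)
schools = assign_id(schools, required_columns=["name", "lat", "lon"], id_column="school_id")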