
Processing Module

gigaspatial.processing

DataStore

Bases: ABC

Abstract base class defining the interface for data store implementations. This class serves as a parent for both local and cloud-based storage solutions.

Source code in gigaspatial/core/io/data_store.py
class DataStore(ABC):
    """
    Abstract base class defining the interface for data store implementations.
    This class serves as a parent for both local and cloud-based storage solutions.
    """

    @abstractmethod
    def read_file(self, path: str) -> Any:
        """
        Read contents of a file from the data store.

        Args:
            path: Path to the file to read

        Returns:
            Contents of the file

        Raises:
            IOError: If file cannot be read
        """
        pass

    @abstractmethod
    def write_file(self, path: str, data: Any) -> None:
        """
        Write data to a file in the data store.

        Args:
            path: Path where to write the file
            data: Data to write to the file

        Raises:
            IOError: If file cannot be written
        """
        pass

    @abstractmethod
    def file_exists(self, path: str) -> bool:
        """
        Check if a file exists in the data store.

        Args:
            path: Path to check

        Returns:
            True if file exists, False otherwise
        """
        pass

    @abstractmethod
    def list_files(self, path: str) -> List[str]:
        """
        List all files in a directory.

        Args:
            path: Directory path to list

        Returns:
            List of file paths in the directory
        """
        pass

    @abstractmethod
    def walk(self, top: str) -> Generator:
        """
        Walk through directory tree, similar to os.walk().

        Args:
            top: Starting directory for the walk

        Returns:
            Generator yielding tuples of (dirpath, dirnames, filenames)
        """
        pass

    @abstractmethod
    def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
        """
        Context manager for file operations.

        Args:
            file: Path to the file
            mode: File mode ('r', 'w', 'rb', 'wb')

        Yields:
            File-like object

        Raises:
            IOError: If file cannot be opened
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Check if path points to a file.

        Args:
            path: Path to check

        Returns:
            True if path is a file, False otherwise
        """
        pass

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """
        Check if path points to a directory.

        Args:
            path: Path to check

        Returns:
            True if path is a directory, False otherwise
        """
        pass

    @abstractmethod
    def remove(self, path: str) -> None:
        """
        Remove a file.

        Args:
            path: Path to the file to remove

        Raises:
            IOError: If file cannot be removed
        """
        pass

    @abstractmethod
    def rmdir(self, dir: str) -> None:
        """
        Remove a directory and all its contents.

        Args:
            dir: Path to the directory to remove

        Raises:
            IOError: If directory cannot be removed
        """
        pass
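
The listing above defines the contract that every concrete store must satisfy. Purely as an illustration, the sketch below shows a hypothetical dict-backed, in-memory store that implements each abstract method; it is not part of gigaspatial and only demonstrates the shape of a custom implementation (the import path follows the source file shown above).

from typing import Any, Dict, Generator, List, Union
import io

from gigaspatial.core.io.data_store import DataStore


class InMemoryDataStore(DataStore):
    """Hypothetical dict-backed store, shown only to illustrate the interface."""

    def __init__(self):
        self._files: Dict[str, bytes] = {}

    def read_file(self, path: str) -> Any:
        if path not in self._files:
            raise IOError(f"No such file: {path}")
        return self._files[path]

    def write_file(self, path: str, data: Any) -> None:
        self._files[path] = data.encode() if isinstance(data, str) else bytes(data)

    def file_exists(self, path: str) -> bool:
        return path in self._files

    def list_files(self, path: str) -> List[str]:
        prefix = path.rstrip("/") + "/" if path else ""
        return [p for p in self._files if p.startswith(prefix)]

    def walk(self, top: str) -> Generator:
        # Flat view: one level with all matching files; a real store would group by prefix
        yield top, [], self.list_files(top)

    def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
        # Read-only sketch: returns a file-like object usable as a context manager
        raw = self.read_file(file)
        return io.BytesIO(raw) if "b" in mode else io.StringIO(raw.decode("utf-8"))

    def is_file(self, path: str) -> bool:
        return path in self._files

    def is_dir(self, path: str) -> bool:
        return any(p.startswith(path.rstrip("/") + "/") for p in self._files)

    def remove(self, path: str) -> None:
        self._files.pop(path, None)

    def rmdir(self, dir: str) -> None:
        prefix = dir.rstrip("/") + "/"
        for key in [p for p in self._files if p.startswith(prefix)]:
            del self._files[key]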

file_exists(path) abstractmethod

Check if a file exists in the data store.

Parameters:

    path (str, required): Path to check

Returns:

    bool: True if file exists, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def file_exists(self, path: str) -> bool:
    """
    Check if a file exists in the data store.

    Args:
        path: Path to check

    Returns:
        True if file exists, False otherwise
    """
    pass

is_dir(path) abstractmethod

Check if path points to a directory.

Parameters:

    path (str, required): Path to check

Returns:

    bool: True if path is a directory, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_dir(self, path: str) -> bool:
    """
    Check if path points to a directory.

    Args:
        path: Path to check

    Returns:
        True if path is a directory, False otherwise
    """
    pass

is_file(path) abstractmethod

Check if path points to a file.

Parameters:

    path (str, required): Path to check

Returns:

    bool: True if path is a file, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_file(self, path: str) -> bool:
    """
    Check if path points to a file.

    Args:
        path: Path to check

    Returns:
        True if path is a file, False otherwise
    """
    pass

list_files(path) abstractmethod

List all files in a directory.

Parameters:

    path (str, required): Directory path to list

Returns:

    List[str]: List of file paths in the directory

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def list_files(self, path: str) -> List[str]:
    """
    List all files in a directory.

    Args:
        path: Directory path to list

    Returns:
        List of file paths in the directory
    """
    pass

open(file, mode='r') abstractmethod

Context manager for file operations.

Parameters:

    file (str, required): Path to the file
    mode (str, default 'r'): File mode ('r', 'w', 'rb', 'wb')

Yields:

    Union[str, bytes]: File-like object

Raises:

    IOError: If file cannot be opened

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
    """
    Context manager for file operations.

    Args:
        file: Path to the file
        mode: File mode ('r', 'w', 'rb', 'wb')

    Yields:
        File-like object

    Raises:
        IOError: If file cannot be opened
    """
    pass
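
For reference, a typical call site treats the returned object as a context manager. A brief sketch using the local filesystem implementation documented further below; the file path is hypothetical.

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore()

# Read raw bytes through the store's open() context manager
# ("data/boundaries.geojson" is a hypothetical path used for illustration)
with store.open("data/boundaries.geojson", "rb") as f:
    payload = f.read()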

read_file(path) abstractmethod

Read contents of a file from the data store.

Parameters:

    path (str, required): Path to the file to read

Returns:

    Any: Contents of the file

Raises:

    IOError: If file cannot be read

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def read_file(self, path: str) -> Any:
    """
    Read contents of a file from the data store.

    Args:
        path: Path to the file to read

    Returns:
        Contents of the file

    Raises:
        IOError: If file cannot be read
    """
    pass

remove(path) abstractmethod

Remove a file.

Parameters:

    path (str, required): Path to the file to remove

Raises:

    IOError: If file cannot be removed

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def remove(self, path: str) -> None:
    """
    Remove a file.

    Args:
        path: Path to the file to remove

    Raises:
        IOError: If file cannot be removed
    """
    pass

rmdir(dir) abstractmethod

Remove a directory and all its contents.

Parameters:

    dir (str, required): Path to the directory to remove

Raises:

    IOError: If directory cannot be removed

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def rmdir(self, dir: str) -> None:
    """
    Remove a directory and all its contents.

    Args:
        dir: Path to the directory to remove

    Raises:
        IOError: If directory cannot be removed
    """
    pass

walk(top) abstractmethod

Walk through directory tree, similar to os.walk().

Parameters:

    top (str, required): Starting directory for the walk

Returns:

    Generator: Generator yielding tuples of (dirpath, dirnames, filenames)

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def walk(self, top: str) -> Generator:
    """
    Walk through directory tree, similar to os.walk().

    Args:
        top: Starting directory for the walk

    Returns:
        Generator yielding tuples of (dirpath, dirnames, filenames)
    """
    pass
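
Because walk() mirrors os.walk(), results can be consumed the same way. A brief sketch; the directory name is hypothetical.

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore()

# Iterate over the tree rooted at a hypothetical "rasters" directory
for dirpath, dirnames, filenames in store.walk("rasters"):
    for name in filenames:
        print(f"{dirpath}/{name}")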

write_file(path, data) abstractmethod

Write data to a file in the data store.

Parameters:

    path (str, required): Path where to write the file
    data (Any, required): Data to write to the file

Raises:

    IOError: If file cannot be written

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def write_file(self, path: str, data: Any) -> None:
    """
    Write data to a file in the data store.

    Args:
        path: Path where to write the file
        data: Data to write to the file

    Raises:
        IOError: If file cannot be written
    """
    pass

LocalDataStore

Bases: DataStore

Implementation for local filesystem storage.

Source code in gigaspatial/core/io/local_data_store.py
class LocalDataStore(DataStore):
    """Implementation for local filesystem storage."""

    def __init__(self, base_path: Union[str, Path] = ""):
        super().__init__()
        self.base_path = Path(base_path).resolve()

    def _resolve_path(self, path: str) -> Path:
        """Resolve path relative to base directory."""
        return self.base_path / path

    def read_file(self, path: str) -> bytes:
        full_path = self._resolve_path(path)
        with open(full_path, "rb") as f:
            return f.read()

    def write_file(self, path: str, data: Union[bytes, str]) -> None:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)

        if isinstance(data, str):
            mode = "w"
            encoding = "utf-8"
        else:
            mode = "wb"
            encoding = None

        with open(full_path, mode, encoding=encoding) as f:
            f.write(data)

    def file_exists(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def list_files(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)
        return [
            str(f.relative_to(self.base_path))
            for f in full_path.iterdir()
            if f.is_file()
        ]

    def walk(self, top: str) -> Generator[Tuple[str, List[str], List[str]], None, None]:
        full_path = self._resolve_path(top)
        for root, dirs, files in os.walk(full_path):
            rel_root = str(Path(root).relative_to(self.base_path))
            yield rel_root, dirs, files

    def list_directories(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)

        if not full_path.exists():
            return []

        if not full_path.is_dir():
            return []

        return [d.name for d in full_path.iterdir() if d.is_dir()]

    def open(self, path: str, mode: str = "r") -> IO:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)
        return open(full_path, mode)

    def is_file(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def is_dir(self, path: str) -> bool:
        return self._resolve_path(path).is_dir()

    def remove(self, path: str) -> None:
        full_path = self._resolve_path(path)
        if full_path.is_file():
            os.remove(full_path)

    def copy_file(self, src: str, dst: str) -> None:
        """Copy a file from src to dst."""
        src_path = self._resolve_path(src)
        dst_path = self._resolve_path(dst)
        self.mkdir(str(dst_path.parent), exist_ok=True)
        shutil.copy2(src_path, dst_path)

    def rmdir(self, directory: str) -> None:
        full_path = self._resolve_path(directory)
        if full_path.is_dir():
            os.rmdir(full_path)

    def mkdir(self, path: str, exist_ok: bool = False) -> None:
        full_path = self._resolve_path(path)
        full_path.mkdir(parents=True, exist_ok=exist_ok)

    def exists(self, path: str) -> bool:
        return self._resolve_path(path).exists()

copy_file(src, dst)

Copy a file from src to dst.

Source code in gigaspatial/core/io/local_data_store.py
def copy_file(self, src: str, dst: str) -> None:
    """Copy a file from src to dst."""
    src_path = self._resolve_path(src)
    dst_path = self._resolve_path(dst)
    self.mkdir(str(dst_path.parent), exist_ok=True)
    shutil.copy2(src_path, dst_path)
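
A short usage sketch of the local store; the base directory and file names are hypothetical. All calls appear in the listing above.

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore("/tmp/giga_demo")  # hypothetical base directory

# Write, inspect, and read back a small text file
store.write_file("notes/readme.txt", "hello")
assert store.file_exists("notes/readme.txt")
print(store.list_files("notes"))            # ['notes/readme.txt']
print(store.read_file("notes/readme.txt"))  # b'hello'

# Copy it and clean up
store.copy_file("notes/readme.txt", "notes/copy.txt")
store.remove("notes/copy.txt")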

TifProcessor

A class to handle TIF data processing, supporting single-band, RGB, RGBA, and multi-band data. It can merge multiple rasters into one during initialization.
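
Before the full listing, a brief construction sketch. The import path follows the source file shown below, the raster file names and coordinates are hypothetical, and every parameter and method used here appears in the listing that follows.

from gigaspatial.processing.tif_processor import TifProcessor

# Single-band raster (hypothetical file name)
processor = TifProcessor(dataset_path="elevation.tif", mode="single")
print(processor.get_raster_info())
df = processor.to_dataframe()  # columns: lon, lat, pixel_value

# Merge several single-band tiles at initialization, reprojecting to a common CRS first
merged = TifProcessor(
    dataset_path=["tile_a.tif", "tile_b.tif"],
    mode="single",
    merge_method="mean",
    target_crs="EPSG:4326",
)
values = merged.sample_by_coordinates([(29.0, -1.9), (29.1, -2.0)])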

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    Can merge multiple rasters into one during initialization.
    """

    dataset_path: Union[Path, str, List[Union[Path, str]]]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"
    merge_method: Literal["first", "last", "min", "max", "mean"] = "first"
    target_crs: Optional[str] = None  # For reprojection if needed
    resampling_method: Resampling = Resampling.nearest
    reprojection_resolution: Optional[Tuple[float, float]] = None

    def __post_init__(self):
        """Validate inputs, merge rasters if needed, and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}
        self._temp_dir = tempfile.mkdtemp()
        self._merged_file_path = None
        self._reprojected_file_path = None

        # Handle multiple dataset paths
        if isinstance(self.dataset_path, list):
            if len(self.dataset_path) > 1:
                self.dataset_paths = [Path(p) for p in self.dataset_path]
                self._validate_multiple_datasets()
                self._merge_rasters()
                self.dataset_path = self._merged_file_path
        else:
            self.dataset_paths = [Path(self.dataset_path)]
            if not self.data_store.file_exists(str(self.dataset_path)):
                raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

            # Reproject single raster during initialization if target_crs is set
            if self.target_crs:
                self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
                with self.data_store.open(str(self.dataset_path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            self._reprojected_file_path = self._reproject_to_temp_file(
                                src, self.target_crs
                            )
                self.dataset_path = self._reprojected_file_path

        self._load_metadata()

        # Validate mode and band count
        if self.mode == "rgba" and self.count != 4:
            raise ValueError("RGBA mode requires a 4-band TIF file")
        if self.mode == "rgb" and self.count != 3:
            raise ValueError("RGB mode requires a 3-band TIF file")
        if self.mode == "single" and self.count != 1:
            raise ValueError("Single mode requires a 1-band TIF file")
        if self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset, handling temporary reprojected files."""
        if self._merged_file_path:
            with rasterio.open(self._merged_file_path) as src:
                yield src
        elif self._reprojected_file_path:
            with rasterio.open(self._reprojected_file_path) as src:
                yield src
        else:
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        yield src

    def reproject_to(
        self,
        target_crs: str,
        output_path: Optional[Union[str, Path]] = None,
        resampling_method: Optional[Resampling] = None,
        resolution: Optional[Tuple[float, float]] = None,
    ):
        """
        Reprojects the current raster to a new CRS and optionally saves it.

        Args:
            target_crs: The CRS to reproject to (e.g., "EPSG:4326").
            output_path: The path to save the reprojected raster. If None,
                         it is saved to a temporary file.
            resampling_method: The resampling method to use.
            resolution: The target resolution (pixel size) in the new CRS.
        """
        self.logger.info(f"Reprojecting raster to {target_crs}...")

        # Use provided or default values
        resampling_method = resampling_method or self.resampling_method
        resolution = resolution or self.reprojection_resolution

        with self.open_dataset() as src:
            if src.crs.to_string() == target_crs:
                self.logger.info(
                    "Raster is already in the target CRS. No reprojection needed."
                )
                # If output_path is specified, copy the file
                if output_path:
                    self.data_store.copy_file(str(self.dataset_path), output_path)
                return self.dataset_path

            dst_path = output_path or os.path.join(
                self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
            )

            with rasterio.open(
                dst_path,
                "w",
                **self._get_reprojection_profile(src, target_crs, resolution),
            ) as dst:
                for band_idx in range(1, src.count + 1):
                    reproject(
                        source=rasterio.band(src, band_idx),
                        destination=rasterio.band(dst, band_idx),
                        src_transform=src.transform,
                        src_crs=src.crs,
                        dst_transform=dst.transform,
                        dst_crs=dst.crs,
                        resampling=resampling_method,
                        num_threads=multiprocessing.cpu_count(),
                    )

            self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
            return Path(dst_path)

    def get_raster_info(self) -> Dict[str, Any]:
        """Get comprehensive raster information."""
        return {
            "count": self.count,
            "width": self.width,
            "height": self.height,
            "crs": self.crs,
            "bounds": self.bounds,
            "transform": self.transform,
            "dtypes": self.dtype,
            "nodata": self.nodata,
            "mode": self.mode,
            "is_merged": self.is_merged,
            "source_count": self.source_count,
        }

    def _reproject_to_temp_file(
        self, src: rasterio.DatasetReader, target_crs: str
    ) -> str:
        """Helper to reproject a raster and save it to a temporary file."""
        dst_path = os.path.join(
            self._temp_dir, f"reprojected_temp_{os.urandom(8).hex()}.tif"
        )
        profile = self._get_reprojection_profile(
            src, target_crs, self.reprojection_resolution
        )

        with rasterio.open(dst_path, "w", **profile) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=self.resampling_method,
                )
        return dst_path

    def _validate_multiple_datasets(self):
        """Validate that all datasets exist and have compatible properties."""
        if len(self.dataset_paths) < 2:
            raise ValueError("Multiple dataset paths required for merging")

        with self.data_store.open(str(self.dataset_paths[0]), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as ref_src:
                    ref_count = ref_src.count
                    ref_dtype = ref_src.dtypes[0]
                    ref_crs = ref_src.crs
                    ref_transform = ref_src.transform
                    ref_nodata = ref_src.nodata

        for i, path in enumerate(self.dataset_paths[1:], 1):
            with self.data_store.open(str(path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        if src.count != ref_count:
                            raise ValueError(
                                f"Dataset {i} has {src.count} bands, expected {ref_count}"
                            )
                        if src.dtypes[0] != ref_dtype:
                            raise ValueError(
                                f"Dataset {i} has dtype {src.dtypes[0]}, expected {ref_dtype}"
                            )
                        if not self.target_crs and src.crs != ref_crs:
                            self.logger.warning(
                                f"Dataset {i} has CRS {src.crs}, expected {ref_crs}. "
                                "Consider setting target_crs parameter for reprojection before merging."
                            )
                        if self.target_crs is None and not self._transforms_compatible(
                            src.transform, ref_transform
                        ):
                            self.logger.warning(
                                f"Dataset {i} has different resolution. Resampling may be needed."
                            )
                        if src.nodata != ref_nodata:
                            self.logger.warning(
                                f"Dataset {i} has different nodata value: {src.nodata} vs {ref_nodata}"
                            )

    def _get_reprojection_profile(
        self,
        src: rasterio.DatasetReader,
        target_crs: str,
        resolution: Optional[Tuple[float, float]],
        compression: str = "lzw",
    ):
        """Calculates and returns the profile for a reprojected raster."""
        if resolution:
            src_res = (abs(src.transform.a), abs(src.transform.e))
            self.logger.info(
                f"Using target resolution: {resolution}. Source resolution: {src_res}."
            )
            # Calculate transform and dimensions based on the new resolution
            dst_transform, width, height = calculate_default_transform(
                src.crs,
                target_crs,
                src.width,
                src.height,
                *src.bounds,
                resolution=resolution,
            )
        else:
            # Keep original resolution but reproject
            dst_transform, width, height = calculate_default_transform(
                src.crs, target_crs, src.width, src.height, *src.bounds
            )

        profile = src.profile.copy()
        profile.update(
            {
                "crs": target_crs,
                "transform": dst_transform,
                "width": width,
                "height": height,
                "compress": compression,  # Add compression to save space
            }
        )
        return profile

    def _transforms_compatible(self, transform1, transform2, tolerance=1e-6):
        """Check if two transforms have compatible pixel sizes."""
        return (
            abs(transform1.a - transform2.a) < tolerance
            and abs(transform1.e - transform2.e) < tolerance
        )

    def _merge_rasters(self):
        """Merge multiple rasters into a single raster."""
        self.logger.info(f"Merging {len(self.dataset_paths)} rasters...")

        # Open all datasets and handle reprojection if needed
        datasets_to_merge = []
        temp_reprojected_files = []
        try:
            for path in self.dataset_paths:
                with self.data_store.open(str(path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            if self.target_crs and src.crs != self.target_crs:
                                self.logger.info(
                                    f"Reprojecting {path.name} to {self.target_crs} before merging."
                                )
                                reprojected_path = self._reproject_to_temp_file(
                                    src, self.target_crs
                                )
                                temp_reprojected_files.append(reprojected_path)
                                datasets_to_merge.append(
                                    rasterio.open(reprojected_path)
                                )
                            else:
                                temp_path = os.path.join(
                                    self._temp_dir,
                                    f"temp_{path.stem}_{os.urandom(4).hex()}.tif",
                                )
                                temp_reprojected_files.append(temp_path)

                                profile = src.profile
                                with rasterio.open(temp_path, "w", **profile) as dst:
                                    dst.write(src.read())
                                datasets_to_merge.append(rasterio.open(temp_path))

            self._merged_file_path = os.path.join(self._temp_dir, "merged_raster.tif")

            if self.merge_method == "mean":
                merged_array, merged_transform = self._merge_with_mean(
                    datasets_to_merge
                )
            else:
                merged_array, merged_transform = merge(
                    datasets_to_merge,
                    method=self.merge_method,
                    resampling=self.resampling_method,
                )

            # Get profile from the first file in the list (all should be compatible now)
            ref_src = datasets_to_merge[0]
            profile = ref_src.profile.copy()
            profile.update(
                {
                    "height": merged_array.shape[-2],
                    "width": merged_array.shape[-1],
                    "transform": merged_transform,
                    "crs": self.target_crs if self.target_crs else ref_src.crs,
                }
            )

            with rasterio.open(self._merged_file_path, "w", **profile) as dst:
                dst.write(merged_array)
        finally:
            for dataset in datasets_to_merge:
                if hasattr(dataset, "close"):
                    dataset.close()

            # Clean up temporary files immediately
            for temp_file in temp_reprojected_files:
                try:
                    os.remove(temp_file)
                except OSError:
                    pass

        self.logger.info("Raster merging completed!")

    def _merge_with_mean(self, src_files):
        """Merge rasters using mean aggregation."""
        # Get bounds and resolution for merged raster
        bounds = src_files[0].bounds
        transform = src_files[0].transform

        for src in src_files[1:]:
            bounds = rasterio.coords.BoundingBox(
                min(bounds.left, src.bounds.left),
                min(bounds.bottom, src.bounds.bottom),
                max(bounds.right, src.bounds.right),
                max(bounds.top, src.bounds.top),
            )

        # Calculate dimensions for merged raster
        width = int((bounds.right - bounds.left) / abs(transform.a))
        height = int((bounds.top - bounds.bottom) / abs(transform.e))

        # Create new transform for merged bounds
        merged_transform = rasterio.transform.from_bounds(
            bounds.left, bounds.bottom, bounds.right, bounds.top, width, height
        )

        estimated_memory = height * width * src_files[0].count * 8  # float64
        if estimated_memory > 1e9:  # 1GB threshold
            self.logger.warning(
                f"Large memory usage expected: {estimated_memory/1e9:.1f}GB"
            )

        # Initialize arrays for sum and count
        sum_array = np.zeros((src_files[0].count, height, width), dtype=np.float64)
        count_array = np.zeros((height, width), dtype=np.int32)

        # Process each source file
        for src in src_files:
            # Read data
            data = src.read()

            # Calculate offset in merged raster
            src_bounds = src.bounds
            col_off = int((src_bounds.left - bounds.left) / abs(transform.a))
            row_off = int((bounds.top - src_bounds.top) / abs(transform.e))

            # Get valid data mask
            if src.nodata is not None:
                valid_mask = data[0] != src.nodata
            else:
                valid_mask = np.ones(data[0].shape, dtype=bool)

            # Add to sum and count arrays
            end_row = row_off + data.shape[1]
            end_col = col_off + data.shape[2]

            sum_array[:, row_off:end_row, col_off:end_col] += np.where(
                valid_mask, data, 0
            )
            count_array[row_off:end_row, col_off:end_col] += valid_mask.astype(np.int32)

        # Calculate mean
        mean_array = np.divide(
            sum_array,
            count_array,
            out=np.full_like(
                sum_array, src_files[0].nodata or 0, dtype=sum_array.dtype
            ),
            where=count_array > 0,
        )

        return mean_array.astype(src_files[0].dtypes[0]), merged_transform

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        try:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]
        except (rasterio.errors.RasterioIOError, FileNotFoundError) as e:
            raise FileNotFoundError(f"Could not read raster metadata: {e}")
        except Exception as e:
            raise RuntimeError(f"Unexpected error loading metadata: {e}")

    @property
    def is_merged(self) -> bool:
        """Check if this processor was created from multiple rasters."""
        return len(self.dataset_paths) > 1

    @property
    def source_count(self) -> int:
        """Get the number of source rasters."""
        return len(self.dataset_paths)

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def dtype(self):
        """Get the data types from the TIF file"""
        return self._cache.get("dtype", [])

    @property
    def width(self):
        return self._cache["width"]

    @property
    def height(self):
        return self._cache["height"]

    def to_dataframe(self, drop_nodata=True, **kwargs) -> pd.DataFrame:
        try:
            if self.mode == "single":
                df = self._to_band_dataframe(drop_nodata=drop_nodata, **kwargs)
            elif self.mode == "rgb":
                df = self._to_rgb_dataframe(drop_nodata=drop_nodata)
            elif self.mode == "rgba":
                df = self._to_rgba_dataframe(drop_transparent=drop_nodata)
            elif self.mode == "multi":
                df = self._to_multi_band_dataframe(drop_nodata=drop_nodata, **kwargs)
            else:
                raise ValueError(
                    f"Invalid mode: {self.mode}. Must be one of: single, rgb, rgba, multi"
                )
        except Exception as e:
            raise ValueError(
                f"Failed to process TIF file in mode '{self.mode}'. "
                f"Please ensure the file is valid and matches the selected mode. "
                f"Original error: {str(e)}"
            )

        return df

    def to_geodataframe(self, **kwargs) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.
        """
        df = self.to_dataframe(**kwargs)

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        return gdf

    def to_graph(
        self,
        connectivity: Literal[4, 8] = 4,
        band: Optional[int] = None,
        include_coordinates: bool = False,
        graph_type: Literal["networkx", "sparse"] = "networkx",
        chunk_size: Optional[int] = None,
    ) -> Union[nx.Graph, sp.csr_matrix]:
        """
        Convert raster to graph based on pixel adjacency.
        """
        if chunk_size is not None:
            raise NotImplementedError("Chunked processing is not yet implemented.")

        with self.open_dataset() as src:
            band_idx = band - 1 if band is not None else 0
            if band_idx < 0 or band_idx >= src.count:
                raise ValueError(
                    f"Band {band} not available. Raster has {src.count} bands"
                )

            data = src.read(band_idx + 1)
            nodata = src.nodata if src.nodata is not None else self.nodata
            valid_mask = (
                data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
            )

            height, width = data.shape

            # Find all valid pixels
            valid_rows, valid_cols = np.where(valid_mask)
            num_valid_pixels = len(valid_rows)

            # Create a sequential mapping from (row, col) to a node ID
            node_map = np.full(data.shape, -1, dtype=int)
            node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

            # Define neighborhood offsets
            if connectivity == 4:
                # von Neumann neighborhood (4-connectivity)
                offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
            else:  # connectivity == 8
                # Moore neighborhood (8-connectivity)
                offsets = [
                    (-1, -1),
                    (-1, 0),
                    (-1, 1),
                    (0, -1),
                    (0, 1),
                    (1, -1),
                    (1, 0),
                    (1, 1),
                ]

            # Collect nodes and edges
            nodes_to_add = []
            edges_to_add = []

            for i in range(num_valid_pixels):
                row, col = valid_rows[i], valid_cols[i]
                current_node_id = node_map[row, col]

                # Prepare node attributes
                node_attrs = {"value": float(data[row, col])}
                if include_coordinates:
                    x, y = src.xy(row, col)
                    node_attrs["x"] = x
                    node_attrs["y"] = y
                nodes_to_add.append((current_node_id, node_attrs))

                # Find neighbors and collect edges
                for dy, dx in offsets:
                    neighbor_row, neighbor_col = row + dy, col + dx

                    # Check if neighbor is within bounds and is a valid pixel
                    if (
                        0 <= neighbor_row < height
                        and 0 <= neighbor_col < width
                        and valid_mask[neighbor_row, neighbor_col]
                    ):
                        neighbor_node_id = node_map[neighbor_row, neighbor_col]

                        # Ensure each edge is added only once
                        if current_node_id < neighbor_node_id:
                            neighbor_value = float(data[neighbor_row, neighbor_col])
                            edges_to_add.append(
                                (current_node_id, neighbor_node_id, neighbor_value)
                            )

            if graph_type == "networkx":
                G = nx.Graph()
                G.add_nodes_from(nodes_to_add)
                G.add_weighted_edges_from(edges_to_add)
                return G
            else:  # sparse matrix
                edges_array = np.array(edges_to_add)
                row_indices = edges_array[:, 0]
                col_indices = edges_array[:, 1]
                weights = edges_array[:, 2]

                # Mirror the edges so the resulting adjacency matrix is symmetric
                sym_rows = np.concatenate([row_indices, col_indices]).astype(int)
                sym_cols = np.concatenate([col_indices, row_indices]).astype(int)
                sym_weights = np.concatenate([weights, weights])

                return sp.coo_matrix(
                    (sym_weights, (sym_rows, sym_cols)),
                    shape=(num_valid_pixels, num_valid_pixels),
                ).tocsr()

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]], **kwargs
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            elif self.count > 1:
                return np.array(
                    [vals for vals in src.sample(coordinate_list, **kwargs)]
                )
            else:
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self,
        polygon_list,
        stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
    ):
        """
        Sample raster values by polygons and compute statistic(s) for each polygon.

        Args:
            polygon_list: List of shapely Polygon or MultiPolygon objects.
            stat: Statistic(s) to compute. Can be:
                - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
                - Single callable: custom function that takes array and returns scalar
                - List of strings/callables: multiple statistics to compute

        Returns:
            If single stat: np.ndarray of computed statistics for each polygon
            If multiple stats: List of dictionaries with stat names as keys
        """
        # Determine if single or multiple stats
        single_stat = not isinstance(stat, list)
        stats_list = [stat] if single_stat else stat

        # Prepare stat functions
        stat_funcs = []
        stat_names = []

        for s in stats_list:
            if callable(s):
                stat_funcs.append(s)
                stat_names.append(
                    s.__name__
                    if hasattr(s, "__name__")
                    else f"custom_{len(stat_names)}"
                )
            else:
                # Handle string statistics
                if s == "count":
                    stat_funcs.append(len)
                else:
                    stat_funcs.append(getattr(np, s))
                stat_names.append(s)

        results = []

        with self.open_dataset() as src:
            for polygon in tqdm(polygon_list):
                try:
                    out_image, _ = mask(src, [polygon], crop=True, filled=False)

                    # Use masked arrays for more efficient nodata handling
                    if hasattr(out_image, "mask"):
                        valid_data = out_image.compressed()
                    else:
                        valid_data = (
                            out_image[out_image != self.nodata]
                            if self.nodata
                            else out_image.flatten()
                        )

                    if len(valid_data) == 0:
                        if single_stat:
                            results.append(np.nan)
                        else:
                            results.append({name: np.nan for name in stat_names})
                    else:
                        if single_stat:
                            results.append(stat_funcs[0](valid_data))
                        else:
                            # Compute all statistics for this polygon
                            polygon_stats = {}
                            for func, name in zip(stat_funcs, stat_names):
                                try:
                                    polygon_stats[name] = func(valid_data)
                                except Exception:
                                    polygon_stats[name] = np.nan
                            results.append(polygon_stats)

                except Exception:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})

        return np.array(results) if single_stat else results

    def sample_by_polygons_batched(
        self,
        polygon_list: List[Union[Polygon, MultiPolygon]],
        stat: Union[str, Callable] = "mean",
        batch_size: int = 100,
        n_workers: int = 4,
        **kwargs,
    ) -> np.ndarray:
        """
        Sample raster values by polygons in parallel using batching.
        """

        def _chunk_list(data_list, chunk_size):
            """Yield successive chunks from data_list."""
            for i in range(0, len(data_list), chunk_size):
                yield data_list[i : i + chunk_size]

        if len(polygon_list) == 0:
            return np.array([])

        stat_func = stat if callable(stat) else getattr(np, stat)

        polygon_chunks = list(_chunk_list(polygon_list, batch_size))

        with multiprocessing.Pool(
            initializer=self._initializer_worker, processes=n_workers
        ) as pool:
            process_func = partial(self._process_polygon_batch, stat_func=stat_func)
            batched_results = list(
                tqdm(
                    pool.imap(process_func, polygon_chunks),
                    total=len(polygon_chunks),
                    desc=f"Sampling polygons",
                )
            )

            results = [item for sublist in batched_results for item in sublist]

        return np.array(results)

    def _initializer_worker(self):
        """
        Initializer function for each worker process.
        Opens the raster dataset and stores it in a process-local variable.
        This function runs once per worker, not for every task.
        """
        global src_handle, memfile_handle
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            memfile_handle = rasterio.MemoryFile(f.read())
            src_handle = memfile_handle.open()

    def _process_single_polygon(self, polygon, stat_func):
        """
        Helper function to process a single polygon.
        This will be run in a separate process.
        """
        global src_handle
        if src_handle is None:
            # This should not happen if the initializer is set up correctly,
            # but it's a good defensive check.
            raise RuntimeError("Raster dataset not initialized in this process.")

        try:
            out_image, _ = mask(src_handle, [polygon], crop=True, filled=False)

            if hasattr(out_image, "mask"):
                valid_data = out_image.compressed()
            else:
                valid_data = (
                    out_image[out_image != self.nodata]
                    if self.nodata
                    else out_image.flatten()
                )

            if len(valid_data) == 0:
                return np.nan
            else:
                return stat_func(valid_data)
        except Exception:
            return np.nan

    def _process_polygon_batch(self, polygon_batch, stat_func):
        """
        Processes a batch of polygons.
        """
        return [
            self._process_single_polygon(polygon, stat_func)
            for polygon in polygon_batch
        ]

    def _to_rgba_dataframe(self, drop_transparent: bool = False) -> pd.DataFrame:
        """
        Convert RGBA TIF to DataFrame with separate columns for R, G, B, A values.
        """
        self.logger.info("Processing RGBA dataset...")

        with self.open_dataset() as src:
            if self.count != 4:
                raise ValueError("RGBA mode requires a 4-band TIF file")

            # Read all four bands
            red, green, blue, alpha = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_transparent:
                mask = alpha > 0
                red = np.extract(mask, red)
                green = np.extract(mask, green)
                blue = np.extract(mask, blue)
                alpha = np.extract(mask, alpha)
                lons = np.extract(mask, x_coords)
                lats = np.extract(mask, y_coords)
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()
                alpha = alpha.flatten()

            # Create DataFrame with RGBA values
            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                    "alpha": alpha,
                }
            )

            # Normalize alpha values if they're not in [0, 1] range
            if data["alpha"].max() > 1:
                data["alpha"] = data["alpha"] / data["alpha"].max()

        self.logger.info("RGBA dataset is processed!")
        return data

    def _to_rgb_dataframe(self, drop_nodata: bool = True) -> pd.DataFrame:
        """Convert RGB TIF to DataFrame with separate columns for R, G, B values."""
        if self.mode != "rgb":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing RGB dataset...")

        with self.open_dataset() as src:
            if self.count != 3:
                raise ValueError("RGB mode requires a 3-band TIF file")

            # Read all three bands
            red, green, blue = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    mask = ~(
                        (red == nodata_value)
                        | (green == nodata_value)
                        | (blue == nodata_value)
                    )
                    red = np.extract(mask, red)
                    green = np.extract(mask, green)
                    blue = np.extract(mask, blue)
                    lons = np.extract(mask, x_coords)
                    lats = np.extract(mask, y_coords)
                else:
                    lons = x_coords.flatten()
                    lats = y_coords.flatten()
                    red = red.flatten()
                    green = green.flatten()
                    blue = blue.flatten()
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()

            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                }
            )

        self.logger.info("RGB dataset is processed!")
        return data

    def _to_band_dataframe(
        self, band_number: int = 1, drop_nodata: bool = True, drop_values: list = []
    ) -> pd.DataFrame:
        """Process single-band TIF to DataFrame."""
        if self.mode != "single":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing single-band dataset...")

        if band_number <= 0 or band_number > self.count:
            self.logger.error(
                f"Error: Band number {band_number} is out of range. The file has {self.count} bands."
            )
            return None

        with self.open_dataset() as src:

            band = src.read(band_number)

            x_coords, y_coords = self._get_pixel_coordinates()

            values_to_mask = []
            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    values_to_mask.append(nodata_value)

            if drop_values:
                values_to_mask.extend(drop_values)

            if values_to_mask:
                data_mask = ~np.isin(band, values_to_mask)
                pixel_values = np.extract(data_mask, band)
                lons = np.extract(data_mask, x_coords)
                lats = np.extract(data_mask, y_coords)
            else:
                pixel_values = band.flatten()
                lons = x_coords.flatten()
                lats = y_coords.flatten()

            data = pd.DataFrame({"lon": lons, "lat": lats, "pixel_value": pixel_values})

        self.logger.info("Dataset is processed!")
        return data

    def _to_multi_band_dataframe(
        self,
        drop_nodata: bool = True,
        drop_values: list = [],
        band_names: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Process multi-band TIF to DataFrame with all bands included.

        Args:
            drop_nodata (bool): Whether to drop nodata values. Defaults to True.
            drop_values (list): Additional values to drop from the dataset. Defaults to empty list.
            band_names (Optional[List[str]]): Custom names for the bands. If None, bands will be named using
                                            the band descriptions from the GeoTIFF metadata if available,
                                            otherwise 'band_1', 'band_2', etc.

        Returns:
            pd.DataFrame: DataFrame containing coordinates and all band values
        """
        self.logger.info("Processing multi-band dataset...")

        with self.open_dataset() as src:
            # Read all bands
            stack = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            # Initialize dictionary with coordinates
            data_dict = {"lon": x_coords.flatten(), "lat": y_coords.flatten()}

            # Get band descriptions from metadata if available
            if band_names is None and hasattr(src, "descriptions") and src.descriptions:
                band_names = [
                    desc if desc else f"band_{i+1}"
                    for i, desc in enumerate(src.descriptions)
                ]

            # Process each band
            for band_idx in range(self.count):
                band_data = stack[band_idx]

                # Handle nodata and other values to drop
                if drop_nodata or drop_values:
                    values_to_mask = []
                    if drop_nodata and src.nodata is not None:
                        values_to_mask.append(src.nodata)
                    if drop_values:
                        values_to_mask.extend(drop_values)

                    if values_to_mask:
                        data_mask = ~np.isin(band_data, values_to_mask)
                        band_values = np.extract(data_mask, band_data)
                        if band_idx == 0:  # Only need to mask coordinates once
                            data_dict["lon"] = np.extract(data_mask, x_coords)
                            data_dict["lat"] = np.extract(data_mask, y_coords)
                    else:
                        band_values = band_data.flatten()
                else:
                    band_values = band_data.flatten()

                # Use custom band names if provided, otherwise use descriptions or default naming
                band_name = (
                    band_names[band_idx]
                    if band_names and len(band_names) > band_idx
                    else f"band_{band_idx + 1}"
                )
                data_dict[band_name] = band_values

        self.logger.info("Multi-band dataset is processed!")
        return pd.DataFrame(data_dict)

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]

    def __enter__(self):
        return self

    def __del__(self):
        """Clean up temporary files and directories."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    def cleanup(self):
        """Explicit cleanup method for better control."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir)
            self.logger.info("Cleaned up temporary files")

    def __exit__(self, exc_type, exc_value, traceback):
        """Proper context manager exit with cleanup."""
        self.cleanup()
        return False
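
The class is written to be used as a context manager so that the temporary files created for merging or reprojection are always removed. A minimal usage sketch follows, assuming the class is exposed as TifProcessor with dataset_path and mode as constructor fields; the file name is a placeholder:

from gigaspatial.processing.tif_processor import TifProcessor  # assumed import path

# "elevation.tif" is a placeholder; mode must match the raster's band count.
with TifProcessor(dataset_path="elevation.tif", mode="single") as processor:
    print(processor.get_raster_info())   # count, crs, bounds, nodata, mode, ...
    df = processor.to_dataframe()        # lon / lat / pixel_value columns
# __exit__ calls cleanup(), which removes any temporary merged or reprojected rasters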

bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

dtype property

Get the data types from the TIF file

is_merged: bool property

Check if this processor was created from multiple rasters.

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height or pixel size) from the TIF file

source_count: int property

Get the number of source rasters.

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__del__()

Clean up temporary files and directories.

Source code in gigaspatial/processing/tif_processor.py
def __del__(self):
    """Clean up temporary files and directories."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir, ignore_errors=True)

__exit__(exc_type, exc_value, traceback)

Proper context manager exit with cleanup.

Source code in gigaspatial/processing/tif_processor.py
def __exit__(self, exc_type, exc_value, traceback):
    """Proper context manager exit with cleanup."""
    self.cleanup()
    return False

__post_init__()

Validate inputs, merge rasters if needed, and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs, merge rasters if needed, and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}
    self._temp_dir = tempfile.mkdtemp()
    self._merged_file_path = None
    self._reprojected_file_path = None

    # Handle multiple dataset paths
    if isinstance(self.dataset_path, list):
        if len(self.dataset_path) > 1:
            self.dataset_paths = [Path(p) for p in self.dataset_path]
            self._validate_multiple_datasets()
            self._merge_rasters()
            self.dataset_path = self._merged_file_path
    else:
        self.dataset_paths = [Path(self.dataset_path)]
        if not self.data_store.file_exists(str(self.dataset_path)):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        # Reproject single raster during initialization if target_crs is set
        if self.target_crs:
            self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        self._reprojected_file_path = self._reproject_to_temp_file(
                            src, self.target_crs
                        )
            self.dataset_path = self._reprojected_file_path

    self._load_metadata()

    # Validate mode and band count
    if self.mode == "rgba" and self.count != 4:
        raise ValueError("RGBA mode requires a 4-band TIF file")
    if self.mode == "rgb" and self.count != 3:
        raise ValueError("RGB mode requires a 3-band TIF file")
    if self.mode == "single" and self.count != 1:
        raise ValueError("Single mode requires a 1-band TIF file")
    if self.mode == "multi" and self.count < 2:
        raise ValueError("Multi mode requires a TIF file with 2 or more bands")

cleanup()

Explicit cleanup method for better control.

Source code in gigaspatial/processing/tif_processor.py
def cleanup(self):
    """Explicit cleanup method for better control."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir)
        self.logger.info("Cleaned up temporary files")

get_raster_info()

Get comprehensive raster information.

Source code in gigaspatial/processing/tif_processor.py
def get_raster_info(self) -> Dict[str, Any]:
    """Get comprehensive raster information."""
    return {
        "count": self.count,
        "width": self.width,
        "height": self.height,
        "crs": self.crs,
        "bounds": self.bounds,
        "transform": self.transform,
        "dtypes": self.dtype,
        "nodata": self.nodata,
        "mode": self.mode,
        "is_merged": self.is_merged,
        "source_count": self.source_count,
    }

open_dataset()

Context manager for accessing the dataset, handling temporary reprojected files.

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset, handling temporary reprojected files."""
    if self._merged_file_path:
        with rasterio.open(self._merged_file_path) as src:
            yield src
    elif self._reprojected_file_path:
        with rasterio.open(self._reprojected_file_path) as src:
            yield src
    else:
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src

reproject_to(target_crs, output_path=None, resampling_method=None, resolution=None)

Reprojects the current raster to a new CRS and optionally saves it.

Parameters:

Name Type Description Default
target_crs str

The CRS to reproject to (e.g., "EPSG:4326").

required
output_path Optional[Union[str, Path]]

The path to save the reprojected raster. If None, it is saved to a temporary file.

None
resampling_method Optional[Resampling]

The resampling method to use.

None
resolution Optional[Tuple[float, float]]

The target resolution (pixel size) in the new CRS.

None
Source code in gigaspatial/processing/tif_processor.py
def reproject_to(
    self,
    target_crs: str,
    output_path: Optional[Union[str, Path]] = None,
    resampling_method: Optional[Resampling] = None,
    resolution: Optional[Tuple[float, float]] = None,
):
    """
    Reprojects the current raster to a new CRS and optionally saves it.

    Args:
        target_crs: The CRS to reproject to (e.g., "EPSG:4326").
        output_path: The path to save the reprojected raster. If None,
                     it is saved to a temporary file.
        resampling_method: The resampling method to use.
        resolution: The target resolution (pixel size) in the new CRS.
    """
    self.logger.info(f"Reprojecting raster to {target_crs}...")

    # Use provided or default values
    resampling_method = resampling_method or self.resampling_method
    resolution = resolution or self.reprojection_resolution

    with self.open_dataset() as src:
        if src.crs.to_string() == target_crs:
            self.logger.info(
                "Raster is already in the target CRS. No reprojection needed."
            )
            # If output_path is specified, copy the file
            if output_path:
                self.data_store.copy_file(str(self.dataset_path), output_path)
            return self.dataset_path

        dst_path = output_path or os.path.join(
            self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
        )

        with rasterio.open(
            dst_path,
            "w",
            **self._get_reprojection_profile(src, target_crs, resolution),
        ) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=resampling_method,
                    num_threads=multiprocessing.cpu_count(),
                )

        self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
        return Path(dst_path)
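
For illustration, a hedged sketch of a reproject_to call, continuing the hypothetical processor from the earlier usage sketch; the target CRS, output path and resolution are placeholders:

from rasterio.enums import Resampling

reprojected_path = processor.reproject_to(
    target_crs="EPSG:3857",            # placeholder target CRS
    output_path="elevation_3857.tif",  # omit to write to a temporary file instead
    resampling_method=Resampling.bilinear,
    resolution=(10.0, 10.0),           # target pixel size in the new CRS
)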

sample_by_polygons(polygon_list, stat='mean')

Sample raster values by polygons and compute statistic(s) for each polygon.

Parameters:

Name Type Description Default
polygon_list

List of shapely Polygon or MultiPolygon objects.

required
stat Union[str, Callable, List[Union[str, Callable]]]

Statistic(s) to compute. Can be:
  - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
  - Single callable: custom function that takes array and returns scalar
  - List of strings/callables: multiple statistics to compute

'mean'

Returns:

Type Description

If single stat: np.ndarray of computed statistics for each polygon

If multiple stats: List of dictionaries with stat names as keys

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self,
    polygon_list,
    stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
):
    """
    Sample raster values by polygons and compute statistic(s) for each polygon.

    Args:
        polygon_list: List of shapely Polygon or MultiPolygon objects.
        stat: Statistic(s) to compute. Can be:
            - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
            - Single callable: custom function that takes array and returns scalar
            - List of strings/callables: multiple statistics to compute

    Returns:
        If single stat: np.ndarray of computed statistics for each polygon
        If multiple stats: List of dictionaries with stat names as keys
    """
    # Determine if single or multiple stats
    single_stat = not isinstance(stat, list)
    stats_list = [stat] if single_stat else stat

    # Prepare stat functions
    stat_funcs = []
    stat_names = []

    for s in stats_list:
        if callable(s):
            stat_funcs.append(s)
            stat_names.append(
                s.__name__
                if hasattr(s, "__name__")
                else f"custom_{len(stat_names)}"
            )
        else:
            # Handle string statistics
            if s == "count":
                stat_funcs.append(len)
            else:
                stat_funcs.append(getattr(np, s))
            stat_names.append(s)

    results = []

    with self.open_dataset() as src:
        for polygon in tqdm(polygon_list):
            try:
                out_image, _ = mask(src, [polygon], crop=True, filled=False)

                # Use masked arrays for more efficient nodata handling
                if hasattr(out_image, "mask"):
                    valid_data = out_image.compressed()
                else:
                    valid_data = (
                        out_image[out_image != self.nodata]
                        if self.nodata is not None
                        else out_image.flatten()
                    )

                if len(valid_data) == 0:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})
                else:
                    if single_stat:
                        results.append(stat_funcs[0](valid_data))
                    else:
                        # Compute all statistics for this polygon
                        polygon_stats = {}
                        for func, name in zip(stat_funcs, stat_names):
                            try:
                                polygon_stats[name] = func(valid_data)
                            except Exception:
                                polygon_stats[name] = np.nan
                        results.append(polygon_stats)

            except Exception:
                if single_stat:
                    results.append(np.nan)
                else:
                    results.append({name: np.nan for name in stat_names})

    return np.array(results) if single_stat else results
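
A short, hedged example of polygon sampling with the same hypothetical processor; the boxes below are arbitrary extents in the raster's CRS:

from shapely.geometry import box

polygons = [box(29.0, -2.0, 29.1, -1.9), box(29.1, -2.0, 29.2, -1.9)]  # placeholder polygons
means = processor.sample_by_polygons(polygons, stat="mean")            # ndarray, one value per polygon
summary = processor.sample_by_polygons(polygons, stat=["mean", "max", "count"])  # list of dicts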

sample_by_polygons_batched(polygon_list, stat='mean', batch_size=100, n_workers=4, **kwargs)

Sample raster values by polygons in parallel using batching.

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons_batched(
    self,
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: Union[str, Callable] = "mean",
    batch_size: int = 100,
    n_workers: int = 4,
    **kwargs,
) -> np.ndarray:
    """
    Sample raster values by polygons in parallel using batching.
    """

    def _chunk_list(data_list, chunk_size):
        """Yield successive chunks from data_list."""
        for i in range(0, len(data_list), chunk_size):
            yield data_list[i : i + chunk_size]

    if len(polygon_list) == 0:
        return np.array([])

    stat_func = stat if callable(stat) else getattr(np, stat)

    polygon_chunks = list(_chunk_list(polygon_list, batch_size))

    with multiprocessing.Pool(
        initializer=self._initializer_worker, processes=n_workers
    ) as pool:
        process_func = partial(self._process_polygon_batch, stat_func=stat_func)
        batched_results = list(
            tqdm(
                pool.imap(process_func, polygon_chunks),
                total=len(polygon_chunks),
                desc=f"Sampling polygons",
            )
        )

        results = [item for sublist in batched_results for item in sublist]

    return np.array(results)
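
The batched variant distributes chunks of polygons across worker processes; a hedged call sketch (batch_size and n_workers are illustrative, not tuned recommendations):

medians = processor.sample_by_polygons_batched(
    polygons, stat="median", batch_size=250, n_workers=8
)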

to_geodataframe(**kwargs)

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Source code in gigaspatial/processing/tif_processor.py
def to_geodataframe(self, **kwargs) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.
    """
    df = self.to_dataframe(**kwargs)

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    return gdf
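
Continuing the same hypothetical processor, a one-line sketch of the pixel-to-polygon conversion:

pixel_gdf = processor.to_geodataframe()  # one row per pixel, box geometry in the raster's CRS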

to_graph(connectivity=4, band=None, include_coordinates=False, graph_type='networkx', chunk_size=None)

Convert raster to graph based on pixel adjacency.

Source code in gigaspatial/processing/tif_processor.py
def to_graph(
    self,
    connectivity: Literal[4, 8] = 4,
    band: Optional[int] = None,
    include_coordinates: bool = False,
    graph_type: Literal["networkx", "sparse"] = "networkx",
    chunk_size: Optional[int] = None,
) -> Union[nx.Graph, sp.csr_matrix]:
    """
    Convert raster to graph based on pixel adjacency.
    """
    if chunk_size is not None:
        raise NotImplementedError("Chunked processing is not yet implemented.")

    with self.open_dataset() as src:
        band_idx = band - 1 if band is not None else 0
        if band_idx < 0 or band_idx >= src.count:
            raise ValueError(
                f"Band {band} not available. Raster has {src.count} bands"
            )

        data = src.read(band_idx + 1)
        nodata = src.nodata if src.nodata is not None else self.nodata
        valid_mask = (
            data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
        )

        height, width = data.shape

        # Find all valid pixels
        valid_rows, valid_cols = np.where(valid_mask)
        num_valid_pixels = len(valid_rows)

        # Create a sequential mapping from (row, col) to a node ID
        node_map = np.full(data.shape, -1, dtype=int)
        node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

        # Define neighborhood offsets
        if connectivity == 4:
            # von Neumann neighborhood (4-connectivity)
            offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        else:  # connectivity == 8
            # Moore neighborhood (8-connectivity)
            offsets = [
                (-1, -1),
                (-1, 0),
                (-1, 1),
                (0, -1),
                (0, 1),
                (1, -1),
                (1, 0),
                (1, 1),
            ]

        # Collect nodes and edges
        nodes_to_add = []
        edges_to_add = []

        for i in range(num_valid_pixels):
            row, col = valid_rows[i], valid_cols[i]
            current_node_id = node_map[row, col]

            # Prepare node attributes
            node_attrs = {"value": float(data[row, col])}
            if include_coordinates:
                x, y = src.xy(row, col)
                node_attrs["x"] = x
                node_attrs["y"] = y
            nodes_to_add.append((current_node_id, node_attrs))

            # Find neighbors and collect edges
            for dy, dx in offsets:
                neighbor_row, neighbor_col = row + dy, col + dx

                # Check if neighbor is within bounds and is a valid pixel
                if (
                    0 <= neighbor_row < height
                    and 0 <= neighbor_col < width
                    and valid_mask[neighbor_row, neighbor_col]
                ):
                    neighbor_node_id = node_map[neighbor_row, neighbor_col]

                    # Ensure each edge is added only once
                    if current_node_id < neighbor_node_id:
                        neighbor_value = float(data[neighbor_row, neighbor_col])
                        edges_to_add.append(
                            (current_node_id, neighbor_node_id, neighbor_value)
                        )

        if graph_type == "networkx":
            G = nx.Graph()
            G.add_nodes_from(nodes_to_add)
            G.add_weighted_edges_from(edges_to_add)
            return G
        else:  # sparse matrix
            edges_array = np.array(edges_to_add)
            row_indices = edges_array[:, 0].astype(int)
            col_indices = edges_array[:, 1].astype(int)
            weights = edges_array[:, 2]

            # Add reverse edges so the adjacency matrix is symmetric
            sym_rows = np.concatenate([row_indices, col_indices])
            sym_cols = np.concatenate([col_indices, row_indices])
            sym_weights = np.concatenate([weights, weights])

            return sp.coo_matrix(
                (sym_weights, (sym_rows, sym_cols)),
                shape=(num_valid_pixels, num_valid_pixels),
            ).tocsr()
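
A hedged sketch of building a pixel-adjacency graph from the same hypothetical single-band processor:

G = processor.to_graph(connectivity=8, include_coordinates=True)  # networkx.Graph
print(G.number_of_nodes(), G.number_of_edges())

A = processor.to_graph(connectivity=4, graph_type="sparse")       # symmetric scipy CSR adjacency matrix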

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame containing (Multi)Polygon geometries. area_column_name : str, optional Name of the new column to store the area values. Default is "area_in_meters".

Returns:

geopandas.GeoDataFrame The input GeoDataFrame with an additional column for the area in square meters.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    try:
        utm_crs = gdf_with_area.estimate_utm_crs()
    except Exception as e:
        LOGGER.warning(
            f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
        )
        utm_crs = "EPSG:3857"  # Fallback to Web Mercator

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area
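
A small, hedged example; the import path gigaspatial.processing.geo is assumed from the source location above, and the polygon is arbitrary:

import geopandas as gpd
from shapely.geometry import box

from gigaspatial.processing.geo import add_area_in_meters  # assumed import path

polygons = gpd.GeoDataFrame(geometry=[box(30.00, -1.00, 30.01, -0.99)], crs="EPSG:4326")
polygons = add_area_in_meters(polygons)  # adds an "area_in_meters" column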

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

df : pandas.DataFrame DataFrame containing geographic coordinates. columns : list of str, optional Column names containing coordinates to jitter. Default is ['latitude', 'longitude']. amount : float or dict, optional Amount of jitter to add. If float, same amount used for all columns. If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}. Default is 0.0001 (approximately 11 meters at the equator). seed : int, optional Random seed for reproducibility. Default is None. copy : bool, optional Whether to create a copy of the input DataFrame. Default is True.

Returns:

pandas.DataFrame DataFrame with jittered coordinates for previously duplicated points.

Raises:

ValueError If columns don't exist or jitter amount is invalid. TypeError If input types are incorrect.

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

Point data to aggregate

required
zones GeoDataFrame

Zones to aggregate points to

required
value_columns Optional[Union[str, List[str]]]

Column(s) containing values to aggregate If None, only counts will be performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use:
  - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
  - Dict: Map column names to aggregation methods

'count'
point_zone_predicate str

Spatial predicate for point-to-zone relationship Options: "within", "intersects", "contains"

'within'
zone_id_column str

Column in zones containing zone identifiers

'zone_id'
output_suffix str

Suffix to add to output column names

''
drop_geometry bool

Whether to drop the geometry column from output

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Zones with aggregated point values

Example

poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
poi_value_mean = aggregate_points_to_zones(
    pois, zones, value_columns="score", aggregation="mean"
)
poi_multiple = aggregate_points_to_zones(
    pois, zones,
    value_columns=["score", "visits"],
    aggregation={"score": "mean", "visits": "sum"}
)

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects", "contains"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', predicate='intersects', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregates polygon data to zones based on a specified spatial relationship.

This function performs a spatial join between polygons and zones and then aggregates values from the polygons to their corresponding zones. The aggregation method depends on the predicate parameter, which determines the nature of the spatial relationship.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.

required
zones GeoDataFrame

The target zones to which the polygon data will be aggregated.

required
value_columns Union[str, List[str]]

The column(s) in polygons containing the numeric values to aggregate.

required
aggregation Union[str, Dict[str, str]]

The aggregation method(s) to use. Can be a single string (e.g., "sum", "mean", "max") to apply the same method to all columns, or a dictionary mapping column names to aggregation methods (e.g., {'population': 'sum'}). Defaults to "sum".

'sum'
predicate Literal['intersects', 'within', 'fractional']

The spatial relationship to use for aggregation:
  - "intersects": Aggregates values for any polygon that intersects a zone.
  - "within": Aggregates values for polygons entirely contained within a zone.
  - "fractional": Performs area-weighted aggregation. The value of a polygon is distributed proportionally to the area of its overlap with each zone. This requires calculating a UTM CRS for accurate area measurements.
Defaults to "intersects".

'intersects'
zone_id_column str

The name of the column in zones that contains the unique zone identifiers. Defaults to "zone_id".

'zone_id'
output_suffix str

A suffix to add to the names of the new aggregated columns in the output GeoDataFrame. Defaults to "".

''
drop_geometry bool

If True, the geometry column will be dropped from the output GeoDataFrame. Defaults to False.

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The zones GeoDataFrame with new columns containing the aggregated values. Zones with no intersecting or contained polygons will have 0 values.

Raises:

Type Description
TypeError

If zones is not a GeoDataFrame or polygons cannot be converted.

ValueError

If zone_id_column or any value_columns are not found, or if the geometry types in polygons are not polygons.

RuntimeError

If an error occurs during the area-weighted aggregation process.

Example

import geopandas as gpd

Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames

Aggregate total population within each grid zone using area-weighting

pop_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="population",
    predicate="fractional",
    aggregation="sum",
    output_suffix="_pop"
)

Aggregate the count of landuse parcels intersecting each zone

count_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="parcel_id",
    predicate="intersects",
    aggregation="count"
)

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    predicate: Literal["intersects", "within", "fractional"] = "intersects",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregates polygon data to zones based on a specified spatial relationship.

    This function performs a spatial join between polygons and zones and then
    aggregates values from the polygons to their corresponding zones. The aggregation
    method depends on the `predicate` parameter, which determines the nature of the
    spatial relationship.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.
        zones (gpd.GeoDataFrame):
            The target zones to which the polygon data will be aggregated.
        value_columns (Union[str, List[str]]):
            The column(s) in `polygons` containing the numeric values to aggregate.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") to apply the same method to all columns, or a dictionary
            mapping column names to aggregation methods (e.g., `{'population': 'sum'}`).
            Defaults to "sum".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Aggregates values for any polygon that intersects a zone.
            - "within": Aggregates values for polygons entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a polygon
              is distributed proportionally to the area of its overlap with each zone.
              This requires calculating a UTM CRS for accurate area measurements.
            Defaults to "intersects".
        zone_id_column (str, optional):
            The name of the column in `zones` that contains the unique zone identifiers.
            Defaults to "zone_id".
        output_suffix (str, optional):
            A suffix to add to the names of the new aggregated columns in the output
            GeoDataFrame. Defaults to "".
        drop_geometry (bool, optional):
            If True, the geometry column will be dropped from the output GeoDataFrame.
            Defaults to False.

    Returns:
        gpd.GeoDataFrame:
            The `zones` GeoDataFrame with new columns containing the aggregated values.
            Zones with no intersecting or contained polygons will have `0` values.

    Raises:
        TypeError: If `zones` is not a GeoDataFrame or `polygons` cannot be converted.
        ValueError: If `zone_id_column` or any `value_columns` are not found, or
                    if the geometry types in `polygons` are not polygons.
        RuntimeError: If an error occurs during the area-weighted aggregation process.

    Example:
        >>> import geopandas as gpd
        >>> # Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
        >>> # Aggregate total population within each grid zone using area-weighting
        >>> pop_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum",
        ...     output_suffix="_pop"
        ... )
        >>> # Aggregate the count of landuse parcels intersecting each zone
        >>> count_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="parcel_id",
        ...     predicate="intersects",
        ...     aggregation="count"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zones.empty:
        raise ValueError("zones GeoDataFrame is empty")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    if predicate not in ["intersects", "within", "fractional"]:
        raise ValueError(
            f"Unsupported predicate: {predicate}. Predicate can be one of `intersects`, `within`, `fractional`"
        )

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except Exception as e:
            raise TypeError(
                f"polygons must be a GeoDataFrame or convertible to one: {e}"
            )
    else:
        polygons_gdf = polygons.copy()

    if polygons_gdf.empty:
        LOGGER.warning("Empty polygons GeoDataFrame provided")
        return zones

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Check for column name conflicts with zone_id_column
    if zone_id_column in polygons_gdf.columns:
        raise ValueError(
            f"Column name conflict: polygons DataFrame contains column '{zone_id_column}' "
            f"which conflicts with the zone identifier column. Please rename this column "
            f"in the polygons data to avoid confusion."
        )

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    agg_funcs = _process_aggregation_methods(aggregation, value_columns)

    # Prepare minimal zones for spatial operations (only zone_id_column and geometry)
    minimal_zones = zones[[zone_id_column, "geometry"]].copy()

    if predicate == "fractional":
        aggregated_data = _fractional_aggregation(
            polygons_gdf, minimal_zones, value_columns, agg_funcs, zone_id_column
        )
    else:
        aggregated_data = _simple_aggregation(
            polygons_gdf,
            minimal_zones,
            value_columns,
            agg_funcs,
            zone_id_column,
            predicate,
        )

    # Merge aggregated results back to complete zones data
    result = zones.merge(
        aggregated_data[[col for col in aggregated_data.columns if col != "geometry"]],
        on=zone_id_column,
        how="left",
    )

    # Fill NaN values with zeros for the newly aggregated columns only
    aggregated_cols = [col for col in result.columns if col not in zones.columns]
    for col in aggregated_cols:
        if pd.api.types.is_numeric_dtype(result[col]):
            result[col] = result[col].fillna(0)

    # Apply output suffix consistently to result columns only
    if output_suffix:
        rename_dict = {col: f"{col}{output_suffix}" for col in aggregated_cols}
        result = result.rename(columns=rename_dict)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

Name Type Description Default
gdf GeoDataFrame

GeoDataFrame containing points to annotate

required
country_code str

Country code for administrative boundaries

required
data_store Optional[DataStore]

Optional DataStore for loading admin boundary data

None

Returns:

Type Description
GeoDataFrame

GeoDataFrame with added administrative region columns

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins
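
A hedged usage sketch; the points and the country code are placeholders, and the call assumes the AdminBoundaries handler can resolve boundaries for that code:

import geopandas as gpd

from gigaspatial.processing.geo import annotate_with_admin_regions  # assumed import path

schools = gpd.GeoDataFrame(
    {"school_id": [1, 2]},
    geometry=gpd.points_from_xy([29.9, 30.2], [-1.9, -2.1]),
    crs="EPSG:4326",
)
annotated = annotate_with_admin_regions(schools, country_code="RWA")  # placeholder country code
# Adds admin1, admin2, admin1_id_giga and admin2_id_giga columns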

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing entity data

required
required_columns List[str]

List of column names required for ID generation

required
id_column str

Name for the id column that will be generated

'id'

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with generated id column

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
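
A hedged example of deterministic ID generation; uuid3 hashes the concatenated required columns, so re-running the call yields the same IDs (import path assumed):

import pandas as pd

from gigaspatial.processing.utils import assign_id  # assumed import path

df = pd.DataFrame({"name": ["Site A", "Site B", None], "country": ["KEN", "KEN", "KEN"]})
df = assign_id(df, required_columns=["name", "country"], id_column="id")
# Rows with all required fields present get a stable UUID; the row with a missing name keeps id=None.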

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

  • gdf : geopandas.GeoDataFrame The GeoDataFrame to be buffered.
  • buffer_distance_meters : float The buffer distance in meters.
  • cap_style : str, optional The style of caps. round, flat, square. Default is round.

Returns:

  • geopandas.GeoDataFrame The buffered GeoDataFrame.
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: Union[float, np.array, pd.Series],
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        try:
            utm_crs = gdf_work.estimate_utm_crs()
        except Exception as e:
            LOGGER.warning(
                f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
            )
            utm_crs = "EPSG:3857"  # Fallback to Web Mercator

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            distance=buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")
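
A minimal sketch, assuming buffer_geodataframe is importable from gigaspatial.processing.geo: two points are buffered by 500 m; the buffering happens in an estimated UTM CRS and the result is returned in the input CRS.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

points = gpd.GeoDataFrame(
    {"name": ["a", "b"]},
    geometry=[Point(29.36, -3.38), Point(29.92, -3.43)],
    crs="EPSG:4326",
)

# 500 m round buffers; the output geometries are polygons in EPSG:4326
buffered = buffer_geodataframe(points, buffer_distance_meters=500, cap_style="round")
print(buffered.geometry.geom_type.unique())  # ['Polygon']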

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

Name Type Description Default
gdf

a geodataframe with Point geometries that are geographic coordinates

required
resolution float

Desired resolution (meters per pixel).

required
bbox_size float

Bounding box size in meters (default 300m x 300m).

300
crs str

Target projection (default is EPSG:3857).

'EPSG:3857'

Returns:

Name Type Description
int

Number of pixels per side (width and height).

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
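
A quick sketch, assuming the import path gigaspatial.processing.sat_images matches the source location above. Per the formula in the source, the pixel count is bbox_size / (resolution * cos(latitude)), so a 300 m box at 0.6 m/px near the equator needs roughly 500 pixels per side.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

gdf = gpd.GeoDataFrame(geometry=[Point(36.82, -1.29)], crs="EPSG:4326")

# ~500 pixels per side for a 300 m box at 0.6 m/px at this latitude
n_pixels = calculate_pixels_at_location(gdf, resolution=0.6, bbox_size=300)
print(n_pixels)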

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

data : pandas.DataFrame
    Input DataFrame containing either lat/lon columns or a geometry column.
lat_col : str, optional
    Name of the latitude column. If None (default), the column is auto-detected.
lon_col : str, optional
    Name of the longitude column. If None (default), the column is auto-detected.
crs : str or pyproj.CRS, optional
    Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

geopandas.GeoDataFrame A GeoDataFrame containing the input data with a geometry column.

Raises:

TypeError If input is not a pandas DataFrame. ValueError If required columns are missing or contain invalid data.

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. If None (default), the column is auto-detected.
    lon_col : str, optional
        Name of the longitude column. If None (default), the column is auto-detected.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")
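
Two illustrative conversions, assuming convert_to_geodataframe is importable from gigaspatial.processing.geo: one from lat/lon columns (auto-detected by name) and one from WKT strings in a geometry column.

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

# From latitude/longitude columns, detected automatically by name
df_points = pd.DataFrame({"latitude": [-1.29, -3.38], "longitude": [36.82, 29.36]})
gdf_points = convert_to_geodataframe(df_points)

# From WKT strings stored in a 'geometry' column
df_wkt = pd.DataFrame({"geometry": ["POINT (36.82 -1.29)", "POINT (29.36 -3.38)"]})
gdf_wkt = convert_to_geodataframe(df_wkt)

print(gdf_points.crs, list(gdf_wkt.geom_type.unique()))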

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

data : pandas.DataFrame
    DataFrame to search for coordinate columns.
lat_keywords : list of str, optional
    Keywords for identifying latitude columns. If None, uses default keywords.
lon_keywords : list of str, optional
    Keywords for identifying longitude columns. If None, uses default keywords.
case_sensitive : bool, optional
    Whether to perform case-sensitive matching. Default is False.

Returns:

tuple[str, str] Names of detected (latitude, longitude) columns.

Raises:

ValueError If no unique pair of latitude/longitude columns can be found. TypeError If input data is not a pandas DataFrame.

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")
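
A small sketch, assuming the gigaspatial.processing.geo import path: matching is keyword-based and, by default, case-insensitive.

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

df = pd.DataFrame({"school": ["A", "B"], "Lat": [-1.29, -3.38], "Lon": [36.82, 29.36]})

lat_col, lon_col = detect_coordinate_columns(df)
print(lat_col, lon_col)  # Lat Lon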

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame containing (Multi)Polygon geometries.

Returns:

geopandas.GeoDataFrame A new GeoDataFrame with Point geometries representing the centroids.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids
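
A minimal sketch, assuming get_centroids is importable from gigaspatial.processing.geo (for a geographic CRS, geopandas may warn that the centroids are approximate):

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import get_centroids

zones = gpd.GeoDataFrame(
    {"zone_id": [1, 2]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)

# Returns a copy whose geometry column holds the polygon centroids
centroids = get_centroids(zones)
print(centroids.geometry.iloc[0])  # POINT (0.5 0.5)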

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

base_points_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing point geometries to check.
polygon_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing polygon geometries.

Returns:

geopandas.GeoDataFrame The base_points_gdf with an additional column is_within (True/False).

Raises:

ValueError If the geometries in either GeoDataFrame are invalid or not of the expected type.

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf
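
A small sketch, assuming the gigaspatial.processing.geo import path; both frames must share a CRS.

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(
    {"id": [1, 2]},
    geometry=[Point(0.5, 0.5), Point(5.0, 5.0)],
    crs="EPSG:4326",
)
polygons = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")

# Adds a boolean `is_within` column to the points
result = map_points_within_polygons(points, polygons)
print(result["is_within"].tolist())  # [True, False]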

sample_multiple_tifs_by_coordinates(tif_processors, coordinate_list)

Sample raster values from multiple TIFF files for given coordinates.

Parameters:

- tif_processors: List of TifProcessor instances.
- coordinate_list: List of (x, y) coordinates.

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_coordinates(
    tif_processors: List[TifProcessor], coordinate_list: List[Tuple[float, float]]
):
    """
    Sample raster values from multiple TIFF files for given coordinates.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - coordinate_list: List of (x, y) coordinates.

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(coordinate_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_coordinates(coordinate_list=coordinate_list)

        if tp.nodata is not None:
            mask = (np.isnan(sampled_values)) & (
                values != tp.nodata
            )  # Replace only NaNs
        else:
            mask = np.isnan(sampled_values)  # No explicit nodata, replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values

    return sampled_values
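
A hedged sketch: sample_multiple_tifs_by_coordinates is assumed to be importable from gigaspatial.processing.tif_processor, and the TifProcessor construction below (one instance per GeoTIFF path) is an illustrative assumption, not an API documented on this page.

from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_coordinates,
)

# Assumed construction: adapt to however TifProcessor is actually created
tif_processors = [TifProcessor("tile_north.tif"), TifProcessor("tile_south.tif")]

coords = [(36.82, -1.29), (29.36, -3.38)]  # (x, y) pairs in the rasters' CRS

# For each coordinate, the first non-nodata value across the rasters is kept;
# coordinates covered by none of the rasters stay NaN
values = sample_multiple_tifs_by_coordinates(tif_processors, coords)
print(values)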

sample_multiple_tifs_by_polygons(tif_processors, polygon_list, stat='mean')

Sample raster values from multiple TIFF files for polygons in a list and join the results.

Parameters:

- tif_processors: List of TifProcessor instances.
- polygon_list: List of polygon geometries (can include MultiPolygons).
- stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_polygons(
    tif_processors: List[TifProcessor],
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: str = "mean",
) -> np.ndarray:
    """
    Sample raster values from multiple TIFF files for polygons in a list and join the results.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - polygon_list: List of polygon geometries (can include MultiPolygons).
    - stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(polygon_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_polygons(polygon_list=polygon_list, stat=stat)

        mask = np.isnan(sampled_values)  # replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values with sampled values

    return sampled_values
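
The polygon variant follows the same pattern; again the TifProcessor construction is an assumption and the file names are placeholders.

from shapely.geometry import box
from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_polygons,
)

# Assumed construction; replace with real raster paths
tif_processors = [TifProcessor("pop_tile_1.tif"), TifProcessor("pop_tile_2.tif")]

polygons = [box(36.8, -1.3, 36.9, -1.2), box(29.3, -3.4, 29.4, -3.3)]

# One aggregated value per polygon, taken from the first raster that yields one
means = sample_multiple_tifs_by_polygons(tif_processors, polygons, stat="mean")
print(means)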

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters

gdf : geopandas.GeoDataFrame
    GeoDataFrame containing geometries to simplify.
tolerance : float, optional
    Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
preserve_topology : bool, optional
    Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
geometry_column : str, optional
    Name of the column containing geometries (default is "geometry").

Returns

geopandas.GeoDataFrame A new GeoDataFrame with simplified geometries.

Raises

ValueError If the specified geometry column does not exist or contains invalid geometries. TypeError If the geometry column does not contain valid geometries.

Examples

Simplify geometries in a GeoDataFrame:

simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified

algorithms

build_distance_graph(left_df, right_df, distance_threshold, max_k=100, return_dataframe=False, verbose=True, exclude_same_index=None)

Build a graph of spatial matches between two dataframes using KD-tree.

Parameters:

Name Type Description Default
left_df Union[DataFrame, GeoDataFrame]

Left dataframe to match from

required
right_df Union[DataFrame, GeoDataFrame]

Right dataframe to match to

required
distance_threshold float

Maximum distance for matching (in meters)

required
max_k int

Maximum number of neighbors to consider per point (default: 100)

100
return_dataframe bool

If True, also return the matches DataFrame

False
verbose bool

If True, print statistics about the graph

True
exclude_same_index Optional[bool]

If True, exclude self-matches. If None, auto-detect based on df equality

None

Returns:

Type Description
Union[Graph, Tuple[Graph, DataFrame]]

NetworkX Graph, or tuple of (Graph, DataFrame) if return_dataframe=True

Raises:

Type Description
ValueError

If distance_threshold is negative or max_k is not positive

Source code in gigaspatial/processing/algorithms.py
def build_distance_graph(
    left_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    right_df: Union[pd.DataFrame, gpd.GeoDataFrame],
    distance_threshold: float,
    max_k: int = 100,
    return_dataframe: bool = False,
    verbose: bool = True,
    exclude_same_index: Optional[bool] = None,
) -> Union[nx.Graph, Tuple[nx.Graph, pd.DataFrame]]:
    """
    Build a graph of spatial matches between two dataframes using KD-tree.

    Args:
        left_df: Left dataframe to match from
        right_df: Right dataframe to match to
        distance_threshold: Maximum distance for matching (in meters)
        max_k: Maximum number of neighbors to consider per point (default: 100)
        return_dataframe: If True, also return the matches DataFrame
        verbose: If True, print statistics about the graph
        exclude_same_index: If True, exclude self-matches. If None, auto-detect based on df equality

    Returns:
        NetworkX Graph, or tuple of (Graph, DataFrame) if return_dataframe=True

    Raises:
        ValueError: If distance_threshold is negative or max_k is not positive
    """

    # Input validation
    if distance_threshold < 0:
        raise ValueError("distance_threshold must be non-negative")

    if max_k <= 0:
        raise ValueError("max_k must be positive")

    if left_df.empty or right_df.empty:
        if verbose:
            LOGGER.warning("Warning: One or both dataframes are empty")
        G = nx.Graph()
        return (G, pd.DataFrame()) if return_dataframe else G

    def get_utm_coordinates(df: Union[pd.DataFrame, gpd.GeoDataFrame]) -> np.ndarray:
        """Extract coordinates as numpy array in UTM projection."""
        if isinstance(df, pd.DataFrame):
            gdf = convert_to_geodataframe(df)
        else:
            gdf = df.copy()

        # More robust UTM CRS estimation
        try:
            gdf_utm = gdf.to_crs(gdf.estimate_utm_crs())
        except Exception as e:
            if verbose:
                LOGGER.warning(
                    f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
                )
            gdf_utm = gdf.to_crs("EPSG:3857")  # Fallback to Web Mercator

        return gdf_utm.get_coordinates().to_numpy()

    # Auto-detect same dataframe case
    if exclude_same_index is None:
        exclude_same_index = left_df.equals(right_df)
        if verbose and exclude_same_index:
            LOGGER.info("Auto-detected same dataframe - excluding self-matches")

    # Get coordinates
    left_coords = get_utm_coordinates(left_df)
    right_coords = (
        get_utm_coordinates(right_df) if not exclude_same_index else left_coords
    )

    # Build KD-tree and query
    kdtree = cKDTree(right_coords)

    # Use the provided max_k parameter, but don't exceed available points
    k_to_use = min(max_k, len(right_coords))

    if verbose and k_to_use < max_k:
        LOGGER.info(
            f"Note: max_k ({max_k}) reduced to {k_to_use} (number of available points)"
        )

    # Note: Distance calculations here are based on Euclidean distance in UTM projection.
    # This can introduce errors up to ~50 cm for a 50 meter threshold, especially near the poles where distortion increases.
    distances, indices = kdtree.query(
        left_coords, k=k_to_use, distance_upper_bound=distance_threshold
    )

    # Handle single k case (when k_to_use = 1, results are 1D)
    if distances.ndim == 1:
        distances = distances.reshape(-1, 1)
        indices = indices.reshape(-1, 1)

    # Extract valid pairs using vectorized operations
    left_indices = np.arange(len(distances))[:, np.newaxis]
    left_indices = np.broadcast_to(left_indices, distances.shape)
    valid_mask = np.isfinite(distances)

    if exclude_same_index:
        same_index_mask = left_indices == indices
        valid_mask = valid_mask & ~same_index_mask

    valid_left = left_indices[valid_mask]
    valid_right = indices[valid_mask]
    valid_distances = distances[valid_mask]

    # Map back to original indices
    valid_left_indices = left_df.index.values[valid_left]
    valid_right_indices = right_df.index.values[valid_right]

    # Create matches DataFrame
    matches_df = pd.DataFrame(
        {
            "left_idx": valid_left_indices,
            "right_idx": valid_right_indices,
            "distance": valid_distances,
        }
    )

    # Build graph more efficiently
    G = nx.from_pandas_edgelist(
        matches_df,
        source="left_idx",
        target="right_idx",
        edge_attr="distance",
        create_using=nx.Graph(),
    )

    # Add isolated nodes (nodes without any matches within threshold)
    # This ensures all original indices are represented in the graph
    all_left_nodes = set(left_df.index.values)
    all_right_nodes = set(right_df.index.values)

    if not exclude_same_index:
        all_nodes = all_left_nodes | all_right_nodes
    else:
        all_nodes = all_left_nodes  # Same dataframe, so same node set

    # Add nodes that don't have edges
    existing_nodes = set(G.nodes())
    isolated_nodes = all_nodes - existing_nodes
    G.add_nodes_from(isolated_nodes)

    # Print statistics
    if verbose:
        print(
            f"Total potential matches: {len(left_df)} × {len(right_df)} = {len(left_df) * len(right_df):,}"
        )
        print(f"Matches found within {distance_threshold}m: {len(matches_df):,}")
        print(f"Graph nodes: {G.number_of_nodes():,}")
        print(f"Graph edges: {G.number_of_edges():,}")

        components = list(nx.connected_components(G))
        print(f"Connected components: {len(components):,}")

        if len(components) > 1:
            component_sizes = [len(c) for c in components]
            print(f"Largest component size: {max(component_sizes):,}")
            print(
                f"Isolated nodes: {sum(1 for size in component_sizes if size == 1):,}"
            )

        if len(matches_df) > 0:
            print(
                f"Distance stats - min: {matches_df['distance'].min():.1f}m, "
                f"max: {matches_df['distance'].max():.1f}m, "
                f"mean: {matches_df['distance'].mean():.1f}m"
            )

    return (G, matches_df) if return_dataframe else G
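
A compact sketch on synthetic coordinates, assuming the gigaspatial.processing.algorithms import path; plain DataFrames are accepted because they are converted to GeoDataFrames internally.

import pandas as pd
from gigaspatial.processing.algorithms import build_distance_graph

left = pd.DataFrame({"latitude": [-1.2900, -1.3000], "longitude": [36.8200, 36.9000]})
right = pd.DataFrame({"latitude": [-1.2901, -1.4000], "longitude": [36.8201, 36.9500]})

# Edges connect rows of `left` and `right` that lie within 500 m of each other
G, matches = build_distance_graph(
    left, right, distance_threshold=500, return_dataframe=True, verbose=False
)
print(matches)  # one match: left row 0 and right row 0, roughly 16 m apart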

geo

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

gdf : geopandas.GeoDataFrame
    GeoDataFrame containing (Multi)Polygon geometries.
area_column_name : str, optional
    Name of the new column to store the area values. Default is "area_in_meters".

Returns:

geopandas.GeoDataFrame The input GeoDataFrame with an additional column for the area in square meters.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    try:
        utm_crs = gdf_with_area.estimate_utm_crs()
    except Exception as e:
        LOGGER.warning(
            f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
        )
        utm_crs = "EPSG:3857"  # Fallback to Web Mercator

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area
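
A minimal sketch, assuming the gigaspatial.processing.geo import path; the area is computed in an estimated UTM CRS and stored in square meters.

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import add_area_in_meters

parcels = gpd.GeoDataFrame(
    {"parcel_id": [1]},
    geometry=[box(36.820, -1.290, 36.821, -1.289)],  # ~111 m x ~111 m near the equator
    crs="EPSG:4326",
)

parcels = add_area_in_meters(parcels)
print(round(parcels["area_in_meters"].iloc[0]))  # roughly 12,300 square meters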

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

df : pandas.DataFrame
    DataFrame containing geographic coordinates.
columns : list of str, optional
    Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
amount : float or dict, optional
    Amount of jitter to add. If float, same amount used for all columns.
    If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
    Default is 0.0001 (approximately 11 meters at the equator).
seed : int, optional
    Random seed for reproducibility. Default is None.
copy : bool, optional
    Whether to create a copy of the input DataFrame. Default is True.

Returns:

pandas.DataFrame DataFrame with jittered coordinates for previously duplicated points.

Raises:

ValueError If columns don't exist or jitter amount is invalid. TypeError If input types are incorrect.

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")
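
A short sketch, assuming the gigaspatial.processing.geo import path: only rows whose coordinate pair is duplicated receive jitter.

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter

df = pd.DataFrame(
    {
        "latitude": [-1.29, -1.29, -3.38],
        "longitude": [36.82, 36.82, 29.36],  # the first two rows are exact duplicates
    }
)

# Duplicated pairs get +/- 0.0001 degrees of uniform noise; the unique row is untouched
jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # False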

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

Point data to aggregate

required
zones GeoDataFrame

Zones to aggregate points to

required
value_columns Optional[Union[str, List[str]]]

Column(s) containing values to aggregate If None, only counts will be performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use: - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max") - Dict: Map column names to aggregation methods

'count'
point_zone_predicate str

Spatial predicate for point-to-zone relationship Options: "within", "intersects", "contains"

'within'
zone_id_column str

Column in zones containing zone identifiers

'zone_id'
output_suffix str

Suffix to add to output column names

''
drop_geometry bool

Whether to drop the geometry column from output

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Zones with aggregated point values

Example

poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
poi_value_mean = aggregate_points_to_zones(
    pois, zones, value_columns="score", aggregation="mean"
)
poi_multiple = aggregate_points_to_zones(
    pois, zones,
    value_columns=["score", "visits"],
    aggregation={"score": "mean", "visits": "sum"}
)

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects", "contains"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', predicate='intersects', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregates polygon data to zones based on a specified spatial relationship.

This function performs a spatial join between polygons and zones and then aggregates values from the polygons to their corresponding zones. The aggregation method depends on the predicate parameter, which determines the nature of the spatial relationship.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.

required
zones GeoDataFrame

The target zones to which the polygon data will be aggregated.

required
value_columns Union[str, List[str]]

The column(s) in polygons containing the numeric values to aggregate.

required
aggregation Union[str, Dict[str, str]]

The aggregation method(s) to use. Can be a single string (e.g., "sum", "mean", "max") to apply the same method to all columns, or a dictionary mapping column names to aggregation methods (e.g., {'population': 'sum'}). Defaults to "sum".

'sum'
predicate Literal['intersects', 'within', 'fractional']

The spatial relationship to use for aggregation: - "intersects": Aggregates values for any polygon that intersects a zone. - "within": Aggregates values for polygons entirely contained within a zone. - "fractional": Performs area-weighted aggregation. The value of a polygon is distributed proportionally to the area of its overlap with each zone. This requires calculating a UTM CRS for accurate area measurements. Defaults to "intersects".

'intersects'
zone_id_column str

The name of the column in zones that contains the unique zone identifiers. Defaults to "zone_id".

'zone_id'
output_suffix str

A suffix to add to the names of the new aggregated columns in the output GeoDataFrame. Defaults to "".

''
drop_geometry bool

If True, the geometry column will be dropped from the output GeoDataFrame. Defaults to False.

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: The zones GeoDataFrame with new columns containing the aggregated values. Zones with no intersecting or contained polygons will have 0 values.

Raises:

Type Description
TypeError

If zones is not a GeoDataFrame or polygons cannot be converted.

ValueError

If zone_id_column or any value_columns are not found, or if the geometry types in polygons are not polygons.

RuntimeError

If an error occurs during the area-weighted aggregation process.

Example

import geopandas as gpd

# Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
# Aggregate total population within each grid zone using area-weighting
pop_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="population",
    predicate="fractional",
    aggregation="sum",
    output_suffix="_pop"
)

# Aggregate the count of landuse parcels intersecting each zone
count_by_zone = aggregate_polygons_to_zones(
    landuse_polygons,
    grid_zones,
    value_columns="parcel_id",
    predicate="intersects",
    aggregation="count"
)

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    predicate: Literal["intersects", "within", "fractional"] = "intersects",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregates polygon data to zones based on a specified spatial relationship.

    This function performs a spatial join between polygons and zones and then
    aggregates values from the polygons to their corresponding zones. The aggregation
    method depends on the `predicate` parameter, which determines the nature of the
    spatial relationship.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]):
            Polygon data to aggregate. Must be a GeoDataFrame or convertible to one.
        zones (gpd.GeoDataFrame):
            The target zones to which the polygon data will be aggregated.
        value_columns (Union[str, List[str]]):
            The column(s) in `polygons` containing the numeric values to aggregate.
        aggregation (Union[str, Dict[str, str]], optional):
            The aggregation method(s) to use. Can be a single string (e.g., "sum",
            "mean", "max") to apply the same method to all columns, or a dictionary
            mapping column names to aggregation methods (e.g., `{'population': 'sum'}`).
            Defaults to "sum".
        predicate (Literal["intersects", "within", "fractional"], optional):
            The spatial relationship to use for aggregation:
            - "intersects": Aggregates values for any polygon that intersects a zone.
            - "within": Aggregates values for polygons entirely contained within a zone.
            - "fractional": Performs area-weighted aggregation. The value of a polygon
              is distributed proportionally to the area of its overlap with each zone.
              This requires calculating a UTM CRS for accurate area measurements.
            Defaults to "intersects".
        zone_id_column (str, optional):
            The name of the column in `zones` that contains the unique zone identifiers.
            Defaults to "zone_id".
        output_suffix (str, optional):
            A suffix to add to the names of the new aggregated columns in the output
            GeoDataFrame. Defaults to "".
        drop_geometry (bool, optional):
            If True, the geometry column will be dropped from the output GeoDataFrame.
            Defaults to False.

    Returns:
        gpd.GeoDataFrame:
            The `zones` GeoDataFrame with new columns containing the aggregated values.
            Zones with no intersecting or contained polygons will have `0` values.

    Raises:
        TypeError: If `zones` is not a GeoDataFrame or `polygons` cannot be converted.
        ValueError: If `zone_id_column` or any `value_columns` are not found, or
                    if the geometry types in `polygons` are not polygons.
        RuntimeError: If an error occurs during the area-weighted aggregation process.

    Example:
        >>> import geopandas as gpd
        >>> # Assuming 'landuse_polygons' and 'grid_zones' are GeoDataFrames
        >>> # Aggregate total population within each grid zone using area-weighting
        >>> pop_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="population",
        ...     predicate="fractional",
        ...     aggregation="sum",
        ...     output_suffix="_pop"
        ... )
        >>> # Aggregate the count of landuse parcels intersecting each zone
        >>> count_by_zone = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns="parcel_id",
        ...     predicate="intersects",
        ...     aggregation="count"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zones.empty:
        raise ValueError("zones GeoDataFrame is empty")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    if predicate not in ["intersects", "within", "fractional"]:
        raise ValueError(
            f"Unsupported predicate: {predicate}. Predicate can be one of `intersects`, `within`, `fractional`"
        )

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except Exception as e:
            raise TypeError(
                f"polygons must be a GeoDataFrame or convertible to one: {e}"
            )
    else:
        polygons_gdf = polygons.copy()

    if polygons_gdf.empty:
        LOGGER.warning("Empty polygons GeoDataFrame provided")
        return zones

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Check for column name conflicts with zone_id_column
    if zone_id_column in polygons_gdf.columns:
        raise ValueError(
            f"Column name conflict: polygons DataFrame contains column '{zone_id_column}' "
            f"which conflicts with the zone identifier column. Please rename this column "
            f"in the polygons data to avoid confusion."
        )

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    agg_funcs = _process_aggregation_methods(aggregation, value_columns)

    # Prepare minimal zones for spatial operations (only zone_id_column and geometry)
    minimal_zones = zones[[zone_id_column, "geometry"]].copy()

    if predicate == "fractional":
        aggregated_data = _fractional_aggregation(
            polygons_gdf, minimal_zones, value_columns, agg_funcs, zone_id_column
        )
    else:
        aggregated_data = _simple_aggregation(
            polygons_gdf,
            minimal_zones,
            value_columns,
            agg_funcs,
            zone_id_column,
            predicate,
        )

    # Merge aggregated results back to complete zones data
    result = zones.merge(
        aggregated_data[[col for col in aggregated_data.columns if col != "geometry"]],
        on=zone_id_column,
        how="left",
    )

    # Fill NaN values with zeros for the newly aggregated columns only
    aggregated_cols = [col for col in result.columns if col not in zones.columns]
    for col in aggregated_cols:
        if pd.api.types.is_numeric_dtype(result[col]):
            result[col] = result[col].fillna(0)

    # Apply output suffix consistently to result columns only
    if output_suffix:
        rename_dict = {col: f"{col}{output_suffix}" for col in aggregated_cols}
        result = result.rename(columns=rename_dict)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

Name Type Description Default
gdf GeoDataFrame

GeoDataFrame containing points to annotate

required
country_code str

Country code for administrative boundaries

required
data_store Optional[DataStore]

Optional DataStore for loading admin boundary data

None

Returns:

Type Description
GeoDataFrame

GeoDataFrame with added administrative region columns

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins
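
A minimal usage sketch (the points and country code below are illustrative; admin boundary data must be available for that country via AdminBoundaries):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

points = gpd.GeoDataFrame(
    {"school_id": [1, 2]},
    geometry=[Point(29.36, -3.38), Point(29.92, -3.43)],
    crs="EPSG:4326",
)

# Adds admin1/admin2 names plus admin1_id_giga/admin2_id_giga columns
annotated = annotate_with_admin_regions(points, country_code="BDI")
print(annotated[["school_id", "admin1", "admin2"]])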

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

- gdf (geopandas.GeoDataFrame): The GeoDataFrame to be buffered.
- buffer_distance_meters (float): The buffer distance in meters.
- cap_style (str, optional): The style of caps: 'round', 'flat', or 'square'. Default is 'round'.
- copy (bool, optional): Whether to buffer a copy of the input rather than modifying it in place. Default is True.

Returns:

- geopandas.GeoDataFrame: The buffered GeoDataFrame.
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: Union[float, np.array, pd.Series],
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        try:
            utm_crs = gdf_work.estimate_utm_crs()
        except Exception as e:
            LOGGER.warning(
                f"Warning: UTM CRS estimation failed, using Web Mercator. Error: {e}"
            )
            utm_crs = "EPSG:3857"  # Fallback to Web Mercator

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            distance=buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

- data (pandas.DataFrame): Input DataFrame containing either lat/lon columns or a geometry column.
- lat_col (str, optional): Name of the latitude column. If None (default), the column is auto-detected.
- lon_col (str, optional): Name of the longitude column. If None (default), the column is auto-detected.
- crs (str or pyproj.CRS, optional): Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

- geopandas.GeoDataFrame: A GeoDataFrame containing the input data with a geometry column.

Raises:

- TypeError: If input is not a pandas DataFrame.
- ValueError: If required columns are missing or contain invalid data.

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. If None (default), the column is auto-detected.
    lon_col : str, optional
        Name of the longitude column. If None (default), the column is auto-detected.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

- data (pandas.DataFrame): DataFrame to search for coordinate columns.
- lat_keywords (list of str, optional): Keywords for identifying latitude columns. If None, uses default keywords.
- lon_keywords (list of str, optional): Keywords for identifying longitude columns. If None, uses default keywords.
- case_sensitive (bool, optional): Whether to perform case-sensitive matching. Default is False.

Returns:

- tuple[str, str]: Names of detected (latitude, longitude) columns.

Raises:

- ValueError: If no unique pair of latitude/longitude columns can be found.
- TypeError: If input data is not a pandas DataFrame.

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

- gdf (geopandas.GeoDataFrame): GeoDataFrame containing (Multi)Polygon geometries.

Returns:

- geopandas.GeoDataFrame: A new GeoDataFrame with Point geometries representing the centroids.

Raises:

- ValueError: If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids
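
For example, using a single unit square (note that for a geographic CRS geopandas warns that centroids are only approximate):

import geopandas as gpd
from shapely.geometry import Polygon
from gigaspatial.processing.geo import get_centroids

polys = gpd.GeoDataFrame(
    geometry=[Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])],
    crs="EPSG:4326",
)
centroids = get_centroids(polys)
print(centroids.geometry.iloc[0])  # POINT (0.5 0.5)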

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

- base_points_gdf (geopandas.GeoDataFrame): GeoDataFrame containing point geometries to check.
- polygon_gdf (geopandas.GeoDataFrame): GeoDataFrame containing polygon geometries.

Returns:

- geopandas.GeoDataFrame: The base_points_gdf with an additional column is_within (True/False).

Raises:

- ValueError: If the geometries in either GeoDataFrame are invalid or not of the expected type.

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf
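
For example:

import geopandas as gpd
from shapely.geometry import Point, Polygon
from gigaspatial.processing.geo import map_points_within_polygons

polygons = gpd.GeoDataFrame(
    geometry=[Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])], crs="EPSG:4326"
)
points = gpd.GeoDataFrame(geometry=[Point(1, 1), Point(3, 3)], crs="EPSG:4326")

result = map_points_within_polygons(points, polygons)
print(result["is_within"].tolist())  # [True, False]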

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters:

- gdf (geopandas.GeoDataFrame): GeoDataFrame containing geometries to simplify.
- tolerance (float, optional): Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
- preserve_topology (bool, optional): Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
- geometry_column (str, optional): Name of the column containing geometries (default is "geometry").

Returns:

- geopandas.GeoDataFrame: A new GeoDataFrame with simplified geometries.

Raises:

- ValueError: If the specified geometry column does not exist or contains invalid geometries.
- TypeError: If the geometry column does not contain valid geometries.

Examples:

Simplify geometries in a GeoDataFrame:

simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified
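
A slightly fuller version of the docstring example, comparing vertex counts before and after simplification (the tolerance is expressed in the units of the GeoDataFrame's CRS):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import simplify_geometries

# A dense, circle-like polygon built by buffering a point
gdf = gpd.GeoDataFrame(geometry=[Point(0, 0).buffer(1.0)], crs="EPSG:4326")

simplified = simplify_geometries(gdf, tolerance=0.05)
print(
    len(gdf.geometry.iloc[0].exterior.coords),
    "->",
    len(simplified.geometry.iloc[0].exterior.coords),
)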

sat_images

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

- gdf (geopandas.GeoDataFrame): A GeoDataFrame with Point geometries (geographic coordinates). Required.
- resolution (float): Desired resolution (meters per pixel). Required.
- bbox_size (float): Bounding box size in meters. Default is 300 (300 m x 300 m).
- crs (str): Target projection. Default is 'EPSG:3857'.

Returns:

- int: Number of pixels per side (width and height).

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
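
For example, at roughly 46° latitude with a 10 m/px target resolution the Mercator scale factor is cos(46°) ≈ 0.69, so a 300 m box needs about 300 / (10 * 0.69) ≈ 43 pixels per side (a sketch; the point is illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

gdf = gpd.GeoDataFrame(geometry=[Point(6.14, 46.20)], crs="EPSG:4326")
pixels = calculate_pixels_at_location(gdf, resolution=10, bbox_size=300)
print(pixels)  # ~43 at this latitude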

tif_processor

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data. Can merge multiple rasters into one during initialization.
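
A minimal usage sketch (the file paths below are hypothetical; the constructor arguments and methods follow the class source shown below):

from gigaspatial.processing.tif_processor import TifProcessor

# Hypothetical single-band raster
processor = TifProcessor(dataset_path="data/population.tif", mode="single")

print(processor.get_raster_info())      # crs, bounds, band count, dtype, nodata, ...
df = processor.to_dataframe()           # tabular pixel values
gdf = processor.to_geodataframe()       # one bounding-box polygon per pixel
vals = processor.sample_by_coordinates([(36.82, -1.29), (36.85, -1.30)])  # (x, y) pairs

# Multiple tiles can be merged at construction time
merged = TifProcessor(
    dataset_path=["data/tile_a.tif", "data/tile_b.tif"],
    mode="single",
    merge_method="mean",
)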

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    Can merge multiple rasters into one during initialization.
    """

    dataset_path: Union[Path, str, List[Union[Path, str]]]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"
    merge_method: Literal["first", "last", "min", "max", "mean"] = "first"
    target_crs: Optional[str] = None  # For reprojection if needed
    resampling_method: Resampling = Resampling.nearest
    reprojection_resolution: Optional[Tuple[float, float]] = None

    def __post_init__(self):
        """Validate inputs, merge rasters if needed, and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}
        self._temp_dir = tempfile.mkdtemp()
        self._merged_file_path = None
        self._reprojected_file_path = None

        # Handle multiple dataset paths
        if isinstance(self.dataset_path, list):
            if len(self.dataset_path) > 1:
                self.dataset_paths = [Path(p) for p in self.dataset_path]
                self._validate_multiple_datasets()
                self._merge_rasters()
                self.dataset_path = self._merged_file_path
        else:
            self.dataset_paths = [Path(self.dataset_path)]
            if not self.data_store.file_exists(str(self.dataset_path)):
                raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

            # Reproject single raster during initialization if target_crs is set
            if self.target_crs:
                self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
                with self.data_store.open(str(self.dataset_path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            self._reprojected_file_path = self._reproject_to_temp_file(
                                src, self.target_crs
                            )
                self.dataset_path = self._reprojected_file_path

        self._load_metadata()

        # Validate mode and band count
        if self.mode == "rgba" and self.count != 4:
            raise ValueError("RGBA mode requires a 4-band TIF file")
        if self.mode == "rgb" and self.count != 3:
            raise ValueError("RGB mode requires a 3-band TIF file")
        if self.mode == "single" and self.count != 1:
            raise ValueError("Single mode requires a 1-band TIF file")
        if self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset, handling temporary reprojected files."""
        if self._merged_file_path:
            with rasterio.open(self._merged_file_path) as src:
                yield src
        elif self._reprojected_file_path:
            with rasterio.open(self._reprojected_file_path) as src:
                yield src
        else:
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        yield src

    def reproject_to(
        self,
        target_crs: str,
        output_path: Optional[Union[str, Path]] = None,
        resampling_method: Optional[Resampling] = None,
        resolution: Optional[Tuple[float, float]] = None,
    ):
        """
        Reprojects the current raster to a new CRS and optionally saves it.

        Args:
            target_crs: The CRS to reproject to (e.g., "EPSG:4326").
            output_path: The path to save the reprojected raster. If None,
                         it is saved to a temporary file.
            resampling_method: The resampling method to use.
            resolution: The target resolution (pixel size) in the new CRS.
        """
        self.logger.info(f"Reprojecting raster to {target_crs}...")

        # Use provided or default values
        resampling_method = resampling_method or self.resampling_method
        resolution = resolution or self.reprojection_resolution

        with self.open_dataset() as src:
            if src.crs.to_string() == target_crs:
                self.logger.info(
                    "Raster is already in the target CRS. No reprojection needed."
                )
                # If output_path is specified, copy the file
                if output_path:
                    self.data_store.copy_file(str(self.dataset_path), output_path)
                return self.dataset_path

            dst_path = output_path or os.path.join(
                self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
            )

            with rasterio.open(
                dst_path,
                "w",
                **self._get_reprojection_profile(src, target_crs, resolution),
            ) as dst:
                for band_idx in range(1, src.count + 1):
                    reproject(
                        source=rasterio.band(src, band_idx),
                        destination=rasterio.band(dst, band_idx),
                        src_transform=src.transform,
                        src_crs=src.crs,
                        dst_transform=dst.transform,
                        dst_crs=dst.crs,
                        resampling=resampling_method,
                        num_threads=multiprocessing.cpu_count(),
                    )

            self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
            return Path(dst_path)

    def get_raster_info(self) -> Dict[str, Any]:
        """Get comprehensive raster information."""
        return {
            "count": self.count,
            "width": self.width,
            "height": self.height,
            "crs": self.crs,
            "bounds": self.bounds,
            "transform": self.transform,
            "dtypes": self.dtype,
            "nodata": self.nodata,
            "mode": self.mode,
            "is_merged": self.is_merged,
            "source_count": self.source_count,
        }

    def _reproject_to_temp_file(
        self, src: rasterio.DatasetReader, target_crs: str
    ) -> str:
        """Helper to reproject a raster and save it to a temporary file."""
        dst_path = os.path.join(
            self._temp_dir, f"reprojected_temp_{os.urandom(8).hex()}.tif"
        )
        profile = self._get_reprojection_profile(
            src, target_crs, self.reprojection_resolution
        )

        with rasterio.open(dst_path, "w", **profile) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=self.resampling_method,
                )
        return dst_path

    def _validate_multiple_datasets(self):
        """Validate that all datasets exist and have compatible properties."""
        if len(self.dataset_paths) < 2:
            raise ValueError("Multiple dataset paths required for merging")

        with self.data_store.open(str(self.dataset_paths[0]), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as ref_src:
                    ref_count = ref_src.count
                    ref_dtype = ref_src.dtypes[0]
                    ref_crs = ref_src.crs
                    ref_transform = ref_src.transform
                    ref_nodata = ref_src.nodata

        for i, path in enumerate(self.dataset_paths[1:], 1):
            with self.data_store.open(str(path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        if src.count != ref_count:
                            raise ValueError(
                                f"Dataset {i} has {src.count} bands, expected {ref_count}"
                            )
                        if src.dtypes[0] != ref_dtype:
                            raise ValueError(
                                f"Dataset {i} has dtype {src.dtypes[0]}, expected {ref_dtype}"
                            )
                        if not self.target_crs and src.crs != ref_crs:
                            self.logger.warning(
                                f"Dataset {i} has CRS {src.crs}, expected {ref_crs}. "
                                "Consider setting target_crs parameter for reprojection before merging."
                            )
                        if self.target_crs is None and not self._transforms_compatible(
                            src.transform, ref_transform
                        ):
                            self.logger.warning(
                                f"Dataset {i} has different resolution. Resampling may be needed."
                            )
                        if src.nodata != ref_nodata:
                            self.logger.warning(
                                f"Dataset {i} has different nodata value: {src.nodata} vs {ref_nodata}"
                            )

    def _get_reprojection_profile(
        self,
        src: rasterio.DatasetReader,
        target_crs: str,
        resolution: Optional[Tuple[float, float]],
        compression: str = "lzw",
    ):
        """Calculates and returns the profile for a reprojected raster."""
        if resolution:
            src_res = (abs(src.transform.a), abs(src.transform.e))
            self.logger.info(
                f"Using target resolution: {resolution}. Source resolution: {src_res}."
            )
            # Calculate transform and dimensions based on the new resolution
            dst_transform, width, height = calculate_default_transform(
                src.crs,
                target_crs,
                src.width,
                src.height,
                *src.bounds,
                resolution=resolution,
            )
        else:
            # Keep original resolution but reproject
            dst_transform, width, height = calculate_default_transform(
                src.crs, target_crs, src.width, src.height, *src.bounds
            )

        profile = src.profile.copy()
        profile.update(
            {
                "crs": target_crs,
                "transform": dst_transform,
                "width": width,
                "height": height,
                "compress": compression,  # Add compression to save space
            }
        )
        return profile

    def _transforms_compatible(self, transform1, transform2, tolerance=1e-6):
        """Check if two transforms have compatible pixel sizes."""
        return (
            abs(transform1.a - transform2.a) < tolerance
            and abs(transform1.e - transform2.e) < tolerance
        )

    def _merge_rasters(self):
        """Merge multiple rasters into a single raster."""
        self.logger.info(f"Merging {len(self.dataset_paths)} rasters...")

        # Open all datasets and handle reprojection if needed
        datasets_to_merge = []
        temp_reprojected_files = []
        try:
            for path in self.dataset_paths:
                with self.data_store.open(str(path), "rb") as f:
                    with rasterio.MemoryFile(f.read()) as memfile:
                        with memfile.open() as src:
                            if self.target_crs and src.crs != self.target_crs:
                                self.logger.info(
                                    f"Reprojecting {path.name} to {self.target_crs} before merging."
                                )
                                reprojected_path = self._reproject_to_temp_file(
                                    src, self.target_crs
                                )
                                temp_reprojected_files.append(reprojected_path)
                                datasets_to_merge.append(
                                    rasterio.open(reprojected_path)
                                )
                            else:
                                temp_path = os.path.join(
                                    self._temp_dir,
                                    f"temp_{path.stem}_{os.urandom(4).hex()}.tif",
                                )
                                temp_reprojected_files.append(temp_path)

                                profile = src.profile
                                with rasterio.open(temp_path, "w", **profile) as dst:
                                    dst.write(src.read())
                                datasets_to_merge.append(rasterio.open(temp_path))

            self._merged_file_path = os.path.join(self._temp_dir, "merged_raster.tif")

            if self.merge_method == "mean":
                merged_array, merged_transform = self._merge_with_mean(
                    datasets_to_merge
                )
            else:
                merged_array, merged_transform = merge(
                    datasets_to_merge,
                    method=self.merge_method,
                    resampling=self.resampling_method,
                )

            # Get profile from the first file in the list (all should be compatible now)
            ref_src = datasets_to_merge[0]
            profile = ref_src.profile.copy()
            profile.update(
                {
                    "height": merged_array.shape[-2],
                    "width": merged_array.shape[-1],
                    "transform": merged_transform,
                    "crs": self.target_crs if self.target_crs else ref_src.crs,
                }
            )

            with rasterio.open(self._merged_file_path, "w", **profile) as dst:
                dst.write(merged_array)
        finally:
            for dataset in datasets_to_merge:
                if hasattr(dataset, "close"):
                    dataset.close()

            # Clean up temporary files immediately
            for temp_file in temp_reprojected_files:
                try:
                    os.remove(temp_file)
                except OSError:
                    pass

        self.logger.info("Raster merging completed!")

    def _merge_with_mean(self, src_files):
        """Merge rasters using mean aggregation."""
        # Get bounds and resolution for merged raster
        bounds = src_files[0].bounds
        transform = src_files[0].transform

        for src in src_files[1:]:
            bounds = rasterio.coords.BoundingBox(
                min(bounds.left, src.bounds.left),
                min(bounds.bottom, src.bounds.bottom),
                max(bounds.right, src.bounds.right),
                max(bounds.top, src.bounds.top),
            )

        # Calculate dimensions for merged raster
        width = int((bounds.right - bounds.left) / abs(transform.a))
        height = int((bounds.top - bounds.bottom) / abs(transform.e))

        # Create new transform for merged bounds
        merged_transform = rasterio.transform.from_bounds(
            bounds.left, bounds.bottom, bounds.right, bounds.top, width, height
        )

        estimated_memory = height * width * src_files[0].count * 8  # float64
        if estimated_memory > 1e9:  # 1GB threshold
            self.logger.warning(
                f"Large memory usage expected: {estimated_memory/1e9:.1f}GB"
            )

        # Initialize arrays for sum and count
        sum_array = np.zeros((src_files[0].count, height, width), dtype=np.float64)
        count_array = np.zeros((height, width), dtype=np.int32)

        # Process each source file
        for src in src_files:
            # Read data
            data = src.read()

            # Calculate offset in merged raster
            src_bounds = src.bounds
            col_off = int((src_bounds.left - bounds.left) / abs(transform.a))
            row_off = int((bounds.top - src_bounds.top) / abs(transform.e))

            # Get valid data mask
            if src.nodata is not None:
                valid_mask = data[0] != src.nodata
            else:
                valid_mask = np.ones(data[0].shape, dtype=bool)

            # Add to sum and count arrays
            end_row = row_off + data.shape[1]
            end_col = col_off + data.shape[2]

            sum_array[:, row_off:end_row, col_off:end_col] += np.where(
                valid_mask, data, 0
            )
            count_array[row_off:end_row, col_off:end_col] += valid_mask.astype(np.int32)

        # Calculate mean
        mean_array = np.divide(
            sum_array,
            count_array,
            out=np.full_like(
                sum_array, src_files[0].nodata or 0, dtype=sum_array.dtype
            ),
            where=count_array > 0,
        )

        return mean_array.astype(src_files[0].dtypes[0]), merged_transform

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        try:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]
        except (rasterio.errors.RasterioIOError, FileNotFoundError) as e:
            raise FileNotFoundError(f"Could not read raster metadata: {e}")
        except Exception as e:
            raise RuntimeError(f"Unexpected error loading metadata: {e}")

    @property
    def is_merged(self) -> bool:
        """Check if this processor was created from multiple rasters."""
        return len(self.dataset_paths) > 1

    @property
    def source_count(self) -> int:
        """Get the number of source rasters."""
        return len(self.dataset_paths)

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def dtype(self):
        """Get the data types from the TIF file"""
        return self._cache.get("dtype", [])

    @property
    def width(self):
        return self._cache["width"]

    @property
    def height(self):
        return self._cache["height"]

    def to_dataframe(self, drop_nodata=True, **kwargs) -> pd.DataFrame:
        try:
            if self.mode == "single":
                df = self._to_band_dataframe(drop_nodata=drop_nodata, **kwargs)
            elif self.mode == "rgb":
                df = self._to_rgb_dataframe(drop_nodata=drop_nodata)
            elif self.mode == "rgba":
                df = self._to_rgba_dataframe(drop_transparent=drop_nodata)
            elif self.mode == "multi":
                df = self._to_multi_band_dataframe(drop_nodata=drop_nodata, **kwargs)
            else:
                raise ValueError(
                    f"Invalid mode: {self.mode}. Must be one of: single, rgb, rgba, multi"
                )
        except Exception as e:
            raise ValueError(
                f"Failed to process TIF file in mode '{self.mode}'. "
                f"Please ensure the file is valid and matches the selected mode. "
                f"Original error: {str(e)}"
            )

        return df

    def to_geodataframe(self, **kwargs) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.
        """
        df = self.to_dataframe(**kwargs)

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        return gdf

    def to_graph(
        self,
        connectivity: Literal[4, 8] = 4,
        band: Optional[int] = None,
        include_coordinates: bool = False,
        graph_type: Literal["networkx", "sparse"] = "networkx",
        chunk_size: Optional[int] = None,
    ) -> Union[nx.Graph, sp.csr_matrix]:
        """
        Convert raster to graph based on pixel adjacency.
        """
        if chunk_size is not None:
            raise NotImplementedError("Chunked processing is not yet implemented.")

        with self.open_dataset() as src:
            band_idx = band - 1 if band is not None else 0
            if band_idx < 0 or band_idx >= src.count:
                raise ValueError(
                    f"Band {band} not available. Raster has {src.count} bands"
                )

            data = src.read(band_idx + 1)
            nodata = src.nodata if src.nodata is not None else self.nodata
            valid_mask = (
                data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
            )

            height, width = data.shape

            # Find all valid pixels
            valid_rows, valid_cols = np.where(valid_mask)
            num_valid_pixels = len(valid_rows)

            # Create a sequential mapping from (row, col) to a node ID
            node_map = np.full(data.shape, -1, dtype=int)
            node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

            # Define neighborhood offsets
            if connectivity == 4:
                # von Neumann neighborhood (4-connectivity)
                offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
            else:  # connectivity == 8
                # Moore neighborhood (8-connectivity)
                offsets = [
                    (-1, -1),
                    (-1, 0),
                    (-1, 1),
                    (0, -1),
                    (0, 1),
                    (1, -1),
                    (1, 0),
                    (1, 1),
                ]

            # Collect nodes and edges
            nodes_to_add = []
            edges_to_add = []

            for i in range(num_valid_pixels):
                row, col = valid_rows[i], valid_cols[i]
                current_node_id = node_map[row, col]

                # Prepare node attributes
                node_attrs = {"value": float(data[row, col])}
                if include_coordinates:
                    x, y = src.xy(row, col)
                    node_attrs["x"] = x
                    node_attrs["y"] = y
                nodes_to_add.append((current_node_id, node_attrs))

                # Find neighbors and collect edges
                for dy, dx in offsets:
                    neighbor_row, neighbor_col = row + dy, col + dx

                    # Check if neighbor is within bounds and is a valid pixel
                    if (
                        0 <= neighbor_row < height
                        and 0 <= neighbor_col < width
                        and valid_mask[neighbor_row, neighbor_col]
                    ):
                        neighbor_node_id = node_map[neighbor_row, neighbor_col]

                        # Ensure each edge is added only once
                        if current_node_id < neighbor_node_id:
                            neighbor_value = float(data[neighbor_row, neighbor_col])
                            edges_to_add.append(
                                (current_node_id, neighbor_node_id, neighbor_value)
                            )

            if graph_type == "networkx":
                G = nx.Graph()
                G.add_nodes_from(nodes_to_add)
                G.add_weighted_edges_from(edges_to_add)
                return G
            else:  # sparse matrix
                edges_array = np.array(edges_to_add)
                row_indices = edges_array[:, 0].astype(int)
                col_indices = edges_array[:, 1].astype(int)
                weights = edges_array[:, 2]

                # Add reverse edges so the resulting adjacency matrix is symmetric
                sym_rows = np.concatenate([row_indices, col_indices])
                sym_cols = np.concatenate([col_indices, row_indices])
                sym_weights = np.concatenate([weights, weights])

                return sp.coo_matrix(
                    (sym_weights, (sym_rows, sym_cols)),
                    shape=(num_valid_pixels, num_valid_pixels),
                ).tocsr()

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]], **kwargs
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            elif self.count > 1:
                return np.array(
                    [vals for vals in src.sample(coordinate_list, **kwargs)]
                )
            else:
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self,
        polygon_list,
        stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
    ):
        """
        Sample raster values by polygons and compute statistic(s) for each polygon.

        Args:
            polygon_list: List of shapely Polygon or MultiPolygon objects.
            stat: Statistic(s) to compute. Can be:
                - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
                - Single callable: custom function that takes array and returns scalar
                - List of strings/callables: multiple statistics to compute

        Returns:
            If single stat: np.ndarray of computed statistics for each polygon
            If multiple stats: List of dictionaries with stat names as keys
        """
        # Determine if single or multiple stats
        single_stat = not isinstance(stat, list)
        stats_list = [stat] if single_stat else stat

        # Prepare stat functions
        stat_funcs = []
        stat_names = []

        for s in stats_list:
            if callable(s):
                stat_funcs.append(s)
                stat_names.append(
                    s.__name__
                    if hasattr(s, "__name__")
                    else f"custom_{len(stat_names)}"
                )
            else:
                # Handle string statistics
                if s == "count":
                    stat_funcs.append(len)
                else:
                    stat_funcs.append(getattr(np, s))
                stat_names.append(s)

        results = []

        with self.open_dataset() as src:
            for polygon in tqdm(polygon_list):
                try:
                    out_image, _ = mask(src, [polygon], crop=True, filled=False)

                    # Use masked arrays for more efficient nodata handling
                    if hasattr(out_image, "mask"):
                        valid_data = out_image.compressed()
                    else:
                        valid_data = (
                            out_image[out_image != self.nodata]
                            if self.nodata is not None
                            else out_image.flatten()
                        )

                    if len(valid_data) == 0:
                        if single_stat:
                            results.append(np.nan)
                        else:
                            results.append({name: np.nan for name in stat_names})
                    else:
                        if single_stat:
                            results.append(stat_funcs[0](valid_data))
                        else:
                            # Compute all statistics for this polygon
                            polygon_stats = {}
                            for func, name in zip(stat_funcs, stat_names):
                                try:
                                    polygon_stats[name] = func(valid_data)
                                except Exception:
                                    polygon_stats[name] = np.nan
                            results.append(polygon_stats)

                except Exception:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})

        return np.array(results) if single_stat else results

    def sample_by_polygons_batched(
        self,
        polygon_list: List[Union[Polygon, MultiPolygon]],
        stat: Union[str, Callable] = "mean",
        batch_size: int = 100,
        n_workers: int = 4,
        **kwargs,
    ) -> np.ndarray:
        """
        Sample raster values by polygons in parallel using batching.
        """

        def _chunk_list(data_list, chunk_size):
            """Yield successive chunks from data_list."""
            for i in range(0, len(data_list), chunk_size):
                yield data_list[i : i + chunk_size]

        if len(polygon_list) == 0:
            return np.array([])

        stat_func = stat if callable(stat) else getattr(np, stat)

        polygon_chunks = list(_chunk_list(polygon_list, batch_size))

        with multiprocessing.Pool(
            initializer=self._initializer_worker, processes=n_workers
        ) as pool:
            process_func = partial(self._process_polygon_batch, stat_func=stat_func)
            batched_results = list(
                tqdm(
                    pool.imap(process_func, polygon_chunks),
                    total=len(polygon_chunks),
                    desc=f"Sampling polygons",
                )
            )

            results = [item for sublist in batched_results for item in sublist]

        return np.array(results)

    def _initializer_worker(self):
        """
        Initializer function for each worker process.
        Opens the raster dataset and stores it in a process-local variable.
        This function runs once per worker, not for every task.
        """
        global src_handle, memfile_handle
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            memfile_handle = rasterio.MemoryFile(f.read())
            src_handle = memfile_handle.open()

    def _process_single_polygon(self, polygon, stat_func):
        """
        Helper function to process a single polygon.
        This will be run in a separate process.
        """
        global src_handle
        if src_handle is None:
            # This should not happen if the initializer is set up correctly,
            # but it's a good defensive check.
            raise RuntimeError("Raster dataset not initialized in this process.")

        try:
            out_image, _ = mask(src_handle, [polygon], crop=True, filled=False)

            if hasattr(out_image, "mask"):
                valid_data = out_image.compressed()
            else:
                valid_data = (
                    out_image[out_image != self.nodata]
                    if self.nodata is not None
                    else out_image.flatten()
                )

            if len(valid_data) == 0:
                return np.nan
            else:
                return stat_func(valid_data)
        except Exception:
            return np.nan

    def _process_polygon_batch(self, polygon_batch, stat_func):
        """
        Processes a batch of polygons.
        """
        return [
            self._process_single_polygon(polygon, stat_func)
            for polygon in polygon_batch
        ]

    def _to_rgba_dataframe(self, drop_transparent: bool = False) -> pd.DataFrame:
        """
        Convert RGBA TIF to DataFrame with separate columns for R, G, B, A values.
        """
        self.logger.info("Processing RGBA dataset...")

        with self.open_dataset() as src:
            if self.count != 4:
                raise ValueError("RGBA mode requires a 4-band TIF file")

            # Read all four bands
            red, green, blue, alpha = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_transparent:
                mask = alpha > 0
                red = np.extract(mask, red)
                green = np.extract(mask, green)
                blue = np.extract(mask, blue)
                alpha = np.extract(mask, alpha)
                lons = np.extract(mask, x_coords)
                lats = np.extract(mask, y_coords)
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()
                alpha = alpha.flatten()

            # Create DataFrame with RGBA values
            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                    "alpha": alpha,
                }
            )

            # Normalize alpha values if they're not in [0, 1] range
            if data["alpha"].max() > 1:
                data["alpha"] = data["alpha"] / data["alpha"].max()

        self.logger.info("RGBA dataset is processed!")
        return data

    def _to_rgb_dataframe(self, drop_nodata: bool = True) -> pd.DataFrame:
        """Convert RGB TIF to DataFrame with separate columns for R, G, B values."""
        if self.mode != "rgb":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing RGB dataset...")

        with self.open_dataset() as src:
            if self.count != 3:
                raise ValueError("RGB mode requires a 3-band TIF file")

            # Read all three bands
            red, green, blue = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    mask = ~(
                        (red == nodata_value)
                        | (green == nodata_value)
                        | (blue == nodata_value)
                    )
                    red = np.extract(mask, red)
                    green = np.extract(mask, green)
                    blue = np.extract(mask, blue)
                    lons = np.extract(mask, x_coords)
                    lats = np.extract(mask, y_coords)
                else:
                    lons = x_coords.flatten()
                    lats = y_coords.flatten()
                    red = red.flatten()
                    green = green.flatten()
                    blue = blue.flatten()
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()

            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                }
            )

        self.logger.info("RGB dataset is processed!")
        return data

    def _to_band_dataframe(
        self, band_number: int = 1, drop_nodata: bool = True, drop_values: list = []
    ) -> pd.DataFrame:
        """Process single-band TIF to DataFrame."""
        if self.mode != "single":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing single-band dataset...")

        if band_number <= 0 or band_number > self.count:
            self.logger.error(
                f"Error: Band number {band_number} is out of range. The file has {self.count} bands."
            )
            return None

        with self.open_dataset() as src:

            band = src.read(band_number)

            x_coords, y_coords = self._get_pixel_coordinates()

            values_to_mask = []
            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    values_to_mask.append(nodata_value)

            if drop_values:
                values_to_mask.extend(drop_values)

            if values_to_mask:
                data_mask = ~np.isin(band, values_to_mask)
                pixel_values = np.extract(data_mask, band)
                lons = np.extract(data_mask, x_coords)
                lats = np.extract(data_mask, y_coords)
            else:
                pixel_values = band.flatten()
                lons = x_coords.flatten()
                lats = y_coords.flatten()

            data = pd.DataFrame({"lon": lons, "lat": lats, "pixel_value": pixel_values})

        self.logger.info("Dataset is processed!")
        return data

    def _to_multi_band_dataframe(
        self,
        drop_nodata: bool = True,
        drop_values: list = [],
        band_names: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Process multi-band TIF to DataFrame with all bands included.

        Args:
            drop_nodata (bool): Whether to drop nodata values. Defaults to True.
            drop_values (list): Additional values to drop from the dataset. Defaults to empty list.
            band_names (Optional[List[str]]): Custom names for the bands. If None, bands will be named using
                                            the band descriptions from the GeoTIFF metadata if available,
                                            otherwise 'band_1', 'band_2', etc.

        Returns:
            pd.DataFrame: DataFrame containing coordinates and all band values
        """
        self.logger.info("Processing multi-band dataset...")

        with self.open_dataset() as src:
            # Read all bands
            stack = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            # Initialize dictionary with coordinates
            data_dict = {"lon": x_coords.flatten(), "lat": y_coords.flatten()}

            # Get band descriptions from metadata if available
            if band_names is None and hasattr(src, "descriptions") and src.descriptions:
                band_names = [
                    desc if desc else f"band_{i+1}"
                    for i, desc in enumerate(src.descriptions)
                ]

            # Process each band
            for band_idx in range(self.count):
                band_data = stack[band_idx]

                # Handle nodata and other values to drop
                if drop_nodata or drop_values:
                    values_to_mask = []
                    if drop_nodata and src.nodata is not None:
                        values_to_mask.append(src.nodata)
                    if drop_values:
                        values_to_mask.extend(drop_values)

                    if values_to_mask:
                        data_mask = ~np.isin(band_data, values_to_mask)
                        band_values = np.extract(data_mask, band_data)
                        if band_idx == 0:  # Only need to mask coordinates once
                            data_dict["lon"] = np.extract(data_mask, x_coords)
                            data_dict["lat"] = np.extract(data_mask, y_coords)
                    else:
                        band_values = band_data.flatten()
                else:
                    band_values = band_data.flatten()

                # Use custom band names if provided, otherwise use descriptions or default naming
                band_name = (
                    band_names[band_idx]
                    if band_names and len(band_names) > band_idx
                    else f"band_{band_idx + 1}"
                )
                data_dict[band_name] = band_values

        self.logger.info("Multi-band dataset is processed!")
        return pd.DataFrame(data_dict)

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # compute pixel-center coordinates from cached raster metadata
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]

    def __enter__(self):
        return self

    def __del__(self):
        """Clean up temporary files and directories."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    def cleanup(self):
        """Explicit cleanup method for better control."""
        if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir)
            self.logger.info("Cleaned up temporary files")

    def __exit__(self, exc_type, exc_value, traceback):
        """Proper context manager exit with cleanup."""
        self.cleanup()
        return False
bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

dtype property

Get the data types from the TIF file

is_merged: bool property

Check if this processor was created from multiple rasters.

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height or pixel size) from the TIF file

source_count: int property

Get the number of source rasters.

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__del__()

Clean up temporary files and directories.

Source code in gigaspatial/processing/tif_processor.py
def __del__(self):
    """Clean up temporary files and directories."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir, ignore_errors=True)
__exit__(exc_type, exc_value, traceback)

Proper context manager exit with cleanup.

Source code in gigaspatial/processing/tif_processor.py
def __exit__(self, exc_type, exc_value, traceback):
    """Proper context manager exit with cleanup."""
    self.cleanup()
    return False
__post_init__()

Validate inputs, merge rasters if needed, and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs, merge rasters if needed, and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}
    self._temp_dir = tempfile.mkdtemp()
    self._merged_file_path = None
    self._reprojected_file_path = None

    # Handle multiple dataset paths
    if isinstance(self.dataset_path, list):
        if len(self.dataset_path) > 1:
            self.dataset_paths = [Path(p) for p in self.dataset_path]
            self._validate_multiple_datasets()
            self._merge_rasters()
            self.dataset_path = self._merged_file_path
    else:
        self.dataset_paths = [Path(self.dataset_path)]
        if not self.data_store.file_exists(str(self.dataset_path)):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        # Reproject single raster during initialization if target_crs is set
        if self.target_crs:
            self.logger.info(f"Reprojecting single raster to {self.target_crs}...")
            with self.data_store.open(str(self.dataset_path), "rb") as f:
                with rasterio.MemoryFile(f.read()) as memfile:
                    with memfile.open() as src:
                        self._reprojected_file_path = self._reproject_to_temp_file(
                            src, self.target_crs
                        )
            self.dataset_path = self._reprojected_file_path

    self._load_metadata()

    # Validate mode and band count
    if self.mode == "rgba" and self.count != 4:
        raise ValueError("RGBA mode requires a 4-band TIF file")
    if self.mode == "rgb" and self.count != 3:
        raise ValueError("RGB mode requires a 3-band TIF file")
    if self.mode == "single" and self.count != 1:
        raise ValueError("Single mode requires a 1-band TIF file")
    if self.mode == "multi" and self.count < 2:
        raise ValueError("Multi mode requires a TIF file with 2 or more bands")
cleanup()

Explicit cleanup method for better control.

Source code in gigaspatial/processing/tif_processor.py
def cleanup(self):
    """Explicit cleanup method for better control."""
    if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
        shutil.rmtree(self._temp_dir)
        self.logger.info("Cleaned up temporary files")
get_raster_info()

Get comprehensive raster information.

Source code in gigaspatial/processing/tif_processor.py
def get_raster_info(self) -> Dict[str, Any]:
    """Get comprehensive raster information."""
    return {
        "count": self.count,
        "width": self.width,
        "height": self.height,
        "crs": self.crs,
        "bounds": self.bounds,
        "transform": self.transform,
        "dtypes": self.dtype,
        "nodata": self.nodata,
        "mode": self.mode,
        "is_merged": self.is_merged,
        "source_count": self.source_count,
    }
open_dataset()

Context manager for accessing the dataset, handling temporary reprojected files.

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset, handling temporary reprojected files."""
    if self._merged_file_path:
        with rasterio.open(self._merged_file_path) as src:
            yield src
    elif self._reprojected_file_path:
        with rasterio.open(self._reprojected_file_path) as src:
            yield src
    else:
        with self.data_store.open(str(self.dataset_path), "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src
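
For example, a quick read through the context manager (a sketch assuming an existing TifProcessor instance named processor):

with processor.open_dataset() as src:
    # src is a rasterio DatasetReader backed by the merged or reprojected file
    # when one exists, otherwise by an in-memory copy of the original dataset
    print(src.crs, src.count, src.bounds)
    band_1 = src.read(1)
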
reproject_to(target_crs, output_path=None, resampling_method=None, resolution=None)

Reprojects the current raster to a new CRS and optionally saves it.

Parameters:

Name Type Description Default
target_crs str

The CRS to reproject to (e.g., "EPSG:4326").

required
output_path Optional[Union[str, Path]]

The path to save the reprojected raster. If None, it is saved to a temporary file.

None
resampling_method Optional[Resampling]

The resampling method to use.

None
resolution Optional[Tuple[float, float]]

The target resolution (pixel size) in the new CRS.

None
Source code in gigaspatial/processing/tif_processor.py
def reproject_to(
    self,
    target_crs: str,
    output_path: Optional[Union[str, Path]] = None,
    resampling_method: Optional[Resampling] = None,
    resolution: Optional[Tuple[float, float]] = None,
):
    """
    Reprojects the current raster to a new CRS and optionally saves it.

    Args:
        target_crs: The CRS to reproject to (e.g., "EPSG:4326").
        output_path: The path to save the reprojected raster. If None,
                     it is saved to a temporary file.
        resampling_method: The resampling method to use.
        resolution: The target resolution (pixel size) in the new CRS.
    """
    self.logger.info(f"Reprojecting raster to {target_crs}...")

    # Use provided or default values
    resampling_method = resampling_method or self.resampling_method
    resolution = resolution or self.reprojection_resolution

    with self.open_dataset() as src:
        if src.crs.to_string() == target_crs:
            self.logger.info(
                "Raster is already in the target CRS. No reprojection needed."
            )
            # If output_path is specified, copy the file
            if output_path:
                self.data_store.copy_file(str(self.dataset_path), output_path)
            return self.dataset_path

        dst_path = output_path or os.path.join(
            self._temp_dir, f"reprojected_single_{os.urandom(8).hex()}.tif"
        )

        with rasterio.open(
            dst_path,
            "w",
            **self._get_reprojection_profile(src, target_crs, resolution),
        ) as dst:
            for band_idx in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, band_idx),
                    destination=rasterio.band(dst, band_idx),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=dst.transform,
                    dst_crs=dst.crs,
                    resampling=resampling_method,
                    num_threads=multiprocessing.cpu_count(),
                )

        self.logger.info(f"Reprojection complete. Output saved to {dst_path}")
        return Path(dst_path)
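
A usage sketch (the output path is hypothetical; resampling_method and resolution fall back to the processor's configured defaults when omitted):

from rasterio.enums import Resampling

out_path = processor.reproject_to(
    "EPSG:3857",
    output_path="data/population_3857.tif",
    resampling_method=Resampling.bilinear,
    resolution=(100.0, 100.0),
)
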
sample_by_polygons(polygon_list, stat='mean')

Sample raster values by polygons and compute statistic(s) for each polygon.

Parameters:

Name Type Description Default
polygon_list

List of shapely Polygon or MultiPolygon objects.

required
stat Union[str, Callable, List[Union[str, Callable]]]

Statistic(s) to compute. Can be: - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count' - Single callable: custom function that takes array and returns scalar - List of strings/callables: multiple statistics to compute

'mean'

Returns:

Type Description

If single stat: np.ndarray of computed statistics for each polygon

If multiple stats: List of dictionaries with stat names as keys

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self,
    polygon_list,
    stat: Union[str, Callable, List[Union[str, Callable]]] = "mean",
):
    """
    Sample raster values by polygons and compute statistic(s) for each polygon.

    Args:
        polygon_list: List of shapely Polygon or MultiPolygon objects.
        stat: Statistic(s) to compute. Can be:
            - Single string: 'mean', 'median', 'sum', 'min', 'max', 'std', 'count'
            - Single callable: custom function that takes array and returns scalar
            - List of strings/callables: multiple statistics to compute

    Returns:
        If single stat: np.ndarray of computed statistics for each polygon
        If multiple stats: List of dictionaries with stat names as keys
    """
    # Determine if single or multiple stats
    single_stat = not isinstance(stat, list)
    stats_list = [stat] if single_stat else stat

    # Prepare stat functions
    stat_funcs = []
    stat_names = []

    for s in stats_list:
        if callable(s):
            stat_funcs.append(s)
            stat_names.append(
                s.__name__
                if hasattr(s, "__name__")
                else f"custom_{len(stat_names)}"
            )
        else:
            # Handle string statistics
            if s == "count":
                stat_funcs.append(len)
            else:
                stat_funcs.append(getattr(np, s))
            stat_names.append(s)

    results = []

    with self.open_dataset() as src:
        for polygon in tqdm(polygon_list):
            try:
                out_image, _ = mask(src, [polygon], crop=True, filled=False)

                # Use masked arrays for more efficient nodata handling
                if hasattr(out_image, "mask"):
                    valid_data = out_image.compressed()
                else:
                    valid_data = (
                        out_image[out_image != self.nodata]
                        if self.nodata is not None
                        else out_image.flatten()
                    )

                if len(valid_data) == 0:
                    if single_stat:
                        results.append(np.nan)
                    else:
                        results.append({name: np.nan for name in stat_names})
                else:
                    if single_stat:
                        results.append(stat_funcs[0](valid_data))
                    else:
                        # Compute all statistics for this polygon
                        polygon_stats = {}
                        for func, name in zip(stat_funcs, stat_names):
                            try:
                                polygon_stats[name] = func(valid_data)
                            except Exception:
                                polygon_stats[name] = np.nan
                        results.append(polygon_stats)

            except Exception:
                if single_stat:
                    results.append(np.nan)
                else:
                    results.append({name: np.nan for name in stat_names})

    return np.array(results) if single_stat else results
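
For example (a sketch with hypothetical polygon extents, assuming an existing TifProcessor instance named processor):

from shapely.geometry import box

polygons = [box(29.0, -2.0, 29.1, -1.9), box(30.0, -1.5, 30.1, -1.4)]

means = processor.sample_by_polygons(polygons)  # ndarray of per-polygon means
summary = processor.sample_by_polygons(polygons, stat=["mean", "max", "count"])
# summary -> one dict per polygon, e.g. {"mean": ..., "max": ..., "count": ...}
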
sample_by_polygons_batched(polygon_list, stat='mean', batch_size=100, n_workers=4, **kwargs)

Sample raster values by polygons in parallel using batching.

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons_batched(
    self,
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: Union[str, Callable] = "mean",
    batch_size: int = 100,
    n_workers: int = 4,
    **kwargs,
) -> np.ndarray:
    """
    Sample raster values by polygons in parallel using batching.
    """

    def _chunk_list(data_list, chunk_size):
        """Yield successive chunks from data_list."""
        for i in range(0, len(data_list), chunk_size):
            yield data_list[i : i + chunk_size]

    if len(polygon_list) == 0:
        return np.array([])

    stat_func = stat if callable(stat) else getattr(np, stat)

    polygon_chunks = list(_chunk_list(polygon_list, batch_size))

    with multiprocessing.Pool(
        initializer=self._initializer_worker, processes=n_workers
    ) as pool:
        process_func = partial(self._process_polygon_batch, stat_func=stat_func)
        batched_results = list(
            tqdm(
                pool.imap(process_func, polygon_chunks),
                total=len(polygon_chunks),
                desc=f"Sampling polygons",
            )
        )

        results = [item for sublist in batched_results for item in sublist]

    return np.array(results)
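
A sketch of the parallel variant, reusing the polygons list from the previous example. Note that a callable stat must be picklable since it is dispatched to worker processes, and a string stat resolves to the NumPy function of the same name:

medians = processor.sample_by_polygons_batched(
    polygons,
    stat="median",
    batch_size=250,
    n_workers=8,
)
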
to_geodataframe(**kwargs)

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Source code in gigaspatial/processing/tif_processor.py
def to_geodataframe(self, **kwargs) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.
    """
    df = self.to_dataframe(**kwargs)

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    return gdf
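
For example (a sketch; keyword arguments are forwarded to to_dataframe, and the resulting columns depend on the processor's mode):

gdf = processor.to_geodataframe()
# one row per pixel; geometry is the pixel's bounding box in the raster CRS
print(gdf.head())
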
to_graph(connectivity=4, band=None, include_coordinates=False, graph_type='networkx', chunk_size=None)

Convert raster to graph based on pixel adjacency.

Source code in gigaspatial/processing/tif_processor.py
def to_graph(
    self,
    connectivity: Literal[4, 8] = 4,
    band: Optional[int] = None,
    include_coordinates: bool = False,
    graph_type: Literal["networkx", "sparse"] = "networkx",
    chunk_size: Optional[int] = None,
) -> Union[nx.Graph, sp.csr_matrix]:
    """
    Convert raster to graph based on pixel adjacency.
    """
    if chunk_size is not None:
        raise NotImplementedError("Chunked processing is not yet implemented.")

    with self.open_dataset() as src:
        band_idx = band - 1 if band is not None else 0
        if band_idx < 0 or band_idx >= src.count:
            raise ValueError(
                f"Band {band} not available. Raster has {src.count} bands"
            )

        data = src.read(band_idx + 1)
        nodata = src.nodata if src.nodata is not None else self.nodata
        valid_mask = (
            data != nodata if nodata is not None else np.ones_like(data, dtype=bool)
        )

        height, width = data.shape

        # Find all valid pixels
        valid_rows, valid_cols = np.where(valid_mask)
        num_valid_pixels = len(valid_rows)

        # Create a sequential mapping from (row, col) to a node ID
        node_map = np.full(data.shape, -1, dtype=int)
        node_map[valid_rows, valid_cols] = np.arange(num_valid_pixels)

        # Define neighborhood offsets
        if connectivity == 4:
            # von Neumann neighborhood (4-connectivity)
            offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        else:  # connectivity == 8
            # Moore neighborhood (8-connectivity)
            offsets = [
                (-1, -1),
                (-1, 0),
                (-1, 1),
                (0, -1),
                (0, 1),
                (1, -1),
                (1, 0),
                (1, 1),
            ]

        # Collect nodes and edges
        nodes_to_add = []
        edges_to_add = []

        for i in range(num_valid_pixels):
            row, col = valid_rows[i], valid_cols[i]
            current_node_id = node_map[row, col]

            # Prepare node attributes
            node_attrs = {"value": float(data[row, col])}
            if include_coordinates:
                x, y = src.xy(row, col)
                node_attrs["x"] = x
                node_attrs["y"] = y
            nodes_to_add.append((current_node_id, node_attrs))

            # Find neighbors and collect edges
            for dy, dx in offsets:
                neighbor_row, neighbor_col = row + dy, col + dx

                # Check if neighbor is within bounds and is a valid pixel
                if (
                    0 <= neighbor_row < height
                    and 0 <= neighbor_col < width
                    and valid_mask[neighbor_row, neighbor_col]
                ):
                    neighbor_node_id = node_map[neighbor_row, neighbor_col]

                    # Ensure each edge is added only once
                    if current_node_id < neighbor_node_id:
                        neighbor_value = float(data[neighbor_row, neighbor_col])
                        edges_to_add.append(
                            (current_node_id, neighbor_node_id, neighbor_value)
                        )

        if graph_type == "networkx":
            G = nx.Graph()
            G.add_nodes_from(nodes_to_add)
            G.add_weighted_edges_from(edges_to_add)
            return G
        else:  # sparse matrix
            edges_array = np.array(edges_to_add)
            row_indices = edges_array[:, 0].astype(int)
            col_indices = edges_array[:, 1].astype(int)
            weights = edges_array[:, 2]

            # Duplicate each edge in both directions for a symmetric matrix
            sym_rows = np.concatenate([row_indices, col_indices])
            sym_cols = np.concatenate([col_indices, row_indices])
            sym_weights = np.concatenate([weights, weights])

            return sp.coo_matrix(
                (sym_weights, (sym_rows, sym_cols)),
                shape=(num_valid_pixels, num_valid_pixels),
            ).tocsr()
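
For example (a sketch assuming an existing TifProcessor instance named processor; edge weights carry the value of the neighbouring pixel, as in the code above):

# 4-connected NetworkX graph with pixel coordinates stored as node attributes
G = processor.to_graph(connectivity=4, include_coordinates=True)
print(G.number_of_nodes(), G.number_of_edges())

# 8-connected, symmetric CSR adjacency matrix over the valid pixels
A = processor.to_graph(connectivity=8, graph_type="sparse")
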

sample_multiple_tifs_by_coordinates(tif_processors, coordinate_list)

Sample raster values from multiple TIFF files for given coordinates.

Parameters:

- tif_processors: List of TifProcessor instances.
- coordinate_list: List of (x, y) coordinates.

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_coordinates(
    tif_processors: List[TifProcessor], coordinate_list: List[Tuple[float, float]]
):
    """
    Sample raster values from multiple TIFF files for given coordinates.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - coordinate_list: List of (x, y) coordinates.

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(coordinate_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_coordinates(coordinate_list=coordinate_list)

        if tp.nodata is not None:
            mask = (np.isnan(sampled_values)) & (
                values != tp.nodata
            )  # Replace only NaNs
        else:
            mask = np.isnan(sampled_values)  # No explicit nodata, replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values

    return sampled_values
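
A sketch with hypothetical tile paths; processors are queried in order, so earlier rasters take precedence wherever they provide valid data:

from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_coordinates,
)

tiles = [TifProcessor(dataset_path=p) for p in ["tiles/a.tif", "tiles/b.tif"]]
coords = [(30.05, -1.95), (29.72, -2.10)]  # (x, y) in the rasters' CRS
values = sample_multiple_tifs_by_coordinates(tiles, coords)
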

sample_multiple_tifs_by_polygons(tif_processors, polygon_list, stat='mean')

Sample raster values from multiple TIFF files for polygons in a list and join the results.

Parameters:

- tif_processors: List of TifProcessor instances.
- polygon_list: List of polygon geometries (can include MultiPolygons).
- stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_polygons(
    tif_processors: List[TifProcessor],
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: str = "mean",
) -> np.ndarray:
    """
    Sample raster values from multiple TIFF files for polygons in a list and join the results.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - polygon_list: List of polygon geometries (can include MultiPolygons).
    - stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(polygon_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_polygons(polygon_list=polygon_list, stat=stat)

        mask = np.isnan(sampled_values)  # replace all NaNs

    sampled_values[mask] = values[mask]  # Update only missing values with sampled values

    return sampled_values
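
Similarly, for polygon statistics across several tiles (a sketch reusing the tiles and polygons from the examples above):

from gigaspatial.processing.tif_processor import sample_multiple_tifs_by_polygons

poly_means = sample_multiple_tifs_by_polygons(tiles, polygons, stat="mean")
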

utils

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing entity data

required
required_columns List[str]

List of column names required for ID generation

required
id_column str

Name for the id column that will be generated

'id'

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with generated id column

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
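
For example (a sketch with hypothetical column names): rows with a missing required column keep a None id, while the rest receive a deterministic uuid3 derived from the concatenated fields, so re-running the function on the same data reproduces the same IDs.

import pandas as pd
from gigaspatial.processing.utils import assign_id

df = pd.DataFrame(
    {
        "school_name": ["A", "B", "C"],
        "latitude": [-1.95, -2.10, None],
        "longitude": [30.05, 29.72, 30.11],
    }
)

df = assign_id(df, required_columns=["school_name", "latitude", "longitude"])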