
Processing Module

gigaspatial.processing

DataStore

Bases: ABC

Abstract base class defining the interface for data store implementations. This class serves as a parent for both local and cloud-based storage solutions.

Source code in gigaspatial/core/io/data_store.py
class DataStore(ABC):
    """
    Abstract base class defining the interface for data store implementations.
    This class serves as a parent for both local and cloud-based storage solutions.
    """

    @abstractmethod
    def read_file(self, path: str) -> Any:
        """
        Read contents of a file from the data store.

        Args:
            path: Path to the file to read

        Returns:
            Contents of the file

        Raises:
            IOError: If file cannot be read
        """
        pass

    @abstractmethod
    def write_file(self, path: str, data: Any) -> None:
        """
        Write data to a file in the data store.

        Args:
            path: Path where to write the file
            data: Data to write to the file

        Raises:
            IOError: If file cannot be written
        """
        pass

    @abstractmethod
    def file_exists(self, path: str) -> bool:
        """
        Check if a file exists in the data store.

        Args:
            path: Path to check

        Returns:
            True if file exists, False otherwise
        """
        pass

    @abstractmethod
    def list_files(self, path: str) -> List[str]:
        """
        List all files in a directory.

        Args:
            path: Directory path to list

        Returns:
            List of file paths in the directory
        """
        pass

    @abstractmethod
    def walk(self, top: str) -> Generator:
        """
        Walk through directory tree, similar to os.walk().

        Args:
            top: Starting directory for the walk

        Returns:
            Generator yielding tuples of (dirpath, dirnames, filenames)
        """
        pass

    @abstractmethod
    def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
        """
        Context manager for file operations.

        Args:
            file: Path to the file
            mode: File mode ('r', 'w', 'rb', 'wb')

        Yields:
            File-like object

        Raises:
            IOError: If file cannot be opened
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Check if path points to a file.

        Args:
            path: Path to check

        Returns:
            True if path is a file, False otherwise
        """
        pass

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """
        Check if path points to a directory.

        Args:
            path: Path to check

        Returns:
            True if path is a directory, False otherwise
        """
        pass

    @abstractmethod
    def remove(self, path: str) -> None:
        """
        Remove a file.

        Args:
            path: Path to the file to remove

        Raises:
            IOError: If file cannot be removed
        """
        pass

    @abstractmethod
    def rmdir(self, dir: str) -> None:
        """
        Remove a directory and all its contents.

        Args:
            dir: Path to the directory to remove

        Raises:
            IOError: If directory cannot be removed
        """
        pass
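
The interface is consumed through concrete backends such as LocalDataStore, documented below. As a minimal sketch (the helper copy_csv_files and the directory names are illustrative, and the import paths assume the module layout shown in the source references), code written against the abstract class works unchanged with any implementation:

from gigaspatial.core.io.data_store import DataStore
from gigaspatial.core.io.local_data_store import LocalDataStore


def copy_csv_files(store: DataStore, src_dir: str, dst_dir: str) -> int:
    """Copy every .csv file from src_dir to dst_dir using only the abstract interface."""
    copied = 0
    for path in store.list_files(src_dir):
        if path.endswith(".csv"):
            data = store.read_file(path)   # bytes (or backend-specific content)
            store.write_file(f"{dst_dir}/{path.split('/')[-1]}", data)
            copied += 1
    return copied


# Works against the local backend; a cloud-backed DataStore could be swapped in unchanged.
store = LocalDataStore("./data")
copy_csv_files(store, "raw", "processed")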

file_exists(path) abstractmethod

Check if a file exists in the data store.

Parameters:

    path (str): Path to check. Required.

Returns:

    bool: True if file exists, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def file_exists(self, path: str) -> bool:
    """
    Check if a file exists in the data store.

    Args:
        path: Path to check

    Returns:
        True if file exists, False otherwise
    """
    pass

is_dir(path) abstractmethod

Check if path points to a directory.

Parameters:

    path (str): Path to check. Required.

Returns:

    bool: True if path is a directory, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_dir(self, path: str) -> bool:
    """
    Check if path points to a directory.

    Args:
        path: Path to check

    Returns:
        True if path is a directory, False otherwise
    """
    pass

is_file(path) abstractmethod

Check if path points to a file.

Parameters:

    path (str): Path to check. Required.

Returns:

    bool: True if path is a file, False otherwise.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_file(self, path: str) -> bool:
    """
    Check if path points to a file.

    Args:
        path: Path to check

    Returns:
        True if path is a file, False otherwise
    """
    pass

list_files(path) abstractmethod

List all files in a directory.

Parameters:

    path (str): Directory path to list. Required.

Returns:

    List[str]: List of file paths in the directory.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def list_files(self, path: str) -> List[str]:
    """
    List all files in a directory.

    Args:
        path: Directory path to list

    Returns:
        List of file paths in the directory
    """
    pass

open(file, mode='r') abstractmethod

Context manager for file operations.

Parameters:

    file (str): Path to the file. Required.
    mode (str): File mode ('r', 'w', 'rb', 'wb'). Default: 'r'.

Yields:

    Union[str, bytes]: File-like object.

Raises:

    IOError: If file cannot be opened.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
    """
    Context manager for file operations.

    Args:
        file: Path to the file
        mode: File mode ('r', 'w', 'rb', 'wb')

    Yields:
        File-like object

    Raises:
        IOError: If file cannot be opened
    """
    pass

read_file(path) abstractmethod

Read contents of a file from the data store.

Parameters:

    path (str): Path to the file to read. Required.

Returns:

    Any: Contents of the file.

Raises:

    IOError: If file cannot be read.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def read_file(self, path: str) -> Any:
    """
    Read contents of a file from the data store.

    Args:
        path: Path to the file to read

    Returns:
        Contents of the file

    Raises:
        IOError: If file cannot be read
    """
    pass

remove(path) abstractmethod

Remove a file.

Parameters:

    path (str): Path to the file to remove. Required.

Raises:

    IOError: If file cannot be removed.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def remove(self, path: str) -> None:
    """
    Remove a file.

    Args:
        path: Path to the file to remove

    Raises:
        IOError: If file cannot be removed
    """
    pass

rmdir(dir) abstractmethod

Remove a directory and all its contents.

Parameters:

    dir (str): Path to the directory to remove. Required.

Raises:

    IOError: If directory cannot be removed.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def rmdir(self, dir: str) -> None:
    """
    Remove a directory and all its contents.

    Args:
        dir: Path to the directory to remove

    Raises:
        IOError: If directory cannot be removed
    """
    pass

walk(top) abstractmethod

Walk through directory tree, similar to os.walk().

Parameters:

    top (str): Starting directory for the walk. Required.

Returns:

    Generator: Generator yielding tuples of (dirpath, dirnames, filenames).

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def walk(self, top: str) -> Generator:
    """
    Walk through directory tree, similar to os.walk().

    Args:
        top: Starting directory for the walk

    Returns:
        Generator yielding tuples of (dirpath, dirnames, filenames)
    """
    pass

write_file(path, data) abstractmethod

Write data to a file in the data store.

Parameters:

    path (str): Path where to write the file. Required.
    data (Any): Data to write to the file. Required.

Raises:

    IOError: If file cannot be written.

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def write_file(self, path: str, data: Any) -> None:
    """
    Write data to a file in the data store.

    Args:
        path: Path where to write the file
        data: Data to write to the file

    Raises:
        IOError: If file cannot be written
    """
    pass

LocalDataStore

Bases: DataStore

Implementation for local filesystem storage.

Source code in gigaspatial/core/io/local_data_store.py
class LocalDataStore(DataStore):
    """Implementation for local filesystem storage."""

    def __init__(self, base_path: Union[str, Path] = ""):
        super().__init__()
        self.base_path = Path(base_path).resolve()

    def _resolve_path(self, path: str) -> Path:
        """Resolve path relative to base directory."""
        return self.base_path / path

    def read_file(self, path: str) -> bytes:
        full_path = self._resolve_path(path)
        with open(full_path, "rb") as f:
            return f.read()

    def write_file(self, path: str, data: Union[bytes, str]) -> None:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)

        if isinstance(data, str):
            mode = "w"
            encoding = "utf-8"
        else:
            mode = "wb"
            encoding = None

        with open(full_path, mode, encoding=encoding) as f:
            f.write(data)

    def file_exists(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def list_files(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)
        return [
            str(f.relative_to(self.base_path))
            for f in full_path.iterdir()
            if f.is_file()
        ]

    def walk(self, top: str) -> Generator[Tuple[str, List[str], List[str]], None, None]:
        full_path = self._resolve_path(top)
        for root, dirs, files in os.walk(full_path):
            rel_root = str(Path(root).relative_to(self.base_path))
            yield rel_root, dirs, files

    def list_directories(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)

        if not full_path.exists():
            return []

        if not full_path.is_dir():
            return []

        return [d.name for d in full_path.iterdir() if d.is_dir()]

    def open(self, path: str, mode: str = "r") -> IO:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)
        return open(full_path, mode)

    def is_file(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def is_dir(self, path: str) -> bool:
        return self._resolve_path(path).is_dir()

    def remove(self, path: str) -> None:
        full_path = self._resolve_path(path)
        if full_path.is_file():
            os.remove(full_path)

    def rmdir(self, directory: str) -> None:
        full_path = self._resolve_path(directory)
        if full_path.is_dir():
            os.rmdir(full_path)

    def mkdir(self, path: str, exist_ok: bool = False) -> None:
        full_path = self._resolve_path(path)
        full_path.mkdir(parents=True, exist_ok=exist_ok)

    def exists(self, path: str) -> bool:
        return self._resolve_path(path).exists()
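
A short usage sketch (the file and directory names are illustrative): LocalDataStore resolves every path against base_path, creates parent directories on write, returns raw bytes on read, and lists paths relative to base_path.

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore("./workspace")

store.write_file("raw/sample.txt", "hello world")   # str data is written as UTF-8
print(store.file_exists("raw/sample.txt"))          # True
print(store.read_file("raw/sample.txt"))            # b'hello world'
print(store.list_files("raw"))                      # e.g. ['raw/sample.txt']

for dirpath, dirnames, filenames in store.walk("raw"):
    print(dirpath, filenames)

store.remove("raw/sample.txt")
store.rmdir("raw")  # note: this implementation uses os.rmdir, so the directory must be empty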

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    """

    dataset_path: Union[Path, str]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"

    def __post_init__(self):
        """Validate inputs and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}

        if not self.data_store.file_exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        self._load_metadata()

        # Validate mode and band count
        if self.mode == "rgba" and self.count != 4:
            raise ValueError("RGBA mode requires a 4-band TIF file")
        if self.mode == "rgb" and self.count != 3:
            raise ValueError("RGB mode requires a 3-band TIF file")
        if self.mode == "single" and self.count != 1:
            raise ValueError("Single mode requires a 1-band TIF file")
        if self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset"""
        with self.data_store.open(self.dataset_path, "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        if not self._cache:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def tabular(self) -> pd.DataFrame:
        """Get the data from the TIF file"""
        if not hasattr(self, "_tabular"):
            try:
                if self.mode == "single":
                    self._tabular = self._to_band_dataframe(
                        drop_nodata=True, drop_values=[]
                    )
                elif self.mode == "rgb":
                    self._tabular = self._to_rgb_dataframe(drop_nodata=True)
                elif self.mode == "rgba":
                    self._tabular = self._to_rgba_dataframe(drop_transparent=True)
                elif self.mode == "multi":
                    self._tabular = self._to_multi_band_dataframe(
                        drop_nodata=True,
                        drop_values=[],
                        band_names=None,  # Use default band naming
                    )
                else:
                    raise ValueError(
                        f"Invalid mode: {self.mode}. Must be one of: single, rgb, rgba, multi"
                    )
            except Exception as e:
                raise ValueError(
                    f"Failed to process TIF file in mode '{self.mode}'. "
                    f"Please ensure the file is valid and matches the selected mode. "
                    f"Original error: {str(e)}"
                )

        return self._tabular

    def to_dataframe(self) -> pd.DataFrame:
        return self.tabular

    def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.
        """
        self.logger.info("Converting data to GeoDataFrame with zones...")

        df = self.tabular

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        self.logger.info("Conversion to GeoDataFrame complete!")
        return gdf

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]]
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            else:
                if src.count != 1:
                    raise ValueError("Single band mode requires a 1-band TIF file")
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
    ) -> np.ndarray:
        """
        Sample raster values within each polygon in a list of geometries.

        Parameters:
            polygon_list: List of polygon geometries (can include MultiPolygons).
            stat (str): Aggregation statistic to compute within each polygon.
                        Options: "mean", "median", "sum", "min", "max".
        Returns:
            A NumPy array of sampled values
        """
        self.logger.info("Sampling raster values within polygons...")

        with self.open_dataset() as src:
            results = []

            for geom in polygon_list:
                if geom.is_empty:
                    results.append(np.nan)
                    continue

                try:
                    # Mask the raster with the polygon
                    out_image, _ = mask(src, [geom], crop=True)

                    # Flatten the raster values and remove NoData values
                    values = out_image[out_image != src.nodata].flatten()

                    # Compute the desired statistic
                    if len(values) == 0:
                        results.append(np.nan)
                    else:
                        if stat == "mean":
                            results.append(np.mean(values))
                        elif stat == "median":
                            results.append(np.median(values))
                        elif stat == "sum":
                            results.append(np.sum(values))
                        elif stat == "min":
                            results.append(np.min(values))
                        elif stat == "max":
                            results.append(np.max(values))
                        else:
                            raise ValueError(f"Unknown statistic: {stat}")

                except Exception as e:
                    self.logger.error(f"Error processing polygon: {e}")
                    results.append(np.nan)

        return np.array(results)

    def _to_rgba_dataframe(self, drop_transparent: bool = False) -> pd.DataFrame:
        """
        Convert RGBA TIF to DataFrame with separate columns for R, G, B, A values.
        """
        self.logger.info("Processing RGBA dataset...")

        with self.open_dataset() as src:
            if self.count != 4:
                raise ValueError("RGBA mode requires a 4-band TIF file")

            # Read all four bands
            red, green, blue, alpha = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_transparent:
                mask = alpha > 0
                red = np.extract(mask, red)
                green = np.extract(mask, green)
                blue = np.extract(mask, blue)
                alpha = np.extract(mask, alpha)
                lons = np.extract(mask, x_coords)
                lats = np.extract(mask, y_coords)
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()
                alpha = alpha.flatten()

            # Create DataFrame with RGBA values
            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                    "alpha": alpha,
                }
            )

            # Normalize alpha values if they're not in [0, 1] range
            if data["alpha"].max() > 1:
                data["alpha"] = data["alpha"] / data["alpha"].max()

        self.logger.info("RGBA dataset is processed!")
        return data

    def _to_rgb_dataframe(self, drop_nodata: bool = True) -> pd.DataFrame:
        """Convert RGB TIF to DataFrame with separate columns for R, G, B values."""
        if self.mode != "rgb":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing RGB dataset...")

        with self.open_dataset() as src:
            if self.count != 3:
                raise ValueError("RGB mode requires a 3-band TIF file")

            # Read all three bands
            red, green, blue = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    mask = ~(
                        (red == nodata_value)
                        | (green == nodata_value)
                        | (blue == nodata_value)
                    )
                    red = np.extract(mask, red)
                    green = np.extract(mask, green)
                    blue = np.extract(mask, blue)
                    lons = np.extract(mask, x_coords)
                    lats = np.extract(mask, y_coords)
                else:
                    lons = x_coords.flatten()
                    lats = y_coords.flatten()
                    red = red.flatten()
                    green = green.flatten()
                    blue = blue.flatten()
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()

            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                }
            )

        self.logger.info("RGB dataset is processed!")
        return data

    def _to_band_dataframe(
        self, band_number: int = 1, drop_nodata: bool = True, drop_values: list = []
    ) -> pd.DataFrame:
        """Process single-band TIF to DataFrame."""
        if self.mode != "single":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing single-band dataset...")

        if band_number <= 0 or band_number > self.count:
            self.logger.error(
                f"Error: Band number {band_number} is out of range. The file has {self.count} bands."
            )
            return None

        with self.open_dataset() as src:

            band = src.read(band_number)

            x_coords, y_coords = self._get_pixel_coordinates()

            values_to_mask = []
            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    values_to_mask.append(nodata_value)

            if drop_values:
                values_to_mask.extend(drop_values)

            if values_to_mask:
                data_mask = ~np.isin(band, values_to_mask)
                pixel_values = np.extract(data_mask, band)
                lons = np.extract(data_mask, x_coords)
                lats = np.extract(data_mask, y_coords)
            else:
                pixel_values = band.flatten()
                lons = x_coords.flatten()
                lats = y_coords.flatten()

            data = pd.DataFrame({"lon": lons, "lat": lats, "pixel_value": pixel_values})

        self.logger.info("Dataset is processed!")
        return data

    def _to_multi_band_dataframe(
        self,
        drop_nodata: bool = True,
        drop_values: list = [],
        band_names: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Process multi-band TIF to DataFrame with all bands included.

        Args:
            drop_nodata (bool): Whether to drop nodata values. Defaults to True.
            drop_values (list): Additional values to drop from the dataset. Defaults to empty list.
            band_names (Optional[List[str]]): Custom names for the bands. If None, bands will be named using
                                            the band descriptions from the GeoTIFF metadata if available,
                                            otherwise 'band_1', 'band_2', etc.

        Returns:
            pd.DataFrame: DataFrame containing coordinates and all band values
        """
        self.logger.info("Processing multi-band dataset...")

        with self.open_dataset() as src:
            # Read all bands
            stack = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            # Initialize dictionary with coordinates
            data_dict = {"lon": x_coords.flatten(), "lat": y_coords.flatten()}

            # Get band descriptions from metadata if available
            if band_names is None and hasattr(src, "descriptions") and src.descriptions:
                band_names = [
                    desc if desc else f"band_{i+1}"
                    for i, desc in enumerate(src.descriptions)
                ]

            # Process each band
            for band_idx in range(self.count):
                band_data = stack[band_idx]

                # Handle nodata and other values to drop
                if drop_nodata or drop_values:
                    values_to_mask = []
                    if drop_nodata and src.nodata is not None:
                        values_to_mask.append(src.nodata)
                    if drop_values:
                        values_to_mask.extend(drop_values)

                    if values_to_mask:
                        data_mask = ~np.isin(band_data, values_to_mask)
                        band_values = np.extract(data_mask, band_data)
                        if band_idx == 0:  # Only need to mask coordinates once
                            data_dict["lon"] = np.extract(data_mask, x_coords)
                            data_dict["lat"] = np.extract(data_mask, y_coords)
                    else:
                        band_values = band_data.flatten()
                else:
                    band_values = band_data.flatten()

                # Use custom band names if provided, otherwise use descriptions or default naming
                band_name = (
                    band_names[band_idx]
                    if band_names and len(band_names) > band_idx
                    else f"band_{band_idx + 1}"
                )
                data_dict[band_name] = band_values

        self.logger.info("Multi-band dataset is processed!")
        return pd.DataFrame(data_dict)

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]
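
A brief usage sketch (the dataset path is illustrative, and the import assumes the module path shown in the source reference): the processor is constructed with a path and a mode, exposes raster metadata as cached properties, and converts pixels to a tidy DataFrame.

from gigaspatial.processing.tif_processor import TifProcessor

processor = TifProcessor("data/population.tif", mode="single")   # hypothetical single-band raster

print(processor.crs, processor.bounds, processor.resolution, processor.count)

df = processor.to_dataframe()               # columns: lon, lat, pixel_value (nodata dropped)
zones = processor.get_zoned_geodataframe()  # one bounding-box geometry per pixel

# Point sampling expects (x, y) pairs in the raster's CRS
values = processor.sample_by_coordinates([(36.82, -1.29), (36.90, -1.30)])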

bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height or pixel size) from the TIF file

tabular: pd.DataFrame property

Get the data from the TIF file

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__post_init__()

Validate inputs and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}

    if not self.data_store.file_exists(self.dataset_path):
        raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

    self._load_metadata()

    # Validate mode and band count
    if self.mode == "rgba" and self.count != 4:
        raise ValueError("RGBA mode requires a 4-band TIF file")
    if self.mode == "rgb" and self.count != 3:
        raise ValueError("RGB mode requires a 3-band TIF file")
    if self.mode == "single" and self.count != 1:
        raise ValueError("Single mode requires a 1-band TIF file")
    if self.mode == "multi" and self.count < 2:
        raise ValueError("Multi mode requires a TIF file with 2 or more bands")

get_zoned_geodataframe()

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Source code in gigaspatial/processing/tif_processor.py
def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.
    """
    self.logger.info("Converting data to GeoDataFrame with zones...")

    df = self.tabular

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    self.logger.info("Conversion to GeoDataFrame complete!")
    return gdf

open_dataset()

Context manager for accessing the dataset

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset"""
    with self.data_store.open(self.dataset_path, "rb") as f:
        with rasterio.MemoryFile(f.read()) as memfile:
            with memfile.open() as src:
                yield src
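
When lower-level rasterio access is needed, the context manager can be used directly; src is a rasterio dataset opened from an in-memory copy of the file (a sketch, reusing the processor object from the example above):

with processor.open_dataset() as src:
    print(src.profile)    # rasterio metadata (driver, dtype, CRS, transform, ...)
    band = src.read(1)    # first band as a NumPy array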

sample_by_polygons(polygon_list, stat='mean')

Sample raster values within each polygon in a list of geometries.

Parameters:

    polygon_list (List[Union[Polygon, MultiPolygon]]): List of polygon geometries (can include MultiPolygons). Required.
    stat (str): Aggregation statistic to compute within each polygon. Options: "mean", "median", "sum", "min", "max". Default: 'mean'.
Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
) -> np.ndarray:
    """
    Sample raster values within each polygon in a list of geometries.

    Parameters:
        polygon_list: List of polygon geometries (can include MultiPolygons).
        stat (str): Aggregation statistic to compute within each polygon.
                    Options: "mean", "median", "sum", "min", "max".
    Returns:
        A NumPy array of sampled values
    """
    self.logger.info("Sampling raster values within polygons...")

    with self.open_dataset() as src:
        results = []

        for geom in polygon_list:
            if geom.is_empty:
                results.append(np.nan)
                continue

            try:
                # Mask the raster with the polygon
                out_image, _ = mask(src, [geom], crop=True)

                # Flatten the raster values and remove NoData values
                values = out_image[out_image != src.nodata].flatten()

                # Compute the desired statistic
                if len(values) == 0:
                    results.append(np.nan)
                else:
                    if stat == "mean":
                        results.append(np.mean(values))
                    elif stat == "median":
                        results.append(np.median(values))
                    elif stat == "sum":
                        results.append(np.sum(values))
                    elif stat == "min":
                        results.append(np.min(values))
                    elif stat == "max":
                        results.append(np.max(values))
                    else:
                        raise ValueError(f"Unknown statistic: {stat}")

            except Exception as e:
                self.logger.error(f"Error processing polygon: {e}")
                results.append(np.nan)

    return np.array(results)
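
For example, to attach a mean raster value to each polygon of a GeoDataFrame (a sketch, assuming zones is a GeoDataFrame whose geometries are in the same CRS as the raster):

zones["mean_value"] = processor.sample_by_polygons(zones.geometry, stat="mean")

Empty polygons, polygons with no valid (non-nodata) pixels, and polygons that fail to mask yield NaN in the result.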

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

    gdf (geopandas.GeoDataFrame): GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name (str, optional): Name of the new column to store the area values. Default is "area_in_meters".

Returns:

    geopandas.GeoDataFrame: The input GeoDataFrame with an additional column for the area in square meters.

Raises:

    ValueError: If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    utm_crs = gdf_with_area.estimate_utm_crs()

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area
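
Usage sketch (the input file is illustrative; the import assumes the gigaspatial.processing.geo module shown in the source reference):

import geopandas as gpd
from gigaspatial.processing.geo import add_area_in_meters

buildings = gpd.read_file("buildings.geojson")   # hypothetical polygon layer
buildings = add_area_in_meters(buildings, area_column_name="area_in_meters")
print(buildings["area_in_meters"].describe())

Because areas are computed after reprojecting to the estimated UTM CRS, the results are in square meters regardless of the input CRS.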

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

    df (pandas.DataFrame): DataFrame containing geographic coordinates.
    columns (list of str, optional): Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount (float or dict, optional): Amount of jitter to add. If float, the same amount is used for all columns. If dict, specify the amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}. Default is 0.0001 (approximately 11 meters at the equator).
    seed (int, optional): Random seed for reproducibility. Default is None.
    copy (bool, optional): Whether to create a copy of the input DataFrame. Default is True.

Returns:

    pandas.DataFrame: DataFrame with jittered coordinates for previously duplicated points.

Raises:

    ValueError: If columns don't exist or jitter amount is invalid.
    TypeError: If input types are incorrect.

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")
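
Usage sketch: only rows whose coordinate pair is duplicated get jittered; unique rows are left untouched, and the jitter amount is doubled recursively until no duplicates remain.

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter

df = pd.DataFrame(
    {"latitude": [0.10, 0.10, 0.20], "longitude": [36.80, 36.80, 36.90]}
)
jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # False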

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

    points (Union[DataFrame, GeoDataFrame]): Point data to aggregate. Required.
    zones (GeoDataFrame): Zones to aggregate points to. Required.
    value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate. If None, only counts will be performed. Default: None.
    aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use: a single string applies the same method to all columns ("count", "mean", "sum", "min", "max"); a dict maps column names to aggregation methods. Default: 'count'.
    point_zone_predicate (str): Spatial predicate for the point-to-zone relationship. Options: "within", "intersects", "contains". Default: 'within'.
    zone_id_column (str): Column in zones containing zone identifiers. Default: 'zone_id'.
    output_suffix (str): Suffix to add to output column names. Default: ''.
    drop_geometry (bool): Whether to drop the geometry column from the output. Default: False.

Returns:

    gpd.GeoDataFrame: Zones with aggregated point values.

Example:

    >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
    >>> poi_value_mean = aggregate_points_to_zones(
    ...     pois, zones, value_columns="score", aggregation="mean"
    ... )
    >>> poi_multiple = aggregate_points_to_zones(
    ...     pois, zones,
    ...     value_columns=["score", "visits"],
    ...     aggregation={"score": "mean", "visits": "sum"}
    ... )

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects", "contains"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', area_weighted=True, zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate polygon data to zones with area-weighted values.

This function maps polygon data to zones, weighting values by the fractional area of overlap between polygons and zones.

Parameters:

    polygons (Union[DataFrame, GeoDataFrame]): Polygon data to aggregate. Required.
    zones (GeoDataFrame): Zones to aggregate polygons to. Required.
    value_columns (Union[str, List[str]]): Column(s) containing values to aggregate. Required.
    aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use: a single string applies the same method to all columns ("sum", "mean", "max", etc.); a dict maps column names to aggregation methods. Default: 'sum'.
    area_weighted (bool): Whether to weight values by fractional area overlap. If False, values are not weighted before aggregation. Default: True.
    zone_id_column (str): Column in zones containing zone identifiers. Default: 'zone_id'.
    output_suffix (str): Suffix to add to output column names. Default: ''.
    drop_geometry (bool): Whether to drop the geometry column from the output. Default: False.

Returns:

    gpd.GeoDataFrame: Zones with aggregated polygon values.

Example:

    >>> landuse_stats = aggregate_polygons_to_zones(
    ...     landuse_polygons,
    ...     grid_zones,
    ...     value_columns=["area", "population"],
    ...     aggregation="sum"
    ... )

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    area_weighted: bool = True,
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate polygon data to zones with area-weighted values.

    This function maps polygon data to zones, weighting values by the
    fractional area of overlap between polygons and zones.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]): Polygon data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate polygons to
        value_columns (Union[str, List[str]]): Column(s) containing values to aggregate
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("sum", "mean", "max", etc.)
            - Dict: Map column names to aggregation methods
        area_weighted (bool): Whether to weight values by fractional area overlap
            If False, values are not weighted before aggregation
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated polygon values

    Example:
        >>> landuse_stats = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns=["area", "population"],
        ...     aggregation="sum"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except:
            raise TypeError("polygons must be a GeoDataFrame or convertible to one")
    else:
        polygons_gdf = polygons.copy()

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    if isinstance(aggregation, str):
        agg_funcs = {col: aggregation for col in value_columns}
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    if area_weighted:
        # Use area-weighted aggregation with polygon overlay
        try:
            # Compute UTM CRS for accurate area calculations
            overlay_utm_crs = polygons_gdf.estimate_utm_crs()

            # Prepare polygons for overlay
            polygons_utm = polygons_gdf.to_crs(overlay_utm_crs)
            polygons_utm["orig_area"] = polygons_utm.area

            # Keep only necessary columns
            overlay_cols = value_columns + ["geometry", "orig_area"]
            overlay_gdf = polygons_utm[overlay_cols].copy()

            # Prepare zones for overlay
            zones_utm = zones.to_crs(overlay_utm_crs)

            # Perform the spatial overlay
            gdf_overlayed = gpd.overlay(
                overlay_gdf, zones_utm[[zone_id_column, "geometry"]], how="intersection"
            )

            # Calculate fractional areas
            gdf_overlayed["intersection_area"] = gdf_overlayed.area
            gdf_overlayed["area_fraction"] = (
                gdf_overlayed["intersection_area"] / gdf_overlayed["orig_area"]
            )

            # Apply area weighting to value columns
            for col in value_columns:
                gdf_overlayed[col] = gdf_overlayed[col] * gdf_overlayed["area_fraction"]

            # Aggregate by zone ID
            aggregated = gdf_overlayed.groupby(zone_id_column)[value_columns].agg(
                agg_funcs
            )

            # Handle column naming for multi-level index
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    f"{col[0]}_{col[1]}{output_suffix}" for col in aggregated.columns
                ]

            # Reset index
            aggregated = aggregated.reset_index()

            # Merge aggregated values back to the zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

        except Exception as e:
            raise RuntimeError(f"Error during area-weighted aggregation: {e}")

    else:
        # Non-weighted aggregation - simpler approach
        # Perform spatial join
        joined = gpd.sjoin(polygons_gdf, zones, how="inner", predicate="intersects")

        # Remove geometry column for aggregation
        if "geometry" in joined.columns:
            joined = joined.drop(columns=["geometry"])

        # Group by zone ID and aggregate
        aggregated = joined.groupby(zone_id_column)[value_columns].agg(agg_funcs)

        # Handle column naming for multi-level index
        if isinstance(aggregated.columns, pd.MultiIndex):
            aggregated.columns = [
                f"{col[0]}_{col[1]}{output_suffix}" for col in aggregated.columns
            ]

        # Reset index and merge back to zones
        aggregated = aggregated.reset_index()
        result = result.merge(aggregated, on=zone_id_column, how="left")

        # Fill NaN values with zeros
        for col in result.columns:
            if (
                col != zone_id_column
                and col != "geometry"
                and pd.api.types.is_numeric_dtype(result[col])
            ):
                result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result
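
A minimal usage sketch combining the area-weighted and non-weighted paths; `grid_zones` and `building_polygons` are hypothetical inputs, not objects provided by the library:

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import aggregate_polygons_to_zones

# Hypothetical zones: two adjacent 1x1 degree cells identified by "zone_id"
grid_zones = gpd.GeoDataFrame(
    {"zone_id": ["A", "B"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)

# Hypothetical polygons, one of which straddles the zone boundary
building_polygons = gpd.GeoDataFrame(
    {"floor_area": [1200.0, 800.0], "capacity": [300, 150]},
    geometry=[box(0.4, 0.4, 1.2, 0.6), box(1.1, 0.2, 1.5, 0.5)],
    crs="EPSG:4326",
)

# Area-weighted sums: values are split between zones by overlap fraction
weighted = aggregate_polygons_to_zones(
    building_polygons,
    grid_zones,
    value_columns=["floor_area", "capacity"],
    aggregation={"floor_area": "sum", "capacity": "sum"},
    area_weighted=True,
)

# Non-weighted means: every intersecting polygon counts fully in each zone
unweighted = aggregate_polygons_to_zones(
    building_polygons,
    grid_zones,
    value_columns="floor_area",
    aggregation="mean",
    area_weighted=False,
)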

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

Name Type Description Default
gdf GeoDataFrame

GeoDataFrame containing points to annotate

required
country_code str

Country code for administrative boundaries

required
data_store Optional[DataStore]

Optional DataStore for loading admin boundary data

None

Returns:

Type Description
GeoDataFrame

GeoDataFrame with added administrative region columns

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins
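
A short usage sketch, assuming boundary data for the requested country is available to AdminBoundaries (the points and the "RWA" country code below are purely illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

# Hypothetical facility locations in EPSG:4326
facilities = gpd.GeoDataFrame(
    {"facility_id": [1, 2]},
    geometry=[Point(30.06, -1.95), Point(29.74, -2.60)],
    crs="EPSG:4326",
)

# Adds admin1/admin2 name columns plus id columns suffixed with "_giga" by default
annotated = annotate_with_admin_regions(facilities, country_code="RWA")
print(annotated[["facility_id", "admin1", "admin2"]])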

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing entity data

required
required_columns List[str]

List of column names required for ID generation

required
id_column str

Name for the id column that will be generated

'id'

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with generated id column

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
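
A self-contained sketch of the behaviour: rows with all required fields present receive a deterministic UUID3 identifier, while rows with missing fields keep a null id (column names are illustrative):

import pandas as pd
from gigaspatial.processing.utils import assign_id

entities = pd.DataFrame(
    {
        "name": ["School A", "School B", None],
        "country": ["KEN", "KEN", "KEN"],
    }
)

# IDs are derived from the concatenated required columns, so re-running on the
# same data reproduces the same identifiers; the row with a missing name gets none
entities = assign_id(entities, required_columns=["name", "country"], id_column="entity_id")
print(entities["entity_id"])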

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

gdf : geopandas.GeoDataFrame The GeoDataFrame to be buffered.
buffer_distance_meters : float The buffer distance in meters.
cap_style : str, optional The style of caps: round, flat or square. Default is round.
copy : bool, optional Whether to operate on a copy of the input GeoDataFrame. Default is True.

Returns:

geopandas.GeoDataFrame The buffered GeoDataFrame.
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: float,
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if not isinstance(buffer_distance_meters, (float, int)):
        raise TypeError("Buffer distance must be a number")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        # Create a custom UTM CRS based on the calculated UTM zone
        utm_crs = gdf_work.estimate_utm_crs()

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")
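
A minimal sketch: buffering WGS84 points by 500 m, with the projection round-trip handled internally (the coordinates are illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

points = gpd.GeoDataFrame(
    {"site": ["a", "b"]},
    geometry=[Point(36.82, -1.29), Point(36.90, -1.30)],
    crs="EPSG:4326",
)

# 500 m circular buffers; the result comes back in the input CRS as polygons
buffered = buffer_geodataframe(points, buffer_distance_meters=500, cap_style="round")
print(buffered.geometry.geom_type.unique())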

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

Name Type Description Default
gdf

a geodataframe with Point geometries that are geographic coordinates

required
resolution float

Desired resolution (meters per pixel).

required
bbox_size float

Bounding box size in meters (default 300m x 300m).

300
crs str

Target projection (default is EPSG:3857).

'EPSG:3857'

Returns:

Name Type Description
int

Number of pixels per side (width and height).

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
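
A quick worked example of the calculation: at roughly 60 degrees latitude cos(lat) is about 0.5, so a nominal 10 m/pixel resolution corresponds to about 5 m on the ground and a 300 m box needs around 60 pixels per side (the point below is illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

# A single illustrative point near 60N
gdf = gpd.GeoDataFrame(geometry=[Point(24.94, 60.17)], crs="EPSG:4326")

# 10 m/pixel nominal resolution, 300 m x 300 m bounding box -> about 60 pixels per side
print(calculate_pixels_at_location(gdf, resolution=10, bbox_size=300))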

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

data : pandas.DataFrame Input DataFrame containing either lat/lon columns or a geometry column.
lat_col : str, optional Name of the latitude column. Default is 'lat'.
lon_col : str, optional Name of the longitude column. Default is 'lon'.
crs : str or pyproj.CRS, optional Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

geopandas.GeoDataFrame A GeoDataFrame containing the input data with a geometry column.

Raises:

TypeError If input is not a pandas DataFrame.
ValueError If required columns are missing or contain invalid data.

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. Default is 'lat'.
    lon_col : str, optional
        Name of the longitude column. Default is 'lon'.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")
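
Two minimal sketches: converting a DataFrame whose latitude/longitude columns are detected automatically, and converting one that carries WKT strings in a "geometry" column (the data is illustrative):

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

# Coordinate columns are auto-detected when lat_col/lon_col are not given
df = pd.DataFrame(
    {"name": ["a", "b"], "latitude": [0.5, 1.5], "longitude": [30.1, 30.2]}
)
gdf = convert_to_geodataframe(df)

# A WKT geometry column is parsed into shapely geometries
df_wkt = pd.DataFrame({"name": ["c"], "geometry": ["POINT (30.3 2.5)"]})
gdf_wkt = convert_to_geodataframe(df_wkt)
print(gdf.crs, gdf_wkt.geometry.iloc[0])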

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

data : pandas.DataFrame DataFrame to search for coordinate columns.
lat_keywords : list of str, optional Keywords for identifying latitude columns. If None, uses default keywords.
lon_keywords : list of str, optional Keywords for identifying longitude columns. If None, uses default keywords.
case_sensitive : bool, optional Whether to perform case-sensitive matching. Default is False.

Returns:

tuple[str, str] Names of detected (latitude, longitude) columns.

Raises:

ValueError If no unique pair of latitude/longitude columns can be found.
TypeError If input data is not a pandas DataFrame.

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")
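
A quick sketch of the keyword matching; column names are illustrative and matching is case-insensitive by default:

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

df = pd.DataFrame({"school": ["a"], "Lat": [0.5], "Long": [30.1]})

# Returns the matched column names, not the coordinate values
lat_col, lon_col = detect_coordinate_columns(df)
print(lat_col, lon_col)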

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame containing (Multi)Polygon geometries.

Returns:

geopandas.GeoDataFrame A new GeoDataFrame with Point geometries representing the centroids.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids
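
A minimal sketch (geopandas may warn that centroids computed in a geographic CRS are approximate; reproject first if planar accuracy matters):

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import get_centroids

polygons = gpd.GeoDataFrame({"zone": ["A"]}, geometry=[box(0, 0, 2, 2)], crs="EPSG:4326")
centroids = get_centroids(polygons)
print(centroids.geometry.iloc[0])  # POINT (1 1)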

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

base_points_gdf : geopandas.GeoDataFrame GeoDataFrame containing point geometries to check.
polygon_gdf : geopandas.GeoDataFrame GeoDataFrame containing polygon geometries.

Returns:

geopandas.GeoDataFrame The base_points_gdf with an additional column is_within (True/False).

Raises:

ValueError If the geometries in either GeoDataFrame are invalid or not of the expected type.

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf
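
A minimal sketch with one point inside and one point outside the polygon layer (the geometries are illustrative):

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(
    {"id": [1, 2]},
    geometry=[Point(0.5, 0.5), Point(3.0, 3.0)],
    crs="EPSG:4326",
)
polygons = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")

flagged = map_points_within_polygons(points, polygons)
print(flagged["is_within"].tolist())  # [True, False]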

sample_multiple_tifs_by_coordinates(tif_processors, coordinate_list)

Sample raster values from multiple TIFF files for given coordinates.

Parameters:

- tif_processors: List of TifProcessor instances.
- coordinate_list: List of (x, y) coordinates.

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_coordinates(
    tif_processors: List[TifProcessor], coordinate_list: List[Tuple[float, float]]
):
    """
    Sample raster values from multiple TIFF files for given coordinates.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - coordinate_list: List of (x, y) coordinates.

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(coordinate_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_coordinates(coordinate_list=coordinate_list)

        if tp.nodata is not None:
            mask = (np.isnan(sampled_values)) & (
                values != tp.nodata
            )  # Replace only NaNs
        else:
            mask = np.isnan(sampled_values)  # No explicit nodata, replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values

    return sampled_values
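
The merging rule can be illustrated without any rasters: for each coordinate, the first processor that yields a valid (non-nodata) value wins. A standalone NumPy sketch of that rule, with made-up arrays standing in for the per-raster sample results:

import numpy as np

nodata = -9999.0
values_a = np.array([nodata, 12.0, 7.5], dtype=np.float32)  # samples from raster 1
values_b = np.array([3.0, 99.0, 1.0], dtype=np.float32)     # samples from raster 2

merged = np.full(3, np.nan, dtype=np.float32)
for values in (values_a, values_b):
    mask = np.isnan(merged) & (values != nodata)  # fill only still-missing slots
    merged[mask] = values[mask]

print(merged)  # [3. 12. 7.5]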

sample_multiple_tifs_by_polygons(tif_processors, polygon_list, stat='mean')

Sample raster values from multiple TIFF files for polygons in a list and join the results.

Parameters:

- tif_processors: List of TifProcessor instances.
- polygon_list: List of polygon geometries (can include MultiPolygons).
- stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_polygons(
    tif_processors: List[TifProcessor],
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: str = "mean",
) -> np.ndarray:
    """
    Sample raster values from multiple TIFF files for polygons in a list and join the results.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - polygon_list: List of polygon geometries (can include MultiPolygons).
    - stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(polygon_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_polygons(polygon_list=polygon_list, stat=stat)

        mask = np.isnan(sampled_values)  # replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values with sampled values

    return sampled_values

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters

gdf : geopandas.GeoDataFrame GeoDataFrame containing geometries to simplify.
tolerance : float, optional Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
preserve_topology : bool, optional Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
geometry_column : str, optional Name of the column containing geometries (default is "geometry").

Returns

geopandas.GeoDataFrame A new GeoDataFrame with simplified geometries.

Raises

ValueError If the specified geometry column does not exist or contains invalid geometries.
TypeError If the geometry column does not contain valid geometries.

Examples

Simplify geometries in a GeoDataFrame:

>>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified
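
A slightly fuller sketch than the one-liner above, showing the vertex count dropping after simplification (the input polygon is illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import simplify_geometries

# A circle-like polygon with many vertices, in a projected CRS (metres)
gdf = gpd.GeoDataFrame(geometry=[Point(0, 0).buffer(1000)], crs="EPSG:3857")

simplified = simplify_geometries(gdf, tolerance=50)
print(
    len(gdf.geometry.iloc[0].exterior.coords),
    "->",
    len(simplified.geometry.iloc[0].exterior.coords),
)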

geo

add_area_in_meters(gdf, area_column_name='area_in_meters')

Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

Parameters:

gdf : geopandas.GeoDataFrame GeoDataFrame containing (Multi)Polygon geometries.
area_column_name : str, optional Name of the new column to store the area values. Default is "area_in_meters".

Returns:

geopandas.GeoDataFrame The input GeoDataFrame with an additional column for the area in square meters.

Raises:

ValueError If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def add_area_in_meters(
    gdf: gpd.GeoDataFrame, area_column_name: str = "area_in_meters"
) -> gpd.GeoDataFrame:
    """
    Calculate the area of (Multi)Polygon geometries in square meters and add it as a new column.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.
    area_column_name : str, optional
        Name of the new column to store the area values. Default is "area_in_meters".

    Returns:
    -------
    geopandas.GeoDataFrame
        The input GeoDataFrame with an additional column for the area in square meters.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Create a copy of the GeoDataFrame to avoid modifying the original
    gdf_with_area = gdf.copy()

    # Calculate the UTM CRS for accurate area calculation
    utm_crs = gdf_with_area.estimate_utm_crs()

    # Transform to UTM CRS and calculate the area in square meters
    gdf_with_area[area_column_name] = gdf_with_area.to_crs(utm_crs).geometry.area

    return gdf_with_area
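
A minimal sketch: a roughly 0.01 x 0.01 degree cell near the equator covers about 1.2 square kilometres, which the function reports in square metres (the geometry is illustrative):

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import add_area_in_meters

gdf = gpd.GeoDataFrame(geometry=[box(30.0, 0.0, 30.01, 0.01)], crs="EPSG:4326")

gdf = add_area_in_meters(gdf)  # adds an "area_in_meters" column
print(round(gdf["area_in_meters"].iloc[0]))  # on the order of 1.2e6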

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

df : pandas.DataFrame DataFrame containing geographic coordinates.
columns : list of str, optional Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
amount : float or dict, optional Amount of jitter to add. If float, same amount used for all columns. If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}. Default is 0.0001 (approximately 11 meters at the equator).
seed : int, optional Random seed for reproducibility. Default is None.
copy : bool, optional Whether to create a copy of the input DataFrame. Default is True.

Returns:

pandas.DataFrame DataFrame with jittered coordinates for previously duplicated points.

Raises:

ValueError If columns don't exist or jitter amount is invalid.
TypeError If input types are incorrect.

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")
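
A minimal sketch: only the duplicated coordinate pair is nudged apart, and a fixed seed makes the jitter reproducible (the coordinates are illustrative):

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter

df = pd.DataFrame(
    {"latitude": [0.5, 0.5, 1.0], "longitude": [30.1, 30.1, 30.2]}
)

jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # False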

aggregate_points_to_zones(points, zones, value_columns=None, aggregation='count', point_zone_predicate='within', zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate point data to zones with flexible aggregation methods.

Parameters:

Name Type Description Default
points Union[DataFrame, GeoDataFrame]

Point data to aggregate

required
zones GeoDataFrame

Zones to aggregate points to

required
value_columns Optional[Union[str, List[str]]]

Column(s) containing values to aggregate. If None, only counts will be performed.

None
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use:
- Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
- Dict: Map column names to aggregation methods

'count'
point_zone_predicate str

Spatial predicate for point-to-zone relationship. Options: "within", "intersects", "contains"

'within'
zone_id_column str

Column in zones containing zone identifiers

'zone_id'
output_suffix str

Suffix to add to output column names

''
drop_geometry bool

Whether to drop the geometry column from output

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Zones with aggregated point values

Example

>>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
>>> poi_value_mean = aggregate_points_to_zones(
...     pois, zones, value_columns="score", aggregation="mean"
... )
>>> poi_multiple = aggregate_points_to_zones(
...     pois, zones,
...     value_columns=["score", "visits"],
...     aggregation={"score": "mean", "visits": "sum"}
... )

Source code in gigaspatial/processing/geo.py
def aggregate_points_to_zones(
    points: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Optional[Union[str, List[str]]] = None,
    aggregation: Union[str, Dict[str, str]] = "count",
    point_zone_predicate: str = "within",
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate point data to zones with flexible aggregation methods.

    Args:
        points (Union[pd.DataFrame, gpd.GeoDataFrame]): Point data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate points to
        value_columns (Optional[Union[str, List[str]]]): Column(s) containing values to aggregate
            If None, only counts will be performed.
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("count", "mean", "sum", "min", "max")
            - Dict: Map column names to aggregation methods
        point_zone_predicate (str): Spatial predicate for point-to-zone relationship
            Options: "within", "intersects", "contains"
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated point values

    Example:
        >>> poi_counts = aggregate_points_to_zones(pois, zones, aggregation="count")
        >>> poi_value_mean = aggregate_points_to_zones(
        ...     pois, zones, value_columns="score", aggregation="mean"
        ... )
        >>> poi_multiple = aggregate_points_to_zones(
        ...     pois, zones,
        ...     value_columns=["score", "visits"],
        ...     aggregation={"score": "mean", "visits": "sum"}
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert points to GeoDataFrame if necessary
    if not isinstance(points, gpd.GeoDataFrame):
        points_gdf = convert_to_geodataframe(points)
    else:
        points_gdf = points.copy()

    # Ensure CRS match
    if points_gdf.crs != zones.crs:
        points_gdf = points_gdf.to_crs(zones.crs)

    # Handle value columns
    if value_columns is not None:
        if isinstance(value_columns, str):
            value_columns = [value_columns]

        # Validate that all value columns exist
        missing_cols = [col for col in value_columns if col not in points_gdf.columns]
        if missing_cols:
            raise ValueError(f"Value columns not found in points data: {missing_cols}")

    # Handle aggregation method
    agg_funcs = {}

    if isinstance(aggregation, str):
        if aggregation == "count":
            # Special case for count (doesn't need value columns)
            agg_funcs["__count"] = "count"
        elif value_columns is not None:
            # Apply the same aggregation to all value columns
            agg_funcs = {col: aggregation for col in value_columns}
        else:
            raise ValueError(
                "Value columns must be specified for aggregation methods other than 'count'"
            )
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        if value_columns is None:
            raise ValueError(
                "Value columns must be specified when using a dictionary of aggregation methods"
            )

        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    # Spatial join
    joined = gpd.sjoin(points_gdf, zones, how="inner", predicate=point_zone_predicate)

    # Perform aggregation
    if "geometry" in joined.columns and not all(
        value == "count" for value in agg_funcs.values()
    ):
        # Drop geometry for non-count aggregations to avoid errors
        joined = joined.drop(columns=["geometry"])

    if "__count" in agg_funcs:
        # Count points per zone
        counts = (
            joined.groupby(zone_id_column)
            .size()
            .reset_index(name=f"point_count{output_suffix}")
        )
        result = result.merge(counts, on=zone_id_column, how="left")
        result[f"point_count{output_suffix}"] = (
            result[f"point_count{output_suffix}"].fillna(0).astype(int)
        )
    else:
        # Aggregate values
        aggregated = joined.groupby(zone_id_column).agg(agg_funcs).reset_index()

        # Rename columns to include aggregation method
        if len(value_columns) > 0:
            # Handle MultiIndex columns from pandas aggregation
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    (
                        f"{col[0]}_{col[1]}{output_suffix}"
                        if col[0] != zone_id_column
                        else zone_id_column
                    )
                    for col in aggregated.columns
                ]

            # Merge back to zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])
        return result

    return result

aggregate_polygons_to_zones(polygons, zones, value_columns, aggregation='sum', area_weighted=True, zone_id_column='zone_id', output_suffix='', drop_geometry=False)

Aggregate polygon data to zones with area-weighted values.

This function maps polygon data to zones, weighting values by the fractional area of overlap between polygons and zones.

Parameters:

Name Type Description Default
polygons Union[DataFrame, GeoDataFrame]

Polygon data to aggregate

required
zones GeoDataFrame

Zones to aggregate polygons to

required
value_columns Union[str, List[str]]

Column(s) containing values to aggregate

required
aggregation Union[str, Dict[str, str]]

Aggregation method(s) to use:
- Single string: Use same method for all columns ("sum", "mean", "max", etc.)
- Dict: Map column names to aggregation methods

'sum'
area_weighted bool

Whether to weight values by fractional area overlap. If False, values are not weighted before aggregation.

True
zone_id_column str

Column in zones containing zone identifiers

'zone_id'
output_suffix str

Suffix to add to output column names

''
drop_geometry bool

Whether to drop the geometry column from output

False

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Zones with aggregated polygon values

Example

>>> landuse_stats = aggregate_polygons_to_zones(
...     landuse_polygons,
...     grid_zones,
...     value_columns=["area", "population"],
...     aggregation="sum"
... )

Source code in gigaspatial/processing/geo.py
def aggregate_polygons_to_zones(
    polygons: Union[pd.DataFrame, gpd.GeoDataFrame],
    zones: gpd.GeoDataFrame,
    value_columns: Union[str, List[str]],
    aggregation: Union[str, Dict[str, str]] = "sum",
    area_weighted: bool = True,
    zone_id_column: str = "zone_id",
    output_suffix: str = "",
    drop_geometry: bool = False,
) -> gpd.GeoDataFrame:
    """
    Aggregate polygon data to zones with area-weighted values.

    This function maps polygon data to zones, weighting values by the
    fractional area of overlap between polygons and zones.

    Args:
        polygons (Union[pd.DataFrame, gpd.GeoDataFrame]): Polygon data to aggregate
        zones (gpd.GeoDataFrame): Zones to aggregate polygons to
        value_columns (Union[str, List[str]]): Column(s) containing values to aggregate
        aggregation (Union[str, Dict[str, str]]): Aggregation method(s) to use:
            - Single string: Use same method for all columns ("sum", "mean", "max", etc.)
            - Dict: Map column names to aggregation methods
        area_weighted (bool): Whether to weight values by fractional area overlap
            If False, values are not weighted before aggregation
        zone_id_column (str): Column in zones containing zone identifiers
        output_suffix (str): Suffix to add to output column names
        drop_geometry (bool): Whether to drop the geometry column from output

    Returns:
        gpd.GeoDataFrame: Zones with aggregated polygon values

    Example:
        >>> landuse_stats = aggregate_polygons_to_zones(
        ...     landuse_polygons,
        ...     grid_zones,
        ...     value_columns=["area", "population"],
        ...     aggregation="sum"
        ... )
    """
    # Input validation
    if not isinstance(zones, gpd.GeoDataFrame):
        raise TypeError("zones must be a GeoDataFrame")

    if zone_id_column not in zones.columns:
        raise ValueError(f"Zone ID column '{zone_id_column}' not found in zones")

    # Convert polygons to GeoDataFrame if necessary
    if not isinstance(polygons, gpd.GeoDataFrame):
        try:
            polygons_gdf = convert_to_geodataframe(polygons)
        except:
            raise TypeError("polygons must be a GeoDataFrame or convertible to one")
    else:
        polygons_gdf = polygons.copy()

    # Validate geometry types
    non_polygon_geoms = [
        geom_type
        for geom_type in polygons_gdf.geometry.geom_type.unique()
        if geom_type not in ["Polygon", "MultiPolygon"]
    ]
    if non_polygon_geoms:
        raise ValueError(
            f"Input contains non-polygon geometries: {non_polygon_geoms}. "
            "Use aggregate_points_to_zones for point data."
        )

    # Process value columns
    if isinstance(value_columns, str):
        value_columns = [value_columns]

    # Validate that all value columns exist
    missing_cols = [col for col in value_columns if col not in polygons_gdf.columns]
    if missing_cols:
        raise ValueError(f"Value columns not found in polygons data: {missing_cols}")

    # Ensure CRS match
    if polygons_gdf.crs != zones.crs:
        polygons_gdf = polygons_gdf.to_crs(zones.crs)

    # Handle aggregation method
    if isinstance(aggregation, str):
        agg_funcs = {col: aggregation for col in value_columns}
    elif isinstance(aggregation, dict):
        # Validate dictionary keys
        missing_aggs = [col for col in value_columns if col not in aggregation]
        extra_aggs = [col for col in aggregation if col not in value_columns]

        if missing_aggs:
            raise ValueError(f"Missing aggregation methods for columns: {missing_aggs}")
        if extra_aggs:
            raise ValueError(
                f"Aggregation methods specified for non-existent columns: {extra_aggs}"
            )

        agg_funcs = aggregation
    else:
        raise TypeError("aggregation must be a string or dictionary")

    # Create a copy of the zones
    result = zones.copy()

    if area_weighted:
        # Use area-weighted aggregation with polygon overlay
        try:
            # Compute UTM CRS for accurate area calculations
            overlay_utm_crs = polygons_gdf.estimate_utm_crs()

            # Prepare polygons for overlay
            polygons_utm = polygons_gdf.to_crs(overlay_utm_crs)
            polygons_utm["orig_area"] = polygons_utm.area

            # Keep only necessary columns
            overlay_cols = value_columns + ["geometry", "orig_area"]
            overlay_gdf = polygons_utm[overlay_cols].copy()

            # Prepare zones for overlay
            zones_utm = zones.to_crs(overlay_utm_crs)

            # Perform the spatial overlay
            gdf_overlayed = gpd.overlay(
                overlay_gdf, zones_utm[[zone_id_column, "geometry"]], how="intersection"
            )

            # Calculate fractional areas
            gdf_overlayed["intersection_area"] = gdf_overlayed.area
            gdf_overlayed["area_fraction"] = (
                gdf_overlayed["intersection_area"] / gdf_overlayed["orig_area"]
            )

            # Apply area weighting to value columns
            for col in value_columns:
                gdf_overlayed[col] = gdf_overlayed[col] * gdf_overlayed["area_fraction"]

            # Aggregate by zone ID
            aggregated = gdf_overlayed.groupby(zone_id_column)[value_columns].agg(
                agg_funcs
            )

            # Handle column naming for multi-level index
            if isinstance(aggregated.columns, pd.MultiIndex):
                aggregated.columns = [
                    f"{col[0]}_{col[1]}{output_suffix}" for col in aggregated.columns
                ]

            # Reset index
            aggregated = aggregated.reset_index()

            # Merge aggregated values back to the zones
            result = result.merge(aggregated, on=zone_id_column, how="left")

            # Fill NaN values with zeros
            for col in result.columns:
                if (
                    col != zone_id_column
                    and col != "geometry"
                    and pd.api.types.is_numeric_dtype(result[col])
                ):
                    result[col] = result[col].fillna(0)

        except Exception as e:
            raise RuntimeError(f"Error during area-weighted aggregation: {e}")

    else:
        # Non-weighted aggregation - simpler approach
        # Perform spatial join
        joined = gpd.sjoin(polygons_gdf, zones, how="inner", predicate="intersects")

        # Remove geometry column for aggregation
        if "geometry" in joined.columns:
            joined = joined.drop(columns=["geometry"])

        # Group by zone ID and aggregate
        aggregated = joined.groupby(zone_id_column)[value_columns].agg(agg_funcs)

        # Handle column naming for multi-level index
        if isinstance(aggregated.columns, pd.MultiIndex):
            aggregated.columns = [
                f"{col[0]}_{col[1]}{output_suffix}" for col in aggregated.columns
            ]

        # Reset index and merge back to zones
        aggregated = aggregated.reset_index()
        result = result.merge(aggregated, on=zone_id_column, how="left")

        # Fill NaN values with zeros
        for col in result.columns:
            if (
                col != zone_id_column
                and col != "geometry"
                and pd.api.types.is_numeric_dtype(result[col])
            ):
                result[col] = result[col].fillna(0)

    if drop_geometry:
        result = result.drop(columns=["geometry"])

    return result

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

- gdf (GeoDataFrame): GeoDataFrame containing points to annotate. Required.
- country_code (str): Country code for administrative boundaries. Required.
- data_store (Optional[DataStore]): Optional DataStore for loading admin boundary data. Default: None.

Returns:

- GeoDataFrame: GeoDataFrame with added administrative region columns.

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """
    from gigaspatial.handlers.boundaries import AdminBoundaries

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins
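
The snippet below is a minimal usage sketch, not taken from the library docs: the country code and point coordinates are illustrative placeholders, and it assumes admin boundary data for that country can be loaded by the default handler.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

points = gpd.GeoDataFrame(
    {"school_id": [1, 2]},
    geometry=[Point(30.06, -1.94), Point(29.74, -2.60)],  # illustrative coordinates
    crs="EPSG:4326",
)

# Adds admin1/admin2 names and admin*_id_giga columns via spatial join
annotated = annotate_with_admin_regions(points, country_code="RWA")
print(annotated[["school_id", "admin1", "admin2"]])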

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

- gdf (geopandas.GeoDataFrame): The GeoDataFrame to be buffered.
- buffer_distance_meters (float): The buffer distance in meters.
- cap_style (str, optional): The buffer cap style: round, flat, or square. Default is round.

Returns:

- geopandas.GeoDataFrame: The buffered GeoDataFrame.
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: float,
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if not isinstance(buffer_distance_meters, (float, int)):
        raise TypeError("Buffer distance must be a number")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        # Create a custom UTM CRS based on the calculated UTM zone
        utm_crs = gdf_work.estimate_utm_crs()

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")
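
A minimal usage sketch (illustrative coordinates, not from the library docs): buffer two points by 500 m. The function reprojects to an estimated UTM CRS internally, so the input can stay in EPSG:4326.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

points = gpd.GeoDataFrame(
    {"name": ["a", "b"]},
    geometry=[Point(36.82, -1.29), Point(36.90, -1.30)],  # illustrative coordinates
    crs="EPSG:4326",
)

buffered = buffer_geodataframe(points, buffer_distance_meters=500, cap_style="round")
print(buffered.geometry.geom_type.unique())  # ['Polygon'], still in EPSG:4326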

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

- data (pandas.DataFrame): Input DataFrame containing either lat/lon columns or a geometry column.
- lat_col (str, optional): Name of the latitude column. Default is 'lat'.
- lon_col (str, optional): Name of the longitude column. Default is 'lon'.
- crs (str or pyproj.CRS, optional): Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

- geopandas.GeoDataFrame: A GeoDataFrame containing the input data with a geometry column.

Raises:

- TypeError: If input is not a pandas DataFrame.
- ValueError: If required columns are missing or contain invalid data.

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. Default is 'lat'.
    lon_col : str, optional
        Name of the longitude column. Default is 'lon'.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")
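
A minimal sketch of both input shapes the function accepts; column names and values are illustrative.

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

# From lat/lon columns (auto-detected when lat_col/lon_col are not given)
df = pd.DataFrame({"site": ["x"], "latitude": [-1.29], "longitude": [36.82]})
gdf = convert_to_geodataframe(df)

# From WKT geometry strings
df_wkt = pd.DataFrame({"site": ["x"], "geometry": ["POINT (36.82 -1.29)"]})
gdf_wkt = convert_to_geodataframe(df_wkt)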

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

- data (pandas.DataFrame): DataFrame to search for coordinate columns.
- lat_keywords (list of str, optional): Keywords for identifying latitude columns. If None, uses default keywords.
- lon_keywords (list of str, optional): Keywords for identifying longitude columns. If None, uses default keywords.
- case_sensitive (bool, optional): Whether to perform case-sensitive matching. Default is False.

Returns:

- tuple[str, str]: Names of detected (latitude, longitude) columns.

Raises:

- ValueError: If no unique pair of latitude/longitude columns can be found.
- TypeError: If input data is not a pandas DataFrame.

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")
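
A minimal sketch with illustrative column names; the default keyword lists match "lat" and "lon" directly.

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

df = pd.DataFrame({"name": ["a"], "lat": [-1.29], "lon": [36.82]})
lat_col, lon_col = detect_coordinate_columns(df)
print(lat_col, lon_col)  # lat lon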

get_centroids(gdf)

Calculate the centroids of a (Multi)Polygon GeoDataFrame.

Parameters:

- gdf (geopandas.GeoDataFrame): GeoDataFrame containing (Multi)Polygon geometries.

Returns:

- geopandas.GeoDataFrame: A new GeoDataFrame with Point geometries representing the centroids.

Raises:

- ValueError: If the input GeoDataFrame does not contain (Multi)Polygon geometries.

Source code in gigaspatial/processing/geo.py
def get_centroids(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Calculate the centroids of a (Multi)Polygon GeoDataFrame.

    Parameters:
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing (Multi)Polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with Point geometries representing the centroids.

    Raises:
    ------
    ValueError
        If the input GeoDataFrame does not contain (Multi)Polygon geometries.
    """
    # Validate input geometries
    if not all(gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "Input GeoDataFrame must contain only Polygon or MultiPolygon geometries."
        )

    # Calculate centroids
    centroids = gdf.copy()
    centroids["geometry"] = centroids.geometry.centroid

    return centroids
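
A minimal sketch with a single illustrative square polygon; note that centroids are computed in the GeoDataFrame's current CRS.

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import get_centroids

polygons = gpd.GeoDataFrame({"zone": ["a"]}, geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")
centroids = get_centroids(polygons)
print(centroids.geometry.iloc[0])  # POINT (0.5 0.5)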

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

- base_points_gdf (geopandas.GeoDataFrame): GeoDataFrame containing point geometries to check.
- polygon_gdf (geopandas.GeoDataFrame): GeoDataFrame containing polygon geometries.

Returns:

- geopandas.GeoDataFrame: The base_points_gdf with an additional column is_within (True/False).

Raises:

- ValueError: If the geometries in either GeoDataFrame are invalid or not of the expected type.

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf
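
A minimal sketch with illustrative geometries; both GeoDataFrames must share a CRS.

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5), Point(2.0, 2.0)], crs="EPSG:4326")
polygons = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")

result = map_points_within_polygons(points, polygons)
print(result["is_within"].tolist())  # [True, False]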

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters

- gdf (geopandas.GeoDataFrame): GeoDataFrame containing geometries to simplify.
- tolerance (float, optional): Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
- preserve_topology (bool, optional): Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
- geometry_column (str, optional): Name of the column containing geometries (default is "geometry").

Returns

- geopandas.GeoDataFrame: A new GeoDataFrame with simplified geometries.

Raises

- ValueError: If the specified geometry column does not exist or contains invalid geometries.
- TypeError: If the geometry column does not contain valid geometries.

Examples

Simplify geometries in a GeoDataFrame:

simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified

sat_images

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

- gdf: A GeoDataFrame with Point geometries that are geographic coordinates. Required.
- resolution (float): Desired resolution (meters per pixel). Required.
- bbox_size (float): Bounding box size in meters. Default: 300 (a 300 m x 300 m box).
- crs (str): Target projection. Default: 'EPSG:3857'.

Returns:

- int: Number of pixels per side (width and height).

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))
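
A minimal sketch illustrating the arithmetic: at roughly 60°N the Mercator scale correction (cosine of latitude, about 0.5) halves the effective meters per pixel, so a 300 m box at 10 m resolution needs about twice as many pixels as it would at the equator. The coordinate is illustrative.

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

gdf = gpd.GeoDataFrame(geometry=[Point(24.94, 60.17)], crs="EPSG:4326")  # ~60°N
print(calculate_pixels_at_location(gdf, resolution=10))  # about 60 pixels per side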

tif_processor

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, RGBA, and multi-band data.
    """

    dataset_path: Union[Path, str]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba", "multi"] = "single"

    def __post_init__(self):
        """Validate inputs and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(self.__class__.__name__)
        self._cache = {}

        if not self.data_store.file_exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        self._load_metadata()

        # Validate mode and band count
        if self.mode == "rgba" and self.count != 4:
            raise ValueError("RGBA mode requires a 4-band TIF file")
        if self.mode == "rgb" and self.count != 3:
            raise ValueError("RGB mode requires a 3-band TIF file")
        if self.mode == "single" and self.count != 1:
            raise ValueError("Single mode requires a 1-band TIF file")
        if self.mode == "multi" and self.count < 2:
            raise ValueError("Multi mode requires a TIF file with 2 or more bands")

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset"""
        with self.data_store.open(self.dataset_path, "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        if not self._cache:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def tabular(self) -> pd.DataFrame:
        """Get the data from the TIF file"""
        if not hasattr(self, "_tabular"):
            try:
                if self.mode == "single":
                    self._tabular = self._to_band_dataframe(
                        drop_nodata=True, drop_values=[]
                    )
                elif self.mode == "rgb":
                    self._tabular = self._to_rgb_dataframe(drop_nodata=True)
                elif self.mode == "rgba":
                    self._tabular = self._to_rgba_dataframe(drop_transparent=True)
                elif self.mode == "multi":
                    self._tabular = self._to_multi_band_dataframe(
                        drop_nodata=True,
                        drop_values=[],
                        band_names=None,  # Use default band naming
                    )
                else:
                    raise ValueError(
                        f"Invalid mode: {self.mode}. Must be one of: single, rgb, rgba, multi"
                    )
            except Exception as e:
                raise ValueError(
                    f"Failed to process TIF file in mode '{self.mode}'. "
                    f"Please ensure the file is valid and matches the selected mode. "
                    f"Original error: {str(e)}"
                )

        return self._tabular

    def to_dataframe(self) -> pd.DataFrame:
        return self.tabular

    def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.
        """
        self.logger.info("Converting data to GeoDataFrame with zones...")

        df = self.tabular

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        self.logger.info("Conversion to GeoDataFrame complete!")
        return gdf

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]]
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            else:
                if src.count != 1:
                    raise ValueError("Single band mode requires a 1-band TIF file")
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
    ) -> np.ndarray:
        """
        Sample raster values within each polygon of a GeoDataFrame.

        Parameters:
            polygon_list: List of polygon geometries (can include MultiPolygons).
            stat (str): Aggregation statistic to compute within each polygon.
                        Options: "mean", "median", "sum", "min", "max".
        Returns:
            A NumPy array of sampled values
        """
        self.logger.info("Sampling raster values within polygons...")

        with self.open_dataset() as src:
            results = []

            for geom in polygon_list:
                if geom.is_empty:
                    results.append(np.nan)
                    continue

                try:
                    # Mask the raster with the polygon
                    out_image, _ = mask(src, [geom], crop=True)

                    # Flatten the raster values and remove NoData values
                    values = out_image[out_image != src.nodata].flatten()

                    # Compute the desired statistic
                    if len(values) == 0:
                        results.append(np.nan)
                    else:
                        if stat == "mean":
                            results.append(np.mean(values))
                        elif stat == "median":
                            results.append(np.median(values))
                        elif stat == "sum":
                            results.append(np.sum(values))
                        elif stat == "min":
                            results.append(np.min(values))
                        elif stat == "max":
                            results.append(np.max(values))
                        else:
                            raise ValueError(f"Unknown statistic: {stat}")

                except Exception as e:
                    self.logger.error(f"Error processing polygon: {e}")
                    results.append(np.nan)

        return np.array(results)

    def _to_rgba_dataframe(self, drop_transparent: bool = False) -> pd.DataFrame:
        """
        Convert RGBA TIF to DataFrame with separate columns for R, G, B, A values.
        """
        self.logger.info("Processing RGBA dataset...")

        with self.open_dataset() as src:
            if self.count != 4:
                raise ValueError("RGBA mode requires a 4-band TIF file")

            # Read all four bands
            red, green, blue, alpha = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_transparent:
                mask = alpha > 0
                red = np.extract(mask, red)
                green = np.extract(mask, green)
                blue = np.extract(mask, blue)
                alpha = np.extract(mask, alpha)
                lons = np.extract(mask, x_coords)
                lats = np.extract(mask, y_coords)
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()
                alpha = alpha.flatten()

            # Create DataFrame with RGBA values
            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                    "alpha": alpha,
                }
            )

            # Normalize alpha values if they're not in [0, 1] range
            if data["alpha"].max() > 1:
                data["alpha"] = data["alpha"] / data["alpha"].max()

        self.logger.info("RGBA dataset is processed!")
        return data

    def _to_rgb_dataframe(self, drop_nodata: bool = True) -> pd.DataFrame:
        """Convert RGB TIF to DataFrame with separate columns for R, G, B values."""
        if self.mode != "rgb":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing RGB dataset...")

        with self.open_dataset() as src:
            if self.count != 3:
                raise ValueError("RGB mode requires a 3-band TIF file")

            # Read all three bands
            red, green, blue = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    mask = ~(
                        (red == nodata_value)
                        | (green == nodata_value)
                        | (blue == nodata_value)
                    )
                    red = np.extract(mask, red)
                    green = np.extract(mask, green)
                    blue = np.extract(mask, blue)
                    lons = np.extract(mask, x_coords)
                    lats = np.extract(mask, y_coords)
                else:
                    lons = x_coords.flatten()
                    lats = y_coords.flatten()
                    red = red.flatten()
                    green = green.flatten()
                    blue = blue.flatten()
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()

            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                }
            )

        self.logger.info("RGB dataset is processed!")
        return data

    def _to_band_dataframe(
        self, band_number: int = 1, drop_nodata: bool = True, drop_values: list = []
    ) -> pd.DataFrame:
        """Process single-band TIF to DataFrame."""
        if self.mode != "single":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing single-band dataset...")

        if band_number <= 0 or band_number > self.count:
            self.logger.error(
                f"Error: Band number {band_number} is out of range. The file has {self.count} bands."
            )
            return None

        with self.open_dataset() as src:

            band = src.read(band_number)

            x_coords, y_coords = self._get_pixel_coordinates()

            values_to_mask = []
            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    values_to_mask.append(nodata_value)

            if drop_values:
                values_to_mask.extend(drop_values)

            if values_to_mask:
                data_mask = ~np.isin(band, values_to_mask)
                pixel_values = np.extract(data_mask, band)
                lons = np.extract(data_mask, x_coords)
                lats = np.extract(data_mask, y_coords)
            else:
                pixel_values = band.flatten()
                lons = x_coords.flatten()
                lats = y_coords.flatten()

            data = pd.DataFrame({"lon": lons, "lat": lats, "pixel_value": pixel_values})

        self.logger.info("Dataset is processed!")
        return data

    def _to_multi_band_dataframe(
        self,
        drop_nodata: bool = True,
        drop_values: list = [],
        band_names: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Process multi-band TIF to DataFrame with all bands included.

        Args:
            drop_nodata (bool): Whether to drop nodata values. Defaults to True.
            drop_values (list): Additional values to drop from the dataset. Defaults to empty list.
            band_names (Optional[List[str]]): Custom names for the bands. If None, bands will be named using
                                            the band descriptions from the GeoTIFF metadata if available,
                                            otherwise 'band_1', 'band_2', etc.

        Returns:
            pd.DataFrame: DataFrame containing coordinates and all band values
        """
        self.logger.info("Processing multi-band dataset...")

        with self.open_dataset() as src:
            # Read all bands
            stack = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            # Initialize dictionary with coordinates
            data_dict = {"lon": x_coords.flatten(), "lat": y_coords.flatten()}

            # Get band descriptions from metadata if available
            if band_names is None and hasattr(src, "descriptions") and src.descriptions:
                band_names = [
                    desc if desc else f"band_{i+1}"
                    for i, desc in enumerate(src.descriptions)
                ]

            # Process each band
            for band_idx in range(self.count):
                band_data = stack[band_idx]

                # Handle nodata and other values to drop
                if drop_nodata or drop_values:
                    values_to_mask = []
                    if drop_nodata and src.nodata is not None:
                        values_to_mask.append(src.nodata)
                    if drop_values:
                        values_to_mask.extend(drop_values)

                    if values_to_mask:
                        data_mask = ~np.isin(band_data, values_to_mask)
                        band_values = np.extract(data_mask, band_data)
                        if band_idx == 0:  # Only need to mask coordinates once
                            data_dict["lon"] = np.extract(data_mask, x_coords)
                            data_dict["lat"] = np.extract(data_mask, y_coords)
                    else:
                        band_values = band_data.flatten()
                else:
                    band_values = band_data.flatten()

                # Use custom band names if provided, otherwise use descriptions or default naming
                band_name = (
                    band_names[band_idx]
                    if band_names and len(band_names) > band_idx
                    else f"band_{band_idx + 1}"
                )
                data_dict[band_name] = band_values

        self.logger.info("Multi-band dataset is processed!")
        return pd.DataFrame(data_dict)

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]
bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height or pixel size) from the TIF file

tabular: pd.DataFrame property

Get the data from the TIF file

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__post_init__()

Validate inputs and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(self.__class__.__name__)
    self._cache = {}

    if not self.data_store.file_exists(self.dataset_path):
        raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

    self._load_metadata()

    # Validate mode and band count
    if self.mode == "rgba" and self.count != 4:
        raise ValueError("RGBA mode requires a 4-band TIF file")
    if self.mode == "rgb" and self.count != 3:
        raise ValueError("RGB mode requires a 3-band TIF file")
    if self.mode == "single" and self.count != 1:
        raise ValueError("Single mode requires a 1-band TIF file")
    if self.mode == "multi" and self.count < 2:
        raise ValueError("Multi mode requires a TIF file with 2 or more bands")
get_zoned_geodataframe()

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Source code in gigaspatial/processing/tif_processor.py
def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.
    """
    self.logger.info("Converting data to GeoDataFrame with zones...")

    df = self.tabular

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    self.logger.info("Conversion to GeoDataFrame complete!")
    return gdf
open_dataset()

Context manager for accessing the dataset

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset"""
    with self.data_store.open(self.dataset_path, "rb") as f:
        with rasterio.MemoryFile(f.read()) as memfile:
            with memfile.open() as src:
                yield src
sample_by_polygons(polygon_list, stat='mean')

Sample raster values within each polygon of a GeoDataFrame.

Parameters:

- polygon_list (List[Union[Polygon, MultiPolygon]]): List of polygon geometries (can include MultiPolygons). Required.
- stat (str): Aggregation statistic to compute within each polygon. Options: "mean", "median", "sum", "min", "max". Default: 'mean'.
Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
) -> np.ndarray:
    """
    Sample raster values within each polygon of a GeoDataFrame.

    Parameters:
        polygon_list: List of polygon geometries (can include MultiPolygons).
        stat (str): Aggregation statistic to compute within each polygon.
                    Options: "mean", "median", "sum", "min", "max".
    Returns:
        A NumPy array of sampled values
    """
    self.logger.info("Sampling raster values within polygons...")

    with self.open_dataset() as src:
        results = []

        for geom in polygon_list:
            if geom.is_empty:
                results.append(np.nan)
                continue

            try:
                # Mask the raster with the polygon
                out_image, _ = mask(src, [geom], crop=True)

                # Flatten the raster values and remove NoData values
                values = out_image[out_image != src.nodata].flatten()

                # Compute the desired statistic
                if len(values) == 0:
                    results.append(np.nan)
                else:
                    if stat == "mean":
                        results.append(np.mean(values))
                    elif stat == "median":
                        results.append(np.median(values))
                    elif stat == "sum":
                        results.append(np.sum(values))
                    elif stat == "min":
                        results.append(np.min(values))
                    elif stat == "max":
                        results.append(np.max(values))
                    else:
                        raise ValueError(f"Unknown statistic: {stat}")

            except Exception as e:
                self.logger.error(f"Error processing polygon: {e}")
                results.append(np.nan)

    return np.array(results)
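
A minimal end-to-end sketch of the class documented above; the file path is an illustrative placeholder for a single-band GeoTIFF available through the default local data store.

from gigaspatial.processing.tif_processor import TifProcessor

processor = TifProcessor(dataset_path="data/example.tif", mode="single")  # hypothetical path
print(processor.crs, processor.resolution, processor.count)

df = processor.to_dataframe()                                # lon, lat, pixel_value columns
values = processor.sample_by_coordinates([(36.82, -1.29)])   # illustrative coordinate
zones = processor.get_zoned_geodataframe()                   # one bounding-box polygon per pixel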

sample_multiple_tifs_by_coordinates(tif_processors, coordinate_list)

Sample raster values from multiple TIFF files for given coordinates.

Parameters:

- tif_processors: List of TifProcessor instances.
- coordinate_list: List of (x, y) coordinates.

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_coordinates(
    tif_processors: List[TifProcessor], coordinate_list: List[Tuple[float, float]]
):
    """
    Sample raster values from multiple TIFF files for given coordinates.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - coordinate_list: List of (x, y) coordinates.

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(coordinate_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_coordinates(coordinate_list=coordinate_list)

        if tp.nodata is not None:
            mask = (np.isnan(sampled_values)) & (
                values != tp.nodata
            )  # Replace only NaNs
        else:
            mask = np.isnan(sampled_values)  # No explicit nodata, replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values

    return sampled_values
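
A minimal sketch with two hypothetical tiles; each coordinate takes its value from the first processor whose sample is not nodata. Paths and coordinates are illustrative placeholders.

from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_coordinates,
)

tiles = [
    TifProcessor(dataset_path="data/tile_a.tif", mode="single"),  # hypothetical paths
    TifProcessor(dataset_path="data/tile_b.tif", mode="single"),
]
coords = [(36.82, -1.29), (30.06, -1.94)]  # illustrative (x, y) pairs
values = sample_multiple_tifs_by_coordinates(tiles, coords)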

sample_multiple_tifs_by_polygons(tif_processors, polygon_list, stat='mean')

Sample raster values from multiple TIFF files for polygons in a list and join the results.

Parameters:

- tif_processors: List of TifProcessor instances.
- polygon_list: List of polygon geometries (can include MultiPolygons).
- stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

Returns:

- A NumPy array of sampled values, taking the first non-nodata value encountered.

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_polygons(
    tif_processors: List[TifProcessor],
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: str = "mean",
) -> np.ndarray:
    """
    Sample raster values from multiple TIFF files for polygons in a list and join the results.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - polygon_list: List of polygon geometries (can include MultiPolygons).
    - stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(polygon_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_polygons(polygon_list=polygon_list, stat=stat)

        mask = np.isnan(sampled_values)  # replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values with sampled values

    return sampled_values

utils

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

- df (DataFrame): Input DataFrame containing entity data. Required.
- required_columns (List[str]): List of column names required for ID generation. Required.
- id_column (str): Name for the id column that will be generated. Default: 'id'.

Returns:

- pd.DataFrame: DataFrame with generated id column.

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df
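
A minimal sketch with illustrative columns: IDs are deterministic uuid3 values derived from the concatenated required columns, and rows missing any required value keep a null id.

import pandas as pd
from gigaspatial.processing.utils import assign_id

df = pd.DataFrame(
    {"name": ["School A", "School B", None], "country": ["KEN", "KEN", "KEN"]}
)
df = assign_id(df, required_columns=["name", "country"], id_column="id")
print(df["id"].tolist())  # two deterministic UUIDs and None for the incomplete row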