
Processing Module

gigaspatial.processing

geo

add_spatial_jitter(df, columns=['latitude', 'longitude'], amount=0.0001, seed=None, copy=True)

Add random jitter to duplicated geographic coordinates to create slight separation between overlapping points.

Parameters:

df : pandas.DataFrame
    DataFrame containing geographic coordinates.
columns : list of str, optional
    Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
amount : float or dict, optional
    Amount of jitter to add. If float, the same amount is used for all columns. If dict, specify the amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}. Default is 0.0001 (approximately 11 meters at the equator).
seed : int, optional
    Random seed for reproducibility. Default is None.
copy : bool, optional
    Whether to create a copy of the input DataFrame. Default is True.

Returns:

pandas.DataFrame
    DataFrame with jittered coordinates for previously duplicated points.

Raises:

ValueError
    If columns don't exist or the jitter amount is invalid.
TypeError
    If input types are incorrect.
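
Example (a minimal usage sketch; the import path follows the source location shown below, and the coordinate values are illustrative):

import pandas as pd
from gigaspatial.processing.geo import add_spatial_jitter

# Two rows share identical coordinates; only the duplicates are jittered.
df = pd.DataFrame(
    {
        "latitude": [41.0082, 41.0082, 40.7128],
        "longitude": [28.9784, 28.9784, -74.0060],
    }
)
jittered = add_spatial_jitter(df, amount=0.0001, seed=42)
print(jittered.duplicated(subset=["latitude", "longitude"]).any())  # False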

Source code in gigaspatial/processing/geo.py
def add_spatial_jitter(
    df: pd.DataFrame,
    columns: List[str] = ["latitude", "longitude"],
    amount: float = 0.0001,
    seed=None,
    copy=True,
) -> pd.DataFrame:
    """
    Add random jitter to duplicated geographic coordinates to create slight separation
    between overlapping points.

    Parameters:
    ----------
    df : pandas.DataFrame
        DataFrame containing geographic coordinates.
    columns : list of str, optional
        Column names containing coordinates to jitter. Default is ['latitude', 'longitude'].
    amount : float or dict, optional
        Amount of jitter to add. If float, same amount used for all columns.
        If dict, specify amount per column, e.g., {'lat': 0.0001, 'lon': 0.0002}.
        Default is 0.0001 (approximately 11 meters at the equator).
    seed : int, optional
        Random seed for reproducibility. Default is None.
    copy : bool, optional
        Whether to create a copy of the input DataFrame. Default is True.

    Returns:
    -------
    pandas.DataFrame
        DataFrame with jittered coordinates for previously duplicated points.

    Raises:
    ------
    ValueError
        If columns don't exist or jitter amount is invalid.
    TypeError
        If input types are incorrect.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not all(col in df.columns for col in columns):
        raise ValueError(f"Not all columns {columns} found in DataFrame")

    # Handle jitter amounts
    if isinstance(amount, (int, float)):
        if amount <= 0:
            raise ValueError("Jitter amount must be positive")
        jitter_amounts = {col: amount for col in columns}
    elif isinstance(amount, dict):
        if not all(col in amount for col in columns):
            raise ValueError("Must specify jitter amount for each column")
        if not all(amt > 0 for amt in amount.values()):
            raise ValueError("All jitter amounts must be positive")
        jitter_amounts = amount
    else:
        raise TypeError("amount must be a number or dictionary")

    # Create copy if requested
    df_work = df.copy() if copy else df

    # Set random seed if provided
    if seed is not None:
        np.random.seed(seed)

    try:
        # Find duplicated coordinates
        duplicate_mask = df_work.duplicated(subset=columns, keep=False)
        n_duplicates = duplicate_mask.sum()

        if n_duplicates > 0:
            # Add jitter to each column separately
            for col in columns:
                jitter = np.random.uniform(
                    low=-jitter_amounts[col],
                    high=jitter_amounts[col],
                    size=n_duplicates,
                )
                df_work.loc[duplicate_mask, col] += jitter

            # Validate results (ensure no remaining duplicates)
            if df_work.duplicated(subset=columns, keep=False).any():
                # If duplicates remain, recursively add more jitter
                df_work = add_spatial_jitter(
                    df_work,
                    columns=columns,
                    amount={col: amt * 2 for col, amt in jitter_amounts.items()},
                    seed=seed,
                    copy=False,
                )

        return df_work

    except Exception as e:
        raise RuntimeError(f"Error during jittering operation: {str(e)}")

annotate_with_admin_regions(gdf, country_code, data_store=None, admin_id_column_suffix='_giga')

Annotate a GeoDataFrame with administrative region information.

Performs a spatial join between the input points and administrative boundaries at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

Parameters:

gdf : GeoDataFrame
    GeoDataFrame containing points to annotate. Required.
country_code : str
    Country code for administrative boundaries. Required.
data_store : Optional[DataStore]
    Optional DataStore for loading admin boundary data. Default is None.

Returns:

GeoDataFrame
    GeoDataFrame with added administrative region columns.
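
Example (a minimal sketch; the point locations and country code value are illustrative, and the expected country code format is an assumption):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import annotate_with_admin_regions

points = gpd.GeoDataFrame(
    {"site_id": [1, 2]},
    geometry=[Point(36.82, -1.29), Point(39.66, -4.04)],
    crs="EPSG:4326",
)

# Adds admin1/admin2 names plus admin1_id_giga/admin2_id_giga columns.
annotated = annotate_with_admin_regions(points, country_code="KEN")
print(annotated[["site_id", "admin1", "admin2"]])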

Source code in gigaspatial/processing/geo.py
def annotate_with_admin_regions(
    gdf: gpd.GeoDataFrame,
    country_code: str,
    data_store: Optional[DataStore] = None,
    admin_id_column_suffix="_giga",
) -> gpd.GeoDataFrame:
    """
    Annotate a GeoDataFrame with administrative region information.

    Performs a spatial join between the input points and administrative boundaries
    at levels 1 and 2, resolving conflicts when points intersect multiple admin regions.

    Args:
        gdf: GeoDataFrame containing points to annotate
        country_code: Country code for administrative boundaries
        data_store: Optional DataStore for loading admin boundary data

    Returns:
        GeoDataFrame with added administrative region columns
    """

    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("gdf must be a GeoDataFrame")

    if gdf.empty:
        LOGGER.warning("Empty GeoDataFrame provided, returning as-is")
        return gdf

    # read country admin data
    admin1_data = AdminBoundaries.create(
        country_code=country_code, admin_level=1, data_store=data_store
    ).to_geodataframe()

    admin1_data.rename(
        columns={"id": f"admin1_id{admin_id_column_suffix}", "name": "admin1"},
        inplace=True,
    )
    admin1_data.drop(columns=["name_en", "parent_id", "country_code"], inplace=True)

    admin2_data = AdminBoundaries.create(
        country_code=country_code, admin_level=2, data_store=data_store
    ).to_geodataframe()

    admin2_data.rename(
        columns={
            "id": f"admin2_id{admin_id_column_suffix}",
            "parent_id": f"admin1_id{admin_id_column_suffix}",
            "name": "admin2",
        },
        inplace=True,
    )
    admin2_data.drop(columns=["name_en", "country_code"], inplace=True)

    # Join dataframes based on 'admin1_id_giga'
    admin_data = admin2_data.merge(
        admin1_data[[f"admin1_id{admin_id_column_suffix}", "admin1", "geometry"]],
        left_on=f"admin1_id{admin_id_column_suffix}",
        right_on=f"admin1_id{admin_id_column_suffix}",
        how="outer",
    )

    admin_data["geometry"] = admin_data.apply(
        lambda x: x.geometry_x if x.geometry_x else x.geometry_y, axis=1
    )

    admin_data = gpd.GeoDataFrame(
        admin_data.drop(columns=["geometry_x", "geometry_y"]),
        geometry="geometry",
        crs=4326,
    )

    admin_data["admin2"].fillna("Unknown", inplace=True)
    admin_data[f"admin2_id{admin_id_column_suffix}"] = admin_data[
        f"admin2_id{admin_id_column_suffix}"
    ].replace({np.nan: None})

    if gdf.crs is None:
        LOGGER.warning("Input GeoDataFrame has no CRS, assuming EPSG:4326")
        gdf.set_crs(epsg=4326, inplace=True)
    elif gdf.crs != "EPSG:4326":
        LOGGER.info(f"Reprojecting from {gdf.crs} to EPSG:4326")
        gdf = gdf.to_crs(epsg=4326)

    # spatial join gdf to admins
    gdf_w_admins = gdf.copy().sjoin(
        admin_data,
        how="left",
        predicate="intersects",
    )

    # Check for duplicates caused by points intersecting multiple polygons
    if len(gdf_w_admins) != len(gdf):
        LOGGER.warning(
            "Some points intersect multiple administrative boundaries. Resolving conflicts..."
        )

        # Group by original index and select the closest admin area for ties
        gdf_w_admins["distance"] = gdf_w_admins.apply(
            lambda row: row.geometry.distance(
                admin_data.loc[row.index_right, "geometry"].centroid
            ),
            axis=1,
        )

        # For points with multiple matches, keep the closest polygon
        gdf_w_admins = gdf_w_admins.loc[
            gdf_w_admins.groupby(gdf.index)["distance"].idxmin()
        ].drop(columns="distance")

    # Drop unnecessary columns and reset the index
    gdf_w_admins = gdf_w_admins.drop(columns="index_right").reset_index(drop=True)

    return gdf_w_admins

buffer_geodataframe(gdf, buffer_distance_meters, cap_style='round', copy=True)

Buffers a GeoDataFrame with a given buffer distance in meters.

Parameters:

gdf : geopandas.GeoDataFrame
    The GeoDataFrame to be buffered.
buffer_distance_meters : float
    The buffer distance in meters.
cap_style : str, optional
    The style of caps: round, flat, or square. Default is round.
copy : bool, optional
    Whether to create a copy of the input GeoDataFrame. Default is True.

Returns:

geopandas.GeoDataFrame
    The buffered GeoDataFrame.
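
Example (a minimal sketch; the point location and distance are illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.geo import buffer_geodataframe

points = gpd.GeoDataFrame(geometry=[Point(28.9784, 41.0082)], crs="EPSG:4326")

# 500 m buffers; the function reprojects to an estimated UTM CRS internally,
# buffers in meters, then transforms back to the input CRS.
buffered = buffer_geodataframe(points, buffer_distance_meters=500, cap_style="round")
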
Source code in gigaspatial/processing/geo.py
def buffer_geodataframe(
    gdf: gpd.GeoDataFrame,
    buffer_distance_meters: float,
    cap_style: Literal["round", "square", "flat"] = "round",
    copy=True,
) -> gpd.GeoDataFrame:
    """
    Buffers a GeoDataFrame with a given buffer distance in meters.

    Parameters:
    - gdf : geopandas.GeoDataFrame
        The GeoDataFrame to be buffered.
    - buffer_distance_meters : float
        The buffer distance in meters.
    - cap_style : str, optional
        The style of caps. round, flat, square. Default is round.

    Returns:
    - geopandas.GeoDataFrame
        The buffered GeoDataFrame.
    """

    # Input validation
    if not isinstance(gdf, gpd.GeoDataFrame):
        raise TypeError("Input must be a GeoDataFrame")

    if not isinstance(buffer_distance_meters, (float, int)):
        raise TypeError("Buffer distance must be a number")

    if cap_style not in ["round", "square", "flat"]:
        raise ValueError("cap_style must be round, flat or square.")

    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame must have a defined CRS")

    # Create a copy if requested
    gdf_work = gdf.copy() if copy else gdf

    # Store input CRS
    input_crs = gdf_work.crs

    try:
        # Create a custom UTM CRS based on the calculated UTM zone
        utm_crs = gdf_work.estimate_utm_crs()

        # Transform to UTM, create buffer, and transform back
        gdf_work = gdf_work.to_crs(utm_crs)
        gdf_work["geometry"] = gdf_work["geometry"].buffer(
            buffer_distance_meters, cap_style=cap_style
        )
        gdf_work = gdf_work.to_crs(input_crs)

        return gdf_work

    except Exception as e:
        raise RuntimeError(f"Error during buffering operation: {str(e)}")

convert_to_geodataframe(data, lat_col=None, lon_col=None, crs='EPSG:4326')

Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns or from a WKT geometry column.

Parameters:

data : pandas.DataFrame
    Input DataFrame containing either lat/lon columns or a geometry column.
lat_col : str, optional
    Name of the latitude column. If None (default), the column is auto-detected.
lon_col : str, optional
    Name of the longitude column. If None (default), the column is auto-detected.
crs : str or pyproj.CRS, optional
    Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

Returns:

geopandas.GeoDataFrame
    A GeoDataFrame containing the input data with a geometry column.

Raises:

TypeError
    If input is not a pandas DataFrame.
ValueError
    If required columns are missing or contain invalid data.
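
Example (a minimal sketch; the column names and values are illustrative):

import pandas as pd
from gigaspatial.processing.geo import convert_to_geodataframe

df = pd.DataFrame({"name": ["a", "b"], "lat": [41.01, 40.71], "lon": [28.98, -74.01]})

# Coordinate columns are auto-detected when lat_col/lon_col are omitted;
# they can also be passed explicitly.
gdf = convert_to_geodataframe(df, lat_col="lat", lon_col="lon")
print(gdf.crs)  # EPSG:4326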

Source code in gigaspatial/processing/geo.py
def convert_to_geodataframe(
    data: pd.DataFrame, lat_col: str = None, lon_col: str = None, crs="EPSG:4326"
) -> gpd.GeoDataFrame:
    """
    Convert a pandas DataFrame to a GeoDataFrame, either from latitude/longitude columns
    or from a WKT geometry column.

    Parameters:
    ----------
    data : pandas.DataFrame
        Input DataFrame containing either lat/lon columns or a geometry column.
    lat_col : str, optional
        Name of the latitude column. Default is 'lat'.
    lon_col : str, optional
        Name of the longitude column. Default is 'lon'.
    crs : str or pyproj.CRS, optional
        Coordinate Reference System of the geometry data. Default is 'EPSG:4326'.

    Returns:
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame containing the input data with a geometry column.

    Raises:
    ------
    TypeError
        If input is not a pandas DataFrame.
    ValueError
        If required columns are missing or contain invalid data.
    """

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input 'data' must be a pandas DataFrame")

    # Create a copy to avoid modifying the input
    df = data.copy()

    try:
        if "geometry" not in df.columns:
            # If column names not provided, try to detect them
            if lat_col is None or lon_col is None:
                try:
                    detected_lat, detected_lon = detect_coordinate_columns(df)
                    lat_col = lat_col or detected_lat
                    lon_col = lon_col or detected_lon
                except ValueError as e:
                    raise ValueError(
                        f"Could not automatically detect coordinate columns and no "
                        f"'geometry' column found. Error: {str(e)}"
                    )

            # Validate latitude/longitude columns exist
            if lat_col not in df.columns or lon_col not in df.columns:
                raise ValueError(
                    f"Could not find columns: {lat_col} and/or {lon_col} in the DataFrame"
                )

            # Check for missing values
            if df[lat_col].isna().any() or df[lon_col].isna().any():
                raise ValueError(
                    f"Missing values found in {lat_col} and/or {lon_col} columns"
                )

            # Create geometry from lat/lon
            geometry = gpd.points_from_xy(x=df[lon_col], y=df[lat_col])

        else:
            # Check if geometry column already contains valid geometries
            if df["geometry"].apply(lambda x: isinstance(x, base.BaseGeometry)).all():
                geometry = df["geometry"]
            elif df["geometry"].apply(lambda x: isinstance(x, str)).all():
                # Convert WKT strings to geometry objects
                geometry = df["geometry"].apply(wkt.loads)
            else:
                raise ValueError(
                    "Invalid geometry format: contains mixed or unsupported types"
                )

        # drop the WKT column if conversion was done
        if (
            "geometry" in df.columns
            and not df["geometry"]
            .apply(lambda x: isinstance(x, base.BaseGeometry))
            .all()
        ):
            df = df.drop(columns=["geometry"])

        return gpd.GeoDataFrame(df, geometry=geometry, crs=crs)

    except Exception as e:
        raise RuntimeError(f"Error converting to GeoDataFrame: {str(e)}")

count_points_within_polygons(base_gdf, count_gdf, base_gdf_key)

Counts the number of points from count_gdf that fall within each polygon in base_gdf.

Parameters:

base_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing polygon geometries to count points within.
count_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing point geometries to be counted.
base_gdf_key : str
    Column name in base_gdf to use as the key for grouping and merging.

Returns:

geopandas.GeoDataFrame
    The base_gdf with an additional column containing the count of points within each polygon.

Raises:

ValueError
    If base_gdf_key is missing in base_gdf.
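
Example (a minimal sketch with illustrative geometries):

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import count_points_within_polygons

polygons = gpd.GeoDataFrame(
    {"zone_id": ["A", "B"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)
points = gpd.GeoDataFrame(
    geometry=[Point(0.5, 0.5), Point(0.6, 0.4), Point(1.5, 0.5)],
    crs="EPSG:4326",
)

result = count_points_within_polygons(polygons, points, base_gdf_key="zone_id")
print(result[["zone_id", "point_count"]])  # A -> 2, B -> 1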

Source code in gigaspatial/processing/geo.py
def count_points_within_polygons(base_gdf, count_gdf, base_gdf_key):
    """
    Counts the number of points from `count_gdf` that fall within each polygon in `base_gdf`.

    Parameters:
    ----------
    base_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries to count points within.
    count_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to be counted.
    base_gdf_key : str
        Column name in `base_gdf` to use as the key for grouping and merging.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_gdf` with an additional column containing the count of points within each polygon.

    Raises:
    ------
    ValueError
        If `base_gdf_key` is missing in `base_gdf`.
    """
    # Validate inputs
    if base_gdf_key not in base_gdf.columns:
        raise ValueError(f"Key column '{base_gdf_key}' not found in `base_gdf`.")

    # Ensure clean index for the join
    count_gdf = count_gdf.reset_index()

    # Spatial join: Find points intersecting polygons
    joined_gdf = gpd.sjoin(
        base_gdf, count_gdf[["geometry"]], how="left", predicate="intersects"
    )

    # Count points grouped by the base_gdf_key
    point_counts = joined_gdf.groupby(base_gdf_key)["index_right"].count()
    point_counts.name = "point_count"

    # Merge point counts back to the base GeoDataFrame
    result_gdf = base_gdf.merge(point_counts, on=base_gdf_key, how="left")

    # Fill NaN counts with 0 for polygons with no points
    result_gdf["point_count"] = result_gdf["point_count"].fillna(0).astype(int)

    return result_gdf

detect_coordinate_columns(data, lat_keywords=None, lon_keywords=None, case_sensitive=False)

Detect latitude and longitude columns in a DataFrame using keyword matching.

Parameters:

data : pandas.DataFrame
    DataFrame to search for coordinate columns.
lat_keywords : list of str, optional
    Keywords for identifying latitude columns. If None, uses default keywords.
lon_keywords : list of str, optional
    Keywords for identifying longitude columns. If None, uses default keywords.
case_sensitive : bool, optional
    Whether to perform case-sensitive matching. Default is False.

Returns:

tuple[str, str]
    Names of detected (latitude, longitude) columns.

Raises:

ValueError
    If no unique pair of latitude/longitude columns can be found.
TypeError
    If input data is not a pandas DataFrame.
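
Example (a minimal sketch; the column names are illustrative):

import pandas as pd
from gigaspatial.processing.geo import detect_coordinate_columns

df = pd.DataFrame({"Latitude": [41.0], "Longitude": [28.9], "value": [3]})

lat_col, lon_col = detect_coordinate_columns(df)
print(lat_col, lon_col)  # Latitude Longitude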

Source code in gigaspatial/processing/geo.py
def detect_coordinate_columns(
    data, lat_keywords=None, lon_keywords=None, case_sensitive=False
) -> Tuple[str, str]:
    """
    Detect latitude and longitude columns in a DataFrame using keyword matching.

    Parameters:
    ----------
    data : pandas.DataFrame
        DataFrame to search for coordinate columns.
    lat_keywords : list of str, optional
        Keywords for identifying latitude columns. If None, uses default keywords.
    lon_keywords : list of str, optional
        Keywords for identifying longitude columns. If None, uses default keywords.
    case_sensitive : bool, optional
        Whether to perform case-sensitive matching. Default is False.

    Returns:
    -------
    tuple[str, str]
        Names of detected (latitude, longitude) columns.

    Raises:
    ------
    ValueError
        If no unique pair of latitude/longitude columns can be found.
    TypeError
        If input data is not a pandas DataFrame.
    """

    # Default keywords if none provided
    default_lat = [
        "latitude",
        "lat",
        "y",
        "lat_",
        "lat(s)",
        "_lat",
        "ylat",
        "latitude_y",
    ]
    default_lon = [
        "longitude",
        "lon",
        "long",
        "x",
        "lon_",
        "lon(e)",
        "long(e)",
        "_lon",
        "xlon",
        "longitude_x",
    ]

    lat_keywords = lat_keywords or default_lat
    lon_keywords = lon_keywords or default_lon

    # Input validation
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if not data.columns.is_unique:
        raise ValueError("DataFrame contains duplicate column names")

    def create_pattern(keywords):
        """Create regex pattern from keywords."""
        return "|".join(rf"\b{re.escape(keyword)}\b" for keyword in keywords)

    def find_matching_columns(columns, pattern, case_sensitive) -> List:
        """Find columns matching the pattern."""
        flags = 0 if case_sensitive else re.IGNORECASE
        return [col for col in columns if re.search(pattern, col, flags=flags)]

    try:
        # Create patterns
        lat_pattern = create_pattern(lat_keywords)
        lon_pattern = create_pattern(lon_keywords)

        # Find matching columns
        lat_cols = find_matching_columns(data.columns, lat_pattern, case_sensitive)
        lon_cols = find_matching_columns(data.columns, lon_pattern, case_sensitive)

        # Remove any longitude matches from latitude columns and vice versa
        lat_cols = [col for col in lat_cols if col not in lon_cols]
        lon_cols = [col for col in lon_cols if col not in lat_cols]

        # Detailed error messages based on what was found
        if not lat_cols and not lon_cols:
            columns_list = "\n".join(f"- {col}" for col in data.columns)
            raise ValueError(
                f"No latitude or longitude columns found. Available columns are:\n{columns_list}\n"
                f"Consider adding more keywords or checking column names."
            )

        if not lat_cols:
            found_lons = ", ".join(lon_cols)
            raise ValueError(
                f"Found longitude columns ({found_lons}) but no latitude columns. "
                "Check latitude keywords or column names."
            )

        if not lon_cols:
            found_lats = ", ".join(lat_cols)
            raise ValueError(
                f"Found latitude columns ({found_lats}) but no longitude columns. "
                "Check longitude keywords or column names."
            )

        if len(lat_cols) > 1 or len(lon_cols) > 1:
            raise ValueError(
                f"Multiple possible coordinate columns found:\n"
                f"Latitude candidates: {lat_cols}\n"
                f"Longitude candidates: {lon_cols}\n"
                "Please specify more precise keywords."
            )

        return lat_cols[0], lon_cols[0]

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise RuntimeError(f"Error detecting coordinate columns: {str(e)}")

map_points_within_polygons(base_points_gdf, polygon_gdf)

Maps whether each point in base_points_gdf is within any polygon in polygon_gdf.

Parameters:

base_points_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing point geometries to check.
polygon_gdf : geopandas.GeoDataFrame
    GeoDataFrame containing polygon geometries.

Returns:

geopandas.GeoDataFrame
    The base_points_gdf with an additional column is_within (True/False).

Raises:

ValueError
    If the geometries in either GeoDataFrame are invalid or not of the expected type.
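
Example (a minimal sketch with illustrative geometries):

import geopandas as gpd
from shapely.geometry import Point, box
from gigaspatial.processing.geo import map_points_within_polygons

points = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5), Point(3.0, 3.0)], crs="EPSG:4326")
polygons = gpd.GeoDataFrame(geometry=[box(0, 0, 1, 1)], crs="EPSG:4326")

result = map_points_within_polygons(points, polygons)
print(result["is_within"].tolist())  # [True, False]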

Source code in gigaspatial/processing/geo.py
def map_points_within_polygons(base_points_gdf, polygon_gdf):
    """
    Maps whether each point in `base_points_gdf` is within any polygon in `polygon_gdf`.

    Parameters:
    ----------
    base_points_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing point geometries to check.
    polygon_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing polygon geometries.

    Returns:
    -------
    geopandas.GeoDataFrame
        The `base_points_gdf` with an additional column `is_within` (True/False).

    Raises:
    ------
    ValueError
        If the geometries in either GeoDataFrame are invalid or not of the expected type.
    """
    # Validate input GeoDataFrames
    if not all(base_points_gdf.geometry.geom_type == "Point"):
        raise ValueError("`base_points_gdf` must contain only Point geometries.")
    if not all(polygon_gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"])):
        raise ValueError(
            "`polygon_gdf` must contain only Polygon or MultiPolygon geometries."
        )

    if not base_points_gdf.crs == polygon_gdf.crs:
        raise ValueError("CRS of `base_points_gdf` and `polygon_gdf` must match.")

    # Perform spatial join to check if points fall within any polygon
    joined_gdf = gpd.sjoin(
        base_points_gdf, polygon_gdf[["geometry"]], how="left", predicate="within"
    )

    # Add `is_within` column to base_points_gdf
    base_points_gdf["is_within"] = base_points_gdf.index.isin(
        set(joined_gdf.index[~joined_gdf.index_right.isna()])
    )

    return base_points_gdf

overlay_aggregate_polygon_data(base_gdf, overlay_gdf, overlay_columns, base_gdf_key, overlay_gdf_key=None, agg_func='sum')

Overlay polygon data and aggregate values over the base GeoDataFrame.

Parameters:

base_gdf : geopandas.GeoDataFrame
    GeoDataFrame representing the base geometries.
overlay_gdf : geopandas.GeoDataFrame
    GeoDataFrame with polygon geometries to overlay and aggregate.
overlay_columns : list of str
    Columns in overlay_gdf to aggregate based on overlapping areas.
base_gdf_key : str
    Column in base_gdf to use as the key for aggregation.
overlay_gdf_key : str, optional
    Column in overlay_gdf to use as the index for merging. Defaults to the overlay GeoDataFrame's index.
agg_func : str, callable, or dict, default="sum"
    Aggregation function or dictionary of column-specific aggregation functions. Examples: "sum", "mean", "max", or {"column1": "mean", "column2": "sum"}.

Returns:

geopandas.GeoDataFrame
    Base GeoDataFrame with aggregated values from the overlay.

Raises:

ValueError
    If the overlay GeoDataFrame has duplicate index values or missing columns.
RuntimeError
    If any geometry operations fail.
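
The overlay values are area-weighted by the overlap fraction before aggregation. Example (a minimal sketch; the "population" column and geometries are illustrative):

import geopandas as gpd
from shapely.geometry import box
from gigaspatial.processing.geo import overlay_aggregate_polygon_data

base = gpd.GeoDataFrame(
    {"zone_id": ["A", "B"]},
    geometry=[box(0, 0, 1, 1), box(1, 0, 2, 1)],
    crs="EPSG:4326",
)
overlay = gpd.GeoDataFrame(
    {"population": [100.0, 200.0]},
    geometry=[box(0.5, 0, 1.5, 1), box(1.5, 0, 2.5, 1)],
    crs="EPSG:4326",
)

# Each overlay value is scaled by the fraction of its polygon that falls
# inside a base zone, then summed per zone_id.
result = overlay_aggregate_polygon_data(
    base, overlay, overlay_columns=["population"], base_gdf_key="zone_id"
)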

Source code in gigaspatial/processing/geo.py
def overlay_aggregate_polygon_data(
    base_gdf: gpd.GeoDataFrame,
    overlay_gdf: gpd.GeoDataFrame,
    overlay_columns: List[str],
    base_gdf_key: str,
    overlay_gdf_key: str = None,
    agg_func: str = "sum",
) -> gpd.GeoDataFrame:
    """
    Overlay polygon data and aggregate values over the base GeoDataFrame.

    Parameters:
    ----------
    base_gdf : geopandas.GeoDataFrame
        GeoDataFrame representing the base geometries.
    overlay_gdf : geopandas.GeoDataFrame
        GeoDataFrame with polygon geometries to overlay and aggregate.
    overlay_columns : list of str
        Columns in `overlay_gdf` to aggregate based on overlapping areas.
    base_gdf_key : str
        Column in `base_gdf` to use as the key for aggregation.
    overlay_gdf_key : str, optional
        Column in `overlay_gdf` to use as the index for merging. Defaults to the overlay GeoDataFrame's index.
    agg_func : str, callable, or dict, default="sum"
        Aggregation function or dictionary of column-specific aggregation functions.
        Examples: "sum", "mean", "max", or {"column1": "mean", "column2": "sum"}.

    Returns:
    -------
    geopandas.GeoDataFrame
        Base GeoDataFrame with aggregated values from the overlay.

    Raises:
    ------
    ValueError
        If the overlay GeoDataFrame has duplicate index values or missing columns.
    RuntimeError
        If any geometry operations fail.
    """

    # Validate inputs
    if not isinstance(base_gdf, gpd.GeoDataFrame) or not isinstance(
        overlay_gdf, gpd.GeoDataFrame
    ):
        raise TypeError("Both base_gdf and overlay_gdf must be GeoDataFrames.")
    if not set(overlay_columns).issubset(overlay_gdf.columns):
        missing_cols = set(overlay_columns) - set(overlay_gdf.columns)
        raise ValueError(f"Missing columns in overlay_gdf: {missing_cols}")
    if overlay_gdf.index.duplicated().any():
        raise ValueError("Overlay GeoDataFrame contains duplicate index values.")
    if base_gdf_key not in base_gdf.columns:
        raise ValueError(f"base_gdf_key '{base_gdf_key}' not found in base_gdf.")

    # Set index name for overlay_gdf
    index_name = (
        overlay_gdf_key if overlay_gdf_key else overlay_gdf.index.name or "index"
    )

    # Ensure geometries are valid
    overlay_gdf = overlay_gdf.copy()

    # Perform the spatial overlay
    try:
        gdf_overlayed = (
            overlay_gdf[overlay_columns + ["geometry"]]
            .reset_index()
            .overlay(base_gdf, how="intersection")
            .set_index(index_name)
        )
    except Exception as e:
        raise RuntimeError(f"Error during spatial overlay: {e}")

    # Compute UTM CRS for accurate area calculations
    overlay_utm_crs = overlay_gdf.estimate_utm_crs()

    # Calculate pixel areas in overlay_gdf
    overlay_gdf_utm = overlay_gdf.to_crs(overlay_utm_crs)
    pixel_area = overlay_gdf_utm.area
    pixel_area.name = "pixel_area"

    # Add pixel area to the overlayed data
    gdf_overlayed = gdf_overlayed.join(pixel_area, how="left")

    # Compute overlap fraction
    gdf_overlayed_utm = gdf_overlayed.to_crs(overlay_utm_crs)
    gdf_overlayed["overlap_fraction"] = (
        gdf_overlayed_utm.area / gdf_overlayed["pixel_area"]
    )

    # Adjust overlay column values by overlap fraction
    gdf_overlayed[overlay_columns] = gdf_overlayed[overlay_columns].mul(
        gdf_overlayed["overlap_fraction"], axis=0
    )

    # Validate and apply aggregation
    if isinstance(agg_func, str) or callable(agg_func):
        aggregation_dict = {col: agg_func for col in overlay_columns}
    elif isinstance(agg_func, dict):
        aggregation_dict = agg_func
        for col in overlay_columns:
            if col not in aggregation_dict:
                raise ValueError(f"Missing aggregation function for column: {col}")
    else:
        raise ValueError(
            "Invalid aggregation function. Must be a string, callable, or dictionary."
        )

    # Aggregate data
    aggregated = (
        gdf_overlayed[overlay_columns + [base_gdf_key]]
        .groupby(base_gdf_key)
        .agg(aggregation_dict)
        .reset_index()
    )

    # Aggregate overlay columns by the base GeoDataFrame key
    # aggregated = (
    #    gdf_overlayed.groupby(base_gdf_key)[overlay_columns].sum().reset_index()
    # )

    # Merge aggregated values back to the base GeoDataFrame
    result = base_gdf.merge(aggregated, on=base_gdf_key, how="left")

    return result

simplify_geometries(gdf, tolerance=0.01, preserve_topology=True, geometry_column='geometry')

Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

Parameters

gdf : geopandas.GeoDataFrame
    GeoDataFrame containing geometries to simplify.
tolerance : float, optional
    Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
preserve_topology : bool, optional
    Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
geometry_column : str, optional
    Name of the column containing geometries (default is "geometry").

Returns

geopandas.GeoDataFrame
    A new GeoDataFrame with simplified geometries.

Raises

ValueError
    If the specified geometry column does not exist or contains invalid geometries.
TypeError
    If the geometry column does not contain valid geometries.

Examples

Simplify geometries in a GeoDataFrame:

simplified_gdf = simplify_geometries(gdf, tolerance=0.05)

Source code in gigaspatial/processing/geo.py
def simplify_geometries(
    gdf: gpd.GeoDataFrame,
    tolerance: float = 0.01,
    preserve_topology: bool = True,
    geometry_column: str = "geometry",
) -> gpd.GeoDataFrame:
    """
    Simplify geometries in a GeoDataFrame to reduce file size and improve visualization performance.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geometries to simplify.
    tolerance : float, optional
        Tolerance for simplification. Larger values simplify more but reduce detail (default is 0.01).
    preserve_topology : bool, optional
        Whether to preserve topology while simplifying. Preserving topology prevents invalid geometries (default is True).
    geometry_column : str, optional
        Name of the column containing geometries (default is "geometry").

    Returns
    -------
    geopandas.GeoDataFrame
        A new GeoDataFrame with simplified geometries.

    Raises
    ------
    ValueError
        If the specified geometry column does not exist or contains invalid geometries.
    TypeError
        If the geometry column does not contain valid geometries.

    Examples
    --------
    Simplify geometries in a GeoDataFrame:
    >>> simplified_gdf = simplify_geometries(gdf, tolerance=0.05)
    """

    # Check if the specified geometry column exists
    if geometry_column not in gdf.columns:
        raise ValueError(
            f"Geometry column '{geometry_column}' not found in the GeoDataFrame."
        )

    # Check if the specified column contains geometries
    if not gpd.GeoSeries(gdf[geometry_column]).is_valid.all():
        raise TypeError(
            f"Geometry column '{geometry_column}' contains invalid geometries."
        )

    # Simplify geometries (non-destructive)
    gdf_simplified = gdf.copy()
    gdf_simplified[geometry_column] = gdf_simplified[geometry_column].simplify(
        tolerance=tolerance, preserve_topology=preserve_topology
    )

    return gdf_simplified

sat_images

calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs='EPSG:3857')

Calculates the number of pixels required to cover a given bounding box around a geographic coordinate, given a resolution in meters per pixel.

Parameters:

gdf : geopandas.GeoDataFrame
    A GeoDataFrame with Point geometries that are geographic coordinates. Required.
resolution : float
    Desired resolution (meters per pixel). Required.
bbox_size : float, optional
    Bounding box size in meters. Default is 300 (a 300 m x 300 m box).
crs : str, optional
    Target projection. Default is 'EPSG:3857'.

Returns:

int
    Number of pixels per side (width and height).
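
Example (a minimal sketch; the location and the 0.6 m/px resolution are illustrative):

import geopandas as gpd
from shapely.geometry import Point
from gigaspatial.processing.sat_images import calculate_pixels_at_location

points = gpd.GeoDataFrame(geometry=[Point(28.98, 41.01)], crs="EPSG:4326")

# Pixels per side needed to cover a 300 m x 300 m box at this latitude.
n_pixels = calculate_pixels_at_location(points, resolution=0.6, bbox_size=300)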

Source code in gigaspatial/processing/sat_images.py
def calculate_pixels_at_location(gdf, resolution, bbox_size=300, crs="EPSG:3857"):
    """
    Calculates the number of pixels required to cover a given bounding box
    around a geographic coordinate, given a resolution in meters per pixel.

    Parameters:
        gdf: a geodataframe with Point geometries that are geographic coordinates
        resolution (float): Desired resolution (meters per pixel).
        bbox_size (float): Bounding box size in meters (default 300m x 300m).
        crs (str): Target projection (default is EPSG:3857).

    Returns:
        int: Number of pixels per side (width and height).
    """

    # Calculate avg lat and lon
    lon = gdf.geometry.x.mean()
    lat = gdf.geometry.y.mean()

    # Define projections
    wgs84 = pyproj.CRS("EPSG:4326")  # Geographic coordinate system
    mercator = pyproj.CRS(crs)  # Target CRS (EPSG:3857)

    # Transform the center coordinate to EPSG:3857
    transformer = pyproj.Transformer.from_crs(wgs84, mercator, always_xy=True)
    x, y = transformer.transform(lon, lat)

    # Calculate scale factor (distortion) at given latitude
    scale_factor = np.cos(np.radians(lat))  # Mercator scale correction

    # Adjust the effective resolution
    effective_resolution = resolution * scale_factor

    # Compute number of pixels per side
    pixels = bbox_size / effective_resolution
    return int(round(pixels))

tif_processor

TifProcessor

A class to handle tif data processing, supporting single-band, RGB, and RGBA data.
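
Typical usage (a minimal sketch; the file path is illustrative and the dataset is assumed to be a single-band GeoTIFF):

from gigaspatial.processing.tif_processor import TifProcessor

processor = TifProcessor(dataset_path="data/elevation.tif", mode="single")

print(processor.crs, processor.resolution, processor.count)
df = processor.to_dataframe()                # columns: lon, lat, pixel_value
values = processor.sample_by_coordinates([(28.98, 41.01)])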

Source code in gigaspatial/processing/tif_processor.py
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class TifProcessor:
    """
    A class to handle tif data processing, supporting single-band, RGB, and RGBA data.
    """

    dataset_path: Union[Path, str]
    data_store: Optional[DataStore] = None
    mode: Literal["single", "rgb", "rgba"] = "single"

    def __post_init__(self):
        """Validate inputs and set up logging."""
        self.data_store = self.data_store or LocalDataStore()
        self.logger = config.get_logger(__name__)
        self._cache = {}

        if not self.data_store.file_exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

        self._load_metadata()

        if self.mode == "rgba" and self.count != 4:
            raise ValueError("RGBA mode requires a 4-band TIF file")
        if self.mode == "rgb" and self.count != 3:
            raise ValueError("RGB mode requires a 3-band TIF file")

    @contextmanager
    def open_dataset(self):
        """Context manager for accessing the dataset"""
        with self.data_store.open(self.dataset_path, "rb") as f:
            with rasterio.MemoryFile(f.read()) as memfile:
                with memfile.open() as src:
                    yield src

    def _load_metadata(self):
        """Load metadata from the TIF file if not already cached"""
        if not self._cache:
            with self.open_dataset() as src:
                self._cache["transform"] = src.transform
                self._cache["crs"] = src.crs.to_string()
                self._cache["bounds"] = src.bounds
                self._cache["width"] = src.width
                self._cache["height"] = src.height
                self._cache["resolution"] = (abs(src.transform.a), abs(src.transform.e))
                self._cache["x_transform"] = src.transform.a
                self._cache["y_transform"] = src.transform.e
                self._cache["nodata"] = src.nodata
                self._cache["count"] = src.count
                self._cache["dtype"] = src.dtypes[0]

    @property
    def transform(self):
        """Get the transform from the TIF file"""
        return self._cache["transform"]

    @property
    def crs(self):
        """Get the coordinate reference system from the TIF file"""
        return self._cache["crs"]

    @property
    def bounds(self):
        """Get the bounds of the TIF file"""
        return self._cache["bounds"]

    @property
    def resolution(self) -> Tuple[float, float]:
        """Get the x and y resolution (pixel width and height or pixel size) from the TIF file"""
        return self._cache["resolution"]

    @property
    def x_transform(self) -> float:
        """Get the x transform from the TIF file"""
        return self._cache["x_transform"]

    @property
    def y_transform(self) -> float:
        """Get the y transform from the TIF file"""
        return self._cache["y_transform"]

    @property
    def count(self) -> int:
        """Get the band count from the TIF file"""
        return self._cache["count"]

    @property
    def nodata(self) -> int:
        """Get the value representing no data in the rasters"""
        return self._cache["nodata"]

    @property
    def tabular(self) -> pd.DataFrame:
        """Get the data from the TIF file"""
        if not hasattr(self, "_tabular"):
            try:
                if self.mode == "single":
                    self._tabular = self._to_band_dataframe(
                        drop_nodata=True, drop_values=[]
                    )
                elif self.mode == "rgb":
                    self._tabular = self._to_rgb_dataframe(drop_nodata=True)
                elif self.mode == "rgba":
                    self._tabular = self._to_rgba_dataframe(drop_transparent=True)
            except Exception as e:
                raise ValueError(
                    f"Failed to process TIF file in mode '{self.mode}'. "
                    f"Please ensure the file is valid and matches the selected mode. "
                    f"Original error: {str(e)}"
                )

        return self._tabular

    def to_dataframe(self) -> pd.DataFrame:
        return self.tabular

    def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
        """
        Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
        Each zone is defined by its bounding box, based on pixel resolution and coordinates.
        """
        self.logger.info("Converting data to GeoDataFrame with zones...")

        df = self.tabular

        x_res, y_res = self.resolution

        # create bounding box for each pixel
        geometries = [
            box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
            for lon, lat in zip(df["lon"], df["lat"])
        ]

        gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

        self.logger.info("Conversion to GeoDataFrame complete!")
        return gdf

    def sample_by_coordinates(
        self, coordinate_list: List[Tuple[float, float]]
    ) -> Union[np.ndarray, dict]:
        self.logger.info("Sampling raster values at the coordinates...")

        with self.open_dataset() as src:
            if self.mode == "rgba":
                if self.count != 4:
                    raise ValueError("RGBA mode requires a 4-band TIF file")

                rgba_values = {"red": [], "green": [], "blue": [], "alpha": []}

                for band_idx, color in enumerate(["red", "green", "blue", "alpha"], 1):
                    rgba_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgba_values

            elif self.mode == "rgb":
                if self.count != 3:
                    raise ValueError("RGB mode requires a 3-band TIF file")

                rgb_values = {"red": [], "green": [], "blue": []}

                for band_idx, color in enumerate(["red", "green", "blue"], 1):
                    rgb_values[color] = [
                        vals[0]
                        for vals in src.sample(coordinate_list, indexes=band_idx)
                    ]

                return rgb_values
            else:
                if src.count != 1:
                    raise ValueError("Single band mode requires a 1-band TIF file")
                return np.array([vals[0] for vals in src.sample(coordinate_list)])

    def sample_by_polygons(
        self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
    ) -> gpd.GeoDataFrame:
        """
        Sample raster values within each polygon of a GeoDataFrame.

        Parameters:
            polygon_list: List of polygon geometries (can include MultiPolygons).
            stat (str): Aggregation statistic to compute within each polygon.
                        Options: "mean", "median", "sum", "min", "max".

        Returns:
            GeoDataFrame: A copy of the input GeoDataFrame with an added column
                        containing sampled raster values.
        """
        self.logger.info("Sampling raster values within polygons...")

        with self.open_dataset() as src:
            results = []

            for geom in polygon_list:
                if geom.is_empty:
                    results.append(np.nan)
                    continue

                try:
                    # Mask the raster with the polygon
                    out_image, _ = mask(src, [geom], crop=True)

                    # Flatten the raster values and remove NoData values
                    values = out_image[out_image != src.nodata].flatten()

                    # Compute the desired statistic
                    if len(values) == 0:
                        results.append(np.nan)
                    else:
                        if stat == "mean":
                            results.append(np.mean(values))
                        elif stat == "median":
                            results.append(np.median(values))
                        elif stat == "sum":
                            results.append(np.sum(values))
                        elif stat == "min":
                            results.append(np.min(values))
                        elif stat == "max":
                            results.append(np.max(values))
                        else:
                            raise ValueError(f"Unknown statistic: {stat}")

                except Exception as e:
                    self.logger.error(f"Error processing polygon: {e}")
                    results.append(np.nan)

        return np.array(results)

    def _to_rgba_dataframe(self, drop_transparent: bool = False) -> pd.DataFrame:
        """
        Convert RGBA TIF to DataFrame with separate columns for R, G, B, A values.
        """
        self.logger.info("Processing RGBA dataset...")

        with self.open_dataset() as src:
            if self.count != 4:
                raise ValueError("RGBA mode requires a 4-band TIF file")

            # Read all four bands
            red, green, blue, alpha = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_transparent:
                mask = alpha > 0
                red = np.extract(mask, red)
                green = np.extract(mask, green)
                blue = np.extract(mask, blue)
                alpha = np.extract(mask, alpha)
                lons = np.extract(mask, x_coords)
                lats = np.extract(mask, y_coords)
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()
                alpha = alpha.flatten()

            # Create DataFrame with RGBA values
            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                    "alpha": alpha,
                }
            )

            # Normalize alpha values if they're not in [0, 1] range
            if data["alpha"].max() > 1:
                data["alpha"] = data["alpha"] / data["alpha"].max()

        self.logger.info("RGBA dataset is processed!")
        return data

    def _to_rgb_dataframe(self, drop_nodata: bool = True) -> pd.DataFrame:
        """Convert RGB TIF to DataFrame with separate columns for R, G, B values."""
        if self.mode != "rgb":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing RGB dataset...")

        with self.open_dataset() as src:
            if self.count != 3:
                raise ValueError("RGB mode requires a 3-band TIF file")

            # Read all three bands
            red, green, blue = src.read()

            x_coords, y_coords = self._get_pixel_coordinates()

            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    mask = ~(
                        (red == nodata_value)
                        | (green == nodata_value)
                        | (blue == nodata_value)
                    )
                    red = np.extract(mask, red)
                    green = np.extract(mask, green)
                    blue = np.extract(mask, blue)
                    lons = np.extract(mask, x_coords)
                    lats = np.extract(mask, y_coords)
                else:
                    lons = x_coords.flatten()
                    lats = y_coords.flatten()
                    red = red.flatten()
                    green = green.flatten()
                    blue = blue.flatten()
            else:
                lons = x_coords.flatten()
                lats = y_coords.flatten()
                red = red.flatten()
                green = green.flatten()
                blue = blue.flatten()

            data = pd.DataFrame(
                {
                    "lon": lons,
                    "lat": lats,
                    "red": red,
                    "green": green,
                    "blue": blue,
                }
            )

        self.logger.info("RGB dataset is processed!")
        return data

    def _to_band_dataframe(
        self, band_number: int = 1, drop_nodata: bool = True, drop_values: list = []
    ) -> pd.DataFrame:
        """Process single-band TIF to DataFrame."""
        if self.mode != "single":
            raise ValueError("Use appropriate method for current mode")

        self.logger.info("Processing single-band dataset...")

        if band_number <= 0 or band_number > self.count:
            self.logger.error(
                f"Error: Band number {band_number} is out of range. The file has {self.count} bands."
            )
            return None

        with self.open_dataset() as src:

            band = src.read(band_number)

            x_coords, y_coords = self._get_pixel_coordinates()

            values_to_mask = []
            if drop_nodata:
                nodata_value = src.nodata
                if nodata_value is not None:
                    values_to_mask.append(nodata_value)

            if drop_values:
                values_to_mask.extend(drop_values)

            if values_to_mask:
                data_mask = ~np.isin(band, values_to_mask)
                pixel_values = np.extract(data_mask, band)
                lons = np.extract(data_mask, x_coords)
                lats = np.extract(data_mask, y_coords)
            else:
                pixel_values = band.flatten()
                lons = x_coords.flatten()
                lats = y_coords.flatten()

            data = pd.DataFrame({"lon": lons, "lat": lats, "pixel_value": pixel_values})

        self.logger.info("Dataset is processed!")
        return data

    def _get_pixel_coordinates(self):
        """Helper method to generate coordinate arrays for all pixels"""
        if "pixel_coords" not in self._cache:
            # use cached values
            bounds = self._cache["bounds"]
            width = self._cache["width"]
            height = self._cache["height"]
            pixel_size_x = self._cache["x_transform"]
            pixel_size_y = self._cache["y_transform"]

            self._cache["pixel_coords"] = np.meshgrid(
                np.linspace(
                    bounds.left + pixel_size_x / 2,
                    bounds.right - pixel_size_x / 2,
                    width,
                ),
                np.linspace(
                    bounds.top + pixel_size_y / 2,
                    bounds.bottom - pixel_size_y / 2,
                    height,
                ),
            )

        return self._cache["pixel_coords"]
bounds property

Get the bounds of the TIF file

count: int property

Get the band count from the TIF file

crs property

Get the coordinate reference system from the TIF file

nodata: int property

Get the value representing no data in the rasters

resolution: Tuple[float, float] property

Get the x and y resolution (pixel width and height or pixel size) from the TIF file

tabular: pd.DataFrame property

Get the data from the TIF file

transform property

Get the transform from the TIF file

x_transform: float property

Get the x transform from the TIF file

y_transform: float property

Get the y transform from the TIF file

__post_init__()

Validate inputs and set up logging.

Source code in gigaspatial/processing/tif_processor.py
def __post_init__(self):
    """Validate inputs and set up logging."""
    self.data_store = self.data_store or LocalDataStore()
    self.logger = config.get_logger(__name__)
    self._cache = {}

    if not self.data_store.file_exists(self.dataset_path):
        raise FileNotFoundError(f"Dataset not found at {self.dataset_path}")

    self._load_metadata()

    if self.mode == "rgba" and self.count != 4:
        raise ValueError("RGBA mode requires a 4-band TIF file")
    if self.mode == "rgb" and self.count != 3:
        raise ValueError("RGB mode requires a 3-band TIF file")
get_zoned_geodataframe()

Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone. Each zone is defined by its bounding box, based on pixel resolution and coordinates.

Source code in gigaspatial/processing/tif_processor.py
def get_zoned_geodataframe(self) -> gpd.GeoDataFrame:
    """
    Convert the processed TIF data into a GeoDataFrame, where each row represents a pixel zone.
    Each zone is defined by its bounding box, based on pixel resolution and coordinates.
    """
    self.logger.info("Converting data to GeoDataFrame with zones...")

    df = self.tabular

    x_res, y_res = self.resolution

    # create bounding box for each pixel
    geometries = [
        box(lon - x_res / 2, lat - y_res / 2, lon + x_res / 2, lat + y_res / 2)
        for lon, lat in zip(df["lon"], df["lat"])
    ]

    gdf = gpd.GeoDataFrame(df, geometry=geometries, crs=self.crs)

    self.logger.info("Conversion to GeoDataFrame complete!")
    return gdf
open_dataset()

Context manager for accessing the dataset

Source code in gigaspatial/processing/tif_processor.py
@contextmanager
def open_dataset(self):
    """Context manager for accessing the dataset"""
    with self.data_store.open(self.dataset_path, "rb") as f:
        with rasterio.MemoryFile(f.read()) as memfile:
            with memfile.open() as src:
                yield src
sample_by_polygons(polygon_list, stat='mean')

Sample raster values within each polygon of a GeoDataFrame.

Parameters:

polygon_list : List[Union[Polygon, MultiPolygon]]
    List of polygon geometries (can include MultiPolygons). Required.
stat : str, optional
    Aggregation statistic to compute within each polygon. Options: "mean", "median", "sum", "min", "max". Default is 'mean'.

Returns:

GeoDataFrame
    A copy of the input GeoDataFrame with an added column containing sampled raster values.

Source code in gigaspatial/processing/tif_processor.py
def sample_by_polygons(
    self, polygon_list: List[Union[Polygon, MultiPolygon]], stat: str = "mean"
) -> gpd.GeoDataFrame:
    """
    Sample raster values within each polygon of a GeoDataFrame.

    Parameters:
        polygon_list: List of polygon geometries (can include MultiPolygons).
        stat (str): Aggregation statistic to compute within each polygon.
                    Options: "mean", "median", "sum", "min", "max".

    Returns:
        GeoDataFrame: A copy of the input GeoDataFrame with an added column
                    containing sampled raster values.
    """
    self.logger.info("Sampling raster values within polygons...")

    with self.open_dataset() as src:
        results = []

        for geom in polygon_list:
            if geom.is_empty:
                results.append(np.nan)
                continue

            try:
                # Mask the raster with the polygon
                out_image, _ = mask(src, [geom], crop=True)

                # Flatten the raster values and remove NoData values
                values = out_image[out_image != src.nodata].flatten()

                # Compute the desired statistic
                if len(values) == 0:
                    results.append(np.nan)
                else:
                    if stat == "mean":
                        results.append(np.mean(values))
                    elif stat == "median":
                        results.append(np.median(values))
                    elif stat == "sum":
                        results.append(np.sum(values))
                    elif stat == "min":
                        results.append(np.min(values))
                    elif stat == "max":
                        results.append(np.max(values))
                    else:
                        raise ValueError(f"Unknown statistic: {stat}")

            except Exception as e:
                self.logger.error(f"Error processing polygon: {e}")
                results.append(np.nan)

    return np.array(results)

sample_multiple_tifs_by_coordinates(tif_processors, coordinate_list)

Sample raster values from multiple TIFF files for given coordinates.

Parameters:
- tif_processors: List of TifProcessor instances.
- coordinate_list: List of (x, y) coordinates.

Returns:
- A NumPy array of sampled values, taking the first non-nodata value encountered.
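
Example (a minimal sketch; the tile paths are illustrative and the rasters are assumed to be single-band):

from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_coordinates,
)

tif_processors = [
    TifProcessor(dataset_path="tiles/tile_a.tif", mode="single"),
    TifProcessor(dataset_path="tiles/tile_b.tif", mode="single"),
]
coords = [(28.98, 41.01), (29.02, 41.05)]

# For each coordinate, the first non-nodata value across the processors is kept.
values = sample_multiple_tifs_by_coordinates(tif_processors, coords)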

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_coordinates(
    tif_processors: List[TifProcessor], coordinate_list: List[Tuple[float, float]]
):
    """
    Sample raster values from multiple TIFF files for given coordinates.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - coordinate_list: List of (x, y) coordinates.

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(coordinate_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_coordinates(coordinate_list=coordinate_list)

        if tp.nodata is not None:
            mask = (np.isnan(sampled_values)) & (
                values != tp.nodata
            )  # Replace only NaNs
        else:
            mask = np.isnan(sampled_values)  # No explicit nodata, replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values

    return sampled_values

sample_multiple_tifs_by_polygons(tif_processors, polygon_list, stat='mean')

Sample raster values from multiple TIFF files for polygons in a list and join the results.

Parameters:
- tif_processors: List of TifProcessor instances.
- polygon_list: List of polygon geometries (can include MultiPolygons).
- stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

Returns:
- A NumPy array of sampled values, taking the first non-nodata value encountered.
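
Example (a minimal sketch; the tile path and polygon are illustrative):

from shapely.geometry import box
from gigaspatial.processing.tif_processor import (
    TifProcessor,
    sample_multiple_tifs_by_polygons,
)

tif_processors = [TifProcessor(dataset_path="tiles/tile_a.tif", mode="single")]
polygons = [box(28.9, 41.0, 29.0, 41.1)]

# One aggregated value per polygon, filled from the first processor that covers it.
means = sample_multiple_tifs_by_polygons(tif_processors, polygons, stat="mean")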

Source code in gigaspatial/processing/tif_processor.py
def sample_multiple_tifs_by_polygons(
    tif_processors: List[TifProcessor],
    polygon_list: List[Union[Polygon, MultiPolygon]],
    stat: str = "mean",
):
    """
    Sample raster values from multiple TIFF files for polygons in a list and join the results.

    Parameters:
    - tif_processors: List of TifProcessor instances.
    - polygon_list: List of polygon geometries (can include MultiPolygons).
    - stat: Aggregation statistic to compute within each polygon (mean, median, sum, min, max).

    Returns:
    - A NumPy array of sampled values, taking the first non-nodata value encountered.
    """
    sampled_values = np.full(len(polygon_list), np.nan, dtype=np.float32)

    for tp in tif_processors:
        values = tp.sample_by_polygons(polygon_list=polygon_list, stat=stat)

        mask = np.isnan(sampled_values)  # replace all NaNs

        sampled_values[mask] = values[mask]  # Update only missing values with sampled values

    return sampled_values

utils

assign_id(df, required_columns, id_column='id')

Generate IDs for any entity type in a pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame containing entity data

required
required_columns List[str]

List of column names required for ID generation

required
id_column str

Name for the id column that will be generated

'id'

Returns:

pd.DataFrame
    DataFrame with generated id column.
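
Example (a minimal sketch; the column names and values are illustrative):

import pandas as pd
from gigaspatial.processing.utils import assign_id

df = pd.DataFrame({"name": ["School A", "School B"], "country": ["KEN", "KEN"]})

# Deterministic uuid3 IDs built from the concatenated required columns;
# rows with an existing non-null id or missing required fields are left unchanged.
df = assign_id(df, required_columns=["name", "country"], id_column="id")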

Source code in gigaspatial/processing/utils.py
def assign_id(
    df: pd.DataFrame, required_columns: List[str], id_column: str = "id"
) -> pd.DataFrame:
    """
    Generate IDs for any entity type in a pandas DataFrame.

    Args:
        df (pd.DataFrame): Input DataFrame containing entity data
        required_columns (List[str]): List of column names required for ID generation
        id_column (str): Name for the id column that will be generated

    Returns:
        pd.DataFrame: DataFrame with generated id column
    """
    # Create a copy to avoid modifying the original DataFrame
    df = df.copy()

    # Check if ID column exists, if not create it with None values
    if id_column not in df.columns:
        df[id_column] = None

    # Check required columns exist
    if not all(col in df.columns for col in required_columns):
        return df

    # Create identifier concat for UUID generation
    df["identifier_concat"] = (
        df[required_columns].astype(str).fillna("").agg("".join, axis=1)
    )

    # Generate UUIDs only where all required fields are present and no existing ID
    mask = df[id_column].isna()
    for col in required_columns:
        mask &= df[col].notna()

    # Apply UUID generation only where mask is True
    df.loc[mask, id_column] = df.loc[mask, "identifier_concat"].apply(
        lambda x: str(uuid.uuid3(uuid.NAMESPACE_DNS, x))
    )

    # Drop temporary column
    df = df.drop(columns=["identifier_concat"])

    return df