Core Module

gigaspatial.core

io

adls_data_store

ADLSDataStore

Bases: DataStore

An implementation of DataStore for Azure Data Lake Storage.

Source code in gigaspatial/core/io/adls_data_store.py
class ADLSDataStore(DataStore):
    """
    An implementation of DataStore for Azure Data Lake Storage.
    """

    def __init__(
        self,
        container: str = config.ADLS_CONTAINER_NAME,
        connection_string: str = config.ADLS_CONNECTION_STRING,
    ):
        """
        Create a new instance of ADLSDataStore
        :param container: The name of the container in ADLS to interact with.
        """
        self.blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )
        self.container_client = self.blob_service_client.get_container_client(
            container=container
        )
        self.container = container

    def read_file(self, path: str, encoding: Optional[str] = None) -> Union[str, bytes]:
        """
        Read file with flexible encoding support.

        :param path: Path to the file in blob storage
        :param encoding: File encoding (optional)
        :return: File contents as string or bytes
        """
        try:
            blob_client = self.container_client.get_blob_client(path)
            blob_data = blob_client.download_blob().readall()

            # If no encoding specified, return raw bytes
            if encoding is None:
                return blob_data

            # If encoding is specified, decode the bytes
            return blob_data.decode(encoding)

        except Exception as e:
            raise IOError(f"Error reading file {path}: {e}")

    def write_file(self, path: str, data) -> None:
        """
        Write file with support for content type and improved type handling.

        :param path: Destination path in blob storage
        :param data: File contents
        """
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container, blob=path, snapshot=None
        )

        if isinstance(data, str):
            binary_data = data.encode()
        elif isinstance(data, bytes):
            binary_data = data
        else:
            raise Exception(f'Unsupported data type. Only "bytes" or "string" accepted')

        blob_client.upload_blob(binary_data, overwrite=True)

    def upload_file(self, file_path, blob_path):
        """Uploads a single file to Azure Blob Storage."""
        try:
            blob_client = self.container_client.get_blob_client(blob_path)
            with open(file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)
            print(f"Uploaded {file_path} to {blob_path}")
        except Exception as e:
            print(f"Failed to upload {file_path}: {e}")

    def upload_directory(self, dir_path, blob_dir_path):
        """Uploads all files from a directory to Azure Blob Storage."""
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_file_path, dir_path)
                blob_file_path = os.path.join(blob_dir_path, relative_path).replace(
                    "\\", "/"
                )

                self.upload_file(local_file_path, blob_file_path)

    def download_directory(self, blob_dir_path: str, local_dir_path: str):
        """Downloads all files from a directory in Azure Blob Storage to a local directory."""
        try:
            # Ensure the local directory exists
            os.makedirs(local_dir_path, exist_ok=True)

            # List all files in the blob directory
            blob_items = self.container_client.list_blobs(
                name_starts_with=blob_dir_path
            )

            for blob_item in blob_items:
                # Get the relative path of the blob file
                relative_path = os.path.relpath(blob_item.name, blob_dir_path)
                # Construct the local file path
                local_file_path = os.path.join(local_dir_path, relative_path)
                # Create directories if needed
                os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

                # Download the blob to the local file
                blob_client = self.container_client.get_blob_client(blob_item.name)
                with open(local_file_path, "wb") as file:
                    file.write(blob_client.download_blob().readall())

            print(f"Downloaded directory {blob_dir_path} to {local_dir_path}")
        except Exception as e:
            print(f"Failed to download directory {blob_dir_path}: {e}")

    def copy_directory(self, source_dir: str, destination_dir: str):
        """
        Copies all files from a source directory to a destination directory within the same container.

        :param source_dir: The source directory path in the blob storage
        :param destination_dir: The destination directory path in the blob storage
        """
        try:
            # Ensure source directory path ends with a trailing slash
            source_dir = source_dir.rstrip("/") + "/"
            destination_dir = destination_dir.rstrip("/") + "/"

            # List all blobs in the source directory
            source_blobs = self.container_client.list_blobs(name_starts_with=source_dir)

            for blob in source_blobs:
                # Get the relative path of the blob
                relative_path = os.path.relpath(blob.name, source_dir)

                # Construct the new blob path
                new_blob_path = os.path.join(destination_dir, relative_path).replace(
                    "\\", "/"
                )

                # Create a source blob client
                source_blob_client = self.container_client.get_blob_client(blob.name)

                # Create a destination blob client
                destination_blob_client = self.container_client.get_blob_client(
                    new_blob_path
                )

                # Start the copy operation
                destination_blob_client.start_copy_from_url(source_blob_client.url)

            print(f"Copied directory from {source_dir} to {destination_dir}")
        except Exception as e:
            print(f"Failed to copy directory {source_dir}: {e}")

    def exists(self, path: str) -> bool:
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container, blob=path, snapshot=None
        )
        return blob_client.exists()

    def file_exists(self, path: str) -> bool:
        return self.exists(path) and not self.is_dir(path)

    def file_size(self, path: str) -> float:
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container, blob=path, snapshot=None
        )
        properties = blob_client.get_blob_properties()

        # The size is in bytes, convert it to kilobytes
        size_in_bytes = properties.size
        size_in_kb = size_in_bytes / 1024.0
        return size_in_kb

    def list_files(self, path: str):
        blob_items = self.container_client.list_blobs(name_starts_with=path)
        return [item["name"] for item in blob_items]

    def walk(self, top: str):
        top = top.rstrip("/") + "/"
        blob_items = self.container_client.list_blobs(name_starts_with=top)
        blobs = [item["name"] for item in blob_items]
        for blob in blobs:
            dirpath, filename = os.path.split(blob)
            yield (dirpath, [], [filename])

    @contextlib.contextmanager
    def open(self, path: str, mode: str = "r"):
        """
        Context manager for file operations with enhanced mode support.

        :param path: File path in blob storage
        :param mode: File open mode (r, rb, w, wb)
        """
        if mode == "w":
            file = io.StringIO()
            yield file
            self.write_file(path, file.getvalue())

        elif mode == "wb":
            file = io.BytesIO()
            yield file
            self.write_file(path, file.getvalue())

        elif mode == "r":
            data = self.read_file(path, encoding="UTF-8")
            file = io.StringIO(data)
            yield file

        elif mode == "rb":
            data = self.read_file(path)
            file = io.BytesIO(data)
            yield file

    def get_file_metadata(self, path: str) -> dict:
        """
        Retrieve comprehensive file metadata.

        :param path: File path in blob storage
        :return: File metadata dictionary
        """
        blob_client = self.container_client.get_blob_client(path)
        properties = blob_client.get_blob_properties()

        return {
            "name": path,
            "size_bytes": properties.size,
            "content_type": properties.content_settings.content_type,
            "last_modified": properties.last_modified,
            "etag": properties.etag,
        }

    def is_file(self, path: str) -> bool:
        return self.file_exists(path)

    def is_dir(self, path: str) -> bool:
        dir_path = path.rstrip("/") + "/"

        existing_blobs = self.list_files(dir_path)

        if len(existing_blobs) > 1:
            return True
        elif len(existing_blobs) == 1:
            if existing_blobs[0] != path.rstrip("/"):
                return True

        return False

    def rmdir(self, dir: str) -> None:
        blobs = self.list_files(dir)
        self.container_client.delete_blobs(*blobs)

    def mkdir(self, path: str, exist_ok: bool = False) -> None:
        """
        Create a directory in Azure Blob Storage.

        In ADLS, directories are conceptual and created by adding a placeholder blob.

        :param path: Path of the directory to create
        :param exist_ok: If False, raise an error if the directory already exists
        """
        dir_path = path.rstrip("/") + "/"

        existing_blobs = list(self.list_files(dir_path))

        if existing_blobs and not exist_ok:
            raise FileExistsError(f"Directory {path} already exists")

        # Create a placeholder blob to represent the directory
        placeholder_blob_path = os.path.join(dir_path, ".placeholder")

        # Only create placeholder if it doesn't already exist
        if not self.file_exists(placeholder_blob_path):
            placeholder_content = (
                b"This is a placeholder blob to represent a directory."
            )
            blob_client = self.blob_service_client.get_blob_client(
                container=self.container, blob=placeholder_blob_path
            )
            blob_client.upload_blob(placeholder_content, overwrite=True)

    def remove(self, path: str) -> None:
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container, blob=path, snapshot=None
        )
        if blob_client.exists():
            blob_client.delete_blob()
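
A minimal usage sketch follows, assuming the ADLS connection string and container name are supplied through gigaspatial's config defaults (as in the constructor above); the blob paths are hypothetical.

from gigaspatial.core.io.adls_data_store import ADLSDataStore

# Instantiate with the configured defaults (container and connection string)
store = ADLSDataStore()

# Write a small text file and read it back as a string
store.write_file("examples/hello.txt", "hello from gigaspatial")
text = store.read_file("examples/hello.txt", encoding="utf-8")

# Inspect metadata and list blobs under a prefix
meta = store.get_file_metadata("examples/hello.txt")
paths = store.list_files("examples/")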
__init__(container=config.ADLS_CONTAINER_NAME, connection_string=config.ADLS_CONNECTION_STRING)

Create a new instance of ADLSDataStore.

:param container: The name of the container in ADLS to interact with.
:param connection_string: The connection string used to authenticate with the Azure storage account.

Source code in gigaspatial/core/io/adls_data_store.py
def __init__(
    self,
    container: str = config.ADLS_CONTAINER_NAME,
    connection_string: str = config.ADLS_CONNECTION_STRING,
):
    """
    Create a new instance of ADLSDataStore
    :param container: The name of the container in ADLS to interact with.
    """
    self.blob_service_client = BlobServiceClient.from_connection_string(
        connection_string
    )
    self.container_client = self.blob_service_client.get_container_client(
        container=container
    )
    self.container = container
copy_directory(source_dir, destination_dir)

Copies all files from a source directory to a destination directory within the same container.

:param source_dir: The source directory path in the blob storage
:param destination_dir: The destination directory path in the blob storage

Source code in gigaspatial/core/io/adls_data_store.py
def copy_directory(self, source_dir: str, destination_dir: str):
    """
    Copies all files from a source directory to a destination directory within the same container.

    :param source_dir: The source directory path in the blob storage
    :param destination_dir: The destination directory path in the blob storage
    """
    try:
        # Ensure source directory path ends with a trailing slash
        source_dir = source_dir.rstrip("/") + "/"
        destination_dir = destination_dir.rstrip("/") + "/"

        # List all blobs in the source directory
        source_blobs = self.container_client.list_blobs(name_starts_with=source_dir)

        for blob in source_blobs:
            # Get the relative path of the blob
            relative_path = os.path.relpath(blob.name, source_dir)

            # Construct the new blob path
            new_blob_path = os.path.join(destination_dir, relative_path).replace(
                "\\", "/"
            )

            # Create a source blob client
            source_blob_client = self.container_client.get_blob_client(blob.name)

            # Create a destination blob client
            destination_blob_client = self.container_client.get_blob_client(
                new_blob_path
            )

            # Start the copy operation
            destination_blob_client.start_copy_from_url(source_blob_client.url)

        print(f"Copied directory from {source_dir} to {destination_dir}")
    except Exception as e:
        print(f"Failed to copy directory {source_dir}: {e}")
download_directory(blob_dir_path, local_dir_path)

Downloads all files from a directory in Azure Blob Storage to a local directory.

Source code in gigaspatial/core/io/adls_data_store.py
def download_directory(self, blob_dir_path: str, local_dir_path: str):
    """Downloads all files from a directory in Azure Blob Storage to a local directory."""
    try:
        # Ensure the local directory exists
        os.makedirs(local_dir_path, exist_ok=True)

        # List all files in the blob directory
        blob_items = self.container_client.list_blobs(
            name_starts_with=blob_dir_path
        )

        for blob_item in blob_items:
            # Get the relative path of the blob file
            relative_path = os.path.relpath(blob_item.name, blob_dir_path)
            # Construct the local file path
            local_file_path = os.path.join(local_dir_path, relative_path)
            # Create directories if needed
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            # Download the blob to the local file
            blob_client = self.container_client.get_blob_client(blob_item.name)
            with open(local_file_path, "wb") as file:
                file.write(blob_client.download_blob().readall())

        print(f"Downloaded directory {blob_dir_path} to {local_dir_path}")
    except Exception as e:
        print(f"Failed to download directory {blob_dir_path}: {e}")
get_file_metadata(path)

Retrieve comprehensive file metadata.

:param path: File path in blob storage
:return: File metadata dictionary

Source code in gigaspatial/core/io/adls_data_store.py
def get_file_metadata(self, path: str) -> dict:
    """
    Retrieve comprehensive file metadata.

    :param path: File path in blob storage
    :return: File metadata dictionary
    """
    blob_client = self.container_client.get_blob_client(path)
    properties = blob_client.get_blob_properties()

    return {
        "name": path,
        "size_bytes": properties.size,
        "content_type": properties.content_settings.content_type,
        "last_modified": properties.last_modified,
        "etag": properties.etag,
    }
mkdir(path, exist_ok=False)

Create a directory in Azure Blob Storage.

In ADLS, directories are conceptual and created by adding a placeholder blob.

:param path: Path of the directory to create
:param exist_ok: If False, raise an error if the directory already exists

Source code in gigaspatial/core/io/adls_data_store.py
def mkdir(self, path: str, exist_ok: bool = False) -> None:
    """
    Create a directory in Azure Blob Storage.

    In ADLS, directories are conceptual and created by adding a placeholder blob.

    :param path: Path of the directory to create
    :param exist_ok: If False, raise an error if the directory already exists
    """
    dir_path = path.rstrip("/") + "/"

    existing_blobs = list(self.list_files(dir_path))

    if existing_blobs and not exist_ok:
        raise FileExistsError(f"Directory {path} already exists")

    # Create a placeholder blob to represent the directory
    placeholder_blob_path = os.path.join(dir_path, ".placeholder")

    # Only create placeholder if it doesn't already exist
    if not self.file_exists(placeholder_blob_path):
        placeholder_content = (
            b"This is a placeholder blob to represent a directory."
        )
        blob_client = self.blob_service_client.get_blob_client(
            container=self.container, blob=placeholder_blob_path
        )
        blob_client.upload_blob(placeholder_content, overwrite=True)
open(path, mode='r')

Context manager for file operations with enhanced mode support.

:param path: File path in blob storage
:param mode: File open mode (r, rb, w, wb)

Source code in gigaspatial/core/io/adls_data_store.py
@contextlib.contextmanager
def open(self, path: str, mode: str = "r"):
    """
    Context manager for file operations with enhanced mode support.

    :param path: File path in blob storage
    :param mode: File open mode (r, rb, w, wb)
    """
    if mode == "w":
        file = io.StringIO()
        yield file
        self.write_file(path, file.getvalue())

    elif mode == "wb":
        file = io.BytesIO()
        yield file
        self.write_file(path, file.getvalue())

    elif mode == "r":
        data = self.read_file(path, encoding="UTF-8")
        file = io.StringIO(data)
        yield file

    elif mode == "rb":
        data = self.read_file(path)
        file = io.BytesIO(data)
        yield file
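
The context manager buffers writes in an in-memory StringIO/BytesIO and uploads the buffer when the block exits; reads download the blob first and wrap it in a file-like object. A short sketch, reusing the store from the earlier example with hypothetical paths:

# Text write: buffered in a StringIO, uploaded when the block exits
with store.open("reports/summary.txt", "w") as f:
    f.write("line one\n")
    f.write("line two\n")

# Text read: the blob is downloaded, decoded as UTF-8, and wrapped in a StringIO
with store.open("reports/summary.txt", "r") as f:
    print(f.read())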
read_file(path, encoding=None)

Read file with flexible encoding support.

:param path: Path to the file in blob storage
:param encoding: File encoding (optional)
:return: File contents as string or bytes

Source code in gigaspatial/core/io/adls_data_store.py
def read_file(self, path: str, encoding: Optional[str] = None) -> Union[str, bytes]:
    """
    Read file with flexible encoding support.

    :param path: Path to the file in blob storage
    :param encoding: File encoding (optional)
    :return: File contents as string or bytes
    """
    try:
        blob_client = self.container_client.get_blob_client(path)
        blob_data = blob_client.download_blob().readall()

        # If no encoding specified, return raw bytes
        if encoding is None:
            return blob_data

        # If encoding is specified, decode the bytes
        return blob_data.decode(encoding)

    except Exception as e:
        raise IOError(f"Error reading file {path}: {e}")
upload_directory(dir_path, blob_dir_path)

Uploads all files from a directory to Azure Blob Storage.

Source code in gigaspatial/core/io/adls_data_store.py
def upload_directory(self, dir_path, blob_dir_path):
    """Uploads all files from a directory to Azure Blob Storage."""
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            local_file_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_file_path, dir_path)
            blob_file_path = os.path.join(blob_dir_path, relative_path).replace(
                "\\", "/"
            )

            self.upload_file(local_file_path, blob_file_path)
upload_file(file_path, blob_path)

Uploads a single file to Azure Blob Storage.

Source code in gigaspatial/core/io/adls_data_store.py
def upload_file(self, file_path, blob_path):
    """Uploads a single file to Azure Blob Storage."""
    try:
        blob_client = self.container_client.get_blob_client(blob_path)
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        print(f"Uploaded {file_path} to {blob_path}")
    except Exception as e:
        print(f"Failed to upload {file_path}: {e}")
write_file(path, data)

Write file with support for content type and improved type handling.

:param path: Destination path in blob storage
:param data: File contents

Source code in gigaspatial/core/io/adls_data_store.py
def write_file(self, path: str, data) -> None:
    """
    Write file with support for content type and improved type handling.

    :param path: Destination path in blob storage
    :param data: File contents
    """
    blob_client = self.blob_service_client.get_blob_client(
        container=self.container, blob=path, snapshot=None
    )

    if isinstance(data, str):
        binary_data = data.encode()
    elif isinstance(data, bytes):
        binary_data = data
    else:
        raise Exception(f'Unsupported data type. Only "bytes" or "string" accepted')

    blob_client.upload_blob(binary_data, overwrite=True)

data_api

GigaDataAPI
Source code in gigaspatial/core/io/data_api.py
class GigaDataAPI:

    def __init__(
        self,
        profile_file: Union[str, Path] = config.API_PROFILE_FILE_PATH,
        share_name: str = config.API_SHARE_NAME,
        schema_name: str = config.API_SCHEMA_NAME,
    ):
        """
        Initialize the GigaDataAPI class with the profile file, share name, and schema name.

        profile_file: Path to the delta-sharing profile file.
        share_name: Name of the share (e.g., "gold").
        schema_name: Name of the schema (e.g., "school-master").
        """
        self.profile_file = profile_file
        self.share_name = share_name
        self.schema_name = schema_name
        self.client = delta_sharing.SharingClient(profile_file)

        self._cache = {}

    def get_country_list(self, sort=True):
        """
        Retrieve a list of available countries in the dataset.

        :param sort: Whether to sort the country list alphabetically (default is True).
        """
        country_list = [
            t.name for t in self.client.list_all_tables() if t.share == self.share_name
        ]
        if sort:
            country_list.sort()
        return country_list

    def load_country_data(self, country, filters=None, use_cache=True):
        """
        Load the dataset for the specified country with optional filtering and caching.

        country: The country code (e.g., "MWI").
        filters: A dictionary with column names as keys and filter values as values.
        use_cache: Whether to use cached data if available (default is True).
        """
        # Check if data is cached
        if use_cache and country in self._cache:
            df_country = self._cache[country]
        else:
            # Load data from the API
            table_url = (
                f"{self.profile_file}#{self.share_name}.{self.schema_name}.{country}"
            )
            df_country = delta_sharing.load_as_pandas(table_url)
            self._cache[country] = df_country  # Cache the data

        # Apply filters if provided
        if filters:
            for column, value in filters.items():
                df_country = df_country[df_country[column] == value]

        return df_country

    def load_multiple_countries(self, countries):
        """
        Load data for multiple countries and combine them into a single DataFrame.

        countries: A list of country codes.
        """
        df_list = []
        for country in countries:
            df_list.append(self.load_country_data(country))
        return pd.concat(df_list, ignore_index=True)

    def get_country_metadata(self, country):
        """
        Retrieve metadata (e.g., column names and data types) for a country's dataset.

        country: The country code (e.g., "MWI").
        """
        df_country = self.load_country_data(country)
        metadata = {
            "columns": df_country.columns.tolist(),
            "data_types": df_country.dtypes.to_dict(),
            "num_records": len(df_country),
        }
        return metadata

    def get_all_cached_data_as_dict(self):
        """
        Retrieve all cached data in a dictionary format, where each key is a country code,
        and the value is the DataFrame of that country.
        """
        return self._cache if self._cache else {}

    def get_all_cached_data_as_json(self):
        """
        Retrieve all cached data in a JSON-like format. Each country is represented as a key,
        and the value is a list of records (i.e., the DataFrame's `to_dict(orient='records')` format).
        """
        if not self._cache:
            return {}

        # Convert each DataFrame in the cache to a JSON-like format (list of records)
        return {
            country: df.to_dict(orient="records") for country, df in self._cache.items()
        }
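
A minimal sketch, assuming a valid delta-sharing profile file is available through the configured defaults; the filter column and value are hypothetical.

from gigaspatial.core.io.data_api import GigaDataAPI

api = GigaDataAPI()                      # uses the configured profile/share/schema
countries = api.get_country_list()       # sorted list of available country tables

# Load one country, optionally filtering on a (hypothetical) column
df_mwi = api.load_country_data("MWI", filters={"admin1": "Lilongwe"})
meta = api.get_country_metadata("MWI")   # columns, dtypes, record count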
__init__(profile_file=config.API_PROFILE_FILE_PATH, share_name=config.API_SHARE_NAME, schema_name=config.API_SCHEMA_NAME)

Initialize the GigaDataAPI class with the profile file, share name, and schema name.

profile_file: Path to the delta-sharing profile file.
share_name: Name of the share (e.g., "gold").
schema_name: Name of the schema (e.g., "school-master").

Source code in gigaspatial/core/io/data_api.py
def __init__(
    self,
    profile_file: Union[str, Path] = config.API_PROFILE_FILE_PATH,
    share_name: str = config.API_SHARE_NAME,
    schema_name: str = config.API_SCHEMA_NAME,
):
    """
    Initialize the GigaDataAPI class with the profile file, share name, and schema name.

    profile_file: Path to the delta-sharing profile file.
    share_name: Name of the share (e.g., "gold").
    schema_name: Name of the schema (e.g., "school-master").
    """
    self.profile_file = profile_file
    self.share_name = share_name
    self.schema_name = schema_name
    self.client = delta_sharing.SharingClient(profile_file)

    self._cache = {}
get_all_cached_data_as_dict()

Retrieve all cached data in a dictionary format, where each key is a country code, and the value is the DataFrame of that country.

Source code in gigaspatial/core/io/data_api.py
def get_all_cached_data_as_dict(self):
    """
    Retrieve all cached data in a dictionary format, where each key is a country code,
    and the value is the DataFrame of that country.
    """
    return self._cache if self._cache else {}
get_all_cached_data_as_json()

Retrieve all cached data in a JSON-like format. Each country is represented as a key, and the value is a list of records (i.e., the DataFrame's to_dict(orient='records') format).

Source code in gigaspatial/core/io/data_api.py
def get_all_cached_data_as_json(self):
    """
    Retrieve all cached data in a JSON-like format. Each country is represented as a key,
    and the value is a list of records (i.e., the DataFrame's `to_dict(orient='records')` format).
    """
    if not self._cache:
        return {}

    # Convert each DataFrame in the cache to a JSON-like format (list of records)
    return {
        country: df.to_dict(orient="records") for country, df in self._cache.items()
    }
get_country_list(sort=True)

Retrieve a list of available countries in the dataset.

:param sort: Whether to sort the country list alphabetically (default is True).

Source code in gigaspatial/core/io/data_api.py
def get_country_list(self, sort=True):
    """
    Retrieve a list of available countries in the dataset.

    :param sort: Whether to sort the country list alphabetically (default is True).
    """
    country_list = [
        t.name for t in self.client.list_all_tables() if t.share == self.share_name
    ]
    if sort:
        country_list.sort()
    return country_list
get_country_metadata(country)

Retrieve metadata (e.g., column names and data types) for a country's dataset.

country: The country code (e.g., "MWI").

Source code in gigaspatial/core/io/data_api.py
def get_country_metadata(self, country):
    """
    Retrieve metadata (e.g., column names and data types) for a country's dataset.

    country: The country code (e.g., "MWI").
    """
    df_country = self.load_country_data(country)
    metadata = {
        "columns": df_country.columns.tolist(),
        "data_types": df_country.dtypes.to_dict(),
        "num_records": len(df_country),
    }
    return metadata
load_country_data(country, filters=None, use_cache=True)

Load the dataset for the specified country with optional filtering and caching.

country: The country code (e.g., "MWI").
filters: A dictionary with column names as keys and filter values as values.
use_cache: Whether to use cached data if available (default is True).

Source code in gigaspatial/core/io/data_api.py
def load_country_data(self, country, filters=None, use_cache=True):
    """
    Load the dataset for the specified country with optional filtering and caching.

    country: The country code (e.g., "MWI").
    filters: A dictionary with column names as keys and filter values as values.
    use_cache: Whether to use cached data if available (default is True).
    """
    # Check if data is cached
    if use_cache and country in self._cache:
        df_country = self._cache[country]
    else:
        # Load data from the API
        table_url = (
            f"{self.profile_file}#{self.share_name}.{self.schema_name}.{country}"
        )
        df_country = delta_sharing.load_as_pandas(table_url)
        self._cache[country] = df_country  # Cache the data

    # Apply filters if provided
    if filters:
        for column, value in filters.items():
            df_country = df_country[df_country[column] == value]

    return df_country
load_multiple_countries(countries)

Load data for multiple countries and combine them into a single DataFrame.

countries: A list of country codes.

Source code in gigaspatial/core/io/data_api.py
def load_multiple_countries(self, countries):
    """
    Load data for multiple countries and combine them into a single DataFrame.

    countries: A list of country codes.
    """
    df_list = []
    for country in countries:
        df_list.append(self.load_country_data(country))
    return pd.concat(df_list, ignore_index=True)

data_store

DataStore

Bases: ABC

Abstract base class defining the interface for data store implementations. This class serves as a parent for both local and cloud-based storage solutions.

Source code in gigaspatial/core/io/data_store.py
class DataStore(ABC):
    """
    Abstract base class defining the interface for data store implementations.
    This class serves as a parent for both local and cloud-based storage solutions.
    """

    @abstractmethod
    def read_file(self, path: str) -> Any:
        """
        Read contents of a file from the data store.

        Args:
            path: Path to the file to read

        Returns:
            Contents of the file

        Raises:
            IOError: If file cannot be read
        """
        pass

    @abstractmethod
    def write_file(self, path: str, data: Any) -> None:
        """
        Write data to a file in the data store.

        Args:
            path: Path where to write the file
            data: Data to write to the file

        Raises:
            IOError: If file cannot be written
        """
        pass

    @abstractmethod
    def file_exists(self, path: str) -> bool:
        """
        Check if a file exists in the data store.

        Args:
            path: Path to check

        Returns:
            True if file exists, False otherwise
        """
        pass

    @abstractmethod
    def list_files(self, path: str) -> List[str]:
        """
        List all files in a directory.

        Args:
            path: Directory path to list

        Returns:
            List of file paths in the directory
        """
        pass

    @abstractmethod
    def walk(self, top: str) -> Generator:
        """
        Walk through directory tree, similar to os.walk().

        Args:
            top: Starting directory for the walk

        Returns:
            Generator yielding tuples of (dirpath, dirnames, filenames)
        """
        pass

    @abstractmethod
    def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
        """
        Context manager for file operations.

        Args:
            file: Path to the file
            mode: File mode ('r', 'w', 'rb', 'wb')

        Yields:
            File-like object

        Raises:
            IOError: If file cannot be opened
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Check if path points to a file.

        Args:
            path: Path to check

        Returns:
            True if path is a file, False otherwise
        """
        pass

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """
        Check if path points to a directory.

        Args:
            path: Path to check

        Returns:
            True if path is a directory, False otherwise
        """
        pass

    @abstractmethod
    def remove(self, path: str) -> None:
        """
        Remove a file.

        Args:
            path: Path to the file to remove

        Raises:
            IOError: If file cannot be removed
        """
        pass

    @abstractmethod
    def rmdir(self, dir: str) -> None:
        """
        Remove a directory and all its contents.

        Args:
            dir: Path to the directory to remove

        Raises:
            IOError: If directory cannot be removed
        """
        pass
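
To make the contract concrete, here is an illustrative dict-backed in-memory implementation (a sketch, not part of the library); it implements every abstract method above and follows the same context-manager pattern for open() that the concrete stores use.

import io
from contextlib import contextmanager

from gigaspatial.core.io.data_store import DataStore


class MemoryDataStore(DataStore):
    """Dict-backed store; keys are paths, values are str or bytes payloads."""

    def __init__(self):
        self._files = {}

    def read_file(self, path):
        return self._files[path]

    def write_file(self, path, data):
        self._files[path] = data

    def file_exists(self, path):
        return path in self._files

    def list_files(self, path):
        return [p for p in self._files if p.startswith(path)]

    def walk(self, top):
        # Simplified: yield a single (dirpath, dirnames, filenames) tuple
        yield top, [], [p for p in self._files if p.startswith(top)]

    @contextmanager
    def open(self, file, mode="r"):
        if "w" in mode:
            buf = io.BytesIO() if "b" in mode else io.StringIO()
            yield buf
            self._files[file] = buf.getvalue()
        else:
            data = self._files[file]
            yield io.BytesIO(data) if "b" in mode else io.StringIO(data)

    def is_file(self, path):
        return path in self._files

    def is_dir(self, path):
        prefix = path.rstrip("/") + "/"
        return any(p.startswith(prefix) for p in self._files)

    def remove(self, path):
        self._files.pop(path, None)

    def rmdir(self, dir):
        for p in list(self._files):
            if p.startswith(dir):
                del self._files[p]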
file_exists(path) abstractmethod

Check if a file exists in the data store.

Parameters:

Name    Type    Description      Default
path    str     Path to check    required

Returns:

Type    Description
bool    True if file exists, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def file_exists(self, path: str) -> bool:
    """
    Check if a file exists in the data store.

    Args:
        path: Path to check

    Returns:
        True if file exists, False otherwise
    """
    pass
is_dir(path) abstractmethod

Check if path points to a directory.

Parameters:

Name    Type    Description      Default
path    str     Path to check    required

Returns:

Type    Description
bool    True if path is a directory, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_dir(self, path: str) -> bool:
    """
    Check if path points to a directory.

    Args:
        path: Path to check

    Returns:
        True if path is a directory, False otherwise
    """
    pass
is_file(path) abstractmethod

Check if path points to a file.

Parameters:

Name    Type    Description      Default
path    str     Path to check    required

Returns:

Type    Description
bool    True if path is a file, False otherwise

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def is_file(self, path: str) -> bool:
    """
    Check if path points to a file.

    Args:
        path: Path to check

    Returns:
        True if path is a file, False otherwise
    """
    pass
list_files(path) abstractmethod

List all files in a directory.

Parameters:

Name    Type    Description              Default
path    str     Directory path to list   required

Returns:

Type         Description
List[str]    List of file paths in the directory

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def list_files(self, path: str) -> List[str]:
    """
    List all files in a directory.

    Args:
        path: Directory path to list

    Returns:
        List of file paths in the directory
    """
    pass
open(file, mode='r') abstractmethod

Context manager for file operations.

Parameters:

Name    Type    Description                        Default
file    str     Path to the file                   required
mode    str     File mode ('r', 'w', 'rb', 'wb')   'r'

Yields:

Type                 Description
Union[str, bytes]    File-like object

Raises:

Type      Description
IOError   If file cannot be opened

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def open(self, file: str, mode: str = "r") -> Union[str, bytes]:
    """
    Context manager for file operations.

    Args:
        file: Path to the file
        mode: File mode ('r', 'w', 'rb', 'wb')

    Yields:
        File-like object

    Raises:
        IOError: If file cannot be opened
    """
    pass
read_file(path) abstractmethod

Read contents of a file from the data store.

Parameters:

Name    Type    Description                Default
path    str     Path to the file to read   required

Returns:

Type    Description
Any     Contents of the file

Raises:

Type      Description
IOError   If file cannot be read

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def read_file(self, path: str) -> Any:
    """
    Read contents of a file from the data store.

    Args:
        path: Path to the file to read

    Returns:
        Contents of the file

    Raises:
        IOError: If file cannot be read
    """
    pass
remove(path) abstractmethod

Remove a file.

Parameters:

Name    Type    Description                  Default
path    str     Path to the file to remove   required

Raises:

Type      Description
IOError   If file cannot be removed

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def remove(self, path: str) -> None:
    """
    Remove a file.

    Args:
        path: Path to the file to remove

    Raises:
        IOError: If file cannot be removed
    """
    pass
rmdir(dir) abstractmethod

Remove a directory and all its contents.

Parameters:

Name    Type    Description                       Default
dir     str     Path to the directory to remove   required

Raises:

Type      Description
IOError   If directory cannot be removed

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def rmdir(self, dir: str) -> None:
    """
    Remove a directory and all its contents.

    Args:
        dir: Path to the directory to remove

    Raises:
        IOError: If directory cannot be removed
    """
    pass
walk(top) abstractmethod

Walk through directory tree, similar to os.walk().

Parameters:

Name    Type    Description                       Default
top     str     Starting directory for the walk   required

Returns:

Type         Description
Generator    Generator yielding tuples of (dirpath, dirnames, filenames)

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def walk(self, top: str) -> Generator:
    """
    Walk through directory tree, similar to os.walk().

    Args:
        top: Starting directory for the walk

    Returns:
        Generator yielding tuples of (dirpath, dirnames, filenames)
    """
    pass
write_file(path, data) abstractmethod

Write data to a file in the data store.

Parameters:

Name    Type    Description                    Default
path    str     Path where to write the file   required
data    Any     Data to write to the file      required

Raises:

Type      Description
IOError   If file cannot be written

Source code in gigaspatial/core/io/data_store.py
@abstractmethod
def write_file(self, path: str, data: Any) -> None:
    """
    Write data to a file in the data store.

    Args:
        path: Path where to write the file
        data: Data to write to the file

    Raises:
        IOError: If file cannot be written
    """
    pass

local_data_store

LocalDataStore

Bases: DataStore

Implementation for local filesystem storage.

Source code in gigaspatial/core/io/local_data_store.py
class LocalDataStore(DataStore):
    """Implementation for local filesystem storage."""

    def __init__(self, base_path: Union[str, Path] = ""):
        super().__init__()
        self.base_path = Path(base_path).resolve()

    def _resolve_path(self, path: str) -> Path:
        """Resolve path relative to base directory."""
        return self.base_path / path

    def read_file(self, path: str) -> bytes:
        full_path = self._resolve_path(path)
        with open(full_path, "rb") as f:
            return f.read()

    def write_file(self, path: str, data: Union[bytes, str]) -> None:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)

        if isinstance(data, str):
            mode = "w"
            encoding = "utf-8"
        else:
            mode = "wb"
            encoding = None

        with open(full_path, mode, encoding=encoding) as f:
            f.write(data)

    def file_exists(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def list_files(self, path: str) -> List[str]:
        full_path = self._resolve_path(path)
        return [
            str(f.relative_to(self.base_path))
            for f in full_path.iterdir()
            if f.is_file()
        ]

    def walk(self, top: str) -> Generator[Tuple[str, List[str], List[str]], None, None]:
        full_path = self._resolve_path(top)
        for root, dirs, files in os.walk(full_path):
            rel_root = str(Path(root).relative_to(self.base_path))
            yield rel_root, dirs, files

    def open(self, path: str, mode: str = "r") -> IO:
        full_path = self._resolve_path(path)
        self.mkdir(str(full_path.parent), exist_ok=True)
        return open(full_path, mode)

    def is_file(self, path: str) -> bool:
        return self._resolve_path(path).is_file()

    def is_dir(self, path: str) -> bool:
        return self._resolve_path(path).is_dir()

    def remove(self, path: str) -> None:
        full_path = self._resolve_path(path)
        if full_path.is_file():
            os.remove(full_path)

    def rmdir(self, directory: str) -> None:
        full_path = self._resolve_path(directory)
        if full_path.is_dir():
            os.rmdir(full_path)

    def mkdir(self, path: str, exist_ok: bool = False) -> None:
        full_path = self._resolve_path(path)
        full_path.mkdir(parents=True, exist_ok=exist_ok)

    def exists(self, path: str) -> bool:
        return self._resolve_path(path).exists()
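
A quick usage sketch with a temporary base directory; the relative paths are hypothetical.

import tempfile

from gigaspatial.core.io.local_data_store import LocalDataStore

store = LocalDataStore(tempfile.mkdtemp())                 # scratch base directory

store.write_file("data/example.csv", "id,value\n1,10\n")   # parent dirs created automatically
raw = store.read_file("data/example.csv")                  # returns bytes
files = store.list_files("data")                           # paths relative to the base directory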

readers

read_dataset(data_store, path, compression=None, **kwargs)

Read data from various file formats stored in both local and cloud-based storage.

Parameters:

data_store : DataStore
    Instance of DataStore for accessing data storage.
path : str, Path
    Path to the file in data storage.
compression : str, optional
    Compression handling override; when None (default), the compression format is inferred from the file extension (.gz, .bz2, .zip, .xz).
**kwargs : dict
    Additional arguments passed to the specific reader function.

Returns:

pandas.DataFrame or geopandas.GeoDataFrame
    The data read from the file.

Raises:

FileNotFoundError
    If the file doesn't exist in the data store.
ValueError
    If the file type is unsupported or if there's an error reading the file.

Source code in gigaspatial/core/io/readers.py
def read_dataset(data_store: DataStore, path: str, compression: str = None, **kwargs):
    """
    Read data from various file formats stored in both local and cloud-based storage.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str, Path
        Path to the file in data storage.
    **kwargs : dict
        Additional arguments passed to the specific reader function.

    Returns:
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        The data read from the file.

    Raises:
    ------
    FileNotFoundError
        If the file doesn't exist in blob storage.
    ValueError
        If the file type is unsupported or if there's an error reading the file.
    """

    # Define supported file formats and their readers
    BINARY_FORMATS = {
        ".shp",
        ".zip",
        ".parquet",
        ".gpkg",
        ".xlsx",
        ".xls",
        ".kmz",
        ".gz",
    }

    PANDAS_READERS = {
        ".csv": pd.read_csv,
        ".xlsx": lambda f, **kw: pd.read_excel(f, engine="openpyxl", **kw),
        ".xls": lambda f, **kw: pd.read_excel(f, engine="xlrd", **kw),
        ".json": pd.read_json,
        # ".gz": lambda f, **kw: pd.read_csv(f, compression="gzip", **kw),
    }

    GEO_READERS = {
        ".shp": gpd.read_file,
        ".zip": gpd.read_file,
        ".geojson": gpd.read_file,
        ".gpkg": gpd.read_file,
        ".parquet": gpd.read_parquet,
        ".kmz": read_kmz,
    }

    COMPRESSION_FORMATS = {
        ".gz": "gzip",
        ".bz2": "bz2",
        ".zip": "zip",
        ".xz": "xz",
    }

    try:
        # Check if file exists
        if not data_store.file_exists(path):
            raise FileNotFoundError(f"File '{path}' not found in blob storage")

        path_obj = Path(path)
        suffixes = path_obj.suffixes
        file_extension = suffixes[-1].lower() if suffixes else ""

        if compression is None and file_extension in COMPRESSION_FORMATS:
            compression_format = COMPRESSION_FORMATS[file_extension]

            # if file has multiple extensions (e.g., .csv.gz), get the inner format
            if len(suffixes) > 1:
                inner_extension = suffixes[-2].lower()

                if inner_extension == ".tar":
                    raise ValueError(
                        "Tar archives (.tar.gz) are not directly supported"
                    )

                if inner_extension in PANDAS_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            return PANDAS_READERS[inner_extension](
                                f, compression=compression_format, **kwargs
                            )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed file: {str(e)}")
                elif inner_extension in GEO_READERS:
                    try:
                        with data_store.open(path, "rb") as f:
                            if compression_format == "gzip":
                                import gzip

                                decompressed_data = gzip.decompress(f.read())
                                import io

                                return GEO_READERS[inner_extension](
                                    io.BytesIO(decompressed_data), **kwargs
                                )
                            else:
                                raise ValueError(
                                    f"Compression format {compression_format} not supported for geo data"
                                )
                    except Exception as e:
                        raise ValueError(f"Error reading compressed geo file: {str(e)}")
            else:
                # if just .gz without clear inner type, assume csv
                try:
                    with data_store.open(path, "rb") as f:
                        return pd.read_csv(f, compression=compression_format, **kwargs)
                except Exception as e:
                    raise ValueError(
                        f"Error reading compressed file as CSV: {str(e)}. "
                        f"If not a CSV, specify the format in the filename (e.g., .json.gz)"
                    )

        # Special handling for compressed files
        if file_extension == ".zip":
            # For zip files, we need to use binary mode
            with data_store.open(path, "rb") as f:
                return gpd.read_file(f)

        # Determine if we need binary mode based on file type
        mode = "rb" if file_extension in BINARY_FORMATS else "r"

        # Try reading with appropriate reader
        if file_extension in PANDAS_READERS:
            try:
                with data_store.open(path, mode) as f:
                    return PANDAS_READERS[file_extension](f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error reading file with pandas: {str(e)}")

        if file_extension in GEO_READERS:
            try:
                with data_store.open(path, "rb") as f:
                    return GEO_READERS[file_extension](f, **kwargs)
            except Exception as e:
                # For parquet files, try pandas reader if geopandas fails
                if file_extension == ".parquet":
                    try:
                        with data_store.open(path, "rb") as f:
                            return pd.read_parquet(f, **kwargs)
                    except Exception as e2:
                        raise ValueError(
                            f"Failed to read parquet with both geopandas ({str(e)}) "
                            f"and pandas ({str(e2)})"
                        )
                raise ValueError(f"Error reading file with geopandas: {str(e)}")

        # If we get here, the file type is unsupported
        supported_formats = sorted(set(PANDAS_READERS.keys()) | set(GEO_READERS.keys()))
        supported_compressions = sorted(COMPRESSION_FORMATS.keys())
        raise ValueError(
            f"Unsupported file type: {file_extension}\n"
            f"Supported formats: {', '.join(supported_formats)}"
            f"Supported compressions: {', '.join(supported_compressions)}"
        )

    except Exception as e:
        if isinstance(e, (FileNotFoundError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error reading dataset: {str(e)}")
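
A short sketch of the typical call pattern; the store base path and file paths are hypothetical, and any DataStore implementation can be passed in place of the local one.

from gigaspatial.core.io.local_data_store import LocalDataStore
from gigaspatial.core.io.readers import read_dataset

store = LocalDataStore("/data")                       # hypothetical base path

df = read_dataset(store, "tables/schools.csv")        # -> pandas.DataFrame
gdf = read_dataset(store, "layers/boundaries.gpkg")   # -> geopandas.GeoDataFrame
df_gz = read_dataset(store, "tables/schools.csv.gz")  # compression inferred from the suffix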
read_datasets(data_store, paths, **kwargs)

Read multiple datasets from data storage at once.

Parameters:

data_store : DataStore Instance of DataStore for accessing data storage. paths : list of str Paths to files in data storage. **kwargs : dict Additional arguments passed to read_dataset.

Returns:

dict Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.

Source code in gigaspatial/core/io/readers.py
def read_datasets(data_store: DataStore, paths, **kwargs):
    """
    Read multiple datasets from data storage at once.

    Parameters:
    ----------
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    paths : list of str
        Paths to files in data storage.
    **kwargs : dict
        Additional arguments passed to read_dataset.

    Returns:
    -------
    dict
        Dictionary mapping paths to their corresponding DataFrames/GeoDataFrames.
    """
    results = {}
    errors = {}

    for path in paths:
        try:
            results[path] = read_dataset(data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors reading datasets:\n{error_msg}")

    return results
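
Continuing the sketch above with the same hypothetical store and paths: per-path failures are collected and raised together as a single ValueError.

from gigaspatial.core.io.readers import read_datasets

datasets = read_datasets(store, ["tables/schools.csv", "layers/boundaries.gpkg"])
schools_df = datasets["tables/schools.csv"]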
read_gzipped_json_or_csv(file_path, data_store)

Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV.

Source code in gigaspatial/core/io/readers.py
def read_gzipped_json_or_csv(file_path, data_store):
    """Reads a gzipped file, attempting to parse it as JSON (lines=True) or CSV."""

    with data_store.open(file_path, "rb") as f:
        g = gzip.GzipFile(fileobj=f)
        text = g.read().decode("utf-8")
        try:
            df = pd.read_json(io.StringIO(text), lines=True)
            return df
        except json.JSONDecodeError:
            try:
                df = pd.read_csv(io.StringIO(text))
                return df
            except pd.errors.ParserError:
                print(f"Error: Could not parse {file_path} as JSON or CSV.")
                return None
read_kmz(file_obj, **kwargs)

Helper function to read KMZ files and return a GeoDataFrame.

Source code in gigaspatial/core/io/readers.py
def read_kmz(file_obj, **kwargs):
    """Helper function to read KMZ files and return a GeoDataFrame."""
    try:
        with zipfile.ZipFile(file_obj) as kmz:
            # Find the KML file in the archive (usually doc.kml)
            kml_filename = next(
                name for name in kmz.namelist() if name.endswith(".kml")
            )

            # Read the KML content
            kml_content = io.BytesIO(kmz.read(kml_filename))

            gdf = gpd.read_file(kml_content)

            # Validate the GeoDataFrame
            if gdf.empty:
                raise ValueError(
                    "The KML file is empty or does not contain valid geospatial data."
                )

        return gdf

    except zipfile.BadZipFile:
        raise ValueError("The provided file is not a valid KMZ file.")
    except StopIteration:
        raise ValueError("No KML file found in the KMZ archive.")
    except Exception as e:
        raise RuntimeError(f"An error occurred: {e}")

writers

write_dataset(data, data_store, path, **kwargs)

Write a DataFrame or GeoDataFrame to various file formats in local or cloud-based data storage.

Parameters:

data : pandas.DataFrame or geopandas.GeoDataFrame
    The data to write to data storage.
data_store : DataStore
    Instance of DataStore for accessing data storage.
path : str
    Path where the file will be written in data storage.
**kwargs : dict
    Additional arguments passed to the specific writer function.

Raises:

ValueError
    If the file type is unsupported or if there's an error writing the file.
TypeError
    If input data is not a DataFrame or GeoDataFrame.

Source code in gigaspatial/core/io/writers.py
def write_dataset(data, data_store: DataStore, path, **kwargs):
    """
    Write DataFrame or GeoDataFrame to various file formats in Azure Blob Storage.

    Parameters:
    ----------
    data : pandas.DataFrame or geopandas.GeoDataFrame
        The data to write to blob storage.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    path : str
        Path where the file will be written in data storage.
    **kwargs : dict
        Additional arguments passed to the specific writer function.

    Raises:
    ------
    ValueError
        If the file type is unsupported or if there's an error writing the file.
    TypeError
        If input data is not a DataFrame or GeoDataFrame.
    """

    # Define supported file formats and their writers
    BINARY_FORMATS = {".shp", ".zip", ".parquet", ".gpkg", ".xlsx", ".xls"}

    PANDAS_WRITERS = {
        ".csv": lambda df, buf, **kw: df.to_csv(buf, **kw),
        ".xlsx": lambda df, buf, **kw: df.to_excel(buf, engine="openpyxl", **kw),
        ".json": lambda df, buf, **kw: df.to_json(buf, **kw),
        ".parquet": lambda df, buf, **kw: df.to_parquet(buf, **kw),
    }

    GEO_WRITERS = {
        ".geojson": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GeoJSON", **kw),
        ".gpkg": lambda gdf, buf, **kw: gdf.to_file(buf, driver="GPKG", **kw),
        ".parquet": lambda gdf, buf, **kw: gdf.to_parquet(buf, **kw),
    }

    try:
        # Input validation
        if not isinstance(data, (pd.DataFrame, gpd.GeoDataFrame)):
            raise TypeError("Input data must be a pandas DataFrame or GeoDataFrame")

        # Get file suffix and ensure it's lowercase
        suffix = Path(path).suffix.lower()

        # Determine if we need binary mode based on file type
        mode = "wb" if suffix in BINARY_FORMATS else "w"

        # Handle different data types and formats
        if isinstance(data, gpd.GeoDataFrame):
            if suffix not in GEO_WRITERS:
                supported_formats = sorted(GEO_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for GeoDataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, "wb") as f:
                    GEO_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing GeoDataFrame: {str(e)}")

        else:  # pandas DataFrame
            if suffix not in PANDAS_WRITERS:
                supported_formats = sorted(PANDAS_WRITERS.keys())
                raise ValueError(
                    f"Unsupported file type for DataFrame: {suffix}\n"
                    f"Supported formats: {', '.join(supported_formats)}"
                )

            try:
                with data_store.open(path, mode) as f:
                    PANDAS_WRITERS[suffix](data, f, **kwargs)
            except Exception as e:
                raise ValueError(f"Error writing DataFrame: {str(e)}")

    except Exception as e:
        if isinstance(e, (TypeError, ValueError)):
            raise
        raise RuntimeError(f"Unexpected error writing dataset: {str(e)}")
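
A short sketch with a hypothetical store and output paths; the writer is selected from the file suffix, and extra keyword arguments are forwarded to it.

import pandas as pd

from gigaspatial.core.io.local_data_store import LocalDataStore
from gigaspatial.core.io.writers import write_dataset

store = LocalDataStore("outputs")                    # hypothetical base directory
df = pd.DataFrame({"id": [1, 2], "value": [10, 20]})

write_dataset(df, store, "tables/values.csv", index=False)

# A GeoDataFrame would be routed to the geo writers instead, e.g.:
# write_dataset(gdf, store, "layers/boundaries.geojson")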
write_datasets(data_dict, data_store, **kwargs)

Write multiple datasets to data storage at once.

Parameters:

data_dict : dict
    Dictionary mapping paths to DataFrames/GeoDataFrames.
data_store : DataStore
    Instance of DataStore for accessing data storage.
**kwargs : dict
    Additional arguments passed to write_dataset.

Raises:

ValueError
    If there are any errors writing the datasets.

Source code in gigaspatial/core/io/writers.py
def write_datasets(data_dict, data_store: DataStore, **kwargs):
    """
    Write multiple datasets to data storage at once.

    Parameters:
    ----------
    data_dict : dict
        Dictionary mapping paths to DataFrames/GeoDataFrames.
    data_store : DataStore
        Instance of DataStore for accessing data storage.
    **kwargs : dict
        Additional arguments passed to write_dataset.

    Raises:
    ------
    ValueError
        If there are any errors writing the datasets.
    """
    errors = {}

    for path, data in data_dict.items():
        try:
            write_dataset(data, data_store, path, **kwargs)
        except Exception as e:
            errors[path] = str(e)

    if errors:
        error_msg = "\n".join(f"- {path}: {error}" for path, error in errors.items())
        raise ValueError(f"Errors writing datasets:\n{error_msg}")

schemas

entity

BaseGigaEntity

Bases: BaseModel

Base class for all Giga entities with common fields.

Source code in gigaspatial/core/schemas/entity.py
class BaseGigaEntity(BaseModel):
    """Base class for all Giga entities with common fields."""

    source: Optional[str] = Field(None, max_length=100, description="Source reference")
    source_detail: Optional[str] = None

    @property
    def id(self) -> str:
        """Abstract property that must be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement id property")
id: str property

Abstract property that must be implemented by subclasses.
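
Because id raises NotImplementedError on the base class, concrete entities override it. A minimal sketch with a hypothetical subclass and identifier field:

from gigaspatial.core.schemas.entity import BaseGigaEntity

class HealthFacility(BaseGigaEntity):  # hypothetical subclass for illustration
    facility_id: str

    @property
    def id(self) -> str:
        # Concrete implementation of the abstract id property.
        return self.facility_id

facility = HealthFacility(facility_id="HF-001", source="ministry of health")
print(facility.id)  # "HF-001"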

EntityTable

Bases: BaseModel, Generic[E]

Source code in gigaspatial/core/schemas/entity.py
class EntityTable(BaseModel, Generic[E]):
    entities: List[E] = Field(default_factory=list)
    _cached_kdtree: Optional[cKDTree] = PrivateAttr(
        default=None
    )  # Internal cache for the KDTree

    @classmethod
    def from_file(
        cls: Type["EntityTable"],
        file_path: Union[str, Path],
        entity_class: Type[E],
        data_store: Optional[DataStore] = None,
        **kwargs,
    ) -> "EntityTable":
        """
        Create an EntityTable instance from a file.

        Args:
            file_path: Path to the dataset file
            entity_class: The entity class for validation

        Returns:
            EntityTable instance

        Raises:
            ValidationError: If any row fails validation
            FileNotFoundError: If the file doesn't exist
        """
        data_store = data_store or LocalDataStore()
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        df = read_dataset(data_store, file_path, **kwargs)
        try:
            entities = [entity_class(**row) for row in df.to_dict(orient="records")]
            return cls(entities=entities)
        except ValidationError as e:
            raise ValueError(f"Validation error in input data: {e}")
        except Exception as e:
            raise ValueError(f"Error reading or processing the file: {e}")

    def _check_has_location(self, method_name: str) -> bool:
        """Helper method to check if entities have location data."""
        if not self.entities:
            return False
        if not isinstance(self.entities[0], GigaEntity):
            raise ValueError(
                f"Cannot perform {method_name}: entities of type {type(self.entities[0]).__name__} "
                "do not have location data (latitude/longitude)"
            )
        return True

    def to_dataframe(self) -> pd.DataFrame:
        """Convert the entity table to a pandas DataFrame."""
        return pd.DataFrame([e.model_dump() for e in self.entities])

    def to_geodataframe(self) -> gpd.GeoDataFrame:
        """Convert the entity table to a GeoDataFrame."""
        if not self._check_has_location("to_geodataframe"):
            raise ValueError("Cannot create GeoDataFrame: no entities available")
        df = self.to_dataframe()
        return gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df["longitude"], df["latitude"]),
            crs="EPSG:4326",
        )

    def to_coordinate_vector(self) -> np.ndarray:
        """Transforms the entity table into a numpy vector of coordinates"""
        if not self.entities:
            return np.zeros((0, 2))

        if not self._check_has_location("to_coordinate_vector"):
            return np.zeros((0, 2))

        return np.array([[e.latitude, e.longitude] for e in self.entities])

    def get_lat_array(self) -> np.ndarray:
        """Get an array of latitude values."""
        if not self._check_has_location("get_lat_array"):
            return np.array([])
        return np.array([e.latitude for e in self.entities])

    def get_lon_array(self) -> np.ndarray:
        """Get an array of longitude values."""
        if not self._check_has_location("get_lon_array"):
            return np.array([])
        return np.array([e.longitude for e in self.entities])

    def filter_by_admin1(self, admin1_id_giga: str) -> "EntityTable[E]":
        """Filter entities by primary administrative division."""
        return self.__class__(
            entities=[e for e in self.entities if e.admin1_id_giga == admin1_id_giga]
        )

    def filter_by_admin2(self, admin2_id_giga: str) -> "EntityTable[E]":
        """Filter entities by secondary administrative division."""
        return self.__class__(
            entities=[e for e in self.entities if e.admin2_id_giga == admin2_id_giga]
        )

    def filter_by_polygon(self, polygon: Polygon) -> "EntityTable[E]":
        """Filter entities within a polygon"""
        if not self._check_has_location("filter_by_polygon"):
            return self.__class__(entities=[])

        filtered = [
            e for e in self.entities if polygon.contains(Point(e.longitude, e.latitude))
        ]
        return self.__class__(entities=filtered)

    def filter_by_bounds(
        self, min_lat: float, max_lat: float, min_lon: float, max_lon: float
    ) -> "EntityTable[E]":
        """Filter entities whose coordinates fall within the given bounds."""
        if not self._check_has_location("filter_by_bounds"):
            return self.__class__(entities=[])

        filtered = [
            e
            for e in self.entities
            if min_lat <= e.latitude <= max_lat and min_lon <= e.longitude <= max_lon
        ]
        return self.__class__(entities=filtered)

    def get_nearest_neighbors(
        self, lat: float, lon: float, k: int = 5
    ) -> "EntityTable[E]":
        """Find k nearest neighbors to a point using a cached KDTree."""
        if not self._check_has_location("get_nearest_neighbors"):
            return self.__class__(entities=[])

        if not self._cached_kdtree:
            self._build_kdtree()  # Build the KDTree if not already cached

        if not self._cached_kdtree:  # If still None after building
            return self.__class__(entities=[])

        _, indices = self._cached_kdtree.query([[lat, lon]], k=k)
        return self.__class__(entities=[self.entities[i] for i in indices[0]])

    def _build_kdtree(self):
        """Builds and caches the KDTree."""
        if not self._check_has_location("_build_kdtree"):
            self._cached_kdtree = None
            return
        coords = self.to_coordinate_vector()
        if len(coords) > 0:  # explicit length check; truth-testing a multi-element NumPy array raises
            self._cached_kdtree = cKDTree(coords)

    def clear_cache(self):
        """Clears the KDTree cache."""
        self._cached_kdtree = None

    def to_file(
        self,
        file_path: Union[str, Path],
        data_store: Optional[DataStore] = None,
        **kwargs,
    ) -> None:
        """
        Save the entity data to a file.

        Args:
            file_path: Path to save the file
        """
        if not self.entities:
            raise ValueError("Cannot write to a file: no entities available.")

        data_store = data_store or LocalDataStore()

        write_dataset(self.to_dataframe(), data_store, file_path, **kwargs)

    def __len__(self) -> int:
        return len(self.entities)

    def __iter__(self):
        return iter(self.entities)
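
Before the per-method reference below, an end-to-end sketch of building and filtering a table; the School subclass, identifiers and coordinates are purely illustrative:

from gigaspatial.core.schemas.entity import EntityTable, GigaEntity

class School(GigaEntity):  # hypothetical entity type for illustration
    school_id: str

    @property
    def id(self) -> str:
        return self.school_id

table = EntityTable(
    entities=[
        School(school_id="S1", latitude=0.10, longitude=32.58),
        School(school_id="S2", latitude=0.35, longitude=32.60),
    ]
)

print(len(table))  # 2
subset = table.filter_by_bounds(min_lat=0.0, max_lat=0.2, min_lon=32.0, max_lon=33.0)
print([e.id for e in subset])  # ['S1']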
clear_cache()

Clears the KDTree cache.

Source code in gigaspatial/core/schemas/entity.py
def clear_cache(self):
    """Clears the KDTree cache."""
    self._cached_kdtree = None
filter_by_admin1(admin1_id_giga)

Filter entities by primary administrative division.

Source code in gigaspatial/core/schemas/entity.py
def filter_by_admin1(self, admin1_id_giga: str) -> "EntityTable[E]":
    """Filter entities by primary administrative division."""
    return self.__class__(
        entities=[e for e in self.entities if e.admin1_id_giga == admin1_id_giga]
    )
filter_by_admin2(admin2_id_giga)

Filter entities by secondary administrative division.

Source code in gigaspatial/core/schemas/entity.py
def filter_by_admin2(self, admin2_id_giga: str) -> "EntityTable[E]":
    """Filter entities by secondary administrative division."""
    return self.__class__(
        entities=[e for e in self.entities if e.admin2_id_giga == admin2_id_giga]
    )
filter_by_bounds(min_lat, max_lat, min_lon, max_lon)

Filter entities whose coordinates fall within the given bounds.

Source code in gigaspatial/core/schemas/entity.py
def filter_by_bounds(
    self, min_lat: float, max_lat: float, min_lon: float, max_lon: float
) -> "EntityTable[E]":
    """Filter entities whose coordinates fall within the given bounds."""
    if not self._check_has_location("filter_by_bounds"):
        return self.__class__(entities=[])

    filtered = [
        e
        for e in self.entities
        if min_lat <= e.latitude <= max_lat and min_lon <= e.longitude <= max_lon
    ]
    return self.__class__(entities=filtered)
filter_by_polygon(polygon)

Filter entities within a polygon

Source code in gigaspatial/core/schemas/entity.py
def filter_by_polygon(self, polygon: Polygon) -> "EntityTable[E]":
    """Filter entities within a polygon"""
    if not self._check_has_location("filter_by_polygon"):
        return self.__class__(entities=[])

    filtered = [
        e for e in self.entities if polygon.contains(Point(e.longitude, e.latitude))
    ]
    return self.__class__(entities=filtered)
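
A self-contained sketch with illustrative coordinates; GigaEntity is instantiated directly only for brevity, since filter_by_polygon never touches the abstract id property. Note that the test point is built as (longitude, latitude), i.e. Shapely's (x, y) order:

from shapely.geometry import Polygon
from gigaspatial.core.schemas.entity import EntityTable, GigaEntity

table = EntityTable(
    entities=[
        GigaEntity(latitude=0.10, longitude=32.58),
        GigaEntity(latitude=0.35, longitude=32.60),
    ]
)

# A rough bounding box expressed as a polygon (x = longitude, y = latitude).
area = Polygon([(32.0, 0.0), (33.0, 0.0), (33.0, 0.2), (32.0, 0.2)])
print(len(table.filter_by_polygon(area)))  # 1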
from_file(file_path, entity_class, data_store=None, **kwargs) classmethod

Create an EntityTable instance from a file.

Parameters:

Name          Type              Description                      Default
file_path     Union[str, Path]  Path to the dataset file         required
entity_class  Type[E]           The entity class for validation  required

Returns:

Type         Description
EntityTable  EntityTable instance

Raises:

Type               Description
ValidationError    If any row fails validation
FileNotFoundError  If the file doesn't exist

Source code in gigaspatial/core/schemas/entity.py
@classmethod
def from_file(
    cls: Type["EntityTable"],
    file_path: Union[str, Path],
    entity_class: Type[E],
    data_store: Optional[DataStore] = None,
    **kwargs,
) -> "EntityTable":
    """
    Create an EntityTable instance from a file.

    Args:
        file_path: Path to the dataset file
        entity_class: The entity class for validation

    Returns:
        EntityTable instance

    Raises:
        ValidationError: If any row fails validation
        FileNotFoundError: If the file doesn't exist
    """
    data_store = data_store or LocalDataStore()
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    df = read_dataset(data_store, file_path, **kwargs)
    try:
        entities = [entity_class(**row) for row in df.to_dict(orient="records")]
        return cls(entities=entities)
    except ValidationError as e:
        raise ValueError(f"Validation error in input data: {e}")
    except Exception as e:
        raise ValueError(f"Error reading or processing the file: {e}")
get_lat_array()

Get an array of latitude values.

Source code in gigaspatial/core/schemas/entity.py
def get_lat_array(self) -> np.ndarray:
    """Get an array of latitude values."""
    if not self._check_has_location("get_lat_array"):
        return np.array([])
    return np.array([e.latitude for e in self.entities])
get_lon_array()

Get an array of longitude values.

Source code in gigaspatial/core/schemas/entity.py
def get_lon_array(self) -> np.ndarray:
    """Get an array of longitude values."""
    if not self._check_has_location("get_lon_array"):
        return np.array([])
    return np.array([e.longitude for e in self.entities])
get_nearest_neighbors(lat, lon, k=5)

Find k nearest neighbors to a point using a cached KDTree.

Source code in gigaspatial/core/schemas/entity.py
def get_nearest_neighbors(
    self, lat: float, lon: float, k: int = 5
) -> "EntityTable[E]":
    """Find k nearest neighbors to a point using a cached KDTree."""
    if not self._check_has_location("get_nearest_neighbors"):
        return self.__class__(entities=[])

    if not self._cached_kdtree:
        self._build_kdtree()  # Build the KDTree if not already cached

    if not self._cached_kdtree:  # If still None after building
        return self.__class__(entities=[])

    _, indices = self._cached_kdtree.query([[lat, lon]], k=k)
    return self.__class__(entities=[self.entities[i] for i in indices[0]])
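
A sketch with illustrative coordinates. The KDTree is built on raw latitude/longitude pairs, so distances are in planar degrees rather than metres, and k should not exceed the number of entities:

from gigaspatial.core.schemas.entity import EntityTable, GigaEntity

table = EntityTable(
    entities=[
        GigaEntity(latitude=0.10, longitude=32.58),
        GigaEntity(latitude=0.35, longitude=32.60),
        GigaEntity(latitude=-1.95, longitude=30.06),
    ]
)

nearest = table.get_nearest_neighbors(lat=0.2, lon=32.6, k=2)
# The two closest points: [(0.1, 32.58), (0.35, 32.6)]
print([(e.latitude, e.longitude) for e in nearest])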
to_coordinate_vector()

Transforms the entity table into a numpy vector of coordinates

Source code in gigaspatial/core/schemas/entity.py
def to_coordinate_vector(self) -> np.ndarray:
    """Transforms the entity table into a numpy vector of coordinates"""
    if not self.entities:
        return np.zeros((0, 2))

    if not self._check_has_location("to_coordinate_vector"):
        return np.zeros((0, 2))

    return np.array([[e.latitude, e.longitude] for e in self.entities])
to_dataframe()

Convert the entity table to a pandas DataFrame.

Source code in gigaspatial/core/schemas/entity.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert the entity table to a pandas DataFrame."""
    return pd.DataFrame([e.model_dump() for e in self.entities])
to_file(file_path, data_store=None, **kwargs)

Save the entity data to a file.

Parameters:

Name       Type              Description            Default
file_path  Union[str, Path]  Path to save the file  required
Source code in gigaspatial/core/schemas/entity.py
def to_file(
    self,
    file_path: Union[str, Path],
    data_store: Optional[DataStore] = None,
    **kwargs,
) -> None:
    """
    Save the entity data to a file.

    Args:
        file_path: Path to save the file
    """
    if not self.entities:
        raise ValueError("Cannot write to a file: no entities available.")

    data_store = data_store or LocalDataStore()

    write_dataset(self.to_dataframe(), data_store, file_path, **kwargs)
to_geodataframe()

Convert the entity table to a GeoDataFrame.

Source code in gigaspatial/core/schemas/entity.py
def to_geodataframe(self) -> gpd.GeoDataFrame:
    """Convert the entity table to a GeoDataFrame."""
    if not self._check_has_location("to_geodataframe"):
        raise ValueError("Cannot create GeoDataFrame: no entities available")
    df = self.to_dataframe()
    return gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df["longitude"], df["latitude"]),
        crs="EPSG:4326",
    )
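
A short sketch; the geometry is built from (longitude, latitude) pairs and the CRS is fixed to EPSG:4326:

from gigaspatial.core.schemas.entity import EntityTable, GigaEntity

table = EntityTable(entities=[GigaEntity(latitude=0.10, longitude=32.58)])

gdf = table.to_geodataframe()
print(gdf.crs)               # EPSG:4326
print(gdf.geometry.iloc[0])  # POINT (32.58 0.1)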
GigaEntity

Bases: BaseGigaEntity

Entity with location data.

Source code in gigaspatial/core/schemas/entity.py
class GigaEntity(BaseGigaEntity):
    """Entity with location data."""

    latitude: float = Field(
        ..., ge=-90, le=90, description="Latitude coordinate of the entity"
    )
    longitude: float = Field(
        ..., ge=-180, le=180, description="Longitude coordinate of the entity"
    )
    admin1: Optional[str] = Field(
        "Unknown", max_length=100, description="Primary administrative division"
    )
    admin1_id_giga: Optional[str] = Field(
        None,
        max_length=50,
        description="Unique identifier for the primary administrative division",
    )
    admin2: Optional[str] = Field(
        "Unknown", max_length=100, description="Secondary administrative division"
    )
    admin2_id_giga: Optional[str] = Field(
        None,
        max_length=50,
        description="Unique identifier for the secondary administrative division",
    )
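
The coordinate bounds are enforced at construction time by pydantic; a short sketch:

from pydantic import ValidationError
from gigaspatial.core.schemas.entity import GigaEntity

entity = GigaEntity(latitude=0.35, longitude=32.60, admin1="Central")
print(entity.admin1, entity.admin2)  # "Central Unknown" (admin2 keeps its default)

try:
    GigaEntity(latitude=95.0, longitude=0.0)  # latitude outside [-90, 90]
except ValidationError as exc:
    print(f"rejected: {len(exc.errors())} validation error(s)")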
GigaEntityNoLocation

Bases: BaseGigaEntity

Entity without location data.

Source code in gigaspatial/core/schemas/entity.py
class GigaEntityNoLocation(BaseGigaEntity):
    """Entity without location data."""

    pass