#
# MIT License
#
# Copyright (c) 2022 GT4SD team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
"""Module configuration."""
import logging
import os
from functools import lru_cache
from typing import Dict, Optional, Set
from pydantic_settings import BaseSettings, SettingsConfigDict
from .s3 import GT4SDS3Client, S3SyncError, sync_folder_with_s3, upload_file_to_s3
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
[docs]class GT4SDConfiguration(BaseSettings):
"""GT4SDConfiguration settings from environment variables.
Default configurations for gt4sd including a read-only COS for algorithms' artifacts.
Default configuration for gt4sd hub including a read-write COS for algorithms' artifacts uploaded by users.
"""
gt4sd_local_cache_path: str = os.path.join(os.path.expanduser("~"), ".gt4sd")
gt4sd_local_cache_path_algorithms: str = "algorithms"
gt4sd_local_cache_path_properties: str = "properties"
gt4sd_max_number_of_stuck_calls: int = 50
gt4sd_max_number_of_samples: int = 1000000
gt4sd_max_runtime: int = 86400
gt4sd_create_unverified_ssl_context: bool = False
gt4sd_disable_cudnn: bool = False
gt4sd_s3_host: str = "s3.par01.cloud-object-storage.appdomain.cloud"
gt4sd_s3_access_key: str = "6e9891531d724da89997575a65f4592e"
gt4sd_s3_secret_key: str = "5997d63c4002cc04e13c03dc0c2db9dae751293dab106ac5"
gt4sd_s3_secure: bool = True
gt4sd_s3_bucket_algorithms: str = "gt4sd-cos-algorithms-artifacts"
gt4sd_s3_bucket_properties: str = "gt4sd-cos-properties-artifacts"
gt4sd_s3_host_hub: str = "s3.par01.cloud-object-storage.appdomain.cloud"
gt4sd_s3_access_key_hub: str = "d9536662ebcf462f937efb9f58012830"
gt4sd_s3_secret_key_hub: str = "934d1f3afdaea55ac586f6c2f729ac2ba2694bb8e975ee0b"
gt4sd_s3_secure_hub: bool = True
gt4sd_s3_bucket_hub_algorithms: str = "gt4sd-cos-hub-algorithms-artifacts"
gt4sd_s3_bucket_hub_properties: str = "gt4sd-cos-hub-properties-artifacts"
model_config = SettingsConfigDict(frozen=True)
[docs] @staticmethod
@lru_cache(maxsize=None)
def get_instance() -> "GT4SDConfiguration":
return GT4SDConfiguration()
[docs]class GT4SDArtifactManagementConfiguration:
"""Artifact management configuration."""
gt4sd_s3_modules: Set[str] = {"algorithms", "properties"}
[docs] def __init__(self, gt4sd_configuration: GT4SDConfiguration) -> None:
"""Initialize the artifact management configuration from the base one.
Args:
gt4sd_configuration: GT4SD base configuration.
"""
self.local_cache_path: Dict[str, str] = {
"algorithms": gt4sd_configuration.gt4sd_local_cache_path_algorithms,
"properties": gt4sd_configuration.gt4sd_local_cache_path_properties,
}
self.s3_bucket: Dict[str, str] = {
"algorithms": gt4sd_configuration.gt4sd_s3_bucket_algorithms,
"properties": gt4sd_configuration.gt4sd_s3_bucket_properties,
}
self.s3_bucket_hub: Dict[str, str] = {
"algorithms": gt4sd_configuration.gt4sd_s3_bucket_hub_algorithms,
"properties": gt4sd_configuration.gt4sd_s3_bucket_hub_properties,
}
gt4sd_configuration_instance = GT4SDConfiguration.get_instance()
gt4sd_artifact_management_configuration = GT4SDArtifactManagementConfiguration(
gt4sd_configuration=gt4sd_configuration_instance
)
for key, val in gt4sd_artifact_management_configuration.local_cache_path.items():
logger.info(f"using as local cache path for {key}: {val}")
path = os.path.join(gt4sd_configuration_instance.gt4sd_local_cache_path, val)
try:
os.makedirs(path)
except FileExistsError:
logger.debug(f"local cache path for {key} already exists at {path}.")
[docs]def upload_to_s3(
target_filepath: str, source_filepath: str, module: str = "algorithms"
):
"""Upload an algorithm in source_filepath in target_filepath on a bucket in the model hub.
Args:
target_filepath: path to save the objects in s3.
source_filepath: path to the file to sync.
module: the submodule of gt4sd that acts as a root for the bucket, defaults
to `algorithms`.
"""
if module not in gt4sd_artifact_management_configuration.gt4sd_s3_modules:
raise ValueError(
f"Unknown cache module: {module}. Supported modules: "
f"{','.join(gt4sd_artifact_management_configuration.gt4sd_s3_modules)}"
)
try:
upload_file_to_s3(
host=gt4sd_configuration_instance.gt4sd_s3_host_hub,
access_key=gt4sd_configuration_instance.gt4sd_s3_access_key_hub,
secret_key=gt4sd_configuration_instance.gt4sd_s3_secret_key_hub,
bucket=gt4sd_artifact_management_configuration.s3_bucket_hub[module],
target_filepath=target_filepath,
source_filepath=source_filepath,
secure=gt4sd_configuration_instance.gt4sd_s3_secure_hub,
)
except S3SyncError:
logger.exception("error in syncing the cache with S3")
[docs]def sync_algorithm_with_s3(
prefix: Optional[str] = None, module: str = "algorithms"
) -> str:
"""Sync an algorithm in the local cache using environment variables.
Args:
prefix: the relative path in the bucket (both
on S3 and locally) to match files to download. Defaults to None.
module: the submodule of gt4sd that acts as a root for the bucket, defaults
to `algorithms`.
Returns:
str: local path using the prefix.
"""
if module not in gt4sd_artifact_management_configuration.gt4sd_s3_modules:
raise ValueError(
f"Unknown cache module: {module}. Supported modules: "
f"{','.join(gt4sd_artifact_management_configuration.gt4sd_s3_modules)}"
)
folder_path = os.path.join(
gt4sd_configuration_instance.gt4sd_local_cache_path,
gt4sd_artifact_management_configuration.local_cache_path[module],
)
try:
# sync with the public bucket
sync_folder_with_s3(
host=gt4sd_configuration_instance.gt4sd_s3_host,
access_key=gt4sd_configuration_instance.gt4sd_s3_access_key,
secret_key=gt4sd_configuration_instance.gt4sd_s3_secret_key,
bucket=gt4sd_artifact_management_configuration.s3_bucket[module],
folder_path=folder_path,
prefix=prefix,
secure=gt4sd_configuration_instance.gt4sd_s3_secure,
)
# sync with the public bucket hub
sync_folder_with_s3(
host=gt4sd_configuration_instance.gt4sd_s3_host_hub,
access_key=gt4sd_configuration_instance.gt4sd_s3_access_key_hub,
secret_key=gt4sd_configuration_instance.gt4sd_s3_secret_key_hub,
bucket=gt4sd_artifact_management_configuration.s3_bucket_hub[module],
folder_path=folder_path,
prefix=prefix,
secure=gt4sd_configuration_instance.gt4sd_s3_secure_hub,
)
except S3SyncError:
logger.exception("error in syncing the cache with S3")
return os.path.join(folder_path, prefix) if prefix is not None else folder_path
[docs]def get_cached_algorithm_path(
prefix: Optional[str] = None, module: str = "algorithms"
) -> str:
if module not in gt4sd_artifact_management_configuration.gt4sd_s3_modules:
raise ValueError(
f"Unknown cache module: {module}. Supported modules: "
f"{','.join(gt4sd_artifact_management_configuration.gt4sd_s3_modules)}."
)
return (
os.path.join(
gt4sd_configuration_instance.gt4sd_local_cache_path,
gt4sd_artifact_management_configuration.local_cache_path[module],
prefix,
)
if prefix is not None
else os.path.join(
gt4sd_configuration_instance.gt4sd_local_cache_path,
gt4sd_artifact_management_configuration.local_cache_path[module],
)
)
[docs]def get_algorithm_subdirectories_from_s3_coordinates(
host: str,
access_key: str,
secret_key: str,
bucket: str,
secure: bool = True,
prefix: Optional[str] = None,
) -> Set[str]:
"""Wrapper to initialize a client and list the directories in a bucket."""
client = GT4SDS3Client(
host=host, access_key=access_key, secret_key=secret_key, secure=secure
)
return client.list_directories(bucket=bucket, prefix=prefix)
[docs]def get_algorithm_subdirectories_with_s3(
prefix: Optional[str] = None, module: str = "algorithms"
) -> Set[str]:
"""Get algorithms in the s3 buckets.
Args:
prefix: the relative path in the bucket (both
on S3 and locally) to match files to download. Defaults to None.
module: the submodule of gt4sd that acts as a root for the bucket, defaults
to `algorithms`.
Returns:
Set: set of available algorithms on s3 with that prefix.
"""
if module not in gt4sd_artifact_management_configuration.gt4sd_s3_modules:
raise ValueError(
f"Unknown cache module: {module}. Supported modules: "
f"{','.join(gt4sd_artifact_management_configuration.gt4sd_s3_modules)}"
)
try:
# directories in the read-only public bucket
dirs = get_algorithm_subdirectories_from_s3_coordinates(
host=gt4sd_configuration_instance.gt4sd_s3_host,
access_key=gt4sd_configuration_instance.gt4sd_s3_access_key,
secret_key=gt4sd_configuration_instance.gt4sd_s3_secret_key,
bucket=gt4sd_artifact_management_configuration.s3_bucket[module],
secure=gt4sd_configuration_instance.gt4sd_s3_secure,
prefix=prefix,
)
# directories in the write public-hub bucket
dirs_hub = get_algorithm_subdirectories_from_s3_coordinates(
host=gt4sd_configuration_instance.gt4sd_s3_host_hub,
access_key=gt4sd_configuration_instance.gt4sd_s3_access_key_hub,
secret_key=gt4sd_configuration_instance.gt4sd_s3_secret_key_hub,
bucket=gt4sd_artifact_management_configuration.s3_bucket_hub[module],
secure=gt4sd_configuration_instance.gt4sd_s3_secure_hub,
prefix=prefix,
)
# set of directories in the public bucket and public hub bucket
versions = dirs.union(dirs_hub)
return versions
except Exception:
logger.exception("generic syncing error")
raise S3SyncError(
"CacheSyncingError",
f"error in getting directories of prefix={prefix}",
)
[docs]def get_algorithm_subdirectories_in_cache(
prefix: Optional[str] = None, module: str = "algorithms"
) -> Set[str]:
"""Get algorithm subdirectories from the cache.
Args:
prefix: prefix matching cache subdirectories. Defaults to None.
module: the submodule of gt4sd that acts as a root for the bucket, defaults
to `algorithms`.
Returns:
a set of subdirectories.
"""
path = get_cached_algorithm_path(prefix=prefix, module=module)
try:
_, dirs, _ = next(iter(os.walk(path)))
return set(dirs)
except StopIteration:
return set()
[docs]def reset_logging_root_logger():
"""Reset the root logger from logging library."""
root = logging.getLogger()
root.handlers = []
root.filters = []