# swarms/structs/base_structure.py
import toml
import yaml
import asyncio
import concurrent.futures
import json
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Any, Dict, List, Optional, Callable
import psutil
try:
import gzip
except ImportError as error:
print(f"Error importing gzip: {error}")
# from pydantic import BaseModel
class BaseStructure:
    """Base class providing persistence, logging, and execution utilities.

    Offers synchronous, asynchronous, threaded, and batched execution
    helpers plus JSON file persistence for metadata and artifacts,
    event/error logging, gzip compression, and dict/JSON/YAML/TOML export.

    Attributes:
        name (Optional[str]): Identifier used in metadata/artifact/log file names.
        description (Optional[str]): Human-readable description.
        save_metadata_on (bool): When True, ``save_metadata`` writes to disk.
        save_artifact_path (Optional[str]): Directory for artifact JSON files.
        save_metadata_path (Optional[str]): Directory for metadata and event logs.
        save_error_path (Optional[str]): Directory for error logs.
        workspace_dir (Optional[str]): General-purpose workspace directory.

    Examples:
        >>> base_structure = BaseStructure()
        >>> base_structure
        BaseStructure(name=None, description=None, save_metadata=True, save_artifact_path='./artifacts', save_metadata_path='./metadata', save_error_path='./errors')
    """

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        save_metadata_on: bool = True,
        save_artifact_path: Optional[str] = "./artifacts",
        save_metadata_path: Optional[str] = "./metadata",
        save_error_path: Optional[str] = "./errors",
        workspace_dir: Optional[str] = "./workspace",
    ):
        super().__init__()
        self.name = name
        self.description = description
        self.save_metadata_on = save_metadata_on
        self.save_artifact_path = save_artifact_path
        self.save_metadata_path = save_metadata_path
        self.save_error_path = save_error_path
        self.workspace_dir = workspace_dir

    def run(self, *args, **kwargs):
        """Run the structure. Subclasses override this hook; the base is a no-op."""

    @staticmethod
    def _ensure_parent_dir(file_path: str) -> None:
        """Create the parent directory of ``file_path`` if it does not exist."""
        parent = os.path.dirname(file_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def save_to_file(self, data: Any, file_path: str):
        """Serialize ``data`` as JSON and write it to ``file_path``.

        Args:
            data (Any): JSON-serializable payload.
            file_path (str): Destination path; parent directories are created.
        """
        # Create missing directories so the default "./metadata" etc. work
        # out of the box instead of raising FileNotFoundError.
        self._ensure_parent_dir(file_path)
        with open(file_path, "w") as file:
            json.dump(data, file)

    def load_from_file(self, file_path: str) -> Any:
        """Load and deserialize JSON data from ``file_path``.

        Args:
            file_path (str): Path of the JSON file to read.

        Returns:
            Any: The deserialized payload.
        """
        with open(file_path) as file:
            return json.load(file)

    def save_metadata(self, metadata: Dict[str, Any]):
        """Save metadata to ``<save_metadata_path>/<name>_metadata.json``.

        Args:
            metadata (Dict[str, Any]): JSON-serializable metadata.
        """
        # BUGFIX: the original checked ``if self.save_metadata:`` — the bound
        # method itself, which is always truthy — so the ``save_metadata_on``
        # flag was silently ignored.
        if self.save_metadata_on:
            file_path = os.path.join(
                self.save_metadata_path, f"{self.name}_metadata.json"
            )
            self.save_to_file(metadata, file_path)

    def load_metadata(self) -> Dict[str, Any]:
        """Load metadata from ``<save_metadata_path>/<name>_metadata.json``.

        Returns:
            Dict[str, Any]: The previously saved metadata.
        """
        file_path = os.path.join(
            self.save_metadata_path, f"{self.name}_metadata.json"
        )
        return self.load_from_file(file_path)

    def log_error(self, error_message: str):
        """Append an error message to ``<save_error_path>/<name>_errors.log``.

        Args:
            error_message (str): Message to record.
        """
        file_path = os.path.join(
            self.save_error_path, f"{self.name}_errors.log"
        )
        self._ensure_parent_dir(file_path)
        with open(file_path, "a") as file:
            file.write(f"{error_message}\n")

    def save_artifact(self, artifact: Any, artifact_name: str):
        """Save an artifact to ``<save_artifact_path>/<artifact_name>.json``.

        Args:
            artifact (Any): JSON-serializable artifact.
            artifact_name (str): Base name of the artifact file (no extension).
        """
        file_path = os.path.join(
            self.save_artifact_path, f"{artifact_name}.json"
        )
        self.save_to_file(artifact, file_path)

    def load_artifact(self, artifact_name: str) -> Any:
        """Load an artifact from ``<save_artifact_path>/<artifact_name>.json``.

        Args:
            artifact_name (str): Base name of the artifact file (no extension).

        Returns:
            Any: The deserialized artifact.
        """
        file_path = os.path.join(
            self.save_artifact_path, f"{artifact_name}.json"
        )
        return self.load_from_file(file_path)

    def _current_timestamp(self) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def log_event(
        self,
        event: str,
        event_type: str = "INFO",
    ):
        """Append a timestamped event line to ``<save_metadata_path>/<name>_events.log``.

        Args:
            event (str): Event description.
            event_type (str, optional): Severity/category tag. Defaults to "INFO".
        """
        timestamp = self._current_timestamp()
        log_message = f"[{timestamp}] [{event_type}] {event}\n"
        # BUGFIX: the original reused the name ``file`` for both the path and
        # the open handle, shadowing the path variable inside the ``with``.
        log_path = os.path.join(
            self.save_metadata_path, f"{self.name}_events.log"
        )
        self._ensure_parent_dir(log_path)
        with open(log_path, "a") as log_file:
            log_file.write(log_message)

    async def run_async(self, *args, **kwargs):
        """Run :meth:`run` in a worker thread without blocking the event loop."""
        # asyncio.get_event_loop() inside a coroutine is deprecated (3.10+);
        # asyncio.to_thread is the modern equivalent and forwards kwargs too.
        return await asyncio.to_thread(self.run, *args, **kwargs)

    async def save_metadata_async(self, metadata: Dict[str, Any]):
        """Asynchronously save metadata via :meth:`save_metadata`.

        Args:
            metadata (Dict[str, Any]): JSON-serializable metadata.
        """
        return await asyncio.to_thread(self.save_metadata, metadata)

    async def load_metadata_async(self) -> Dict[str, Any]:
        """Asynchronously load metadata via :meth:`load_metadata`.

        Returns:
            Dict[str, Any]: The previously saved metadata.
        """
        return await asyncio.to_thread(self.load_metadata)

    async def log_error_async(self, error_message: str):
        """Asynchronously log an error via :meth:`log_error`.

        Args:
            error_message (str): Message to record.
        """
        return await asyncio.to_thread(self.log_error, error_message)

    async def save_artifact_async(
        self, artifact: Any, artifact_name: str
    ):
        """Asynchronously save an artifact via :meth:`save_artifact`.

        Args:
            artifact (Any): JSON-serializable artifact.
            artifact_name (str): Base name of the artifact file.
        """
        return await asyncio.to_thread(
            self.save_artifact, artifact, artifact_name
        )

    async def load_artifact_async(self, artifact_name: str) -> Any:
        """Asynchronously load an artifact via :meth:`load_artifact`.

        Args:
            artifact_name (str): Base name of the artifact file.

        Returns:
            Any: The deserialized artifact.
        """
        return await asyncio.to_thread(self.load_artifact, artifact_name)

    async def log_event_async(
        self,
        event: str,
        event_type: str = "INFO",
    ):
        """Asynchronously log an event via :meth:`log_event`.

        Args:
            event (str): Event description.
            event_type (str, optional): Severity/category tag. Defaults to "INFO".
        """
        return await asyncio.to_thread(self.log_event, event, event_type)

    async def asave_to_file(
        self, data: Any, file: str, *args, **kwargs
    ):
        """Asynchronously save data via :meth:`save_to_file`.

        Args:
            data (Any): JSON-serializable payload.
            file (str): Destination path.
        """
        # NOTE(review): extra *args are forwarded (matching the original) but
        # save_to_file accepts none; **kwargs were always silently dropped.
        await asyncio.to_thread(self.save_to_file, data, file, *args)

    async def aload_from_file(
        self,
        file: str,
    ) -> Any:
        """Asynchronously load data via :meth:`load_from_file`.

        Args:
            file (str): Path of the JSON file to read.

        Returns:
            Any: The deserialized payload.
        """
        return await asyncio.to_thread(self.load_from_file, file)

    def run_in_thread(self, *args, **kwargs):
        """Run :meth:`run` in a background thread.

        Returns:
            concurrent.futures.Future: Resolves with the result of ``run``.
        """
        # BUGFIX: the original used ``with ThreadPoolExecutor()`` which blocks
        # on shutdown until the task completes, making the call effectively
        # synchronous. shutdown(wait=False) lets the task finish in the
        # background while still releasing the executor's resources.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(self.run, *args, **kwargs)
        executor.shutdown(wait=False)
        return future

    def save_metadata_in_thread(self, metadata: Dict[str, Any]):
        """Save metadata in a background thread.

        Args:
            metadata (Dict[str, Any]): JSON-serializable metadata.

        Returns:
            concurrent.futures.Future: Resolves when the write completes.
        """
        # Same fix as run_in_thread: don't block waiting for the pool.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(self.save_metadata, metadata)
        executor.shutdown(wait=False)
        return future

    def run_concurrent(self, *args, **kwargs):
        """Run the structure via :meth:`run_async` on a fresh event loop."""
        return asyncio.run(self.run_async(*args, **kwargs))

    def compress_data(
        self,
        data: Any,
    ) -> bytes:
        """Gzip-compress the JSON serialization of ``data``.

        Args:
            data (Any): JSON-serializable payload.

        Returns:
            bytes: Compressed UTF-8 JSON bytes.
        """
        return gzip.compress(json.dumps(data).encode())

    def decompres_data(self, data: bytes) -> Any:
        """Decompress gzip bytes produced by :meth:`compress_data`.

        Name keeps the historical typo for backward compatibility;
        prefer :meth:`decompress_data`.

        Args:
            data (bytes): Compressed UTF-8 JSON bytes.

        Returns:
            Any: The original payload.
        """
        return json.loads(gzip.decompress(data).decode())

    def decompress_data(self, data: bytes) -> Any:
        """Correctly spelled alias for :meth:`decompres_data`."""
        return self.decompres_data(data)

    def run_batched(
        self,
        batched_data: List[Any],
        batch_size: int = 10,
        *args,
        **kwargs,
    ) -> List[Any]:
        """Run :meth:`run` over each item of ``batched_data`` in a thread pool.

        Args:
            batched_data (List[Any]): Items passed one at a time to ``run``.
            batch_size (int, optional): Max worker threads. Defaults to 10.

        Returns:
            List[Any]: Results in the same order as ``batched_data``.
        """
        # NOTE(review): extra *args/**kwargs are accepted but not forwarded
        # to run(), matching the original behavior.
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            futures = [
                executor.submit(self.run, data)
                for data in batched_data
            ]
            return [future.result() for future in futures]

    def load_config(
        self, config: str = None, *args, **kwargs
    ) -> Dict[str, Any]:
        """Load a configuration file.

        NOTE(review): despite the generic name this reads JSON only
        (delegates to :meth:`load_from_file`); passing ``None`` raises.

        Args:
            config (str, optional): Path of the JSON config. Defaults to None.

        Returns:
            Dict[str, Any]: The parsed configuration.
        """
        return self.load_from_file(config)

    def backup_data(
        self, data: Any, backup_path: str = None, *args, **kwargs
    ):
        """Write ``data`` to ``<backup_path>/<timestamp>.json``.

        NOTE(review): the timestamp contains spaces and colons, which are
        invalid in Windows file names — confirm before relying on this
        cross-platform.

        Args:
            data (Any): JSON-serializable payload.
            backup_path (str, optional): Target directory. Defaults to None.
        """
        timestamp = self._current_timestamp()
        backup_file_path = f"{backup_path}/{timestamp}.json"
        self.save_to_file(data, backup_file_path)

    def monitor_resources(self):
        """Log current memory and CPU usage as an event (blocks ~1s for CPU sampling)."""
        memory = psutil.virtual_memory().percent
        cpu_usage = psutil.cpu_percent(interval=1)
        self.log_event(
            f"Resource usage - Memory: {memory}%, CPU: {cpu_usage}%"
        )

    def run_with_resources(self, *args, **kwargs):
        """Log resource usage, then run the structure."""
        self.monitor_resources()
        return self.run(*args, **kwargs)

    def run_with_resources_batched(
        self,
        batched_data: List[Any],
        batch_size: int = 10,
        *args,
        **kwargs,
    ) -> List[Any]:
        """Log resource usage, then run batched data via :meth:`run_batched`.

        Args:
            batched_data (List[Any]): Items passed one at a time to ``run``.
            batch_size (int, optional): Max worker threads. Defaults to 10.

        Returns:
            List[Any]: Results in the same order as ``batched_data``.
        """
        self.monitor_resources()
        return self.run_batched(
            batched_data, batch_size, *args, **kwargs
        )

    def _serialize_callable(
        self, attr_value: Callable
    ) -> Dict[str, Any]:
        """Serialize a callable attribute as its name and docstring.

        Args:
            attr_value (Callable): The callable to serialize.

        Returns:
            Dict[str, Any]: ``{"name": ..., "doc": ...}``.
        """
        return {
            "name": getattr(
                attr_value, "__name__", type(attr_value).__name__
            ),
            "doc": getattr(attr_value, "__doc__", None),
        }

    def _serialize_attr(self, attr_name: str, attr_value: Any) -> Any:
        """Serialize one attribute, degrading gracefully when not serializable.

        Args:
            attr_name (str): The attribute's name (kept for symmetry/debugging).
            attr_value (Any): The attribute's value.

        Returns:
            Any: A JSON-safe representation, or a placeholder string.
        """
        try:
            if callable(attr_value):
                return self._serialize_callable(attr_value)
            elif hasattr(attr_value, "to_dict"):
                # Recursive serialization for nested objects.
                return attr_value.to_dict()
            else:
                # Probe serializability; discard the string, keep the value.
                json.dumps(attr_value)
                return attr_value
        except (TypeError, ValueError):
            return f"<Non-serializable: {type(attr_value).__name__}>"

    def to_dict(self) -> Dict[str, Any]:
        """Convert all instance attributes (including callables) to a dictionary.

        Returns:
            Dict[str, Any]: JSON-safe representation of ``self.__dict__``.
        """
        return {
            attr_name: self._serialize_attr(attr_name, attr_value)
            for attr_name, attr_value in self.__dict__.items()
        }

    def to_json(self, indent: int = 4, *args, **kwargs) -> str:
        """Return :meth:`to_dict` serialized as a JSON string."""
        return json.dumps(
            self.to_dict(), *args, indent=indent, **kwargs
        )

    def to_yaml(self, indent: int = 4, *args, **kwargs) -> str:
        """Return :meth:`to_dict` serialized as a YAML string."""
        return yaml.dump(
            self.to_dict(), *args, indent=indent, **kwargs
        )

    def to_toml(self, *args, **kwargs) -> str:
        """Return :meth:`to_dict` serialized as a TOML string."""
        return toml.dumps(self.to_dict(), *args, **kwargs)