from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional, Type, get_args, get_origin

from bson import ObjectId
from pydantic import (
    AnyUrl,
    BaseModel,
    Field,
    GetCoreSchemaHandler,
    GetJsonSchemaHandler,
)
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import core_schema


class PyObjectId:
    """Wrapper around ``bson.ObjectId`` that can be used as a pydantic type.

    As a pydantic field type it accepts a string, checks it with
    ``ObjectId.is_valid`` and yields a ``bson.ObjectId``.  Instantiated
    directly it stores an ``ObjectId`` in ``self.value`` (a fresh one when
    no value is given) and forwards unknown attribute lookups to it.
    """

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source: type, handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """Validate the input as ``str`` and then convert it via ``validate``."""
        # ``validate`` takes only the value, so it must be registered as a
        # "no info" validator; a "with info" validator is called with
        # ``(value, info)`` and would fail with a TypeError.
        return core_schema.no_info_after_validator_function(
            cls.validate, core_schema.str_schema()
        )

    @classmethod
    def __get_pydantic_json_schema__(
        cls, schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        """Describe the type as a plain string in generated JSON schemas."""
        return {"type": "string"}

    @classmethod
    def validate(cls, value: str) -> ObjectId:
        """Return ``ObjectId(value)``.

        Raises:
            ValueError: if ``ObjectId.is_valid(value)`` is false.
        """
        if not ObjectId.is_valid(value):
            raise ValueError(f"Invalid ObjectId: {value}")
        return ObjectId(value)

    def __getattr__(self, item):
        """Forward lookups of unknown attributes to the wrapped ``ObjectId``."""
        # ``__getattr__`` can run before ``__init__`` has set ``value`` (for
        # example while copying or unpickling).  Signal that with
        # AttributeError, as the attribute protocol expects, not KeyError.
        try:
            wrapped = self.__dict__["value"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            ) from None
        return getattr(wrapped, item)

    def __init__(self, value: Optional[str] = None):
        """Wrap ``value`` after validation, or a new ``ObjectId`` if omitted."""
        if value is None:
            self.value = ObjectId()
        else:
            self.value = self.validate(value)

    def __str__(self):
        return str(self.value)


def _enum_class(annotation: Any) -> Optional[Type[Enum]]:
    """Return ``annotation`` if it is a plain ``Enum`` subclass, else ``None``."""
    # Parameterized generics (``List[X]``, ``list[X]``, ...) are never enum
    # classes, and on some Python versions they pass ``isinstance(x, type)``
    # while making ``issubclass`` raise TypeError, so rule them out first.
    if get_origin(annotation) is not None:
        return None
    if isinstance(annotation, type) and issubclass(annotation, Enum):
        return annotation
    return None


def _item_enum_class(annotation: Any, container: type) -> Optional[Type[Enum]]:
    """Return the Enum class of the values of a ``List[E]`` / ``Dict[K, E]``.

    ``container`` is ``list`` or ``dict``; ``None`` is returned when the
    annotation is not that container or its value type is not an Enum class.
    """
    if get_origin(annotation) is not container:
        return None
    type_args = get_args(annotation)
    if not type_args:
        return None
    # The value type is the last argument for both ``List[E]`` and ``Dict[K, E]``.
    return _enum_class(type_args[-1])


def _encode_value(value: Any) -> Any:
    """Convert one field value into the form stored in the document."""
    if isinstance(value, BaseModel):
        return _model_to_document(value)
    if isinstance(value, list):
        # Convert models item by item so a list mixing models with plain
        # values does not keep raw model objects; other items are untouched.
        return [
            _model_to_document(item) if isinstance(item, BaseModel) else item
            for item in value
        ]
    # No truthiness test here: a falsy member (e.g. an IntEnum whose value
    # is 0) must be unwrapped as well.
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, AnyUrl):
        return str(value)
    return value


def _model_to_document(model: BaseModel) -> Dict[str, Any]:
    """Build a dict from ``model``, keyed by field alias (or name)."""
    document: Dict[str, Any] = {}
    for name, field in type(model).model_fields.items():
        document[field.alias or name] = _encode_value(getattr(model, name))
    # Extra fields (only present when the model allows them) have no field
    # definition, so they are emitted under their own names.
    for name, value in (model.model_extra or {}).items():
        document[name] = _encode_value(value)
    return document


def _restore_enums(inst: BaseModel) -> None:
    """Replace raw enum values on ``inst`` (recursively) with Enum members."""
    for name, field in type(inst).model_fields.items():
        value = getattr(inst, name)
        enum_cls = _enum_class(field.annotation)
        if enum_cls is not None:
            # Leave existing members and ``None`` alone; only raw values
            # need converting, which also avoids a needless ``setattr``.
            if value is not None and not isinstance(value, enum_cls):
                setattr(inst, name, enum_cls(value))
        elif isinstance(value, BaseModel):
            _restore_enums(value)
        elif isinstance(value, list):
            item_enum = _item_enum_class(field.annotation, list)
            for index, item in enumerate(value):
                if isinstance(item, BaseModel):
                    _restore_enums(item)
                elif (
                    item_enum is not None
                    and item is not None
                    and not isinstance(item, item_enum)
                ):
                    value[index] = item_enum(item)
        elif isinstance(value, dict):
            item_enum = _item_enum_class(field.annotation, dict)
            # Only existing keys are reassigned, so the dict keeps its size
            # while it is being iterated.
            for key, item in value.items():
                if isinstance(item, BaseModel):
                    _restore_enums(item)
                elif (
                    item_enum is not None
                    and item is not None
                    and not isinstance(item, item_enum)
                ):
                    value[key] = item_enum(item)


class MongoBaseModel(BaseModel):
    """Base model with a string ``id`` and dict conversion helpers."""

    # Defaults to the string form of a newly generated ObjectId.
    id: str = Field(default_factory=lambda: str(PyObjectId()))

    # NOTE(review): class-based config is kept so subclasses written as
    # ``class Config(MongoBaseModel.Config)`` keep working; consider moving
    # to ``model_config`` once no subclass relies on it.
    class Config:
        arbitrary_types_allowed = True

    def to_mongo(self) -> Dict[str, Any]:
        """Return this model as a plain dict.

        Keys are field aliases where defined, otherwise field names.
        Nested models (also inside lists) become dicts, Enum members become
        their ``value``, datetimes become ISO 8601 strings and ``AnyUrl``
        values become strings; every other value is passed through.
        """
        return _model_to_document(self)

    @classmethod
    def from_mongo(cls, data: Optional[Dict[str, Any]]):
        """Build an instance from ``data``, or return ``None`` for ``None``.

        After construction, fields annotated with an Enum class (directly,
        or as the value type of a ``List``/``Dict``) that still hold raw
        values are converted to Enum members, recursing into nested models.
        """
        if data is None:
            return None
        instance = cls(**data)
        _restore_enums(instance)
        return instance