|
""" |
|
MIT License |
|
|
|
Copyright (c) 2023 hysts |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy |
|
of this software and associated documentation files (the "Software"), to deal |
|
in the Software without restriction, including without limitation the rights |
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
copies of the Software, and to permit persons to whom the Software is |
|
furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all |
|
copies or substantial portions of the Software. |
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
SOFTWARE. |
|
""" |
|
|
|
import json |
|
import tempfile |
|
import uuid |
|
from pathlib import Path |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
import pyarrow as pa |
|
import pyarrow.parquet as pq |
|
from huggingface_hub import CommitScheduler, HfApi |
|
|
|
|
|
class ParquetScheduler(CommitScheduler):
    """Commit scheduler that uploads appended rows to a Hub dataset as Parquet files.

    Usage: configure the scheduler with a repo id. Once started, you can add data to be uploaded to the Hub. 1 `.append`
    call will result in 1 row in your final dataset.

    ```py
    # Start scheduler
    >>> scheduler = ParquetScheduler(repo_id="my-parquet-dataset")

    # Append some data to be uploaded
    >>> scheduler.append({...})
    >>> scheduler.append({...})
    >>> scheduler.append({...})
    ```

    The scheduler will automatically infer the schema from the data it pushes.
    Optionally, you can manually set the schema yourself:

    ```py
    >>> scheduler = ParquetScheduler(
    ...     repo_id="my-parquet-dataset",
    ...     schema={
    ...         "prompt": {"_type": "Value", "dtype": "string"},
    ...         "negative_prompt": {"_type": "Value", "dtype": "string"},
    ...         "guidance_scale": {"_type": "Value", "dtype": "int64"},
    ...         "image": {"_type": "Image"},
    ...     },
    ... )
    ```

    See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value for the list of
    possible values.
    """

    def __init__(
        self,
        *,
        repo_id: str,
        schema: Optional[Dict[str, Dict[str, str]]] = None,
        every: Union[int, float] = 5,
        revision: Optional[str] = None,
        private: bool = False,
        token: Optional[str] = None,
        allow_patterns: Union[List[str], str, None] = None,
        ignore_patterns: Union[List[str], str, None] = None,
        hf_api: Optional[HfApi] = None,
    ) -> None:
        """Create the scheduler and start with an empty row buffer.

        Args:
            repo_id: Target repository; the repo type is always ``"dataset"``.
            schema: Optional mapping of column name to a `datasets` feature
                description. Columns not listed are inferred at push time.
            every, revision, private, token, allow_patterns, ignore_patterns,
            hf_api: Forwarded unchanged to `CommitScheduler.__init__`.
        """
        super().__init__(
            repo_id=repo_id,
            # `tempfile.tempdir` stays None until the tempfile module has
            # resolved its default directory, so passing it directly can hand
            # None to the parent. `gettempdir()` always returns a real path
            # (and the same value whenever `tempfile.tempdir` is already set).
            folder_path=tempfile.gettempdir(),
            every=every,
            repo_type="dataset",
            revision=revision,
            private=private,
            token=token,
            allow_patterns=allow_patterns,
            ignore_patterns=ignore_patterns,
            hf_api=hf_api,
        )

        # Rows buffered by `append` and drained by `push_to_hub`.
        self._rows: List[Dict[str, Any]] = []
        self._schema = schema

    def append(self, row: Dict[str, Any]) -> None:
        """Add a new item to be uploaded."""
        with self.lock:
            self._rows.append(row)

    def push_to_hub(self) -> None:
        """Upload all buffered rows as a single new Parquet file.

        Does nothing when no rows are buffered. Otherwise the rows are
        normalised against the schema (user-provided and/or inferred),
        written to a temporary Parquet file and uploaded under a random
        ``<uuid4>.parquet`` name. Local files whose content was embedded into
        Image/Audio columns are deleted once the upload has succeeded.
        """
        # Swap the buffer out under the lock so concurrent `append` calls are
        # neither blocked by the upload nor included twice.
        with self.lock:
            rows = self._rows
            self._rows = []
        if not rows:
            return
        print(f"Got {len(rows)} item(s) to commit.")

        # Start from the user schema when one was given (inferred entries are
        # then added to that same dict); otherwise infer everything from scratch.
        schema: Dict[str, Dict] = self._schema or {}
        path_to_cleanup: List[Path] = []
        for row in rows:
            for key, value in row.items():
                # Infer the feature type the first time a column is seen.
                if key not in schema:
                    schema[key] = _infer_schema(key, value)

                # Image/Audio columns are stored as {"path", "bytes"} structs.
                if schema[key]["_type"] in ("Image", "Audio"):
                    if isinstance(value, bytes):
                        row[key] = {"path": "", "bytes": value}
                    elif value is not None:
                        # None marks a missing media value (the same value used
                        # below for absent features) and cannot be turned into
                        # a Path, so it is left untouched.
                        file_path = Path(value)
                        if file_path.is_file():
                            row[key] = {
                                "path": file_path.name,
                                "bytes": file_path.read_bytes(),
                            }
                            path_to_cleanup.append(file_path)

        # Every row must expose every column; fill the gaps with None.
        for row in rows:
            for feature in schema:
                if feature not in row:
                    row[feature] = None

        table = pa.Table.from_pylist(rows)

        # Attach the feature description under the "huggingface" metadata key.
        table = table.replace_schema_metadata(
            {"huggingface": json.dumps({"info": {"features": schema}})}
        )

        # The context manager guarantees the temporary file is closed (and
        # removed) even when writing or uploading raises.
        with tempfile.NamedTemporaryFile() as archive_file:
            pq.write_table(table, archive_file.name)

            self.api.upload_file(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                revision=self.revision,
                path_in_repo=f"{uuid.uuid4()}.parquet",
                path_or_fileobj=archive_file.name,
            )
            print("Commit completed.")

        # Only reached after a successful upload: the embedded local files are
        # no longer needed.
        for path in path_to_cleanup:
            path.unlink(missing_ok=True)
|
|
|
|
|
def _infer_schema(key: str, value: Any) -> Dict[str, str]: |
|
"""Infer schema for the `datasets` library. |
|
|
|
See |
|
https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value. |
|
""" |
|
if "image" in key: |
|
return {"_type": "Image"} |
|
if "audio" in key: |
|
return {"_type": "Audio"} |
|
if isinstance(value, int): |
|
return {"_type": "Value", "dtype": "int64"} |
|
if isinstance(value, float): |
|
return {"_type": "Value", "dtype": "float64"} |
|
if isinstance(value, bool): |
|
return {"_type": "Value", "dtype": "bool"} |
|
if isinstance(value, bytes): |
|
return {"_type": "Value", "dtype": "binary"} |
|
|
|
return {"_type": "Value", "dtype": "string"} |
|
|