File size: 3,479 Bytes
1b41672 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
import os
import subprocess
import sys
import argparse
# Add the parent of this script's directory to the module search path.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# ANSI escape sequences used to colour terminal output; RESET restores the
# default colour. Only YELLOW and RESET are referenced in the code visible here.
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"
def check_and_install_zipnn():
    """Ensure the ``zipnn`` package is importable, installing it via pip if not.

    The package is probed with a plain import. When that fails, ``pip install
    zipnn`` is run with the current interpreter and the import is retried.

    Raises:
        subprocess.CalledProcessError: If the ``pip install`` command fails.
        ImportError: If ``zipnn`` still cannot be imported after installation.
    """
    try:
        import zipnn  # noqa: F401 -- imported only to probe availability
    except ImportError:
        import importlib

        print("zipnn not found. Installing...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "zipnn"])
        # The package was installed after the interpreter started; flush the
        # import system's finder caches so the retry below can see it.
        importlib.invalidate_caches()
        import zipnn  # noqa: F401
def decompress_file(input_file, delete=False, force=False, hf_cache=False):
    """Decompress a single ``.znn`` file next to itself, or delete it.

    The output path is ``input_file`` with the trailing ``.znn`` removed.

    Args:
        input_file: Path to the compressed file; must end with ``.znn``.
        delete: If true (and ``hf_cache`` is false), remove ``input_file``
            instead of decompressing it.
        force: If true, overwrite an existing output file without prompting.
        hf_cache: If true, ``input_file`` is treated as a symlink inside a
            Hugging Face cache snapshot directory. After decompression the
            output replaces the blob the link points to, a symlink named
            after the output file is created for that blob, and the ``.znn``
            link is removed.

    Returns:
        None. A missing input file is reported on stdout rather than raised.

    Raises:
        ValueError: If ``input_file`` does not end with ``.znn``.
        Exception: If reorganizing the Hugging Face cache fails; the original
            error is attached as ``__cause__``.
    """
    if not input_file.endswith(".znn"):
        raise ValueError("Input file does not have the '.znn' suffix")

    if not os.path.exists(input_file):
        print(f"Error: The file {input_file} does not exist.")
        return

    if delete and not hf_cache:
        print(f"Deleting {input_file}...")
        os.remove(input_file)
        return

    output_file = input_file[: -len(".znn")]
    if not force and os.path.exists(output_file):
        user_input = input(f"{output_file} already exists; overwrite (y/n)? ").strip().lower()
        if user_input not in ("yes", "y"):
            print(f"Skipping {input_file}...")
            return

    print(f"Decompressing {input_file}...")

    # Imported lazily: validation, the missing-file message and the delete
    # mode above do not need zipnn to be installed.
    import zipnn

    zpn = zipnn.ZipNN(is_streaming=True)
    with open(input_file, "rb") as infile:
        compressed = infile.read()
    # Decompress BEFORE opening the output for writing, so a failure cannot
    # truncate an existing output file or leave an empty one behind.
    decompressed = zpn.decompress(compressed)
    with open(output_file, "wb") as outfile:
        outfile.write(decompressed)
    print(f"Decompressed {input_file} to {output_file}")

    if hf_cache:
        # If the file is in the Hugging Face cache, fix the symlinks
        print(f"{YELLOW}Reorganizing Hugging Face cache...{RESET}")
        try:
            snapshot_path = os.path.dirname(input_file)
            link_target = os.readlink(input_file)
            # Path of the blob as seen from the current working directory.
            blob_name = os.path.join(snapshot_path, link_target)
            # os.replace overwrites the existing blob on every platform
            # (os.rename refuses to overwrite on Windows).
            os.replace(output_file, blob_name)
            # Reuse the original link target: a relative symlink target is
            # resolved from the link's own directory, which is the snapshot
            # directory for both the old and the new link.
            os.symlink(link_target, output_file)
            if os.path.exists(input_file):
                os.remove(input_file)
        except Exception as e:
            raise Exception(f"Error reorganizing Hugging Face cache: {e}") from e
if __name__ == "__main__":
    # Make sure the zipnn dependency is available before doing any work.
    check_and_install_zipnn()

    cli = argparse.ArgumentParser(description="Enter a file path to decompress.")
    cli.add_argument("input_file", type=str, help="Specify the path to the file to decompress.")

    # Boolean switches, keyed by the keyword argument they map to.
    switch_help = {
        "delete": "A flag that triggers deletion of a single compressed file instead of decompression",
        "force": "A flag that forces overwriting when decompressing.",
        "hf_cache": "A flag that indicates if the file is in the Hugging Face cache.",
    }
    for switch, help_text in switch_help.items():
        cli.add_argument(f"--{switch}", action="store_true", help=help_text)
    parsed = cli.parse_args()

    # Forward only the switches that were actually given on the command line.
    enabled = {switch: True for switch in switch_help if getattr(parsed, switch)}
    decompress_file(parsed.input_file, **enabled)
|