Spaces:
Sleeping
Sleeping
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import argparse | |
import logging | |
import os | |
from pathlib import Path | |
import re | |
import shutil | |
import tempfile | |
from urllib.parse import urlparse | |
import uuid | |
import urllib | |
import time | |
import requests | |
from project_settings import environment, project_path | |
from toolbox.to_markdown.base_to_markdown import BaseToMarkdown | |
from alibabacloud_docmind_api20220711.client import Client as docmind_api20220711Client | |
from alibabacloud_tea_openapi import models as open_api_models | |
from alibabacloud_docmind_api20220711 import models as docmind_api20220711_models | |
from alibabacloud_tea_util import models as util_models | |
from alibabacloud_credentials.client import Client as CredClient | |
logger = logging.getLogger("toolbox") | |
def get_args(argv=None):
    """
    Parse the command line options of this script.

    :param argv: optional list of argument strings to parse instead of
        ``sys.argv[1:]``; ``None`` (the default) keeps the usual command line behaviour.
    :return: an ``argparse.Namespace`` with a single ``filename`` attribute,
        which holds either a local path or a URL.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--filename",
        # A local path works as well, e.g. "data/files/pdf/2024.naacl-long.35.pdf"
        # under the project directory.
        default="https://aclanthology.org/2024.naacl-long.35.pdf",
        type=str,
        help="local path or URL of the document to convert.",
    )
    args = parser.parse_args(argv)
    return args
class AliyunToMarkdown(BaseToMarkdown):
    """
    Convert a document (local file or URL) to Markdown with the
    Alibaba Cloud Document Mind "document parsing (large model version)" service.

    https://help.aliyun.com/zh/document-mind/developer-reference/document-parsing-large-model-version

    Typical usage is ``AliyunToMarkdown(filename).save_to_zip(output_dir)``, which
    submits the document, waits for the job, downloads the referenced images and
    writes a zip archive that contains the markdown file and a ``media`` folder.
    """

    # Layout types whose markdown is always kept, whatever their sub type.
    _TEXT_TYPES = frozenset({"title", "text", "corner_note"})
    # (type, subType) pairs whose markdown is always kept.
    _TEXT_KINDS = frozenset({
        ("contents_title", "cate_title"),
        ("contents_title", "none"),
        ("contents", "cate"),
        ("multicolumn", "none"),
    })
    # (type, subType) pairs that are dropped from the markdown output.
    _SKIPPED_KINDS = frozenset({
        ("stamp", "none"),
        ("side", "sidebar"),
        ("side", "none"),
        ("head_image", "none"),
        ("foot_image", "none"),
        ("embedded", "none"),
    })
    # (type, subType) pairs kept only when ``with_images`` is set.
    _IMAGE_KINDS = frozenset({
        ("figure", "picture"),
        ("figure", "logo"),
        ("figure", "none"),
        ("figure_name", "none"),
        ("figure_name", "pic_title"),
    })
    # (type, subType) pairs kept only when ``with_formula`` is set.
    _FORMULA_KINDS = frozenset({
        ("formula", "formula"),
        ("formula", "none"),
    })
    # (type, subType) pairs kept only when ``with_table`` is set.
    _TABLE_KINDS = frozenset({
        ("table", "none"),
        ("table_name", "none"),
    })

    def __init__(self,
                 filename: str,
                 endpoint: str = "docmind-api.cn-hangzhou.aliyuncs.com",
                 access_key_id: str = None,
                 access_key_secret: str = None,
                 ):
        """
        :param filename: local path or URL of the document to convert.
        :param endpoint: Document Mind API endpoint.
        :param access_key_id: access key id; when it or ``access_key_secret`` is
            ``None`` both values are obtained from :meth:`get_access_key`.
        :param access_key_secret: access key secret, see ``access_key_id``.
        """
        super().__init__(filename)
        self.filename_or_url = self.filename
        self.endpoint = endpoint

        if access_key_id is None or access_key_secret is None:
            self.access_key_id, self.access_key_secret = self.get_access_key()
        else:
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret

        self.client = self.get_client()

        # State of the most recently submitted job, filled in by `save_to_zip`.
        self.doc_mind_id: str = None
        self.status: str = None
        self.layouts: list = None
        self.image_count = 0

    @staticmethod
    def get_access_key():
        """
        Read the access key pair from a default-configured ``CredClient``.

        Declared static because it is invoked as ``self.get_access_key()`` and
        takes no instance argument.

        :return: tuple ``(access_key_id, access_key_secret)``.
        """
        cred = CredClient().get_credential()
        access_key_id = cred.get_access_key_id()
        access_key_secret = cred.get_access_key_secret()
        return access_key_id, access_key_secret

    def get_client(self):
        """Build the Document Mind API client from the stored key pair and endpoint."""
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
        )
        config.endpoint = self.endpoint
        client = docmind_api20220711Client(config)
        return client

    def submit_url(self, url: str, filename_extension: str):
        """
        Submit a document that is reachable by URL for parsing.

        :param url: URL of the document.
        :param filename_extension: extension of the document, without the dot.
        :return: the id of the created parsing job.
        :raises Exception: any error raised by the SDK call is logged and re-raised.
        """
        request = docmind_api20220711_models.SubmitDocParserJobRequest(
            file_url=url,
            file_name_extension=filename_extension,
        )
        try:
            response = self.client.submit_doc_parser_job(request)
            doc_mind_id = response.body.data.id
        except Exception as error:
            logger.error("submit url failed. type: %s, text: %s", type(error), error)
            raise
        return doc_mind_id

    def submit_file(self, filename: str, filename_extension: str):
        """
        Upload a local document for parsing.

        :param filename: path of the local document.
        :param filename_extension: extension of the document, without the dot.
        :return: the id of the created parsing job.
        :raises Exception: any error raised by the SDK call is logged and re-raised.
        """
        runtime = util_models.RuntimeOptions()
        # The file handle must stay open while the SDK uploads it and must be
        # closed afterwards, whether or not the upload succeeds.
        with open(filename, "rb") as f:
            request = docmind_api20220711_models.SubmitDocParserJobAdvanceRequest(
                file_url_object=f,
                file_name_extension=filename_extension,
            )
            try:
                response = self.client.submit_doc_parser_job_advance(request, runtime)
                doc_mind_id = response.body.data.id
            except Exception as error:
                logger.error("submit file failed. type: %s, text: %s", type(error), error)
                raise
        return doc_mind_id

    def query(self, doc_mind_id: str):
        """
        Query the status of a parsing job.

        :param doc_mind_id: id returned by :meth:`submit_url` or :meth:`submit_file`.
        :return: the ``data`` object of the response; its ``status`` attribute
            holds the job status.
        :raises Exception: any error raised by the SDK call is logged and re-raised.
        """
        request = docmind_api20220711_models.QueryDocParserStatusRequest(
            id=doc_mind_id,
        )
        try:
            response = self.client.query_doc_parser_status(request)
            result = response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def query_result(self, doc_mind_id: str, layout_num: int = 0, layout_step_size: int = 10):
        """
        Fetch one page of parsing results.

        :param doc_mind_id: id of the parsing job.
        :param layout_num: offset of the first layout to fetch.
        :param layout_step_size: number of layouts to fetch.
        :return: the ``data`` mapping of the response; the layouts are stored
            under its ``"layouts"`` key.
        :raises Exception: any error raised by the SDK call is logged and re-raised.
        """
        request = docmind_api20220711_models.GetDocParserResultRequest(
            id=doc_mind_id,
            layout_num=layout_num,
            layout_step_size=layout_step_size,
        )
        try:
            response = self.client.get_doc_parser_result(request)
            result = response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def get_layouts(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    ):
        """
        Collect every layout of a parsing job by paging through the results.

        :param doc_mind_id: id of the parsing job; defaults to the job most
            recently submitted by this instance. When omitted and layouts were
            already fetched for that job, the cached list is returned.
        :param layout_step_size: number of layouts requested per page.
        :return: list with all layouts of the job.
        :raises ValueError: when no job id is available or the page size is not positive.
        :raises AssertionError: when the job status is "failed".
        """
        if doc_mind_id is None and self.layouts is not None:
            return self.layouts

        doc_mind_id = doc_mind_id or self.doc_mind_id
        if doc_mind_id is None:
            raise ValueError("doc_mind_id is required: no job has been submitted yet.")
        if layout_step_size <= 0:
            # A non-positive step would never advance the offset below.
            raise ValueError(f"layout_step_size must be positive, got: {layout_step_size}")

        if self.status is None:
            self.status = self.query(doc_mind_id).status
        # Checked after the query too, so a freshly fetched failure is not missed.
        if self.status == "failed":
            raise AssertionError("status: failed. ")

        layout_num = 0
        layouts_list = list()
        while True:
            js = self.query_result(
                doc_mind_id=doc_mind_id,
                layout_num=layout_num,
                layout_step_size=layout_step_size
            )
            layouts = (js or {}).get("layouts") or []
            # An empty page marks the end of the results.
            if len(layouts) == 0:
                break
            layouts_list.extend(layouts)
            layout_num += layout_step_size

        # Only the instance's own job is cached, results of other jobs are not.
        if doc_mind_id == self.doc_mind_id:
            self.layouts = layouts_list
        return layouts_list

    def get_md_text(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    with_images: bool = True,
                    with_formula: bool = True,
                    with_table: bool = True,
                    ):
        """
        Concatenate the markdown content of the layouts of a parsing job.

        Stamps, side bars, header/footer images and embedded layouts are dropped.
        Layouts of an unknown (type, subType) combination are kept and logged.

        :param doc_mind_id: id of the parsing job, see :meth:`get_layouts`.
        :param layout_step_size: number of layouts requested per page.
        :param with_images: keep figures and figure captions.
        :param with_formula: keep formulas.
        :param with_table: keep tables and table captions.
        :return: the markdown text.
        """
        parts = list()
        for layout in self.get_layouts(doc_mind_id, layout_step_size):
            type_ = layout["type"]
            sub_type_ = layout.get("subType")
            # A layout without markdown content contributes nothing.
            markdown_content_ = layout.get("markdownContent") or ""
            kind = (type_, sub_type_)

            if type_ in self._TEXT_TYPES or kind in self._TEXT_KINDS:
                keep = True
            elif kind in self._SKIPPED_KINDS:
                keep = False
            elif kind in self._IMAGE_KINDS:
                keep = with_images
            elif kind in self._FORMULA_KINDS:
                keep = with_formula
            elif kind in self._TABLE_KINDS:
                keep = with_table
            else:
                logger.warning(
                    "unknown layout kind, its content is kept. type: %s, sub_type: %s, layout: %s",
                    type_, sub_type_, layout,
                )
                keep = True

            if keep:
                parts.append(markdown_content_)
        return "".join(parts)

    def _wait_until_parsed(self, doc_mind_id: str, poll_interval: float = 1.0, timeout: float = None):
        """Poll the job status until it is "success" and return that status."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            status = self.query(doc_mind_id=doc_mind_id).status
            if status == "success":
                return status
            if status == "failed":
                raise AssertionError("failed. ")
            if status not in (None, "init", "processing"):
                raise AssertionError(f"unexpected status: {status}")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"document parsing did not finish within {timeout} seconds. doc_mind_id: {doc_mind_id}"
                )
            time.sleep(poll_interval)

    def save_to_zip(self, output_dir: str, poll_interval: float = 1.0, timeout: float = None):
        """
        Convert the document and pack the markdown file and its images into a zip archive.

        :param output_dir: directory in which the zip archive is created.
        :param poll_interval: seconds to wait between two status queries.
        :param timeout: maximum number of seconds to wait for the job;
            ``None`` (the default) waits without limit.
        :return: path of the created zip archive.
        :raises AssertionError: when the job fails or reports an unexpected status.
        :raises TimeoutError: when ``timeout`` is set and the job does not finish in time.
        """
        is_url = self.is_url(self.filename_or_url)
        filename_extension = self.get_extension_name(self.filename_or_url, is_url=is_url)

        # submit
        if is_url:
            doc_mind_id = self.submit_url(url=self.filename_or_url, filename_extension=filename_extension)
        else:
            doc_mind_id = self.submit_file(filename=self.filename_or_url, filename_extension=filename_extension)
        logger.info("doc_mind_id: %s, filename: %s", doc_mind_id, self.filename_or_url)

        # Record the new job and drop the state of any previous one.
        self.doc_mind_id = doc_mind_id
        self.status = None
        self.layouts = None

        # query
        self.status = self._wait_until_parsed(doc_mind_id, poll_interval, timeout)

        # query result
        md_text = self.get_md_text(
            doc_mind_id=doc_mind_id,
        )

        # save
        basename = str(uuid.uuid4())
        temp_dir = Path(tempfile.gettempdir()) / basename
        temp_dir.mkdir(parents=True, exist_ok=False)
        try:
            # save images
            md_text = self.convert_image_to_local(
                markdown_text=md_text,
                data_dir=temp_dir.as_posix(),
                image_folder="media",
            )

            # save markdown
            md_file = temp_dir / f"{basename}.md"
            with open(md_file.as_posix(), "w", encoding="utf-8") as f:
                f.write(md_text)

            # zip
            output_zip_file = os.path.join(output_dir, f"{basename}.zip")
            self.zip_directory(temp_dir, output_zip_file)
        finally:
            # The working directory is removed even when a step above fails.
            shutil.rmtree(temp_dir, ignore_errors=True)
        return output_zip_file

    def save_image(self,
                   image_url: str,
                   data_dir: str = "media",
                   image_folder: str = "media",
                   timeout: float = 60.0,
                   ):
        """
        Download an image into ``data_dir / image_folder``.

        The file is named after the last component of the URL path.

        :param image_url: URL of the image.
        :param data_dir: base directory.
        :param image_folder: sub folder of ``data_dir`` that receives the image.
        :param timeout: timeout in seconds that is passed to ``requests.get``.
        :return: ``Path`` of the written file.
        :raises requests.HTTPError: when the server answers with an error status.
        """
        parse_result = urlparse(image_url)
        image_name = Path(parse_result.path).name

        filename = Path(data_dir) / image_folder / image_name
        filename.parent.mkdir(parents=True, exist_ok=True)

        resp = requests.get(image_url, timeout=timeout)
        # Without this check the body of an error page would be stored as the image.
        resp.raise_for_status()
        with open(filename.as_posix(), "wb") as f:
            f.write(resp.content)
        return filename

    def convert_image_to_local(self,
                               markdown_text: str,
                               data_dir: str,
                               image_folder: str = "media",
                               ):
        """
        Download the images referenced by the markdown text and point the links to the local copies.

        An image whose download is answered with an HTTP error keeps its original link.

        :param markdown_text: markdown text with ``![alt](url)`` image links.
        :param data_dir: directory that will hold the markdown file.
        :param image_folder: sub folder of ``data_dir`` that receives the images.
        :return: the markdown text with rewritten image links.
        """
        pattern1 = r'\!\[(?:.*?)\]\((.+?)\)'

        def replace(match):
            image_url = match.group(1)
            try:
                filename = self.save_image(image_url, data_dir, image_folder)
            except requests.HTTPError as error:
                logger.warning("download image failed, link is kept. url: %s, text: %s", image_url, error)
                return match.group(0)
            relative_path = Path(filename).relative_to(data_dir)
            image_name = relative_path.name
            result = f"![{image_name}]({relative_path.as_posix()})"
            return result

        markdown_text = re.sub(pattern1, replace, markdown_text)
        return markdown_text

    @staticmethod
    def is_url(string: str):
        """
        Tell whether the string looks like a URL, i.e. has both a scheme and a network location.

        Declared static because it is invoked as ``self.is_url(...)`` and takes
        no instance argument.
        """
        try:
            result = urlparse(string)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    def get_extension_name(self, filename_or_url: str, is_url: bool = False):
        """
        Return the lower-cased file extension, without the leading dot.

        :param filename_or_url: local path or URL.
        :param is_url: when set, the extension is taken from the path component
            of the URL, so that query string and fragment are ignored.
        :return: the extension, or an empty string when there is none.
        """
        path = urlparse(filename_or_url).path if is_url else filename_or_url
        _, filename_extension = os.path.splitext(path)
        # Drop the leading dot.
        return filename_extension[1:].lower()
def main():
    """
    Convert the document given on the command line and print the path of the resulting zip archive.

    The archive is written to the current working directory.
    """
    args = get_args()

    aliyun = AliyunToMarkdown(
        filename=args.filename,
    )
    output_zip_file = aliyun.save_to_zip(output_dir=".")
    print(output_zip_file)
# Run the conversion only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()