document_loaders/toolbox/to_markdown/aliyun_to_markdown.py
HoneyTian's picture
first commit
e94100d
raw
history blame
13.2 kB
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import logging
import os
from pathlib import Path
import re
import shutil
import tempfile
from urllib.parse import urlparse
import uuid
import urllib
import time
import requests
from project_settings import environment, project_path
from toolbox.to_markdown.base_to_markdown import BaseToMarkdown
from alibabacloud_docmind_api20220711.client import Client as docmind_api20220711Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_docmind_api20220711 import models as docmind_api20220711_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_credentials.client import Client as CredClient
logger = logging.getLogger("toolbox")
def get_args():
    """
    Parse the command-line options of this script.

    The only option is ``--filename``: the document to convert. Its default
    is a sample PDF URL.

    :return: the ``argparse.Namespace`` produced by ``parse_args()``.
    """
    arg_parser = argparse.ArgumentParser()
    # Alternative local sample:
    # (project_path / "data/files/pdf/2024.naacl-long.35.pdf").as_posix()
    arg_parser.add_argument(
        "--filename",
        type=str,
        default="https://aclanthology.org/2024.naacl-long.35.pdf",
    )
    return arg_parser.parse_args()
@BaseToMarkdown.register("aliyun")
class AliyunToMarkdown(BaseToMarkdown):
    """
    Convert a document (local file or URL) to Markdown through the Aliyun
    Document Mind "doc parser" API.

    Workflow implemented by :meth:`save_to_zip`: submit a parsing job, poll
    its status until it succeeds, page through the resulting layouts, join
    their Markdown, download the referenced images and zip everything.

    https://help.aliyun.com/zh/document-mind/developer-reference/document-parsing-large-model-version
    """

    # Job statuses that mean "not finished yet, keep polling".
    _PENDING_STATUSES = (None, "init", "processing")

    # (type, subType) pairs whose content is always dropped.
    _SKIPPED_LAYOUTS = frozenset({
        ("stamp", "none"),
        ("side", "sidebar"),
        ("side", "none"),
        ("head_image", "none"),
        ("foot_image", "none"),
        ("embedded", "none"),
    })
    # Layout types that are always kept, whatever their subType is.
    _TEXT_TYPES = frozenset({"title", "text", "corner_note"})
    # (type, subType) pairs that are always kept.
    _TEXT_LAYOUTS = frozenset({
        ("contents_title", "cate_title"),
        ("contents_title", "none"),
        ("contents", "cate"),
        ("multicolumn", "none"),
    })
    # (type, subType) pairs kept only when ``with_images`` is true.
    _IMAGE_LAYOUTS = frozenset({
        ("figure", "picture"),
        ("figure", "logo"),
        ("figure", "none"),
        ("figure_name", "none"),
        ("figure_name", "pic_title"),
    })
    # (type, subType) pairs kept only when ``with_formula`` is true.
    _FORMULA_LAYOUTS = frozenset({
        ("formula", "formula"),
        ("formula", "none"),
    })
    # (type, subType) pairs kept only when ``with_table`` is true.
    _TABLE_LAYOUTS = frozenset({
        ("table", "none"),
        ("table_name", "none"),
    })

    def __init__(self,
                 filename: str,
                 endpoint: str = "docmind-api.cn-hangzhou.aliyuncs.com",
                 access_key_id: str = None,
                 access_key_secret: str = None,
                 ):
        """
        :param filename: local path or URL of the document to convert.
        :param endpoint: Document Mind API endpoint.
        :param access_key_id: Aliyun access key id; when it or the secret is
            missing, both are resolved through ``alibabacloud_credentials``.
        :param access_key_secret: Aliyun access key secret.
        """
        super().__init__(filename)
        self.filename_or_url = self.filename

        self.endpoint = endpoint
        if access_key_id is None or access_key_secret is None:
            self.access_key_id, self.access_key_secret = self.get_access_key()
        else:
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret

        self.client = self.get_client()

        # State of the most recently submitted job.
        self.doc_mind_id: str = None
        self.status: str = None
        self.layouts: list = None
        self.image_count = 0

    @staticmethod
    def get_access_key():
        """
        Resolve the access key pair through the default credential client.

        :return: tuple ``(access_key_id, access_key_secret)``.
        """
        cred = CredClient().get_credential()
        return cred.get_access_key_id(), cred.get_access_key_secret()

    def get_client(self):
        """
        Build a Document Mind API client from the stored key pair and endpoint.

        :return: a ``docmind_api20220711Client`` instance.
        """
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
        )
        config.endpoint = self.endpoint
        return docmind_api20220711Client(config)

    def submit_url(self, url: str, filename_extension: str):
        """
        Submit a parsing job for a document reachable by URL.

        :param url: URL of the document.
        :param filename_extension: extension without the dot, e.g. ``pdf``.
        :return: the job id (``response.body.data.id``).
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.SubmitDocParserJobRequest(
            file_url=url,
            file_name_extension=filename_extension,
        )
        try:
            response = self.client.submit_doc_parser_job(request)
            doc_mind_id = response.body.data.id
        except Exception as error:
            logger.error("submit url failed. type: %s, text: %s", type(error), error)
            raise
        return doc_mind_id

    def submit_file(self, filename: str, filename_extension: str):
        """
        Submit a parsing job by uploading a local file.

        :param filename: path of the local file.
        :param filename_extension: extension without the dot, e.g. ``pdf``.
        :return: the job id (``response.body.data.id``).
        :raises OSError: when the file cannot be opened.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        runtime = util_models.RuntimeOptions()
        # Keep the stream open only for the duration of the upload call so the
        # file handle is always released, even when the request fails.
        with open(filename, "rb") as stream:
            request = docmind_api20220711_models.SubmitDocParserJobAdvanceRequest(
                file_url_object=stream,
                file_name_extension=filename_extension,
            )
            try:
                response = self.client.submit_doc_parser_job_advance(request, runtime)
                doc_mind_id = response.body.data.id
            except Exception as error:
                logger.error("submit file failed. type: %s, text: %s", type(error), error)
                raise
        return doc_mind_id

    def query(self, doc_mind_id: str):
        """
        Query the status of a parsing job.

        :param doc_mind_id: job id returned by a submit call.
        :return: ``response.body.data``; callers read its ``status`` attribute.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.QueryDocParserStatusRequest(
            id=doc_mind_id,
        )
        try:
            response = self.client.query_doc_parser_status(request)
            result = response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def query_result(self, doc_mind_id: str, layout_num: int = 0, layout_step_size: int = 10):
        """
        Fetch one page of parsing results.

        :param doc_mind_id: job id returned by a submit call.
        :param layout_num: offset of the first layout of the page.
        :param layout_step_size: number of layouts requested per page.
        :return: ``response.body.data``; callers read its ``"layouts"`` entry.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.GetDocParserResultRequest(
            id=doc_mind_id,
            layout_num=layout_num,
            layout_step_size=layout_step_size,
        )
        try:
            response = self.client.get_doc_parser_result(request)
            result = response.body.data
        except Exception as error:
            logger.error("query result failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def get_layouts(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    ):
        """
        Collect every layout of a finished job by paging through the results.

        :param doc_mind_id: job id; defaults to the job stored on the instance.
        :param layout_step_size: page size used while paging; must be positive.
        :return: list of layout dicts, in the order returned by the service.
        :raises ValueError: when no job id is available or the page size is
            not positive.
        :raises AssertionError: when the job status is ``failed``.
        """
        if doc_mind_id is None and self.layouts is not None:
            return self.layouts

        doc_mind_id = doc_mind_id or self.doc_mind_id
        if doc_mind_id is None:
            raise ValueError("doc_mind_id is required: no job has been submitted yet. ")
        if layout_step_size <= 0:
            # A non-positive step would never advance the page offset.
            raise ValueError(f"layout_step_size must be positive, got: {layout_step_size}")

        if self.status is None:
            self.status = self.query(doc_mind_id).status
        # Checked independently of the branch above so that a status that was
        # just fetched is validated too.
        if self.status == "failed":
            raise AssertionError("status: failed. ")

        layout_num = 0
        layouts_list = []
        while True:
            js = self.query_result(
                doc_mind_id=doc_mind_id,
                layout_num=layout_num,
                layout_step_size=layout_step_size,
            )
            # An absent or empty "layouts" entry marks the end of the results.
            layouts = (js or {}).get("layouts") or []
            if not layouts:
                break
            layouts_list.extend(layouts)
            layout_num += layout_step_size

        # Cache only the layouts of the instance's own job, so the shortcut at
        # the top never returns another job's result.
        if doc_mind_id == self.doc_mind_id:
            self.layouts = layouts_list
        return layouts_list

    def get_md_text(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    with_images: bool = True,
                    with_formula: bool = True,
                    with_table: bool = True,
                    ):
        """
        Join the Markdown content of the job's layouts into one string.

        Layouts are classified by their ``(type, subType)`` pair: decorative
        ones (stamps, sidebars, header/footer images, embedded) are dropped,
        figures / formulas / tables obey the corresponding ``with_*`` flag,
        everything else is kept. Unknown pairs are kept and logged.

        :param doc_mind_id: job id; defaults to the job stored on the instance.
        :param layout_step_size: page size used while fetching layouts.
        :param with_images: keep figures and figure captions.
        :param with_formula: keep formulas.
        :param with_table: keep tables and table captions.
        :return: the concatenated Markdown text.
        """
        parts = []
        for layout in self.get_layouts(doc_mind_id, layout_step_size):
            type_ = layout.get("type")
            key = (type_, layout.get("subType"))
            content = layout.get("markdownContent") or ""

            if key in self._SKIPPED_LAYOUTS:
                continue

            if type_ in self._TEXT_TYPES or key in self._TEXT_LAYOUTS:
                keep = True
            elif key in self._IMAGE_LAYOUTS:
                keep = with_images
            elif key in self._FORMULA_LAYOUTS:
                keep = with_formula
            elif key in self._TABLE_LAYOUTS:
                keep = with_table
            else:
                # Unknown layout kind: keep the content so nothing is lost,
                # but leave a trace so the tables above can be extended.
                logger.warning(
                    "unknown layout. type: %s, sub_type: %s, layout: %s",
                    key[0], key[1], layout,
                )
                keep = True

            if keep:
                parts.append(content)
        return "".join(parts)

    def _wait_for_success(self, doc_mind_id: str, poll_interval: float = 1.0, timeout: float = None):
        """Poll the job status until it is ``success``; raise on failure or timeout."""
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            status = self.query(doc_mind_id=doc_mind_id).status
            self.status = status
            if status == "success":
                return
            if status == "failed":
                raise AssertionError("failed. ")
            if status not in self._PENDING_STATUSES:
                raise AssertionError(f"unexpected status: {status}")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"job {doc_mind_id} did not finish within {timeout} seconds, last status: {status}"
                )
            time.sleep(poll_interval)

    def save_to_zip(self, output_dir: str, poll_interval: float = 1.0, timeout: float = None):
        """
        Convert the document and store the result as a zip archive.

        The archive contains ``<uuid>.md`` and a ``media`` folder with the
        downloaded images. It is built in a temporary directory that is
        removed afterwards, also when a step fails.

        :param output_dir: directory in which the zip file is written.
        :param poll_interval: seconds to sleep between two status queries.
        :param timeout: maximum number of seconds to wait for the job;
            ``None`` waits indefinitely.
        :return: path of the created zip file.
        :raises AssertionError: when the job fails or reports an unexpected status.
        :raises TimeoutError: when ``timeout`` elapses before the job finishes.
        """
        is_url = self.is_url(self.filename_or_url)
        filename_extension = self.get_extension_name(self.filename_or_url, is_url=is_url)

        # submit
        if is_url:
            doc_mind_id = self.submit_url(url=self.filename_or_url, filename_extension=filename_extension)
        else:
            doc_mind_id = self.submit_file(filename=self.filename_or_url, filename_extension=filename_extension)
        # Remember the new job and forget whatever was cached for a previous one.
        self.doc_mind_id = doc_mind_id
        self.status = None
        self.layouts = None
        logger.info("doc_mind_id: %s, filename: %s", doc_mind_id, self.filename_or_url)

        # query
        self._wait_for_success(doc_mind_id, poll_interval=poll_interval, timeout=timeout)

        # query result
        md_text = self.get_md_text(
            doc_mind_id=doc_mind_id,
        )

        # save
        basename = str(uuid.uuid4())
        temp_dir = Path(tempfile.gettempdir()) / basename
        temp_dir.mkdir(parents=True, exist_ok=False)
        try:
            # save images
            md_text = self.convert_image_to_local(
                markdown_text=md_text,
                data_dir=temp_dir.as_posix(),
                image_folder="media",
            )

            # save markdown
            md_file = temp_dir / f"{basename}.md"
            with open(md_file.as_posix(), "w", encoding="utf-8") as f:
                f.write(md_text)

            # zip (zip_directory is not defined in this class; presumably
            # inherited from BaseToMarkdown)
            output_zip_file = os.path.join(output_dir, f"{basename}.zip")
            self.zip_directory(temp_dir, output_zip_file)
        finally:
            # Always drop the scratch directory; ignore_errors keeps a cleanup
            # problem from masking the exception that got us here.
            shutil.rmtree(temp_dir, ignore_errors=True)
        return output_zip_file

    def save_image(self,
                   image_url: str,
                   data_dir: str = "media",
                   image_folder: str = "media",
                   timeout: float = 60,
                   ):
        """
        Download one image into ``<data_dir>/<image_folder>/``.

        The file keeps the last path component of the URL as its name; a
        generated name is used when the URL path has none.

        :param image_url: URL of the image.
        :param data_dir: root directory of the output.
        :param image_folder: sub-folder of ``data_dir`` receiving the image.
        :param timeout: seconds to wait for the HTTP request.
        :return: ``Path`` of the written file.
        :raises requests.RequestException: when the download fails or the
            server answers with an error status.
        """
        parse_result = urlparse(image_url)
        image_name = Path(parse_result.path).name
        if not image_name:
            # e.g. "https://host/" - without a name the target would be the folder itself.
            image_name = f"image_{uuid.uuid4().hex}"

        filename = Path(data_dir) / image_folder / image_name
        filename.parent.mkdir(parents=True, exist_ok=True)

        resp = requests.get(image_url, timeout=timeout)
        # Do not store an HTTP error page as if it were the image.
        resp.raise_for_status()
        with open(filename.as_posix(), "wb") as f:
            f.write(resp.content)
        return filename

    def convert_image_to_local(self,
                               markdown_text: str,
                               data_dir: str,
                               image_folder: str = "media",
                               ):
        """
        Download every image referenced in the Markdown and relink it locally.

        Each ``![alt](url)`` is replaced by ``![<name>](<image_folder>/<name>)``.
        When an image cannot be downloaded, its original link is left untouched
        and a warning is logged.

        :param markdown_text: Markdown text with remote image links.
        :param data_dir: root directory of the output.
        :param image_folder: sub-folder of ``data_dir`` receiving the images.
        :return: the rewritten Markdown text.
        """
        pattern1 = r'\!\[(?:.*?)\]\((.+?)\)'

        def replace(match):
            image_url = match.group(1)
            try:
                filename = self.save_image(image_url, data_dir, image_folder)
            except requests.RequestException as error:
                # Best effort: one broken link must not abort the conversion.
                logger.warning("download image failed. url: %s, error: %s", image_url, error)
                return match.group(0)
            relative_path = Path(filename).relative_to(data_dir)
            return f"![{relative_path.name}]({relative_path.as_posix()})"

        return re.sub(pattern1, replace, markdown_text)

    @staticmethod
    def is_url(string: str):
        """
        Tell whether ``string`` looks like a URL.

        :return: ``True`` when ``urlparse`` finds both a scheme and a network
            location, ``False`` otherwise (including unparsable input).
        """
        try:
            result = urlparse(string)
        except ValueError:
            return False
        return bool(result.scheme and result.netloc)

    def get_extension_name(self, filename_or_url: str, is_url: bool = False):
        """
        Extract the lower-cased file extension, without the leading dot.

        :param filename_or_url: local path or URL.
        :param is_url: when true, only the path component of the URL is
            inspected, so query strings and fragments are ignored.
        :return: the extension, or an empty string when there is none.
        """
        path = urlparse(filename_or_url).path if is_url else filename_or_url
        return os.path.splitext(path)[1][1:].lower()
def main():
    """
    Script entry point: convert the document given by ``--filename`` and
    print the path of the zip archive written to the current directory.
    """
    cli_args = get_args()
    converter = AliyunToMarkdown(filename=cli_args.filename)
    zip_path = converter.save_to_zip(output_dir=".")
    print(zip_path)


if __name__ == "__main__":
    main()