#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Convert a document to Markdown with the Aliyun Document Mind parser.

The document (local file or URL) is submitted as a parsing job, the job is
polled until it finishes, the Markdown layouts are collected, referenced
images are downloaded, and everything is packed into a zip archive.
"""
import argparse
import logging
import os
from pathlib import Path
import re
import shutil
import tempfile
from urllib.parse import urlparse
import uuid
import urllib
import time

import requests

from project_settings import environment, project_path
from toolbox.to_markdown.base_to_markdown import BaseToMarkdown

from alibabacloud_docmind_api20220711.client import Client as docmind_api20220711Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_docmind_api20220711 import models as docmind_api20220711_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_credentials.client import Client as CredClient


logger = logging.getLogger("toolbox")


def get_args():
    """Parse the command-line arguments (only ``--filename``)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--filename",
        # A local file works as well, e.g.:
        # default=(project_path / "data/files/pdf/2024.naacl-long.35.pdf").as_posix(),
        default="https://aclanthology.org/2024.naacl-long.35.pdf",
        type=str
    )
    args = parser.parse_args()
    return args


@BaseToMarkdown.register("aliyun")
class AliyunToMarkdown(BaseToMarkdown):
    """
    Markdown converter backed by the Aliyun Document Mind parsing API.

    https://help.aliyun.com/zh/document-mind/developer-reference/document-parsing-large-model-version
    """

    # Layout types whose markdown is always kept, whatever the sub type is.
    _TEXT_TYPES = frozenset({"title", "text", "corner_note"})
    # (type, subType) pairs whose markdown is always kept.
    _TEXT_KINDS = frozenset({
        ("contents_title", "cate_title"),
        ("contents_title", "none"),
        ("contents", "cate"),
        ("multicolumn", "none"),
    })
    # (type, subType) pairs that are always dropped from the output.
    _SKIPPED_KINDS = frozenset({
        ("stamp", "none"),
        ("side", "sidebar"),
        ("side", "none"),
        ("head_image", "none"),
        ("foot_image", "none"),
        ("embedded", "none"),
    })
    # (type, subType) pairs kept only when ``with_images`` is true.
    _IMAGE_KINDS = frozenset({
        ("figure", "picture"),
        ("figure", "logo"),
        ("figure", "none"),
        ("figure_name", "none"),
        ("figure_name", "pic_title"),
    })
    # (type, subType) pairs kept only when ``with_formula`` is true.
    _FORMULA_KINDS = frozenset({
        ("formula", "formula"),
        ("formula", "none"),
    })
    # (type, subType) pairs kept only when ``with_table`` is true.
    _TABLE_KINDS = frozenset({
        ("table", "none"),
        ("table_name", "none"),
    })

    # Status spellings treated as a failed / still running job.
    # NOTE(review): the exact spellings returned by the service are not
    # visible in this file; statuses are compared case-insensitively and both
    # "fail" and "failed" count as failure - confirm against the API docs.
    _FAILED_STATUSES = frozenset({"failed", "fail"})
    _PENDING_STATUSES = frozenset({"init", "processing"})

    def __init__(self,
                 filename: str,
                 endpoint: str = "docmind-api.cn-hangzhou.aliyuncs.com",
                 access_key_id: str = None,
                 access_key_secret: str = None,
                 ):
        """
        :param filename: local path or URL of the document to convert.
        :param endpoint: Document Mind API endpoint.
        :param access_key_id: access key id; when it or the secret is missing,
            both are resolved through the default credential client.
        :param access_key_secret: access key secret (see ``access_key_id``).
        """
        super().__init__(filename)
        # Presumably the base class stores the argument as ``self.filename``.
        self.filename_or_url = self.filename
        self.endpoint = endpoint

        if access_key_id is None or access_key_secret is None:
            self.access_key_id, self.access_key_secret = self.get_access_key()
        else:
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret

        self.client = self.get_client()

        # State of the job this instance is bound to.
        self.doc_mind_id: str = None
        self.status: str = None
        self.layouts: list = None
        self.image_count = 0

    @staticmethod
    def get_access_key():
        """Return ``(access_key_id, access_key_secret)`` from the default credential client."""
        cred = CredClient().get_credential()
        access_key_id = cred.get_access_key_id()
        access_key_secret = cred.get_access_key_secret()
        return access_key_id, access_key_secret

    @staticmethod
    def _normalize_status(status):
        """Return ``status`` lower-cased, or ``None`` when it is not a string."""
        if isinstance(status, str):
            return status.lower()
        return None

    def get_client(self):
        """Build the Document Mind client from the stored keys and endpoint."""
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
        )
        config.endpoint = self.endpoint
        client = docmind_api20220711Client(config)
        return client

    def submit_url(self, url: str, filename_extension: str):
        """
        Submit a document that is reachable by URL.

        :param url: URL of the document.
        :param filename_extension: extension without the dot, e.g. ``"pdf"``.
        :return: id of the created parsing job.
        :raises Exception: whatever the client raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.SubmitDocParserJobRequest(
            file_url=url,
            file_name_extension=filename_extension,
        )
        try:
            response = self.client.submit_doc_parser_job(request)
            doc_mind_id = response.body.data.id
        except Exception as error:
            logger.error("submit url failed. type: %s, text: %s", type(error), error)
            raise
        return doc_mind_id

    def submit_file(self, filename: str, filename_extension: str):
        """
        Submit a local document.

        :param filename: path of the local file.
        :param filename_extension: extension without the dot, e.g. ``"pdf"``.
        :return: id of the created parsing job.
        :raises Exception: whatever the client raises; it is logged and re-raised.
        """
        runtime = util_models.RuntimeOptions()
        # The file must stay open while the client uploads it and must be
        # closed afterwards, whether or not the upload succeeds.
        with open(filename, "rb") as f:
            request = docmind_api20220711_models.SubmitDocParserJobAdvanceRequest(
                file_url_object=f,
                file_name_extension=filename_extension,
            )
            try:
                response = self.client.submit_doc_parser_job_advance(request, runtime)
                doc_mind_id = response.body.data.id
            except Exception as error:
                logger.error("submit file failed. type: %s, text: %s", type(error), error)
                raise
        return doc_mind_id

    def query(self, doc_mind_id: str):
        """
        Query the status of a parsing job.

        :param doc_mind_id: id of the job.
        :return: the ``data`` member of the response body; callers read its
            ``status`` attribute.
        :raises Exception: whatever the client raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.QueryDocParserStatusRequest(
            id=doc_mind_id,
        )
        try:
            response = self.client.query_doc_parser_status(request)
            result = response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def query_result(self, doc_mind_id: str, layout_num: int = 0, layout_step_size: int = 10):
        """
        Fetch one page of layouts of a parsing job.

        :param doc_mind_id: id of the job.
        :param layout_num: offset of the first layout to return.
        :param layout_step_size: number of layouts to request.
        :return: the ``data`` member of the response body; callers read its
            ``"layouts"`` item.
        :raises Exception: whatever the client raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.GetDocParserResultRequest(
            id=doc_mind_id,
            layout_num=layout_num,
            layout_step_size=layout_step_size,
        )
        try:
            response = self.client.get_doc_parser_result(request)
            result = response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise
        return result

    def get_layouts(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    ):
        """
        Collect every layout of a parsing job, page by page.

        :param doc_mind_id: id of the job; defaults to the job this instance
            is bound to. Without an explicit id a previously fetched result
            is returned from the cache.
        :param layout_step_size: page size used while fetching.
        :return: list of layouts in the order returned by the service.
        :raises ValueError: no job id is available or the page size is < 1.
        :raises AssertionError: the job is reported as failed.
        """
        if layout_step_size < 1:
            # A non-positive step would never advance the offset.
            raise ValueError(f"layout_step_size must be >= 1, got {layout_step_size}")

        if doc_mind_id is None and self.layouts is not None:
            return self.layouts

        doc_mind_id = doc_mind_id or self.doc_mind_id
        if doc_mind_id is None:
            raise ValueError("doc_mind_id is required: no job has been submitted yet.")

        # Bind the instance to the first job it is asked about, so that the
        # cached status / layouts always belong to exactly one job.
        if self.doc_mind_id is None:
            self.doc_mind_id = doc_mind_id
        is_own_job = doc_mind_id == self.doc_mind_id

        # Only a cached "success" is final; anything else is queried again.
        status = self._normalize_status(self.status) if is_own_job else None
        if status != "success":
            status = self._normalize_status(self.query(doc_mind_id).status)
            if is_own_job:
                self.status = status
        if status in self._FAILED_STATUSES:
            raise AssertionError("status: failed. ")

        layout_num = 0
        layouts_list = list()
        while True:
            js = self.query_result(
                doc_mind_id=doc_mind_id,
                layout_num=layout_num,
                layout_step_size=layout_step_size
            )
            layouts = js["layouts"]
            if not layouts:
                # An empty (or missing) page marks the end of the result.
                break
            layouts_list.extend(layouts)
            layout_num += layout_step_size

        if is_own_job:
            self.layouts = layouts_list
        return layouts_list

    def get_md_text(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    with_images: bool = True,
                    with_formula: bool = True,
                    with_table: bool = True,
                    ):
        """
        Concatenate the markdown of the layouts of a parsing job.

        Layouts are kept or dropped according to their ``(type, subType)``
        pair; unrecognized pairs are kept and reported in the log.

        :param doc_mind_id: id of the job, see :meth:`get_layouts`.
        :param layout_step_size: page size used while fetching.
        :param with_images: keep figures and figure captions.
        :param with_formula: keep formulas.
        :param with_table: keep tables and table captions.
        :return: the markdown text.
        """
        parts = []
        for layout in self.get_layouts(doc_mind_id, layout_step_size):
            type_ = layout.get("type")
            sub_type_ = layout.get("subType")
            # A layout without markdown contributes nothing.
            markdown_content_ = layout.get("markdownContent") or ""
            kind = (type_, sub_type_)

            if type_ in self._TEXT_TYPES or kind in self._TEXT_KINDS:
                keep = True
            elif kind in self._SKIPPED_KINDS:
                keep = False
            elif kind in self._IMAGE_KINDS:
                keep = with_images
            elif kind in self._FORMULA_KINDS:
                keep = with_formula
            elif kind in self._TABLE_KINDS:
                keep = with_table
            else:
                logger.warning(
                    "unrecognized layout kept as-is. type: %s, subType: %s", type_, sub_type_
                )
                logger.debug("unrecognized layout: %s", layout)
                keep = True

            if keep:
                parts.append(markdown_content_)
        return "".join(parts)

    def save_to_zip(self, output_dir: str, poll_interval: float = 1.0, timeout: float = None):
        """
        Convert the document and pack the markdown plus its images into a zip.

        :param output_dir: directory that receives the zip file.
        :param poll_interval: seconds to sleep between two status queries.
        :param timeout: maximum seconds to wait for the job; ``None`` waits
            indefinitely.
        :return: path of the created zip file.
        :raises AssertionError: the job failed or reported an unexpected status.
        :raises TimeoutError: the job did not finish within ``timeout``.
        """
        is_url = self.is_url(self.filename_or_url)
        filename_extension = self.get_extension_name(self.filename_or_url, is_url=is_url)

        # submit
        if is_url:
            doc_mind_id = self.submit_url(url=self.filename_or_url, filename_extension=filename_extension)
        else:
            doc_mind_id = self.submit_file(filename=self.filename_or_url, filename_extension=filename_extension)
        logger.info(f"doc_mind_id: {doc_mind_id}, filename: {self.filename_or_url}")

        # A new job invalidates whatever was cached for a previous one.
        self.doc_mind_id = doc_mind_id
        self.status = None
        self.layouts = None

        # query
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            js = self.query(doc_mind_id=doc_mind_id)
            raw_status = js.status
            status = self._normalize_status(raw_status)
            self.status = status
            if status == "success":
                break
            if status in self._FAILED_STATUSES:
                raise AssertionError("failed. ")
            if raw_status is not None and status not in self._PENDING_STATUSES:
                raise AssertionError(f"unexpected status: {raw_status}")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"doc_mind_id: {doc_mind_id} did not finish within {timeout} seconds"
                )
            time.sleep(poll_interval)

        # query result
        md_text = self.get_md_text(
            doc_mind_id=doc_mind_id,
        )

        # save
        basename = str(uuid.uuid4())
        temp_dir = Path(tempfile.gettempdir()) / basename
        temp_dir.mkdir(parents=True, exist_ok=False)
        try:
            # save images
            md_text = self.convert_image_to_local(
                markdown_text=md_text,
                data_dir=temp_dir.as_posix(),
                image_folder="media",
            )

            # save markdown
            md_file = temp_dir / f"{basename}.md"
            with open(md_file.as_posix(), "w", encoding="utf-8") as f:
                f.write(md_text)

            # zip
            output_zip_file = os.path.join(output_dir, f"{basename}.zip")
            self.zip_directory(temp_dir, output_zip_file)
        finally:
            # Remove the working directory even when a step above failed.
            shutil.rmtree(temp_dir, ignore_errors=True)
        return output_zip_file

    def save_image(self,
                   image_url: str,
                   data_dir: str = "media",
                   image_folder: str = "media",
                   timeout: float = 60.0,
                   ):
        """
        Download an image into ``data_dir / image_folder``.

        The file is named after the last component of the URL path.

        :param image_url: URL of the image.
        :param data_dir: base directory.
        :param image_folder: sub directory of ``data_dir`` for images.
        :param timeout: seconds to wait for the server.
        :return: path of the written file.
        :raises requests.RequestException: the download failed, timed out or
            the server answered with an error status.
        """
        parse_result = urlparse(image_url)
        image_name = Path(parse_result.path).name
        if not image_name:
            # e.g. a URL path ending in "/" has no usable file name.
            image_name = uuid.uuid4().hex

        filename = Path(data_dir) / image_folder / image_name
        filename.parent.mkdir(parents=True, exist_ok=True)

        resp = requests.get(image_url, timeout=timeout)
        # Do not store an HTTP error page as if it were the image.
        resp.raise_for_status()
        with open(filename.as_posix(), "wb") as f:
            f.write(resp.content)
        return filename

    def convert_image_to_local(self,
                               markdown_text: str,
                               data_dir: str,
                               image_folder: str = "media",
                               ):
        """
        Download the images referenced by ``markdown_text`` and relink them.

        Every ``![alt](url)`` whose target is a URL is downloaded with
        :meth:`save_image` and rewritten to the path relative to ``data_dir``;
        other targets are left untouched.

        :param markdown_text: markdown to process.
        :param data_dir: directory the markdown file will live in.
        :param image_folder: sub directory of ``data_dir`` for images.
        :return: the rewritten markdown.
        """
        pattern1 = r'\!\[(?:.*?)\]\((.+?)\)'
        # url -> relative path, so that a repeated image is downloaded once.
        saved = dict()

        def replace(match):
            image_url = match.group(1)
            if not self.is_url(image_url):
                # Local paths, data URIs, ... cannot be downloaded.
                return match.group(0)

            relative_path = saved.get(image_url)
            if relative_path is None:
                filename = self.save_image(image_url, data_dir, image_folder)
                relative_path = Path(filename).relative_to(data_dir)
                saved[image_url] = relative_path

            image_name = relative_path.name
            result = f"![{image_name}]({relative_path.as_posix()})"
            return result

        markdown_text = re.sub(pattern1, replace, markdown_text)
        return markdown_text

    @staticmethod
    def is_url(string: str):
        """Return whether ``string`` parses as a URL with both a scheme and a host."""
        try:
            result = urlparse(string)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    def get_extension_name(self, filename_or_url: str, is_url: bool = False):
        """
        Return the lower-cased extension, without the dot.

        :param filename_or_url: local path or URL.
        :param is_url: when true only the path component of the URL is
            inspected, so a query string cannot leak into the extension.
        :return: e.g. ``"pdf"``, or ``""`` when there is no extension.
        """
        path = urlparse(filename_or_url).path if is_url else filename_or_url
        _, filename_extension = os.path.splitext(path)
        return filename_extension[1:].lower()


def main():
    """Convert the file named on the command line and print the zip path."""
    args = get_args()

    aliyun = AliyunToMarkdown(
        filename=args.filename,
    )
    output_zip_file = aliyun.save_to_zip(output_dir=".")
    print(output_zip_file)
    return


if __name__ == "__main__":
    main()