# yyj import requests import xml.etree.ElementTree as ET import os from tqdm import tqdm import json import shutil from loguru import logger from lxml import etree import requests from bs4 import BeautifulSoup import os def download_pdfs(path, doi_list): #fox dalao contribution https://github.com/BigWhiteFox # 确保下载目录存在 if not os.path.exists(path): os.makedirs(path) if isinstance(doi_list, str): doi_list = [doi_list] href_list = [] for doi in doi_list: url = f"https://sci-hub.se/{doi}" response = requests.get(url) # 检查请求是否成功 if response.status_code == 200: print(f"成功请求:{url}") else: print(f"请求失败:{url},状态码:{response.status_code}") continue # 如果请求失败,跳过本次循环 soup = BeautifulSoup(response.text, 'html.parser') buttons = soup.find_all('button', onclick=True) for button in buttons: onclick = button.get('onclick') if onclick: pdf_url = onclick.split("'")[1] href_list.append((pdf_url, doi)) print("pdf_url:", pdf_url) print("href_list:", href_list) # 遍历href_list中的每个URL for href, doi in href_list: pdf_url = f"https:{href}" try: response = requests.get(pdf_url, stream=True) if response.status_code == 200: filename = doi.replace("/", "_") + ".pdf" file_path = os.path.join(path, filename) with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"File downloaded and saved as: {file_path}") else: print(f"Download failed, Status Code: {response.status_code}, URL: {pdf_url}") except requests.RequestException as e: print(f"Failed to download due to an exception: {e}") class ArticleRetrieval: def __init__(self, keywords: list = [], pmids: list = [], repo_dir = 'repodir', retmax = 500): if keywords is [] and pmids is []: raise ValueError("Either keywords or pmids must be provided.") self.keywords = keywords self.pmids = pmids self.repo_dir = repo_dir self.retmax = retmax self.pmc_ids = [] def esummary_pmc(self): base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?" params = { "db": "pubmed", "id": ','.join(self.pmids), # "retmax": self.retmax } response = requests.get(base_url, params=params) root = ET.fromstring(response.content) results = [] for docsum in root.findall('DocSum'): pmcid = None doi = None abstract = None id_value = docsum.find('Id').text for item in docsum.findall('.//Item[@Name="doi"]'): doi = item.text break for item in docsum.findall('.//Item[@Name="pmc"]'): pmcid = item.text break results.append((id_value, pmcid, doi)) logger.info(f"total {len(results)} articles:") logger.info(f"found {len([r for r in results if r[1] is not None])} articles with PMC ID.") logger.info(f"found {len([r for r in results if r[2] is not None])} articles with DOI.") logger.info(f"found {len([r for r in results if r[1] is None and r[2] is None])} articles without PMC ID and DOI.") self.esummary = results self.pmc_ids = [r[1] for r in results if r[1] is not None] self.scihub_doi = [r[2] for r in results if r[1] is None and r[2] is not None] self.failed_pmids = [r[0] for r in results if r[1] is None and r[2] is None] ## 通过Pubmed数据库检索文章 def esearch_pmc(self): base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi" params = { "db": "pubmed", "term": '+'.join(self.keywords), "retmax": self.retmax } response = requests.get(base_url, params=params) root = ET.fromstring(response.content) idlist = root.find('.//IdList') try: pmids = [id_element.text for id_element in idlist.findall('.//Id')] except: pmids = [] print(f"Found {len(pmids)} articles for keywords {self.keywords}.") self.search_pmid = pmids self.pmids.extend(pmids) # 解析XML文件 def _get_all_text(self, element): """递归获取XML元素及其所有子元素的文本内容。确保element不为None.""" if element is None: return "" text = element.text or "" for child in element: text += self._get_all_text(child) if child is not None and child.tail: text += child.tail return text ## 清洗XML文件 def _clean_xml(self,txt): parser = etree.XMLParser(recover=True) root = ET.fromstring(txt,parser=parser) txt = self._get_all_text(root) txt = txt.split('REFERENCES')[0] # 截取参考文献之前的文本 text = '\n\n'.join([t.strip() for t in txt.split('\n') if len(t.strip())>250]) return text ## 通过PMC数据库获取全文 def fetch_full_text(self): if not os.path.exists(self.repo_dir): os.makedirs(self.repo_dir) os.makedirs(self.repo_dir + '_ab') print(f"Saving articles to {self.repo_dir}.") self.pmc_success = 0 self.scihub_success = 0 self.abstract_success = 0 self.failed_download = [] self.failed_abstract = [] downloaded = os.listdir(self.repo_dir) downloaded_ab = os.listdir(self.repo_dir + '_ab') for id in tqdm(self.pmc_ids, desc="Fetching full texts", unit="article"): # check if file already downloaded if f"{id}.txt" in downloaded: print(f"File already downloaded: {id}") self.pmc_success += 1 continue base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi" params = { "db": "pmc", "id": id, "rettype": "xml", "retmode": "text" } response = requests.get(base_url, params=params) full_text = self._clean_xml(response.text) if full_text.strip() == '': self.failed_download.append(id) continue else: logger.info(full_text[:200]) with open(os.path.join(self.repo_dir,f'{id}.txt'), 'w') as f: f.write(full_text) self.pmc_success += 1 for doi in tqdm(self.scihub_doi, desc="Fetching full texts", unit="article"): # check if file already downloaded if f"{doi.replace('/','_')}.pdf" in downloaded: print(f"File already downloaded: {doi}") self.scihub_success += 1 continue if download_pdfs(path=self.repo_dir,doi_list = doi): self.scihub_success += 1 else: self.failed_download.append(doi) for pmid in tqdm(self.pmids, desc="Fetching abstract texts", unit="article"): # check if file already downloaded if f"{pmid}.txt" in downloaded_ab: print(f"File already downloaded: {pmid}") self.scihub_success += 1 continue base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi" params = { "db": "pubmed", "id": pmid, } response = requests.get(base_url, params=params) root = ET.fromstring(response.content) abstract = root.find('.//AbstractText') if abstract is not None: with open(os.path.join(self.repo_dir + '_ab',f'{pmid}.txt'), 'w') as f: f.write(abstract.text) self.abstract_success += 1 else: self.failed_abstract.append(pmid) def save_config(self): config = { 'repo_dir': self.repo_dir, 'keywords': self.keywords, 'retmax': self.retmax, "search_pmids": self.search_pmid, 'import_pmids': [id for id in self.pmids if id not in self.search_pmid], 'failed_pmids': self.failed_pmids, 'result': [ { 'pmid': r[0], 'pmcid': r[1], 'doi': r[2] } for r in self.esummary ], "pmc_success_d": self.pmc_success, "scihub_success_d": self.scihub_success, "failed_download": self.failed_download, "abstract_success": self.abstract_success, "failed_abstract": self.failed_abstract } with open(os.path.join(self.repo_dir, 'info.json'), 'w') as f: json.dump(config, f, indent=4, ensure_ascii=False) def initiallize(self): if self.keywords !=[]: print(self.keywords) self.esearch_pmc() # get pmids from pubmed database using keywords self.esummary_pmc() # get pmc ids from pubmed database using pmids self.fetch_full_text() # get full text from pmc database using pmc ids self.save_config() # save config file if __name__ == '__main__': if os.path.exists('repodir'): shutil.rmtree('repodir') strings = """ 34536239 7760895 36109602 24766875""" string = [k.strip() for k in strings.split('\n')] pmids = [k for k in string if k.isdigit()] print(pmids) keys = [k for k in string if not k.isdigit() and k != ''] print(keys) articelfinder = ArticleRetrieval(keywords = keys,pmids = pmids, repo_dir = 'repodir',retmax = 5) articelfinder.initiallize()