File size: 7,978 Bytes
7f62904 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import json
import re
import sys
import unicodedata
from urllib.parse import urljoin

import cpca  # place-name -> province / city / district mapping
import helium as hm
import pandas as pd
from bs4 import BeautifulSoup  # parses HTML pages fetched with requests
from faker import Faker
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service

from keywordInfo import key_pat, zwlx_list
# Beijing municipal information releases: recruitment-exam (考编) announcements ~ A _1
# NOTE(review): sub_url is assigned the same empty string twice; as shown, the
# second assignment is redundant. Presumably these held two alternative URLs —
# confirm against the upstream file.
sub_url =""
sub_url =""
def getDriver(chromedriver_path='./chromedriver', window_size="1920,1080",
              stealth_js_path='./stealth.min.js'):
    """Start a Chrome WebDriver with a stealth script installed on every page.

    Args:
        chromedriver_path: Path to the chromedriver executable. Defaults to
            the binary next to the script; a system-wide install would
            typically be '/usr/bin/chromedriver'.
        window_size: Browser window size as "WIDTH,HEIGHT".
        stealth_js_path: Path to the JavaScript file that is registered to
            run at the start of every new document.

    Returns:
        The configured ``webdriver.Chrome`` instance. The caller owns it and
        is responsible for calling ``quit()``.

    Raises:
        OSError: If the stealth script cannot be read.
    """
    chrome_options = Options()
    chrome_options.add_argument(f"--window-size={window_size}")
    # Ask Chrome not to start with its "enable-automation" switch.
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])

    # Read the script before launching the browser so a missing file does not
    # leave an orphaned Chrome process behind.
    with open(stealth_js_path, encoding="utf-8") as f:
        js = f.read()

    driver = webdriver.Chrome(executable_path=chromedriver_path, options=chrome_options)
    # Register the script through the DevTools protocol so it is evaluated
    # before any page script on each newly loaded document.
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
    return driver
def content_with_date(lines):
    """Return the leading part of *lines* up to the first line holding a date.

    A line counts as dated when it contains ``<M>月<D>日`` or
    ``<YYYY>年<M>月<D>日``.

    Args:
        lines: Sequence of text lines. An empty sequence, an empty string or
            ``None`` is accepted and yields ``[]``.

    Returns:
        ``lines[:i + 1]`` where ``i`` is the index of the first dated line,
        or ``[]`` when no line contains a date.
    """
    if not lines:
        return []

    # The full-date form is listed first for readability; the month-day form
    # alone would already match inside it.
    date_re = re.compile(r'\d{4}年\d{1,2}月\d{1,2}日|\d{1,2}月\d{1,2}日')

    for position, line in enumerate(lines):
        if date_re.search(line):
            # Stop at the first dated line so trailing paragraphs are dropped.
            return lines[:position + 1]
    return []
# TODO: tune how many paragraphs are collected here so the result does not
# carry too much redundant text.
def find_key_paragrap(search_text, paragraphs):
    """Find the first dated window of paragraphs that starts at a keyword hit.

    Each paragraph whose ``.text`` contains *search_text* opens a window made
    of that paragraph and up to six paragraphs after it. The first window for
    which ``content_with_date`` reports at least one line is returned.

    Args:
        search_text: Substring to look for in each paragraph's text.
        paragraphs: Sequence of objects exposing a ``.text`` attribute.

    Returns:
        The list of texts of the whole window, or ``None`` when no keyword
        hit is followed by a date inside its window.
    """
    total = len(paragraphs)
    for position, node in enumerate(paragraphs):
        if search_text not in node.text:
            continue
        window_end = min(total, position + 7)
        window_texts = [member.text for member in paragraphs[position:window_end]]
        # Only accept the window when a date shows up somewhere inside it.
        if len(content_with_date(window_texts)) >= 1:
            return window_texts
    return None
def titleLocInfo(title):
    """Derive year, region fields and post type from an announcement title.

    Args:
        title: Announcement title text.

    Returns:
        ``[zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]``:
        the first ``<YYYY>年`` found in the title (``"2023"`` when there is
        none), the "省", "市" and "adcode" values of the first row returned
        by ``cpca.transform`` (``""`` when it returns no rows), and the last
        entry of ``zwlx_list`` that occurs in the title (``zwlx_list[0]``
        when none does).
    """
    year_hits = re.findall(r'(\d{4}年)', title)
    if len(year_hits):
        zwk_year = year_hits[0]
    else:
        zwk_year = "2023"

    area_df = cpca.transform([title])

    def first_of(column):
        # First-row value of the given column, or "" for an empty frame.
        return list(area_df[column])[0] if area_df.shape[0] > 0 else ""

    zwk_sheng = first_of("省")   # province
    zwk_diqu = first_of("市")    # city, reported as the region
    first_of("区")               # district: looked up but not part of the result
    zwk_zip = first_of("adcode")  # unique area identifier

    # Default post type; per the original comment this is presumably the
    # civil-servant entry — verify against keywordInfo.zwlx_list.
    fallback = zwlx_list[0]
    found = [candidate for candidate in zwlx_list if candidate in title]
    zwlx = found[-1] if found else fallback

    return [zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]
def extract_from_driver(driver):
    """Collect announcement fields from the page currently loaded in *driver*.

    Args:
        driver: Selenium WebDriver already showing the announcement page.

    Returns:
        dict with the keys
        ``title, zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx`` (title and the
        five values from ``titleLocInfo``),
        ``bm_sj, fee_sj, ks_sj, zkz_sj`` (paragraph windows for the
        registration / fee / exam / admission-ticket keyword groups, or ``""``
        when a group has no dated hit),
        ``fn_list`` (``{link text: href}`` for attachment links), and
        ``tidy_bm_sj, tidy_fee_sj, tidy_ks_sj, tidy_zkz_sj`` (each window cut
        at its first dated line by ``content_with_date``).
    """
    # NOTE(review): the statement that defined `title` is missing from the
    # source as received; the browser's page title is assumed — confirm.
    title = driver.title
    # [zwk_year, zwk_sheng, zwk_diqu, zwk_zip, zwlx]
    title_info = titleLocInfo(title)

    paragraphs = [p for p in driver.find_elements_by_tag_name("p")
                  if p.text.strip() != ""]

    # Keyword groups come from key_pat["报名"], key_pat["考试"],
    # key_pat["缴费"], key_pat["准考证"].
    def get_key_info(pt: list):
        """Return the first dated window found for any keyword in *pt*, else ""."""
        for item in pt:
            res_ = find_key_paragrap(item, paragraphs)
            if res_ is not None:
                return res_
        return ""

    bm_sj = get_key_info(key_pat["报名"])
    fee_sj = get_key_info(key_pat["缴费"])
    ks_sj = get_key_info(key_pat["考试"])
    zkz_sj = get_key_info(key_pat["准考证"])

    # Attachments: links whose href or visible text ends in .doc/.xls/.xlsx.
    attachment_suffixes = (".doc", ".xls", ".xlsx")
    unique_link = {}
    for link in driver.find_elements_by_tag_name("a"):
        url_ = link.get_attribute("href")
        content_ = link.get_attribute("textContent")
        url_con = bool(url_) and url_.endswith(attachment_suffixes)
        name_con = bool(content_) and content_.endswith(attachment_suffixes)
        if url_con or name_con:
            # Keyed by link text, so a repeated text keeps only the last href.
            unique_link[content_] = url_

    name = ["title", "zwk_year", "zwk_sheng", "zwk_diqu", "zwk_zip", "zwlx",
            "bm_sj", "fee_sj", "ks_sj", "zkz_sj",
            "fn_list",
            "tidy_bm_sj", "tidy_fee_sj", "tidy_ks_sj", "tidy_zkz_sj"]

    # Values must follow the order of `name` exactly: title, the five title
    # fields, the four raw windows, the attachment map, the four tidy windows.
    doc_item = [title]
    doc_item.extend(title_info)
    doc_item.extend([bm_sj, fee_sj, ks_sj, zkz_sj, unique_link])
    doc_item.extend([
        content_with_date(bm_sj),
        content_with_date(fee_sj),
        content_with_date(ks_sj),
        content_with_date(zkz_sj),
    ])

    return dict(zip(name, doc_item))
# Flattens one extracted document into a single table row.
def table_record_doc(doc):
    """Turn the dict produced by ``extract_from_driver`` into a flat row.

    Args:
        doc: Mapping with the keys ``title, zwk_year, zwk_sheng, zwk_diqu,
            zwk_zip, zwlx, bm_sj, fee_sj, ks_sj, zkz_sj, tidy_bm_sj,
            tidy_fee_sj, tidy_ks_sj, tidy_zkz_sj`` and ``fn_list`` (a dict of
            attachment name -> URL).

    Returns:
        A list of 15 cells in the column order title, zwk_year, zwk_sheng,
        zwk_diqu, zwk_zip, zwlx, bm_sj, fee_sj, ks_sj, zkz_sj, tidy_bm_sj,
        tidy_fee_sj, tidy_ks_sj, tidy_zkz_sj, fn_list.

    Raises:
        KeyError: If *doc* lacks one of the keys above.
    """
    def _as_cell(value):
        # NOTE(review): the cells between zwk_zip and the attachment cell are
        # missing from the source as received. Joining list values with
        # newlines mirrors how the attachment cell is built — confirm.
        if isinstance(value, (list, tuple)):
            return "\n".join(str(part) for part in value)
        return value

    scalar_keys = ["title", "zwk_year", "zwk_sheng", "zwk_diqu", "zwk_zip", "zwlx"]
    text_keys = ["bm_sj", "fee_sj", "ks_sj", "zkz_sj",
                 "tidy_bm_sj", "tidy_fee_sj", "tidy_ks_sj", "tidy_zkz_sj"]

    fn_dc = doc["fn_list"]
    row_item = [doc[key] for key in scalar_keys]
    row_item.extend(_as_cell(doc[key]) for key in text_keys)
    # Attachments go last, one "name:url" pair per line.
    row_item.append("\n".join([f"{k}:{v}" for k, v in fn_dc.items()]))
    return row_item
if __name__ == '__main__':
    # Usage: python <script> <announcement-url>
    import time

    url = sys.argv[1] if len(sys.argv) > 1 else ""

    mydriver = getDriver()
    try:
        hm.set_driver(mydriver)  # hand the Selenium driver over to helium
        if url:
            # NOTE(review): the navigation step is missing from the source as
            # received (only a commented-out `hm.goto(url)` remains); loading
            # the URL through the driver is assumed — confirm.
            mydriver.get(url)
            # NOTE(review): `import time` directly preceded the extraction
            # call, so a short wait for the page to render is assumed; the
            # duration is a guess — confirm.
            time.sleep(2)

        res = extract_from_driver(mydriver)
        print("-raw, mostly contains----------------------------")
        # NOTE(review): what was printed here is missing from the source as
        # received; dumping the raw result and the tidied windows is assumed.
        print(res)

        # The original read these from an undefined name `doc`; the extracted
        # result is `res`.
        bm_sj = content_with_date(res["bm_sj"])
        fee_sj = content_with_date(res["fee_sj"])
        ks_sj = content_with_date(res["ks_sj"])
        zkz_sj = content_with_date(res["zkz_sj"])
        for label, tidy in (("bm_sj", bm_sj), ("fee_sj", fee_sj),
                            ("ks_sj", ks_sj), ("zkz_sj", zkz_sj)):
            print(f"{label}: {tidy}")
    finally:
        # Always shut the browser down, even when extraction fails.
        mydriver.quit()