前期準備
獲取39藥品網全部藥品ID
1.下載39藥品網全部藥品頁面
# -*- coding: utf-8 -*-
"""
Crawler for ypk.39.net: for every drug id it downloads the overview page,
the detailed manual, all comment (用藥經驗) pages and all consultation
(用藥諮詢) pages into one directory per drug.

@Datetime: 2018/10/15
@Author: Zhang Yafei
"""
import logging
import os
import re
import time

import requests
from retrying import retry
# BUG FIX: urljoin/urlsplit belong to urllib.parse; importing urljoin from
# urllib.request only worked through an undocumented re-export.
from urllib.parse import urljoin, urlsplit
from lxml import etree
from multiprocessing import Pool

from ids import Diabetes_ids

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'}
proxies = {'http': 'http://61.135.217.7:80', 'https': 'http://171.113.156.168:8010'}

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DOWNLOAD_DIR = os.path.join(BASE_DIR, '糖尿病')
file_path = os.path.join(BASE_DIR, 'drug_ruls.txt')
RUN_LOG_FILE = os.path.join(BASE_DIR, 'log', 'run.log')
ERROR_LOG_FILE = os.path.join(BASE_DIR, 'log', 'error_log')

if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)


class Logger(object):
    """Writes run messages (INFO) and error messages (ERROR) to both a
    log file and the console."""

    def __init__(self):
        self.run_log_file = RUN_LOG_FILE
        self.error_log_file = ERROR_LOG_FILE
        self.run_log = None
        self.error_log = None
        self.initialize_run_log()
        self.initialize_error_log()

    @staticmethod
    def check_path_exist(log_abs_file):
        """Ensure the directory of *log_abs_file* exists."""
        log_path = os.path.split(log_abs_file)[0]
        if not os.path.exists(log_path):
            os.mkdir(log_path)

    def initialize_run_log(self):
        """Create the INFO logger (file + console handlers)."""
        self.check_path_exist(self.run_log_file)
        fh = logging.FileHandler(self.run_log_file, 'a', encoding='utf-8')
        sh = logging.StreamHandler()
        logger1 = logging.Logger('run_log', level=logging.INFO)
        logger1.addHandler(fh)
        logger1.addHandler(sh)
        self.run_logger = logger1

    def initialize_error_log(self):
        """Create the ERROR logger (file + console handlers)."""
        self.check_path_exist(self.error_log_file)
        fh = logging.FileHandler(self.error_log_file, 'a', encoding='utf-8')
        sh = logging.StreamHandler()
        logger1 = logging.Logger('error_log', level=logging.ERROR)
        logger1.addHandler(fh)
        logger1.addHandler(sh)
        self.error_logger = logger1

    def log(self, message, mode=True):
        """Write one log record.

        :param message: log message
        :param mode: True -> run log (INFO), False -> error log (ERROR)
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)


logger = Logger()


class Drug(object):
    """Downloads all pages of one drug.

    self.base_url     crawl base url == drug overview url
    self.manual_url   detailed manual url
    self.comment_url  user-experience (comment) url
    self.ask_url      consultation url
    """

    def __init__(self, base_url):
        self.base_url = base_url
        self.drug_id = self.base_url.split('/')[-2]
        self.manual_url = urljoin(base_url, 'manual')
        self.comment_url = urljoin(base_url, 'comment')
        self.ask_url = urljoin(base_url, 'ask')
        self.make_drug_dir()
        # BUG FIX: the original wrapped the already-evaluated call list in a
        # lazy, never-consumed map(); call each crawl step explicitly.
        for step in (self.summary, self.manual, self.comment, self.ask):
            step()

    def make_drug_dir(self):
        """Create the per-drug download directory named '<name>[<id>]'."""
        response = requests.get(self.base_url, headers=headers)
        response.encoding = response.apparent_encoding
        html = etree.HTML(response.text)
        try:
            drug_name = html.xpath('//div[@class="t1"]/h1/a/text()')[0]
        except IndexError:
            # some pages put the name directly in <h1> without an <a>
            drug_name = html.xpath('//div[@class="t1"]/h1/text()')[0]
        self.drug_name = self.validateTitle(drug_name)
        self.drug_dir_path = os.path.join(DOWNLOAD_DIR, '{}[{}]'.format(self.drug_name, self.drug_id))
        if not os.path.exists(self.drug_dir_path):
            os.mkdir(self.drug_dir_path)

    def validateTitle(self, title):
        """Replace characters illegal in filenames (/ \\ : * ? " < > |) with '_'."""
        rstr = r"[\/\\\:\*\?\"\<\>\|]"
        new_title = re.sub(rstr, "_", title)
        return new_title

    @retry(stop_max_attempt_number=3)
    def retry_download(self, url):
        """Download with up to three retries (retrying decorator).

        :param url: final page url
        :return: requests.Response on success; raises on failure
        """
        result = requests.get(url, headers=headers, proxies=proxies, timeout=3)
        # assert feeds the retry decorator: non-200 raises and triggers a retry
        assert result.status_code == 200
        return result

    def download(self, url):
        """Download wrapper; returns None instead of raising on failure.

        :param url: page url
        :return: requests.Response or None
        """
        try:
            result = self.retry_download(url)
        except Exception as e:
            print(e)
            result = None
        return result

    def summary(self):
        """Fetch and save the drug overview page."""
        summary_path = os.path.join(self.drug_dir_path, '{}[{}]-藥品概述.html'.format(self.drug_name, self.drug_id))
        if os.path.exists(summary_path):
            print('{}藥品概述已經下載過了'.format(self.drug_name))
            return
        response = requests.get(self.base_url, headers=headers)
        if response.status_code != 200:
            response = self.download(self.base_url)
            if not response:
                logger.log('{}'.format(self.base_url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        with open(summary_path, 'w', encoding='gb2312') as file:
            file.write(text)
        logger.log('{}[{}]-藥品概述下載完成'.format(self.drug_name, self.drug_id))

    def manual(self):
        """Fetch and save the detailed manual page."""
        manual_path = os.path.join(self.drug_dir_path, '{}[{}]-詳細說明書.html'.format(self.drug_name, self.drug_id))
        if os.path.exists(manual_path):
            print('{}詳細說明書已經下載過了'.format(self.drug_name))
            return
        response = requests.get(self.manual_url, headers=headers)
        if response.status_code != 200:
            # BUG FIX: the original retried self.base_url here, silently
            # saving the overview page as the manual; retry the manual url.
            response = self.download(self.manual_url)
            if not response:
                logger.log('{}'.format(self.manual_url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        with open(manual_path, 'w', encoding='gb2312') as file:
            file.write(text)
        logger.log('{}[{}]-詳細說明書下載完成'.format(self.drug_name, self.drug_id))

    def comment(self):
        """Read the comment count, then fetch every comment page (20/page)."""
        response = requests.get(self.comment_url, headers=headers)
        if response.status_code != 200:
            # BUG FIX: retry the comment url, not the overview url.
            response = self.download(self.comment_url)
            if not response:
                logger.log('{}'.format(self.comment_url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        html = etree.HTML(text)
        try:
            comment_nums = int(html.xpath('//div[@class="dps"]/cite/font/text()')[0])
        except IndexError:
            logger.log('{}[{}]-用藥經驗頁評論數爲零'.format(self.drug_name, self.drug_id))
            comment_nums = 0
        num, remainder = divmod(comment_nums, 20)
        for x in range(1, num + 2):
            url = urljoin(self.base_url, 'comment/k0_p{}'.format(x))
            self.comment_page(url)

    def comment_page(self, url):
        """Fetch and save one comment page."""
        comment_path = os.path.join(self.drug_dir_path, '{}[{}]-用藥經驗{}.html'.format(self.drug_name, self.drug_id, url[-1]))
        if os.path.exists(comment_path):
            print('{}[{}]-用藥經驗{}已經下載過了'.format(self.drug_name, self.drug_id, url[-1]))
            return
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            # BUG FIX: retry the failing page url, not the overview url.
            response = self.download(url)
            if not response:
                logger.log('{}'.format(url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        with open(comment_path, 'w', encoding='gb2312') as file:
            file.write(text)
        logger.log('{}[{}]-用藥經驗{}下載完成'.format(self.drug_name, self.drug_id, url[-1]))

    def ask(self):
        """Read the consultation count, then fetch every consultation page (5/page)."""
        # consistency fix: send the same headers as every other request
        response = requests.get(self.ask_url, headers=headers)
        if response.status_code != 200:
            # BUG FIX: retry the ask url, not the overview url.
            response = self.download(self.ask_url)
            if not response:
                logger.log('{}'.format(self.ask_url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        html = etree.HTML(text)
        try:
            ask_nums = html.xpath('//span[@class="pages"]/span[@class="pgleft"]/b/text()')[0]
            ask_nums = int(re.match(r'.*?(\d+).*', ask_nums).group(1))
        except Exception:
            ask_nums = 0
            logger.log('{}[{}]-用藥諮詢頁無人提問'.format(self.drug_name, self.drug_id))
        num, remainder = divmod(ask_nums, 5)
        for x in range(1, num + 2):
            url = urljoin(self.base_url, 'ask/p{}'.format(x))
            self.ask_page(url)

    def ask_page(self, url):
        """Fetch and save one consultation page."""
        ask_path = os.path.join(self.drug_dir_path, '{}[{}]-用藥諮詢{}.html'.format(self.drug_name, self.drug_id, url[-1]))
        if os.path.exists(ask_path):
            print('{}[{}]-用藥諮詢{}已經下載過了'.format(self.drug_name, self.drug_id, url[-1]))
            return
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            # BUG FIX: retry the failing page url, not the overview url.
            response = self.download(url)
            if not response:
                logger.log('{}'.format(url), False)
                return
        text = response.content.decode('gb2312', 'ignore')
        with open(ask_path, 'w', encoding='gb2312') as file:
            file.write(text)
        logger.log('{}[{}]-用藥諮詢{}下載完成'.format(self.drug_name, self.drug_id, url[-1]))


def transform_urls(filename):
    """Map a downloaded directory name '<name>[<id>]' back to its drug url."""
    drug_id = re.findall(r'.*?\[(\d+)\]', filename)[-1]
    drug_url = 'http://ypk.39.net/{}/'.format(drug_id)
    return drug_url


def check_downloaded(func):
    """Decorator: remove urls whose drug directory already exists on disk."""
    def inner(drug_urls):
        file_list = os.listdir(DOWNLOAD_DIR)
        downloaded = map(transform_urls, file_list)
        remaining = set(drug_urls) - set(downloaded)
        func(list(remaining))
    return inner


def get_drug_urls():
    """Read every drug url to crawl from drug_ruls.txt."""
    with open(file_path, 'r', encoding='utf-8') as f:
        drug_urls = f.readlines()
    return [line.strip() for line in drug_urls]


def get_diabetes_urls():
    """Build the (deduplicated) url list for the diabetes drug ids."""
    return list(set('http://ypk.39.net/{}/'.format(i) for i in Diabetes_ids))


def main(drug_base_url):
    """Worker entry: crawl one drug by instantiating Drug."""
    Drug(drug_base_url)


def validateTitle(title):
    """Module-level copy used by spider(): sanitize a filename."""
    rstr = r"[\/\\\:\*\?\"\<\>\|]"  # / \ : * ? " < > |
    new_title = re.sub(rstr, "_", title)
    return new_title


def spider(url):
    """Fallback single-page crawler: save only the overview page of *url*."""
    url_path = urlsplit(url)
    drug_id = url_path.path.strip('/')
    try:
        response = requests.get(url=url, headers=headers, timeout=3)
        text = response.content.decode('gb2312', 'ignore')
        html = etree.HTML(text)
        drug_name = html.xpath('//div[@class="t1"]/h1/text()')[0]
        drug_name = validateTitle(drug_name)
    except Exception as e:
        print(e)
        logger.log(url, False)
        return
    drug_dir_path = os.path.join(DOWNLOAD_DIR, '{}[{}]'.format(drug_name, drug_id))
    if not os.path.exists(drug_dir_path):
        os.mkdir(drug_dir_path)
    drug_html_detail = os.path.join(drug_dir_path, '{}[{}].html'.format(drug_name, drug_id))
    if not os.path.exists(drug_html_detail):
        with open(drug_html_detail, 'w', encoding='gb2312') as file:
            file.write(text)
        print(drug_name, '下載成功')


@check_downloaded
def run(drug_urls):
    """Crawl all remaining urls with a 5-process pool."""
    print(drug_urls)
    print(len(drug_urls))
    pool = Pool(5)
    pool.map(main, drug_urls)
    pool.close()
    pool.join()


if __name__ == '__main__':
    drug_urls = get_drug_urls()
    run(drug_urls)
具體實現:進程池,requests+lxml,打印日誌,類,從新下載排除已下載頁面
2.解析全部藥品頁面提取有價值信息
# -*- coding: utf-8 -*-
"""
Parser: walks the downloaded per-drug directories, extracts structured
fields from the saved HTML pages and appends them to drug_info.tsv.

@Datetime: 2018/10/13
@Author: Zhang Yafei
"""
import csv
import json
import logging
import os
import re

# BUG FIX: numpy was commented out although new_data() uses np.nan
# (the original np.NAN alias was also removed in NumPy 2.0).
import numpy as np
import pandas
from scrapy.selector import Selector

BASE_DIRS = os.path.dirname(os.path.abspath(__file__))
drug_path = os.path.join(BASE_DIRS, '藥品')
dirs_list = os.listdir(drug_path)
analysis_file_path = os.path.join(BASE_DIRS, 'drug_info.tsv')
RUN_LOG_FILE = os.path.join(BASE_DIRS, 'analysis_log', 'run.log')
ERROR_LOG_FILE = os.path.join(BASE_DIRS, 'analysis_log', 'error_log')


def get_unresoved_drug_list():
    """Return drug directories that are not yet present in drug_info.tsv."""
    data = pandas.read_csv('drug_info.tsv', sep='\t', encoding='utf-8')
    try:
        resoved_drug_list = data.apply(lambda row: '{}[{}]'.format(row['藥品名稱'], row['藥品ID']), axis=1).tolist()
    except AttributeError:
        # fallback for the empty-frame case where .apply yields no Series
        resoved_drug_list = []
        for index, row in data.iterrows():
            drug_name = '{}[{}]'.format(row['藥品名稱'], row['藥品ID'])
            resoved_drug_list.append(drug_name)
    unresoved_drug_list = list(set(dirs_list) - set(resoved_drug_list))
    return unresoved_drug_list


def write_resoved_drag_list(drag):
    """Append a successfully parsed drug dir name to resolved_drag_list.py (JSON)."""
    if not os.path.exists('resolved_drag_list.py'):
        resoved_drag_list = set()
        resoved_drag_list.add(drag)
    else:
        with open('resolved_drag_list.py', 'r', encoding='utf-8') as f:
            resoved_drag_list = set(json.load(f))
        resoved_drag_list.add(drag)
    with open('resolved_drag_list.py', 'w', encoding='utf-8') as f:
        json.dump(list(resoved_drag_list), f)


def write_error_drag_list(drag):
    """Append a failed drug dir name to error_drag_list.py (JSON)."""
    if not os.path.exists('error_drag_list.py'):
        error_drag_list = set()
        error_drag_list.add(drag)
    else:
        with open('error_drag_list.py', 'r', encoding='utf-8') as f:
            error_drag_list = set(json.load(f))
        error_drag_list.add(drag)
    with open('error_drag_list.py', 'w', encoding='utf-8') as f:
        json.dump(list(error_drag_list), f)


class Logger(object):
    """Run/error logger writing to file and console."""

    def __init__(self):
        self.run_log_file = RUN_LOG_FILE
        self.error_log_file = ERROR_LOG_FILE
        self.run_log = None
        self.error_log = None
        self.initialize_run_log()
        self.initialize_error_log()

    @staticmethod
    def check_path_exist(log_abs_file):
        log_path = os.path.split(log_abs_file)[0]
        if not os.path.exists(log_path):
            os.mkdir(log_path)

    def initialize_run_log(self):
        self.check_path_exist(self.run_log_file)
        fh = logging.FileHandler(self.run_log_file, 'a', encoding='utf-8')
        sh = logging.StreamHandler()
        fmt = logging.Formatter(fmt="%(asctime)s - %(levelname)s : %(message)s")
        sh.setFormatter(fmt)
        logger1 = logging.Logger('run_log', level=logging.INFO)
        logger1.addHandler(fh)
        logger1.addHandler(sh)
        self.run_logger = logger1

    def initialize_error_log(self):
        self.check_path_exist(self.error_log_file)
        fh = logging.FileHandler(self.error_log_file, 'a', encoding='utf-8')
        sh = logging.StreamHandler()
        fmt = logging.Formatter(fmt="%(asctime)s - %(levelname)s : %(message)s")
        sh.setFormatter(fmt)
        logger1 = logging.Logger('error_log', level=logging.ERROR)
        logger1.addHandler(fh)
        logger1.addHandler(sh)
        self.error_logger = logger1

    def log(self, message, mode=True):
        """mode=True -> run log (INFO); mode=False -> error log (ERROR)."""
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)


class DrugInfo(object):
    """Extracts all fields of one drug from its downloaded pages.

    Fields: drug name/id/type, national standard, company, address,
    license number and date, form, spec, storage, validity, attention
    rank, indications, component, function, usage, contraindication,
    special-population notes, medical-insurance / infectious flags,
    related symptoms/examinations, adverse reaction, attention matters,
    interaction, pharmacological action, revision date, and concatenated
    consultation/experience texts.
    """

    def __init__(self, drug):
        drug_dir = os.path.join(drug_path, drug)
        self.drug_name = re.findall(r'(.*?)\[\d+\]', drug)[0]
        self.drug_id = re.findall(r'.*?\[(\d+)\].*', drug)[0]
        self.drug_dir = drug_dir
        self.drug_use_experience = ''
        self.drug_use_consult = ''
        self.file_list = os.listdir(self.drug_dir)
        self.logger = Logger()
        self.result = True
        self.dispatch()
        if not self.drug_use_consult:
            self.drug_use_consult = '無'
        if not self.drug_use_experience:
            self.drug_use_experience = '無'

    def dispatch(self):
        """Route each saved file to its parser; unknown files mark failure."""
        for file in self.file_list:
            if file.endswith('藥品概述.html'):
                self.drug_summary(self.file_path(file))
            elif file.endswith('詳細說明書.html'):
                self.drug_instruction(self.file_path(file))
            elif re.match('.*?用藥諮詢.*', file):
                self.drug_consultation(self.file_path(file))
            elif re.match('.*?用藥經驗.*', file):
                self.drug_experience(self.file_path(file))
            else:
                self.result = False
                break

    def file_path(self, file):
        return os.path.join(self.drug_dir, file)

    def read_file(self, file):
        with open(file, 'r') as f:
            html = f.read()
        return html

    def drug_summary(self, file):
        """Parse the overview page; every missing field falls back to '未知'."""
        html = self.read_file(file)
        selector = Selector(text=html)
        self.category = selector.xpath('//div[@class="t1"]/cite[1]/span/text()').extract_first()  # drug type
        if not self.category:
            self.category = '未知'
        self.cite = selector.xpath('//div[@class="t1"]/cite[2]/span/text()').extract_first()  # national standard
        if not self.cite:
            self.cite = '未知'
        try:
            self.company = selector.css('.t3 .company a::text').extract()[0]  # manufacturer
        except IndexError:
            self.company = '未知'
        try:
            self.address = selector.css('.t3 .address::text').extract()[0]  # manufacturer address
        except IndexError:
            self.address = '未知'
        try:
            self.license_number = selector.xpath('//ul[@class="xxs"]/li[1]/text()').extract_first().strip()  # license number
        except AttributeError:
            self.license_number = '未知'
        try:
            self.approval_date = selector.xpath('//ul[@class="xxs"]/li[2]/text()').extract_first().strip()  # approval date
        except AttributeError:
            self.approval_date = '未知'
        try:
            self.form_drug = selector.xpath('//ul[@class="showlis"]/li[1]/text()').extract_first().strip()  # form
        except AttributeError:
            self.form_drug = '未知'
        try:
            self.spec = selector.xpath('//ul[@class="showlis"]/li[2]/text()').extract_first().strip()  # spec
        except AttributeError:
            self.spec = '未知'
        try:
            self.store = selector.xpath('//ul[@class="showlis"]/li[3]/text()').extract_first().strip().strip('。')  # storage
        except AttributeError:
            self.store = '未知'
        try:
            self.period_valid = selector.xpath('//ul[@class="showlis"]/li[4]/text()').extract_first().strip('。').replace('\n', '')  # validity
        except AttributeError:
            self.period_valid = '未知'
        self.attention_rank = selector.css('.guanzhu cite font::text').extract_first()  # attention rank
        if not self.attention_rank:
            self.attention_rank = '未知'
        self.indication = ','.join(selector.css('.whatsthis li::text').extract())  # indications
        if self.indication == '':
            self.indication = '未知'
        usage_dosage = selector.css('.ps p:nth-child(3)::text').extract_first()  # usage and dosage
        if usage_dosage:
            self.usage_dosage = re.sub('<.*?>', '', usage_dosage).strip().replace('\n', '')
        else:
            self.usage_dosage = '未知'
        indications = selector.css('#diseaseintro::text').extract_first()  # indication overview
        if indications:
            self.indications = re.sub('<.*?>', '', indications).strip().replace('\n', '')
        else:
            self.indications = '未知'
        try:
            self.is_or_not_medical_insurance = selector.css('.syz_cons p:nth-child(2)::text').extract_first().split(':')[1]  # medical insurance?
        except AttributeError:
            self.is_or_not_medical_insurance = '未知'
        try:
            self.is_or_not_infections = selector.css('.syz_cons p:nth-child(3)::text').extract_first().split(':')[1].strip()  # infectious?
        except AttributeError:
            self.is_or_not_infections = '未知'
        self.related_symptoms = ','.join(selector.css('.syz_cons p:nth-child(4) a::text').extract()[:-1])  # related symptoms
        if len(self.related_symptoms) == 0:
            self.related_symptoms = '未知'
        self.related_examination = ','.join(selector.css('.syz_cons p:nth-child(5) a::text').extract()[:-1])  # related examinations
        if len(self.related_examination) == 0:
            self.related_examination = '未知'

    def drug_instruction(self, file):
        """Parse the detailed manual page.

        NOTE: page structure varies between drugs, hence the layered
        fallbacks around every field.
        """
        html = self.read_file(file)
        selector = Selector(text=html)
        component = selector.xpath('//dt[text()="【成份】"]/following::*[1]').extract_first()
        if not component:
            self.component = '未知'
        else:
            self.component = re.sub('<.*?>', '', component).strip()  # component
        contraindication = selector.xpath('//dt[text()="【禁忌】"]/following::*[1]').extract_first()
        if contraindication:
            self.contraindication = re.sub('<.*?>', '', contraindication).strip().replace('\n', '')  # contraindication
        else:
            self.contraindication = '未知'
        function = selector.xpath('//dt[text()="【功能主治】"]/following::*[1]').extract_first()
        if function:
            self.function = re.sub('<.*?>', '', function).strip()  # function
        else:
            self.function = '未知'
        try:
            self.adverse_reaction = selector.xpath('//dt[text()="【不良反應】"]/following::*[1]/p/text()').extract_first().strip('。')
        except AttributeError:
            try:
                self.adverse_reaction = selector.xpath('//dt[text()="【不良反應】"]/following::*[1]/text()').extract_first().strip('。')
                self.adverse_reaction = re.sub('<.*?>', '', self.adverse_reaction).strip().replace('\n', '')
            except AttributeError:
                self.adverse_reaction = '未知'
        attention_matters = selector.xpath('//dt[text()="【注意事項】"]/following::*[1]').extract_first()
        if attention_matters:
            self.attention_matters = re.sub('<.*?>', '', attention_matters).strip().replace('\n', '')
        else:
            self.attention_matters = '未知'
            self.logger.log('{}[{}]-注意事項爲空'.format(self.drug_name, self.drug_id), False)
        try:
            self.interaction = selector.xpath('//dt[text()="【藥物相互做用】"]/following::*[1]/p/text()').extract_first()
            self.interaction = re.sub('<.*?>', '', self.interaction).strip().replace('\n', '')
        except TypeError:
            self.interaction = '未知'
        try:
            self.pharmacological_action = selector.xpath('//dt[text()="【藥理做用】"]/following::*[1]/p/text()').extract_first()
            self.pharmacological_action = re.sub('<.*?>', '', self.pharmacological_action).strip().replace('\n', '')
        except TypeError:
            self.pharmacological_action = '未知'
        try:
            self.revision_date = selector.xpath('//dt[text()="【說明書修訂日期】"]/following::*[1]/text()').extract_first().strip()
        except AttributeError:
            self.revision_date = '未知'
        try:
            self.special_population = selector.xpath('//dt[text()="【特殊人羣用藥】"]/following::*[1]/text()').extract_first()
            self.special_population = re.sub('<.*?>', '', self.special_population).strip().replace('\n', '')
        except TypeError:
            self.special_population = '未知'

    def drug_consultation(self, file):
        """Accumulate consultation text from one 用藥諮詢 page."""
        html = self.read_file(file)
        selector = Selector(text=html)
        drug_use_consult = selector.css('.dpzx_con .zx p::text').extract()
        drug_use_consult = ''.join(drug_use_consult)
        drug_use_consult = re.sub('<.*?>', '', drug_use_consult).strip().replace('\n', '')
        self.drug_use_consult += drug_use_consult

    def drug_experience(self, file):
        """Accumulate experience text from one 用藥經驗 page."""
        html = self.read_file(file)
        selector = Selector(text=html)
        drug_use_experience = selector.css('.pls_box .pls_mid p::text').extract()
        drug_use_experience = ''.join(drug_use_experience)
        drug_use_experience = re.sub('<.*?>', '', drug_use_experience).strip().replace('\n', '')
        self.drug_use_experience += drug_use_experience.strip()

    @staticmethod
    def write_to_fileheader():
        """Write the TSV header row (creates/truncates drug_info.tsv)."""
        with open('drug_info.tsv', 'w', newline='', encoding='utf-8') as MyFile:
            writer = csv.writer(MyFile, dialect='excel-tab')
            drug_header = ['藥品名稱', '藥品ID', '藥品類型', '國家標準', '生產廠家', '廠家地址', '批准文號', '批准日期', '劑型', '規格', '儲存方法', '有效期限', '關注度排名', '適應症', '成分', '功能主治', '用發用量', '禁忌症', '特殊人羣用藥', '適應症概況', '是否用於醫保', '是否具備傳染性', '相關症狀', '相關檢查', '不良反應', '注意事項', '藥物相互做用', '藥理做用', '說明書修訂日期', '用藥經驗', '用藥諮詢']
            writer.writerow(drug_header)

    def write_to_file(self):
        """Append this drug's extracted fields as one TSV row."""
        with open('drug_info.tsv', 'a', newline='', encoding='utf-8') as MyFile:
            writer = csv.writer(MyFile, dialect='excel-tab')
            druginfo_list = [self.drug_name, self.drug_id, self.category, self.cite, self.company, self.address,
                             self.license_number, self.approval_date, self.form_drug, self.spec, self.store,
                             self.period_valid, self.attention_rank, self.indication, self.component, self.function,
                             self.usage_dosage, self.contraindication, self.special_population, self.indications,
                             self.is_or_not_medical_insurance, self.is_or_not_infections, self.related_symptoms,
                             self.related_examination, self.adverse_reaction, self.attention_matters,
                             self.interaction, self.pharmacological_action, self.revision_date,
                             self.drug_use_experience, self.drug_use_consult]
            writer.writerow(druginfo_list)
        self.logger.log('{}[{}]信息寫入文件完畢'.format(self.drug_name, self.drug_id))


def main(drug):
    """Parse one drug dir; record it as resolved or as an error."""
    druginfo = DrugInfo(drug)
    if druginfo.result:
        druginfo.write_to_file()
        write_resoved_drag_list(drug)
    else:
        druginfo.logger.log('{}[{}]'.format(druginfo.drug_name, druginfo.drug_id), False)
        write_error_drag_list(drug)


def new_data(row):
    """Split '名稱(別名)' into name and alias columns; alias NaN if absent."""
    drug_name = row['藥品名稱']
    try:
        row['別名'] = drug_name.rsplit('(', 1)[1].strip(')')
        row['藥品名稱'] = drug_name.rsplit('(', 1)[0]
    except IndexError:
        row['別名'] = np.nan
    return row


def update_drug_name():
    """Insert the alias column and save new_drug_info.tsv."""
    data = pandas.read_csv('drug_info.tsv', sep='\t', encoding='utf-8')
    col_name = data.columns.tolist()
    col_name.insert(col_name.index('藥品名稱') + 1, '別名')
    data = data.reindex(columns=col_name)
    new_drug = data.apply(new_data, axis=1)
    new_drug.to_csv('new_drug_info.tsv', index=False, sep='\t', encoding='utf-8')
    print('文件保存成功')


if __name__ == '__main__':
    if not os.path.exists(analysis_file_path):
        DrugInfo.write_to_fileheader()
    drug_list = get_unresoved_drug_list()
    print(len(drug_list))
    list(map(main, drug_list))
3.下載糖尿病相關藥品頁面
# -*- coding: utf-8 -*-
"""
Collects the drug ids of all diabetes-related drugs from the 134 listing
pages and saves them as JSON into ids.py.

@Datetime: 2018/11/10
@Author: Zhang Yafei
"""
import json

import requests
from lxml import etree
from multiprocessing import Pool

# kept for backward compatibility; note it is NOT shared across Pool workers
ids_list = []
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'}


def spider(url):
    """Fetch one listing page and return the drug ids found on it."""
    response = requests.get(url, headers=headers)
    html = etree.HTML(response.text)
    hrefs = html.xpath('//ul[@class="search_ul search_ul_yb"]/li/a/@href')
    ids = [href.strip('/') for href in hrefs]
    ids_list.extend(ids)
    return ids


if __name__ == '__main__':
    urls = ['http://ypk.39.net/tangniaobing/p{}'.format(i) for i in range(1, 135)]
    pool = Pool(4)
    # BUG FIX: the original appended to the module-global ids_list inside the
    # worker processes (which never propagates back to the parent) and then
    # crawled every page a SECOND time serially to compensate. Collect the
    # ids from Pool.map's return values instead — one pass, correct result.
    all_ids = [drug_id for page_ids in pool.map(spider, urls) for drug_id in page_ids]
    pool.close()
    pool.join()
    with open('ids.py', 'w', encoding='utf-8') as f:
        json.dump(all_ids, f)
4.更新葯品信息,拆分藥名列分爲藥品名稱列和別名列
# BUG FIX: this excerpt used pandas and np without importing either
# (np.NAN was also removed in NumPy 2.0 — the canonical spelling is np.nan).
import numpy as np
import pandas


def new_data(row):
    """Split '名稱(別名)' into the name and alias columns.

    :param row: a row (Series) with a '藥品名稱' entry
    :return: the same row with '藥品名稱' trimmed and '別名' set
             (NaN when the name carries no fullwidth-parenthesised alias)
    """
    drug_name = row['藥品名稱']
    try:
        row['別名'] = drug_name.rsplit('(', 1)[1].strip(')')
        row['藥品名稱'] = drug_name.rsplit('(', 1)[0]
    except IndexError:
        row['別名'] = np.nan
    return row


def update_drug_name():
    """Insert an alias column after '藥品名稱' and save new_drug_info.tsv."""
    data = pandas.read_csv('drug_info.tsv', sep='\t', encoding='utf-8')
    col_name = data.columns.tolist()
    col_name.insert(col_name.index('藥品名稱') + 1, '別名')
    # reindex creates the (empty) alias column in the right position
    data = data.reindex(columns=col_name)
    new_drug = data.apply(new_data, axis=1)
    new_drug.to_csv('new_drug_info.tsv', index=False, sep='\t', encoding='utf-8')
    print('文件保存成功')


if __name__ == '__main__':
    update_drug_name()
5.抓取全部藥品評論數,並構建藥品評論數字典
# -*- coding: utf-8 -*-
"""
Counts the number of user comments (用藥經驗) per downloaded drug and
saves the drug -> count mapping (comment.py JSON, comment_num_grade.csv).

@Datetime: 2018/11/10
@Author: Zhang Yafei
"""
import csv
import json
import os
import re
import time

import numpy
import pandas
from multiprocessing.pool import Pool
from scrapy import Selector

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
drug_path = os.path.join(BASE_DIR, '藥品')
dirs_list = os.listdir(drug_path)

result = {}
k_list = []
v_list = []
class_list = []
# BUG FIX: drug_summary() read the module global `comment_data`, whose
# definition was commented out -> NameError the moment that code path runs.
# Define it as None and guard in drug_summary; restore the load to enable:
# comment_data = pandas.read_csv('comment_num_grade.csv', encoding='utf-8')
comment_data = None
count = 0


class DrugInfo(object):
    """Builds the drug -> comment-count mapping for one drug directory."""

    def __init__(self, drug):
        self.drug = drug
        drug_dir = os.path.join(drug_path, drug)
        self.drug_name = re.findall(r'(.*?)\[\d+\]', drug)[0]
        self.drug_id = re.findall(r'.*?\[(\d+)\].*', drug)[0]
        self.drug_dir = drug_dir
        self.file_list = os.listdir(self.drug_dir)
        self.dispatch()

    def dispatch(self):
        """Only the 用藥經驗 pages are needed for counting."""
        for file in self.file_list:
            if re.match('.*?用藥經驗.*', file):
                self.drug_experience(self.file_path(file))

    def file_path(self, file):
        return os.path.join(self.drug_dir, file)

    def read_file(self, file):
        with open(file, 'r') as f:
            html = f.read()
        return html

    def drug_summary(self, file):
        """Annotate comment_data with the drug category (currently unused)."""
        html = self.read_file(file)
        selector = Selector(text=html)
        category = selector.xpath('//div[@class="subs"]/p/a[last()]/text()').extract_first()
        print(category)
        if comment_data is None:
            # fail loudly instead of with a bare NameError
            raise RuntimeError('comment_data is not loaded; restore the read_csv at module top')
        index = comment_data.loc[comment_data.藥品名稱 == self.drug, '類別'].index.values[0]
        comment_data.loc[index, '類別'] = category

    def drug_experience(self, file):
        """Read the comment count from one 用藥經驗 page."""
        html = self.read_file(file)
        selector = Selector(text=html)
        drug_use_experience_num = selector.css('.dps cite font::text').extract_first()
        if not drug_use_experience_num:
            self.drug_use_experience_num = 0
        else:
            self.drug_use_experience_num = int(drug_use_experience_num)
        result[self.drug] = self.drug_use_experience_num
        print(self.drug, self.drug_use_experience_num)

    def write_to_file(self):
        """Append one drug/count row; no-op if no count was parsed."""
        try:
            with open('comment_num_grade.csv', 'a', newline='', encoding='utf_8_sig') as MyFile:
                writer = csv.writer(MyFile)
                druginfo_list = [self.drug, self.drug_use_experience_num]
                writer.writerow(druginfo_list)
                print('{}寫入文件完畢'.format(self.drug))
        except AttributeError:
            # drug_use_experience_num never set (no experience pages found)
            return


def write_num():
    """Dump `result` to comment.py and save a count-sorted CSV."""
    with open('comment.py', 'w', encoding='utf-8') as f:
        json.dump(result, f)
    data = {'藥品名稱': list(result.keys()), '評論數': list(result.values())}
    df = pandas.DataFrame(data)
    comment_data = df.sort_values(by='評論數', ascending=False)
    comment_data.to_csv('comment_num_grade.csv', sep=',', encoding='utf_8_sig', mode='w', index=False)
    return comment_data


def read_num():
    """Reload comment.py and save a count-sorted CSV."""
    with open('comment.py', 'r', encoding='utf-8') as f:
        num = json.load(f)
    for k, v in num.items():
        k_list.append(k)
        v_list.append(v)
    data = {'藥品名稱': k_list, '評論數': v_list}
    df = pandas.DataFrame(data)
    comment_data = df.sort_values(by='評論數', ascending=False)
    comment_data.to_csv('comment_num_grade.csv', sep=',', encoding='utf_8_sig', mode='w', index=False)
    return comment_data


def main(drug):
    """Worker entry: count the comments of one drug."""
    DrugInfo(drug)


if __name__ == '__main__':
    start = time.time()
    # NOTE(review): a Pool variant existed but was commented out — it would
    # not work as written, because `result` is not shared across processes.
    list(map(main, dirs_list))
    write_num()
    print('總花費:{}秒'.format(time.time() - start))
6.提取評論數量最多的前10個藥品評論信息
# -*- coding: utf-8 -*-
"""
Extracts the individual user comments (reviewer info, text, jieba word
segmentation, timestamp) for the most-commented drugs into a CSV.

@Datetime: 2018/11/10
@Author: Zhang Yafei
"""
import csv
import os
import re  # fix: `re` was imported twice in the original

import jieba
import numpy
import pandas
from scrapy import Selector

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
drug_path = os.path.join(BASE_DIR, '藥品')
dirs_list = os.listdir(drug_path)
comment_info_filename = 'first50_comment.csv'


class DrugInfo(object):
    """Extracts every user comment of one drug directory into the CSV."""

    def __init__(self, drug):
        drug_dir = os.path.join(drug_path, drug)
        self.drug_name = re.findall(r'(.*?)\[\d+\]', drug)[0]
        self.drug_id = re.findall(r'.*?\[(\d+)\].*', drug)[0]
        self.drug_dir = drug_dir
        self.drug_use_experience = ''
        self.file_list = os.listdir(self.drug_dir)
        self.result = True
        self.dispatch()

    def dispatch(self):
        """Only the 用藥經驗 pages carry user comments."""
        for file in self.file_list:
            if re.match('.*?用藥經驗.*', file):
                self.drug_experience(self.file_path(file))

    def file_path(self, file):
        return os.path.join(self.drug_dir, file)

    def read_file(self, file):
        with open(file, 'r') as f:
            html = f.read()
        return html

    def drug_experience(self, file):
        """Parse one 用藥經驗 page and write one CSV row per comment."""
        # fix: the original placed print() BEFORE the docstring, so the
        # string below it was a plain statement, not a docstring
        print(file)
        html = self.read_file(file)
        selector = Selector(text=html)
        drug_use_experience = selector.css('.pls_box')
        try:
            page = selector.css('.dpzx .pages .pgleft span::text').extract()[0]
        except IndexError:
            page = 1
        drug_url = 'http://ypk.39.net/{}/comment/k0_p{}'.format(self.drug_id, page)
        if not drug_use_experience:
            # no comments on this page: emit a NaN placeholder row
            self.write_to_file(numpy.nan, numpy.nan, numpy.nan, drug_url)
            return
        for drug in drug_use_experience:
            self.drug_use_experience = drug.css('.pls_mid p::text').extract()[0].replace(' ', '').strip('\n')
            commter_info = drug.css('.pls_top cite::text').extract()[0].replace('\n', '').strip('來自').strip(' ').replace(' ', '/').rstrip('/')
            cut_info = '/'.join(list(jieba.cut(self.drug_use_experience)))
            cut_info = cut_info.strip('/ /')
            time = drug.css('.pls_top i::text').extract()[0].strip()
            # BUG FIX: the original used .strip('點評時間:'), which strips any
            # of those CHARACTERS from both ends (str.strip takes a char set,
            # not a prefix) and could eat legitimate parts of the timestamp.
            prefix = '點評時間:'
            if time.startswith(prefix):
                time = time[len(prefix):]
            if not time:
                time = numpy.nan
            self.write_to_file(commter_info, cut_info, time, drug_url)

    def write_to_file(self, commter_info, cut_info, time, drug_url):
        """Append one comment row to the CSV."""
        with open(comment_info_filename, 'a', newline='', encoding='utf_8_sig') as MyFile:
            writer = csv.writer(MyFile)
            druginfo_list = [self.drug_name, self.drug_id, commter_info, self.drug_use_experience, cut_info, time, drug_url]
            writer.writerow(druginfo_list)
            print('{}寫入文件完畢'.format(drug_url))


def write_to_fileheader():
    """Write the CSV header row (creates/truncates the output file)."""
    with open(comment_info_filename, 'w', newline='', encoding='utf_8_sig') as MyFile:
        writer = csv.writer(MyFile)
        drug_header = ['藥品名稱', '藥品ID', '評論者信息', '評論', '分詞', '評論時間', 'url']
        writer.writerow(drug_header)


def main(drug):
    """Worker entry: extract all comments of one drug directory."""
    DrugInfo(drug)
    print('解析完成')


def read_comment_num_first50():
    """Map the most-commented drugs back to their local directory names.

    :return: list of directory names matching the ids found in
             concat_first50_comment.csv
    """
    data = pandas.read_csv('concat_first50_comment.csv', encoding='utf-8')
    drugs = data.藥品名稱.values.tolist()
    drugs_id = list(map(lambda x: re.findall(r'\d+', x)[-1], drugs))
    df = pandas.DataFrame({'drug_name': dirs_list})
    matches = list(map(lambda x: df[df.drug_name.str.contains(x)].drug_name.values, drugs_id))
    matches = list(filter(lambda x: len(x), matches))
    return [x[0] for x in matches]


if __name__ == '__main__':
    if not os.path.exists(os.path.join(BASE_DIR, comment_info_filename)):
        write_to_fileheader()
    drugs = read_comment_num_first50()
    print(len(drugs))
    list(map(main, drugs))
    print(len(drugs))
7. 分析藥品評論數量所佔比例(echarts)
# -*- coding: utf-8 -*-
"""
Created on Mon Nov 12 19:28:09 2018
@author: Zhang Yafei

Aggregate comment counts per drug category and visualise them as a
word cloud and as pie charts; intermediate sums are cached in
comment_num_dict.py (JSON content despite the .py extension).
"""
import json
import os
import pandas
#from wordcloud import WordCloud as wc
#from pyecharts import WordCloud
import matplotlib as mpl
from matplotlib import pyplot as plt
#import wordcloud
import numpy as np
from PIL import Image

# Module-level state shared by the functions below: the per-drug data
# and the {category name -> total comment count} accumulator.
data = pandas.read_csv('new_comment_num_grade.csv',encoding='utf-8')
drug_type_num = data.類別.value_counts()
drug_type_names = data.類別.value_counts().index.values
drug_type_dict = {}

def parse(drug_type_name):
    # Sum the comment counts of every drug in this category and record
    # the total in the shared drug_type_dict accumulator.
    drug_type_frequence = data[data['類別']==drug_type_name].評論數.sum()
    drug_type_dict[drug_type_name] = int(drug_type_frequence)

def plot_wordcloud(drug_dict=None):
    # Render an HTML word cloud of category -> comment count.
    # NOTE(review): `WordCloud` is undefined here — the pyecharts import
    # above is commented out, so calling this raises NameError.
    if drug_dict:
        label = drug_dict.keys()
        attr = drug_dict.values()
    else:
        label = drug_type_dict.keys()
        attr = drug_type_dict.values()
    wordcloud = WordCloud(width=800, height=620)
    wordcloud.add('', label, attr, word_size_range=[20, 100])
    wordcloud.render('drug_comment_wordcloud.html')

def plot_wc(drug_dict=None):
    # Render an image word cloud shaped/coloured by mask1.jpg.
    # NOTE(review): `wc` and `wordcloud` are undefined — both imports
    # are commented out above, so calling this raises NameError.
    mask = np.array(Image.open('mask1.jpg'))
    word_plot = wc(
        font_path='font/simsun.ttc',  # font file (needed for CJK glyphs)
        mask=mask,  # background/shape mask image
        max_words=200,  # maximum number of words shown
        max_font_size=100  # largest font size
    )
    if drug_dict:
        word_plot = word_plot.generate_from_frequencies(drug_dict)  # build cloud from dict
    else:
        word_plot = word_plot.generate_from_frequencies(drug_type_dict)  # build cloud from dict
    image_colors = wordcloud.ImageColorGenerator(mask)  # colour scheme from mask image
    word_plot.recolor(color_func=image_colors)  # recolour cloud to match the mask
    plt.imshow(word_plot)  # show the word cloud
    plt.axis('off')  # hide the axes
    plt.show()  # display the figure
    word_plot.to_file('comment_num.jpg')

def plot_series_pie():
    # Pie chart of comment totals per category via pandas Series.plot.
    mpl.rcParams['font.sans-serif'] = ['SimHei']
    pie_data = pandas.read_csv('drug_type_num_sum.csv',encoding='utf-8')
    numbers = np.array(pie_data[pie_data.評論總數>0].評論總數)
    drug_type = pie_data[pie_data.評論總數>0].類別名稱
    series = pandas.Series(numbers,index=drug_type,name='藥物類型評論數餅狀圖')
    series.plot.pie(figsize=(8,8),autopct='%.2f')

def plot_mpl_pie():
    # Same pie chart drawn directly with matplotlib and saved to PNG.
    font = {'family': 'SimHei'}
    mpl.rc('font', **font)
    pie_data = pandas.read_csv('drug_type_num_sum.csv', encoding='utf-8')
    numbers = np.array(pie_data[pie_data.評論總數 > 0].評論總數)
    # NOTE(review): labels use ALL categories while numbers are filtered
    # to 評論總數 > 0 — lengths may differ; verify against the CSV.
    drug_type = pie_data.類別名稱
    plt.pie(numbers, labels=drug_type, autopct='%.2f%%', shadow=True, labeldistance=1.1, startangle=90, pctdistance=0.6)
    plt.title('藥物類型評論數餅狀圖')
    plt.savefig('藥物類別與評論數量餅狀圖(mpl).png')
    plt.show()

def type_drug_num_pie():
    # Pie chart of drug COUNT (not comment count) per category.
    font = {'family': 'SimHei'}
    mpl.rc('font', **font)
    pie_data = pandas.read_csv('drug_type_num_sum.csv', encoding='utf-8')
    numbers = np.array(pie_data.藥品數量)
    drug_type = pie_data.類別名稱
    plt.pie(numbers, labels=drug_type, autopct='%.2f%%', shadow=True, labeldistance=1.1, startangle=90, pctdistance=0.6)
    plt.title('藥物類型藥品數量數餅狀圖')
    plt.savefig('藥物類別與藥品數量餅狀圖(mpl).png')
    plt.show()

def wirte_to_file():
    # Persist the accumulator as JSON. NOTE(review): function name is a
    # typo of write_to_file, and the output file has a .py extension
    # although the content is JSON — kept for compatibility.
    with open('comment_num_dict.py','w',encoding='utf-8') as f:
        json.dump(drug_type_dict,f)

def read_from_file():
    # Load the cached {category -> comment total} mapping (JSON).
    with open('comment_num_dict.py','r',encoding='utf-8') as f:
        drug_type_dict = json.load(f)
    return drug_type_dict

def write_type_num_to_file():
    # Combine cached comment totals with per-category drug counts into
    # drug_type_num_sum.csv.
    drug_type_dict = read_from_file()
    type_name = list(drug_type_dict.keys())
    type_num = list(drug_type_dict.values())
    # NOTE(review): assumes value_counts() ordering matches the cached
    # dict's key order — verify, otherwise counts pair with wrong rows.
    drug_type_nums = data.類別.value_counts().values
    df_data = {'類別名稱':type_name,'藥品數量':drug_type_nums,'評論總數':type_num,}
    df = pandas.DataFrame(df_data)
    df.to_csv('drug_type_num_sum.csv',mode='w',encoding='utf_8_sig',index=False)

def write_new_file():
    # Add proportion columns (share of drugs / share of comments) and
    # rewrite drug_type_num_sum.csv in place.
    new_data = pandas.read_csv('drug_type_num_sum.csv', encoding='utf-8')
    new_data['藥品數量所佔比例'] = round(new_data.藥品數量/new_data.藥品數量.sum(),4)
    new_data['評論數量所佔比例'] = round(new_data.評論總數/new_data.評論總數.sum(),4)
    new_data.to_csv('drug_type_num_sum.csv',mode='w',encoding='utf_8_sig',index=False)

def main():
    # Use the cached sums when present; otherwise compute and cache them
    # before plotting the word cloud.
    if os.path.exists('comment_num_dict.py'):
        drug_dict = read_from_file()
        # plot_wordcloud(drug_dict)
        plot_wc(drug_dict)
    else:
        list(map(parse,drug_type_names))
        wirte_to_file()
        # plot_wordcloud()
        plot_wc()

if __name__ == '__main__':
    # 1. compute each category's share of comments and build the word cloud
    # main()
    # write_type_num_to_file()
    # 2. draw the pie charts
    # plot_series_pie()
    # plot_mpl_pie()
    type_drug_num_pie()
    # write_new_file()
8.前50藥品數據合併
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 3 20:50:12 2018
@author: Zhang Yafei

Merge eight people's comment-count exports, keep the 50 most-commented
drugs, merge the corresponding comment detail files, and provide
plotting helpers plus label-agreement extraction.
"""
import matplotlib
import matplotlib.pyplot as plt
# BUGFIX: numpy is used below (np.sum in the groupby aggregation) but
# was never imported, so the module failed with NameError at import.
import numpy as np
import pandas as pd

# --- per-person comment-count exports (CSV, except two tab-separated files)
data1 = pd.read_csv('comment_num_grade_zhangyafei.csv', encoding='utf-8')
data2 = pd.read_csv('comment_num_grade_wangyuxin.csv', encoding='utf-8')
data3 = pd.read_csv('comment_num_grade_liangwenqi.csv', encoding='utf-8')
data4 = pd.read_csv('comment_num_grade_zhangxinrui.csv', encoding='utf-8')
data5 = pd.read_table('macaizhen.txt', encoding='utf-8', header=None, names=['藥品名稱', '評論數'])
data6 = pd.read_csv('comment_num_grade_wangshuai.csv', encoding='utf-8')
data7 = pd.read_csv('comment_num_grade_wangqi.csv', encoding='utf-8')
data8 = pd.read_csv('tangao.txt', encoding='utf-8', delimiter='\t', header=None, names=['藥品名稱', '評論數'])

# Tag each frame with its contributor before merging.
data1['who'] = '張亞飛'
data2['who'] = '王於心'
data3['who'] = '梁雯琪'
data4['who'] = '張昕瑞'
data5['who'] = '馬彩珍'
data6['who'] = '王帥'
data7['who'] = '王琪'
data8['who'] = '唐奧'

# Top 50 drugs by comment count across all contributors.
data_concat = pd.concat(
    [data1, data2, data3, data4, data5, data6, data7, data8],
    ignore_index=True, sort=True
).sort_values('評論數', ascending=False).reset_index().drop('index', axis=1)[:50]
print(data_concat)
data_concat.who.value_counts()
data_concat.評論數.sum()
# Total comment count contributed by each person within the top 50.
groupby_data = data_concat.groupby(by='who')['評論數'].agg(np.sum)

# --- per-person comment DETAIL exports for the top-50 drugs
data9 = pd.read_csv('first50_comment_zhangyafei.csv', encoding='utf-8')
data10 = pd.read_csv('first50_comment_zhangxinrui.csv', encoding='utf-8')
data11 = pd.read_csv('first50_comment_wangqi.csv', encoding='utf-8')
data12 = pd.read_csv('first50_comment_tangao.csv', encoding='utf-8')
data13 = pd.read_csv('first50_comment_wangshuai.csv', encoding='utf-8')
data14 = pd.read_csv('first50_comment_wangyuxin.csv', encoding='utf-8')
data15 = pd.read_csv('first50_comment_liangwenqi.csv', encoding='utf-8')
data16 = pd.read_csv('first50_comment_macaizhen.csv', encoding='utf-8')
data_concat2 = pd.concat(
    [data9, data10, data11, data12, data13, data14, data15, data16],
    ignore_index=True)


def plot_hist():
    """畫出評論數量分佈直方圖 — histogram of comment counts per drug."""
    font = {'family': 'SimHei'}
    matplotlib.rc('font', **font)
    plt.figure(figsize=(15, 8), dpi=80)
    # x = data_concat.評論數.values
    x = data_concat2.藥品ID.value_counts().values
    # Bucket width of 10 comments per bin.
    num_bins = int((max(x) - min(x)) // 10)
    plt.hist(x, num_bins, facecolor='blue')
    plt.xticks(range(int(min(x)), int(max(x)) + 10, 10))
    plt.grid(alpha=0.5)
    plt.title('評論總數前50名藥品數量分佈情況')
    plt.xlabel('評論數量')
    plt.ylabel('分佈狀況')
    plt.savefig('評論總數前50名藥品數量分佈情況1.png')
    plt.show()


def plot_bar():
    """畫出每一個人的評論數量對比條形圖 — bar chart per contributor."""
    font = {'family': 'SimHei'}
    matplotlib.rc('font', **font)
    plt.figure(figsize=(11, 6), dpi=80)
    plt.bar(groupby_data.index, groupby_data.values)
    plt.xlabel('姓名')
    plt.ylabel('評論數')
    plt.title('評論數量前50名我的所佔評論總數對比')
    plt.savefig('評論數量前50名我的所佔評論總數對比.png')
    plt.show()


def label_recognition(df):
    """標註識別 — label agreement extraction.

    :param df: frame with columns 分詞 / 分詞2 (and 分詞3 for label2)
    :return: (rows where two annotators agree,
              rows where all three annotators agree)
    """
    label1 = df[df.分詞 == df.分詞2]
    label2 = df[(df.分詞 == df.分詞2) & (df.分詞 == df.分詞3)]
    return label1, label2


if __name__ == '__main__':
    # data_concat.to_csv('concat_first50_comment.csv',encoding='utf_8_sig',index=False)
    # data_concat2.to_csv('first50_comment.csv',encoding='utf_8_sig',index=False)
    # NOTE(review): read_excel's `encoding` argument was removed in
    # newer pandas versions; kept for the pandas version used in 2018.
    label1 = pd.read_excel(io='first50_comment_zhangxinrui2.xlsx', encoding='utf-8')
    label, label2 = label_recognition(label1)
    writer = pd.ExcelWriter('three_people_same_label.xlsx')
    label2.to_excel(writer, 'diabetes')
    writer.save()
    new_label = label.drop('分詞2', axis=1)
    new_label.to_csv('label.csv', encoding='utf_8_sig', index=False)
9.適應症和不良反應數據字典的構建
# -*- coding: utf-8 -*-
"""
@Datetime: 2018/1/10
@Author: Zhang Yafei

Download drug manual pages asynchronously with twisted, extract the
indication (適應症) and adverse-reaction (不良反應) sections, and merge
them into a drug dictionary spreadsheet.
"""
import re

import numpy
import pandas
from scrapy.selector import Selector
from twisted.web.client import getPage, defer
from twisted.internet import reactor

pandas.set_option('display.max_columns', None)

# Rows collected by parse(); written to Excel in the __main__ section.
data_list = []
# Simple progress counter for parsed pages.
n = 0
re_data = pandas.DataFrame(columns=['通用名稱', '商品名稱', '適應症', '不良反應', 'url'])


def parse(content, url):
    """
    詳細說明書中提取適應症和不良反應 — extract fields from one manual page.
    :param content: raw response body (bytes, gbk-encoded page)
    :param url: page URL, recorded alongside the extracted fields
    :return: None; appends a dict with keys
             通用名稱 商品名稱 適應症 不良反應 url to data_list
    """
    global n
    n += 1
    print(n, url)
    # text = content.decode('GB2312')
    text = content.decode('gbk')
    selector = Selector(text=text)
    # 注:不一樣藥品之間網頁結構有差異,提取的時候應注意
    # (page structure differs between drugs; fall back between labels)
    drug_name = selector.xpath('//dt[text()="【藥品名稱】"]/following::*[1]').extract_first()
    if not drug_name:
        drug_name = selector.xpath('//dt[text()="【產品名稱】"]/following::*[1]').extract_first()
    generic_name = re.findall('通用名稱:(.*)<br>', drug_name)[0]
    # Some pages omit the trade name (the author's commented-out
    # fallback showed this); record NaN instead of raising IndexError.
    trade_match = re.search('商品名稱:(.*)<br>', drug_name)
    trade_name = trade_match.group(1) if trade_match else numpy.nan
    function = selector.xpath('//dt[text()="【功能主治】"]/following::*[1]').extract_first()
    if function:
        # 功能主治 — strip residual tags/whitespace.
        function = re.sub('<.*?>', '', function).strip()
    else:
        function = numpy.nan
    indiction = selector.xpath('//dt[text()="【適應症】"]/following::*[1]')
    if indiction:
        indiction = indiction.xpath('string(.)').extract_first().strip().replace('\n', '')
    else:
        indiction = numpy.nan
    # Prefer 適應症; fall back to 功能主治 when absent.
    indictions = indiction if indiction is not numpy.nan else function
    try:
        # 不良反應 — usual layout wraps the text in a <p>.
        adverse_reaction = selector.xpath(
            '//dt[text()="【不良反應】"]/following::*[1]/p/text()').extract_first().strip('。')
    except AttributeError:
        try:
            # Alternate layout: bare text node after the label.
            adverse_reaction = selector.xpath(
                '//dt[text()="【不良反應】"]/following::*[1]/text()').extract_first().strip('。')
            adverse_reaction = re.sub('<.*?>', '', adverse_reaction).strip().replace('\n', '')
        except AttributeError:
            adverse_reaction = numpy.nan
    data = {'通用名稱': generic_name,
            '商品名稱': trade_name,
            '適應症': indictions,
            '不良反應': adverse_reaction,
            'url': url,
            }
    data_list.append(data)


def stop_loop(arg):
    """Stop the reactor once every download deferred has fired."""
    reactor.stop()


def main(url_list):
    """
    主函數:利用twisted實現基於事件循環的異步非阻塞IO
    Fetch every URL concurrently, parse each response, then stop.
    :param url_list: manual-page URLs to download
    :return: None (results accumulate in data_list)
    """
    # Schedule all requests and attach the parse callback to each.
    defered_list = []
    for url in url_list:
        defered = getPage(bytes(url, encoding='utf-8'))
        defered.addCallback(callback=parse, url=url)
        defered_list.append(defered)
    # Stop the event loop when the whole batch has completed.
    dlist = defer.DeferredList(defered_list)
    dlist.addBoth(stop_loop)
    # Run the event loop (blocks until stop_loop fires).
    reactor.run()


if __name__ == '__main__':
    # 1.讀取數據url下載響應信息
    # data = pandas.read_excel('three_people_same_label.xlsx')
    # url_list = ['http://ypk.39.net/{}/manual'.format(i) for i in data.藥品ID.unique().tolist()]
    # data = pandas.read_excel('drug_dict.xlsx')
    # has_url = set(data.url.tolist())
    # urls = list(set(url_list) - has_url)
    # main(urls)
    #
    # # 2. 將下載信息寫入文件
    # df = pandas.DataFrame(data=data_list)
    # df = df.loc[:, ['通用名稱','商品名稱','適應症','不良反應','url']]
    # result = pandas.concat([data, df])
    # writer = pandas.ExcelWriter('drug_dict.xlsx')
    # result.to_excel(writer, 'drug_dict', index=False)
    # writer.save()

    # 3.合併39藥品數據和不良反應數據庫數據
    # df1 = pandas.read_excel('adverse_reaction_database.xlsx')
    # df2 = pandas.read_excel('drug_dict.xlsx')
    # df2['適應症2'] = numpy.NAN
    # df2['不良反應2'] = numpy.NAN
    # index = df2.通用名稱.apply(lambda x: x in df1.藥品通用名稱.values)
    # df3 = df2.loc[index, :]
    # df4 = pandas.DataFrame(columns=['藥品通用名稱', '適應症', '不良反應'])
    # for k in df3.通用名稱.values:
    #     data = df1[df1.藥品通用名稱 == k]
    #     df4 = df4.append(data, ignore_index=True)
    # writer = pandas.ExcelWriter('drug_dict2.xlsx')
    # df4.to_excel(writer, 'drug_dict', index=False)
    # writer.save()

    # 4.讀取drug_dict2.xlsx,合併相關數據
    df4 = pandas.read_excel('drug_dict2.xlsx')
    drug_list = []
    for name in df4.藥品通用名稱.unique():
        result = df4[df4.藥品通用名稱 == name]
        # Join all non-NaN entries for the same generic name with '/'.
        indiction = '/'.join(str(s) for s in result.適應症.values if s is not numpy.nan).strip()
        adverse = '/'.join(str(s) for s in result.不良反應.values if s is not numpy.nan).strip()
        # BUGFIX: the original named this variable `dict`, shadowing
        # the builtin for the rest of the loop.
        record = {
            '藥品通用名稱': name,
            '適應症': indiction,
            '不良反應': adverse,
        }
        drug_list.append(record)
    df5 = pandas.DataFrame(data=drug_list)
    df5 = df5.loc[:, ['藥品通用名稱', '適應症', '不良反應']]
    writer = pandas.ExcelWriter('database_dict.xlsx')
    df5.to_excel(writer, sheet_name='database_dict', index=False)
    writer.save()