
| import pandas as pd import requests import time import pdfplumber import os import io from flask import Flask, redirect, url_for, request, render_template from urllib.parse import urlencode from bs4 import BeautifulSoup from lxml import etree import json import storage.parse from query.parse import checkDataSource
# Flask application object; the HTTP route defined in this module is registered on it.
app = Flask(__name__)
'''
Only the following data types (domains) are currently supported:

dataType1: {  # HTML table: list of kindergartens
    dataType1_1: 'http://sh.bendibao.com/news/2022225/248869.shtm'
    dataType1_2: 'http://jy.tj.gov.cn/BMFW/JYZC5803/YEY787/202111/t20211129_5736214.html'
}
dataType2: {  # HTML table: details of a single kindergarten
    dataType2_1: 'http://www.lg.gov.cn/zwfw/zdfw/jy/mdml/yeymdml/content/post_9766183.html'
}
dataType3: {  # PDF: list of kindergartens
    dataType3_1: 'http://szeb.sz.gov.cn/attachment/1/1181/1181674/10052534.pdf'
}
'''
def getPage(url, timeout=30):
    """Download a web page and return its body decoded as UTF-8.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot block the caller forever.

    Returns:
        The response text when the server answers with HTTP 200; ``None``
        for any other status code or when the request fails (the failure
        is printed, not raised).
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        # The body is always decoded as UTF-8, regardless of what the server declares.
        response.encoding = 'utf-8'
        if response.status_code == 200:
            return response.text
        return None
    except Exception as error:
        # Best-effort fetch: report and fall through to None. Exception (not
        # BaseException) so Ctrl-C / SystemExit still propagate.
        print('def getPage error:', error, '网页内容提取失败!')
        return None
def parsePage(html):
    """Extract the first HTML table of a page, using its first row as header.

    Args:
        html: Page markup as ``str`` (or ``bytes``). ``None`` or an empty
            value is treated as "no table".

    Returns:
        A ``pandas.DataFrame`` built from the first table found, or ``False``
        when the markup is missing or no table could be parsed (the failure
        is printed, not raised).
    """
    if not html:
        print('def parsePage error:', 'empty html', '未找到表格!')
        return False
    try:
        # Wrap the markup in a file-like object: passing literal HTML to
        # read_html is deprecated, and a raw string could otherwise be
        # mistaken for a path or URL.
        source = io.BytesIO(html) if isinstance(html, bytes) else io.StringIO(html)
        return pd.read_html(source, encoding='utf-8', header=0)[0]
    except Exception as error:
        print('def parsePage error:', error, '未找到表格!')
        return False
def parseSchoolPage(html):
    """Extract the first HTML table of a page without promoting a header row.

    Args:
        html: Page markup as ``str`` (or ``bytes``). ``None`` or an empty
            value is treated as "no table".

    Returns:
        A ``pandas.DataFrame`` with default integer column labels built from
        the first table found, or ``False`` when the markup is missing or no
        table could be parsed (the failure is printed, not raised).
    """
    if not html:
        print('def parseSchoolPage error:', 'empty html', '未找到表格!')
        return False
    try:
        # Wrap the markup in a file-like object: passing literal HTML to
        # read_html is deprecated, and a raw string could otherwise be
        # mistaken for a path or URL.
        source = io.BytesIO(html) if isinstance(html, bytes) else io.StringIO(html)
        return pd.read_html(source, encoding='utf-8')[0]
    except Exception as error:
        print('def parseSchoolPage error:', error, '未找到表格!')
        return False
def getPDF(url, timeout=60):
    """Download a PDF and store it as a temporary file under ``temp/pdf``.

    Args:
        url: Address of the PDF document.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The relative path of the file that was written. The caller is
        responsible for deleting it.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an error status.
        OSError: If the target directory or file cannot be written.
    """
    response = requests.get(url, timeout=timeout)
    # Do not save an HTML error page under a .pdf name.
    response.raise_for_status()
    # The directory is relative to the working directory and may not exist yet.
    os.makedirs(os.path.join('temp', 'pdf'), exist_ok=True)
    # Millisecond timestamp keeps names of successive downloads distinct.
    pdfPath = f'temp/pdf/~temp{time.time()*1000}~.pdf'
    with open(pdfPath, 'wb') as f:
        f.write(response.content)
    return pdfPath
def parsePDF(pdfPath):
    """Collect the rows of the first table on every page of a PDF.

    Args:
        pdfPath: Path of the PDF file to read.

    Returns:
        A list of table rows in page order, as returned by pdfplumber's
        ``extract_tables``. Pages on which no table is detected contribute
        nothing. The list is empty if no page contains a table.
    """
    rows = []
    # The context manager closes the document even if extraction fails.
    with pdfplumber.open(pdfPath) as pdf:
        for page in pdf.pages:
            tables = page.extract_tables()
            if not tables:
                # A page without a table should not abort the whole document.
                continue
            rows.extend(tables[0])
    return rows
# Source column header -> normalized field name, grouped by the page layout
# each set of headers was written for.
_COLUMN_ALIASES = {
    # HTML list layout A
    '级别': 'kindergartenLevel',
    '性质': 'nature',
    '区县': 'district',
    '名称': 'schoolName',
    '园部': 'division',
    '地址': 'address',
    '电话': 'phone',
    # HTML list layout B
    '幼儿园名称': 'schoolName',
    '办别': 'system',
    '举办者': 'organizer',
    '幼儿园地址': 'address',
    '办公电话': 'phone',
    # Single-school detail layout (labels carry a trailing colon)
    '幼儿园名称:': 'schoolName',
    '性质:': 'system',
    '所在街道:': 'street',
    '教师人数:': 'teacherNumber',
    '详细地址:': 'address',
    '园区简介:': 'schoolIntroduction',
    '园长介绍:': 'leaderIntroduction',
    '师资力量:': 'TeachersStrength',
    '教学环境:': 'teachingEnvironment',
    '收费标准:': 'feeStandard',
    '交通路线:': 'trafficRoutes',
    '主管部门:': 'competentDepartment',
    '联系电话:': 'phone',
    '所在社区:': 'community',
    '学生人数:': 'studentNumber',
    # PDF list layout (headers contain embedded line breaks)
    '序号': 'index',
    '办学许可证编号或事业单\n位法人证书编号或社会统\n一信用代码': 'uniformCreditCode',
    '实际办学地址': 'address',
    '幼儿园所属街道、社\n区': 'street',
    '法定代表人': 'legalPerson',
    '园长': 'leaderName',
    '联系电话': 'phone',
    '办园类型': 'system',
    '民办园年\n检结论': 'annualInspection',
    '保教费实\n际收取标\n准(元\n/月)': 'feeStandard',
}


def renameColumns(df):
    """Rename the known source column headers of *df* to normalized field names.

    The frame is modified in place and nothing is returned. Columns whose
    header is not in the alias table keep their name.
    """
    df.rename(columns=_COLUMN_ALIASES, inplace=True)
def save2db(df, url):
    """Persist *df* through ``storage.parse.batchSave`` unless it has no rows.

    Returns ``True`` after handing a non-empty frame (together with its
    source *url*) to the storage layer, ``False`` when there is nothing to
    save.
    """
    if len(df) <= 0:
        return False
    storage.parse.batchSave(df, url)
    return True
def getResult(success = True, message = ''):
    """Build the response payload for an operation outcome.

    The payload has a ``code`` (1 for success, 9 for failure) and a
    ``message``. An empty *message* is replaced by the default text for the
    respective outcome.
    """
    if success:
        return {'code': 1, 'message': message or '操作成功!'}
    return {'code': 9, 'message': message or '操作失败!'}
def _loadPdfFrame(url):
    """Download a PDF list and build a DataFrame from its table rows."""
    pdfPath = os.path.abspath(getPDF(url))
    try:
        table = parsePDF(pdfPath)
    finally:
        # Always delete the temporary download, even when parsing fails.
        os.remove(pdfPath)
    if not table:
        return None
    # Row 0 supplies the column names; the final row is excluded exactly as
    # before (presumably a footer/summary row -- TODO confirm).
    return pd.DataFrame(table[1:(len(table)-1)], columns=table[0])


def _loadSchoolFrame(url):
    """Fetch a single-school detail page and reshape it into one data row."""
    table = parseSchoolPage(getPage(url))
    if table is False:
        return None
    # The page table holds label/value pairs in columns 0-1 (all rows) and in
    # columns 2-3 (first four rows). Transposing each pair yields one row of
    # labels and one row of values; the pairs are then placed side by side.
    leftPairs = table.iloc[:, :2].transpose().reset_index(drop=True)
    rightPairs = table.iloc[:4, 2:4].transpose().reset_index(drop=True)
    combined = pd.concat([leftPairs, rightPairs], axis=1)
    # Round-trip through CSV so that the label row becomes the header.
    csvText = combined.to_csv(header=False, index=False)
    return pd.read_csv(io.StringIO(csvText), header=0)


def _loadListFrame(url):
    """Fetch a page whose first HTML table is a list of schools."""
    table = parsePage(getPage(url))
    if table is False:
        return None
    return table


def main(url):
    """Fetch the data source at *url*, normalize its columns and store it.

    The source kind is chosen from the URL: anything containing ``.pdf`` is
    handled as a PDF list, the ``www.lg.gov.cn`` detail path as a
    single-school page, and everything else as an HTML table list.

    Args:
        url: Address of the data source. NOTE(review): postPage passes the
            client-supplied value straight through, so the server will fetch
            arbitrary addresses; consider restricting it to the supported
            domains.

    Returns:
        ``True`` when rows were parsed and handed to storage; ``False`` when
        nothing could be fetched or parsed, the result was empty, or any
        step raised (the error is printed, not raised).
    """
    try:
        if '.pdf' in url:
            dataFrame = _loadPdfFrame(url)
        elif 'www.lg.gov.cn/zwfw/zdfw/jy/mdml/yeymdml/content' in url:
            dataFrame = _loadSchoolFrame(url)
        else:
            dataFrame = _loadListFrame(url)
        if dataFrame is None:
            # Fetching or parsing failed; the helper already reported why.
            return False
        renameColumns(dataFrame)
        return save2db(dataFrame, url)
    except Exception as error:
        print('def main error:', error)
        return False
@app.route('/parseSchoolData', methods = ['POST'])
def postPage():
    """Handle ``POST /parseSchoolData``: import the data source named by ``url``.

    Expects a JSON object body with a ``url`` field. The source is imported
    only if ``checkDataSource`` reports that nothing is stored for it yet.

    Returns:
        A JSON string with ``code``, ``message`` and a millisecond
        ``timestamp``. ``code`` is 1 on success (including "already stored")
        and 9 on failure.
    """
    data = request.json
    # A body that is not a JSON object, or has no url, is reported as a
    # failure instead of surfacing as an unhandled KeyError/TypeError.
    dataSource = data.get('url') if isinstance(data, dict) else None
    if not dataSource:
        result = getResult(False, '缺少参数url!')
    else:
        dataLen = checkDataSource(dataSource)
        if dataLen is None or dataLen < 0:
            # -1 signals a failed lookup; any other unexpected value is
            # treated the same way so the response always carries a code.
            result = getResult(False, '查询失败!')
        elif dataLen > 0:
            result = getResult(True, '数据已存在!')
        else:
            result = getResult(main(dataSource))
    result['timestamp'] = int(time.time()*1000)
    resultStr = json.dumps(result, skipkeys=True, ensure_ascii=False)
    print(resultStr)
    return resultStr
if __name__ == '__main__':
    # Script entry point: serve the Flask app with waitress on all
    # interfaces, port 5000. waitress is imported only on this path.
    import waitress
    waitress.serve(app, host='0.0.0.0', port=5000)
|