Scraping Table Data with a Tampermonkey Script

The point of the userscript is to help us grab exactly the information we need, quickly, which improves efficiency. The core logic of the table extraction lives in the Python backend program; the userscript only reports the page to it.
The main flow is:

  1. Insert an interactive button into the page
  2. On click, send the page information to the backend
  3. The backend fetches the data and cleans it
  4. Store the result in the database

How to use Tampermonkey

See the official tutorial; the Edge browser is recommended:
https://www.tampermonkey.net/

Websites containing tables

Configure the target sites in the script's ==UserScript== header:

// ==UserScript==
// @name 抓取表格数据
// @namespace http://tampermonkey.net/
// @version 0.1
// @description try to take over the world!
// @author You
// @match http://sh.bendibao.com/*
// @icon https://www.google.com/s2/favicons?sz=64&domain=
// @grant GM_xmlhttpRequest
// @require https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js
// ==/UserScript==

For example: http://sh.bendibao.com/news/2022225/248869.shtm
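
To cover the other HTML sites the backend understands, more @match rules can simply be stacked in the same header. A sketch, using the domains from the backend's supported list below:

// @match http://jy.tj.gov.cn/*
// @match http://www.lg.gov.cn/*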

Target data

  1. Websites or PDF files that contain tables
  2. Regular tables
  3. Complex tables whose cells span rows or columns
    We need to convert the complex tables (3) into regular tables (2); see the sketch after this list.
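
Conveniently, pandas.read_html already does most of this conversion: merged (rowspan/colspan) cells are expanded by repeating their value. A minimal sketch with a made-up two-row table:

import io
import pandas as pd

html = """
<table>
  <tr><th>District</th><th>Name</th></tr>
  <tr><td rowspan="2">Xuhui</td><td>Kindergarten A</td></tr>
  <tr><td>Kindergarten B</td></tr>
</table>
"""
# read_html repeats the merged "Xuhui" cell on both rows,
# turning the rowspan table into a regular one:
df = pd.read_html(io.StringIO(html), header=0)[0]
print(df)
#   District            Name
# 0    Xuhui  Kindergarten A
# 1    Xuhui  Kindergarten B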

Inserting a button into the page

We want to scrape pages selectively, so we insert a button into the page:

let downloadLink = document.createElement("div");
downloadLink.innerHTML =
"<a onclick=\"window.postMessage('pluginGetData', '*')\" style=\"display:block;width:300px;height:100px; line-height: 100px; position:fixed; top:10px;right:10px;z-index:999999; background-color:#c4261d; box-shadow: 1px 2px 3px #000; color:#fff; font-size: 28px; text-align:center; cursor: pointer;\">》》获取数据《《</a>";
document.body.insertBefore(downloadLink, document.body.children[0]);

GM_* APIs and the page's window cannot be used in the same context, because GM_* functions run in the userscript sandbox while the button's onclick runs in the page. Clicking the button therefore just posts a message:

window.postMessage('pluginGetData', '*')

Communication between the page and the sandbox

The sandboxed script listens for the message; this is how the original page talks to the sandbox:

window.addEventListener("message", receiveMessage, false);

function receiveMessage(event) {
    console.log("receiveMessage", event);
    // event.data is not always a string, so guard before calling includes()
    if (typeof event.data === "string" && event.data.includes("pluginGetData")) {
        pluginGetData();
    }
}

Reporting the data

pluginGetData() reports the current page URL to the local backend service:

(function() {
    'use strict';
    // The message listener above lives in this same userscript body;
    // receiveMessage() calls pluginGetData() when the button is clicked.
    function pluginGetData() {
        GM_xmlhttpRequest({
            url: "http://localhost:5000/parseSchoolData", // local debugging
            method: "POST",
            data: JSON.stringify({
                url: location.href
            }),
            headers: {
                "Content-type": "application/json"
            },
            onload: function(xhr) {
                console.log(xhr.responseText);
                // alert("Link " + location.href + " reported successfully!")
                alert(xhr.responseText);
            }
        });
    }
    // Your code here...
})();
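
GM_xmlhttpRequest is used here rather than a plain fetch because it can make requests outside the page's origin. Depending on your Tampermonkey settings, you may need a // @connect localhost line in the header, or have to approve the domain when prompted, before the request to the local service goes through.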

Backend program

The main program starts the HTTP service.
index.py

import pandas as pd
import requests
import time
import pdfplumber
import os
import io
import json
from flask import Flask, request
import storage.parse
from query.parse import checkDataSource

app = Flask(__name__)

'''
Currently only the following data types (domains) are supported:
dataType1: { # table: kindergarten list
    dataType1_1: 'http://sh.bendibao.com/news/2022225/248869.shtm'
    dataType1_2: 'http://jy.tj.gov.cn/BMFW/JYZC5803/YEY787/202111/t20211129_5736214.html'
}
dataType2: { # table: a single kindergarten's details
    dataType2_1: 'http://www.lg.gov.cn/zwfw/zdfw/jy/mdml/yeymdml/content/post_9766183.html'
}
dataType3: { # PDF: kindergarten list
    dataType3_1: 'http://szeb.sz.gov.cn/attachment/1/1181/1181674/10052534.pdf'
}
'''

# Fetch the page content
def getPage(url):
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
        }
        response = requests.get(url, headers=headers)
        response.encoding = 'utf-8'
        if response.status_code == 200:
            return response.text
        return None
    except BaseException as error:
        print('def getPage error:', error, 'failed to fetch the page!')

# Parse the page content (dataType1, dataType3); read_html needs lxml or html5lib installed
def parsePage(html):
    try:
        return pd.read_html(html, encoding='utf-8', header=0)[0]
    except BaseException as error:
        print('def parsePage error:', error, 'no table found!')
        return False

# Parse the page content (dataType2)
def parseSchoolPage(html):
    try:
        return pd.read_html(html, encoding='utf-8')[0]
    except BaseException as error:
        print('def parseSchoolPage error:', error, 'no table found!')
        return False

# Download the PDF into a temporary directory
def getPDF(url):
    response = requests.get(url)
    pdfPath = f'temp/pdf/~temp{time.time()*1000}~.pdf'
    with open(pdfPath, 'wb+') as f:
        f.write(response.content)
        f.flush()
    return pdfPath

# Parse the PDF content
def parsePDF(pdfPath):
    pdf = pdfplumber.open(pdfPath)
    table = []
    for page in pdf.pages:
        tables = page.extract_tables()
        if tables:  # skip pages without a table
            table += tables[0]
    pdf.close()
    return table

# Rename the columns (keys are the original Chinese table headers)
def renameColumns(df):
    # dataType1_1
    df.rename(columns={'级别': 'kindergartenLevel',
                       '性质': 'nature',
                       '区县': 'district',
                       '名称': 'schoolName',
                       '园部': 'division',
                       '地址': 'address',
                       '电话': 'phone',
                       }, inplace=True)
    # dataType1_2
    df.rename(columns={'幼儿园名称': 'schoolName',
                       '办别': 'system',
                       '举办者': 'organizer',
                       '幼儿园地址': 'address',
                       '办公电话': 'phone',
                       }, inplace=True)
    # dataType2_1
    df.rename(columns={'幼儿园名称:': 'schoolName',
                       '性质:': 'system',
                       '所在街道:': 'street',
                       '教师人数:': 'teacherNumber',
                       '详细地址:': 'address',
                       '园区简介:': 'schoolIntroduction',
                       '园长介绍:': 'leaderIntroduction',
                       '师资力量:': 'TeachersStrength',
                       '教学环境:': 'teachingEnvironment',
                       '收费标准:': 'feeStandard',
                       '交通路线:': 'trafficRoutes',
                       '主管部门:': 'competentDepartment',
                       '联系电话:': 'phone',
                       '所在社区:': 'community',
                       '学生人数:': 'studentNumber',
                       }, inplace=True)
    # dataType3_1
    df.rename(columns={'序号': 'index',
                       '幼儿园名称': 'schoolName',
                       '办学许可证编号或事业单\n位法人证书编号或社会统\n一信用代码': 'uniformCreditCode',
                       '实际办学地址': 'address',
                       '幼儿园所属街道、社\n区': 'street',
                       '举办者': 'organizer',
                       '法定代表人': 'legalPerson',
                       '园长': 'leaderName',
                       '联系电话': 'phone',
                       '办园类型': 'system',
                       '民办园年\n检结论': 'annualInspection',
                       '保教费实\n际收取标\n准(元\n/月)': 'feeStandard',
                       }, inplace=True)

# Persist the data
def save2db(df, url):
    if len(df) > 0:
        storage.parse.batchSave(df, url)
        return True
    else:
        return False
    # df.to_csv(r'temp/csv/temp' + '%d' % time.time() + '.csv', index=False)

# Build the response payload
def getResult(success=True, message=''):
    return {
        'code': 1 if success else 9,
        'message': (message or 'Operation succeeded!') if success else (message or 'Operation failed!')
    }

# Main function: dispatch on the URL type
def main(url):
    try:
        if '.pdf' in url:
            # dataType3_1
            pdfPath = os.path.abspath(getPDF(url))
            table = parsePDF(pdfPath)
            os.remove(pdfPath)
            dataFrame = pd.DataFrame(table[1:len(table)-1], columns=table[0])
        elif 'www.lg.gov.cn/zwfw/zdfw/jy/mdml/yeymdml/content' in url:
            # dataType2_1
            html = getPage(url)
            table = parseSchoolPage(html)
            dataFrame = pd.DataFrame(table)
            # Slice the first two columns, transpose, and reset the index
            dataFrame0_0_1_2 = dataFrame.iloc[:, :2].transpose().reset_index(drop=True)
            # Slice the first four rows of columns 2-3, transpose, and reset the index
            dataFrame0_4_2_4 = dataFrame.iloc[:4, 2:4].transpose().reset_index(drop=True)
            # Merge the two parts side by side
            dataFrame = pd.concat([dataFrame0_0_1_2, dataFrame0_4_2_4], axis=1)
            # Drop the old header and promote the first row to be the header
            dataFrame = pd.read_csv(io.StringIO(dataFrame.to_csv(header=False, index=False)), header=0)
        else:
            # dataType1
            html = getPage(url)
            table = parsePage(html)
            dataFrame = pd.DataFrame(table)
        renameColumns(dataFrame)
        return save2db(dataFrame, url)
    except BaseException as error:
        print('def main error:', error)
        return False

# Route
@app.route('/parseSchoolData', methods=['POST'])
def postPage():
    dataSource = request.json['url']
    dataLen = checkDataSource(dataSource)
    result = {}
    if dataLen == -1:
        result = getResult(False, 'Query failed!')
    elif dataLen > 0:
        result = getResult(True, 'Data already exists!')
    elif dataLen == 0:
        result = getResult(main(dataSource))
    result['timestamp'] = int(time.time() * 1000)
    resultStr = json.dumps(result, skipkeys=True, ensure_ascii=False)
    print(resultStr)
    return resultStr

# Single process
if __name__ == '__main__':
    # Debug/Development
    # app.run(debug=True, host='0.0.0.0', port=5000)
    # Production
    from waitress import serve
    serve(app, host='0.0.0.0', port=5000)
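
The service can be tested without the userscript by posting a URL directly; a quick sketch, assuming the server is running locally on port 5000:

import requests

resp = requests.post(
    'http://localhost:5000/parseSchoolData',
    json={'url': 'http://sh.bendibao.com/news/2022225/248869.shtm'}
)
# e.g. {"code": 1, "message": "Operation succeeded!", "timestamp": 1678694400000}
print(resp.text)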

Connecting to the Parse service

engine/parse.py

import http.client
from props import props

parseConnection = http.client.HTTPConnection(props['apiBase'], 80)
parseConnection.connect()

parseHeaders = {
    'X-Parse-Application-Id': props['parseApplicationId'],
    'X-Parse-REST-API-Key': '',
    'Content-Type': 'application/json'
}
parseApi = {
    'PythonSpiderSchool': '/parse/classes/PythonSpiderSchool'
}
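
The props module is not shown in the original. A minimal placeholder might look like this; both values are assumptions to adapt to your own Parse deployment:

# props.py -- hypothetical config module; adjust to your Parse server
props = {
    'apiBase': 'localhost',            # host passed to HTTPConnection
    'parseApplicationId': 'myAppId'    # X-Parse-Application-Id header value
}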

Querying data

Check first to prevent duplicate writes.
query/parse.py

import json
import urllib.parse
from engine.parse import parseConnection, parseHeaders, parseApi

# Check whether this data source already exists
def checkDataSource(dataSource):
    try:
        paramsStr = urllib.parse.urlencode({
            'where': json.dumps({
                'dataSource': dataSource
            }),
            'limit': 1
        })
        parseConnection.request(
            'GET',
            parseApi['PythonSpiderSchool'] + '?%s' % paramsStr,
            '',
            parseHeaders
        )
        readStr = parseConnection.getresponse().read()
        readDict = json.loads(readStr)
        return len(readDict['results'])
    except BaseException as error:
        print('def checkDataSource error:', error)
        return -1

Storing data

Writing the data to the Parse backend.
storage/parse.py

import json
from engine.parse import parseConnection, parseHeaders, parseApi

# Save a single record
def save(dataFrame, dataSource):
    jsonStr = dataFrame.to_json(orient='records', force_ascii=False)
    jsonDict = json.loads(jsonStr)[0]
    jsonDict['dataSource'] = dataSource
    body = json.dumps(jsonDict)
    parseConnection.request(
        'POST',
        parseApi['PythonSpiderSchool'],
        body,
        parseHeaders
    )
    # Consume the response so the connection can be reused for the next request
    results = parseConnection.getresponse().read()
    # print(json.loads(results))

# Batch save
def batchSave(dataFrame, dataSource):
    jsonStr = dataFrame.to_json(orient='records', force_ascii=False)
    jsonDict = json.loads(jsonStr)
    requestList = []  # avoid shadowing the built-in name `list`
    for dictItem in jsonDict:
        dictItem['dataSource'] = dataSource
        requestList.append({
            'method': 'POST',
            'path': parseApi['PythonSpiderSchool'],
            'body': dictItem
        })
    body = json.dumps({
        'requests': requestList
    })
    parseConnection.request(
        'POST',
        '/parse/batch',
        body,
        parseHeaders
    )
    # Consume the response so the connection can be reused for the next request
    results = parseConnection.getresponse().read()
    # print(json.loads(results))
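
A hypothetical usage, saving a two-row DataFrame in a single batch request (the column values are made up):

import pandas as pd
from storage.parse import batchSave

# batchSave() tags every row with the dataSource URL before posting
df = pd.DataFrame([
    {'schoolName': 'Kindergarten A', 'phone': '021-00000000'},
    {'schoolName': 'Kindergarten B', 'phone': '021-11111111'},
])
batchSave(df, 'http://sh.bendibao.com/news/2022225/248869.shtm')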


