利用python实现多线程抓取知乎用户方法

需要用到的包:

beautifulsoup4html5libimagerequestsredispymysql

pip安装所有依赖包:

pip install \
image \
requests \
beautifulsoup4 \
html5lib \
redis \
pymysql

运行环境需要支持中文测试运行环境python3.5,不保证其他运行环境能完美运行需要安装mysql和redis 配置

config.ini

文件,设置好mysql和redis,并且填写你的知乎帐号向数据库导入

init.sql

run 开始抓取数据:

python get_user.py

查看抓取数量:

python check_redis.py

效果 原因在于普通的连接是keep-alive的但是却又没有关闭

# http请求的header
headers = {
“user-agent”: “mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/53.0.2785.143 safari/537.36”,
“accept”: “text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8”,
“host”: “www.zhihu.com”,
“referer”: “https://www.zhihu.com/”,
“origin”: “https://www.zhihu.com/”,
“upgrade-insecure-requests”: “1”,
“content-type”: “application/x-www-form-urlencoded; charset=utf-8”,
“pragma”: “no-cache”,
“accept-encoding”: “gzip, deflate, br”,
‘connection’: ‘close’
}
# 验证是否登陆
def check_login(self):
check_url = ‘https://www.zhihu.com/settings/profile’
try:
login_check = self.__session.get(check_url, headers=self.headers, timeout=35)
except exception as err:
print(traceback.print_exc())
print(err)
print(“验证登陆失败,请检查网络”)
sys.exit()
print(“验证登陆的http status code为:” + str(login_check.status_code))
if int(login_check.status_code) == 200:
return true
else:
return false

进入首页查看http状态码来验证是否登陆,200为已经登陆,一般304就是被重定向所以就是没有登陆

# 获取验证码
def get_captcha(self):
t = str(time.time() * 1000)
captcha_url = ‘http://www.zhihu.com/captcha.gif?r=’ + t + “&type=login”
r = self.__session.get(captcha_url, headers=self.headers, timeout=35)
with open(‘captcha.jpg’, ‘wb’) as f:
f.write(r.content)
f.close()
# 用pillow 的 image 显示验证码
# 如果没有安装 pillow 到源代码所在的目录去找到验证码然后手动输入
”’try:
im = image.open(‘captcha.jpg’)
im.show()
im.close()
except:”’
print(u’请到 %s 目录找到captcha.jpg 手动输入’ % os.path.abspath(‘captcha.jpg’))
captcha = input(“请输入验证码\n>”)
return captcha

获取验证码的方法。当登录次数太多有可能会要求输入验证码,这里实现这个功能

# 获取xsrf
def get_xsrf(self):
index_url = ‘http://www.zhihu.com’
# 获取登录时需要用到的_xsrf
try:
index_page = self.__session.get(index_url, headers=self.headers, timeout=35)
except:
print(‘获取知乎页面失败,请检查网络连接’)
sys.exit()
html = index_page.text
# 这里的_xsrf 返回的是一个list
bs = beautifulsoup(html, ‘html.parser’)
xsrf_input = bs.find(attrs={‘name’: ‘_xsrf’})
pattern = r’value=\”(.*?)\”‘
print(xsrf_input)
self.__xsrf = re.findall(pattern, str(xsrf_input))
return self.__xsrf[0]

获取xsrf,为什么要获取xsrf呢,因为xsrf是一种防止跨站攻击的手段,具体介绍可以看这里csrf在获取到xsrf之后把xsrf存入cookie当中,并且在调用api的时候带上xsrf作为头部,不然的话知乎会返回403

# 进行模拟登陆
def do_login(self):
try:
# 模拟登陆
if self.check_login():
print(‘您已经登录’)
return
else:
if self.config.get(“zhihu_account”, “username”) and self.config.get(“zhihu_account”, “password”):
self.username = self.config.get(“zhihu_account”, “username”)
self.password = self.config.get(“zhihu_account”, “password”)
else:
self.username = input(‘请输入你的用户名\n> ‘)
self.password = input(“请输入你的密码\n> “)
except exception as err:
print(traceback.print_exc())
print(err)
sys.exit()
if re.match(r”^1\d{10}$”, self.username):
print(“手机登陆\n”)
post_url = ‘http://www.zhihu.com/login/phone_num’
postdata = {
‘_xsrf’: self.get_xsrf(),
‘password’: self.password,
‘remember_me’: ‘true’,
‘phone_num’: self.username,
}
else:
print(“邮箱登陆\n”)
post_url = ‘http://www.zhihu.com/login/email’
postdata = {
‘_xsrf’: self.get_xsrf(),
‘password’: self.password,
‘remember_me’: ‘true’,
’email’: self.username,
}
try:
login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
login_text = json.loads(login_page.text.encode(‘latin-1’).decode(‘unicode-escape’))
print(postdata)
print(login_text)
# 需要输入验证码 r = 0为登陆成功代码
if login_text[‘r’] == 1:
sys.exit()
except:
postdata[‘captcha’] = self.get_captcha()
login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
print(json.loads(login_page.text.encode(‘latin-1’).decode(‘unicode-escape’)))
# 保存登陆cookie
self.__session.cookies.save()

这个就是核心的登陆功能啦,非常关键的就是用到了requests库,非常方便的保存到session我们这里全局都是用单例模式,统一使用同一个requests.session对象进行访问功能,保持登录状态的一致性最后主要调用登陆的代码为

# 创建login对象
lo = login.login.login(self.session)
# 模拟登陆
if lo.check_login():
print(‘您已经登录’)
else:
if self.config.get(“zhihu_account”, “username”) and self.config.get(“zhihu_account”, “username”):
username = self.config.get(“zhihu_account”, “username”)
password = self.config.get(“zhihu_account”, “password”)
else:
username = input(‘请输入你的用户名\n> ‘)
password = input(“请输入你的密码\n> “)
lo.do_login(username, password)

知乎模拟登陆到此就完成啦知乎用户抓取

def __init__(self, thread’):
# 多线程
print(“线程” + str(threadid) + “初始化”)
threading.thread.__init__(self)
self.threadid = threadid
self.name = name
try:
print(“线程” + str(threadid) + “初始化成功”)
except exception as err:
print(err)
print(“线程” + str(threadid) + “开启失败”)
self.threadlock = threading.lock()
# 获取配置
self.config = configparser.configparser()
self.config.read(“config.ini”)
# 初始化session
requests.adapters.default_retries = 5
self.session = requests.session()
self.session.cookies = cookielib.lwpcookiejar(filename=’cookie’)
self.session.keep_alive = false
try:
self.session.cookies.load(ignore_discard=true)
except:
print(‘cookie 未能加载’)
finally:
pass
# 创建login对象
lo = login(self.session)
lo.do_login()
# 初始化redis连接
try:
redis_host = self.config.get(“redis”, “host”)
redis_port = self.config.get(“redis”, “port”)
self.redis_con = redis.redis(host=redis_host, port=redis_port, db=0)
# 刷新redis库
# self.redis_con.flushdb()
except:
print(“请安装redis或检查redis连接配置”)
sys.exit()
# 初始化数据库连接
try:
db_host = self.config.get(“db”, “host”)
db_port = int(self.config.get(“db”, “port”))
db_user = self.config.get(“db”, “user”)
db_pass = self.config.get(“db”, “password”)
db_db = self.config.get(“db”, “db”)
db_charset = self.config.get(“db”, “charset”)
self.db = pymysql.connect(host=db_host, port=db_port, user=db_user, passwd=db_pass, db=db_db,
charset=db_charset)
self.db_cursor = self.db.cursor()
except:
print(“请检查数据库配置”)
sys.exit()
# 初始化系统设置
self.max_queue_len = int(self.config.get(“sys”, “max_queue_len”))

这个是get_user.py的构造函数,主要功能就是初始化mysql连接、redis连接、验证登陆、生成全局的session对象、导入系统配置、开启多线程。

# 获取首页html
def get_index_page(self):
index_url = ‘https://www.zhihu.com/’
try:
index_html = self.session.get(index_url, headers=self.headers, timeout=35)
except exception as err:
# 出现异常重试
print(“获取页面失败,正在重试……”)
print(err)
traceback.print_exc()
return none
finally:
pass
return index_html.text
# 获取单个用户详情页面
def get_user_page(self, name_url):
user_page_url = ‘https://www.zhihu.com’ + str(name_url) + ‘/about’
try:
index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
except exception as err:
# 出现异常重试
print(“失败name_url:” + str(name_url) + “获取页面失败,放弃该用户”)
print(err)
traceback.print_exc()
return none
finally:
pass
return index_html.text
# 获取粉丝页面
def get_follower_page(self, name_url):
user_page_url = ‘https://www.zhihu.com’ + str(name_url) + ‘/followers’
try:
index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
except exception as err:
# 出现异常重试
print(“失败name_url:” + str(name_url) + “获取页面失败,放弃该用户”)
print(err)
traceback.print_exc()
return none
finally:
pass
return index_html.text
def get_following_page(self, name_url):
user_page_url = ‘https://www.zhihu.com’ + str(name_url) + ‘/followers’
try:
index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
except exception as err:
# 出现异常重试
print(“失败name_url:” + str(name_url) + “获取页面失败,放弃该用户”)
print(err)
traceback.print_exc()
return none
finally:
pass
return index_html.text
# 获取首页上的用户列表,存入redis
def get_index_page_user(self):
index_html = self.get_index_page()
if not index_html:
return
bs = beautifulsoup(index_html, “html.parser”)
self.get_xsrf(index_html)
user_a = bs.find_all(“a”, class_=”author-link”) # 获取用户的a标签
for a in user_a:
if a:
self.add_wait_user(a.get(‘href’))
else:
continue

这一部分的代码就是用于抓取各个页面的html代码

# 加入带抓取用户队列,先用redis判断是否已被抓取过
def add_wait_user(self, name_url):
# 判断是否已抓取
self.threadlock.acquire()
if not self.redis_con.hexists(‘already_get_user’, name_url):
self.counter += 1
print(name_url + ” 加入队列”)
self.redis_con.hset(‘already_get_user’, name_url, 1)
self.redis_con.lpush(‘user_queue’, name_url)
print(“添加用户 ” + name_url + “到队列”)
self.threadlock.release()
# 获取页面出错移出redis
def del_already_user(self, name_url):
self.threadlock.acquire()
if not self.redis_con.hexists(‘already_get_user’, name_url):
self.counter -= 1
self.redis_con.hdel(‘already_get_user’, name_url)
self.threadlock.release()

用户加入redis的操作,在数据库插入出错时我们调用del_already_user删除插入出错的用户

# 分析粉丝页面获取用户的所有粉丝用户
# @param follower_page get_follower_page()中获取到的页面,这里获取用户hash_id请求粉丝接口获取粉丝信息
def get_all_follower(self, name_url):
follower_page = self.get_follower_page(name_url)
# 判断是否获取到页面
if not follower_page:
return
bs = beautifulsoup(follower_page, ‘html.parser’)
# 获取关注者数量
follower_num = int(bs.find(‘span’, text=’关注者’).find_parent().find(‘strong’).get_text())
# 获取用户的hash_id
hash_id = \
json.loads(bs.select(“#zh-profile-follows-list”)[0].select(“.zh-general-list”)[0].get(‘data-init’))[
‘params’][
‘hash_id’]
# 获取关注者列表
self.get_xsrf(follower_page) # 获取xsrf
post_url = ‘https://www.zhihu.com/node/profilefollowerslistv2’
# 开始获取所有的关注者 math.ceil(follower_num/20)*20
for i in range(0, math.ceil(follower_num / 20) * 20, 20):
post_data = {
‘method’: ‘next’,
‘params’: json.dumps({“offset”: i, “order_by”: “created”, “hash_id”: hash_id})
}
try:
j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
‘latin-1’).decode(
‘unicode-escape’)
pattern = re.compile(r”zm-item-link-avatar\”[^\”]*\”([^\”]*)”, re.dotall)
j = pattern.findall(j)
for user in j:
user = user.replace(‘\\’, ”)
self.add_wait_user(user) # 保存到redis
except exception as err:
print(“获取正在关注失败”)
print(err)
traceback.print_exc()
pass
# 获取正在关注列表
def get_all_following(self, name_url):
following_page = self.get_following_page(name_url)
# 判断是否获取到页面
if not following_page:
return
bs = beautifulsoup(following_page, ‘html.parser’)
# 获取关注者数量
following_num = int(bs.find(‘span’, text=’关注了’).find_parent().find(‘strong’).get_text())
# 获取用户的hash_id
hash_id = \
json.loads(bs.select(“#zh-profile-follows-list”)[0].select(“.zh-general-list”)[0].get(‘data-init’))[
‘params’][
‘hash_id’]
# 获取关注者列表
self.get_xsrf(following_page) # 获取xsrf
post_url = ‘https://www.zhihu.com/node/profilefolloweeslistv2’
# 开始获取所有的关注者 math.ceil(follower_num/20)*20
for i in range(0, math.ceil(following_num / 20) * 20, 20):
post_data = {
‘method’: ‘next’,
‘params’: json.dumps({“offset”: i, “order_by”: “created”, “hash_id”: hash_id})
}
try:
j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
‘latin-1’).decode(
‘unicode-escape’)
pattern = re.compile(r”zm-item-link-avatar\”[^\”]*\”([^\”]*)”, re.dotall)
j = pattern.findall(j)
for user in j:
user = user.replace(‘\\’, ”)
self.add_wait_user(user) # 保存到redis
except exception as err:
print(“获取正在关注失败”)
print(err)
traceback.print_exc()
pass

调用知乎的api,获取所有的关注用户列表和粉丝用户列表,递归获取用户这里需要注意的是头部要记得带上xsrf不然会抛出403

# 分析about页面,获取用户详细资料
def get_user_info(self, name_url):
about_page = self.get_user_page(name_url)
# 判断是否获取到页面
if not about_page:
print(“获取用户详情页面失败,跳过,name_url:” + name_url)
return
self.get_xsrf(about_page)
bs = beautifulsoup(about_page, ‘html.parser’)
# 获取页面的具体数据
try:
nickname = bs.find(“a”, class_=”name”).get_text() if bs.find(“a”, class_=”name”) else ”
user_type = name_url[1:name_url.index(‘/’, 1)]
self_domain = name_url[name_url.index(‘/’, 1) + 1:]
gender = 2 if bs.find(“i”, class_=”icon icon-profile-female”) else (1 if bs.find(“i”, class_=”icon icon-profile-male”) else 3)
follower_num = int(bs.find(‘span’, text=’关注者’).find_parent().find(‘strong’).get_text())
following_num = int(bs.find(‘span’, text=’关注了’).find_parent().find(‘strong’).get_text())
agree_num = int(re.findall(r'(.*).*赞同’, about_page)[0])
appreciate_num = int(re.findall(r'(.*).*感谢’, about_page)[0])
star_num = int(re.findall(r'(.*).*收藏’, about_page)[0])
share_num = int(re.findall(r'(.*).*分享’, about_page)[0])
browse_num = int(bs.find_all(“span”, class_=”zg-gray-normal”)[2].find(“strong”).get_text())
trade = bs.find(“span”, class_=”business item”).get(‘title’) if bs.find(“span”,
class_=”business item”) else ”
company = bs.find(“span”, class_=”employment item”).get(‘title’) if bs.find(“span”,
class_=”employment item”) else ”
school = bs.find(“span”, class_=”education item”).get(‘title’) if bs.find(“span”,
class_=”education item”) else ”
major = bs.find(“span”, class_=”education-extra item”).get(‘title’) if bs.find(“span”,
class_=”education-extra item”) else ”
job = bs.find(“span”, class_=”position item”).get_text() if bs.find(“span”,
class_=”position item”) else ”
location = bs.find(“span”, class_=”location item”).get(‘title’) if bs.find(“span”,
class_=”location item”) else ”
description = bs.find(“p”, class_=”bio ellipsis”).get(‘title’) if bs.find(“p”,
class_=”bio ellipsis”) else ”
ask_num = int(bs.find_all(“a”, class_=’item’)[1].find(“span”).get_text()) if \
bs.find_all(“a”, class_=’item’)[
1] else int(0)
answer_num = int(bs.find_all(“a”, class_=’item’)[2].find(“span”).get_text()) if \
bs.find_all(“a”, class_=’item’)[
2] else int(0)
article_num = int(bs.find_all(“a”, class_=’item’)[3].find(“span”).get_text()) if \
bs.find_all(“a”, class_=’item’)[3] else int(0)
collect_num = int(bs.find_all(“a”, class_=’item’)[4].find(“span”).get_text()) if \
bs.find_all(“a”, class_=’item’)[4] else int(0)
public_edit_num = int(bs.find_all(“a”, class_=’item’)[5].find(“span”).get_text()) if \
bs.find_all(“a”, class_=’item’)[5] else int(0)
replace_data = \
(pymysql.escape_string(name_url), nickname, self_domain, user_type,
gender, follower_num, following_num, agree_num, appreciate_num, star_num, share_num, browse_num,
trade, company, school, major, job, location, pymysql.escape_string(description),
ask_num, answer_num, article_num, collect_num, public_edit_num)
replace_sql = ”’replace into
user(url,nickname,self_domain,user_type,
gender, follower,following,agree_num,appreciate_num,star_num,share_num,browse_num,
trade,company,school,major,job,location,description,
ask_num,answer_num,article_num,collect_num,public_edit_num)
values(%s,%s,%s,%s,
%s,%s,%s,%s,%s,%s,%s,%s,
%s,%s,%s,%s,%s,%s,%s,
%s,%s,%s,%s,%s)”’
try:
print(“获取到数据:”)
print(replace_data)
self.db_cursor.execute(replace_sql, replace_data)
self.db.commit()
except exception as err:
print(“插入数据库出错”)
print(“获取到数据:”)
print(replace_data)
print(“插入语句:” + self.db_cursor._last_executed)
self.db.rollback()
print(err)
traceback.print_exc()
except exception as err:
print(“获取数据出错,跳过用户”)
self.redis_con.hdel(“already_get_user”, name_url)
self.del_already_user(name_url)
print(err)
traceback.print_exc()
pass

最后,到用户的about页面,分析页面元素,利用正则或者beatifulsoup分析抓取页面的数据这里我们sql语句用replace into而不用insert into,这样可以很好的防止数据重复问题

# 开始抓取用户,程序总入口
def entrance(self):
while 1:
if int(self.redis_con.llen(“user_queue”)) < 1: self.get_index_page_user() else: # 出队列获取用户name_url redis取出的是byte,要decode成utf-8 name_url = str(self.redis_con.rpop("user_queue").decode('utf-8')) print("正在处理name_url:" + name_url) self.get_user_info(name_url) if int(self.redis_con.llen("user_queue"))

Posted in 未分类

发表评论