python

超轻量级php框架startmvc

Python 微信爬虫完整实例【单线程与多线程】

更新时间:2020-07-11 22:12 作者:startmvc
本文实例讲述了Python实现的微信爬虫。分享给大家供大家参考,具体如下:单线程版:import

本文实例讲述了Python 实现的微信爬虫。分享给大家供大家参考,具体如下:

单线程版:


import urllib.request
import urllib.parse
import urllib.error
import re,time
headers = ("User-Agent",
 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
 try:
 # proxy = urllib.request.ProxyHandler({'http':proxy_addr})    ##使用代理版
 # operner = urllib.request.build_opener()
 # urllib.request.install_opener(operner)
 headers = ("User-Agent",
 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
 operner = urllib.request.build_opener()
 operner.addheaders = [headers]
 urllib.request.install_opener(operner)
 data = urllib.request.urlopen(url).read().decode('utf-8')
 # print (data)
 return data
 except urllib.error.URLError as e:
 if hasattr(e, "code"):
 print(e.code)
 elif hasattr(e, "reason"):
 print(e.reason)
 except Exception as e:
 print("exception" + str(e))
 time.sleep(1)
##获取要爬取的url
def get_url(key, pagestart, pageend):
 try:
 keycode = urllib.parse.quote(key)
 for page in range(pagestart, pageend + 1):
 url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (
 keycode, page)
 data1 = use_proxy(url)
 #print("data1的内容是", data1)
 listurl_pattern = '<h3>.*?("http://.*?)</h3>'
 result = re.compile(listurl_pattern, re.S).findall(data1)
 for i in range(len(result)):
 res = result[i].replace("amp;", "").split(" ")[0].replace("\"", "")
 list_url.append(res)
 #print(list_url)
 return list_url
 except urllib.error.URLError as e:
 if hasattr(e, "code"):
 print(e.code)
 elif hasattr(e, "reason"):
 print(e.reason)
 except Exception as e:
 print("exception:", e)
##通过获取的url爬行内容数据并处理
def get_url_content(list_url):
 fh1=open("D:\\python-script\\1.html", 'wb')
 html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
 fh1.write(html1.encode("utf-8"))
 fh1.close()
 fh = open("D:\\python-script\\1.html", 'ab')
 for url in list_url:
 data_content = use_proxy(url)
 #print (data_content)
 #sys.exit()
 title_pattern = '<h2.*>.*?</h2>'
 result_title = re.compile(title_pattern, re.S).findall(data_content)
 ##标题(str)
 res_title = result_title[0].replace("<h2 class=\"rich_media_title\" id=\"activity-name\">", "").replace("</h2>",
 "").strip()
 content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
 content = re.compile(content_pattern, re.S).findall(data_content)
 try:
 fh.write(res_title.encode("utf-8"))
 for i in content:
 fh.write(i.strip().encode("utf-8"))
 except UnicodeEncodeError as e:
 continue
 fh.write("</body></html>".encode("utf-8"))
if __name__ == '__main__':
 pagestart = 1
 pageend = 2
 key = "人工智能"
 get_url(key, pagestart, pageend)
 get_url_content(list_url)

多线程版:


import urllib.request
import urllib.parse
import urllib.error
import re,time
import queue
import threading
headers = ("User-Agent","Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
operner = urllib.request.build_opener()
operner.addheaders = [headers]
urllib.request.install_opener(operner)
urlque = queue.Queue()
list_url = []
###使用代理获取网页url内容
def use_proxy(url):
 try:
 # proxy = urllib.request.ProxyHandler({'http':proxy_addr})
 # operner = urllib.request.build_opener()
 # urllib.request.install_opener(operner)
 headers = ("User-Agent",
 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3107.4 Safari/537.36")
 operner = urllib.request.build_opener()
 operner.addheaders = [headers]
 urllib.request.install_opener(operner)
 data = urllib.request.urlopen(url).read().decode('utf-8')
 #print (data)
 return data
 except urllib.error.URLError as e:
 if hasattr(e,"code"):
 print (e.code)
 elif hasattr(e,"reason"):
 print (e.reason)
 except Exception as e:
 print ("exception"+str(e))
 time.sleep(1)
###获取文章的url连接,并将连接加入到队列
class get_url(threading.Thread):
 def __init__(self,key,pagestart,pageend,urlque):
 threading.Thread.__init__(self)
 self.pagestart = pagestart
 self.pageend = pageend
 self.key = key
 self.urlque = urlque
 def run(self):
 try:
 keycode = urllib.parse.quote(self.key)
 for page in range(self.pagestart,self.pageend+1):
 url = "http://weixin.sogou.com/weixin?query=%s&_sug_type_=&s_from=input&_sug_=n&type=%d&page=1&ie=utf8" % (keycode,page)
 data = use_proxy(url)
 print ("data1的内容是",data)
 listurl_pattern = '<h3>.*?("http://.*?)</h3>'
 result = re.compile(listurl_pattern,re.S).findall(data)
 print (result)
 if len(result) == 0:
 print ("没有可用的url")
 sys.exit()
 for i in range(len(result)):
 res = result[i].replace("amp;","").split(" ")[0].replace("\"" ,"")
 #list_url.append(res) #加入列表
 self.urlque.put(res) ##加入队列
 self.urlque.task_done()
 #return list_url
 except urllib.error.URLError as e:
 if hasattr(e, "code"):
 print(e.code)
 elif hasattr(e, "reason"):
 print(e.reason)
 except Exception as e:
 print ("exception:",e)
##根据url获取文章内容
class get_url_content(threading.Thread):
 def __init__(self,urlque):
 threading.Thread.__init__(self)
 self.urlque = urlque
 def run(self):
 fh1 = open("D:\\python-script\\1.html", 'wb')
 html1 = '''<!DOCTYPE html>\n<html xmlns="http://www.w3.org/1999/xhmtl">\n<head>\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8">\n<title>微信文章</title></head>\n<body>'''
 fh1.write(html1.encode("utf-8"))
 fh1.close()
 fh = open("D:\\python-script\\1.html", 'ab')
 while True:
 try:
 url = self.urlque.get()
 data_content = use_proxy(url)
 title_pattern = '<h2.*>.*?</h2>'
 result_title = re.compile(title_pattern, re.S).findall(data_content)
 ##标题
 res_title = result_title[0].replace("<h2 class=\"rich_media_title\" id=\"activity-name\">", "").replace("</h2>","").strip()
 content_pattern = 'id="js_content">(.*?)<div class="rich_media_tool" id="js_sg_bar">'
 content = re.compile(content_pattern, re.S).findall(data_content)
 #c = '<p style="max-width: 100%;box-sizing: border-box;min-height: 1em;text-indent: 2em;word-wrap: break-word !important;">'
 # for i in content:
 # ##内容
 # c_content=i.replace(c, "").replace("<br /></p>", "").replace("</p>", "")
 fh.write(res_title.encode("utf-8"))
 for i in content:
 fh.write(i.strip().encode("utf-8"))
 except UnicodeEncodeError as e:
 continue
 fh.close()
class contrl(threading.Thread):
 def __init__(self,urlqueue):
 threading.Thread.__init__(self)
 self.urlqueue = urlqueue
 while True:
 print ("程序正在执行")
 if self.urlqueue.empty():
 time.sleep(3)
 print ("程序执行完毕")
 exit()
if __name__ == '__main__':
 pagestart = 1
 pageend = 2
 key = "人工智能"
 get_url = get_url(key,pagestart,pageend,urlque)
 get_url.start()
 get_content = get_url_content(urlque)
 get_content.start()
 cntrol = contrl(urlque)
 cntrol.start()