No.32: Building a Crawler Proxy Pool with Redis

2023-09-25

        When crawling a site with strict anti-scraping measures, your IP address can easily get banned; using proxies is an effective way around this. You can pay for high-quality proxies online, or you can scrape free ones from public proxy sites. Either way, the same proxy may be shared by many users at once, so there is no guarantee that any given proxy still works. For this reason we maintain a proxy pool: it screens the proxies in advance, keeps the usable ones, and removes the ones that no longer work.

      (1) Preparing the tools

1. Install the Redis database.

2. Install aiohttp, requests, redis-py, pyquery, and the lightweight Flask library.
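Assuming Redis is already running locally on its default port 6379 with no password (adjust the connection details to your own setup), a short check like the sketch below confirms that the libraries are importable and the Redis server is reachable:

# Quick sanity check (assumes a local Redis on the default port 6379, no password):
# the imports fail if a library is missing, and ping() fails if Redis is unreachable.
import aiohttp
import flask
import pyquery
import redis
import requests

client = redis.StrictRedis(host='127.0.0.1', port=6379, password=None, decode_responses=True)
print('Redis reachable:', client.ping())
print('All libraries imported successfully')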

      (2) Proxy pool architecture

1. Storage module. Uses a Redis Sorted Set, so that no proxy is stored twice.

2. Getter module. Scrapes available proxies from several free public proxy sites:

https://proxy.mimvp.com/free.php?proxy=in_hp

http://www.coobobo.com/free-http-proxy

http://ip.zdaye.com/

http://www.mayidaili.com/free/anonymous/%E9%AB%98%E5%8C%BF

http://http.taiyangruanjian.com/

http://http.zhimaruanjian.com/

3. Tester module. Tests each proxy against the target site to check that it still works. If a proxy passes a test, its score is set straight to 100; if it fails, its score is decremented by 1, and once it reaches 0 the proxy is removed from the pool.

4. API module. Provides a Web API through which a usable proxy can be obtained.

5. Scheduler module. Starts the getter, tester, and API modules and keeps them running. (A sketch of the shared settings that these modules read follows this list.)
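All of the modules above read their configuration from a shared proxypool/setting.py. The original post does not show this file, so the values below are only an illustrative sketch: the score bounds 100 and 0 follow from the tester description above and the API port 5555 from the client code later on, while everything else is an assumption you should adapt.

# proxypool/setting.py -- illustrative values only; the original file is not shown in this post

# Redis connection and the key of the sorted set that stores the proxies
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

# Score bounds: a proxy that passes a test gets MAX_SCORE; each failure decrements
# the score until it reaches MIN_SCORE, at which point the proxy is removed
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10

# Upper limit on the pool size and the batch size used by the tester
POOL_UPPER_THRESHOLD = 10000
BATCH_TEST_SIZE = 10

# Target site used to test proxies and the status codes treated as success
TEST_URL = 'http://www.baidu.com'
VALID_STATUS_CODES = [200]

# Run intervals (seconds) for the tester and the getter
TESTER_CYCLE = 20
GETTER_CYCLE = 300

# Web API host/port and the on/off switches read by the scheduler
API_HOST = '0.0.0.0'
API_PORT = 5555
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True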

      (3) Implementing the proxy pool

1. Storage module

import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY
from proxypool.setting import MAX_SCORE, MIN_SCORE, INITIAL_SCORE
from random import choice
import re


# Storage module
class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the client
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy and set its initial score
        :param proxy: proxy
        :param score: score
        :return: result of the add operation
        """
        # The proxy must look like ***.***.***.***:***
        if not re.match('\d+\.\d+\.\d+\.\d+\:\d+', proxy):
            print('Malformed proxy', proxy, 'discarded')
            return
        # Only add the proxy if it is not already in the sorted set
        if not self.db.zscore(REDIS_KEY, proxy):
            # Newer versions of redis-py expect a mapping for ZADD
            return self.db.zadd(REDIS_KEY, {proxy: score})

    def random(self):
        """
        Get a random valid proxy: try the highest-scored proxies first; if there are
        none, fall back to the top-ranked ones; otherwise raise an exception
        :return: a random proxy
        """
        # ZRANGEBYSCORE returns the members whose score lies in the given range
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            # Return a random item
            return choice(result)
        else:
            # Take the top 100 proxies in descending score order
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Decrease a proxy's score by one; remove it if the score falls to the minimum
        :param proxy: proxy
        :return: the new score
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decreasing by 1')
            # ZINCRBY adds an increment to the member's score, e.g. ZINCRBY key -5 member
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        else:
            print('Proxy', proxy, 'current score', score, 'removing')
            # Remove one or more members from the sorted set
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy exists
        :param proxy: proxy
        :return: whether it exists
        """
        return not self.db.zscore(REDIS_KEY, proxy) == None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE
        :param proxy: proxy
        :return: result of the operation
        """
        print('Proxy', proxy, 'is valid, setting score to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """
        Get the number of proxies
        :return: count
        """
        # ZCARD returns the number of members in the sorted set
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get all proxies
        :return: the full list of proxies
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """
        Get proxies in batches
        :param start: start index
        :param stop: end index
        :return: list of proxies
        """
        # Descending score order
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)


if __name__ == '__main__':
    # Create a RedisClient object
    conn = RedisClient()
    # Call the batch() method on the conn object
    result = conn.batch(680, 688)
    print(result)

  The importer.py script, which handles adding proxies by hand:

from proxypool.db import RedisClient

conn = RedisClient()


def set(proxy):
    result = conn.add(proxy)
    print(proxy)
    print('Added successfully' if result else 'Failed to add')


def scan():
    print('Enter proxies one per line; type exit to quit')
    while True:
        proxy = input()
        if proxy == 'exit':
            break
        set(proxy)


if __name__ == '__main__':
    scan()

 

2. Getter module

          We define a metaclass, ProxyMetaclass. Every method that fetches proxies is named with a crawl_ prefix, and the metaclass collects these names into a list. The metaclass implements the __new__() method, which receives several fixed parameters; attrs holds the class attributes, and iterating over it works like iterating over a dictionary, with the keys corresponding to method names. If a method name starts with crawl_, it is appended to the __CrawlFunc__ attribute. To extend the crawler later, simply add another method whose name starts with crawl_.

          We then define a get_proxies() method, which takes the name of one crawl_ method, calls it, collects the proxies it yields, and returns them as a list. (The Getter class defined later iterates over all the crawl_ methods and calls get_proxies() for each one.)

import json
import re
from .utils import get_page
from pyquery import PyQuery as pq


# Getter module
class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Fetched proxy', proxy)
            proxies.append(proxy)
        return proxies

    # def crawl_daxiang(self):
    #     url = 'http://vtp.daxiangdaili.com/ip/?tid=559363191592228&num=50&filter=on'
    #     html = get_page(url)
    #     if html:
    #         urls = html.split('\n')
    #         for url in urls:
    #             yield url

    def crawl_daili66(self, page_count=4):
        """
        Crawl daili66
        :param page_count: number of pages
        :return: proxies
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_proxy360(self):
        """
        Crawl Proxy360
        :return: proxies
        """
        start_url = 'http://www.proxy360.cn/Region/China'
        print('Crawling', start_url)
        html = get_page(start_url)
        if html:
            doc = pq(html)
            lines = doc('div[name="list_proxy_ip"]').items()
            for line in lines:
                ip = line.find('.tbBottomLine:nth-child(1)').text()
                port = line.find('.tbBottomLine:nth-child(2)').text()
                yield ':'.join([ip, port])

    def crawl_goubanjia(self):
        """
        Crawl Goubanjia
        :return: proxies
        """
        start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
        html = get_page(start_url)
        if html:
            doc = pq(html)
            tds = doc('td.ip').items()
            for td in tds:
                td.find('p').remove()
                yield td.text().replace(' ', '')

    def crawl_ip181(self):
        start_url = 'http://www.ip181.com/'
        html = get_page(start_url)
        # \s* matches whitespace so the pattern spans line breaks
        ip_address = re.compile('<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
        re_ip_address = ip_address.findall(html)
        for address, port in re_ip_address:
            result = address + ':' + port
            yield result.replace(' ', '')

    def crawl_ip3366(self):
        for page in range(1, 4):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
            html = get_page(start_url)
            # \s* matches whitespace so the pattern spans line breaks
            ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_kxdaili(self):
        for i in range(1, 11):
            start_url = 'http://www.kxdaili.com/ipList/{}.html#ip'.format(i)
            html = get_page(start_url)
            # \s* matches whitespace so the pattern spans line breaks
            ip_address = re.compile('<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_premproxy(self):
        for i in ['China-01', 'China-02', 'China-03', 'China-04', 'Taiwan-01']:
            start_url = 'https://premproxy.com/proxy-by-country/{}.htm'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-label="IP:port ">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                for address_port in re_ip_address:
                    yield address_port.replace(' ', '')

    def crawl_xroxy(self):
        for i in ['CN', 'TW']:
            start_url = 'http://www.xroxy.com/proxylist.php?country={}'.format(i)
            html = get_page(start_url)
            if html:
                ip_address1 = re.compile("title='View this Proxy details'>\s*(.*).*")
                re_ip_address1 = ip_address1.findall(html)
                ip_address2 = re.compile("title='Select proxies with port number .*'>(.*)</a>")
                re_ip_address2 = ip_address2.findall(html)
                for address, port in zip(re_ip_address1, re_ip_address2):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_kuaidaili(self):
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366_index(self):
        # A second ip3366 source (the index page); renamed so it does not clash
        # with crawl_ip3366 above, which it would otherwise silently shadow
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile('<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        start_url = 'http://www.iphai.com/'
        html = get_page(start_url)
        if html:
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            trs = find_tr.findall(html)
            for s in range(1, len(trs)):
                find_ip = re.compile('<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                re_ip_address = find_ip.findall(trs[s])
                find_port = re.compile('<td>\s+(\d+)\s+</td>', re.S)
                re_port = find_port.findall(trs[s])
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_89ip(self):
        start_url = 'http://www.89ip.cn/apijk/?&tqsl=1000&sxa=&sxb=&tta=&ports=&ktip=&cf=1'
        html = get_page(start_url)
        if html:
            find_ips = re.compile('(\d+\.\d+\.\d+\.\d+:\d+)', re.S)
            ip_ports = find_ips.findall(html)
            for address_port in ip_ports:
                yield address_port

    def crawl_data5u(self):
        start_url = 'http://www.data5u.com/free/gngn/index.shtml'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = get_page(start_url, options=headers)
        if html:
            ip_address = re.compile('<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class=\"port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

            With the Crawler class defined above, we next define a Getter class, which dynamically calls every crawl_-prefixed method and stores the fetched proxies in the database.

from proxypool.tester import Tester
from proxypool.db import RedisClient
from proxypool.crawler import Crawler
from proxypool.setting import *
import sys
# Call every crawl_ method
class Getter():
    def __init__(self):
        # Connect to Redis
        self.redis = RedisClient()
        # Create a Crawler object
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the proxy pool has reached its size limit
        """
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    # Entry method
    def run(self):
        print('Getter starting')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # Fetch proxies by calling the crawler's get_proxies() method
                proxies = self.crawler.get_proxies(callback)
                sys.stdout.flush()  # flush the output
                for proxy in proxies:
                    self.redis.add(proxy)

3. Tester module

         requests is a synchronous HTTP library: after sending a request, the program has to wait for the response before it can continue. An asynchronous library solves this problem by letting the program do other work while waiting for a response, such as scheduling further requests. We therefore use the asynchronous library aiohttp for proxy testing.

import asyncio
import aiohttp
import time
import sys
try:
    from aiohttp import ClientError
except:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError
from proxypool.db import RedisClient
from proxypool.setting import *


# Tester module
class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    # async marks this method as a coroutine
    async def test_single_proxy(self, proxy):
        """
        Test a single proxy
        :param proxy:
        :return:
        """
        # Create the connector for the asynchronous request
        conn = aiohttp.TCPConnector(verify_ssl=False)
        # Create an aiohttp ClientSession object
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                # If the proxy is a bytes object, decode it to a string
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        # Call the max() method of the Redis client
                        self.redis.max(proxy)
                        print('Proxy is valid', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        """
        Main test routine
        :return:
        """
        print('Tester starting')
        try:
            count = self.redis.count()
            print('Currently', count, 'proxies left')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, '-', stop)
                test_proxies = self.redis.batch(start, stop)
                # Coroutines cannot run on their own; they must be registered with an event
                # loop, which calls them at the appropriate time. asyncio.get_event_loop()
                # creates an event loop, and run_until_complete(asyncio.wait(tasks)) wraps
                # the coroutines into tasks, registers them, and runs the loop to completion.
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)

4. API module

       Expose the available proxies through a Web API.

       Here we create a Flask application and define three endpoints: the index page, a random-proxy endpoint, and a proxy-count endpoint.

from flask import Flask, g

from .db import RedisClient

__all__ = ['app']

# API module
app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    Get a proxy
    :return: a random proxy
    """
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """
    Get the count of proxies
    :return: total number of proxies in the pool
    """
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()

5. Scheduler module

         The scheduler coordinates the tester, getter, and API modules defined above, running the three of them as separate processes. The entry point is the run() method: for each module whose switch is enabled, the program creates a new Process with the corresponding target and calls start(), so the three processes run in parallel without interfering with one another.

import time
from multiprocessing import Process
from proxypool.api import app
from proxypool.getter import Getter
from proxypool.tester import Tester
from proxypool.db import RedisClient
from proxypool.setting import *


# Scheduler module
class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test proxies at a fixed interval
        """
        # Create a Tester object
        tester = Tester()
        # Loop forever, sleeping between test runs
        while True:
            print('Tester starting')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch proxies at a fixed interval
        """
        # Create a Getter object
        getter = Getter()
        while True:
            print('Starting proxy fetch')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool starting')
        # Without join(), the processes produce their output concurrently
        if TESTER_ENABLED:
            # Create a process whose target is the corresponding method
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

6. Running the pool

        Run the code and check the output.

       Visit the API pages:

         Visiting this endpoint returns a random proxy.

# Get a random proxy from the pool
import requests

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'

try:
    response = requests.get(PROXY_POOL_URL)
    if response.status_code == 200:
        print(response.text)
except requests.exceptions.ConnectionError as e:
    print('Error', e.args)

        With the proxy pool in place, we can effectively avoid getting our IP banned.
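As an example of putting the pool to work in a crawler, the sketch below fetches a random proxy from the /random endpoint and passes it to requests through its proxies parameter. The target URL http://httpbin.org/get is only a placeholder for whatever site you are actually crawling.

import requests

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'
# httpbin is just a placeholder target; substitute the site you actually want to crawl
TARGET_URL = 'http://httpbin.org/get'


def get_random_proxy():
    # Ask the proxy pool API for one random proxy, e.g. '123.45.67.89:8080'
    response = requests.get(PROXY_POOL_URL)
    if response.status_code == 200:
        return response.text.strip()


def crawl_with_proxy(url):
    proxy = get_random_proxy()
    if not proxy:
        print('No proxy available')
        return
    # requests expects a mapping of scheme -> proxy URL
    proxies = {
        'http': 'http://' + proxy,
        'https': 'http://' + proxy,
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=10)
        print('Status:', response.status_code)
        print(response.text[:200])
    except requests.exceptions.RequestException as e:
        print('Request failed:', e.args)


if __name__ == '__main__':
    crawl_with_proxy(TARGET_URL)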

The entry program that builds the proxy pool:

from proxypool.scheduler import Scheduler
import sys
import io
# Entry program for the proxy pool
# Change the Unicode encoding of an already-open stream without closing it
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')


def main():
    try:
        s = Scheduler()
        s.run()
    except:
        # Restart the scheduler if it crashes
        main()


if __name__ == '__main__':
    main()

 
