本页代码可以在这里下载。
1.设置代理
urllib 中使用代理:
代码:其中proxy是代理ip和端口号,验证代理只需要在url中加入验证参数即可。
# -*- coding:utf-8 -*-
from urllib.error import URLError
from urllib.request import ProxyHandler, build_opener

# urllib proxy demo: install a ProxyHandler so every request made through
# this opener is routed via the local proxy endpoint.
proxy = '127.0.0.1:9743'
handler = ProxyHandler({
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
})
opener = build_opener(handler)
try:
    response = opener.open('http://httpbin.org/get')
    print(response.read().decode('utf-8'))
except URLError as err:
    # Network/proxy failures surface as URLError; show the reason only.
    print(err.reason)
requests 中使用代理:
对于requests来说,代理更加简单,只需要设置proxies参数即可,验证代理也和urllib一样,加入url参数即可。
# requests proxy demo: pass a `proxies` mapping to requests.get().
# Fixes: the original target URL was malformed ('http://http:httpbin.org/get'),
# and `requests` was used without being imported in the snippet.
import requests

proxy = '127.0.0.1:9743'
proxies = {
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
}
try:
    response = requests.get('http://httpbin.org/get', proxies=proxies)
    print(response.text)
except requests.exceptions.ConnectionError as e:
    # Unreachable proxy/host ends up here rather than crashing the demo.
    print('Error', e.args)
Selenium 中使用代理:
在chrome中,用Selenium设置代理的方法也非常简单。
# Selenium proxy demo: hand the proxy to Chrome through the
# --proxy-server command-line switch before launching the browser.
proxy = '127.0.0.1:9743'
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=http://' + proxy)
browser = webdriver.Chrome(chrome_options=options)
browser.get('http://httpbin.org/get')
如果是验证代理,则需要在本地创建一个 manifest.json 配置文件和 background.js 脚本来设置认证代理。运行代码之后本地会生成一个 proxy_auth_plugin.zip 文件来保存当前配置。
# Selenium proxy with authentication: Chrome cannot take proxy credentials
# on the command line, so we build a tiny extension (manifest.json +
# background.js) that sets a fixed proxy and answers the auth challenge,
# zip it, and load it into the browser.
# Fix: the original snippet used zipfile/webdriver/Options without importing them.
import zipfile

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

ip = '127.0.0.1'
port = 9743
username = 'username'
password = 'password'

# Extension manifest: requests the proxy/webRequest permissions the
# background script needs.
manifest_json = """
{
    "version": "1.0.0",
    "manifest_version": 2,
    "name": "Chrome Proxy",
    "permissions": [
        "proxy",
        "tabs",
        "unlimitedStorage",
        "storage",
        "<all_urls>",
        "webRequest",
        "webRequestBlocking"
    ],
    "background": {
        "scripts": ["background.js"]
    }
}
"""

# Background script: pin the proxy server and supply credentials whenever
# Chrome raises an auth prompt for it.
background_js = """
var config = {
    mode: "fixed_servers",
    rules: {
        singleProxy: {
            scheme: "http",
            host: "%(ip)s",
            port: %(port)s
        }
    }
}
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
    return {
        authCredentials: {
            username: "%(username)s",
            password: "%(password)s"
        }
    }
}
chrome.webRequest.onAuthRequired.addListener(
    callbackFn,
    {urls: ["<all_urls>"]},
    ['blocking']
)
""" % {'ip': ip, 'port': port, 'username': username, 'password': password}

# Package the two files; Chrome accepts a zipped extension directly.
plugin_file = 'proxy_auth_plugin.zip'
with zipfile.ZipFile(plugin_file, 'w') as zp:
    zp.writestr("manifest.json", manifest_json)
    zp.writestr("background.js", background_js)

chrome_options = Options()
chrome_options.add_argument("--start-maximized")
chrome_options.add_extension(plugin_file)
browser = webdriver.Chrome(chrome_options=chrome_options)
browser.get('http://httpbin.org/get')
2.代理池维护
我们利用代理可以解决目标网站封ip的问题。但是无论是收费代理还是免费代理都存在代理不可用的情况,所以我们需要建立一个高效易用的代理池。
存储模块:负责存储抓取下来的代理
获取模块:需要定时在各大代理网站抓取代理。
检测模块:需要定时检测数据库中的代理。
接口模块:需要用API来提供对外服务的接口。
存储模块:
采用分数制,分数100为可用,检测器定时循环检测每个代理的使用情况,检测到不可用分数减1,分数减至0后代理被移除。
新获取的代理的分数为10,如果测试可行,则分数立即置为100,不可行则分数减1,分数减至0后代理移除。
__init__():对Redis进行初始化,建立Redis连接。
add():向数据库中添加代理设置分数。
random():随机获取代理的方法,首先尝试获取100分的代理,如果没有,则按照排名获取。
decrease():在代理检测无效时分数-1,分数达到最低值,移除代理。
exists():判断代理是否存在。
max():将代理的分数设置为最大。
count():返回集合个数。
all():返回所有代理列表,以供检查。
class RedisClient(object):
    """Sorted-set-backed proxy store.

    Score semantics: MAX_SCORE (100) means the proxy was verified usable,
    new proxies start at INITIAL_SCORE, a failed check decrements the
    score, and the proxy is removed once it reaches MIN_SCORE.

    NOTE(review): zadd/zincrby below use the redis-py 2.x calling
    convention (score/amount before member); redis-py 3.x changed both
    signatures — confirm the installed client version.
    """

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """Open the Redis connection (decode_responses=True -> str results)."""
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """Add a new proxy with the initial score; no-op if already stored.

        Fix: the original called zadd(REDIS_KEY, proxy) without a score,
        which raises TypeError — the score is now passed as in max().
        """
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, score, proxy)

    def random(self):
        """Return a random usable proxy.

        Prefers proxies at MAX_SCORE; otherwise falls back to the top 100
        by rank. Returns None (and prints a notice) when the pool is empty.
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        result = self.db.zrevrange(REDIS_KEY, 0, 100)
        if len(result):
            return choice(result)
        print('代理池为空')

    def decrease(self, proxy):
        """Decrement a proxy's score; remove it once it reaches MIN_SCORE."""
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('代理', proxy, '当前分数', score, '减1')
            return self.db.zincrby(REDIS_KEY, proxy, -1)
        print('代理', proxy, '当前分数', score, '移除')
        return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """Return True if the proxy is already in the pool."""
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """Mark a proxy as verified by setting its score to MAX_SCORE."""
        print('代理', proxy, '可用,设置为', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

    def count(self):
        """Return the number of proxies currently stored."""
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """Return every proxy in the pool (used by the checker)."""
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """Return proxies ranked in [start, stop).

        Fix: Tester.run() calls self.redis.batch(start, stop), but this
        method was missing from the original class.
        """
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)
获取模块:
首先设置元类(ProxyMetaclass),该类中实现了__new__()方法,这个方法固定接收几个参数,其中第四个attrs中包含了类的一些属性,我们可以遍历attrs来获取所有的类信息。
class ProxyMetaclass(type):
    """Metaclass that records every ``crawl_*`` method defined on the
    class in ``__CrawlFunc__`` (names) and ``__CrawlFuncCount__`` (count),
    so callers can enumerate all crawlers without a manual registry."""

    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        # list() so we can add keys to attrs safely after the loop.
        for key in list(attrs.keys()):
            if 'crawl_' in key:
                attrs['__CrawlFunc__'].append(key)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    """Collection of free-proxy-site crawlers; each ``crawl_*`` generator
    yields 'ip:port' strings."""

    def get_proxies(self, callback):
        """Run one crawler by name and collect its proxies into a list.

        Fix: the original used eval("self.{}()".format(callback)), which
        executes arbitrary text; getattr is safe and equivalent here.

        :param callback: name of a crawl_* method on this class
        :return: list of 'ip:port' strings
        """
        proxies = []
        for proxy in getattr(self, callback)():
            print('成功获取到代理', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        获取代理66
        :param page_count: 页码
        :return: 代理
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_proxy360(self):
        """
        获取Proxy360
        :return: 代理
        """
        start_url = 'http://www.proxy360.cn/Region/China'
        print('Crawling', start_url)
        html = get_page(start_url)
        if html:
            doc = pq(html)
            lines = doc('div[name="list_proxy_ip"]').items()
            for line in lines:
                ip = line.find('.tbBottomLine:nth-child(1)').text()
                port = line.find('.tbBottomLine:nth-child(2)').text()
                yield ':'.join([ip, port])

    def crawl_goubanjia(self):
        """
        获取Goubanjia
        :return: 代理
        """
        start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
        html = get_page(start_url)
        if html:
            doc = pq(html)
            tds = doc('td.ip').items()
            for td in tds:
                # The site hides junk digits inside <p> tags; strip them.
                td.find('p').remove()
                yield td.text().replace(' ', '')

    def crawl_ip181(self):
        """Scrape ip181.com with a table-row regex."""
        start_url = 'http://www.ip181.com/'
        html = get_page(start_url)
        # \s* absorbs the newlines between the row tags.
        ip_address = re.compile(r'<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
        re_ip_address = ip_address.findall(html)
        for address, port in re_ip_address:
            result = address + ':' + port
            yield result.replace(' ', '')

    def crawl_ip3366_free(self):
        """Scrape ip3366.net's /free/ listing pages.

        Fix: the original class defined crawl_ip3366 twice; the second
        definition silently shadowed this one, so this crawler never ran.
        Renamed to restore it.
        """
        for page in range(1, 4):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
            html = get_page(start_url)
            ip_address = re.compile(r'<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_kxdaili(self):
        """Scrape kxdaili.com pages 1-10."""
        for i in range(1, 11):
            start_url = 'http://www.kxdaili.com/ipList/{}.html#ip'.format(i)
            html = get_page(start_url)
            ip_address = re.compile(r'<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_premproxy(self):
        """Scrape premproxy.com per-country pages."""
        for i in ['China-01', 'China-02', 'China-03', 'China-04', 'Taiwan-01']:
            start_url = 'https://premproxy.com/proxy-by-country/{}.htm'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-label="IP:port ">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                for address_port in re_ip_address:
                    yield address_port.replace(' ', '')

    def crawl_xroxy(self):
        """Scrape xroxy.com for CN/TW proxies (address and port come from
        two separate patterns, zipped together)."""
        for i in ['CN', 'TW']:
            start_url = 'http://www.xroxy.com/proxylist.php?country={}'.format(i)
            html = get_page(start_url)
            if html:
                ip_address1 = re.compile(r"title='View this Proxy details'>\s*(.*).*")
                re_ip_address1 = ip_address1.findall(html)
                ip_address2 = re.compile(r"title='Select proxies with port number .*'>(.*)</a>")
                re_ip_address2 = ip_address2.findall(html)
                for address, port in zip(re_ip_address1, re_ip_address2):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_kuaidaili(self):
        """Scrape kuaidaili.com free listing pages 1-3."""
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        """Scrape xicidaili.com (needs browser-like headers to avoid a block)."""
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366(self):
        """Scrape ip3366.net's main listing pages 1-3."""
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                # Skip trs[0], the header row.
                for s in range(1, len(trs)):
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        """Scrape iphai.com's front page."""
        start_url = 'http://www.iphai.com/'
        html = get_page(start_url)
        if html:
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            trs = find_tr.findall(html)
            # Skip trs[0], the header row.
            for s in range(1, len(trs)):
                find_ip = re.compile(r'<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                re_ip_address = find_ip.findall(trs[s])
                find_port = re.compile(r'<td>\s+(\d+)\s+</td>', re.S)
                re_port = find_port.findall(trs[s])
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_89ip(self):
        """Scrape 89ip.cn's bulk API endpoint (already 'ip:port' formatted)."""
        start_url = 'http://www.89ip.cn/apijk/?&tqsl=1000&sxa=&sxb=&tta=&ports=&ktip=&cf=1'
        html = get_page(start_url)
        if html:
            find_ips = re.compile(r'(\d+\.\d+\.\d+\.\d+:\d+)', re.S)
            ip_ports = find_ips.findall(html)
            for address_port in ip_ports:
                yield address_port

    def crawl_data5u(self):
        """Scrape data5u.com (needs browser-like headers to avoid a block)."""
        start_url = 'http://www.data5u.com/free/gngn/index.shtml'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = get_page(start_url, options=headers)
        if html:
            ip_address = re.compile(r'<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')
检测模块:
我们使用异步请求库aiohttp来进行检测。
requests作为一个同步请求库,我们发出一个请求之后,程序需要等待网页加载完成之后才能继续执行,如果服务器响应缓慢,那会十分影响效率,所以我们使用异步请求库来解决这个问题。
class Tester(object):
    """Asynchronously re-check stored proxies against TEST_URL and adjust
    their scores in Redis (max on success, decrease on failure)."""

    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        测试单个代理
        :param proxy: 'ip:port' string (or bytes straight from Redis)
        :return: None — results are written back via self.redis
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('正在测试', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15,
                                       allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('代理可用', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('请求响应码不合法 ', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError,
                    asyncio.TimeoutError, AttributeError):
                # Any connection-level failure counts as one strike.
                self.redis.decrease(proxy)
                print('代理请求失败', proxy)

    def run(self):
        """
        测试主函数 — test the whole pool in batches of BATCH_TEST_SIZE.
        :return: None
        """
        print('测试器开始运行')
        try:
            count = self.redis.count()
            print('当前剩余', count, '个代理')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('正在测试第', start + 1, '-', stop, '个代理')
                test_proxies = self.redis.batch(start, stop)
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                # Fix: asyncio.wait() raises ValueError on an empty task
                # set — skip empty batches instead of crashing.
                if tasks:
                    loop.run_until_complete(asyncio.wait(tasks))
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('测试器发生错误', e.args)
接口模块:
我们使用一个轻量级的Flask库来实现这个接口模块。
def get_conn():
    """Return the per-app-context RedisClient, creating it on first use."""
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    """Landing page."""
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    Get a proxy
    :return: 随机代理
    """
    conn = get_conn()
    # Fix: conn.random() returns None when the pool is empty, which makes
    # Flask raise a 500; serve an empty body instead.
    return conn.random() or ''


@app.route('/count')
def get_counts():
    """
    Get the count of proxies
    :return: 代理池总量
    """
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()
我们声明了Flask对象,定义了3个接口,分别是首页、随机代理页、获取数量页。
调度模块:
调用以上3个模块,将这三个模块通过多线程的形式运行起来。
class Scheduler():
    """Wires the tester, getter and API together — one Process each,
    gated by the TESTER/GETTER/API_ENABLED flags."""

    def schedule_tester(self, cycle=TESTER_CYCLE):
        """Re-test the proxy pool forever, sleeping `cycle` seconds between runs."""
        tester = Tester()
        while True:
            print('测试器开始运行')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """Fetch fresh proxies forever, sleeping `cycle` seconds between runs."""
        getter = Getter()
        while True:
            print('开始抓取代理')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """Serve the Flask API (blocking call)."""
        app.run(API_HOST, API_PORT)

    def run(self):
        """Start whichever components are enabled, each in its own process."""
        print('代理池开始运行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()
运行:
访问API获得随机代理:
测试代理并设置分数:
查看Redis Desktop中的数据:
.
0 条评论