Celery:celery 开发爬虫初尝

这是一个测试 celery 这个玩意的小脚本,celery 的官称是分布式队列,那就试试到底是如何工作的吧

下面有一个深度优先的简单例子 (爬取百度百科)
所谓的深度优先,就是你要爬取网站的深度,这个一般都是以实际情况来定义的 深度优先容易实现的地方是可以用递归来写,只要定义好阀值,写好逻辑判断等等

urls.py

import requests
import re
import time

exist_urls=[]
headers={
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
}
def get_link(url):
    try:
        response=requests.get(url=url,headers=headers)
        response.encoding='UTF-8'
        html=response.text
        link_lists=re.findall('.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>',html)
        return link_lists
    except Exception as e:
        pass
    finally:
        exist_urls.append(url)

#主函数用来定义输出格式,当爬取深度小于三层时,递归调用主函数,继续爬取第二层的所有链接
def main(start_url,depth=1):
    count=0
    link_lists=get_link(start_url)
    if link_lists:
        unique_lists=list(set(link_lists)-set(exist_urls))
        for unique_list in unique_lists:
            unique_list='https://baike.baidu.com/item/'+unique_list
            count=count+1
            output='Depth:'+str(depth)+'\t'+start_url+'======>'+unique_list+'\n'
            print(output)
            with open('title.txt','a+') as f:
                f.write(output)
                f.close()
            with open('url.txt','a+') as f:
                f.write(unique_list + '\n')
                f.close()
            if depth<10:
                main(unique_list,depth+1)

if __name__=='__main__':
    t1=time.time()
    start_url='https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
    main(start_url)
    t2=time.time()
    print('总时间',t2-t1)

urls.py 是爬取 url 列表的文件,一个简易的深度优先的实现,带有简单的去重,url 集合保存在 url.txt 文件里,应该有好几百万条吧
parse.py 是简单的解析文件,简简单单抓了一些简介,存在了 mongodb

parse.py

from celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost',27017)
db = client['baike']
@app.task
def get_url(link):
    item = {}
    headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
    res = requests.get(link,headers=headers)
    res.encoding = 'UTF-8'
    doc = etree.HTML(res.text)
    content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
    print(res.status_code)
    print(link,'\t','++++++++++++++++++++')
    item['link'] = link
    data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
    item['
data'] = data
    if db['
Baike'].insert(dict(item)):
        print("is OK ...")
    else:
        print('
No Mongo')

run_take 就是 celery 调度的主文件了,中间人选用的是 redis,也可以用官方推荐的 MQ 队列

run_take

from parse import get_url
from urls import url_list

def main(url):
    result = get_url.delay(url)
    return result

def run():
    with open('./url.txt''r') as f:
        for url in f.readlines():
            main(url.strip('\n'))

if __name__ == '__main__':
    run()

这里并没有用到自定义的配置文件,消费队列,以及一些定时爬取,监控爬取,这只是个小测试 只用了10个线程 

命令 

celery -A parse worker -l info -P gevent -c 10  # gevent可以自己指定  比如更为thread 等等

这样一来就开始监听 redis 队列,正在等待消费

发布任务

python run_take.py

redis 队列接收到任务之后开始消费,协程模式 gevent 10个线程并发,爬取速度效果非常好



CELERY

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!