scrapy第一节
scrapy第二节
十.如何存储数据一切关于数据的存储,都放在pipeline中处理
1.写到本地文件中import codecs #处理文件以及编码的库 class JsonWithEncodingPipeline(object): #自定义json文件的导出 def __init__(self): self.file = codecs.open('article.json', 'w', encoding="utf-8")
def process_item(self, item, spider): # 方法的名称和参数是固定的 lines = json.dumps(dict(item), ensure_ascii=False) + "\n" self.file.write(lines)
return item
def spider_closed(self, spider): # 方法的名称和参数是固定的 self.file.close() # 这段代码是一个自己写文件的过程
2.scrapy提供的Exporter方式
以下以JsonItemExporter为例,其他的ItemExporter也是同样的写法
from scrapy.exporters import JsonItemExporter class JsonExporterPipleline(object): #调用scrapy提供的json export导出json文件 def __init__(self): self.file = open('articleexport.json', 'wb') #open self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False) #导出的形式,可以是xml/csv/json/... self.exporter.start_exporting() #开始导出 def close_spider(self, spider): self.exporter.finish_exporting() #结束导出 self.file.close() #close def process_item(self, item, spider): self.exporter.export_item(item) #传入需要导出的item return item
3.导出到数据库
本地使用mysql数据库,可以使用pymysql 或者 mysqlclient 驱动
此项目中使用mysqlclientpip install mysqlclient
import MySQLdb class MysqlPipeline(object): #采用同步的机制写入mysql def __init__(self): self.conn = MySQLdb.connect('192.168.0.106', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider): insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s
ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
""" params = list()
params.append(item.get("title", ""))
params.append(item.get("url", ""))
params.append(item.get("create_date", ""))
params.append(item.get("fav_nums", ""))
self.cursor.execute(insert_sql, tuple(params))
self.conn.commit()
return item
4.异步方式入库mysql
MySQLdb是同步的库,跟requests一样,同步的库尽量不要用在异步的框架中; 因为CPU解析的速度是很快的,但是如果使用同步的库,就会阻塞住,影响整体效率
import MySQLdb.cursors from twisted.enterprise import adbapi class MysqlTwistedPipline(object): def __init__(self, dbpool): self.dbpool = dbpool @classmethod def from_settings(cls, settings): dbparms = dict(
host = settings["MYSQL_HOST"],
db = settings["MYSQL_DBNAME"],
user = settings[" "],
passwd = settings["MYSQL_PASSWORD"],
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider): #使用twisted将mysql插入变成异步执行 query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error, item, spider) #处理异常 return item
def handle_error(self, failure, item, spider): #处理异步插入的异常 print (failure)
def do_insert(self, cursor, item): #执行具体的插入 insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
""" params = list()
params.append(item.get("title", ""))
params.append(item.get("url", ""))
params.append(item.get("create_date", ""))
params.append(item.get("fav_nums", ""))
cursor.execute(insert_sql, tuple(params))
十一 itemloader
itemloader并不是非要学的,但是它可以将解析代码写的更加纯粹,代码分离性好,更容易维护
from scrapy.loader import ItemLoader def parse_detail(self, response): match_re = re.match(".*?(\d+)", response.url)
if match_re:
post_id = match_re.group(1)
item_loader = ItemLoader(item=CnblogsArticleItem(), response=response)
item_loader.add_css("title", "#news_title a::text")
item_loader.add_css("create_date", "#news_info .time::text")
item_loader.add_css("content", "#news_content")
item_loader.add_css("tags", ".news_tags a::text")
item_loader.add_value("url", response.url)
if response.meta.get("front_image_url", []):
item_loader.add_value("front_image_url", response.meta.get("front_image_url", []))
#article_item = item_loader.load_item() #可以延迟转换(进入pipeline之前转换)
yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),
meta={"article_item": item_loader, "url": response.url}, callback=self.parse_nums)
通过这种方式传递的item,item里面的值都是list类型,我们需要将值提取出来,因为我们想要的是str
先来看下例子来找灵感:
#items.py 3次演变过程 from scrapy.loader.processors import Compose,TakeFirst,Identity,Join class CnblogsArticleItem(scrapy.Item): title = scrapy.Field(
output_processor = TakeFirst()
) # 标题 #这样,title标题就只取list里面的第一个值, -----------------------------------------------------演变1 #很多字段我们都只需要取第一个字符串,可是就需要每个字段都按上述方法写 #演变:继承ItemLoad类 from scrapy.loader import ItemLoader class ArticleItemLoader(ItemLoader): #自定义itemloader default_output_processor = TakeFirst() #此处就需要实例化我们自定义的itemloader item_loader = ArticleItemLoader(item=CnblogsArticleItem(), response=response)
---------------------------------------------------演变2 #针对图片下载,不能使用默认的str,需要单独使用list #针对create_date,使用itemloader前是需要通过正则匹配的,这个要如何实现呢? #针对标签,不能只提取第一个字符串,而是需要将列表里面的值拼接起来,如何实现呢 def date_convert(value): match_re = re.match(".*?(\d+.*)", value)
if match_re:
return match_re.group(1)
else:
return "1970-01-01" class CnblogsArticleItem(scrapy.Item): create_date = scrapy.Field(
input_processor = Compose(date_convert)
) # 创建日期 front_image_url = scrapy.Field(
output_processor = Identity()
) # 图片url(scrapy可自动下载)
tags = scrapy.Field(
output_processor = Join(separator=",")
) # 标签 --------------------------------------------------演变3
def parse_nums(self, response): j_data = json.loads(response.text)
item_loader = response.meta.get("article_item", "")
item_loader.add_value("praise_nums", j_data["DiggCount"])
item_loader.add_value("fav_nums", j_data["TotalView"])
item_loader.add_value("comment_nums", j_data["CommentCount"])
item_loader.add_value("url_object_id", get_md5(response.meta.get("url", "")))
article_item = item_loader.load_item()
yield article_item
本文由 mdnice 多平台发布
