ItemPipeline

After an item has been scraped by a spider, it is sent to the Item Pipeline which processes it through several components that are executed sequentially.

Each item pipeline component (sometimes referred to as just an "Item Pipeline") is a Python class that implements a simple method. It receives an item and performs an action on it, also deciding whether the item should continue through the pipeline or be dropped and no longer processed.

In short: once a spider has scraped an item, it is handed off to the Item Pipeline for processing.

Typical uses of item pipelines are:

  • cleansing HTML data
  • validating scraped data (checking that the items contain certain fields)
  • checking for duplicates (and dropping them)
  • storing the scraped item in a database

In other words, pipelines are not just for storing the scraped data: checking and shaping it (cleansing, validation, duplicate checking) is equally their job.
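As a minimal sketch of the duplicate-check case, a pipeline can raise scrapy.exceptions.DropItem to discard an item; the id field used as the dedup key here is an assumption about the item schema:

from scrapy.exceptions import DropItem


class DuplicatesPipeline:

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        # Drop any item whose id was already seen; raising DropItem
        # stops it from reaching later pipeline components.
        if item['id'] in self.ids_seen:
            raise DropItem(f"Duplicate item found: {item!r}")
        self.ids_seen.add(item['id'])
        return item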

Activating an Item Pipeline component

To activate an Item Pipeline component you must add its class to the ITEM_PIPELINES setting, like in the following example:

ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}

The integer values you assign to classes in this setting determine the order in which they run: items go through from lower valued to higher valued classes. It’s customary to define these numbers in the 0-1000 range.

In short: pipelines are enabled via ITEM_PIPELINES in settings.py, and the 0-1000 values control the order in which they run.
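For reference, the PricePipeline registered above could look roughly like the price-validation example in the Scrapy docs; the price and price_excludes_vat fields are assumptions about the item schema:

from scrapy.exceptions import DropItem


class PricePipeline:

    vat_factor = 1.15

    def process_item(self, item, spider):
        # Adjust prices quoted without VAT, and drop items with no price.
        if item.get('price'):
            if item.get('price_excludes_vat'):
                item['price'] = item['price'] * self.vat_factor
            return item
        raise DropItem(f"Missing price in {item!r}")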

MongoDB example

The official docs provide a sample that writes items to MongoDB using pymongo.

__init__() and from_crawler() construct the pipeline itself and read the DB-related settings; parameters defined in settings.py are available through crawler.settings.

open_spider() and close_spider() open and close the DB connection, and process_item() inserts each scraped item.

import pymongo


class MongoPipeline:

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item
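To actually use this pipeline, settings.py would need something along these lines; the URI value and the pipeline's module path are assumptions for a local setup:

# settings.py (sketch)
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'items'

ITEM_PIPELINES = {
    'myproject.pipelines.MongoPipeline': 300,
}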

DynamoDB example

Though not an official Scrapy component, an item pipeline for Scrapy called scrapy-dynamodb is published on GitHub.
Its basic structure is the same as the official MongoDB example, though it carries a fair amount of extra code.

import datetime
import boto3


def default_encoder(value):
    if isinstance(value, datetime.datetime):
        return value.strftime('%Y-%m-%d %H:%M:%S')
    elif isinstance(value, datetime.date):
        return value.strftime('%Y-%m-%d')
    elif isinstance(value, datetime.time):
        return value.strftime('%H:%M:%S')
    else:
        return value


class DynamoDbPipeline(object):

    def __init__(self, aws_access_key_id, aws_secret_access_key, region_name,
                 table_name, encoder=default_encoder):
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.region_name = region_name
        self.table_name = table_name
        self.encoder = encoder
        self.table = None

    @classmethod
    def from_crawler(cls, crawler):
        aws_access_key_id = crawler.settings['AWS_ACCESS_KEY_ID']
        aws_secret_access_key = crawler.settings['AWS_SECRET_ACCESS_KEY']
        region_name = crawler.settings['DYNAMODB_PIPELINE_REGION_NAME']
        table_name = crawler.settings['DYNAMODB_PIPELINE_TABLE_NAME']
        return cls(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
            table_name=table_name
        )

    def open_spider(self, spider):
        db = boto3.resource(
            'dynamodb',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.region_name,
        )
        self.table = db.Table(self.table_name)  # pylint: disable=no-member

    def close_spider(self, spider):
        self.table = None

    def process_item(self, item, spider):
        self.table.put_item(
            TableName=self.table_name,
            Item={k: self.encoder(v) for k, v in item.items()},
        )
        return item
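For completeness, from_crawler() above expects the following keys in settings.py. All values here are placeholders, and the module path assumes the class was copied into myproject.pipelines:

# settings.py (sketch)
AWS_ACCESS_KEY_ID = 'your-access-key-id'          # placeholder
AWS_SECRET_ACCESS_KEY = 'your-secret-access-key'  # placeholder
DYNAMODB_PIPELINE_REGION_NAME = 'ap-northeast-1'  # example region
DYNAMODB_PIPELINE_TABLE_NAME = 'scrapy_items'     # example table name

ITEM_PIPELINES = {
    'myproject.pipelines.DynamoDbPipeline': 300,
}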