MongoDB Basics Series, Part 15: CRUD Operations - Bulk Write

Preface

This post is part of the MongoDB basics series.

This is the author's original work; please credit the source when reposting.

Introduction

MongoDB lets clients perform write operations in bulk. A bulk write operation affects a single collection.

New in version 3.2.

MongoDB provides the db.collection.bulkWrite() method for running insert, update and delete operations in bulk. For inserts alone, MongoDB also provides the db.collection.insertMany() method.
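To illustrate how the two methods relate, here is a minimal sketch that turns a list of documents into the insertOne operations that bulkWrite() expects. The helper name toInsertOps and the sample documents are made up for this example, and the db call only runs inside a mongo shell session, where db is defined.

```javascript
// Hypothetical helper: wrap each document in an insertOne operation
// so that the whole list can be passed to bulkWrite().
function toInsertOps(docs) {
  return docs.map(function (doc) {
    return { insertOne: { document: doc } };
  });
}

// Sample documents, invented for this illustration.
const newCharacters = [
  { _id: 10, char: "Aria", class: "bard", lvl: 2 },
  { _id: 11, char: "Borin", class: "cleric", lvl: 5 }
];

const insertOps = toInsertOps(newCharacters);

// Only talk to the server when running inside a mongo shell session.
if (typeof db !== "undefined") {
  // Insert-only batch: insertMany() takes the documents directly.
  db.characters.insertMany(newCharacters);
  // The same batch expressed through bulkWrite() would be:
  //   db.characters.bulkWrite(insertOps);
}
```

For a batch that contains only inserts, insertMany() is the shorter form; bulkWrite() becomes useful once inserts have to be mixed with updates or deletes.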

Ordered vs Unordered Operations

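Pass { ordered : true } or { ordered : false } in the options of db.collection.bulkWrite() to choose whether the batch runs as an ordered or an unordered bulk operation; see the examples in the official documentation for more information.

In an ordered bulk write, if one operation fails, MongoDB returns immediately and does not execute the remaining operations. In an unordered bulk write, MongoDB continues with the remaining operations even after one of them fails.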
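The difference can be sketched with a toy model in plain JavaScript. This is not how MongoDB is implemented: the function applyBatch, its arguments and its return shape are invented for this illustration, and it only models inserts that may collide on _id. It also runs the unordered batch sequentially, whereas a real server is free to pick its own order.

```javascript
// Toy model of ordered vs. unordered execution (hypothetical, not MongoDB code).
// existingIds: _id values already in the collection
// idsToInsert: _id values of the insert operations, in batch order
function applyBatch(existingIds, idsToInsert, ordered) {
  const ids = new Set(existingIds);
  const result = { nInserted: 0, writeErrors: [] };
  for (let index = 0; index < idsToInsert.length; index++) {
    const id = idsToInsert[index];
    if (ids.has(id)) {
      // Duplicate key: record the error with the position of the operation.
      result.writeErrors.push({ index: index, id: id });
      if (ordered) {
        break; // ordered: stop at the first error
      }
      continue; // unordered: skip the failed operation and keep going
    }
    ids.add(id);
    result.nInserted++;
  }
  return result;
}

// Same batch, run both ways: the second insert repeats _id 4.
const orderedResult = applyBatch([1, 2, 3], [4, 4, 5], true);
const unorderedResult = applyBatch([1, 2, 3], [4, 4, 5], false);
// orderedResult.nInserted is 1: the insert of _id 5 is never attempted.
// unorderedResult.nInserted is 2: the insert of _id 5 still goes through.
```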

Ordered Bulk Write

Suppose the characters collection contains the following documents:

{ "_id" : 1, "char" : "Brisbane", "class" : "monk", "lvl" : 4 },
{ "_id" : 2, "char" : "Eldon", "class" : "alchemist", "lvl" : 3 },
{ "_id" : 3, "char" : "Meldane", "class" : "ranger", "lvl" : 3 }

Positive Case

The following bulkWrite() call runs its operations against the collection in order:

try {
   db.characters.bulkWrite(
      [
         { insertOne :
            {
               "document" :
               {
                  "_id" : 4, "char" : "Dithras", "class" : "barbarian", "lvl" : 4
               }
            }
         },
         { insertOne :
            {
               "document" :
               {
                  "_id" : 5, "char" : "Taeln", "class" : "fighter", "lvl" : 3
               }
            }
         },
         { updateOne :
            {
               "filter" : { "char" : "Eldon" },
               "update" : { $set : { "status" : "Critical Injury" } }
            }
         },
         { deleteOne :
            { "filter" : { "char" : "Brisbane"} }
         },
         { replaceOne :
            {
               "filter" : { "char" : "Meldane" },
               "replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
            }
         }
      ]
   );
}
catch (e) {
   print(e);
}

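Because no ordered option is passed, this batch runs as an ordered bulk write, which is the default: the operations are executed one after another in the order they appear in the array. The increasing _id values of the two inserts are incidental and are not what makes the batch ordered.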

Running the operation above returns the following success result:

{
   "acknowledged" : true,
   "deletedCount" : 1,
   "insertedCount" : 2,
   "matchedCount" : 2,
   "upsertedCount" : 0,
   "insertedIds" : {
      "0" : 4,
      "1" : 5
   },
   "upsertedIds" : {

   }
}

Note that bulkWrite() uses { ordered : true } by default; that is, unless told otherwise, it performs an ordered bulk operation.

Negative Case

Now suppose the second insertOne operation uses the same _id as the first one, for example both use 5:

try {
   db.characters.bulkWrite(
      [
         { insertOne :
            {
               "document" :
               {
                  "_id" : 5, "char" : "Dithras", "class" : "barbarian", "lvl" : 4
               }
            }
         },
         { insertOne :
            {
               "document" :
               {
                  "_id" : 5, "char" : "Taeln", "class" : "fighter", "lvl" : 3
               }
            }
         },
         { updateOne :
            {
               "filter" : { "char" : "Eldon" },
               "update" : { $set : { "status" : "Critical Injury" } }
            }
         },
         { deleteOne :
            { "filter" : { "char" : "Brisbane"} }
         },
         { replaceOne :
            {
               "filter" : { "char" : "Meldane" },
               "replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
            }
         }
      ]
   );
}
catch (e) {
   print(e);
}

Execution then fails with an error and the remaining operations are not executed, which is why nMatched, nModified and nRemoved are all 0:

BulkWriteError({
   "writeErrors" : [
      {
         "index" : 1,
         "code" : 11000,
         "errmsg" : "E11000 duplicate key error collection: guidebook.characters index: _id_ dup key: { : 5 }",
         "op" : {
            "_id" : 5,
            "char" : "Taeln"
         }
      }
   ],
   "writeConcernErrors" : [ ],
   "nInserted" : 1,
   "nUpserted" : 0,
   "nMatched" : 0,
   "nModified" : 0,
   "nRemoved" : 0,
   "upserted" : [ ]
})

Unordered Bulk Write

Suppose the characters collection contains the following documents:

{ "_id" : 1, "char" : "Brisbane", "class" : "monk", "lvl" : 4 },
{ "_id" : 2, "char" : "Eldon", "class" : "alchemist", "lvl" : 3 },
{ "_id" : 3, "char" : "Meldane", "class" : "ranger", "lvl" : 3 }

This time the batch is built with the option { ordered : false }, which makes it an unordered operation. In this test case the two insertOne operations again use a duplicate _id value:

try {
   db.characters.bulkWrite(
      [
         { insertOne :
            {
               "document" :
               {
                  "_id" : 4, "char" : "Dithras", "class" : "barbarian", "lvl" : 4
               }
            }
         },
         { insertOne :
            {
               "document" :
               {
                  "_id" : 4, "char" : "Taeln", "class" : "fighter", "lvl" : 3
               }
            }
         },
         { updateOne :
            {
               "filter" : { "char" : "Eldon" },
               "update" : { $set : { "status" : "Critical Injury" } }
            }
         },
         { deleteOne :
            { "filter" : { "char" : "Brisbane"} }
         },
         { replaceOne :
            {
               "filter" : { "char" : "Meldane" },
               "replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
            }
         }
      ],
      { ordered : false }
   );
}
catch (e) {
   print(e);
}

The operation above returns the following result:

BulkWriteError({
   "writeErrors" : [
      {
         "index" : 1,
         "code" : 11000,
         "errmsg" : "E11000 duplicate key error collection: guidebook.characters index: _id_ dup key: { : 4 }",
         "op" : {
            "_id" : 4,
            "char" : "Taeln"
         }
      }
   ],
   "writeConcernErrors" : [ ],
   "nInserted" : 1,
   "nUpserted" : 0,
   "nMatched" : 2,
   "nModified" : 2,
   "nRemoved" : 1,
   "upserted" : [ ]
})

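Because this is an unordered operation, the remaining operations are still executed even though one of them failed: the update, the replacement and the delete all went through, as nMatched, nModified and nRemoved show.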
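When a batch fails partway, it helps to separate what failed from what was written. Below is a minimal sketch that works on a plain object carrying the same field names as the BulkWriteError output. The helper summarizeBulkError and the sample object are hypothetical; how to get such a plain object out of a caught error depends on the shell or driver version and is left out here.

```javascript
// Hypothetical helper: list the failed operations and count the documents
// that were written, given an object shaped like the BulkWriteError output.
function summarizeBulkError(err) {
  const failed = (err.writeErrors || []).map(function (writeError) {
    return {
      index: writeError.index,                 // position of the operation in the batch
      code: writeError.code,
      duplicateKey: writeError.code === 11000  // E11000 is the duplicate key error
    };
  });
  const documentsWritten =
    (err.nInserted || 0) + (err.nUpserted || 0) +
    (err.nModified || 0) + (err.nRemoved || 0);
  return { failed: failed, documentsWritten: documentsWritten };
}

// Hand-written sample, not the output of a real run.
const sampleError = {
  writeErrors: [
    { index: 1, code: 11000, errmsg: "E11000 duplicate key error (shortened)" },
    { index: 3, code: 11000, errmsg: "E11000 duplicate key error (shortened)" }
  ],
  writeConcernErrors: [],
  nInserted: 2,
  nUpserted: 0,
  nMatched: 1,
  nModified: 1,
  nRemoved: 0,
  upserted: []
};

const summary = summarizeBulkError(sampleError);
// summary.failed lists operations 1 and 3; summary.documentsWritten is 3.
```

The index values are what make a retry possible: they identify exactly which entries of the original operations array need attention.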

Supported Bulk Write Operations

bulkWrite() supports the following write operations:

  • insertOne
  • updateOne
  • updateMany
  • replaceOne
  • deleteOne
  • deleteMany

Each write operation is passed to bulkWrite() as a document in the operations array.
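The earlier examples do not use updateMany or deleteMany, so here is a small sketch of an operations array that includes them. The filters and values are invented, and validateOperations is a hypothetical client-side check written for this post, not something MongoDB provides.

```javascript
// Operations array mixing single-document and multi-document writes.
// All filters and values are invented for this illustration.
const operations = [
  { insertOne: { document: { _id: 6, char: "Kira", class: "rogue", lvl: 1 } } },
  { updateMany: { filter: { lvl: { $lt: 4 } }, update: { $inc: { lvl: 1 } } } },
  { deleteMany: { filter: { class: "monk" } } }
];

// Names of the write operations that bulkWrite() accepts (from the list above).
const supportedOperations = [
  "insertOne", "updateOne", "updateMany",
  "replaceOne", "deleteOne", "deleteMany"
];

// Hypothetical check: every entry must have exactly one key,
// and that key must be one of the supported operation names.
function validateOperations(ops) {
  return ops.every(function (op) {
    const keys = Object.keys(op);
    return keys.length === 1 && supportedOperations.indexOf(keys[0]) !== -1;
  });
}

// Only talk to the server when running inside a mongo shell session.
if (typeof db !== "undefined" && validateOperations(operations)) {
  db.characters.bulkWrite(operations);
}
```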

Strategies for Bulk Inserts to a Sharded Collection

Large bulkWrite() batches can affect the performance of a sharded cluster. The following strategies can reduce that impact:

Pre-Split the Collection

TODO

Unordered Writes to mongos

TODO

Avoid Monotonic Throttling

TODO

References

https://docs.mongodb.com/manual/core/bulk-write-operations/