云海的博客
首页
  • 接口
  • 数组
  • slice
  • map
  • 指针
  • 反射
  • Context
  • sync.map
  • 锁
  • 类型和类型指针分析
  • recover
  • 从零实现RPC框架
  • make和new区别
  • channel
  • sync.Once
  • sync.Pool
  • protobuf
  • MongoDB pkg源码-findone
  • MyBatis
  • Maven
  • 解析Laravel框架—路由处理
  • PHP(客户端)与 Golang(服务端)使用grpc+protobuf 通信
  • JAVA(客户端)与 Golang(服务端) 使用grpc+protobuf通信
  • Docker使用笔记-常用命令
  • Docker使用笔记-容器间通讯
  • Docker使用笔记-搭建Redis集群
  • Docker使用笔记-镜像多阶段构建
  • Kubernetes部署golang服务
  • Linux常用命令
  • Docker安装Prometheus与Grafana
  • Protobuf
  • TCP抓包
  • 概述-《TCP/IP详解》读书笔记
  • 索引
  • 事务隔离级别
  • 常识
  • 每日一题(1)
  • 每日一题(2)
  • 每日一题(3)
  • 每日一题(4)
关于
GitHub (opens new window)

云海

服务端研发
首页
  • 接口
  • 数组
  • slice
  • map
  • 指针
  • 反射
  • Context
  • sync.map
  • 锁
  • 类型和类型指针分析
  • recover
  • 从零实现RPC框架
  • make和new区别
  • channel
  • sync.Once
  • sync.Pool
  • protobuf
  • MongoDB pkg源码-findone
  • MyBatis
  • Maven
  • 解析Laravel框架—路由处理
  • PHP(客户端)与 Golang(服务端)使用grpc+protobuf 通信
  • JAVA(客户端)与 Golang(服务端) 使用grpc+protobuf通信
  • Docker使用笔记-常用命令
  • Docker使用笔记-容器间通讯
  • Docker使用笔记-搭建Redis集群
  • Docker使用笔记-镜像多阶段构建
  • Kubernetes部署golang服务
  • Linux常用命令
  • Docker安装Prometheus与Grafana
  • Protobuf
  • TCP抓包
  • 概述-《TCP/IP详解》读书笔记
  • 索引
  • 事务隔离级别
  • 常识
  • 每日一题(1)
  • 每日一题(2)
  • 每日一题(3)
  • 每日一题(4)
关于
GitHub (opens new window)
  • 接口
  • 数组
  • slice
  • map
  • 反射
  • sync.Pool
  • net包笔记
  • net-rpc分析
  • 指针
  • 数组排序
  • Context
  • sync.map
  • 锁
  • recover
  • 泛型
  • 类型和类型指针分析
  • make和new区别
  • channel
  • sync.Once
  • protobuf
  • GoLand debug(1)
  • 从零实现RPC框架
  • 从零开始学Go Origin框架
  • MongoDB pkg源码-findone
  • Golang
云海
2023-05-24

MongoDB pkg源码-InsertOne

# 基础用法

// Get a handle to collection "qux" in database "baz"; client is assumed to be a
// connected client created elsewhere (not shown in this snippet).
collection := client.Database("baz").Collection("qux")
// Insert one document. The second return value reports any failure.
res, err := collection.InsertOne(context.Background(), bson.M{"hello": "world"})
if err != nil { return err }
// InsertedID holds the _id of the document that was just inserted.
id := res.InsertedID
1
2
3
4

# 源码

// InsertOne inserts a single document into the collection.
//
// It is a thin wrapper around coll.insert: the document is wrapped in a
// one-element slice and the InsertOne options are translated into
// InsertMany options. Only BypassDocumentValidation is carried over, and only
// when it is set to true.
//
// The error from coll.insert is passed through processWriteError. If the
// returned flags do not include rrOne, no result is returned; otherwise the
// first inserted id is returned together with the processed error.
func (coll *Collection) InsertOne(ctx context.Context, document interface{},
	opts ...*options.InsertOneOptions) (*InsertOneResult, error) {

	merged := options.MergeInsertOneOptions(opts...)

	// Start from a fresh InsertManyOptions and copy over the one setting
	// that is forwarded to the shared insert path.
	manyOpts := options.InsertMany()
	if bypass := merged.BypassDocumentValidation; bypass != nil && *bypass {
		manyOpts.SetBypassDocumentValidation(*bypass)
	}

	ids, insertErr := coll.insert(ctx, []interface{}{document}, manyOpts)

	rr, err := processWriteError(insertErr)
	if rr&rrOne == 0 {
		return nil, err
	}
	return &InsertOneResult{InsertedID: ids[0]}, err
}

// insert is the shared implementation behind the collection's insert methods.
// It converts every document to BSON (making sure each has an _id), runs a
// single insert operation, and returns the _id values of the documents.
//
// Parameters:
//   - ctx: request context; a nil ctx is replaced with context.Background().
//   - documents: the values to insert.
//   - opts: InsertMany options; they are merged with MergeInsertManyOptions.
//
// Returns the collected _id values and the error from executing the operation.
// When that error is a driver.WriteCommandError, ids belonging to failed writes
// are removed from the result before it is returned. Errors raised before the
// operation runs (document conversion, session creation/validation) yield a
// nil result.
func (coll *Collection) insert(ctx context.Context, documents []interface{},
	opts ...*options.InsertManyOptions) ([]interface{}, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	// result[i] receives the _id of documents[i]; docs[i] its encoded form.
	result := make([]interface{}, len(documents))
	docs := make([]bsoncore.Document, len(documents)) // Document is a raw bytes representation of a BSON document.

	for i, doc := range documents {
		var err error
		// transformAndEnsureID encodes doc and returns its _id value. It also adds
		// an ObjectID _id as the first key if one is not already present.
		docs[i], result[i], err = transformAndEnsureID(coll.registry, doc) 
		if err != nil {
			return nil, err
		}
	}

	// Use the session carried by ctx if there is one; otherwise, when the client
	// has a session pool, create an implicit session that is ended when this
	// function returns.
	sess := sessionFromContext(ctx)
	if sess == nil && coll.client.sessionPool != nil {
		var err error
		sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
		if err != nil {
			return nil, err
		}
		defer sess.EndSession()
	}

	err := coll.client.validSession(sess)
	if err != nil {
		return nil, err
	}

	// The collection's write concern is not sent while a transaction is running.
	// NOTE(review): presumably the transaction's own write concern applies
	// instead — confirm against the session implementation.
	wc := coll.writeConcern
	if sess.TransactionRunning() {
		wc = nil
	}
	// When AckWrite reports false for wc, the operation is run without a session.
	// NOTE(review): this looks like the unacknowledged-write case — confirm.
	if !writeconcern.AckWrite(wc) {
		sess = nil
	}

	// The selector is built after sess may have been cleared above, so it sees
	// the same session value that is attached to the operation.
	selector := makePinnedSelector(sess, coll.writeSelector)

	// Assemble the insert operation. Ordered defaults to true here and may be
	// overridden by the merged options below.
	op := operation.NewInsert(docs...).
		Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
		ServerSelector(selector).ClusterClock(coll.client.clock).
		Database(coll.db.name).Collection(coll.name).
		Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).Ordered(true).
		ServerAPI(coll.client.serverAPI)
	imo := options.MergeInsertManyOptions(opts...)
	if imo.BypassDocumentValidation != nil && *imo.BypassDocumentValidation {
		op = op.BypassDocumentValidation(*imo.BypassDocumentValidation)
	}
	if imo.Ordered != nil {
		op = op.Ordered(*imo.Ordered)
	}
	// Retry is enabled only when the client has retryWrites set.
	retry := driver.RetryNone
	if coll.client.retryWrites {
		retry = driver.RetryOncePerCommand
	}
	op = op.Retry(retry)

	err = op.Execute(ctx)
	// Any outcome other than a WriteCommandError (including success, where err
	// is nil) is returned as-is with the full id list.
	wce, ok := err.(driver.WriteCommandError)
	if !ok {
		return result, err
	}

	// remove the ids that had writeErrors from result
	for i, we := range wce.WriteErrors {
		// i indexes have been removed before the current error, so the index is we.Index-i
		idIndex := int(we.Index) - i
		// if the insert is ordered, nothing after the error was inserted
		if imo.Ordered == nil || *imo.Ordered {
			result = result[:idIndex]
			break
		}
		result = append(result[:idIndex], result[idIndex+1:]...)
	}

	return result, err
}

// transformAndEnsureID marshals val into a BSON document and returns that
// document together with the decoded value of its "_id" element. When the
// marshaled document has no "_id", a newly generated ObjectID is inserted as
// the first element of the returned document.
//
// If registry is nil, one built by bson.NewRegistryBuilder is used. A nil val
// yields ErrNilDocument, and a marshaling failure is reported as a
// MarshalError wrapping the underlying error.
func transformAndEnsureID(registry *bsoncodec.Registry, val interface{}) (bsoncore.Document, interface{}, error) {
	if registry == nil {
		registry = bson.NewRegistryBuilder().Build()
	}

	switch typed := val.(type) {
	case nil:
		return nil, nil, ErrNilDocument
	case bsonx.Doc:
		val = typed.Copy()
	case []byte:
		// Slight optimization so we'll just use MarshalBSON and not go through the codec machinery.
		val = bson.Raw(typed)
	}

	// TODO(skriptble): Use a pool of these instead.
	encoded := make(bsoncore.Document, 0, 256)
	encoded, err := bson.MarshalAppendWithRegistry(registry, encoded, val)
	if err != nil {
		return nil, nil, MarshalError{Value: val, Err: err}
	}

	idVal := encoded.Lookup("_id")
	if idVal.Type == bsontype.Type(0) {
		// A zero type is treated as "no _id present": generate an ObjectID and
		// rebuild the document with the new _id element placed first.
		idVal = bsoncore.Value{Type: bsontype.ObjectID, Data: bsoncore.AppendObjectID(nil, primitive.NewObjectID())}
		original := encoded
		encoded = make(bsoncore.Document, 0, len(original)+17) // type byte + _id + null byte + object ID
		_, encoded = bsoncore.ReserveLength(encoded)
		encoded = bsoncore.AppendValueElement(encoded, "_id", idVal)
		encoded = append(encoded, original[4:]...) // skip the old 4-byte length prefix
		encoded = bsoncore.UpdateLength(encoded, 0, int32(len(encoded)))
	} else {
		// Detach the id bytes from the document buffer so that bytes handed back
		// to the user are not modified later.
		copied := make([]byte, len(idVal.Data))
		copy(copied, idVal.Data)
		idVal.Data = copied
	}

	var id interface{}
	raw := bson.RawValue{Type: idVal.Type, Value: idVal.Data}
	if err = raw.UnmarshalWithRegistry(registry, &id); err != nil {
		return nil, nil, err
	}

	return encoded, id, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
上次更新: 2023/05/31
从零开始学Go Origin框架

← 从零开始学Go Origin框架

最近更新
01
函数
04-11
02
面试题
04-11
03
EFK日志收集系统单机版
08-18
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式