新聞中心
回調(diào)API和核心API
回調(diào) API:
- 啟動(dòng)一個(gè)事務(wù),執(zhí)行指定的操作,并提交(或出錯(cuò)時(shí)中止)。
- 自動(dòng)包含 "TransientTransactionError" 和 "UnknownTransactionCommitResult" 的錯(cuò)誤處理邏輯。
核心 API:
- 需要顯式調(diào)用來啟動(dòng)事務(wù)并提交事務(wù)。
- 不包含 "TransientTransactionError" 和 "UnknownTransactionCommitResult" 的錯(cuò)誤處理邏輯,而是為這些錯(cuò)誤提供了包含自定義錯(cuò)誤處理的靈活性。
回調(diào)API
回調(diào) API 包含以下邏輯:
- 如果事務(wù)遇到 "TransientTransactionError",則作為一個(gè)整體重試事務(wù)。
- 如果提交遇到 "UnknownTransactionCommitResult",則重新這個(gè)提交操作。
示例:
該示例使用新的回調(diào) API 來處理事務(wù),它啟動(dòng)事務(wù)、執(zhí)行指定的操作并提交(或在出錯(cuò)時(shí)中止)。新的回調(diào) API 包含 "TransientTransactionError"或"UnknownTransactionCommitResult" 提交錯(cuò)誤的重試邏輯。

成都網(wǎng)絡(luò)公司-成都網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)10余年經(jīng)驗(yàn)成就非凡,專業(yè)從事網(wǎng)站制作、做網(wǎng)站,成都網(wǎng)頁(yè)設(shè)計(jì),成都網(wǎng)頁(yè)制作,軟文推廣,一元廣告等。10余年來已成功提供全面的成都網(wǎng)站建設(shè)方案,打造行業(yè)特色的成都網(wǎng)站建設(shè)案例,建站熱線:028-86922220,我們期待您的來電!
重要:
- 推薦。使用針對(duì) MongoDB 部署版本更新的 MongoDB 驅(qū)動(dòng)程序。對(duì)于 MongoDB 4.2 部署(副本集和分片集群)上的事務(wù),客戶端必須使用為 MongoDB 4.2 更新的 MongoDB 驅(qū)動(dòng)程序。
- 使用驅(qū)動(dòng)程序時(shí),事務(wù)中的每個(gè)操作必須與會(huì)話相關(guān)聯(lián)(即將會(huì)話傳遞給每個(gè)操作)。
- 事務(wù)中的操作使用 事務(wù)級(jí)別的讀關(guān)注,事務(wù)級(jí)別的寫關(guān)注,和 事務(wù)級(jí)別的讀偏好。
- 在 MongoDB 4.2 及更早版本中,你無法在事務(wù)中創(chuàng)建集合。如果在事務(wù)內(nèi)部運(yùn)行,導(dǎo)致文檔插入的寫操作(例如 insert 或帶有 upsert: true 的更新操作)必須在 已有的 集合上執(zhí)行。
- 從 MongoDB 4.4 開始,你可以隱式或顯式地在事務(wù)中創(chuàng)建集合。但是,你比須使用針對(duì) 4.4 更新的 MongoDB 驅(qū)動(dòng)程序。
// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample() {
ctx := context.Background()
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
var uri string
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(ctx, clientOpts)
if err != nil {
panic(err)
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.New(writeconcern.WMajority(), writeconcern.WTimeout(1*time.Second))
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sessCtx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sessCtx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
panic(err)
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
panic(err)
}
fmt.Printf("result: %v\n", result)
}
核心API
核心事務(wù) API 不包含標(biāo)記錯(cuò)誤的重試邏輯:
- "TransientTransactionError"。如果事務(wù)中的操作返回標(biāo)記為 "TransientTransactionError"的錯(cuò)誤,則事務(wù)會(huì)被作為一個(gè)整體進(jìn)行重試。
- "UnknownTransactionCommitResult"。如果提交返回標(biāo)記為 "UnknownTransactionCommitResult"的錯(cuò)誤,提交會(huì)被重試。
為了處理 "UnknownTransactionCommitResult",應(yīng)用程序應(yīng)該明確地包含錯(cuò)誤的重試邏輯。
示例:
以下的示例包含了針對(duì)暫時(shí)性錯(cuò)誤重試事務(wù)和針對(duì)未知提交錯(cuò)誤重試提交的邏輯:
runTransactionWithRetry := func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
for {
err := txnFn(sctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
commitWithRetry := func(sctx mongo.SessionContext) error {
for {
err := sctx.CommitTransaction(sctx)
switch e := err.(type) {
case nil:
log.Println("Transaction committed.")
return nil
case mongo.CommandError:
// Can retry commit
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation...")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}
// Updates two collections in a transaction.
updateEmployeeInfo := func(sctx mongo.SessionContext) error {
employees := client.Database("hr").Collection("employees")
events := client.Database("reporting").Collection("events")
err := sctx.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
)
if err != nil {
return err
}
_, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
if err != nil {
sctx.AbortTransaction(sctx)
log.Println("caught exception during transaction, aborting.")
return err
}
_, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
if err != nil {
sctx.AbortTransaction(sctx)
log.Println("caught exception during transaction, aborting.")
return err
}
return commitWithRetry(sctx)
}
return client.UseSessionWithOptions(
ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
func(sctx mongo.SessionContext) error {
return runTransactionWithRetry(sctx, updateEmployeeInfo)
},
)
}
驅(qū)動(dòng)程序版本
對(duì)于 MongoDB 4.2 部署(副本集和分片集群)上的事務(wù),客戶端必須使用為 MongoDB 4.2 更新的 MongoDB 驅(qū)動(dòng)程序:
|
??C 1.15.0????C# 2.9.0????Go 1.1?? |
??Java 3.11.0????Node 3.3.0????Perl 2.2.0?? |
??Python 3.9.0????Ruby 2.10.0????Scala 2.7.0?? |
對(duì)于 MongoDB 4.0 副本集上的事務(wù),客戶端需要為 MongoDB 4.0 或更高版本更新 MongoDB 驅(qū)動(dòng)程序。
|
Java 3.8.0Python 3.7.0C 1.11.0 |
C# 2.7Node 3.1.0Ruby 2.6.0 |
Perl 2.0.0PHP (PHPC) 1.5.0Scala 2.4.0 |
事務(wù)錯(cuò)誤處理
無論是哪種數(shù)據(jù)庫(kù)系統(tǒng),無論是MongoDB還是關(guān)系型數(shù)據(jù)庫(kù),應(yīng)用程序都應(yīng)該采取措施處理事務(wù)提交過程中的錯(cuò)誤,并包含事務(wù)的重試邏輯。
"TransientTransactionError"
無論 retryWrites的值是多少,事務(wù)內(nèi)部的單個(gè)寫操作都不可重試。如果操作遇到一個(gè)錯(cuò)誤與標(biāo)簽相關(guān) "TransientTransactionError",比如當(dāng)主節(jié)點(diǎn)降級(jí),事務(wù)會(huì)作為一個(gè)整體被重試。
- 回調(diào) API 包含了 "TransientTransactionError" 的重試邏輯。
- 核心事務(wù) API 不包含 "TransientTransactionError" 的重試邏輯。為了處理"TransientTransactionError",應(yīng)用程序應(yīng)該明確地包含錯(cuò)誤的重試邏輯。
"UnknownTransactionCommitResult"
提交操作是可重試的寫操作。如果提交操作遇到錯(cuò)誤,無論 retryWrites的值是多少,MongoDB 驅(qū)動(dòng)程序都會(huì)重試提交。
如果提交操作遇到標(biāo)記為 "UnknownTransactionCommitResult"的錯(cuò)誤,提交可以被重試。
- 回調(diào) API 包含了 "UnknownTransactionCommitResult"的重試邏輯。
- 核心事務(wù) API 不包含 "UnknownTransactionCommitResult"的重試邏輯。為了處理 "UnknownTransactionCommitResult",應(yīng)用程序應(yīng)該明確地包含錯(cuò)誤的重試邏輯。
驅(qū)動(dòng)程序版本錯(cuò)誤
在具有多個(gè) mongos 實(shí)例的分片集群上,使用為 MongoDB 4.0 更新的驅(qū)動(dòng)程序執(zhí)行事務(wù) (而不是 MongoDB 4.2)將失敗并可能導(dǎo)致錯(cuò)誤,包括:注釋你的驅(qū)動(dòng)程序可能會(huì)返回不同的錯(cuò)誤。有關(guān)詳細(xì)信息,請(qǐng)參閱驅(qū)動(dòng)程序的文檔。
|
Error Code |
Error Message |
|
251 |
? |
|
50940 |
? |
對(duì)于 MongoDB 4.2 部署(副本集和分片集群)上的事務(wù),使用為 MongoDB 4.2 更新的 MongoDB 驅(qū)動(dòng)程序。
附加信息:
mongo Shell 示例
下面列出的 mongo shell 方法可用于事務(wù):
- Session.startTransaction()
- Session.commitTransaction()
- Session.abortTransaction()
注釋
mongo shell 示例為了簡(jiǎn)單起見省略了重試邏輯和強(qiáng)大的錯(cuò)誤處理。有關(guān)在應(yīng)用程序中包含事務(wù)的更實(shí)際示例,請(qǐng)參閱 事務(wù)錯(cuò)誤處理 。
// Create collections:
db.getSiblingDB("mydb1").foo.insert( {abc: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } );
db.getSiblingDB("mydb2").bar.insert( {xyz: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } );
// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );
coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;
// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );
// Operations inside the transaction
try {
coll1.insertOne( { abc: 1 } );
coll2.insertOne( { xyz: 999 } );
} catch (error) {
// Abort transaction on error
session.abortTransaction();
throw error;
}
// Commit the transaction using write concern set at transaction start
session.commitTransaction();
session.endSession();
本文題目:一文讀懂驅(qū)動(dòng)程序API
標(biāo)題路徑:http://www.dlmjj.cn/article/cdhcdse.html


咨詢
建站咨詢
