Skip to content

Commit 1bccea5

Browse files
authoredFeb 14, 2025··
fix: session leak when tx was aborted during commit (#382)
The driver would leak a session if a read/write transaction was aborted during a Commit, and the retry of the transaction would fail due to concurrently modified data. Fixes #380
1 parent 4b81c98 commit 1bccea5

File tree

2 files changed

+155
-40
lines changed

2 files changed

+155
-40
lines changed
 

‎aborted_transactions_test.go

+152-40
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ func TestCommitAborted(t *testing.T) {
5353
if g, w := len(commitReqs), 2; g != w {
5454
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
5555
}
56+
57+
// Verify that the db is still usable.
58+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
59+
t.Fatalf("failed to execute statement after transaction: %v", err)
60+
}
5661
}
5762

5863
func TestCommitAbortedWithInternalRetriesDisabled(t *testing.T) {
@@ -80,6 +85,11 @@ func TestCommitAbortedWithInternalRetriesDisabled(t *testing.T) {
8085
if g, w := len(commitReqs), 1; g != w {
8186
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
8287
}
88+
89+
// Verify that the db is still usable.
90+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
91+
t.Fatalf("failed to execute statement after transaction: %v", err)
92+
}
8393
}
8494

8595
func TestUpdateAborted(t *testing.T) {
@@ -121,6 +131,11 @@ func TestUpdateAborted(t *testing.T) {
121131
if g, w := len(commitReqs), 1; g != w {
122132
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
123133
}
134+
135+
// Verify that the db is still usable.
136+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
137+
t.Fatalf("failed to execute statement after transaction: %v", err)
138+
}
124139
}
125140

126141
func TestBatchUpdateAborted(t *testing.T) {
@@ -160,6 +175,11 @@ func TestBatchUpdateAborted(t *testing.T) {
160175
if g, w := len(commitReqs), 1; g != w {
161176
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
162177
}
178+
179+
// Verify that the db is still usable.
180+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
181+
t.Fatalf("failed to execute statement after transaction: %v", err)
182+
}
163183
}
164184

165185
func TestQueryAborted(t *testing.T) {
@@ -172,7 +192,7 @@ func TestQueryAborted(t *testing.T) {
172192

173193
func TestEmptyQueryAbortedTwice(t *testing.T) {
174194
testRetryReadWriteTransactionWithQueryWithRetrySuccess(t, func(server testutil.InMemSpannerServer) {
175-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
195+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
176196
Type: testutil.StatementResultResultSet,
177197
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{}, "FOO"),
178198
})
@@ -243,7 +263,7 @@ func TestQueryConsumedHalfway_RetryContainsMoreResults_CommitAborted(t *testing.
243263
})
244264
}, codes.OK, 2, 2, 2,
245265
func(server testutil.InMemSpannerServer) {
246-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
266+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
247267
Type: testutil.StatementResultResultSet,
248268
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{1, 2, 3}, "FOO"),
249269
})
@@ -253,7 +273,7 @@ func TestQueryConsumedHalfway_RetryContainsMoreResults_CommitAborted(t *testing.
253273
func TestQueryWithError_CommitAborted(t *testing.T) {
254274
testRetryReadWriteTransactionWithQueryWithRetrySuccess(t, func(server testutil.InMemSpannerServer) {
255275
// Let the query return a Table not found error.
256-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
276+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
257277
Type: testutil.StatementResultError,
258278
Err: status.Errorf(codes.NotFound, "Table not found"),
259279
})
@@ -287,7 +307,7 @@ func TestQueryAbortedWithMoreResultsDuringRetry(t *testing.T) {
287307
})
288308
// Replace the original query result with a new one with an additional row
289309
// before the transaction is committed. This will cause the retry to fail.
290-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
310+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
291311
Type: testutil.StatementResultResultSet,
292312
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{1, 2, 3}, "FOO"),
293313
})
@@ -303,7 +323,7 @@ func TestQueryAbortedWithLessResultsDuringRetry(t *testing.T) {
303323
})
304324
// Replace the original query result with a new one with an additional row
305325
// before the transaction is committed. This will cause the retry to fail.
306-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
326+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
307327
Type: testutil.StatementResultResultSet,
308328
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{1}, "FOO"),
309329
})
@@ -315,7 +335,7 @@ func TestQueryAbortedWithLessResultsDuringRetry(t *testing.T) {
315335
func TestQueryWithEmptyResult_CommitAborted(t *testing.T) {
316336
testRetryReadWriteTransactionWithQueryWithRetrySuccess(t, func(server testutil.InMemSpannerServer) {
317337
// Let the query return an empty result set with only one column.
318-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
338+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
319339
Type: testutil.StatementResultResultSet,
320340
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{}, "FOO"),
321341
})
@@ -330,7 +350,7 @@ func TestQueryWithEmptyResult_CommitAborted(t *testing.T) {
330350
func TestQueryWithNewColumn_CommitAborted(t *testing.T) {
331351
testRetryReadWriteTransactionWithQuery(t, func(server testutil.InMemSpannerServer) {
332352
// Let the query return an empty result set with only one column.
333-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
353+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
334354
Type: testutil.StatementResultResultSet,
335355
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{}, "FOO"),
336356
})
@@ -341,7 +361,7 @@ func TestQueryWithNewColumn_CommitAborted(t *testing.T) {
341361
func(server testutil.InMemSpannerServer) {
342362
// Let the query return an empty result set with two columns during the retry.
343363
// This should cause a retry failure.
344-
server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
364+
_ = server.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
345365
Type: testutil.StatementResultResultSet,
346366
ResultSet: testutil.CreateTwoColumnResultSet([][2]int64{}, [2]string{"FOO", "BAR"}),
347367
})
@@ -366,6 +386,7 @@ func testRetryReadWriteTransactionWithQuery(t *testing.T, setupServer func(serve
366386

367387
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
368388
defer teardown()
389+
db.SetMaxOpenConns(1)
369390

370391
if setupServer != nil {
371392
setupServer(server.TestSpanner)
@@ -431,6 +452,12 @@ func testRetryReadWriteTransactionWithQuery(t *testing.T, setupServer func(serve
431452
if g, w := len(commitReqs), wantCommitCount; g != w {
432453
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
433454
}
455+
456+
// Execute another statement to ensure that the session that was used
457+
// has been returned to the pool.
458+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
459+
t.Fatalf("failed to execute statement after transaction: %v", err)
460+
}
434461
}
435462

436463
// Tests that a query that is aborted halfway the stream will be retried,
@@ -439,15 +466,17 @@ func testRetryReadWriteTransactionWithQuery(t *testing.T, setupServer func(serve
439466
func TestQueryAbortedHalfway_WithDifferentResultsInFirstHalf(t *testing.T) {
440467
t.Parallel()
441468

442-
db, server, teardown := setupTestDBConnection(t)
469+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
443470
defer teardown()
471+
db.SetMaxOpenConns(1)
444472
// Ensure that the second call to Next() will fail with an Aborted error.
445473
server.TestSpanner.AddPartialResultSetError(testutil.SelectFooFromBar, testutil.PartialResultSetExecutionTime{
446474
ResumeToken: testutil.EncodeResumeToken(2),
447475
Err: status.Error(codes.Aborted, "Aborted"),
448476
})
449477

450-
ctx := context.Background()
478+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
479+
defer cancel()
451480
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
452481
if err != nil {
453482
t.Fatalf("begin failed: %v", err)
@@ -477,7 +506,7 @@ func TestQueryAbortedHalfway_WithDifferentResultsInFirstHalf(t *testing.T) {
477506
// Replace the original query result with a new one with a different value
478507
// for the first row. This should cause the transaction to fail with an
479508
// ErrAbortedDueToConcurrentModification error.
480-
server.TestSpanner.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
509+
_ = server.TestSpanner.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
481510
Type: testutil.StatementResultResultSet,
482511
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{2, 2}, "FOO"),
483512
})
@@ -490,28 +519,38 @@ func TestQueryAbortedHalfway_WithDifferentResultsInFirstHalf(t *testing.T) {
490519
if g, w := rows.Err(), ErrAbortedDueToConcurrentModification; g != w {
491520
t.Fatalf("next error mismatch\nGot: %v\nWant: %v", g, w)
492521
}
522+
if err := tx.Rollback(); err != nil {
523+
t.Fatalf("failed to rollback transaction: %v", err)
524+
}
493525

494526
reqs := drainRequestsFromServer(server.TestSpanner)
495527
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
496528
if g, w := len(execReqs), 2; g != w {
497529
t.Fatalf("execute request count mismatch\nGot: %v\nWant: %v", g, w)
498530
}
531+
532+
// Verify that the db is still usable.
533+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
534+
t.Fatalf("failed to execute statement after transaction: %v", err)
535+
}
499536
}
500537

501538
// Tests that a change in the results of a result set that has not yet been seen
502539
// by the user does not cause a retry failure.
503540
func TestQueryAbortedHalfway_WithDifferentResultsInSecondHalf(t *testing.T) {
504541
t.Parallel()
505542

506-
db, server, teardown := setupTestDBConnection(t)
543+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
507544
defer teardown()
545+
db.SetMaxOpenConns(1)
508546
// Ensure that the second call to Next() will fail with an Aborted error.
509547
server.TestSpanner.AddPartialResultSetError(testutil.SelectFooFromBar, testutil.PartialResultSetExecutionTime{
510548
ResumeToken: testutil.EncodeResumeToken(2),
511549
Err: status.Error(codes.Aborted, "Aborted"),
512550
})
513551

514-
ctx := context.Background()
552+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
553+
defer cancel()
515554
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
516555
if err != nil {
517556
t.Fatalf("begin failed: %v", err)
@@ -542,7 +581,7 @@ func TestQueryAbortedHalfway_WithDifferentResultsInSecondHalf(t *testing.T) {
542581
// for the second row. This should not cause the transaction to fail with an
543582
// ErrAbortedDueToConcurrentModification error as the result has not yet
544583
// been seen by the user.
545-
server.TestSpanner.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
584+
_ = server.TestSpanner.PutStatementResult(testutil.SelectFooFromBar, &testutil.StatementResult{
546585
Type: testutil.StatementResultResultSet,
547586
ResultSet: testutil.CreateSingleColumnInt64ResultSet([]int64{1, 3}, "FOO"),
548587
})
@@ -563,21 +602,31 @@ func TestQueryAbortedHalfway_WithDifferentResultsInSecondHalf(t *testing.T) {
563602
if g, w := val, int64(3); g != w {
564603
t.Fatalf("value mismatch\nGot: %v\nWant: %v", g, w)
565604
}
605+
if err := tx.Commit(); err != nil {
606+
t.Fatalf("failed to commit transaction: %v", err)
607+
}
566608

567609
reqs := drainRequestsFromServer(server.TestSpanner)
568610
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
569611
if g, w := len(execReqs), 2; g != w {
570612
t.Fatalf("execute request count mismatch\nGot: %v\nWant: %v", g, w)
571613
}
614+
615+
// Verify that the db is still usable.
616+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
617+
t.Fatalf("failed to execute statement after transaction: %v", err)
618+
}
572619
}
573620

574621
func TestSecondUpdateAborted(t *testing.T) {
575622
t.Parallel()
576623

577-
db, server, teardown := setupTestDBConnection(t)
624+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
578625
defer teardown()
626+
db.SetMaxOpenConns(1)
579627

580-
ctx := context.Background()
628+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
629+
defer cancel()
581630
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
582631
if err != nil {
583632
t.Fatalf("begin failed: %v", err)
@@ -619,15 +668,22 @@ func TestSecondUpdateAborted(t *testing.T) {
619668
if g, w := len(commitReqs), 1; g != w {
620669
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
621670
}
671+
672+
// Verify that the db is still usable.
673+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
674+
t.Fatalf("failed to execute statement after transaction: %v", err)
675+
}
622676
}
623677

624678
func TestSecondBatchUpdateAborted(t *testing.T) {
625679
t.Parallel()
626680

627-
db, server, teardown := setupTestDBConnection(t)
681+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
628682
defer teardown()
683+
db.SetMaxOpenConns(1)
629684

630-
ctx := context.Background()
685+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
686+
defer cancel()
631687
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
632688
if err != nil {
633689
t.Fatalf("begin failed: %v", err)
@@ -671,20 +727,27 @@ func TestSecondBatchUpdateAborted(t *testing.T) {
671727
if g, w := len(commitReqs), 1; g != w {
672728
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
673729
}
730+
731+
// Verify that the db is still usable.
732+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
733+
t.Fatalf("failed to execute statement after transaction: %v", err)
734+
}
674735
}
675736

676737
func TestSecondUpdateAborted_FirstStatementWithSameError(t *testing.T) {
677738
t.Parallel()
678739

679-
db, server, teardown := setupTestDBConnection(t)
740+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
680741
defer teardown()
742+
db.SetMaxOpenConns(1)
681743

682-
ctx := context.Background()
744+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
745+
defer cancel()
683746
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
684747
if err != nil {
685748
t.Fatalf("begin failed: %v", err)
686749
}
687-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
750+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
688751
Type: testutil.StatementResultError,
689752
Err: status.Error(codes.NotFound, "Table not found"),
690753
})
@@ -726,6 +789,11 @@ func TestSecondUpdateAborted_FirstStatementWithSameError(t *testing.T) {
726789
if g, w := len(commitReqs), 1; g != w {
727790
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
728791
}
792+
793+
// Verify that the db is still usable.
794+
if _, err := db.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
795+
t.Fatalf("failed to execute statement after transaction: %v", err)
796+
}
729797
}
730798

731799
func TestSecondUpdateAborted_FirstResultUpdateCountChanged(t *testing.T) {
@@ -769,13 +837,14 @@ func TestSecondUpdateAborted_FirstResultFromErrorToOtherError(t *testing.T) {
769837
func testSecondUpdateAborted_FirstResultChanged(t *testing.T, firstResult *testutil.StatementResult, secondResult *testutil.StatementResult) {
770838
t.Parallel()
771839

772-
db, server, teardown := setupTestDBConnection(t)
840+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
773841
defer teardown()
774842
if firstResult != nil {
775-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, firstResult)
843+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, firstResult)
776844
}
777845

778-
ctx := context.Background()
846+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
847+
defer cancel()
779848
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
780849
if err != nil {
781850
t.Fatalf("begin failed: %v", err)
@@ -784,7 +853,7 @@ func testSecondUpdateAborted_FirstResultChanged(t *testing.T, firstResult *testu
784853
// the result is different during the retry.
785854
_, _ = tx.ExecContext(ctx, testutil.UpdateSingersSetLastName)
786855
// Update the result to simulate a different result during the retry.
787-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, secondResult)
856+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, secondResult)
788857

789858
server.TestSpanner.PutExecutionTime(testutil.MethodExecuteStreamingSql, testutil.SimulatedExecutionTime{
790859
Errors: []error{status.Error(codes.Aborted, "Aborted")},
@@ -795,27 +864,37 @@ func testSecondUpdateAborted_FirstResultChanged(t *testing.T, firstResult *testu
795864
if err != ErrAbortedDueToConcurrentModification {
796865
t.Fatalf("update error mismatch\nGot: %v\nWant: %v", err, ErrAbortedDueToConcurrentModification)
797866
}
867+
if err := tx.Rollback(); err != nil {
868+
t.Fatalf("failed to rollback transaction: %v", err)
869+
}
798870
reqs := drainRequestsFromServer(server.TestSpanner)
799871
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
800872
// The server should receive 3 execute statements, as only the first statement is retried.
801873
if g, w := len(execReqs), 3; g != w {
802874
t.Fatalf("execute request count mismatch\nGot: %v\nWant: %v", g, w)
803875
}
876+
877+
// Verify that the db is still usable.
878+
if _, err := db.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
879+
t.Fatalf("failed to execute statement after transaction: %v", err)
880+
}
804881
}
805882

806883
func TestBatchUpdateAbortedWithError(t *testing.T) {
807884
t.Parallel()
808885

809-
db, server, teardown := setupTestDBConnection(t)
886+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
810887
defer teardown()
888+
db.SetMaxOpenConns(1)
811889

812890
// Make sure that one of the DML statements will return an error.
813-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
891+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
814892
Type: testutil.StatementResultError,
815893
Err: status.Error(codes.NotFound, "Table not found"),
816894
})
817895

818-
ctx := context.Background()
896+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
897+
defer cancel()
819898
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
820899
if err != nil {
821900
t.Fatalf("begin failed: %v", err)
@@ -858,21 +937,28 @@ func TestBatchUpdateAbortedWithError(t *testing.T) {
858937
if g, w := len(commitReqs), 2; g != w {
859938
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
860939
}
940+
941+
// Verify that the db is still usable.
942+
if _, err := db.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
943+
t.Fatalf("failed to execute statement after transaction: %v", err)
944+
}
861945
}
862946

863947
func TestBatchUpdateAbortedWithError_DifferentRowCountDuringRetry(t *testing.T) {
864948
t.Parallel()
865949

866-
db, server, teardown := setupTestDBConnection(t)
950+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
867951
defer teardown()
952+
db.SetMaxOpenConns(1)
868953

869954
// Make sure that one of the DML statements will return an error.
870-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
955+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
871956
Type: testutil.StatementResultError,
872957
Err: status.Error(codes.NotFound, "Table not found"),
873958
})
874959

875-
ctx := context.Background()
960+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
961+
defer cancel()
876962
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
877963
if err != nil {
878964
t.Fatalf("begin failed: %v", err)
@@ -895,7 +981,7 @@ func TestBatchUpdateAbortedWithError_DifferentRowCountDuringRetry(t *testing.T)
895981

896982
// Change the returned row count of the first DML statement. This will cause
897983
// the retry to fail.
898-
server.TestSpanner.PutStatementResult(testutil.UpdateBarSetFoo, &testutil.StatementResult{
984+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateBarSetFoo, &testutil.StatementResult{
899985
Type: testutil.StatementResultUpdateCount,
900986
UpdateCount: testutil.UpdateBarSetFooRowCount + 1,
901987
})
@@ -916,21 +1002,28 @@ func TestBatchUpdateAbortedWithError_DifferentRowCountDuringRetry(t *testing.T)
9161002
if g, w := len(commitReqs), 1; g != w {
9171003
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
9181004
}
1005+
1006+
// Verify that the db is still usable.
1007+
if _, err := db.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
1008+
t.Fatalf("failed to execute statement after transaction: %v", err)
1009+
}
9191010
}
9201011

9211012
func TestBatchUpdateAbortedWithError_DifferentErrorDuringRetry(t *testing.T) {
9221013
t.Parallel()
9231014

924-
db, server, teardown := setupTestDBConnection(t)
1015+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
9251016
defer teardown()
1017+
db.SetMaxOpenConns(1)
9261018

9271019
// Make sure that one of the DML statements will return an error.
928-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
1020+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
9291021
Type: testutil.StatementResultError,
9301022
Err: status.Error(codes.NotFound, "Table not found"),
9311023
})
9321024

933-
ctx := context.Background()
1025+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1026+
defer cancel()
9341027
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
9351028
if err != nil {
9361029
t.Fatalf("begin failed: %v", err)
@@ -947,7 +1040,7 @@ func TestBatchUpdateAbortedWithError_DifferentErrorDuringRetry(t *testing.T) {
9471040

9481041
// Remove the error for the DML statement and cause a retry. The missing
9491042
// error for the DML statement should fail the retry.
950-
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
1043+
_ = server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
9511044
Type: testutil.StatementResultUpdateCount,
9521045
UpdateCount: testutil.UpdateSingersSetLastNameRowCount,
9531046
})
@@ -968,13 +1061,19 @@ func TestBatchUpdateAbortedWithError_DifferentErrorDuringRetry(t *testing.T) {
9681061
if g, w := len(commitReqs), 1; g != w {
9691062
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
9701063
}
1064+
1065+
// Verify that the db is still usable.
1066+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
1067+
t.Fatalf("failed to execute statement after transaction: %v", err)
1068+
}
9711069
}
9721070

9731071
func TestLastInsertId(t *testing.T) {
9741072
t.Parallel()
9751073

976-
db, server, teardown := setupTestDBConnection(t)
1074+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
9771075
defer teardown()
1076+
db.SetMaxOpenConns(1)
9781077

9791078
query := "insert into singers (name) values ('foo') then return id"
9801079
_ = server.TestSpanner.PutStatementResult(query, &testutil.StatementResult{
@@ -983,7 +1082,8 @@ func TestLastInsertId(t *testing.T) {
9831082
UpdateCount: 1,
9841083
})
9851084

986-
ctx := context.Background()
1085+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1086+
defer cancel()
9871087
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
9881088
if err != nil {
9891089
t.Fatalf("begin failed: %v", err)
@@ -1005,13 +1105,19 @@ func TestLastInsertId(t *testing.T) {
10051105
if err := tx.Commit(); err != nil {
10061106
t.Fatalf("failed to commit: %v", err)
10071107
}
1108+
1109+
// Verify that the db is still usable.
1110+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
1111+
t.Fatalf("failed to execute statement after transaction: %v", err)
1112+
}
10081113
}
10091114

10101115
func TestLastInsertIdChanged(t *testing.T) {
10111116
t.Parallel()
10121117

1013-
db, server, teardown := setupTestDBConnection(t)
1118+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
10141119
defer teardown()
1120+
db.SetMaxOpenConns(1)
10151121

10161122
query := "insert into singers (name) values ('foo') then return id"
10171123
_ = server.TestSpanner.PutStatementResult(query, &testutil.StatementResult{
@@ -1020,7 +1126,8 @@ func TestLastInsertIdChanged(t *testing.T) {
10201126
UpdateCount: 1,
10211127
})
10221128

1023-
ctx := context.Background()
1129+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1130+
defer cancel()
10241131
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
10251132
if err != nil {
10261133
t.Fatalf("begin failed: %v", err)
@@ -1051,6 +1158,11 @@ func TestLastInsertIdChanged(t *testing.T) {
10511158
if err := tx.Commit(); err != ErrAbortedDueToConcurrentModification {
10521159
t.Fatalf("commit error mismatch\n Got: %v\nWant: %v", err, ErrAbortedDueToConcurrentModification)
10531160
}
1161+
1162+
// Verify that the db is still usable.
1163+
if _, err := db.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
1164+
t.Fatalf("failed to execute statement after transaction: %v", err)
1165+
}
10541166
}
10551167

10561168
func firstNonZero(values ...int) int {

‎transaction.go

+3
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,9 @@ func (tx *readWriteTransaction) Commit() (err error) {
365365
commitTs, err = tx.rwTx.Commit(ctx)
366366
return err
367367
})
368+
if err == ErrAbortedDueToConcurrentModification {
369+
tx.rwTx.Rollback(context.Background())
370+
}
368371
}
369372
tx.close(&commitTs, err)
370373
return err

0 commit comments

Comments
 (0)
Please sign in to comment.