diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 7fd670e42a3..b0401f56324 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -808,7 +808,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet * return nil, err } - tablet, err := cluster.vtctlclientGetTablet(vttablet) + tablet, err := cluster.VtctlclientGetTablet(vttablet) if err != nil { return nil, err } @@ -851,7 +851,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin // returns the responses. It returns an error if the stream ends with fewer than // `count` responses. func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) { - tablet, err := cluster.vtctlclientGetTablet(vttablet) + tablet, err := cluster.VtctlclientGetTablet(vttablet) if err != nil { return nil, err } @@ -883,7 +883,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta return responses, nil } -func (cluster *LocalProcessCluster) vtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) { +func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) { result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias) if err != nil { return nil, err diff --git a/go/test/endtoend/reparent/prssettingspool/main_test.go b/go/test/endtoend/reparent/prssettingspool/main_test.go new file mode 100644 index 00000000000..a9f4312caea --- /dev/null +++ b/go/test/endtoend/reparent/prssettingspool/main_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package misc + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + rutils "vitess.io/vitess/go/test/endtoend/reparent/utils" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "test" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--queryserver-enable-settings-pool") + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false) + if err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--planner-version", "gen4") + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestSettingsPoolWithTXAndPRS(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + // set a system settings that will trigger reserved connection usage. + utils.Exec(t, conn, "set default_week_format = 5") + + // have transaction on the session + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "select id1, id2 from t1") + utils.Exec(t, conn, "commit") + + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + + // prs should happen without any error. + text, err := rutils.Prs(t, clusterInstance, tablets[1]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[0], 1*time.Minute) + + defer func() { + // reset state + text, err = rutils.Prs(t, clusterInstance, tablets[0]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[1], 1*time.Minute) + }() + + // no error should occur and it should go to the right tablet. + utils.Exec(t, conn, "select id1, id2 from t1") +} + +func TestSettingsPoolWithoutTXAndPRS(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + // set a system settings that will trigger reserved connection usage. + utils.Exec(t, conn, "set default_week_format = 5") + + // execute non-tx query + utils.Exec(t, conn, "select id1, id2 from t1") + + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + + // prs should happen without any error. + text, err := rutils.Prs(t, clusterInstance, tablets[1]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[0], 1*time.Minute) + defer func() { + // reset state + text, err = rutils.Prs(t, clusterInstance, tablets[0]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[1], 1*time.Minute) + }() + + // no error should occur and it should go to the right tablet. + utils.Exec(t, conn, "select id1, id2 from t1") + +} diff --git a/go/test/endtoend/reparent/prssettingspool/schema.sql b/go/test/endtoend/reparent/prssettingspool/schema.sql new file mode 100644 index 00000000000..3e78cab09d6 --- /dev/null +++ b/go/test/endtoend/reparent/prssettingspool/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index c2ab9d48306..0505f4a266b 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -31,6 +31,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -693,3 +696,24 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V require.Equal(t, "No", res.Rows[0][11].ToString()) } } + +func WaitForTabletToBeServing(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) { + vTablet, err := clusterInstance.VtctlclientGetTablet(tablet) + require.NoError(t, err) + + tConn, err := tabletconn.GetDialer()(vTablet, false) + require.NoError(t, err) + + newCtx, cancel := context.WithTimeout(context.Background(), timeout) + err = tConn.StreamHealth(newCtx, func(shr *querypb.StreamHealthResponse) error { + if shr.Serving { + cancel() + } + return nil + }) + + // the error should only be because we cancelled the context when the tablet became serving again. + if err != nil && !strings.Contains(err.Error(), "context canceled") { + t.Fatal(err.Error()) + } +} diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index 7ecb2943265..326c98b6b06 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -149,11 +149,22 @@ func (session *SafeSession) ResetTx() { session.mu.Lock() defer session.mu.Unlock() session.resetCommonLocked() - if !session.Session.InReservedConn { - session.ShardSessions = nil - session.PreSessions = nil - session.PostSessions = nil + // If settings pools is enabled on the vttablet. + // This variable will be true but there will not be a shard session with reserved connection id. + // So, we should check the shard session and not just this variable. + if session.Session.InReservedConn { + allSessions := append(session.ShardSessions, append(session.PreSessions, session.PostSessions...)...) + for _, ss := range allSessions { + if ss.ReservedId != 0 { + // found that reserved connection exists. + // abort here, we should keep the shard sessions. + return + } + } } + session.ShardSessions = nil + session.PreSessions = nil + session.PostSessions = nil } // Reset clears the session diff --git a/test/config.json b/test/config.json index 13ab5bfcbf0..d5757645b6c 100644 --- a/test/config.json +++ b/test/config.json @@ -1214,6 +1214,15 @@ "Shard": "vttablet_prscomplex", "RetryMax": 1, "Tags": [""] + }, + "prssettingspool": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/reparent/prssettingspool"], + "Command": [], + "Manual": false, + "Shard": "vttablet_prscomplex", + "RetryMax": 1, + "Tags": [""] } } }