Skip to content

Commit 8dea88b

Browse files
cvvz and onsi authored May 3, 2023
fix: fail fast may cause Serial spec or cleanup Node interrupted (#1178)
* fix: fail fast may cause Serial spec or cleanup Node interrupted * tighten up edges around abort behavior: 1. Inter-process aborts should not interrupt cleanup nodes. 2. Whenever we fetch interrupt status, check and see if an abort has happened; if it has, ensure we return the latest, correct abort state. This allows us to avoid accidentally starting the next spec because the ABORT_POLLING_INTERVAL hasn't fired yet. * fix race condition in internal integration suite * fix internal test race condition --------- Co-authored-by: Onsi Fakhouri <onsijoe@gmail.com>
1 parent 903be81 commit 8dea88b

File tree

7 files changed

+176
-19
lines changed

7 files changed

+176
-19
lines changed
 

‎internal/internal_integration/abort_test.go

+77-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package internal_integration_test
22

33
import (
4+
"time"
5+
46
. "github.com/onsi/ginkgo/v2"
57
. "github.com/onsi/gomega"
68

9+
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
710
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
811
"github.com/onsi/ginkgo/v2/types"
912
)
@@ -158,19 +161,92 @@ var _ = Describe("handling test aborts", func() {
158161
})
159162

160163
Describe("when running in parallel and a test aborts", func() {
164+
var c chan interface{}
161165
BeforeEach(func() {
162166
SetUpForParallel(2)
167+
c = make(chan interface{})
163168
})
164169

165170
It("notifies the server of the abort", func() {
166171
Ω(client.ShouldAbort()).Should(BeFalse())
167-
success, _ := RunFixture("aborting in parallel", func() {
172+
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
168173
It("A", func() {
174+
<-c
169175
Abort("abort")
170176
})
177+
178+
It("B", func(ctx SpecContext) {
179+
close(c)
180+
select {
181+
case <-ctx.Done():
182+
rt.Run("dc-done")
183+
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
184+
rt.Run("dc-after")
185+
}
186+
})
171187
})
172188
Ω(success).Should(BeFalse())
173189
Ω(client.ShouldAbort()).Should(BeTrue())
190+
191+
Ω(rt).Should(HaveTracked("dc-done")) //not dc-after
192+
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
193+
Ω(reporter.Did.Find("B")).Should(HaveBeenInterrupted(interrupt_handler.InterruptCauseAbortByOtherProcess))
174194
})
195+
196+
It("does not interrupt cleanup nodes", func() {
197+
success := RunFixtureInParallel("aborting in parallel", func(_ int) {
198+
It("A", func() {
199+
<-c
200+
Abort("abort")
201+
})
202+
203+
Context("B", func() {
204+
It("B", func() {
205+
})
206+
207+
AfterEach(func(ctx SpecContext) {
208+
close(c)
209+
select {
210+
case <-ctx.Done():
211+
rt.Run("dc-done")
212+
case <-time.After(interrupt_handler.ABORT_POLLING_INTERVAL * 2):
213+
rt.Run("dc-after")
214+
}
215+
})
216+
})
217+
})
218+
Ω(success).Should(BeFalse())
219+
220+
Ω(rt).Should(HaveTracked("dc-after")) //not dc-done
221+
Ω(reporter.Did.Find("A")).Should(HaveAborted("abort"))
222+
Ω(reporter.Did.Find("B")).Should(HavePassed())
223+
})
224+
225+
It("does not start serial nodes if an abort occurs", func() {
226+
success := RunFixtureInParallel("aborting in parallel", func(proc int) {
227+
It("A", func() {
228+
time.Sleep(time.Millisecond * 50)
229+
if proc == 2 {
230+
rt.Run("aborting")
231+
Abort("abort")
232+
}
233+
})
234+
235+
It("B", func() {
236+
time.Sleep(time.Millisecond * 50)
237+
if proc == 2 {
238+
rt.Run("aborting")
239+
Abort("abort")
240+
}
241+
})
242+
243+
It("C", Serial, func() {
244+
rt.Run("C")
245+
})
246+
})
247+
Ω(success).Should(BeFalse())
248+
Ω(rt).Should(HaveTracked("aborting")) //just one aborting and we don't see C
249+
}, MustPassRepeatedly(10))
250+
175251
})
176252
})

‎internal/internal_integration/internal_integration_suite_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package internal_integration_test
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"reflect"
78
"testing"
@@ -10,6 +11,7 @@ import (
1011
. "github.com/onsi/ginkgo/v2"
1112
"github.com/onsi/ginkgo/v2/internal"
1213
"github.com/onsi/ginkgo/v2/internal/global"
14+
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
1315
"github.com/onsi/ginkgo/v2/internal/parallel_support"
1416
. "github.com/onsi/ginkgo/v2/internal/test_helpers"
1517
"github.com/onsi/ginkgo/v2/types"
@@ -18,6 +20,7 @@ import (
1820
)
1921

2022
func TestSuiteTests(t *testing.T) {
23+
interrupt_handler.ABORT_POLLING_INTERVAL = 200 * time.Millisecond
2124
format.TruncatedDiff = false
2225
RegisterFailHandler(Fail)
2326
suiteConfig, _ := GinkgoConfiguration()
@@ -81,6 +84,7 @@ func WithSuite(suite *internal.Suite, callback func()) {
8184
func SetUpForParallel(parallelTotal int) {
8285
conf.ParallelTotal = parallelTotal
8386
server, client, exitChannels = SetUpServerAndClient(conf.ParallelTotal)
87+
8488
conf.ParallelHost = server.Address()
8589
}
8690

@@ -95,6 +99,48 @@ func RunFixture(description string, callback func()) (bool, bool) {
9599
return success, hasProgrammaticFocus
96100
}
97101

102+
/*
103+
You should call SetUpForParallel() first, then call RunFixtureInParallel()
104+
105+
This is, at best, an approximation. There are some DSL objects that can be called within a running node (e.g. DeferCleanup) that will not work with RunFixtureInParallel, as they will attach to the actual internal_integration suite as opposed to the simulated fixture.
106+
107+
Moreover, the FakeInterruptHandler is not used; instead, a real interrupt handler is created and configured with the client generated by SetUpForParallel. This is to facilitate the testing of cross-process aborts, which was the primary motivator for this method.
108+
109+
also a noopProgressSignalRegistrar is used to avoid an annoying data race
110+
*/
111+
func RunFixtureInParallel(description string, callback func(proc int)) bool {
112+
suites := make([]*internal.Suite, conf.ParallelTotal)
113+
finished := make(chan bool, conf.ParallelTotal)
114+
for proc := 1; proc <= conf.ParallelTotal; proc++ {
115+
suites[proc-1] = internal.NewSuite()
116+
WithSuite(suites[proc-1], func() {
117+
callback(proc)
118+
Ω(suites[proc-1].BuildTree()).Should(Succeed())
119+
})
120+
}
121+
for proc := 1; proc <= conf.ParallelTotal; proc++ {
122+
proc := proc
123+
c := conf //make a copy
124+
c.ParallelProcess = proc
125+
exit := exitChannels[proc]
126+
suite := suites[proc-1]
127+
go func() {
128+
interruptHandler := interrupt_handler.NewInterruptHandler(client)
129+
defer interruptHandler.Stop()
130+
131+
success, _ := suite.Run(fmt.Sprintf("%s - %d", description, proc), Label("TopLevelLabel"), "/path/to/suite", failer, reporter, writer, outputInterceptor, interruptHandler, client, noopProgressSignalRegistrar, c)
132+
close(exit)
133+
finished <- success
134+
}()
135+
}
136+
success := true
137+
for proc := 1; proc <= conf.ParallelTotal; proc++ {
138+
success = (<-finished) && success
139+
}
140+
141+
return success
142+
}
143+
98144
func F(options ...interface{}) {
99145
location := cl
100146
message := "fail"

‎internal/interrupt_handler/interrupt_handler.go

+31-16
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/onsi/ginkgo/v2/internal/parallel_support"
1111
)
1212

13-
const ABORT_POLLING_INTERVAL = 500 * time.Millisecond
13+
var ABORT_POLLING_INTERVAL = 500 * time.Millisecond
1414

1515
type InterruptCause uint
1616

@@ -62,25 +62,27 @@ type InterruptHandlerInterface interface {
6262
}
6363

6464
type InterruptHandler struct {
65-
c chan interface{}
66-
lock *sync.Mutex
67-
level InterruptLevel
68-
cause InterruptCause
69-
client parallel_support.Client
70-
stop chan interface{}
71-
signals []os.Signal
65+
c chan interface{}
66+
lock *sync.Mutex
67+
level InterruptLevel
68+
cause InterruptCause
69+
client parallel_support.Client
70+
stop chan interface{}
71+
signals []os.Signal
72+
requestAbortCheck chan interface{}
7273
}
7374

7475
func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler {
7576
if len(signals) == 0 {
7677
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
7778
}
7879
handler := &InterruptHandler{
79-
c: make(chan interface{}),
80-
lock: &sync.Mutex{},
81-
stop: make(chan interface{}),
82-
client: client,
83-
signals: signals,
80+
c: make(chan interface{}),
81+
lock: &sync.Mutex{},
82+
stop: make(chan interface{}),
83+
requestAbortCheck: make(chan interface{}),
84+
client: client,
85+
signals: signals,
8486
}
8587
handler.registerForInterrupts()
8688
return handler
@@ -109,6 +111,12 @@ func (handler *InterruptHandler) registerForInterrupts() {
109111
pollTicker.Stop()
110112
return
111113
}
114+
case <-handler.requestAbortCheck:
115+
if handler.client.ShouldAbort() {
116+
close(abortChannel)
117+
pollTicker.Stop()
118+
return
119+
}
112120
case <-handler.stop:
113121
pollTicker.Stop()
114122
return
@@ -152,11 +160,18 @@ func (handler *InterruptHandler) registerForInterrupts() {
152160

153161
func (handler *InterruptHandler) Status() InterruptStatus {
154162
handler.lock.Lock()
155-
defer handler.lock.Unlock()
156-
157-
return InterruptStatus{
163+
status := InterruptStatus{
158164
Level: handler.level,
159165
Channel: handler.c,
160166
Cause: handler.cause,
161167
}
168+
handler.lock.Unlock()
169+
170+
if handler.client != nil && handler.client.ShouldAbort() && !status.Interrupted() {
171+
close(handler.requestAbortCheck)
172+
<-status.Channel
173+
return handler.Status()
174+
}
175+
176+
return status
162177
}

‎internal/interrupt_handler/interrupt_handler_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,6 @@ var _ = Describe("InterruptHandler", func() {
9696
})
9797
})
9898

99-
// here - test that abort only triggers once
100-
// here - test interplay with signal
10199
Describe("Interrupting when another Ginkgo process has aborted", func() {
102100
var client parallel_support.Client
103101
BeforeEach(func() {
@@ -165,5 +163,10 @@ var _ = Describe("InterruptHandler", func() {
165163
Ω(status.Cause).Should(Equal(interrupt_handler.InterruptCauseSignal))
166164
Ω(status.Level).Should(Equal(interrupt_handler.InterruptLevelCleanupAndReport))
167165
})
166+
167+
It("doesn't just rely on the ABORT_POLLING_INTERVAL timer to report that the interrupt has happened", func() {
168+
client.PostAbort()
169+
Ω(interruptHandler.Status().Cause).Should(Equal(interrupt_handler.InterruptCauseAbortByOtherProcess))
170+
}, MustPassRepeatedly(10))
168171
})
169172
})

‎internal/interrupt_handler/interrupthandler_suite_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package interrupt_handler_test
22

33
import (
44
"testing"
5+
"time"
56

67
. "github.com/onsi/ginkgo/v2"
8+
"github.com/onsi/ginkgo/v2/internal/interrupt_handler"
79
. "github.com/onsi/gomega"
810
)
911

1012
func TestInterrupthandler(t *testing.T) {
13+
interrupt_handler.ABORT_POLLING_INTERVAL = 50 * time.Millisecond
1114
RegisterFailHandler(Fail)
1215
RunSpecs(t, "Interrupthandler Suite")
1316
}

‎internal/suite.go

+6
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,12 @@ func (suite *Suite) runNode(node Node, specDeadline time.Time, text string) (typ
937937
gracePeriodChannel = time.After(gracePeriod)
938938
case <-interruptStatus.Channel:
939939
interruptStatus = suite.interruptHandler.Status()
940+
// ignore interruption from other process if we are cleaning up or reporting
941+
if interruptStatus.Cause == interrupt_handler.InterruptCauseAbortByOtherProcess &&
942+
node.NodeType.Is(types.NodeTypesAllowedDuringReportInterrupt|types.NodeTypesAllowedDuringCleanupInterrupt) {
943+
continue
944+
}
945+
940946
deadlineChannel = nil // don't worry about deadlines, time's up now
941947

942948
failureTimelineLocation := suite.generateTimelineLocation()

‎internal/test_helpers/fake_reporter.go

+8
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,26 @@ func NewFakeReporter() *FakeReporter {
111111
}
112112

113113
func (r *FakeReporter) SuiteWillBegin(report types.Report) {
114+
r.lock.Lock()
115+
defer r.lock.Unlock()
114116
r.Begin = report
115117
}
116118

117119
func (r *FakeReporter) WillRun(report types.SpecReport) {
120+
r.lock.Lock()
121+
defer r.lock.Unlock()
118122
r.Will = append(r.Will, report)
119123
}
120124

121125
func (r *FakeReporter) DidRun(report types.SpecReport) {
126+
r.lock.Lock()
127+
defer r.lock.Unlock()
122128
r.Did = append(r.Did, report)
123129
}
124130

125131
func (r *FakeReporter) SuiteDidEnd(report types.Report) {
132+
r.lock.Lock()
133+
defer r.lock.Unlock()
126134
r.End = report
127135
}
128136
func (r *FakeReporter) EmitProgressReport(progressReport types.ProgressReport) {

0 commit comments

Comments
 (0)
Please sign in to comment.