Skip to content

Commit

Permalink
sdk/metric: Remove Reader.ForceFlush and ManualReader.ForceFlush (#4375)
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared committed Aug 11, 2023
1 parent c513972 commit a9552aa
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 32 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -45,6 +45,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377)
- Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408)

### Removed

- Remove `Reader.ForceFlush` in `go.opentelemetry.io/otel/metric`.
Notice that `PeriodicReader.ForceFlush` is still available. (#4375)

### Fixed

- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/config.go
Expand Up @@ -37,7 +37,9 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
var fFuncs, sFuncs []func(context.Context) error
for _, r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok {
fFuncs = append(fFuncs, f.ForceFlush)
}
}

return unify(fFuncs), unifyShutdown(sFuncs)
Expand Down
7 changes: 0 additions & 7 deletions sdk/metric/manual_reader.go
Expand Up @@ -91,13 +91,6 @@ func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation
return mr.aggregationSelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
//
// This method is safe to call concurrently.
func (mr *ManualReader) ForceFlush(context.Context) error {
return nil
}

// Shutdown closes any connections and frees any resources used by the reader.
//
// This method is safe to call concurrently.
Expand Down
12 changes: 11 additions & 1 deletion sdk/metric/periodic_reader_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (e *fnExporter) Shutdown(ctx context.Context) error {
type periodicReaderTestSuite struct {
*readerTestSuite

ErrReader Reader
ErrReader *PeriodicReader
}

func (ts *periodicReaderTestSuite) SetupTest() {
Expand Down Expand Up @@ -425,6 +426,15 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
})
}

func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
ctx := context.Background()
r := NewPeriodicReader(new(fnExporter))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.ForceFlush(ctx))
}

func BenchmarkPeriodicReader(b *testing.B) {
b.Run("Collect", benchReaderCollectFunc(
NewPeriodicReader(new(fnExporter)),
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/provider.go
Expand Up @@ -110,6 +110,9 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
// telemetry be flushed or all resources have been released in these
// situations.
//
// ForceFlush calls ForceFlush(context.Context) error
// on all Readers that implements this method.
//
// This method is safe to call concurrently.
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
if mp.forceFlush != nil {
Expand Down
10 changes: 0 additions & 10 deletions sdk/metric/reader.go
Expand Up @@ -84,16 +84,6 @@ type Reader interface {
// passed context is expected to be honored.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

// ForceFlush flushes all metric measurements held in an export pipeline.
//
// This deadline or cancellation of the passed context are honored. An appropriate
// error will be returned in these situations. There is no guaranteed that all
// telemetry be flushed or all resources have been released in these
// situations.
//
// This method needs to be concurrent safe.
ForceFlush(context.Context) error

// Shutdown flushes all metric measurements held in an export pipeline and releases any
// held computational resources.
//
Expand Down
20 changes: 7 additions & 13 deletions sdk/metric/reader_test.go
Expand Up @@ -93,14 +93,6 @@ func (ts *readerTestSuite) TestShutdownTwice() {
ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown)
}

func (ts *readerTestSuite) TestMultipleForceFlush() {
ctx := context.Background()
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.ForceFlush(ctx))
ts.NoError(ts.Reader.ForceFlush(ctx))
}

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
Expand Down Expand Up @@ -186,11 +178,13 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() {
_ = ts.Reader.Collect(ctx, nil)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = ts.Reader.ForceFlush(ctx)
}()
if f, ok := ts.Reader.(interface{ ForceFlush(context.Context) error }); ok {
wg.Add(1)
go func() {
defer wg.Done()
_ = f.ForceFlush(ctx)
}()
}

wg.Add(1)
go func() {
Expand Down

0 comments on commit a9552aa

Please sign in to comment.