Skip to content

Commit

Permalink
Include OPA min compat version in telemetry report
Browse files Browse the repository at this point in the history
This commit extends the telemetry report to include the
minimum compatible version of policies loaded into OPA.
This information can be helpful for gaining visibility into
which era of Rego is being adopted in the wild.

Fixes: open-policy-agent#6361

Co-authored-by: Stephan Renatus <stephan@styra.com>
Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar and srenatus committed Dec 13, 2023
1 parent 078c9c3 commit d5f964c
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 11 deletions.
47 changes: 38 additions & 9 deletions internal/report/report.go
Expand Up @@ -10,16 +10,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/open-policy-agent/opa/keys"
"github.com/open-policy-agent/opa/logging"

"os"
"time"

"github.com/open-policy-agent/opa/plugins/rest"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
Expand All @@ -38,10 +38,16 @@ var ExternalServiceURL = "https://telemetry.openpolicyagent.org"

// Reporter reports information about the running OPA instance, such as its
// version and heap usage, to an external service.
type Reporter struct {
// Pre-change declaration as rendered by the diff view; it is superseded by
// the map[string]any declaration on the next line.
body map[string]string
// body is the JSON payload posted with every report. New seeds it with the
// "id" and "version" keys; SendReport adds one entry per registered Gatherer.
body map[string]any
// client is the REST client used to POST the report to the external service.
client rest.Client

// gatherers maps a report key to the function that produces its value.
gatherers map[string]Gatherer
// gatherersMtx is locked while gatherers is written (RegisterGatherer) or
// iterated (SendReport).
gatherersMtx sync.Mutex
}

// Gatherer represents a mechanism to inject additional data in the telemetry report.
// SendReport calls every registered Gatherer with the report's context and stores
// the returned value in the report body under the key the Gatherer was registered
// with. A non-nil error makes SendReport return an error instead of sending.
type Gatherer func(ctx context.Context) (any, error)

// DataResponse represents the data returned by the external service
type DataResponse struct {
Latest ReleaseDetails `json:"latest,omitempty"`
Expand All @@ -62,8 +68,10 @@ type Options struct {

// New returns an instance of the Reporter
func New(id string, opts Options) (*Reporter, error) {
r := Reporter{}
r.body = map[string]string{
r := Reporter{
gatherers: map[string]Gatherer{},
}
r.body = map[string]any{
"id": id,
"version": version.Version,
}
Expand All @@ -83,6 +91,9 @@ func New(id string, opts Options) (*Reporter, error) {
}
r.client = client

// heap_usage_bytes is always present, so register it unconditionally
r.RegisterGatherer("heap_usage_bytes", readRuntimeMemStats)

return &r, nil
}

Expand All @@ -92,9 +103,15 @@ func (r *Reporter) SendReport(ctx context.Context) (*DataResponse, error) {
rCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var m runtime.MemStats
runtime.ReadMemStats(&m)
r.body["heap_usage_bytes"] = strconv.FormatUint(m.Alloc, 10)
r.gatherersMtx.Lock()
for key, g := range r.gatherers {
var err error
r.body[key], err = g(rCtx)
if err != nil {
return nil, fmt.Errorf("gather telemetry error for key %s: %w", key, err)
}
}
r.gatherersMtx.Unlock()

resp, err := r.client.WithJSON(r.body).Do(rCtx, "POST", "/v1/version")
if err != nil {
Expand All @@ -119,6 +136,12 @@ func (r *Reporter) SendReport(ctx context.Context) (*DataResponse, error) {
}
}

// RegisterGatherer registers f as the Gatherer for key, replacing any Gatherer
// previously registered under the same key. The value f returns is added to
// the report body under key each time a report is sent.
//
// The gatherers map is accessed under gatherersMtx, the same mutex SendReport
// holds while iterating over the registered gatherers.
func (r *Reporter) RegisterGatherer(key string, f Gatherer) {
	r.gatherersMtx.Lock()
	defer r.gatherersMtx.Unlock()

	// A Reporter that was not built via New has a nil map; writing to it
	// would panic, so allocate it lazily.
	if r.gatherers == nil {
		r.gatherers = map[string]Gatherer{}
	}
	r.gatherers[key] = f
}

// IsSet returns true if dr is populated.
func (dr *DataResponse) IsSet() bool {
return dr != nil && dr.Latest.LatestRelease != "" && dr.Latest.Download != "" && dr.Latest.ReleaseNotes != ""
Expand Down Expand Up @@ -153,3 +176,9 @@ func (dr *DataResponse) Pretty() string {

return strings.Join(lines, "\n")
}

// readRuntimeMemStats is a Gatherer that reports the Alloc figure from
// runtime.MemStats, formatted as a base-10 string. The context is unused and
// the returned error is always nil.
func readRuntimeMemStats(_ context.Context) (any, error) {
	stats := new(runtime.MemStats)
	runtime.ReadMemStats(stats)

	heapBytes := strconv.FormatUint(stats.Alloc, 10)
	return heapBytes, nil
}
33 changes: 32 additions & 1 deletion internal/report/report_test.go
Expand Up @@ -114,7 +114,6 @@ func TestReportWithHeapStats(t *testing.T) {
}

_, err = reporter.SendReport(context.Background())

if err != nil {
t.Fatalf("Expected no error but got %v", err)
}
Expand All @@ -124,6 +123,38 @@ func TestReportWithHeapStats(t *testing.T) {
}
}

// TestReportWithExtraKeys verifies that a value produced by a custom Gatherer
// ends up in the report body under the key it was registered with.
func TestReportWithExtraKeys(t *testing.T) {
	const key = "foobear"

	// Point the reporter at a local test server.
	serverURL, shutdown := getTestServer(nil, http.StatusOK)
	defer shutdown()
	t.Setenv("OPA_TELEMETRY_SERVICE_URL", serverURL)

	reporter, err := New("", Options{})
	if err != nil {
		t.Fatalf("Unexpected error %v", err)
	}

	gather := func(ctx context.Context) (any, error) {
		return map[string]any{"baz": []string{"one", "two"}}, nil
	}
	reporter.RegisterGatherer(key, gather)

	if _, err := reporter.SendReport(context.Background()); err != nil {
		t.Fatalf("Expected no error but got %v", err)
	}

	act, found := reporter.body[key]
	if !found {
		t.Fatal("Expected key \"foobear\" in the report")
	}

	exp := map[string]any{"baz": []string{"one", "two"}}
	if !reflect.DeepEqual(act, exp) {
		t.Fatalf("Expected response: %+v but got: %+v", exp, act)
	}
}

func TestPretty(t *testing.T) {
dr := DataResponse{}
resp := dr.Pretty()
Expand Down
91 changes: 91 additions & 0 deletions plugins/plugins.go
Expand Up @@ -8,9 +8,11 @@ package plugins
import (
"context"
"fmt"
mr "math/rand"
"sync"
"time"

"github.com/open-policy-agent/opa/internal/report"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/sdk/trace"

Expand Down Expand Up @@ -148,6 +150,9 @@ const (
DefaultTriggerMode TriggerMode = "periodic"
)

// defaultUploadIntervalSec is the default interval, in seconds, between OPA
// report uploads. sendOPAUpdateLoop also uses it as the upper bound of the
// random jitter added to every interval after the first one.
var defaultUploadIntervalSec = int64(3600)

// Status has a Plugin's current status plus an optional Message.
type Status struct {
State State `json:"state"`
Expand Down Expand Up @@ -200,6 +205,10 @@ type Manager struct {
registeredNDCacheTriggers []func(bool)
bootstrapConfigLabels map[string]string
hooks hooks.Hooks
enableTelemetry bool
reporter *report.Reporter
opaReportNotifyCh chan struct{}
stop chan chan struct{}
}

type managerContextKey string
Expand Down Expand Up @@ -385,6 +394,13 @@ func WithHooks(hs hooks.Hooks) func(*Manager) {
}
}

// WithEnableTelemetry returns a Manager option that records whether OPA
// should send telemetry reports to an external service.
func WithEnableTelemetry(enableTelemetry bool) func(*Manager) {
	apply := func(mgr *Manager) {
		mgr.enableTelemetry = enableTelemetry
	}
	return apply
}

// New creates a new Manager using config.
func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*Manager, error) {

Expand Down Expand Up @@ -453,6 +469,14 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M
return nil, err
}

if m.enableTelemetry {
reporter, err := report.New(id, report.Options{Logger: m.logger})
if err != nil {
return nil, err
}
m.reporter = reporter
}

return m, nil
}

Expand All @@ -469,6 +493,13 @@ func (m *Manager) Init(ctx context.Context) error {
Context: storage.NewContext(),
}

if m.enableTelemetry {
m.opaReportNotifyCh = make(chan struct{})
m.stop = make(chan chan struct{})
d := time.Duration(int64(time.Second) * defaultUploadIntervalSec)
go m.sendOPAUpdateLoop(ctx, d)
}

err := storage.Txn(ctx, m.Store, params, func(txn storage.Transaction) error {

result, err := initload.InsertAndCompile(ctx, initload.InsertAndCompileOptions{
Expand Down Expand Up @@ -497,6 +528,12 @@ func (m *Manager) Init(ctx context.Context) error {
})

if err != nil {
if m.stop != nil {
done := make(chan struct{})
m.stop <- done
<-done
}

return err
}

Expand Down Expand Up @@ -673,6 +710,12 @@ func (m *Manager) Stop(ctx context.Context) {
m.logger.Error("Error closing store: %v", err)
}
}

if m.stop != nil {
done := make(chan struct{})
m.stop <- done
<-done
}
}

// Reconfigure updates the configuration on the manager.
Expand Down Expand Up @@ -817,6 +860,11 @@ func (m *Manager) onCommit(ctx context.Context, txn storage.Transaction, event s

if compiler != nil {
m.setCompiler(compiler)

if m.enableTelemetry && event.PolicyChanged() {
m.opaReportNotifyCh <- struct{}{}
}

for _, f := range m.registeredTriggers {
f(txn)
}
Expand Down Expand Up @@ -982,3 +1030,46 @@ func (m *Manager) RegisterNDCacheTrigger(trigger func(bool)) {
defer m.mtx.Unlock()
m.registeredNDCacheTriggers = append(m.registeredNDCacheTriggers, trigger)
}

// sendOPAUpdateLoop periodically uploads the OPA telemetry report until a
// stop request is received on m.stop.
//
// The first tick fires after uploadDuration. Every later tick is scheduled
// after a randomized interval of defaultUploadIntervalSec plus a jitter in
// [0, defaultUploadIntervalSec) seconds. On a tick, a report is sent only if a
// notification arrived on m.opaReportNotifyCh since the previous report;
// otherwise the tick just reschedules the next one.
//
// A stop request carries a channel. The loop cancels the context used for
// reports, stops the ticker, acknowledges on that channel and returns.
func (m *Manager) sendOPAUpdateLoop(ctx context.Context, uploadDuration time.Duration) {
	ticker := time.NewTicker(uploadDuration)

	// Dedicated, time-seeded generator for the upload jitter. It is only used
	// from this goroutine, so it needs no locking.
	rng := mr.New(mr.NewSource(time.Now().UnixNano()))

	ctx, cancel := context.WithCancel(ctx)

	// opaReportNotify records that a policy change was signalled since the
	// last report was sent.
	var opaReportNotify bool

	// ctx.Done() is deliberately not selected on: senders on m.stop block
	// until this loop acknowledges, so the loop must only exit via m.stop.
	for {
		select {
		case <-m.opaReportNotifyCh:
			opaReportNotify = true
		case <-ticker.C:
			// Stop the current ticker so no further ticks queue up while the
			// report is being sent; a new one is created below.
			ticker.Stop()

			if opaReportNotify {
				m.reporter.RegisterGatherer("min_compatible_version", func(_ context.Context) (any, error) {
					// NOTE(review): m.compiler is read from this goroutine
					// while onCommit may replace it via setCompiler; confirm
					// that this access is synchronized.
					var minimumCompatibleVersion string
					if m.compiler.Required != nil {
						// The second return value is ignored: when no version
						// is available the empty string is reported.
						minimumCompatibleVersion, _ = m.compiler.Required.MinimumCompatibleVersion()
					}
					return minimumCompatibleVersion, nil
				})

				// Telemetry is best effort: failures are logged at debug
				// level and never stop the loop.
				_, err := m.reporter.SendReport(ctx)
				if err != nil {
					m.logger.WithFields(map[string]interface{}{"err": err}).Debug("Unable to send OPA report.")
				}

				opaReportNotify = false
			}

			newInterval := rng.Int63n(defaultUploadIntervalSec) + defaultUploadIntervalSec
			ticker = time.NewTicker(time.Duration(int64(time.Second) * newInterval))
		case done := <-m.stop:
			cancel()
			ticker.Stop()
			done <- struct{}{}
			return
		}
	}
}

0 comments on commit d5f964c

Please sign in to comment.