diff --git a/internal/report/report.go b/internal/report/report.go index 53eb10d41e..c86dfc497b 100644 --- a/internal/report/report.go +++ b/internal/report/report.go @@ -10,16 +10,16 @@ import ( "encoding/json" "fmt" "net/http" + "os" "runtime" "strconv" "strings" + "sync" + "time" "github.com/open-policy-agent/opa/keys" "github.com/open-policy-agent/opa/logging" - "os" - "time" - "github.com/open-policy-agent/opa/plugins/rest" "github.com/open-policy-agent/opa/util" "github.com/open-policy-agent/opa/version" @@ -38,10 +38,16 @@ var ExternalServiceURL = "https://telemetry.openpolicyagent.org" // Reporter reports information such as the version, heap usage about the running OPA instance to an external service type Reporter struct { - body map[string]string + body map[string]any client rest.Client + + gatherers map[string]Gatherer + gatherersMtx sync.Mutex } +// Gatherer represents a mechanism to inject additional data in the telemetry report +type Gatherer func(ctx context.Context) (any, error) + // DataResponse represents the data returned by the external service type DataResponse struct { Latest ReleaseDetails `json:"latest,omitempty"` @@ -62,8 +68,10 @@ type Options struct { // New returns an instance of the Reporter func New(id string, opts Options) (*Reporter, error) { - r := Reporter{} - r.body = map[string]string{ + r := Reporter{ + gatherers: map[string]Gatherer{}, + } + r.body = map[string]any{ "id": id, "version": version.Version, } @@ -83,6 +91,9 @@ func New(id string, opts Options) (*Reporter, error) { } r.client = client + // heap_usage_bytes is always present, so register it unconditionally + r.RegisterGatherer("heap_usage_bytes", readRuntimeMemStats) + return &r, nil } @@ -92,9 +103,15 @@ func (r *Reporter) SendReport(ctx context.Context) (*DataResponse, error) { rCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - var m runtime.MemStats - runtime.ReadMemStats(&m) - r.body["heap_usage_bytes"] = strconv.FormatUint(m.Alloc, 10) + r.gatherersMtx.Lock() + for key, g := range r.gatherers { + var err error + r.body[key], err = g(rCtx) + if err != nil { + return nil, fmt.Errorf("gather telemetry error for key %s: %w", key, err) + } + } + r.gatherersMtx.Unlock() resp, err := r.client.WithJSON(r.body).Do(rCtx, "POST", "/v1/version") if err != nil { @@ -119,6 +136,12 @@ func (r *Reporter) SendReport(ctx context.Context) (*DataResponse, error) { } } +func (r *Reporter) RegisterGatherer(key string, f Gatherer) { + r.gatherersMtx.Lock() + r.gatherers[key] = f + r.gatherersMtx.Unlock() +} + // IsSet returns true if dr is populated. func (dr *DataResponse) IsSet() bool { return dr != nil && dr.Latest.LatestRelease != "" && dr.Latest.Download != "" && dr.Latest.ReleaseNotes != "" @@ -153,3 +176,9 @@ func (dr *DataResponse) Pretty() string { return strings.Join(lines, "\n") } + +func readRuntimeMemStats(_ context.Context) (any, error) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return strconv.FormatUint(m.Alloc, 10), nil +} diff --git a/internal/report/report_test.go b/internal/report/report_test.go index 70ca42685a..2171cd26b8 100644 --- a/internal/report/report_test.go +++ b/internal/report/report_test.go @@ -114,7 +114,6 @@ func TestReportWithHeapStats(t *testing.T) { } _, err = reporter.SendReport(context.Background()) - if err != nil { t.Fatalf("Expected no error but got %v", err) } @@ -124,6 +123,38 @@ func TestReportWithHeapStats(t *testing.T) { } } +func TestReportWithExtraKeys(t *testing.T) { + + // test server + baseURL, teardown := getTestServer(nil, http.StatusOK) + defer teardown() + + t.Setenv("OPA_TELEMETRY_SERVICE_URL", baseURL) + + reporter, err := New("", Options{}) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + reporter.RegisterGatherer("foobear", func(ctx context.Context) (any, error) { + return map[string]any{"baz": []string{"one", "two"}}, nil + }) + + _, err = reporter.SendReport(context.Background()) + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + if _, ok := reporter.body["foobear"]; !ok { + t.Fatal("Expected key \"foobear\" in the report") + } + + exp := map[string]any{"baz": []string{"one", "two"}} + if act := reporter.body["foobear"]; !reflect.DeepEqual(act, exp) { + t.Fatalf("Expected response: %+v but got: %+v", exp, act) + } +} + func TestPretty(t *testing.T) { dr := DataResponse{} resp := dr.Pretty() diff --git a/plugins/plugins.go b/plugins/plugins.go index 830da78bdc..cd8baa0cec 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -8,9 +8,11 @@ package plugins import ( "context" "fmt" + mr "math/rand" "sync" "time" + "github.com/open-policy-agent/opa/internal/report" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/sdk/trace" @@ -148,6 +150,9 @@ const ( DefaultTriggerMode TriggerMode = "periodic" ) +// default interval between OPA report uploads +var defaultUploadIntervalSec = int64(3600) + // Status has a Plugin's current status plus an optional Message. type Status struct { State State `json:"state"` @@ -200,6 +205,10 @@ type Manager struct { registeredNDCacheTriggers []func(bool) bootstrapConfigLabels map[string]string hooks hooks.Hooks + enableTelemetry bool + reporter *report.Reporter + opaReportNotifyCh chan struct{} + stop chan chan struct{} } type managerContextKey string @@ -385,6 +394,13 @@ func WithHooks(hs hooks.Hooks) func(*Manager) { } } +// WithEnableTelemetry controls whether OPA will send telemetry reports to an external service. +func WithEnableTelemetry(enableTelemetry bool) func(*Manager) { + return func(m *Manager) { + m.enableTelemetry = enableTelemetry + } +} + // New creates a new Manager using config. func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*Manager, error) { @@ -453,6 +469,14 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M return nil, err } + if m.enableTelemetry { + reporter, err := report.New(id, report.Options{Logger: m.logger}) + if err != nil { + return nil, err + } + m.reporter = reporter + } + return m, nil } @@ -469,6 +493,13 @@ func (m *Manager) Init(ctx context.Context) error { Context: storage.NewContext(), } + if m.enableTelemetry { + m.opaReportNotifyCh = make(chan struct{}) + m.stop = make(chan chan struct{}) + d := time.Duration(int64(time.Second) * defaultUploadIntervalSec) + go m.sendOPAUpdateLoop(ctx, d) + } + err := storage.Txn(ctx, m.Store, params, func(txn storage.Transaction) error { result, err := initload.InsertAndCompile(ctx, initload.InsertAndCompileOptions{ @@ -497,6 +528,12 @@ func (m *Manager) Init(ctx context.Context) error { }) if err != nil { + if m.stop != nil { + done := make(chan struct{}) + m.stop <- done + <-done + } + return err } @@ -673,6 +710,12 @@ func (m *Manager) Stop(ctx context.Context) { m.logger.Error("Error closing store: %v", err) } } + + if m.stop != nil { + done := make(chan struct{}) + m.stop <- done + <-done + } } // Reconfigure updates the configuration on the manager. @@ -817,6 +860,11 @@ func (m *Manager) onCommit(ctx context.Context, txn storage.Transaction, event s if compiler != nil { m.setCompiler(compiler) + + if m.enableTelemetry && event.PolicyChanged() { + m.opaReportNotifyCh <- struct{}{} + } + for _, f := range m.registeredTriggers { f(txn) } @@ -982,3 +1030,46 @@ func (m *Manager) RegisterNDCacheTrigger(trigger func(bool)) { defer m.mtx.Unlock() m.registeredNDCacheTriggers = append(m.registeredNDCacheTriggers, trigger) } + +func (m *Manager) sendOPAUpdateLoop(ctx context.Context, uploadDuration time.Duration) { + ticker := time.NewTicker(uploadDuration) + mr.New(mr.NewSource(time.Now().UnixNano())) + + ctx, cancel := context.WithCancel(ctx) + + var opaReportNotify bool + + for { + select { + case <-m.opaReportNotifyCh: + opaReportNotify = true + case <-ticker.C: + ticker.Stop() + + if opaReportNotify { + m.reporter.RegisterGatherer("min_compatible_version", func(_ context.Context) (any, error) { + var minimumCompatibleVersion string + if m.compiler.Required != nil { + minimumCompatibleVersion, _ = m.compiler.Required.MinimumCompatibleVersion() + } + return minimumCompatibleVersion, nil + }) + + _, err := m.reporter.SendReport(ctx) + if err != nil { + m.logger.WithFields(map[string]interface{}{"err": err}).Debug("Unable to send OPA report.") + } + + opaReportNotify = false + } + + newInterval := mr.Int63n(defaultUploadIntervalSec) + defaultUploadIntervalSec + ticker = time.NewTicker(time.Duration(int64(time.Second) * newInterval)) + case done := <-m.stop: + cancel() + ticker.Stop() + done <- struct{}{} + return + } + } +} diff --git a/plugins/plugins_test.go b/plugins/plugins_test.go index 9ab07d9d7d..5ecdfe2035 100644 --- a/plugins/plugins_test.go +++ b/plugins/plugins_test.go @@ -6,16 +6,22 @@ package plugins import ( "context" + "encoding/json" "fmt" "net/http" + "net/http/httptest" "reflect" "testing" + "time" + "github.com/open-policy-agent/opa/bundle" internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing" + "github.com/open-policy-agent/opa/internal/file/archive" "github.com/open-policy-agent/opa/internal/storage/mock" "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/logging/test" "github.com/open-policy-agent/opa/plugins/rest" + "github.com/open-policy-agent/opa/storage" inmem "github.com/open-policy-agent/opa/storage/inmem/test" "github.com/open-policy-agent/opa/topdown/cache" prom "github.com/prometheus/client_golang/prometheus" @@ -184,6 +190,102 @@ func TestPluginStatusUpdateOnStartAndStop(t *testing.T) { m.Stop(context.Background()) } +func TestManagerWithOPATelemetryUpdateLoop(t *testing.T) { + // test server + mux := http.NewServeMux() + ts := httptest.NewServer(mux) + + count := 0 + mux.HandleFunc("/v1/version", func(w http.ResponseWriter, req *http.Request) { + count++ + w.WriteHeader(http.StatusOK) + bs, _ := json.Marshal(map[string]string{"foo": "bar"}) // dummy data + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(bs) // ignore error + }) + defer ts.Close() + + t.Setenv("OPA_TELEMETRY_SERVICE_URL", ts.URL) + + ctx := context.Background() + + m, err := New([]byte{}, "test", inmem.New(), WithEnableTelemetry(true)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + defaultUploadIntervalSec = int64(1) + + err = m.Start(context.Background()) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + // add a policy to the store to trigger a telemetry update + module := `package x + p { array.reverse([1,2,3]) }` + + err = storage.Txn(ctx, m.Store, storage.WriteParams, func(txn storage.Transaction) error { + return m.Store.UpsertPolicy(ctx, txn, "policy.rego", []byte(module)) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + time.Sleep(2 * time.Second) + + // add data to the store and verify there is no trigger for a telemetry update + err = storage.Txn(ctx, m.Store, storage.WriteParams, func(txn storage.Transaction) error { + return m.Store.Write(ctx, txn, storage.AddOp, storage.MustParsePath("/a"), `[2,1,3]`) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // add a bundle with some policy to trigger a telemetry update + txn := storage.NewTransactionOrDie(ctx, m.Store, storage.WriteParams) + + var archiveFiles = map[string]string{ + "/a/b/c/data.json": "[1,2,3]", + "/policy.rego": "package foo\n import future.keywords.every", + "/roles/policy.rego": "package bar\n import future.keywords.if\n p.a.b.c.d if { true }", + } + + files := make([][2]string, 0, len(archiveFiles)) + for name, content := range archiveFiles { + files = append(files, [2]string{name, content}) + } + + buf := archive.MustWriteTarGz(files) + b, err := bundle.NewReader(buf).WithLazyLoadingMode(true).Read() + if err != nil { + t.Fatal(err) + } + + iterator := bundle.NewIterator(b.Raw) + + params := storage.WriteParams + params.BasePaths = []string{""} + + err = m.Store.Truncate(ctx, txn, params, iterator) + if err != nil { + t.Fatalf("Unexpected truncate error: %v", err) + } + + if err := m.Store.Commit(ctx, txn); err != nil { + t.Fatalf("Unexpected commit error: %v", err) + } + + time.Sleep(2 * time.Second) + + m.Stop(ctx) + + exp := 2 + if count != exp { + t.Fatalf("Expected number of server calls: %+v but got: %+v", exp, count) + } +} + type testPlugin struct { m *Manager } diff --git a/runtime/runtime.go b/runtime/runtime.go index 86dd08bd59..a3fb495f09 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -404,7 +404,8 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) { plugins.PrintHook(loggingPrintHook{logger: logger}), plugins.WithRouter(params.Router), plugins.WithPrometheusRegister(metrics), - plugins.WithTracerProvider(tracerProvider)) + plugins.WithTracerProvider(tracerProvider), + plugins.WithEnableTelemetry(params.EnableVersionCheck)) if err != nil { return nil, fmt.Errorf("config error: %w", err) }