Skip to content

Commit

Permalink
mt-broker-ingress check audience of received token (#7336)
Browse files Browse the repository at this point in the history
* mt-broker-ingress reject request for wrong audience

* Add e2e test

* Refactor e2e test to be usable for addressables

* Check if brokers audience is nil and return 500
  • Loading branch information
creydr committed Oct 19, 2023
1 parent 3ef831c commit 16d75a9
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 92 deletions.
10 changes: 8 additions & 2 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func main() {
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)
loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
Expand Down Expand Up @@ -150,11 +151,16 @@ func main() {
})
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
ctxFunc := func(ctx context.Context) context.Context {
return featureStore.ToContext(ctx)
}

reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenProvider)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenVerifier, oidcTokenProvider, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/auth/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2023 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package auth

import (
"fmt"
"net/http"
"strings"
)

const (
AuthHeaderKey = "Authorization"
)

// GetJWTFromHeader Returns the JWT from the Authorization header
func GetJWTFromHeader(header http.Header) string {
authHeader := header.Get(AuthHeaderKey)
if authHeader == "" {
return ""
}

return strings.TrimPrefix(authHeader, "Bearer ")
}

// SetAuthHeader sets Authorization header with the given JWT
func SetAuthHeader(jwt string, header http.Header) {
header.Set(AuthHeaderKey, fmt.Sprintf("Bearer %s", jwt))
}
64 changes: 48 additions & 16 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
Expand Down Expand Up @@ -67,9 +68,13 @@ type Handler struct {
Logger *zap.Logger

eventDispatcher *kncloudevents.Dispatcher

tokenVerifier *auth.OIDCTokenVerifier

withContext func(ctx context.Context) context.Context
}

func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenProvider *auth.OIDCTokenProvider) (*Handler, error) {
func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, withContext func(ctx context.Context) context.Context) (*Handler, error) {
connectionArgs := kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -115,6 +120,8 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve
Logger: logger,
BrokerLister: brokerInformer.Lister(),
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
tokenVerifier: tokenVerifier,
withContext: withContext,
}, nil
}

Expand All @@ -127,11 +134,7 @@ func (h *Handler) getBroker(name, namespace string) (*eventingv1.Broker, error)
return broker, nil
}

func (h *Handler) getChannelAddress(name, namespace string) (*duckv1.Addressable, error) {
broker, err := h.getBroker(name, namespace)
if err != nil {
return nil, err
}
func (h *Handler) getChannelAddress(broker *eventingv1.Broker) (*duckv1.Addressable, error) {
if broker.Status.Annotations == nil {
return nil, fmt.Errorf("broker status annotations uninitialized")
}
Expand Down Expand Up @@ -184,7 +187,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

ctx := request.Context()
ctx := h.withContext(request.Context())

message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)
Expand All @@ -211,6 +214,39 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
Namespace: brokerNamespace,
}

broker, err := h.getBroker(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}

features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.Logger.Debug("OIDC authentication is enabled")

if broker.Status.Address.Audience == nil {
h.Logger.Warn(fmt.Sprintf("Audience of broker %s/%s must not be nil, while feature %s is enabled", broker.Name, broker.Namespace, feature.OIDCAuthentication))
writer.WriteHeader(http.StatusInternalServerError)
return
}

token := auth.GetJWTFromHeader(request.Header)
if token == "" {
h.Logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication))
writer.WriteHeader(http.StatusUnauthorized)
return
}

if _, err := h.tokenVerifier.VerifyJWT(ctx, token, *broker.Status.Address.Audience); err != nil {
h.Logger.Warn("no valid JWT provided", zap.Error(err))
writer.WriteHeader(http.StatusUnauthorized)
return
}

h.Logger.Debug("Request contained a valid JWT. Continuing...")
}

ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName))
defer span.End()

Expand All @@ -230,7 +266,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
eventType: event.Type(),
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, brokerNamespace, brokerName)
statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
}
Expand All @@ -240,11 +276,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

// EventType auto-create feature handling
if h.EvenTypeHandler != nil {
b, err := h.getBroker(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
}
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(b), b.GetUID()); err != nil {
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()); err != nil {
h.Logger.Error("Even type auto create failed", zap.Error(err))
}
}
Expand All @@ -267,7 +299,7 @@ func toKReference(broker *eventingv1.Broker) *duckv1.KReference {
return kref
}

func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) {
func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerObj *eventingv1.Broker) (int, time.Duration) {
// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
if h.Defaulter != nil {
Expand All @@ -280,9 +312,9 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
return http.StatusBadRequest, kncloudevents.NoDuration
}

channelAddress, err := h.getChannelAddress(brokerName, brokerNamespace)
channelAddress, err := h.getChannelAddress(brokerObj)
if err != nil {
h.Logger.Warn("Broker not found in the namespace", zap.Error(err))
h.Logger.Warn("could not get channel address from broker", zap.Error(err))
return http.StatusBadRequest, kncloudevents.NoDuration
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ingress

import (
"bytes"
"context"
"io"
nethttp "net/http"
"net/http/httptest"
Expand Down Expand Up @@ -285,9 +286,18 @@ func TestHandler_ServeHTTP(t *testing.T) {
brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenProvider)
tokenProvider := auth.NewOIDCTokenProvider(ctx)
tokenVerifier := auth.NewOIDCTokenVerifier(ctx)

h, err := NewHandler(logger,
&mockReporter{},
tc.defaulter,
brokerinformerfake.Get(ctx),
tokenVerifier,
tokenProvider,
func(ctx context.Context) context.Context {
return ctx
})
if err != nil {
t.Fatal("Unable to create receiver:", err)
}
Expand Down

0 comments on commit 16d75a9

Please sign in to comment.