Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

mt-broker-ingress check audience of received token #7336

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func main() {
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)
loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
Expand Down Expand Up @@ -150,11 +151,16 @@ func main() {
})
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
ctxFunc := func(ctx context.Context) context.Context {
return featureStore.ToContext(ctx)
}

reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenProvider)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenVerifier, oidcTokenProvider, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/auth/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2023 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package auth

import (
"fmt"
"net/http"
"strings"
)

const (
AuthHeaderKey = "Authorization"
)

// GetJWTFromHeader Returns the JWT from the Authorization header
func GetJWTFromHeader(header http.Header) string {
authHeader := header.Get(AuthHeaderKey)
if authHeader == "" {
return ""
}

return strings.TrimPrefix(authHeader, "Bearer ")
}

// SetAuthHeader sets Authorization header with the given JWT
func SetAuthHeader(jwt string, header http.Header) {
header.Set(AuthHeaderKey, fmt.Sprintf("Bearer %s", jwt))
}
64 changes: 48 additions & 16 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
Expand Down Expand Up @@ -67,9 +68,13 @@ type Handler struct {
Logger *zap.Logger

eventDispatcher *kncloudevents.Dispatcher

tokenVerifier *auth.OIDCTokenVerifier

withContext func(ctx context.Context) context.Context
}

func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenProvider *auth.OIDCTokenProvider) (*Handler, error) {
func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, withContext func(ctx context.Context) context.Context) (*Handler, error) {
connectionArgs := kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -115,6 +120,8 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve
Logger: logger,
BrokerLister: brokerInformer.Lister(),
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
tokenVerifier: tokenVerifier,
withContext: withContext,
}, nil
}

Expand All @@ -127,11 +134,7 @@ func (h *Handler) getBroker(name, namespace string) (*eventingv1.Broker, error)
return broker, nil
}

func (h *Handler) getChannelAddress(name, namespace string) (*duckv1.Addressable, error) {
broker, err := h.getBroker(name, namespace)
if err != nil {
return nil, err
}
func (h *Handler) getChannelAddress(broker *eventingv1.Broker) (*duckv1.Addressable, error) {
if broker.Status.Annotations == nil {
return nil, fmt.Errorf("broker status annotations uninitialized")
}
Expand Down Expand Up @@ -184,7 +187,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

ctx := request.Context()
ctx := h.withContext(request.Context())

message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)
Expand All @@ -211,6 +214,39 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
Namespace: brokerNamespace,
}

broker, err := h.getBroker(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}

features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.Logger.Debug("OIDC authentication is enabled")

if broker.Status.Address.Audience == nil {
h.Logger.Warn(fmt.Sprintf("Audience of broker %s/%s must not be nil, while feature %s is enabled", broker.Name, broker.Namespace, feature.OIDCAuthentication))
writer.WriteHeader(http.StatusInternalServerError)
return
}

token := auth.GetJWTFromHeader(request.Header)
if token == "" {
h.Logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication))
writer.WriteHeader(http.StatusUnauthorized)
return
}

if _, err := h.tokenVerifier.VerifyJWT(ctx, token, *broker.Status.Address.Audience); err != nil {
creydr marked this conversation as resolved.
Show resolved Hide resolved
h.Logger.Warn("no valid JWT provided", zap.Error(err))
writer.WriteHeader(http.StatusUnauthorized)
return
}

h.Logger.Debug("Request contained a valid JWT. Continuing...")
}

ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName))
defer span.End()

Expand All @@ -230,7 +266,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
eventType: event.Type(),
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, brokerNamespace, brokerName)
statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
}
Expand All @@ -240,11 +276,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

// EventType auto-create feature handling
if h.EvenTypeHandler != nil {
b, err := h.getBroker(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Failed to retrieve broker", zap.Error(err))
}
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(b), b.GetUID()); err != nil {
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()); err != nil {
h.Logger.Error("Even type auto create failed", zap.Error(err))
}
}
Expand All @@ -267,7 +299,7 @@ func toKReference(broker *eventingv1.Broker) *duckv1.KReference {
return kref
}

func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) {
func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerObj *eventingv1.Broker) (int, time.Duration) {
// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
if h.Defaulter != nil {
Expand All @@ -280,9 +312,9 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
return http.StatusBadRequest, kncloudevents.NoDuration
}

channelAddress, err := h.getChannelAddress(brokerName, brokerNamespace)
channelAddress, err := h.getChannelAddress(brokerObj)
if err != nil {
h.Logger.Warn("Broker not found in the namespace", zap.Error(err))
h.Logger.Warn("could not get channel address from broker", zap.Error(err))
return http.StatusBadRequest, kncloudevents.NoDuration
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ingress

import (
"bytes"
"context"
"io"
nethttp "net/http"
"net/http/httptest"
Expand Down Expand Up @@ -285,9 +286,18 @@ func TestHandler_ServeHTTP(t *testing.T) {
brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenProvider)
tokenProvider := auth.NewOIDCTokenProvider(ctx)
tokenVerifier := auth.NewOIDCTokenVerifier(ctx)

h, err := NewHandler(logger,
&mockReporter{},
tc.defaulter,
brokerinformerfake.Get(ctx),
tokenVerifier,
tokenProvider,
func(ctx context.Context) context.Context {
return ctx
})
if err != nil {
t.Fatal("Unable to create receiver:", err)
}
Expand Down