Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Kubernetes aware of the LoadBalancer behaviour #119937

Merged
merged 4 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/proxy/conntrack/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP)
}
nodePort := svcInfo.NodePort()
Expand Down Expand Up @@ -100,7 +100,7 @@ func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap pro
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
for _, lbIP := range svcInfo.LoadBalancerVIPStrings() {
err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func (proxier *Proxier) syncProxyRules() {
// create a firewall chain.
loadBalancerTrafficChain := externalTrafficChain
fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain {
activeNATChains[fwChain] = true
loadBalancerTrafficChain = fwChain
Expand Down Expand Up @@ -1116,7 +1116,7 @@ func (proxier *Proxier) syncProxyRules() {
}

// Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerIPStrings() {
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
if hasEndpoints {
natRules.Write(
"-A", string(kubeServicesChain),
Expand All @@ -1141,7 +1141,7 @@ func (proxier *Proxier) syncProxyRules() {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerIPStrings() {
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment,
Expand Down Expand Up @@ -1319,7 +1319,7 @@ func (proxier *Proxier) syncProxyRules() {
// will loop back with the source IP set to the VIP. We
// need the following rules to allow requests from this node.
if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerIPStrings() {
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
natRules.Write(
args,
"-s", lbip,
Expand Down
128 changes: 128 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics"
Expand Down Expand Up @@ -8276,3 +8279,128 @@ func TestNoEndpointsMetric(t *testing.T) {
})
}
}

func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP

testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedRule bool
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedRule: false,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedRule: true,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedRule: true,
},
}

svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)

tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)

fp.syncProxyRules()

c, _ := ipt.Dump.GetChain(utiliptables.TableNAT, kubeServicesChain)
ruleExists := false
for _, r := range c.Rules {
if r.DestinationAddress != nil && r.DestinationAddress.Value == testCase.svcLBIP {
ruleExists = true
}
}
if ruleExists != testCase.expectedRule {
t.Errorf("unexpected rule for %s", testCase.svcLBIP)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func (proxier *Proxier) syncProxyRules() {
}

// Capture load-balancer ingress.
for _, ingress := range svcInfo.LoadBalancerIPStrings() {
for _, ingress := range svcInfo.LoadBalancerVIPStrings() {
// ipset call
entry = &utilipset.Entry{
IP: ingress,
Expand Down
123 changes: 123 additions & 0 deletions pkg/proxy/ipvs/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
Expand Down Expand Up @@ -5836,3 +5839,123 @@ func TestDismissLocalhostRuleExist(t *testing.T) {
})
}
}

func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
ipModeProxy := v1.LoadBalancerIPModeProxy
ipModeVIP := v1.LoadBalancerIPModeVIP

testCases := []struct {
name string
ipModeEnabled bool
svcIP string
svcLBIP string
ipMode *v1.LoadBalancerIPMode
expectedServices int
}{
/* LoadBalancerIPMode disabled */
{
name: "LoadBalancerIPMode disabled, ipMode Proxy",
ipModeEnabled: false,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode VIP",
ipModeEnabled: false,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode disabled, ipMode nil",
ipModeEnabled: false,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
/* LoadBalancerIPMode enabled */
{
name: "LoadBalancerIPMode enabled, ipMode Proxy",
ipModeEnabled: true,
svcIP: "10.20.30.41",
svcLBIP: "1.2.3.4",
ipMode: &ipModeProxy,
expectedServices: 1,
},
{
name: "LoadBalancerIPMode enabled, ipMode VIP",
ipModeEnabled: true,
svcIP: "10.20.30.42",
svcLBIP: "1.2.3.5",
ipMode: &ipModeVIP,
expectedServices: 2,
},
{
name: "LoadBalancerIPMode enabled, ipMode nil",
ipModeEnabled: true,
svcIP: "10.20.30.43",
svcLBIP: "1.2.3.6",
ipMode: nil,
expectedServices: 2,
},
}

svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
_, fp := buildFakeProxier()
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = testCase.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: testCase.svcLBIP,
IPMode: testCase.ipMode,
}}
}),
)

tcpProtocol := v1.ProtocolTCP
makeEndpointSliceMap(fp,
makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"10.180.0.1"},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tcpProtocol,
}}
}),
)

fp.syncProxyRules()

services, err := fp.ipvs.GetVirtualServers()
if err != nil {
t.Errorf("Failed to get ipvs services, err: %v", err)
}
if len(services) != testCase.expectedServices {
t.Errorf("Expected %d ipvs services, got %d", testCase.expectedServices, len(services))
}
})
}
}