Skip to content

Commit

Permalink
Merge pull request kubernetes#25 from appscode/reverse-dns
Browse files Browse the repository at this point in the history
Reverse DNS records for named headless services
  • Loading branch information
bowei authored Jan 13, 2017
2 parents 9ef62d6 + cbbbfd9 commit 3700f98
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 7 deletions.
105 changes: 98 additions & 7 deletions pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,10 @@ func (kd *KubeDNS) setEndpointsStore() {
&v1.Endpoints{},
resyncPeriod,
kcache.ResourceEventHandlerFuncs{
AddFunc: kd.handleEndpointAdd,
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: Avoid unwanted updates.
kd.handleEndpointAdd(newObj)
},
// No DeleteFunc for EndpointsStore because endpoint object will be deleted
// when corresponding service is deleted.
AddFunc: kd.handleEndpointAdd,
UpdateFunc: kd.handleEndpointUpdate,
// If Service is named headless need to remove the reverse dns entries.
DeleteFunc: kd.handleEndpointDelete,
},
)
}
Expand Down Expand Up @@ -323,6 +320,94 @@ func (kd *KubeDNS) handleEndpointAdd(obj interface{}) {
}
}

func (kd *KubeDNS) handleEndpointUpdate(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*v1.Endpoints)
if !ok {
glog.Errorf("oldObj type assertion failed! Expected 'v1.Endpoints', got %T", oldObj)
return
}

newEndpoints, ok := newObj.(*v1.Endpoints)
if !ok {
glog.Errorf("newObj type assertion failed! Expected 'v1.Endpoints', got %T", newObj)
return
}

// oldAddressMap is use to hold oldEndpoints addresses that are not
// in newEndpoints
oldAddressMap := make(map[string]bool)

// svc is same for both old and new endpoints
svc, err := kd.getServiceFromEndpoints(oldEndpoints)
if svc != nil && err == nil {
if !v1.IsServiceIPSet(svc) {
for idx := range oldEndpoints.Subsets {
for subIdx := range oldEndpoints.Subsets[idx].Addresses {
address := &oldEndpoints.Subsets[idx].Addresses[subIdx]
endpointIP := address.IP
if _, has := getHostname(address); has {
oldAddressMap[endpointIP] = true
}
}
}

for idx := range newEndpoints.Subsets {
for subIdx := range newEndpoints.Subsets[idx].Addresses {
address := newEndpoints.Subsets[idx].Addresses[subIdx]
endpointIP := address.IP
if _, ok := oldAddressMap[endpointIP]; ok {
address := &newEndpoints.Subsets[idx].Addresses[subIdx]
// Entries are both in old and new endpoint. Remove from the `oldAddressMap`
// if the address is still named to the service.
if _, has := getHostname(address); has {
// The service is still named in the Pod
delete(oldAddressMap, endpointIP)
}
}
}
}

// Remove all old PTR records for the endpoints that are not
// in new endpoints, or
// the addresses that are no longer named.
kd.cacheLock.Lock()
for k := range oldAddressMap {
delete(kd.reverseRecordMap, k)
}
kd.cacheLock.Unlock()
}
}

// TODO: Avoid unwanted updates.
kd.handleEndpointAdd(newObj)
}

func (kd *KubeDNS) handleEndpointDelete(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
glog.Errorf("obj type assertion failed! Expected 'v1.Endpoints', got %T", obj)
return
}

svc, err := kd.getServiceFromEndpoints(endpoints)
if svc != nil && err == nil {
if !v1.IsServiceIPSet(svc) {
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
// When endpoints for Named headless services deleted, delete old reverse dns records.
for idx := range endpoints.Subsets {
for subIdx := range endpoints.Subsets[idx].Addresses {
address := &endpoints.Subsets[idx].Addresses[subIdx]
endpointIP := address.IP
if _, has := getHostname(address); has {
delete(kd.reverseRecordMap, endpointIP)
}
}
}
}
}
}

func (kd *KubeDNS) addDNSUsingEndpoints(e *v1.Endpoints) error {
svc, err := kd.getServiceFromEndpoints(e)
if err != nil {
Expand Down Expand Up @@ -412,6 +497,12 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *v1.Endpoints, svc *v1.Se
subCache.SetEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
}
}

// Generate PTR records only for Named Headless service.
if _, has := getHostname(address); has {
reverseRecord, _ := util.GetSkyMsg(kd.fqdn(svc, endpointName), 0)
kd.reverseRecordMap[endpointIP] = reverseRecord
}
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
Expand Down
146 changes: 146 additions & 0 deletions pkg/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ func TestSimpleHeadlessService(t *testing.T) {
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(s)
assertDNSForHeadlessService(t, kd, endpoints)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
kd.removeService(s)
assertNoDNSForHeadlessService(t, kd, s)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
}

func TestHeadlessServiceWithNamedPorts(t *testing.T) {
Expand All @@ -299,9 +301,11 @@ func TestHeadlessServiceWithNamedPorts(t *testing.T) {
// We expect 6 records. 4 SRV records. 2 POD records.
assertDNSForHeadlessService(t, kd, endpoints)
assertSRVForHeadlessService(t, kd, service, endpoints)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)

kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
}

func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
Expand All @@ -325,15 +329,133 @@ func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
// expected DNSRecords = 4
kd.handleEndpointAdd(endpoints)
assertDNSForHeadlessService(t, kd, endpoints)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)

// remove all endpoints
endpoints.Subsets = []v1.EndpointSubset{}
kd.handleEndpointAdd(endpoints)
assertNoDNSForHeadlessService(t, kd, service)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)

// remove service
kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
}

func TestNamedHeadlessServiceEndpointAdd(t *testing.T) {
kd := newKubeDNS()

service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))

endpoints := newEndpoints(service, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "10.0.0.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: testNamespace,
},
Hostname: "foo",
},
},
Ports: []v1.EndpointPort{},
})
// add endpoints to store
assert.NoError(t, kd.endpointsStore.Add(endpoints))

// add service
kd.newService(service)
assertDNSForHeadlessService(t, kd, endpoints)

kd.handleEndpointAdd(endpoints)
assertDNSForHeadlessService(t, kd, endpoints)
assertReverseDNSForNamedHeadlessService(t, kd, endpoints)
}

func TestNamedHeadlessServiceEndpointUpdate(t *testing.T) {
kd := newKubeDNS()

service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))

oldEndpoints := newEndpoints(service, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "10.0.0.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: testNamespace,
},
Hostname: "foo",
},
},
Ports: []v1.EndpointPort{},
})
// add endpoints to store
assert.NoError(t, kd.endpointsStore.Add(oldEndpoints))

newEndpoints := newEndpoints(service, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "10.0.0.2",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: testNamespace,
},
Hostname: "foo",
},
},
Ports: []v1.EndpointPort{},
})

// add service
kd.newService(service)
assertDNSForHeadlessService(t, kd, oldEndpoints)

kd.handleEndpointUpdate(oldEndpoints, newEndpoints)
assertDNSForHeadlessService(t, kd, newEndpoints)
assertNoReverseDNSForHeadlessService(t, kd, oldEndpoints)
assertReverseDNSForNamedHeadlessService(t, kd, newEndpoints)
}

func TestNamedHeadlessServiceEndpointDelete(t *testing.T) {
kd := newKubeDNS()

service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))

endpoints := newEndpoints(service, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{
{
IP: "10.0.0.1",
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: testNamespace,
},
Hostname: "foo",
},
},
Ports: []v1.EndpointPort{},
})
// add endpoints to store
assert.NoError(t, kd.endpointsStore.Add(endpoints))

// add service
kd.newService(service)
assertDNSForHeadlessService(t, kd, endpoints)

kd.handleEndpointDelete(endpoints)
assertDNSForHeadlessService(t, kd, endpoints)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
}

func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
Expand All @@ -358,10 +480,12 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
kd.handleEndpointAdd(endpoints)

assertDNSForHeadlessService(t, kd, endpoints)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)

// remove service
kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
assertNoReverseDNSForHeadlessService(t, kd, endpoints)
}

// Verifies that a single record with host "a" is returned for query "q".
Expand Down Expand Up @@ -721,6 +845,24 @@ func assertDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *v1.Endpoints) {
}
}

func assertReverseDNSForNamedHeadlessService(t *testing.T, kd *KubeDNS, e *v1.Endpoints) {
for _, subset := range e.Subsets {
for _, endpointAddress := range subset.Addresses {
record := kd.reverseRecordMap[endpointAddress.IP]
t.Logf("got reverse host name %s", record.Host)
assert.Equal(t, record.Host, getPodsFQDN(kd, e, endpointAddress.Hostname))
}
}
}

func assertNoReverseDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *v1.Endpoints) {
for _, subset := range e.Subsets {
for _, endpointAddress := range subset.Addresses {
assert.Nil(t, kd.reverseRecordMap[endpointAddress.IP])
}
}
}

func assertDNSForExternalService(t *testing.T, kd *KubeDNS, s *v1.Service) {
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.NoError(t, err)
Expand Down Expand Up @@ -834,6 +976,10 @@ func getEndpointsFQDN(kd *KubeDNS, e *v1.Endpoints) string {
return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain)
}

func getPodsFQDN(kd *KubeDNS, e *v1.Endpoints, podHostName string) string {
return fmt.Sprintf("%s.%s.%s.svc.%s", podHostName, e.Name, e.Namespace, kd.domain)
}

func getSRVFQDN(kd *KubeDNS, s *v1.Service, portName string) string {
return fmt.Sprintf("_%s._tcp.%s.%s.svc.%s", portName, s.Name, s.Namespace, kd.domain)
}

0 comments on commit 3700f98

Please sign in to comment.