Skip to content

Commit

Permalink
send rs and deployments to backend
Browse files Browse the repository at this point in the history
  • Loading branch information
kenanfarukcakir committed Jul 13, 2023
1 parent 14fb07a commit c10d4da
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 259 deletions.
4 changes: 4 additions & 0 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (a *Aggregator) processk8s() {
a.processPod(d)
case k8s.SERVICE:
a.processSvc(d)
case k8s.REPLICASET:
a.processReplicaSet(d)
case k8s.DEPLOYMENT:
a.processDeployment(d)
default:
log.Logger.Warn().Msgf("unknown resource type %s", d.ResourceType)
}
Expand Down
104 changes: 67 additions & 37 deletions aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"alaz/k8s"
"alaz/log"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

Expand All @@ -14,26 +15,10 @@ const (
DELETE = "DELETE"
)

func (a *Aggregator) persistPod(dtoPod datastore.Pod, eventType string) {
var callName string
var err error
switch eventType {
case ADD:
callName = "CreatePod"
err = a.ds.CreatePod(dtoPod)
case UPDATE:
callName = "UpdatePod"
err = a.ds.UpdatePod(dtoPod)
case DELETE:
callName = "DeletePod"
err = a.ds.DeletePod(dtoPod)
default:
log.Logger.Error().Msg("unknown event type")
return
}

func (a *Aggregator) persistPod(dto datastore.Pod, eventType string) {
err := a.ds.PersistPod(dto, eventType)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on %s call to %s", callName, a.dsDestination)
log.Logger.Error().Err(err).Msgf("error on PersistPod call to %s", eventType)
}
}

Expand Down Expand Up @@ -79,25 +64,9 @@ func (a *Aggregator) processPod(d k8s.K8sResourceMessage) {
}

func (a *Aggregator) persistSvc(dto datastore.Service, eventType string) {
var callName string
var err error
switch eventType {
case ADD:
callName = "CreateService"
err = a.ds.CreateService(dto)
case UPDATE:
callName = "UpdateService"
err = a.ds.UpdateService(dto)
case DELETE:
callName = "DeleteService"
err = a.ds.DeleteService(dto)
default:
log.Logger.Error().Msg("unknown event type")
return
}

err := a.ds.PersistService(dto, eventType)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on %s call to %s", callName, a.dsDestination)
log.Logger.Error().Err(err).Msgf("error on PersistService call to %s", eventType)
}
}

Expand Down Expand Up @@ -146,3 +115,64 @@ func (a *Aggregator) processSvc(d k8s.K8sResourceMessage) {
go a.persistSvc(dtoSvc, DELETE)
}
}

func (a *Aggregator) persistReplicaSet(dto datastore.ReplicaSet, eventType string) {
err := a.ds.PersistReplicaSet(dto, eventType)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on persistReplicaset call to %s", eventType)
}
}

func (a *Aggregator) processReplicaSet(d k8s.K8sResourceMessage) {
replicaSet := d.Object.(*appsv1.ReplicaSet)

var ownerType, ownerID, ownerName string
if len(replicaSet.OwnerReferences) > 0 {
ownerType = replicaSet.OwnerReferences[0].Kind
ownerID = string(replicaSet.OwnerReferences[0].UID)
ownerName = replicaSet.OwnerReferences[0].Name
} else {
log.Logger.Debug().Msgf("ReplicaSet %s/%s has no owner, event: %s", replicaSet.Namespace, replicaSet.Name, d.EventType)
}

dtoReplicaSet := datastore.ReplicaSet{
UID: string(replicaSet.UID),
Name: ownerName,
Namespace: replicaSet.Namespace,
OwnerType: ownerType,
OwnerID: ownerID,
OwnerName: ownerName,
Replicas: replicaSet.Status.Replicas,
}

switch d.EventType {
case k8s.ADD:
go a.persistReplicaSet(dtoReplicaSet, ADD)
case k8s.UPDATE:
go a.persistReplicaSet(dtoReplicaSet, UPDATE)
case k8s.DELETE:
go a.persistReplicaSet(dtoReplicaSet, DELETE)
}

}

func (a *Aggregator) processDeployment(d k8s.K8sResourceMessage) {
deployment := d.Object.(*appsv1.Deployment)

dto := datastore.Deployment{
UID: string(deployment.UID),
Name: deployment.Name,
Namespace: deployment.Namespace,
Replicas: deployment.Status.Replicas,
}

switch d.EventType {
case k8s.ADD:
go a.ds.PersistDeployment(dto, ADD)
case k8s.UPDATE:
go a.ds.PersistDeployment(dto, UPDATE)
case k8s.DELETE:
go a.ds.PersistDeployment(dto, DELETE)
}

}
154 changes: 90 additions & 64 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,48 @@ type SvcPayload struct {
} `json:"ports"`
}

type ReplicaSetPayload struct {
Metadata struct {
MonitoringID string `json:"monitoring_id"`
IdempotencyKey string `json:"idempotency_key"`
} `json:"metadata"`
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Replicas int32 `json:"replicas"`
OwnerType string `json:"owner_type"`
OwnerName string `json:"owner_name"`
OwnerID string `json:"owner_id"`
}

type DeploymentPayload struct {
Metadata struct {
MonitoringID string `json:"monitoring_id"`
IdempotencyKey string `json:"idempotency_key"`
} `json:"metadata"`
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Replicas int32 `json:"replicas"`
}

// BackendDS is a backend datastore
type BackendDS struct {
host string
port string
token string
c *http.Client

reqChan chan interface{}
}

const (
podEndpoint = "/alaz/k8s/pod"
svcEndpoint = "/alaz/k8s/svc"
podEndpoint = "/alaz/k8s/pod/"
svcEndpoint = "/alaz/k8s/svc/"
rsEndpoint = "/alaz/k8s/replicaset/"
depEndpoint = "/alaz/k8s/deployment/"
)

func NewBackendDS(conf config.BackendConfig) *BackendDS {
Expand All @@ -77,6 +108,9 @@ func NewBackendDS(conf config.BackendConfig) *BackendDS {
MaxConnsPerHost: 100, // 100 connection per host
},
Timeout: 5 * time.Second, // Set a timeout for the request
// CheckRedirect: func(req *http.Request, via []*http.Request) error {
// return http.ErrUseLastResponse
// },
}

return &BackendDS{
Expand All @@ -98,11 +132,14 @@ func (b *BackendDS) DoRequest(req *http.Request) error {
if err != nil {
return fmt.Errorf("error sending http request: %v", err)
}
defer resp.Body.Close()
_, _ = io.Copy(io.Discard, resp.Body) // in order to reuse the connection
defer func() {
_, _ = io.Copy(io.Discard, resp.Body) // in order to reuse the connection
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("not success: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("not success: %d, %s", resp.StatusCode, string(body))
}

return nil
Expand All @@ -128,57 +165,15 @@ func convertPodToPayload(pod Pod, eventType string) PodPayload {
}
}

func (b *BackendDS) CreatePod(pod Pod) error {
podPayload := convertPodToPayload(pod, "ADD")
func (b *BackendDS) PersistPod(pod Pod, eventType string) error {
podPayload := convertPodToPayload(pod, eventType)

c, err := json.Marshal(podPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+podEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}

err = b.DoRequest(httpReq)
if err != nil {
return fmt.Errorf("error on persisting to backend: %v", err)
}

return nil
}

func (b *BackendDS) UpdatePod(pod Pod) error {
podPayload := convertPodToPayload(pod, "UPDATE")

c, err := json.Marshal(podPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+podEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}

err = b.DoRequest(httpReq)
if err != nil {
return fmt.Errorf("error on persisting to backend: %v", err)
}

return nil
}

func (b *BackendDS) DeletePod(pod Pod) error {
podPayload := convertPodToPayload(pod, "DELETE")

c, err := json.Marshal(podPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+podEndpoint, bytes.NewBuffer(c))
httpReq, err := http.NewRequest(http.MethodPost, b.host+":"+b.port+podEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}
Expand Down Expand Up @@ -210,15 +205,15 @@ func convertSvcToPayload(service Service, eventType string) SvcPayload {
}
}

func (b *BackendDS) CreateService(service Service) error {
svcPayload := convertSvcToPayload(service, "ADD")
func (b *BackendDS) PersistService(service Service, eventType string) error {
svcPayload := convertSvcToPayload(service, eventType)

c, err := json.Marshal(svcPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
return fmt.Errorf("error marshalling svc payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+svcEndpoint, bytes.NewBuffer(c))
httpReq, err := http.NewRequest(http.MethodPost, b.host+":"+b.port+svcEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}
Expand All @@ -231,15 +226,32 @@ func (b *BackendDS) CreateService(service Service) error {
return nil
}

func (b *BackendDS) UpdateService(service Service) error {
svcPayload := convertSvcToPayload(service, "UPDATE")
func convertReplicasetToPayload(rs ReplicaSet, eventType string) ReplicaSetPayload {
return ReplicaSetPayload{
Metadata: struct {
MonitoringID string `json:"monitoring_id"`
IdempotencyKey string `json:"idempotency_key"`
}{MonitoringID: MonitoringID, IdempotencyKey: string(uuid.NewUUID())},
UID: rs.UID,
EventType: eventType,
Name: rs.Name,
Namespace: rs.Namespace,
Replicas: rs.Replicas,
OwnerType: rs.OwnerType,
OwnerName: rs.OwnerName,
OwnerID: rs.OwnerID,
}
}

func (b *BackendDS) PersistReplicaSet(rs ReplicaSet, eventType string) error {
rsPayload := convertReplicasetToPayload(rs, eventType)

c, err := json.Marshal(svcPayload)
c, err := json.Marshal(rsPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
return fmt.Errorf("error marshalling rs payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+svcEndpoint, bytes.NewBuffer(c))
httpReq, err := http.NewRequest(http.MethodPost, b.host+":"+b.port+rsEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}
Expand All @@ -252,15 +264,29 @@ func (b *BackendDS) UpdateService(service Service) error {
return nil
}

func (b *BackendDS) DeleteService(service Service) error {
svcPayload := convertSvcToPayload(service, "DELETE")
func convertDeploymentToPayload(d Deployment, eventType string) DeploymentPayload {
return DeploymentPayload{
Metadata: struct {
MonitoringID string `json:"monitoring_id"`
IdempotencyKey string `json:"idempotency_key"`
}{MonitoringID: MonitoringID, IdempotencyKey: string(uuid.NewUUID())},
UID: d.UID,
EventType: eventType,
Name: d.Name,
Namespace: d.Namespace,
Replicas: d.Replicas,
}
}

c, err := json.Marshal(svcPayload)
func (b *BackendDS) PersistDeployment(d Deployment, eventType string) error {
dPayload := convertDeploymentToPayload(d, eventType)

c, err := json.Marshal(dPayload)
if err != nil {
return fmt.Errorf("error marshalling pod payload: %v", err)
return fmt.Errorf("error marshalling deployment payload: %v", err)
}

httpReq, err := http.NewRequest("POST", b.host+":"+b.port+svcEndpoint, bytes.NewBuffer(c))
httpReq, err := http.NewRequest(http.MethodPost, b.host+":"+b.port+depEndpoint, bytes.NewBuffer(c))
if err != nil {
return fmt.Errorf("error creating http request: %v", err)
}
Expand Down
11 changes: 5 additions & 6 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package datastore

type DataStore interface {
CreatePod(pod Pod) error
UpdatePod(pod Pod) error
DeletePod(pod Pod) error
CreateService(service Service) error
UpdateService(service Service) error
DeleteService(service Service) error
PersistPod(pod Pod, eventType string) error
PersistService(service Service, eventType string) error
PersistReplicaSet(rs ReplicaSet, eventType string) error
PersistDeployment(d Deployment, eventType string) error

PersistRequest(request Request) error
}
Loading

0 comments on commit c10d4da

Please sign in to comment.