Skip to content

Commit

Permalink
[Enhancement] add appProtocol in fe service to support for running st…
Browse files Browse the repository at this point in the history
…arrocks on Istio service mesh (#288)

Signed-off-by: yandongxiao <[email protected]>
  • Loading branch information
yandongxiao authored Oct 24, 2023
1 parent b0b9da7 commit 66e2f2c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 20 deletions.
52 changes: 38 additions & 14 deletions pkg/common/resource_utils/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ const (
FeProxyService StarRocksServiceType = "fe-proxy"
)

const (
FeHTTPPortName = "http"
FeRPCPortName = "rpc"
FeQueryPortName = "query"
FeEditLogPortName = "edit-log"

BePortName = "be"
BeWebserverPortName = "webserver"
BeHeartbeatPortName = "heartbeat"
BeBrpcPortName = "brpc"

CnThriftPortName = "thrift"
CnWebserverPortName = "webserver"
CnHeartbeatPortName = "heartbeat"
CnBrpcPortName = "brpc"
)

// HashService service hash components
type hashService struct {
name string
Expand Down Expand Up @@ -108,13 +125,20 @@ func BuildExternalService(src *srapi.StarRocksCluster, name string, serviceType

var ports []corev1.ServicePort
for _, sp := range srPorts {
ports = append(ports, corev1.ServicePort{
servicePort := corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
NodePort: sp.NodePort,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(int(sp.ContainerPort)),
})
}
if servicePort.Name == FeQueryPortName {
servicePort.AppProtocol = func() *string {
protocol := "mysql"
return &protocol
}()
}
ports = append(ports, servicePort)
}
// set Ports field before calculate resource hash
svc.Spec.Ports = ports
Expand All @@ -131,13 +155,13 @@ func getFeServicePorts(config map[string]interface{}, service *srapi.StarRocksSe
queryPort := GetPort(config, QUERY_PORT)
editPort := GetPort(config, EDIT_LOG_PORT)
srPorts = append(srPorts, mergePort(service, srapi.StarRocksServicePort{
Port: httpPort, ContainerPort: httpPort, Name: "http",
Port: httpPort, ContainerPort: httpPort, Name: FeHTTPPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: rpcPort, ContainerPort: rpcPort, Name: "rpc",
Port: rpcPort, ContainerPort: rpcPort, Name: FeRPCPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: queryPort, ContainerPort: queryPort, Name: "query",
Port: queryPort, ContainerPort: queryPort, Name: FeQueryPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: editPort, ContainerPort: editPort, Name: "edit-log",
Port: editPort, ContainerPort: editPort, Name: FeEditLogPortName,
}))

return srPorts
Expand All @@ -150,13 +174,13 @@ func getBeServicePorts(config map[string]interface{}, service *srapi.StarRocksSe
brpcPort := GetPort(config, BRPC_PORT)

srPorts = append(srPorts, mergePort(service, srapi.StarRocksServicePort{
Port: bePort, ContainerPort: bePort, Name: "be",
Port: bePort, ContainerPort: bePort, Name: BePortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: webserverPort, ContainerPort: webserverPort, Name: "webserver",
Port: webserverPort, ContainerPort: webserverPort, Name: BeWebserverPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: heartPort, ContainerPort: heartPort, Name: "heartbeat",
Port: heartPort, ContainerPort: heartPort, Name: BeHeartbeatPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: brpcPort, ContainerPort: brpcPort, Name: "brpc",
Port: brpcPort, ContainerPort: brpcPort, Name: BeBrpcPortName,
}))

return srPorts
Expand All @@ -168,13 +192,13 @@ func getCnServicePorts(config map[string]interface{}, service *srapi.StarRocksSe
heartPort := GetPort(config, HEARTBEAT_SERVICE_PORT)
brpcPort := GetPort(config, BRPC_PORT)
srPorts = append(srPorts, mergePort(service, srapi.StarRocksServicePort{
Port: thriftPort, ContainerPort: thriftPort, Name: "thrift",
Port: thriftPort, ContainerPort: thriftPort, Name: CnThriftPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: webserverPort, ContainerPort: webserverPort, Name: "webserver",
Port: webserverPort, ContainerPort: webserverPort, Name: CnWebserverPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: heartPort, ContainerPort: heartPort, Name: "heartbeat",
Port: heartPort, ContainerPort: heartPort, Name: CnHeartbeatPortName,
}), mergePort(service, srapi.StarRocksServicePort{
Port: brpcPort, ContainerPort: brpcPort, Name: "brpc",
Port: brpcPort, ContainerPort: brpcPort, Name: CnBrpcPortName,
}))

return srPorts
Expand Down
10 changes: 7 additions & 3 deletions pkg/common/resource_utils/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestBuildExternalService(t *testing.T) {
Name: "service-name",
Namespace: "default",
Annotations: map[string]string{
srapi.ComponentResourceHash: "2728763357",
srapi.ComponentResourceHash: "1503664666",
},
OwnerReferences: func() []metav1.OwnerReference {
ref := metav1.NewControllerRef(src, src.GroupVersionKind())
Expand All @@ -124,13 +124,17 @@ func TestBuildExternalService(t *testing.T) {
srPorts := getFeServicePorts(map[string]interface{}{}, nil)
var ports []corev1.ServicePort
for _, sp := range srPorts {
ports = append(ports, corev1.ServicePort{
servicePort := corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
NodePort: sp.NodePort,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(int(sp.ContainerPort)),
})
}
if servicePort.Name == FeQueryPortName {
servicePort.AppProtocol = func() *string { v := "mysql"; return &v }()
}
ports = append(ports, servicePort)
}
return ports
}(),
Expand Down
7 changes: 4 additions & 3 deletions pkg/sub_controller/fe/fe_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ func (fc *FeController) Sync(ctx context.Context, src *srapi.StarRocksCluster) e
searchServiceName := service.SearchServiceName(src.Name, feSpec)
internalService := service.MakeSearchService(searchServiceName, &svc, []corev1.ServicePort{
{
Name: "query-port",
Port: rutils.GetPort(config, rutils.QUERY_PORT),
TargetPort: intstr.FromInt(int(rutils.GetPort(config, rutils.QUERY_PORT))),
Name: "query-port",
Port: rutils.GetPort(config, rutils.QUERY_PORT),
TargetPort: intstr.FromInt(int(rutils.GetPort(config, rutils.QUERY_PORT))),
AppProtocol: func() *string { mysql := "mysql"; return &mysql }(),
},
})

Expand Down

0 comments on commit 66e2f2c

Please sign in to comment.