KafkaApis
is created when KafkaServer
is requested to start (for KafkaRequestHandlerPool).
When created, KafkaApis
is given broker services (from the owning KafkaServer) that are used to handle requests (from producers, consumers, brokers, administrative clients, etc.)
API Key | Handler |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Cluster action / handleLeaderAndIsrRequest |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Cluster action / handleStopReplicaRequest |
|
|
|
|
Cluster action / handleUpdateMetadataRequest |
|
|
adminZkClient: AdminZkClient
When created, KafkaApis
creates a AdminZkClient that is used to create a topic for the following handlers:
-
handleFindCoordinatorRequest (for
consumer_offsets
andtransaction_state
internal topics) -
handleTopicMetadataRequest (for
consumer_offsets
andtransaction_state
internal topics or any other topic with auto-creation enabled)
handleElectReplicaLeader(
request: RequestChannel.Request): Unit
Caution
|
FIXME Describe me again |
In summary, handleElectPreferredReplicaLeader
requests the ReplicaManager to electPreferredLeaders.
Internally, handleElectPreferredReplicaLeader
…FIXME
Note
|
handleElectPreferredReplicaLeader is used exclusively when KafkaApis is requested to handle a ElectPreferredLeaders request.
|
handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleAlterReplicaLogDirsRequest
requests the ReplicaManager to alterReplicaLogDirs.
handleAlterReplicaLogDirsRequest
…FIXME
Note
|
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle a AlterReplicaLogDirs request.
|
handleCreateTopicsRequest(request: RequestChannel.Request): Unit
handleCreateTopicsRequest
…FIXME
handleCreateTopicsRequest
checks whether KafkaController is active…FIXME
handleCreateTopicsRequest
authorizes the Create
operation for ClusterResource
…FIXME
In the end, handleCreateTopicsRequest
requests AdminManager to create the topics.
Note
|
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.
|
handleOffsetFetchRequest(request: RequestChannel.Request): Unit
handleOffsetFetchRequest
…FIXME
Note
|
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle a OffsetFetch request.
|
handleTopicMetadataRequest(
request: RequestChannel.Request): Unit
handleTopicMetadataRequest
takes the MetadataRequest from (the body of) the RequestChannel.Request.
handleTopicMetadataRequest
requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest
).
handleTopicMetadataRequest
filters out the topics for which the current principal (user) is not authorized to execute Describe
operation.
For every authorized topic, handleTopicMetadataRequest
…FIXME
handleTopicMetadataRequest
creates a MetadataResponse.TopicMetadata
with TOPIC_AUTHORIZATION_FAILED
for every unauthorizedForCreateTopics
and unauthorizedForDescribeTopics
.
handleTopicMetadataRequest
getTopicMetadata if there are authorizedTopics
.
handleTopicMetadataRequest
prints out the following TRACE message to the logs:
Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]
In the end, handleTopicMetadataRequest
sendResponseMaybeThrottle with a new MetadataResponse.
Note
|
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.
|
authorize(
request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
resourceName: String,
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true,
refCount: Int = 1): Boolean
authorize
simply requests the Authorizer (when defined) to authorize the given AclOperation
on a broker resource (described by the ResourceType
and resourceName
).
authorize
is positive (true
) when the Authorizer
returned ALLOWED
.
Note
|
The Authorizer is created in KafkaServer (when the KafkaApis is created). It is configured using authorizer.class.name configuration property which is empty by default and so all operations are authorized.
|
Request | AclOperation | Resource Type | Resource Name |
---|---|---|---|
|
|
groupId |
|
|
|
transactionalId |
|
|
|
kafka-cluster |
|
Fetch (from followers) |
|
|
kafka-cluster |
Metadata (for auto-create topics) |
|
|
kafka-cluster |
|
|
kafka-cluster |
|
|
|
kafka-cluster |
|
|
|
Coordinator key |
|
|
|
Coordinator key |
|
|
|
Group ID |
|
|
|
kafka-cluster |
|
|
|
Group ID |
|
|
|
Group ID |
|
|
|
Group ID |
|
|
|
Group ID |
|
|
|
Group ID |
|
|
|
Group ID |
|
|
|
kafka-cluster |
|
|
|
Transactional ID |
|
|
|
kafka-cluster |
|
|
|
Transactional ID |
|
|
|
Transactional ID |
|
|
|
Transactional ID |
|
|
|
Group ID |
|
|
|
Transactional ID |
|
|
|
Group ID |
|
|
|
kafka-cluster |
|
AlterConfigs (for brokers) |
|
|
kafka-cluster |
AlterConfigs (for topics) |
|
|
Topic name |
IncrementalAlterConfigs (for brokers) |
|
|
kafka-cluster |
IncrementalAlterConfigs (for topics) |
|
|
Topic name |
DescribeConfigs (for brokers) |
|
|
kafka-cluster |
DescribeConfigs (for topics) |
|
|
Topic name |
|
|
kafka-cluster |
|
|
|
kafka-cluster |
|
|
|
Token ID |
|
|
|
kafka-cluster |
|
|
|
Group ID |
handleDescribeConfigsRequest(
request: RequestChannel.Request): Unit
handleDescribeConfigsRequest
takes the DescribeConfigsRequest from (the body of) the given RequestChannel.Request.
handleDescribeConfigsRequest
authorizes the DescribeConfigs operation on the broker and topic resources (of the DescribeConfigsRequest).
For every authorized operation, handleDescribeConfigsRequest
requests the AdminManager to describeConfigs.
In the end, handleDescribeConfigsRequest
sendResponseMaybeThrottle with a new DescribeConfigsResponse
.
Note
|
handleDescribeConfigsRequest is used exclusively when KafkaApis is requested to handle a DescribeConfigs request.
|
handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleDescribeLogDirsRequest
requests the ReplicaManager to describeLogDirs.
Internally, handleDescribeLogDirsRequest
takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).
handleDescribeLogDirsRequest
branches off per whether the DescribeLogDirsRequest
was for isAllTopicPartitions or not.
-
For all TopicPartitions,
handleDescribeLogDirsRequest
requests the ReplicaManager for the LogManager that is requested for all the partition logs and their TopicPartitions. -
For specific
TopicPartitions
,handleDescribeLogDirsRequest
requests them from the DescribeLogDirsRequest.
Note
|
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.
|
handleDescribeLogDirsRequest
then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions
.
In the end, handleDescribeLogDirsRequest
sendResponseMaybeThrottle with a DescribeLogDirsResponse
and the LogDirInfos
.
Note
|
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
|
handleStopReplicaRequest(request: RequestChannel.Request): Unit
In summary, handleStopReplicaRequest
requests the ReplicaManager to stopReplicas.
handleStopReplicaRequest
…FIXME
Note
|
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.
|
handleUpdateMetadataRequest(
request: RequestChannel.Request): Unit
handleUpdateMetadataRequest
takes the UpdateMetadataRequest from (the body of) the RequestChannel.Request.
When authorized for cluster action and not isBrokerEpochStale, handleUpdateMetadataRequest
requests the following:
-
ReplicaManager to maybeUpdateMetadataCache (that gives deleted partitions)
-
AdminManager to tryCompleteDelayedTopicOperations for all the topics (based on the partitionStates of the
UpdateMetadataRequest
)
handleUpdateMetadataRequest
updates quotas…FIXME
handleUpdateMetadataRequest
requests the ReplicaManager to tryCompleteElection for every partition (based on the partitionStates of the UpdateMetadataRequest
).
In the end, handleUpdateMetadataRequest
sendResponseExemptThrottle (with a no-error UpdateMetadataResponse
).
Note
|
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle a UpdateMetadata request.
|
handleOffsetCommitRequest(request: RequestChannel.Request): Unit
handleOffsetCommitRequest
takes the OffsetCommitRequest from (the body of) the RequestChannel.Request.
If authorized, handleOffsetCommitRequest
simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).
Note
|
If authorized, handleOffsetCommitRequest branches off per API version (i.e. 0 to store offsets in Zookeeper and 1 and beyond). The API version 0 is not described here.
|
If not authorized, handleOffsetCommitRequest
…FIXME
Note
|
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.
|
sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit
sendResponseCallback
prints out the following DEBUG message to the logs for offsets with errors (i.e. unauthorized topics to read or non-existing topics):
Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]
In the end, sendResponseCallback
sendResponseMaybeThrottle a new OffsetCommitResponse
.
createInternalTopic(
topic: String): MetadataResponse.TopicMetadata
createInternalTopic
…FIXME
Note
|
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata (for metadata of consumer_offsets and
transaction_state internal topics).
|
getOrCreateInternalTopic(
topic: String,
listenerName: ListenerName): MetadataResponse.TopicMetadata
getOrCreateInternalTopic
requests the MetadataCache for getTopicMetadata for the input topic
(and the ListenerName
).
In the end, getOrCreateInternalTopic
returns the TopicMetadata
if available or createInternalTopic.
Note
|
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request (and requests the metadata of consumer_offsets and
transaction_state internal topics).
|
handleFindCoordinatorRequest(request: RequestChannel.Request): Unit
handleFindCoordinatorRequest
takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).
handleFindCoordinatorRequest
checks permissions…FIXME
For an authorized request, handleFindCoordinatorRequest
branches off per CoordinatorType, i.e. GROUP or TRANSACTION.
For GROUP
coordinator type, handleFindCoordinatorRequest
does the following:
-
Requests the GroupCoordinator for partitionFor the coordinator key (of the
FindCoordinatorRequest
) -
getOrCreateInternalTopic for __consumer_offsets topic
For TRANSACTION
coordinator type, handleFindCoordinatorRequest
does the following:
-
Requests the TransactionCoordinator for partitionFor (for the
coordinatorKey
of theFindCoordinatorRequest
) -
getOrCreateInternalTopic for __transaction_state topic
In the end, handleFindCoordinatorRequest
sendResponseMaybeThrottle with a new FindCoordinatorResponse.
You should see the following TRACE message in the logs:
Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].
Note
|
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.
|
handleJoinGroupRequest(request: RequestChannel.Request): Unit
handleJoinGroupRequest
takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).
Note
|
handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.
|
sendResponseCallback(joinResult: JoinGroupResult): Unit
sendResponseCallback
creates a new JoinGroupResponse for the given JoinGroupResult
and prints out the following TRACE message to the logs:
Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].
In the end, sendResponseCallback
sendResponseMaybeThrottle with the new JoinGroupResponse.
handleProduceRequest(
request: RequestChannel.Request): Unit
In summary, handleProduceRequest
takes the ProduceRequest from the body (of the RequestChannel.Request) and requests the ReplicaManager to appendRecords (with isFromClient
flag enabled).
Note
|
internalTopicsAllowed flag (when the ReplicaManager is requested to appendRecords) is enabled (true ) only when the client ID is __admin_client.
|
handleProduceRequest
…FIXME
handleProduceRequest
…FIXME
Note
|
handleProduceRequest is used when KafkaApis is requested to handle a Produce request.
|
handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit
In summary, handleWriteTxnMarkersRequest
requests the ReplicaManager to getMagic followed by appendRecords (with isFromClient
flag disabled).
handleWriteTxnMarkersRequest
…FIXME
Note
|
handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.
|
handleDeleteRecordsRequest(request: RequestChannel.Request): Unit
In summary, handleDeleteRecordsRequest
requests the ReplicaManager to deleteRecords.
handleDeleteRecordsRequest
…FIXME
Note
|
handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.
|
handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit
In summary, handleOffsetForLeaderEpochRequest
requests the ReplicaManager to lastOffsetForLeaderEpoch.
handleOffsetForLeaderEpochRequest
…FIXME
Note
|
handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle a OffsetForLeaderEpoch request.
|
handleListOffsetRequest(request: RequestChannel.Request): Unit
In summary, handleListOffsetRequest
requests the ReplicaManager to fetchOffsetForTimestamp.
handleListOffsetRequest
…FIXME
Note
|
handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.
|
isAuthorizedClusterAction(request: RequestChannel.Request): Boolean
isAuthorizedClusterAction
simply authorize with ClusterAction
operation and ClusterResource
resource.
Note
|
isAuthorizedClusterAction is used when…FIXME
|
authorizeClusterAction(request: RequestChannel.Request): Unit
authorizeClusterAction
simply asserts that the RequestChannel.Request is authorized to execute ClusterAction
on a ClusterResource
. If so, authorizeClusterAction
does nothing and returns.
If not authorized, authorizeClusterAction
throws a ClusterAuthorizationException
:
Request [request] is not authorized.
Note
|
authorizeClusterAction is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, handleUpdateMetadataRequest, handleControlledShutdownRequest, and handleWriteTxnMarkersRequest.
|
authorizedOperations(
request: RequestChannel.Request,
resource: Resource): Int
authorizedOperations
…FIXME
Request | Resource Type | Resource Name |
---|---|---|
|
kafka-cluster |
|
|
Topic name |
|
|
Group ID |
Throwing ClusterAuthorizationException for Unauthorized Cluster Operation — authorizeClusterOperation
Internal Method
authorizeClusterOperation(
request: RequestChannel.Request,
operation: AclOperation): Unit
authorizeClusterOperation
simply throws a ClusterAuthorizationException
when the given request is not authorized for the given operation on CLUSTER
resource and the name as kafka-cluster
.
Request [request] is not authorized.
Request | AclOperation |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|