KafkaApis — API Request Handler

KafkaApis is created when KafkaServer is requested to start (for KafkaRequestHandlerPool).

When created, KafkaApis is given broker services (from the owning KafkaServer) that are used to handle requests (from producers, consumers, brokers, administrative clients, etc.).

Table 1. KafkaApis’s API Keys and Handlers
API Key                      Handler
AddOffsetsToTxn              handleAddOffsetsToTxnRequest
AddPartitionsToTxn           handleAddPartitionToTxnRequest
AlterConfigs                 handleAlterConfigsRequest
AlterPartitionReassignments  handleAlterPartitionReassignmentsRequest
AlterReplicaLogDirs          handleAlterReplicaLogDirsRequest
ApiVersions                  handleApiVersionsRequest
ControlledShutdown           handleControlledShutdownRequest
CreateAcls                   handleCreateAcls
CreateDelegationToken        handleCreateTokenRequest
CreatePartitions             handleCreatePartitionsRequest
CreateTopics                 handleCreateTopicsRequest
DeleteAcls                   handleDeleteAcls
DeleteGroups                 handleDeleteGroupsRequest
DeleteRecords                handleDeleteRecordsRequest
DeleteTopics                 handleDeleteTopicsRequest
DescribeConfigs              handleDescribeConfigsRequest
DescribeGroups               handleDescribeGroupRequest
DescribeLogDirs              handleDescribeLogDirsRequest
DescribeAcls                 handleDescribeAcls
DescribeDelegationToken      handleDescribeTokensRequest
ElectLeaders                 handleElectReplicaLeader
EndTxn                       handleEndTxnRequest
Fetch                        handleFetchRequest
ExpireDelegationToken        handleExpireTokenRequest
FindCoordinator              handleFindCoordinatorRequest
Heartbeat                    handleHeartbeatRequest
InitProducerId               handleInitProducerIdRequest
IncrementalAlterConfigs      handleIncrementalAlterConfigsRequest
JoinGroup                    handleJoinGroupRequest
LeaderAndIsr                 Cluster action / handleLeaderAndIsrRequest
LeaveGroup                   handleLeaveGroupRequest
ListGroups                   handleListGroupsRequest
ListOffsets                  handleListOffsetRequest
ListPartitionReassignments   handleListPartitionReassignmentsRequest
Metadata                     handleTopicMetadataRequest
OffsetCommit                 handleOffsetCommitRequest
OffsetDelete                 handleOffsetDeleteRequest
OffsetFetch                  handleOffsetFetchRequest
OffsetForLeaderEpoch         handleOffsetForLeaderEpochRequest
Produce                      handleProduceRequest
RenewDelegationToken         handleRenewTokenRequest
SaslAuthenticate             handleSaslAuthenticateRequest
SaslHandshake                handleSaslHandshakeRequest
StopReplica                  Cluster action / handleStopReplicaRequest
SyncGroup                    handleSyncGroupRequest
TxnOffsetCommit              handleTxnOffsetCommitRequest
UpdateMetadata               Cluster action / handleUpdateMetadataRequest
WriteTxnMarkers              handleWriteTxnMarkersRequest
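
Table 1 amounts to a dispatch from an API key to a handler. The following is a minimal sketch of that idea only: the ApiKey type and the ApiDispatchSketch object are hypothetical stand-ins, and handlers are represented by their names rather than by real methods.

```scala
// Hypothetical, simplified stand-in for the API-key dispatch; not Kafka's actual code.
object ApiDispatchSketch {
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey
  case object ListOffsets extends ApiKey

  // Returns the name of the handler listed in Table 1 for the given API key.
  def handlerFor(key: ApiKey): String = key match {
    case Produce     => "handleProduceRequest"
    case Fetch       => "handleFetchRequest"
    case Metadata    => "handleTopicMetadataRequest"
    case ListOffsets => "handleListOffsetRequest"
  }
}
```

Because ApiKey is sealed in this sketch, the compiler can warn when a key has no handler, which is one reason a pattern match suits this kind of table.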

KafkaApis and AdminZkClient

adminZkClient: AdminZkClient

When created, KafkaApis creates an AdminZkClient that is used to create a topic for the following handlers:

Handling ElectLeaders Request — handleElectReplicaLeader Handler

handleElectReplicaLeader(
  request: RequestChannel.Request): Unit
Caution
FIXME Describe me again

In summary, handleElectReplicaLeader requests the ReplicaManager to electPreferredLeaders.

Internally, handleElectReplicaLeader…FIXME

Note
handleElectReplicaLeader is used exclusively when KafkaApis is requested to handle an ElectLeaders request.

Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest Handler

handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleAlterReplicaLogDirsRequest requests the ReplicaManager to alterReplicaLogDirs.

handleAlterReplicaLogDirsRequest…​FIXME

Note
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.

Handling CreateTopics Request — handleCreateTopicsRequest Handler

handleCreateTopicsRequest(request: RequestChannel.Request): Unit

handleCreateTopicsRequest…​FIXME

handleCreateTopicsRequest checks whether KafkaController is active…​FIXME

handleCreateTopicsRequest authorizes the Create operation for ClusterResource…​FIXME

In the end, handleCreateTopicsRequest requests AdminManager to create the topics.

Note
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.

Handling OffsetFetch Request — handleOffsetFetchRequest Handler

handleOffsetFetchRequest(request: RequestChannel.Request): Unit

handleOffsetFetchRequest…​FIXME

Note
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle an OffsetFetch request.

Handling Metadata Request — handleTopicMetadataRequest Handler

handleTopicMetadataRequest(
  request: RequestChannel.Request): Unit

handleTopicMetadataRequest takes the MetadataRequest from (the body of) the RequestChannel.Request.

handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest).

handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute the Describe operation.

For every authorized topic, handleTopicMetadataRequest…​FIXME

handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every unauthorizedForCreateTopics and unauthorizedForDescribeTopics.

handleTopicMetadataRequest calls getTopicMetadata if there are authorizedTopics.

handleTopicMetadataRequest prints out the following TRACE message to the logs:

Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]

In the end, handleTopicMetadataRequest calls sendResponseMaybeThrottle with a new MetadataResponse.

Note
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.
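
The authorization step of handleTopicMetadataRequest can be pictured as splitting the requested topics into authorized and unauthorized ones. This is a sketch under assumptions: TopicMetadata here is a hypothetical two-field stand-in for MetadataResponse.TopicMetadata, errors are plain strings, and canDescribe stands in for the Describe check.

```scala
// Illustrative only; the types below are simplified stand-ins, not Kafka classes.
object TopicMetadataSketch {
  final case class TopicMetadata(topic: String, error: String)

  // Splits topics by the Describe check and marks unauthorized ones as failed.
  def describeTopics(
      requested: Set[String],
      canDescribe: String => Boolean): Seq[TopicMetadata] = {
    val (authorized, unauthorized) = requested.partition(canDescribe)
    val failed = unauthorized.toSeq.sorted.map(TopicMetadata(_, "TOPIC_AUTHORIZATION_FAILED"))
    val ok = authorized.toSeq.sorted.map(TopicMetadata(_, "NONE"))
    failed ++ ok
  }
}
```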

Authorizing Request for Operation on Resource — authorize Internal Method

authorize(
  request: RequestChannel.Request,
  operation: AclOperation,
  resourceType: ResourceType,
  resourceName: String,
  logIfAllowed: Boolean = true,
  logIfDenied: Boolean = true,
  refCount: Int = 1): Boolean

authorize simply requests the Authorizer (when defined) to authorize the given AclOperation on a broker resource (described by the ResourceType and resourceName).

authorize is positive (true) when the Authorizer returned ALLOWED.

Note
The Authorizer is created in KafkaServer (when the KafkaApis is created). It is configured using the authorizer.class.name configuration property, which is empty by default, so all operations are authorized.

Request                                AclOperation      Resource Type     Resource Name
OffsetCommit                           READ              GROUP             groupId
Produce                                WRITE             TRANSACTIONAL_ID  transactionalId
Produce                                IDEMPOTENT_WRITE  CLUSTER           kafka-cluster
Fetch (from followers)                 CLUSTER_ACTION    CLUSTER           kafka-cluster
Metadata (for auto-create topics)      CREATE            CLUSTER           kafka-cluster
Metadata                               DESCRIBE          CLUSTER           kafka-cluster
OffsetFetch                            DESCRIBE          CLUSTER           kafka-cluster
FindCoordinator                        DESCRIBE          GROUP             Coordinator key
FindCoordinator                        DESCRIBE          TRANSACTIONAL_ID  Coordinator key
DescribeGroups                         DESCRIBE          GROUP             Group ID
ListGroups                             DESCRIBE          CLUSTER           kafka-cluster
ListGroups                             DESCRIBE          GROUP             Group ID
JoinGroup                              READ              GROUP             Group ID
SyncGroup                              READ              GROUP             Group ID
DeleteGroups                           DELETE            GROUP             Group ID
Heartbeat                              READ              GROUP             Group ID
LeaveGroup                             READ              GROUP             Group ID
CreateTopics                           CREATE            CLUSTER           kafka-cluster
InitProducerId                         WRITE             TRANSACTIONAL_ID  Transactional ID
InitProducerId                         IDEMPOTENT_WRITE  CLUSTER           kafka-cluster
EndTxn                                 WRITE             TRANSACTIONAL_ID  Transactional ID
AddPartitionsToTxn                     WRITE             TRANSACTIONAL_ID  Transactional ID
AddOffsetsToTxn                        WRITE             TRANSACTIONAL_ID  Transactional ID
AddOffsetsToTxn                        READ              GROUP             Group ID
TxnOffsetCommit                        WRITE             TRANSACTIONAL_ID  Transactional ID
TxnOffsetCommit                        READ              GROUP             Group ID
OffsetForLeaderEpoch                   CLUSTER_ACTION    CLUSTER           kafka-cluster
AlterConfigs (for brokers)             ALTER_CONFIGS     CLUSTER           kafka-cluster
AlterConfigs (for topics)              ALTER_CONFIGS     TOPIC             Topic name
IncrementalAlterConfigs (for brokers)  ALTER_CONFIGS     CLUSTER           kafka-cluster
IncrementalAlterConfigs (for topics)   ALTER_CONFIGS     TOPIC             Topic name
DescribeConfigs (for brokers)          DESCRIBE_CONFIGS  CLUSTER           kafka-cluster
DescribeConfigs (for topics)           DESCRIBE_CONFIGS  TOPIC             Topic name
AlterReplicaLogDirs                    ALTER             CLUSTER           kafka-cluster
DescribeLogDirs                        DESCRIBE          CLUSTER           kafka-cluster
DescribeDelegationToken                DESCRIBE          DELEGATION_TOKEN  Token ID
ElectLeaders                           ALTER             CLUSTER           kafka-cluster
OffsetDelete                           DELETE            GROUP             Group ID
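
The behaviour of authorize (allow everything when no Authorizer is defined, otherwise allow only on ALLOWED) can be sketched as follows. SimpleAuthorizer, AuthorizationResult and the string-typed operation and resource parameters are hypothetical simplifications; the real Authorizer interface works with a richer request context.

```scala
// Simplified stand-ins; not the real Authorizer API.
object AuthorizeSketch {
  sealed trait AuthorizationResult
  case object Allowed extends AuthorizationResult
  case object Denied extends AuthorizationResult

  // Hypothetical single-method authorizer.
  trait SimpleAuthorizer {
    def authorize(operation: String, resourceType: String, resourceName: String): AuthorizationResult
  }

  // With no authorizer configured, every operation is authorized.
  def authorize(
      authorizer: Option[SimpleAuthorizer],
      operation: String,
      resourceType: String,
      resourceName: String): Boolean =
    authorizer.forall(_.authorize(operation, resourceType, resourceName) == Allowed)
}
```

Option.forall returns true for None, which matches the note above that an empty authorizer.class.name leaves all operations authorized.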

Handling DescribeConfigs Request — handleDescribeConfigsRequest Handler

handleDescribeConfigsRequest(
  request: RequestChannel.Request): Unit

handleDescribeConfigsRequest takes the DescribeConfigsRequest from (the body of) the given RequestChannel.Request.

For every authorized operation, handleDescribeConfigsRequest requests the AdminManager to describeConfigs.

In the end, handleDescribeConfigsRequest calls sendResponseMaybeThrottle with a new DescribeConfigsResponse.

Note
handleDescribeConfigsRequest is used exclusively when KafkaApis is requested to handle a DescribeConfigs request.

Handling DescribeLogDirs Request — handleDescribeLogDirsRequest Handler

handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleDescribeLogDirsRequest requests the ReplicaManager to describeLogDirs.

Internally, handleDescribeLogDirsRequest takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).

handleDescribeLogDirsRequest branches on whether the DescribeLogDirsRequest is for all topic partitions (isAllTopicPartitions) or not.

Note
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.

handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.

In the end, handleDescribeLogDirsRequest calls sendResponseMaybeThrottle with a DescribeLogDirsResponse and the LogDirInfos.

Note
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
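
The branching in handleDescribeLogDirsRequest can be sketched as below. All names are illustrative assumptions: TopicPartition is a local stand-in, requested = None models isAllTopicPartitions, hosted is the set of partitions the broker knows about, and describeLogDirs stands in for the ReplicaManager call.

```scala
// Illustrative only; simplified types instead of Kafka's request and response classes.
object DescribeLogDirsSketch {
  final case class TopicPartition(topic: String, partition: Int)
  type LogDirInfos = Map[String, Set[TopicPartition]]

  // Unauthorized requests get an empty result; otherwise describe either all
  // hosted partitions or just the requested ones.
  def logDirInfos(
      authorized: Boolean,
      requested: Option[Set[TopicPartition]],
      hosted: Set[TopicPartition],
      describeLogDirs: Set[TopicPartition] => LogDirInfos): LogDirInfos =
    if (!authorized) Map.empty
    else describeLogDirs(requested.getOrElse(hosted))
}
```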

Handling StopReplica Request — handleStopReplicaRequest Handler

handleStopReplicaRequest(request: RequestChannel.Request): Unit

In summary, handleStopReplicaRequest requests the ReplicaManager to stopReplicas.

handleStopReplicaRequest…​FIXME

Note
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.

Handling UpdateMetadata Request (From Kafka Controller) — handleUpdateMetadataRequest Handler

handleUpdateMetadataRequest(
  request: RequestChannel.Request): Unit

handleUpdateMetadataRequest takes the UpdateMetadataRequest from (the body of) the RequestChannel.Request.

When authorized for cluster action and not isBrokerEpochStale, handleUpdateMetadataRequest requests the following:

handleUpdateMetadataRequest updates quotas…​FIXME

handleUpdateMetadataRequest requests the ReplicaManager to tryCompleteElection for every partition (based on the partitionStates of the UpdateMetadataRequest).

In the end, handleUpdateMetadataRequest calls sendResponseExemptThrottle (with a no-error UpdateMetadataResponse).

Note
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.

Handling OffsetCommitRequest — handleOffsetCommitRequest Handler

handleOffsetCommitRequest(request: RequestChannel.Request): Unit

handleOffsetCommitRequest takes the OffsetCommitRequest from (the body of) the RequestChannel.Request.

If authorized, handleOffsetCommitRequest simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).

Note
If authorized, handleOffsetCommitRequest branches on the API version: version 0 stores offsets in ZooKeeper, while version 1 and later go through the GroupCoordinator. API version 0 is not described here.

If not authorized, handleOffsetCommitRequest…​FIXME

Note
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.

sendResponseCallback Method

sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit

sendResponseCallback prints out the following DEBUG message to the logs for offsets with errors (i.e. unauthorized topics to read or non-existing topics):

Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]

In the end, sendResponseCallback calls sendResponseMaybeThrottle with a new OffsetCommitResponse.
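
The logging part of this callback can be sketched as a filter over the commit status. This is an assumption-laden illustration: errors are modelled as plain strings with "NONE" meaning success, TopicPartition is a local stand-in rendered as topic-partition, and the function returns the messages instead of writing them to a logger.

```scala
// Illustrative only; builds the DEBUG messages for partitions that failed.
object OffsetCommitCallbackSketch {
  final case class TopicPartition(topic: String, partition: Int)

  def debugMessages(
      correlationId: Int,
      clientId: String,
      commitStatus: Map[TopicPartition, String]): Seq[String] =
    commitStatus.toSeq.collect {
      case (tp, error) if error != "NONE" =>
        s"Offset commit request with correlation id $correlationId from client $clientId " +
          s"on partition ${tp.topic}-${tp.partition} failed due to $error"
    }
}
```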

createInternalTopic Internal Method

createInternalTopic(
  topic: String): MetadataResponse.TopicMetadata

createInternalTopic…​FIXME

Note
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata (for metadata of the __consumer_offsets and __transaction_state internal topics).

getOrCreateInternalTopic Internal Method

getOrCreateInternalTopic(
  topic: String,
  listenerName: ListenerName): MetadataResponse.TopicMetadata

getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).

In the end, getOrCreateInternalTopic returns the TopicMetadata if available or createInternalTopic.

Note
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request (and requests the metadata of the __consumer_offsets and __transaction_state internal topics).
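
The lookup-then-create behaviour of getOrCreateInternalTopic is a plain get-or-else. In this sketch the MetadataCache is modelled as an immutable Map, TopicMetadata is a hypothetical two-field stand-in, and createInternalTopic is passed in as a function; none of these are Kafka's real types.

```scala
// Illustrative only; a Map stands in for the MetadataCache.
object InternalTopicSketch {
  final case class TopicMetadata(topic: String, created: Boolean)

  // Returns cached metadata when present, otherwise falls back to creating the topic.
  def getOrCreateInternalTopic(
      cache: Map[String, TopicMetadata],
      topic: String,
      createInternalTopic: String => TopicMetadata): TopicMetadata =
    cache.getOrElse(topic, createInternalTopic(topic))
}
```

Map.getOrElse takes its default by name, so the creation function only runs on a cache miss.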

Handling FindCoordinatorRequest — handleFindCoordinatorRequest Handler

handleFindCoordinatorRequest(request: RequestChannel.Request): Unit

handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).

handleFindCoordinatorRequest checks permissions…​FIXME

For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.

For GROUP coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the GroupCoordinator for partitionFor the coordinator key (of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __consumer_offsets topic

For TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __transaction_state topic

In the end, handleFindCoordinatorRequest calls sendResponseMaybeThrottle with a new FindCoordinatorResponse.

You should see the following TRACE message in the logs:

Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].

Note
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.
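
The per-coordinator-type branching can be sketched as follows. The hash-modulo partitionFor is an assumption made for illustration (the text above only says that the coordinators are asked for partitionFor), and the partition counts are passed in as plain parameters.

```scala
// Illustrative only; simplified coordinator lookup.
object FindCoordinatorSketch {
  sealed trait CoordinatorType
  case object Group extends CoordinatorType
  case object Transaction extends CoordinatorType

  // Assumed scheme: non-negative hash of the key modulo the partition count.
  def partitionFor(key: String, numPartitions: Int): Int = {
    val hash = key.hashCode
    val nonNegative = if (hash == Int.MinValue) 0 else math.abs(hash)
    nonNegative % numPartitions
  }

  // Picks the internal topic and partition that decide the coordinator.
  def coordinatorPartition(
      coordinatorType: CoordinatorType,
      key: String,
      offsetsPartitions: Int,
      txnPartitions: Int): (String, Int) = coordinatorType match {
    case Group       => ("__consumer_offsets", partitionFor(key, offsetsPartitions))
    case Transaction => ("__transaction_state", partitionFor(key, txnPartitions))
  }
}
```

The broker that leads the selected partition of the internal topic is the one reported back as the coordinator.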

Handling JoinGroupRequest — handleJoinGroupRequest Handler

handleJoinGroupRequest(request: RequestChannel.Request): Unit

handleJoinGroupRequest takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).

Note
handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.

Handling JoinGroup Response — sendResponseCallback Method

sendResponseCallback(joinResult: JoinGroupResult): Unit

sendResponseCallback creates a new JoinGroupResponse for the given JoinGroupResult and prints out the following TRACE message to the logs:

Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].

In the end, sendResponseCallback calls sendResponseMaybeThrottle with the new JoinGroupResponse.

Handling Produce Request — handleProduceRequest Handler

handleProduceRequest(
  request: RequestChannel.Request): Unit

In summary, handleProduceRequest takes the ProduceRequest from the body (of the RequestChannel.Request) and requests the ReplicaManager to appendRecords (with isFromClient flag enabled).

Note
internalTopicsAllowed flag (when the ReplicaManager is requested to appendRecords) is enabled (true) only when the client ID is __admin_client.

handleProduceRequest…​FIXME

handleProduceRequest uses filterAuthorized to keep only the topics (from the ProduceRequest) for which the request is authorized for the WRITE operation on a TOPIC resource.

handleProduceRequest…​FIXME

Note
handleProduceRequest is used when KafkaApis is requested to handle a Produce request.
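
Two of the checks described for handleProduceRequest are simple enough to sketch. The function shapes below are assumptions for illustration: filterAuthorized is reduced to a filter over topic names with canWrite standing in for the WRITE check, and internalTopicsAllowed is derived from the client ID alone.

```scala
// Illustrative only; simplified versions of the two checks.
object ProduceSketch {
  val AdminClientId = "__admin_client"

  // Internal topics may be written only by the admin client.
  def internalTopicsAllowed(clientId: String): Boolean =
    clientId == AdminClientId

  // Keeps the topics the request is authorized to write to.
  def filterAuthorized(topics: Set[String], canWrite: String => Boolean): Set[String] =
    topics.filter(canWrite)
}
```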

handleWriteTxnMarkersRequest Handler

handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit

In summary, handleWriteTxnMarkersRequest requests the ReplicaManager to getMagic followed by appendRecords (with isFromClient flag disabled).

handleWriteTxnMarkersRequest…​FIXME

Note
handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.

handleDeleteRecordsRequest Handler

handleDeleteRecordsRequest(request: RequestChannel.Request): Unit

In summary, handleDeleteRecordsRequest requests the ReplicaManager to deleteRecords.

handleDeleteRecordsRequest…​FIXME

Note
handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.

handleOffsetForLeaderEpochRequest Handler

handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit

In summary, handleOffsetForLeaderEpochRequest requests the ReplicaManager to lastOffsetForLeaderEpoch.

handleOffsetForLeaderEpochRequest…​FIXME

Note
handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle an OffsetForLeaderEpoch request.

handleListOffsetRequest Handler

handleListOffsetRequest(request: RequestChannel.Request): Unit

In summary, handleListOffsetRequest requests the ReplicaManager to fetchOffsetForTimestamp.

handleListOffsetRequest…​FIXME

Note
handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.

isAuthorizedClusterAction Internal Method

isAuthorizedClusterAction(request: RequestChannel.Request): Boolean

isAuthorizedClusterAction simply calls authorize with the ClusterAction operation and the ClusterResource resource.

Note
isAuthorizedClusterAction is used when…​FIXME

Asserting Permissions for Cluster Action — authorizeClusterAction Method

authorizeClusterAction(request: RequestChannel.Request): Unit

authorizeClusterAction simply asserts that the RequestChannel.Request is authorized to execute ClusterAction on a ClusterResource. If so, authorizeClusterAction does nothing and returns.

If not authorized, authorizeClusterAction throws a ClusterAuthorizationException:

Request [request] is not authorized.
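
The assert-or-throw behaviour of authorizeClusterAction can be sketched as below. The exception class is a local stand-in for Kafka's ClusterAuthorizationException, the request is reduced to a string, and isAuthorized stands in for isAuthorizedClusterAction.

```scala
// Illustrative only; local stand-ins for the request and exception types.
object ClusterActionSketch {
  final class ClusterAuthorizationException(message: String) extends RuntimeException(message)

  // Does nothing when authorized, throws otherwise.
  def authorizeClusterAction(request: String, isAuthorized: String => Boolean): Unit =
    if (!isAuthorized(request))
      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
```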

authorizedOperations Internal Method

authorizedOperations(
  request: RequestChannel.Request,
  resource: Resource): Int

authorizedOperations…​FIXME

Request         Resource Type  Resource Name
Metadata        CLUSTER        kafka-cluster
Metadata        TOPIC          Topic name
DescribeGroups  GROUP          Group ID

Throwing ClusterAuthorizationException for Unauthorized Cluster Operation — authorizeClusterOperation Internal Method

authorizeClusterOperation(
  request: RequestChannel.Request,
  operation: AclOperation): Unit

authorizeClusterOperation simply throws a ClusterAuthorizationException when the given request is not authorized for the given operation on the CLUSTER resource named kafka-cluster.

Request [request] is not authorized.

Request                      AclOperation
LeaderAndIsr                 CLUSTER_ACTION
StopReplica                  CLUSTER_ACTION
UpdateMetadata               CLUSTER_ACTION
ControlledShutdown           CLUSTER_ACTION
WriteTxnMarkers              CLUSTER_ACTION
DescribeAcls                 DESCRIBE
CreateAcls                   ALTER
DeleteAcls                   ALTER
AlterPartitionReassignments  ALTER
ListPartitionReassignments   DESCRIBE