[Synapse] az synapse spark pool: Add parameter `--enable-dynamic-executor-allocation` (#23960)
kevinzz6 authored Oct 28, 2022
1 parent a5c7174 commit f7f70ec
Showing 9 changed files with 169 additions and 144 deletions.
@@ -165,6 +165,10 @@
    text: |-
      az synapse spark pool update --name testpool --workspace-name testsynapseworkspace --resource-group rg \\
        --spark-config-file-path 'path/configfile.txt'
+  - name: Update the Spark pool's dynamic executor allocation configuration.
+    text: |-
+      az synapse spark pool update --name testpool --workspace-name testsynapseworkspace --resource-group rg \\
+        --enable-dynamic-exec --min-executors 3 --max-executors 10
"""

helps['synapse spark pool delete'] = """
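The example added above enables dynamic executor allocation on an existing pool and bounds it to between 3 and 10 executors. As a rough sketch (not part of the commit), those three flags land on the SDK model that this change wires up along these lines:

    # Hedged sketch: how the help example's flags plausibly map onto the SDK
    # model. DynamicExecutorAllocation is the class this commit imports from
    # azure.mgmt.synapse.models; the literal values mirror the example above.
    from azure.mgmt.synapse.models import DynamicExecutorAllocation

    alloc = DynamicExecutorAllocation(enabled=True, min_executors=3, max_executors=10)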
@@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
-# pylint: disable=too-many-statements, line-too-long, too-many-branches
+# pylint: disable=too-many-statements, line-too-long, too-many-branches, option-length-too-long
from knack.arguments import CLIArgumentType
from argcomplete import FilesCompleter
from azure.mgmt.synapse.models import TransparentDataEncryptionStatus, SecurityAlertPolicyState, BlobAuditingPolicyState
@@ -171,6 +171,12 @@ def load_arguments(self, _):
        # Spark config file
        c.argument('spark_config_file_path', arg_group='Environment Configuration', help='Absolute path of Spark pool properties configuration file.')

+        # Dynamic executor allocation
+        c.argument('enable_dynamic_executor_allocation', arg_type=get_three_state_flag(), arg_group='DynamicExecutor',
+                   options_list=['--enable-dynamic-exec'], help='Indicates whether Dynamic Executor Allocation is enabled or not.')
+        c.argument('max_executors', type=int, arg_group='DynamicExecutor', help='The maximum number of executors allotted.')
+        c.argument('min_executors', type=int, arg_group='DynamicExecutor', help='The minimum number of executors allotted.')
+
        c.argument('tags', arg_type=tags_type)

    with self.argument_context('synapse spark pool update') as c:
@@ -205,6 +211,12 @@ def load_arguments(self, _):
        # Spark config file
        c.argument('spark_config_file_path', arg_group='Environment Configuration', help='Absolute path of Spark pool properties configuration file.')

+        # Dynamic executor allocation
+        c.argument('enable_dynamic_executor_allocation', arg_type=get_three_state_flag(), arg_group='DynamicExecutor',
+                   options_list=['--enable-dynamic-exec'], help='Indicates whether Dynamic Executor Allocation is enabled or not.')
+        c.argument('max_executors', type=int, arg_group='DynamicExecutor', help='The maximum number of executors allotted.')
+        c.argument('min_executors', type=int, arg_group='DynamicExecutor', help='The minimum number of executors allotted.')
+
    # synapse sql pool
    with self.argument_context('synapse sql pool') as c:
        c.argument('workspace_name', id_part='name', help='The workspace name.')
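Note that while the commit title names the parameter by its Python identifier, the option actually registered is the shorter `--enable-dynamic-exec`, and `get_three_state_flag()` lets it take an explicit true/false value (a bare flag is treated as true). As an illustration of that three-state behavior only, here is an argparse-based stand-in; this is an assumption-labeled sketch, not knack's implementation:

    # Illustrative stand-in for knack's get_three_state_flag(): the option
    # accepts an explicit boolean, a bare flag means true, and omitting it
    # leaves None, so the backend can tell "not specified" from "disabled".
    import argparse

    def three_state(value):
        lowered = value.lower()
        if lowered in ('true', 'yes', '1'):
            return True
        if lowered in ('false', 'no', '0'):
            return False
        raise argparse.ArgumentTypeError('expected a boolean, got %r' % value)

    parser = argparse.ArgumentParser()
    parser.add_argument('--enable-dynamic-exec', type=three_state, nargs='?', const=True, default=None)
    parser.add_argument('--min-executors', type=int, default=None)
    parser.add_argument('--max-executors', type=int, default=None)

    args = parser.parse_args(['--enable-dynamic-exec', 'true', '--min-executors', '3'])
    assert args.enable_dynamic_exec is True and args.min_executors == 3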
@@ -2,9 +2,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
-# pylint: disable=unused-argument, line-too-long, too-many-locals
+# pylint: disable=unused-argument, line-too-long, too-many-locals, too-many-branches, too-many-statements
from azure.cli.core.util import sdk_no_wait, read_file_content
-from azure.mgmt.synapse.models import BigDataPoolResourceInfo, AutoScaleProperties, AutoPauseProperties, LibraryRequirements, NodeSizeFamily, LibraryInfo, SparkConfigProperties
+from azure.mgmt.synapse.models import BigDataPoolResourceInfo, AutoScaleProperties, AutoPauseProperties, LibraryRequirements, NodeSizeFamily, LibraryInfo, SparkConfigProperties, DynamicExecutorAllocation
from .._client_factory import cf_synapse_client_workspace_factory
from .artifacts import get_workspace_package
from pathlib import Path
@@ -20,7 +20,8 @@ def create_spark_pool(cmd, client, resource_group_name, workspace_name, spark_po
                      node_size_family=NodeSizeFamily.memory_optimized.value, enable_auto_scale=None,
                      min_node_count=None, max_node_count=None, spark_config_file_path=None,
                      enable_auto_pause=None, delay=None, spark_events_folder="/events",
-                      spark_log_folder="/logs", tags=None, no_wait=False):
+                      spark_log_folder="/logs", enable_dynamic_executor_allocation=None, min_executors=None,
+                      max_executors=None, tags=None, no_wait=False):

    workspace_client = cf_synapse_client_workspace_factory(cmd.cli_ctx)
    workspace_object = workspace_client.get(resource_group_name, workspace_name)
@@ -37,6 +38,9 @@
    big_data_pool_info.auto_pause = AutoPauseProperties(enabled=enable_auto_pause,
                                                        delay_in_minutes=delay)

+    big_data_pool_info.dynamic_executor_allocation = DynamicExecutorAllocation(enabled=enable_dynamic_executor_allocation,
+                                                                               min_executors=min_executors,
+                                                                               max_executors=max_executors)
    if spark_config_file_path:
        filename = Path(spark_config_file_path).stem
        try:
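A detail of the create path worth calling out: the DynamicExecutorAllocation object is attached to the pool model unconditionally, so a create call that omits all three new flags still carries an allocation whose fields are all None (the SDK's serializer typically drops None fields, leaving service defaults in effect). A tiny sketch of that default case, not taken from the commit:

    # Sketch: all three keyword arguments default to None, so creating a pool
    # without the new flags yields an all-None allocation object.
    from azure.mgmt.synapse.models import DynamicExecutorAllocation

    alloc = DynamicExecutorAllocation(enabled=None, min_executors=None, max_executors=None)
    assert alloc.enabled is None and alloc.max_executors is None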
@@ -55,11 +59,10 @@ def create_spark_pool(cmd, client, resource_group_name, workspace_name, spark_po

def update_spark_pool(cmd, client, resource_group_name, workspace_name, spark_pool_name,
                      node_size=None, node_count=None, enable_auto_scale=None,
-                      min_node_count=None, max_node_count=None,
-                      enable_auto_pause=None, delay=None,
+                      min_node_count=None, max_node_count=None, enable_auto_pause=None, delay=None,
                      library_requirements=None, spark_config_file_path=None,
-                      package_action=None, package=None,
-                      tags=None, force=False, no_wait=False):
+                      package_action=None, package=None, enable_dynamic_executor_allocation=None, min_executors=None,
+                      max_executors=None, tags=None, force=False, no_wait=False):
    existing_spark_pool = client.get(resource_group_name, workspace_name, spark_pool_name)

    if node_size:
@@ -118,6 +121,18 @@ def update_spark_pool(cmd, client, resource_group_name, workspace_name, spark_po
            raise InvalidArgumentValueError(err_msg)
        existing_spark_pool.spark_config_properties = SparkConfigProperties(content=content,
                                                                            filename=filename)
+
+    if existing_spark_pool.dynamic_executor_allocation is not None:
+        if enable_dynamic_executor_allocation is not None:
+            existing_spark_pool.dynamic_executor_allocation.enabled = enable_dynamic_executor_allocation
+        if min_executors:
+            existing_spark_pool.dynamic_executor_allocation.min_executors = min_executors
+        if max_executors:
+            existing_spark_pool.dynamic_executor_allocation.max_executors = max_executors
+    else:
+        existing_spark_pool.dynamic_executor_allocation = DynamicExecutorAllocation(enabled=enable_dynamic_executor_allocation,
+                                                                                    min_executors=min_executors,
+                                                                                    max_executors=max_executors)
    return sdk_no_wait(no_wait, client.begin_create_or_update, resource_group_name, workspace_name, spark_pool_name,
                       existing_spark_pool, force=force)

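On the update side, the new block patches an existing DynamicExecutorAllocation in place and only constructs a fresh one when the pool has none, so flags the caller omits keep their current values. Note that min_executors and max_executors are guarded by plain truthiness checks, so an explicit 0 would be ignored on the in-place branch. A condensed, self-contained sketch of that merge logic (DynExec is a hypothetical stand-in for the SDK model, not the real class):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class DynExec:
        # Hypothetical stand-in for azure.mgmt.synapse.models.DynamicExecutorAllocation.
        enabled: Optional[bool] = None
        min_executors: Optional[int] = None
        max_executors: Optional[int] = None

    def merge(existing, enabled=None, min_executors=None, max_executors=None):
        # Mirrors the commit's update logic: patch in place when an allocation
        # exists, otherwise build one from whatever flags were supplied.
        if existing is None:
            return DynExec(enabled, min_executors, max_executors)
        if enabled is not None:
            existing.enabled = enabled
        if min_executors:   # truthiness check, as in the commit: 0 is skipped
            existing.min_executors = min_executors
        if max_executors:
            existing.max_executors = max_executors
        return existing

    pool_alloc = merge(DynExec(True, 3, 10), max_executors=12)
    assert (pool_alloc.min_executors, pool_alloc.max_executors) == (3, 12)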
(Diffs for the remaining six changed files are not shown.)