From d869cd2a90fba616e43d06feb70a3beaa603e98e Mon Sep 17 00:00:00 2001
From: Andrew Walker <awalker@ixsystems.com>
Date: Wed, 30 Oct 2024 09:01:10 -0600
Subject: [PATCH] Add ability to set ACL entries by username (#13965)

This commit converts filesystem ACL APIs to use new api_method
as well as improving the filesystem.setacl API to allow setting
ACL entries by name. This commit also removes several private
APIs.
---
 .../middlewared/api/v25_04_0/__init__.py      |   1 +
 .../middlewared/api/v25_04_0/acl.py           | 372 ++++++++++
 .../middlewared/plugins/filesystem_/acl.py    | 649 +++++++-----------
 .../plugins/filesystem_/acl_template.py       | 158 +++--
 .../plugins/filesystem_/perm_check.py         |  11 +-
 .../middlewared/plugins/filesystem_/utils.py  | 434 +++++++-----
 .../pytest/unit/plugins/test_acl_inherit.py   |  20 +-
 .../middlewared/utils/filesystem/acl.py       | 195 +++++-
 tests/api2/test_300_nfs.py                    |   2 +-
 tests/api2/test_344_acl_templates.py          | 231 +++----
 tests/api2/test_acl_by_who.py                 |  98 +++
 11 files changed, 1397 insertions(+), 774 deletions(-)
 create mode 100644 src/middlewared/middlewared/api/v25_04_0/acl.py
 create mode 100644 tests/api2/test_acl_by_who.py

diff --git a/src/middlewared/middlewared/api/v25_04_0/__init__.py b/src/middlewared/middlewared/api/v25_04_0/__init__.py
index 4cb6c36f43682..3c4f659212ecb 100644
--- a/src/middlewared/middlewared/api/v25_04_0/__init__.py
+++ b/src/middlewared/middlewared/api/v25_04_0/__init__.py
@@ -1,4 +1,5 @@
 from .acme_protocol import *  # noqa
+from .acl import * # noqa
 from .alert import *  # noqa
 from .alertservice import *  # noqa
 from .api_key import *  # noqa
diff --git a/src/middlewared/middlewared/api/v25_04_0/acl.py b/src/middlewared/middlewared/api/v25_04_0/acl.py
new file mode 100644
index 0000000000000..b3c4d9168f167
--- /dev/null
+++ b/src/middlewared/middlewared/api/v25_04_0/acl.py
@@ -0,0 +1,372 @@
+from annotated_types import Ge, Le
+from middlewared.api.base import (
+    BaseModel,
+    Excluded,
+    excluded_field,
+    ForUpdateMetaclass,
+    LocalUsername,
+    NonEmptyString,
+    RemoteUsername,
+    single_argument_args,
+)
+from pydantic import Field, model_validator
+from typing import Annotated, Literal, Self
+from middlewared.utils.filesystem.acl import (
+    ACL_UNDEFINED_ID,
+    FS_ACL_Type,
+    NFS4ACE_Tag,
+    NFS4ACE_Type,
+    NFS4ACE_MaskSimple,
+    NFS4ACE_FlagSimple,
+    POSIXACE_Tag,
+    NFS4_SPECIAL_ENTRIES,
+    POSIX_SPECIAL_ENTRIES,
+)
+from .common import QueryFilters, QueryOptions
+
+__all__ = [
+    'AclTemplateEntry',
+    'AclTemplateByPathArgs', 'AclTemplateByPathResult',
+    'AclTemplateCreateArgs', 'AclTemplateCreateResult',
+    'AclTemplateUpdateArgs', 'AclTemplateUpdateResult',
+    'AclTemplateDeleteArgs', 'AclTemplateDeleteResult',
+    'FilesystemAddToAclArgs', 'FilesystemAddToAclResult',
+    'FilesystemGetAclArgs', 'FilesystemGetAclResult',
+    'FilesystemSetAclArgs', 'FilesystemSetAclResult',
+    'FilesystemGetInheritedAclArgs', 'FilesystemGetInheritedAclResult'
+]
+
+ACL_MAX_ID = 2 ** 32 // 2 - 1
+
+AceWhoId = Annotated[int, Ge(ACL_UNDEFINED_ID), Le(ACL_MAX_ID)]
+
+NFS4ACE_BasicPermset = Literal[
+    NFS4ACE_MaskSimple.FULL_CONTROL,
+    NFS4ACE_MaskSimple.MODIFY,
+    NFS4ACE_MaskSimple.READ,
+    NFS4ACE_MaskSimple.TRAVERSE
+]
+
+NFS4ACE_BasicFlagset = Literal[
+    NFS4ACE_FlagSimple.INHERIT,
+    NFS4ACE_FlagSimple.NOINHERIT,
+]
+
+NFS4ACE_Tags = Literal[
+    NFS4ACE_Tag.SPECIAL_OWNER,
+    NFS4ACE_Tag.SPECIAL_GROUP,
+    NFS4ACE_Tag.SPECIAL_EVERYONE,
+    NFS4ACE_Tag.USER,
+    NFS4ACE_Tag.GROUP
+]
+
+NFS4ACE_EntryTypes = Literal[
+    NFS4ACE_Type.ALLOW,
+    NFS4ACE_Type.DENY
+]
+
+
+class NFS4ACE_AdvancedPerms(BaseModel):
+    READ_DATA: bool = False
+    WRITE_DATA: bool = False
+    APPEND_DATA: bool = False
+    READ_NAMED_ATTRS: bool = False
+    WRITE_NAMED_ATTRS: bool = False
+    EXECUTE: bool = False
+    DELETE: bool = False
+    DELETE_CHILD: bool = False
+    READ_ATTRIBUTES: bool = False
+    WRITE_ATTRIBUTES: bool = False
+    READ_ACL: bool = False
+    WRITE_ACL: bool = False
+    WRITE_OWNER: bool = False
+    SYNCHRONIZE: bool = False
+
+
+class NFS4ACE_BasicPerms(BaseModel):
+    BASIC: NFS4ACE_BasicPermset
+
+
+class NFS4ACE_AdvancedFlags(BaseModel):
+    FILE_INHERIT: bool = False
+    DIRECTORY_INHERIT: bool = False
+    NO_PROPAGATE_INHERIT: bool = False
+    INHERIT_ONLY: bool = False
+    INHERITED: bool = False
+
+    @model_validator(mode='after')
+    def check_inherit_only(self) -> Self:
+        if not self.INHERIT_ONLY:
+            return self
+
+        if not self.FILE_INHERIT and not self.DIRECTORY_INHERIT:
+            raise ValueError(
+                'At least one of FILE_INHERIT or DIRECTORY_INHERIT must '
+                'be set if INHERIT_ONLY is present in the ACE flags'
+            )
+
+        return self
+
+
+class NFS4ACE_BasicFlags(BaseModel):
+    BASIC: NFS4ACE_BasicFlagset
+
+
+class NFS4ACE(BaseModel):
+    tag: NFS4ACE_Tags
+    type: NFS4ACE_EntryTypes
+    perms: NFS4ACE_AdvancedPerms | NFS4ACE_BasicPerms
+    flags: NFS4ACE_AdvancedFlags | NFS4ACE_BasicFlags
+    id: AceWhoId | None = None
+    who: LocalUsername | RemoteUsername | None = None
+
+    @model_validator(mode='after')
+    def check_ace_valid(self) -> Self:
+        if self.tag in NFS4_SPECIAL_ENTRIES:
+            if self.id not in (-1, None):
+                raise ValueError(
+                    f'{self.id}: id may not be specified for the following ACL entry '
+                    f'tags: {", ".join([tag for tag in NFS4_SPECIAL_ENTRIES])}'
+                )
+        else:
+            if not self.who and self.id in (None, -1):
+                raise ValueError(
+                    'Numeric ID "id" or account name "who" must be specified'
+                )
+
+        return self
+
+
+class NFS4ACL_Flags(BaseModel):
+    autoinherit: bool = False
+    protected: bool = False
+    defaulted: bool = False
+
+
+POSIXACE_Tags = Literal[
+    POSIXACE_Tag.USER_OBJ,
+    POSIXACE_Tag.GROUP_OBJ,
+    POSIXACE_Tag.OTHER,
+    POSIXACE_Tag.MASK,
+    POSIXACE_Tag.USER,
+    POSIXACE_Tag.GROUP
+]
+
+
+class POSIXACE_Perms(BaseModel):
+    READ: bool
+    WRITE: bool
+    EXECUTE: bool
+
+
+class POSIXACE(BaseModel):
+    tag: POSIXACE_Tags
+    perms: POSIXACE_Perms
+    default: bool
+    id: AceWhoId | None = None
+    who: LocalUsername | RemoteUsername | None = None
+
+    @model_validator(mode='after')
+    def check_ace_valid(self) -> Self:
+        if self.tag in POSIX_SPECIAL_ENTRIES:
+            if self.id not in (-1, None):
+                raise ValueError(
+                    f'{self.id}: id may not be specified for the following ACL entry '
+                    f'tags: {", ".join([tag for tag in POSIX_SPECIAL_ENTRIES])}'
+                )
+        else:
+            if not self.who and self.id in (None, -1):
+                raise ValueError(
+                    'Numeric ID "id" or account name "who" must be specified'
+                )
+
+        return self
+
+
+class AclBaseInfo(BaseModel):
+    uid: AceWhoId | None
+    gid: AceWhoId | None
+
+
+class NFS4ACL(AclBaseInfo):
+    acltype: Literal[FS_ACL_Type.NFS4]
+    acl: list[NFS4ACE]
+    aclflags: NFS4ACL_Flags
+    trivial: bool
+
+
+class POSIXACL(AclBaseInfo):
+    acltype: Literal[FS_ACL_Type.POSIX1E]
+    acl: list[POSIXACE]
+    trivial: bool
+
+
+class DISABLED_ACL(AclBaseInfo):
+    # ACL response paths with ACL entirely disabled
+    acltype: Literal[FS_ACL_Type.DISABLED]
+    acl: Literal[None]
+    trivial: Literal[True]
+
+
+class FilesystemGetAclArgs(BaseModel):
+    path: NonEmptyString
+    simplified: bool = True
+    resolve_ids: bool = False
+
+
+class AclBaseResult(BaseModel):
+    path: NonEmptyString
+    user: NonEmptyString | None
+    group: NonEmptyString | None
+
+
+class NFS4ACLResult(NFS4ACL, AclBaseResult):
+    pass
+
+
+class POSIXACLResult(POSIXACL, AclBaseResult):
+    pass
+
+
+class DISABLED_ACLResult(DISABLED_ACL, AclBaseResult):
+    pass
+
+
+class FilesystemGetAclResult(BaseModel):
+    result: NFS4ACLResult | POSIXACLResult | DISABLED_ACLResult
+
+
+class FilesystemSetAclOptions(BaseModel):
+    stripacl: bool = False
+    recursive: bool = False
+    traverse: bool = False
+    canonicalize: bool = True
+    validate_effective_acl: bool = True
+
+
+@single_argument_args('filesystem_acl')
+class FilesystemSetAclArgs(BaseModel):
+    path: NonEmptyString
+    dacl: list[NFS4ACE] | list[POSIXACE]
+    options: FilesystemSetAclOptions = Field(default=FilesystemSetAclOptions())
+    nfs41_flags: NFS4ACL_Flags = Field(default=NFS4ACL_Flags())
+    uid: AceWhoId | None = ACL_UNDEFINED_ID
+    user: str | None = None
+    gid: AceWhoId | None = ACL_UNDEFINED_ID
+    group: str | None = None
+
+    # acltype is explicitly added to preserve compatibility with older setacl API
+    acltype: Literal[FS_ACL_Type.NFS4, FS_ACL_Type.POSIX1E] | None = None
+
+    @model_validator(mode='after')
+    def check_setacl_valid(self) -> Self:
+        if len(self.dacl) != 0 and self.options.stripacl:
+            raise ValueError(
+                'Simultaneosuly setting and removing ACL from path is not supported'
+            )
+
+        return self
+
+
+class FilesystemSetAclResult(FilesystemGetAclResult):
+    pass
+
+
+class AclTemplateEntry(BaseModel):
+    id: int
+    builtin: bool
+    name: str
+    acltype: Literal[FS_ACL_Type.NFS4, FS_ACL_Type.POSIX1E]
+    acl: list[NFS4ACE] | list[POSIXACE]
+    comment: str = ''
+
+
+class AclTemplateCreate(AclTemplateEntry):
+    id: Excluded = excluded_field()
+    builtin: Excluded = excluded_field()
+
+
+class AclTemplateCreateArgs(BaseModel):
+    acltemplate_create: AclTemplateCreate
+
+
+class AclTemplateCreateResult(BaseModel):
+    result: AclTemplateEntry
+
+
+class AclTemplateUpdate(AclTemplateCreate, metaclass=ForUpdateMetaclass):
+    pass
+
+
+class AclTemplateUpdateArgs(BaseModel):
+    id: int
+    acltemplate_update: AclTemplateUpdate
+
+
+class AclTemplateUpdateResult(BaseModel):
+    result: AclTemplateEntry
+
+
+class AclTemplateDeleteArgs(BaseModel):
+    id: int
+
+
+class AclTemplateDeleteResult(BaseModel):
+    result: int
+
+
+class AclTemplateFormatOptions(BaseModel):
+    canonicalize: bool = False
+    ensure_builtins: bool = False
+    resolve_names: bool = False
+
+
+@single_argument_args('filesystem_acl')
+class AclTemplateByPathArgs(BaseModel):
+    path: str = ""
+    query_filters: QueryFilters = Field(alias='query-filters', default=[])
+    query_options: QueryOptions = Field(alias='query-options', default=QueryOptions())
+    format_options: AclTemplateFormatOptions = Field(alias='format-options', default=AclTemplateFormatOptions())
+
+
+class AclTemplateByPathResult(BaseModel):
+    result: list[AclTemplateEntry]
+
+
+class SimplifiedAclEntry(BaseModel):
+    id_type: Literal[NFS4ACE_Tag.USER, NFS4ACE_Tag.GROUP]
+    id: int
+    access: Literal[
+        NFS4ACE_MaskSimple.READ,
+        NFS4ACE_MaskSimple.MODIFY,
+        NFS4ACE_MaskSimple.FULL_CONTROL
+    ]
+
+
+class FilesystemAddToAclOptions(BaseModel):
+    force: bool = False
+
+
+@single_argument_args('add_to_acl')
+class FilesystemAddToAclArgs(BaseModel):
+    path: NonEmptyString
+    entries: list[SimplifiedAclEntry]
+    options: FilesystemAddToAclOptions = Field(default=FilesystemAddToAclOptions())
+
+
+class FilesystemAddToAclResult(BaseModel):
+    result: bool
+
+
+class FSGetInheritedAclOptions(BaseModel):
+    directory: bool = True
+
+
+@single_argument_args('calculate_inherited_acl')
+class FilesystemGetInheritedAclArgs(BaseModel):
+    path: NonEmptyString
+    options: FSGetInheritedAclOptions = Field(default=FSGetInheritedAclOptions())
+
+
+class FilesystemGetInheritedAclResult(BaseModel):
+    result: list[NFS4ACE] | list[POSIXACE]
diff --git a/src/middlewared/middlewared/plugins/filesystem_/acl.py b/src/middlewared/middlewared/plugins/filesystem_/acl.py
index 0721748dd8f00..86d55a0ad01f7 100644
--- a/src/middlewared/middlewared/plugins/filesystem_/acl.py
+++ b/src/middlewared/middlewared/plugins/filesystem_/acl.py
@@ -2,15 +2,32 @@
 import json
 import os
 import subprocess
-import stat as pystat
 from pathlib import Path
 
-from middlewared.schema import Bool, Dict, Int, List, Str, Ref, UnixPerm, OROperator
-from middlewared.service import accepts, private, returns, job, CallError, ValidationErrors, Service
+from middlewared.api import api_method
+from middlewared.api.current import (
+    FilesystemAddToAclArgs, FilesystemAddToAclResult,
+    FilesystemGetAclArgs, FilesystemGetAclResult,
+    FilesystemSetAclArgs, FilesystemSetAclResult,
+    FilesystemGetInheritedAclArgs, FilesystemGetInheritedAclResult,
+)
+from middlewared.schema import Bool, Dict, Int, Str, UnixPerm
+from middlewared.service import accepts, private, returns, job, ValidationErrors, Service
+from middlewared.service_exception import CallError, MatchNotFound, ValidationError
+from middlewared.utils.filesystem.acl import (
+    ACL_UNDEFINED_ID,
+    FS_ACL_Type,
+    NFS4ACE_Tag,
+    POSIXACE_Tag,
+    normalize_acl_ids,
+    path_get_acltype,
+    strip_acl_path,
+    validate_nfs4_ace_full,
+)
 from middlewared.utils.filesystem.directory import directory_is_empty
 from middlewared.utils.path import FSLocation, path_location
 from middlewared.validators import Range
-from .utils import ACLType
+from .utils import acltool, AclToolAction, calculate_inherited_acl, canonicalize_nfs4_acl, gen_aclstring_posix1e
 
 
 class FilesystemService(Service):
@@ -18,24 +35,6 @@ class FilesystemService(Service):
     class Config:
         cli_private = True
 
-    def __acltool(self, path, action, uid, gid, options):
-
-        flags = "-r"
-        flags += "x" if options.get('traverse') else ""
-        flags += "C" if options.get('do_chmod') else ""
-        flags += "P" if options.get('posixacl') else ""
-
-        acltool = subprocess.run([
-            '/usr/bin/nfs4xdr_winacl',
-            '-a', action,
-            '-O', str(uid), '-G', str(gid),
-            flags,
-            '-c', path,
-            '-p', path], check=False, capture_output=True
-        )
-        if acltool.returncode != 0:
-            raise CallError(f"acltool [{action}] on path {path} failed with error: [{acltool.stderr.decode().strip()}]")
-
     def _common_perm_path_validate(self, schema, data, verrors, pool_mp_ok=False):
         loc = path_location(data['path'])
         if loc is FSLocation.EXTERNAL:
@@ -100,25 +99,7 @@ def path_get_acltype(self, path):
         if path_location(path) is FSLocation.EXTERNAL:
             raise NotImplementedError
 
-        try:
-            os.getxattr(path, "system.posix_acl_access")
-            return ACLType.POSIX1E.name
-
-        except OSError as e:
-            if e.errno == errno.ENODATA:
-                return ACLType.POSIX1E.name
-
-            if e.errno != errno.EOPNOTSUPP:
-                raise
-
-        try:
-            os.getxattr(path, "system.nfs4_acl_xdr")
-            return ACLType.NFS4.name
-        except OSError as e:
-            if e.errno == errno.EOPNOTSUPP:
-                return ACLType.DISABLED.name
-
-            raise
+        return path_get_acltype(path)
 
     @accepts(
         Dict(
@@ -172,30 +153,9 @@ def chown(self, job, data):
 
         job.set_progress(10, f'Recursively changing owner of {data["path"]}.')
         options['posixacl'] = True
-        self.__acltool(data['path'], 'chown', uid, gid, options)
+        acltool(data['path'], AclToolAction.CHOWN, uid, gid, options)
         job.set_progress(100, 'Finished changing owner.')
 
-    @private
-    def _strip_acl_nfs4(self, path):
-        stripacl = subprocess.run(
-            ['nfs4xdr_setfacl', '-b', path],
-            capture_output=True,
-            check=False
-        )
-        if stripacl.returncode != 0:
-            raise CallError(f"{path}: Failed to strip ACL on path: {stripacl.stderr.decode()}")
-
-        return
-
-    @private
-    def _strip_acl_posix1e(self, path):
-        posix_xattrs = ['system.posix_acl_access', 'system.posix_acl_default']
-        for xat in os.listxattr(path):
-            if xat not in posix_xattrs:
-                continue
-
-            os.removexattr(path, xat)
-
     @accepts(
         Dict(
             'filesystem_permission',
@@ -272,10 +232,7 @@ def setperm(self, job, data):
         if mode is not None:
             mode = int(mode, 8)
 
-        if is_nfs4acl:
-            self._strip_acl_nfs4(data['path'])
-        else:
-            self._strip_acl_posix1e(data['path'])
+        strip_acl_path(data['path'])
 
         if mode:
             os.chmod(data['path'], mode)
@@ -286,11 +243,11 @@ def setperm(self, job, data):
             job.set_progress(100, 'Finished setting permissions.')
             return
 
-        action = 'clone' if mode else 'strip'
+        action = AclToolAction.CLONE if mode else AclToolAction.STRIP
         job.set_progress(10, f'Recursively setting permissions on {data["path"]}.')
         options['posixacl'] = not is_nfs4acl
         options['do_chmod'] = True
-        self.__acltool(data['path'], action, uid, gid, options)
+        acltool(data['path'], action, uid, gid, options)
         job.set_progress(100, 'Finished setting permissions.')
 
     @private
@@ -329,7 +286,7 @@ def getacl_nfs4(self, path, simplified, resolve_ids):
             ace['flags'].pop('FAILED_ACCESS', None)
 
         na41flags = output.pop('nfs41_flags')
-        output['nfs41_flags'] = {
+        output['aclflags'] = {
             "protected": na41flags['PROTECTED'],
             "defaulted": na41flags['DEFAULTED'],
             "autoinherit": na41flags['AUTOINHERIT']
@@ -344,12 +301,7 @@ def getacl_posix1e(self, path, simplified, resolve_ids):
             'uid': st.st_uid,
             'gid': st.st_gid,
             'acl': [],
-            'flags': {
-                'setuid': bool(st.st_mode & pystat.S_ISUID),
-                'setgid': bool(st.st_mode & pystat.S_ISGID),
-                'sticky': bool(st.st_mode & pystat.S_ISVTX),
-            },
-            'acltype': ACLType.POSIX1E.name
+            'acltype': FS_ACL_Type.POSIX1E
         }
 
         ret['uid'] = st.st_uid
@@ -420,30 +372,19 @@ def getacl_posix1e(self, path, simplified, resolve_ids):
     def getacl_disabled(self, path):
         st = os.stat(path)
         return {
+            'path': path,
             'uid': st.st_uid,
             'gid': st.st_gid,
-            'acl': [],
-            'acltype': ACLType.DISABLED.name,
+            'acl': None,
+            'acltype': FS_ACL_Type.DISABLED,
             'trivial': True,
         }
 
-    @accepts(
-        Str('path'),
-        Bool('simplified', default=True),
-        Bool('resolve_ids', default=False),
-        roles=['FILESYSTEM_ATTRS_READ']
+    @api_method(
+        FilesystemGetAclArgs,
+        FilesystemGetAclResult,
+        roles=['FILESYSTEM_ATTRS_READ'],
     )
-    @returns(Dict(
-        'truenas_acl',
-        Str('path'),
-        Bool('trivial'),
-        Str('acltype', enum=[x.name for x in ACLType], null=True),
-        OROperator(
-            Ref('nfs4_acl'),
-            Ref('posix1e_acl'),
-            name='acl'
-        )
-    ))
     def getacl(self, path, simplified, resolve_ids):
         """
         Return ACL of a given path. This may return a POSIX1e ACL or a NFSv4 ACL. The acl type is indicated
@@ -483,22 +424,30 @@ def getacl(self, path, simplified, resolve_ids):
         if not os.path.exists(path):
             raise CallError('Path not found.', errno.ENOENT)
 
-        path_acltype = self.path_get_acltype(path)
-        acltype = ACLType[path_acltype]
+        acltype = path_get_acltype(path)
 
-        if acltype == ACLType.NFS4:
+        if acltype == FS_ACL_Type.NFS4:
             ret = self.getacl_nfs4(path, simplified, resolve_ids)
-        elif acltype == ACLType.POSIX1E:
+        elif acltype == FS_ACL_Type.POSIX1E:
             ret = self.getacl_posix1e(path, simplified, resolve_ids)
         else:
             ret = self.getacl_disabled(path)
 
+        ret.update({'user': None, 'group': None})
+
+        if resolve_ids:
+            if user := self.middleware.call_sync('user.query', [['uid', '=', ret['uid']]]):
+                ret['user'] = user[0]['username']
+
+            if group := self.middleware.call_sync('group.query', [['gid', '=', ret['gid']]]):
+                ret['group'] = group[0]['group']
+
         return ret
 
     @private
     def setacl_nfs4_internal(self, path, acl, do_canon, verrors):
         payload = {
-            'acl': ACLType.NFS4.canonicalize(acl) if do_canon else acl,
+            'acl': canonicalize_nfs4_acl(acl) if do_canon else acl,
         }
         json_payload = json.dumps(payload)
         setacl = subprocess.run(
@@ -523,190 +472,54 @@ def setacl_nfs4_internal(self, path, acl, do_canon, verrors):
             raise CallError(setacl.stderr.decode())
 
     @private
-    def setacl_nfs4(self, job, data):
+    def setacl_nfs4(self, job, current_acl, data):
         job.set_progress(0, 'Preparing to set acl.')
-        verrors = ValidationErrors()
-        options = data.get('options', {})
-        recursive = options.get('recursive', False)
-        do_strip = options.get('stripacl', False)
-        do_canon = options.get('canonicalize', False)
+        recursive = data['options'].get('recursive', False)
+        do_strip = data['options'].get('stripacl', False)
+        do_canon = data['options'].get('canonicalize', False)
+        action = AclToolAction.CLONE
 
-        path = data.get('path', '')
-        uid = -1 if data['uid'] is None else data.get('uid', -1)
-        gid = -1 if data['gid'] is None else data.get('gid', -1)
-
-        aclcheck = ACLType.NFS4.validate(data)
-        if not aclcheck['is_valid']:
-            for err in aclcheck['errors']:
-                if err[2]:
-                    v = f'filesystem_acl.dacl.{err[0]}.{err[2]}'
-                else:
-                    v = f'filesystem_acl.dacl.{err[0]}'
-
-                verrors.add(v, err[1])
+        verrors = ValidationErrors()
 
-        current_acl = self.getacl(path)
-        if current_acl['acltype'] != ACLType.NFS4.name:
-            verrors.add(
-                'filesystem_acl.acltype',
-                f'ACL type mismatch. On-disk format is [{current_acl["acltype"]}], '
-                f'but received [{data.get("acltype")}].'
-            )
+        for idx, ace in enumerate(data['dacl']):
+            validate_nfs4_ace_full(ace, f'filesystem.setacl.dacl.{idx}', verrors)
 
         verrors.check()
 
         if do_strip:
-            self._strip_acl_nfs4(path)
+            action = AclToolAction.STRIP
+            strip_acl_path(data['path'])
 
         else:
-            if options['validate_effective_acl']:
-                uid_to_check = current_acl['uid'] if uid == -1 else uid
-                gid_to_check = current_acl['gid'] if gid == -1 else gid
+            if data['options']['validate_effective_acl']:
+                uid_to_check = current_acl['uid'] if data['uid'] == ACL_UNDEFINED_ID else data['uid']
+                gid_to_check = current_acl['gid'] if data['gid'] == ACL_UNDEFINED_ID else data['gid']
 
                 self.middleware.call_sync(
                     'filesystem.check_acl_execute',
-                    path, data['dacl'], uid_to_check, gid_to_check, True
+                    data['path'], data['dacl'], uid_to_check, gid_to_check, True
                 )
 
-            self.setacl_nfs4_internal(path, data['dacl'], do_canon, verrors)
+            self.setacl_nfs4_internal(data['path'], data['dacl'], do_canon, verrors)
 
         if not recursive:
-            os.chown(path, uid, gid)
+            os.chown(data['path'], data['uid'], data['gid'])
             job.set_progress(100, 'Finished setting NFSv4 ACL.')
             return
 
-        self.__acltool(path, 'clone' if not do_strip else 'strip',
-                       uid, gid, options)
+        acltool(data['path'], action, data['uid'], data['gid'], data['options'])
 
         job.set_progress(100, 'Finished setting NFSv4 ACL.')
 
     @private
-    def gen_aclstring_posix1e(self, dacl, recursive, verrors):
-        """
-        This method iterates through provided POSIX1e ACL and
-        performs addtional validation before returning the ACL
-        string formatted for the setfacl command. In case
-        of ValidationError, None is returned.
-        """
-        has_tag = {
-            "USER_OBJ": False,
-            "GROUP_OBJ": False,
-            "OTHER": False,
-            "MASK": False,
-            "DEF_USER_OBJ": False,
-            "DEF_GROUP_OBJ": False,
-            "DEF_OTHER": False,
-            "DEF_MASK": False,
-        }
-        required_entries = ["USER_OBJ", "GROUP_OBJ", "OTHER"]
-        has_named = False
-        has_def_named = False
-        has_default = False
-        aclstring = ""
-
-        for idx, ace in enumerate(dacl):
-            if idx != 0:
-                aclstring += ","
-
-            if ace['id'] == -1:
-                ace['id'] = ''
-
-            who = "DEF_" if ace['default'] else ""
-            who += ace['tag']
-            duplicate_who = has_tag.get(who)
-
-            if duplicate_who is True:
-                verrors.add(
-                    'filesystem_acl.dacl.{idx}',
-                    f'More than one {"default" if ace["default"] else ""} '
-                    f'{ace["tag"]} entry is not permitted'
-                )
-
-            elif duplicate_who is False:
-                has_tag[who] = True
-
-            if ace['tag'] in ["USER", "GROUP"]:
-                if ace['default']:
-                    has_def_named = True
-                else:
-                    has_named = True
-
-            ace['tag'] = ace['tag'].rstrip('_OBJ').lower()
-
-            if ace['default']:
-                has_default = True
-                aclstring += "default:"
-
-            aclstring += f"{ace['tag']}:{ace['id']}:"
-            aclstring += 'r' if ace['perms']['READ'] else '-'
-            aclstring += 'w' if ace['perms']['WRITE'] else '-'
-            aclstring += 'x' if ace['perms']['EXECUTE'] else '-'
-
-        if has_named and not has_tag['MASK']:
-            verrors.add(
-                'filesystem_acl.dacl',
-                'Named (user or group) POSIX ACL entries '
-                'require a mask entry to be present in the ACL.'
-            )
-
-        elif has_def_named and not has_tag['DEF_MASK']:
-            verrors.add(
-                'filesystem_acl.dacl',
-                'Named default (user or group) POSIX ACL entries '
-                'require a default mask entry to be present in the ACL.'
-            )
-
-        if recursive and not has_default:
-            verrors.add(
-                'filesystem_acl.dacl',
-                'Default ACL entries are required in order to apply '
-                'ACL recursively.'
-            )
-
-        for entry in required_entries:
-            if not has_tag[entry]:
-                verrors.add(
-                    'filesystem_acl.dacl',
-                    f'Presence of [{entry}] entry is required.'
-                )
-
-            if has_default and not has_tag[f"DEF_{entry}"]:
-                verrors.add(
-                    'filesystem_acl.dacl',
-                    f'Presence of default [{entry}] entry is required.'
-                )
-
-        return aclstring
-
-    @private
-    def setacl_posix1e(self, job, data):
+    def setacl_posix1e(self, job, current_acl, data):
         job.set_progress(0, 'Preparing to set acl.')
-        verrors = ValidationErrors()
         options = data['options']
         recursive = options.get('recursive', False)
         do_strip = options.get('stripacl', False)
         dacl = data.get('dacl', [])
-        path = data['path']
-        uid = -1 if data['uid'] is None else data.get('uid', -1)
-        gid = -1 if data['gid'] is None else data.get('gid', -1)
-
-        aclcheck = ACLType.POSIX1E.validate(data)
-        if not aclcheck['is_valid']:
-            for err in aclcheck['errors']:
-                if err[2]:
-                    v = f'filesystem_acl.dacl.{err[0]}.{err[2]}'
-                else:
-                    v = f'filesystem_acl.dacl.{err[0]}'
-
-                verrors.add(v, err[1])
-
-        current_acl = self.getacl(path)
-        if current_acl['acltype'] != ACLType.POSIX1E.name:
-            verrors.add(
-                'filesystem_acl.acltype',
-                f'ACL type mismatch. On-disk format is [{current_acl["acltype"]}], '
-                f'but received [{data.get("acltype")}].'
-            )
+        verrors = ValidationErrors()
+        action = AclToolAction.STRIP if do_strip else AclToolAction.CLONE
 
         if do_strip and dacl:
             verrors.add(
@@ -718,12 +531,12 @@ def setacl_posix1e(self, job, data):
             if options['validate_effective_acl']:
                 try:
                     # check execute on parent paths
-                    uid_to_check = current_acl['uid'] if uid == -1 else uid
-                    gid_to_check = current_acl['gid'] if gid == -1 else gid
+                    uid_to_check = current_acl['uid'] if data['uid'] == ACL_UNDEFINED_ID else data['uid']
+                    gid_to_check = current_acl['gid'] if data['gid'] == ACL_UNDEFINED_ID else data['gid']
 
                     self.middleware.call_sync(
                         'filesystem.check_acl_execute',
-                        path, dacl, uid_to_check, gid_to_check, True
+                        data['path'], dacl, uid_to_check, gid_to_check, True
                     )
                 except CallError as e:
                     if e.errno != errno.EPERM:
@@ -734,133 +547,210 @@ def setacl_posix1e(self, job, data):
                         e.errmsg
                     )
 
-            aclstring = self.gen_aclstring_posix1e(dacl, recursive, verrors)
+            aclstring = gen_aclstring_posix1e(dacl, recursive, verrors)
 
         verrors.check()
 
-        self._strip_acl_posix1e(path)
+        strip_acl_path(data['path'])
 
         job.set_progress(50, 'Setting POSIX1e ACL.')
 
         if not do_strip:
-            setacl = subprocess.run(['setfacl', '-m', aclstring, path],
+            setacl = subprocess.run(['setfacl', '-m', aclstring, data['path']],
                                     check=False, capture_output=True)
             if setacl.returncode != 0:
-                raise CallError(f'Failed to set ACL [{aclstring}] on path [{path}]: '
+                raise CallError(f'Failed to set ACL [{aclstring}] on path [{data["path"]}]: '
                                 f'{setacl.stderr.decode()}')
 
         if not recursive:
-            os.chown(path, uid, gid)
+            os.chown(data['path'], data['uid'], data['gid'])
             job.set_progress(100, 'Finished setting POSIX1e ACL.')
             return
 
         options['posixacl'] = True
-        self.__acltool(data['path'],
-                       'clone' if not do_strip else 'strip',
-                       uid, gid, options)
+        acltool(data['path'], action, data['uid'], data['gid'], options)
 
         job.set_progress(100, 'Finished setting POSIX1e ACL.')
 
-    @accepts(
-        Dict(
-            'filesystem_acl',
-            Str('path', required=True),
-            Int('uid', null=True, default=None, validators=[Range(min_=-1, max_=2147483647)]),
-            Int('gid', null=True, default=None, validators=[Range(min_=-1, max_=2147483647)]),
-            OROperator(
-                List(
-                    'nfs4_acl',
-                    items=[Dict(
-                        'nfs4_ace',
-                        Str('tag', enum=['owner@', 'group@', 'everyone@', 'USER', 'GROUP']),
-                        Int('id', null=True, validators=[Range(min_=-1, max_=2147483647)]),
-                        Str('type', enum=['ALLOW', 'DENY']),
-                        Dict(
-                            'perms',
-                            Bool('READ_DATA'),
-                            Bool('WRITE_DATA'),
-                            Bool('APPEND_DATA'),
-                            Bool('READ_NAMED_ATTRS'),
-                            Bool('WRITE_NAMED_ATTRS'),
-                            Bool('EXECUTE'),
-                            Bool('DELETE_CHILD'),
-                            Bool('READ_ATTRIBUTES'),
-                            Bool('WRITE_ATTRIBUTES'),
-                            Bool('DELETE'),
-                            Bool('READ_ACL'),
-                            Bool('WRITE_ACL'),
-                            Bool('WRITE_OWNER'),
-                            Bool('SYNCHRONIZE'),
-                            Str('BASIC', enum=['FULL_CONTROL', 'MODIFY', 'READ', 'TRAVERSE']),
-                        ),
-                        Dict(
-                            'flags',
-                            Bool('FILE_INHERIT'),
-                            Bool('DIRECTORY_INHERIT'),
-                            Bool('NO_PROPAGATE_INHERIT'),
-                            Bool('INHERIT_ONLY'),
-                            Bool('INHERITED'),
-                            Str('BASIC', enum=['INHERIT', 'NOINHERIT']),
-                        ),
-                        register=True
-                    )],
-                    register=True
-                ),
-                List(
-                    'posix1e_acl',
-                    items=[Dict(
-                        'posix1e_ace',
-                        Bool('default', default=False),
-                        Str('tag', enum=['USER_OBJ', 'GROUP_OBJ', 'USER', 'GROUP', 'OTHER', 'MASK']),
-                        Int('id', default=-1, validators=[Range(min_=-1, max_=2147483647)]),
-                        Dict(
-                            'perms',
-                            Bool('READ', default=False),
-                            Bool('WRITE', default=False),
-                            Bool('EXECUTE', default=False),
-                        ),
-                        register=True
-                    )],
-                    register=True
-                ),
-                name='dacl',
-            ),
-            Dict(
-                'nfs41_flags',
-                Bool('autoinherit', default=False),
-                Bool('protected', default=False),
-                Bool('defaulted', default=False),
-            ),
-            Str('acltype', enum=[x.name for x in ACLType], null=True),
-            Dict(
-                'options',
-                Bool('stripacl', default=False),
-                Bool('recursive', default=False),
-                Bool('traverse', default=False),
-                Bool('canonicalize', default=True),
-                Bool('validate_effective_acl', default=True)
-            )
-        ), roles=['FILESYSTEM_ATTRS_WRITE'], audit='Filesystem set ACL', audit_extended=lambda data: data['path']
+    @api_method(
+        FilesystemSetAclArgs,
+        FilesystemSetAclResult,
+        roles=['FILESYSTEM_ATTRS_WRITE'],
+        audit='Filesystem set ACL',
+        audit_extended=lambda data: data['path']
     )
-    @returns()
     @job(lock="perm_change")
     def setacl(self, job, data):
+        """
+        Set ACL of a given path. Takes the following parameters:
+        `path` full path to directory or file.
+
+        `dacl` ACL entries. Formatting depends on the underlying `acltype`. NFS4ACL requires
+        NFSv4 entries. POSIX1e requires POSIX1e entries.
+
+        `uid` the desired UID of the file user. If set to None (the default), then user is not changed.
+
+        `user` the desired username for the file user. If set to None, then user is not changed.
+
+        Note about interaction between `uid` and `user`:
+        One and only one of these parameters should be set, and _only_ if the API consumer wishes to
+        change the owner on the file / directory.
+
+        `gid` the desired GID of the file group. If set to None (the default), then group is not changed.
+
+        `group` the desired groupname for the file group. If set to None (the default), then group is not
+        changed.
+
+        Note about interaction between `gid` and `group`:
+        One and only one of these parameters should be set, and _only_ if the API consumer wishes to
+        change the owner on the file / directory.
+
+        WARNING: if user, uid, group, or gid is specified in a recursive operation then the owning
+        user, group, or both for _all_ files will be changed.
+
+        `recursive` apply the ACL recursively
+
+        `traverse` traverse filestem boundaries (ZFS datasets)
+
+        `strip` convert ACL to trivial. ACL is trivial if it can be expressed as a file mode without
+        losing any access rules.
+
+        `canonicalize` reorder ACL entries so that they are in concanical form as described
+        in the Microsoft documentation MS-DTYP 2.4.5 (ACL). This only applies to NFSv4 ACLs.
+
+        The following notes about ACL entries are necessarily terse. If more detail is requried
+        please consult relevant TrueNAS documentation.
+
+        Notes about NFSv4 ACL entry fields:
+
+        `tag` refers to the type of principal to whom the ACL entries applies. USER and GROUP have
+        conventional meanings. `owner@` refers to the owning user of the file, `group@` refers to the owning
+        group of the file, and `everyone@` refers to ALL users (including the owning user and group)..
+
+        `id` refers to the numeric user id or group id associatiated with USER or GROUP entries.
+
+        `who` a user or group name may be specified in lieu of numeric ID for USER or GROUP entries
+
+        `type` may be ALLOW or DENY. Deny entries take precedence over allow when the ACL is evaluated.
+
+        `perms` permissions allowed or denied by the entry. May be set as a simlified BASIC type or
+        more complex type detailing specific permissions.
+
+        `flags` inheritance flags determine how this entry will be presented (if at all) on newly-created
+        files or directories within the specified path. Only valid for directories.
+
+        Notes about posix1e ACL entry fields:
+
+        `default` the ACL entry is in the posix default ACL (will be copied to new files and directories)
+        created within the directory where it is set. These are _NOT_ evaluated when determining access for
+        the file on which they're set. If default is false then the entry applies to the posix access ACL,
+        which is used to determine access to the directory, but is not inherited on new files / directories.
+
+        `tag` the type of principal to whom the ACL entry apples. USER and GROUP have conventional meanings
+        USER_OBJ refers to the owning user of the file and is also denoted by "user" in conventional POSIX
+        UGO permissions. GROUP_OBJ refers to the owning group of the file and is denoted by "group" in the
+        same. OTHER refers to POSIX other, which applies to all users and groups who are not USER_OBJ or
+        GROUP_OBJ. MASK sets maximum permissions granted to all USER and GROUP entries. A valid POSIX1 ACL
+        entry contains precisely one USER_OBJ, GROUP_OBJ, OTHER, and MASK entry for the default and access
+        list.
+
+        `id` refers to the numeric user id or group id associatiated with USER or GROUP entries.
+
+        `who` a user or group name may be specified in lieu of numeric ID for USER or GROUP entries
+
+        `perms` - object containing posix permissions.
+        """
         verrors = ValidationErrors()
         data['loc'] = self._common_perm_path_validate("filesystem.setacl", data, verrors)
+        normalize_acl_ids(data)
+        if data['uid'] != ACL_UNDEFINED_ID and data['user']:
+            verrors.add(
+                'filesystem.setacl.user',
+                'User and uid may not be specified simultaneously.'
+            )
+
+        if data['gid'] != ACL_UNDEFINED_ID and data['group']:
+            verrors.add(
+                'filesystem.setacl.group',
+                'group and gid may not be specified simultaneously.'
+            )
+
+        if data['user']:
+            if user := self.middleware.call_sync('user.query', [['username', '=', data['user']]]):
+                data['uid'] = user[0]['uid']
+            else:
+                verrors.add(
+                    'filesystem.setacl.user',
+                    f'{data["user"]}: user does not exist.'
+                )
+
+        if data['group']:
+            if group := self.middleware.call_sync('group.query', [['group', '=', data['group']]]):
+                data['gid'] = group[0]['gid']
+            else:
+                verrors.add(
+                    'filesystem.setacl.group',
+                    f'{data["group"]}: group does not exist.'
+                )
+
         verrors.check()
 
-        if 'acltype' in data:
-            acltype = ACLType[data['acltype']]
-        else:
-            path_acltype = self.path_get_acltype(data['path'])
-            acltype = ACLType[path_acltype]
+        current_acl = self.getacl(data['path'])
+        if data['acltype'] and data['acltype'] != current_acl['acltype']:
+            raise ValidationError(
+                'filesystem.setacl.dacl.acltype',
+                'ACL type is invalid for selected path'
+            )
 
-        if acltype == ACLType.NFS4:
-            return self.setacl_nfs4(job, data)
-        elif acltype == ACLType.POSIX1E:
-            return self.setacl_posix1e(job, data)
-        else:
-            raise CallError(f"{data['path']}: ACLs disabled on path.", errno.EOPNOTSUPP)
+        for idx, entry in enumerate(data['dacl']):
+            # Convert any names to ids (because ultimately uid/gid is written to disk)
+            # Earlier validation checks whether someone is trying to set both id and name.
+            if entry.get('who') in (None, ''):
+                # entry does not specify a name and so we don't need to normalize
+                continue
+
+            if entry.get('id') != ACL_UNDEFINED_ID:
+                # entry already has a uid / gid
+                continue
+
+            # We're using user.query and group.query to intialize cache entries if required
+            match entry['tag']:
+                case 'USER':
+                    method = 'user.query'
+                    filters = [['username', '=', entry['who']]]
+                    key = 'uid'
+                case 'GROUP':
+                    method = 'group.query'
+                    filters = [['group', '=', entry['who']]]
+                    key = 'gid'
+                case POSIXACE_Tag.USER_OBJ | POSIXACE_Tag.GROUP_OBJ | POSIXACE_Tag.OTHER | POSIXACE_Tag.MASK:
+                    # these tags don't require an explicit uid/gid
+                    continue
+                case NFS4ACE_Tag.SPECIAL_OWNER | NFS4ACE_Tag.SPECIAL_GROUP | NFS4ACE_Tag.EVERYONE:
+                    # these tags don't require an explicit uid/gid
+                    continue
+                case _:
+                    raise ValidationError(
+                        f'filesystem.setacl.{idx}.who',
+                        'Name may only be specified for USER and GROUP entries'
+                    )
+            try:
+                entry['id'] = self.middleware.call_sync(method, filters, {'get': True})[key]
+                entry['who'] = None
+            except MatchNotFound:
+                raise ValidationError(f'filesystem.setacl.{idx}.who', f'{entry["who"]}: account does not exist')
+
+        match current_acl['acltype']:
+            case FS_ACL_Type.NFS4:
+                self.setacl_nfs4(job, current_acl, data)
+            case FS_ACL_Type.POSIX1E:
+                self.setacl_posix1e(job, current_acl, data)
+            case FS_ACL_Type.DISABLED:
+                raise CallError(f"{data['path']}: ACLs disabled on path.", errno.EOPNOTSUPP)
+            case _:
+                raise TypeError(f'{current_acl["acltype"]}: unexpected ACL type')
+
+        return self.getacl(data['path'])
 
     @private
     def add_to_acl_posix(self, acl, entries):
@@ -1014,21 +904,14 @@ def check_acl_for_entry(entry):
 
         return changed
 
-    @private
-    @accepts(Dict(
-        'add_to_acl',
-        Str('path', required=True),
-        List('entries', required=True, items=[Dict(
-            'simplified_acl_entry',
-            Str('id_type', enum=['USER', 'GROUP'], required=True),
-            Int('id', required=True),
-            Str('access', enum=['READ', 'MODIFY', 'FULL_CONTROL'], required=True)
-        )]),
-        Dict(
-            'options',
-            Bool('force', default=False),
-        )
-    ), roles=['FILESYSTEM_ATTRS_WRITE'], audit='Filesystem add to ACL', audit_extended=lambda data: data['path'])
+    @api_method(
+        FilesystemAddToAclArgs,
+        FilesystemAddToAclResult,
+        roles=['FILESYSTEM_ATTRS_WRITE'],
+        audit='Filesystem add to ACL',
+        audit_extended=lambda data: data['path'],
+        private=True
+    )
     @job()
     def add_to_acl(self, job, data):
         """
@@ -1051,11 +934,11 @@ def add_to_acl(self, job, data):
 
         data['path'] = init_path
         current_acl = self.getacl(data['path'])
-        acltype = ACLType[current_acl['acltype']]
+        acltype = FS_ACL_Type(current_acl['acltype'])
 
-        if acltype == ACLType.NFS4:
+        if acltype == FS_ACL_Type.NFS4:
             changed = self.add_to_acl_nfs4(current_acl['acl'], data['entries'])
-        elif acltype == ACLType.POSIX1E:
+        elif acltype == FS_ACL_Type.POSIX1E:
             changed = self.add_to_acl_posix(current_acl['acl'], data['entries'])
         else:
             raise CallError(f"{data['path']}: ACLs disabled on path.", errno.EOPNOTSUPP)
@@ -1080,15 +963,7 @@ def add_to_acl(self, job, data):
         job.wrap_sync(setacl_job)
         return changed
 
-    @private
-    @accepts(Dict(
-        'calculate_inherited_acl',
-        Str('path', required=True),
-        Dict(
-            'options',
-            Bool('directory', default=True)
-        )
-    ))
+    @api_method(FilesystemGetInheritedAclArgs, FilesystemGetInheritedAclResult, private=True)
     def get_inherited_acl(self, data):
         """
         Generate an inherited ACL based on given `path`
@@ -1100,6 +975,4 @@ def get_inherited_acl(self, data):
         verrors.check()
 
         current_acl = self.getacl(data['path'], False)
-        acltype = ACLType[current_acl['acltype']]
-
-        return acltype.calculate_inherited(current_acl, data['options']['directory'])
+        return calculate_inherited_acl(current_acl, data['options']['directory'])
diff --git a/src/middlewared/middlewared/plugins/filesystem_/acl_template.py b/src/middlewared/middlewared/plugins/filesystem_/acl_template.py
index 75061e1a55d41..b76de9e7675ef 100644
--- a/src/middlewared/middlewared/plugins/filesystem_/acl_template.py
+++ b/src/middlewared/middlewared/plugins/filesystem_/acl_template.py
@@ -1,9 +1,22 @@
+from middlewared.api import api_method
+from middlewared.api.current import (
+    AclTemplateEntry,
+    AclTemplateByPathArgs, AclTemplateByPathResult,
+    AclTemplateCreateArgs, AclTemplateCreateResult,
+    AclTemplateUpdateArgs, AclTemplateUpdateResult,
+    AclTemplateDeleteArgs, AclTemplateDeleteResult,
+)
 from middlewared.service import CallError, CRUDService, ValidationErrors
-from middlewared.service import accepts, private, returns
-from middlewared.schema import Bool, Dict, Int, List, Str, Ref, Patch, OROperator
+from middlewared.service import private
 from middlewared.plugins.smb import SMBBuiltin
 from middlewared.utils.directoryservices.constants import DSStatus, DSType
-from .utils import ACLType
+from middlewared.utils.filesystem.acl import (
+    ACL_UNDEFINED_ID,
+    FS_ACL_Type,
+    NFS4_SPECIAL_ENTRIES,
+    POSIX_SPECIAL_ENTRIES
+)
+from .utils import canonicalize_nfs4_acl, gen_aclstring_posix1e
 
 import middlewared.sqlalchemy as sa
 import errno
@@ -27,46 +40,70 @@ class ACLTemplateService(CRUDService):
     class Config:
         datastore = 'filesystem.acltemplate'
         datastore_prefix = 'acltemplate_'
+        datastore_extend = 'filesystem.acltemplate.extend'
         namespace = 'filesystem.acltemplate'
         cli_private = True
+        entry = AclTemplateEntry
+        role_prefix = 'FILESYSTEM_ATTRS'
 
-    ENTRY = Patch(
-        'acltemplate_create', 'acltemplate_entry',
-        ('add', Int('id')),
-        ('add', Bool('builtin')),
-    )
+    @private
+    async def extend(self, data):
+        # Normalize entries for raw query. API consumer can request to
+        # resolve IDs in filesystem.acltemplate.by_path
+        for ace in data['acl']:
+            ace['who'] = None
 
     @private
-    async def validate_acl(self, data, schema, verrors):
-        acltype = ACLType[data['acltype']]
-        aclcheck = acltype.validate({'dacl': data['acl']})
-        if not aclcheck['is_valid']:
-            for err in aclcheck['errors']:
-                if err[2]:
-                    v = f'{schema}.{err[0]}.{err[2]}'
-                else:
-                    v = f'{schema}.{err[0]}'
-
-                verrors.add(v, err[1])
-
-        if acltype is ACLType.POSIX1E:
-            await self.middleware.call(
-                "filesystem.gen_aclstring_posix1e",
-                copy.deepcopy(data["acl"]), False, verrors
-            )
+    async def validate_acl(self, data, schema, verrors, template_id):
+        await self._ensure_unique(verrors, schema, 'name', data['name'], template_id)
+
+        acltype = FS_ACL_Type(data['acltype'])
 
         for idx, ace in enumerate(data['acl']):
+            # We deliberately remove `who` key from entry before datastore insertion
+            # because the name can change due to account management actions
+            ace_who = ace.pop('who', None)
+
             if ace.get('id') is None:
-                verrors.add(f'{schema}.{idx}.id', 'null id is not permitted.')
-
-    @accepts(Dict(
-        "acltemplate_create",
-        Str("name", required=True),
-        Str("acltype", required=True, enum=["NFS4", "POSIX1E"]),
-        Str("comment"),
-        OROperator(Ref('nfs4_acl'), Ref('posix1e_acl'), name='acl', required=True),
-        register=True
-    ), roles=['FILESYSTEM_ATTRS_WRITE'])
+                ace['id'] = ACL_UNDEFINED_ID
+
+            if ace['tag'] in NFS4_SPECIAL_ENTRIES | POSIX_SPECIAL_ENTRIES:
+                continue
+
+            if ace['id'] != ACL_UNDEFINED_ID:
+                if ace_who:
+                    verrors.add(f'{schema}.{idx}.who',
+                                'id and who may not be simultaneously specified in ACL entry')
+                continue
+
+            if ace_who is None:
+                verrors.add(f'{schema}.{idx}.id', 'identifier (uid, gid, who) is required')
+                continue
+
+            match ace['tag']:
+                case 'USER':
+                    entry = await self.middleware.call('user.query', [['username', '=', ace['who']]])
+                    entry_key = 'uid'
+                case 'GROUP':
+                    entry = await self.middleware.call('group.query', [['group', '=', ace['who']]])
+                    entry_key = 'gid'
+                case _:
+                    raise TypeError(f'{ace["tag"]}: unexpected ace tag.')
+
+            if not entry:
+                verrors.add(f'{schema}.{idx}.who', f'{ace["who"]}: {ace["tag"].lower()} does not exist')
+                continue
+
+            ace['id'] = entry[0][entry_key]
+
+        if acltype is FS_ACL_Type.POSIX1E:
+            gen_aclstring_posix1e(copy.deepcopy(data['acl']), False, verrors)
+
+    @api_method(
+        AclTemplateCreateArgs,
+        AclTemplateCreateResult,
+        roles=['FILESYSTEM_ATTRS_WRITE']
+    )
     async def do_create(self, data):
         """
         Create a new filesystem ACL template.
@@ -77,7 +114,7 @@ async def do_create(self, data):
                 "filesystem_acltemplate_create.acl",
                 "At least one ACL entry must be specified."
             )
-        await self.validate_acl(data, "filesystem_acltemplate_create.acl", verrors)
+        await self.validate_acl(data, "filesystem_acltemplate_create.acl", verrors, None)
         verrors.check()
         data['builtin'] = False
 
@@ -89,13 +126,9 @@ async def do_create(self, data):
         )
         return await self.get_instance(data['id'])
 
-    @accepts(
-        Int('id'),
-        Patch(
-            'acltemplate_create',
-            'acltemplate_update',
-            ('attr', {'update': True})
-        ),
+    @api_method(
+        AclTemplateUpdateArgs,
+        AclTemplateUpdateResult,
         roles=['FILESYSTEM_ATTRS_WRITE']
     )
     async def do_update(self, id_, data):
@@ -121,7 +154,7 @@ async def do_update(self, id_, data):
                 "filesystem_acltemplate_update.acl",
                 "At least one ACL entry must be specified."
             )
-        await self.validate_acl(new, "filesystem_acltemplate_update.acl", verrors)
+        await self.validate_acl(new, "filesystem_acltemplate_update.acl", verrors, id_)
         verrors.check()
 
         await self.middleware.call(
@@ -133,7 +166,11 @@ async def do_update(self, id_, data):
         )
         return await self.get_instance(id_)
 
-    @accepts(Int('id'))
+    @api_method(
+        AclTemplateDeleteArgs,
+        AclTemplateDeleteResult,
+        roles=['FILESYSTEM_ATTRS_WRITE']
+    )
     async def do_delete(self, id_):
         entry = await self.get_instance(id_)
         if entry['builtin']:
@@ -157,7 +194,7 @@ async def append_builtins_internal(self, ids, data):
         if (bu_id != -1 and has_bu) or (ba_id != -1 and has_ba):
             return
 
-        if data['acltype'] == ACLType.NFS4.name:
+        if data['acltype'] == FS_ACL_Type.NFS4:
             if bu_id != -1:
                 data['acl'].append(
                     {"tag": "GROUP", "id": bu_id, "perms": {"BASIC": "MODIFY"}, "flags": {"BASIC": "INHERIT"}, "type": "ALLOW"},
@@ -263,22 +300,11 @@ async def resolve_names(self, uid, gid, data):
 
         return
 
-    @accepts(Dict(
-        "acltemplate_by_path",
-        Str("path", default=""),
-        Ref('query-filters'),
-        Ref('query-options'),
-        Dict(
-            "format-options",
-            Bool("canonicalize", default=False),
-            Bool("ensure_builtins", default=False),
-            Bool("resolve_names", default=False),
-        ),
-    ), roles=['FILESYSTEM_ATTRS_READ'])
-    @returns(List(
-        'templates',
-        items=[Ref('acltemplate_entry')]
-    ))
+    @api_method(
+        AclTemplateByPathArgs,
+        AclTemplateByPathResult,
+        roles=['FILESYSTEM_ATTRS_READ']
+    )
     async def by_path(self, data):
         """
         Retrieve list of available ACL templates for a given `path`.
@@ -298,10 +324,10 @@ async def by_path(self, data):
             acltype = await self.middleware.call(
                 'filesystem.path_get_acltype', data['path']
             )
-            if acltype == ACLType.DISABLED.name:
+            if acltype == FS_ACL_Type.DISABLED:
                 return []
 
-            if acltype == ACLType.POSIX1E.name and data['format-options']['canonicalize']:
+            if acltype == FS_ACL_Type.POSIX1E and data['format-options']['canonicalize']:
                 verrors.add(
                     "filesystem.acltemplate_by_path.format-options.canonicalize",
                     "POSIX1E ACLs may not be sorted into Windows canonical order."
@@ -325,8 +351,8 @@ async def by_path(self, data):
                 st = await self.middleware.run_in_thread(os.stat, data['path'])
                 await self.resolve_names(st.st_uid, st.st_gid, t)
 
-            if data['format-options']['canonicalize'] and t['acltype'] == ACLType.NFS4.name:
-                canonicalized = ACLType[t['acltype']].canonicalize(t['acl'])
+            if data['format-options']['canonicalize'] and t['acltype'] == FS_ACL_Type.NFS4:
+                canonicalized = canonicalize_nfs4_acl(t['acl'])
                 t['acl'] = canonicalized
 
         return templates
diff --git a/src/middlewared/middlewared/plugins/filesystem_/perm_check.py b/src/middlewared/middlewared/plugins/filesystem_/perm_check.py
index c85bee2d6482a..9d8f4338f01f1 100644
--- a/src/middlewared/middlewared/plugins/filesystem_/perm_check.py
+++ b/src/middlewared/middlewared/plugins/filesystem_/perm_check.py
@@ -10,7 +10,7 @@
 
 # This should be a sufficiently high UID to never be used explicitly
 # We need one for doing access checks based on groups
-SYNTHETIC_UID = 2 ** 32 -2
+SYNTHETIC_UID = 2 ** 32 - 2
 
 
 def check_access(path: str, check_perms: dict) -> bool:
@@ -32,6 +32,9 @@ def get_user_details(id_type: str, xid: int) -> dict:
     if id_type not in ['USER', 'GROUP']:
         raise CallError(f'{id_type}: invalid ID type. Must be "USER" or "GROUP"')
 
+    if not isinstance(xid, int):
+        raise TypeError(f'{type(xid)}: xid must be int.')
+
     if id_type == 'USER':
         try:
             u = pwd.getpwuid(xid)
@@ -77,6 +80,12 @@ def check_acl_execute_impl(path: str, acl: list, uid: int, gid: int, path_must_e
     """
     parts = pathlib.Path(path).parts
 
+    if not isinstance(uid, int):
+        raise TypeError(f'{type(uid)}: uid is not int')
+
+    if not isinstance(gid, int):
+        raise TypeError(f'{type(gid)}: gid is not int')
+
     for entry in acl:
         if entry['tag'] in ('everyone@', 'OTHER', 'MASK'):
             continue
diff --git a/src/middlewared/middlewared/plugins/filesystem_/utils.py b/src/middlewared/middlewared/plugins/filesystem_/utils.py
index d2cdf1e921bdc..1c7e66ad64792 100644
--- a/src/middlewared/middlewared/plugins/filesystem_/utils.py
+++ b/src/middlewared/middlewared/plugins/filesystem_/utils.py
@@ -1,174 +1,278 @@
 import enum
-
-
-class ACLType(enum.Enum):
-    NFS4 = (['tag', 'id', 'perms', 'flags', 'type'], ["owner@", "group@", "everyone@"])
-    POSIX1E = (['default', 'tag', 'id', 'perms'], ["USER_OBJ", "GROUP_OBJ", "OTHER", "MASK"])
-    DISABLED = ([], [])
-
-    def _validate_id(self, id_, special):
-        if id_ is None or id_ < 0:
-            return True if special else False
-
-        return False if special else True
-
-    def _validate_entry(self, idx, entry, errors):
-        is_special = entry['tag'] in self.value[1]
-
-        if is_special and entry.get('type') == 'DENY':
-            errors.append((
-                idx,
-                f'{entry["tag"]}: DENY entries for this principal are not permitted.',
-                'tag'
-            ))
-
-        if not self._validate_id(entry['id'], is_special):
-            errors.append(
-                (idx, 'ACL entry has invalid id for tag type.', 'id')
+import subprocess
+
+from middlewared.service_exception import CallError, ValidationErrors
+from middlewared.utils.filesystem.acl import (
+    ACL_UNDEFINED_ID,
+    FS_ACL_Type,
+    NFS4ACE_Flag,
+    NFS4ACE_FlagSimple,
+)
+
+
+class AclToolAction(enum.StrEnum):
+    CHOWN = 'chown'  # Only chown files
+    CLONE = 'clone'  # Use simplified imheritance logic
+    INHERIT = 'inherit'  # NFS41-style inheritance
+    STRIP = 'strip'  # Strip ACL from specified path
+    RESTORE = 'restore'  # restore ACL from snapshot
+
+
+def acltool(path: str, action: AclToolAction, uid: int, gid: int, options: dict) -> None:
+    """
+    This is an internal-only tool that performs certain ACL-related operations on the specified path.
+    """
+    flags = "-r"
+    flags += "x" if options.get('traverse') else ""
+    flags += "C" if options.get('do_chmod') else ""
+    flags += "P" if options.get('posixacl') else ""
+
+    acltool = subprocess.run([
+        '/usr/bin/nfs4xdr_winacl',
+        '-a', action,
+        '-O', str(uid), '-G', str(gid),
+        flags,
+        '-c', path,
+        '-p', path], check=False, capture_output=True
+    )
+    if acltool.returncode != 0:
+        raise CallError(f"acltool [{action}] on path {path} failed with error: [{acltool.stderr.decode().strip()}]")
+
+
+def __ace_is_inherited_nfs4(ace):
+    if ace['flags'].get('BASIC'):
+        return False
+
+    return ace['flags'].get(NFS4ACE_Flag.INHERITED, False)
+
+
+def canonicalize_nfs4_acl(theacl):
+    """
+    Order NFS4 ACEs according to MS guidelines:
+    1) Deny ACEs that apply to the object itself (NOINHERIT)
+    2) Allow ACEs that apply to the object itself (NOINHERIT)
+    3) Deny ACEs that apply to a subobject of the object (INHERIT)
+    4) Allow ACEs that apply to a subobject of the object (INHERIT)
+
+    See http://docs.microsoft.com/en-us/windows/desktop/secauthz/order-of-aces-in-a-dacl
+    Logic is simplified here because we do not determine depth from which ACLs are inherited.
+    """
+    out = []
+    acl_groups = {
+        "deny_noinherit": [],
+        "deny_inherit": [],
+        "allow_noinherit": [],
+        "allow_inherit": [],
+    }
+
+    for ace in theacl:
+        key = f'{ace.get("type", "ALLOW").lower()}_{"inherit" if __ace_is_inherited_nfs4(ace) else "noinherit"}'
+        acl_groups[key].append(ace)
+
+    for g in acl_groups.values():
+        out.extend(g)
+
+    return out
+
+
+def __calculate_inherited_posix1e(theacl, isdir):
+    """
+    Create a new ACL based on what a file or directory would receive if it
+    were created within a directory that had `theacl` set on it.
+    """
+    inherited = []
+    for entry in theacl['acl']:
+        if entry['default'] is False:
+            continue
+
+        # add access entry
+        inherited.append(entry.copy() | {'default': False})
+
+        if isdir:
+            # add default entry
+            inherited.append(entry)
+
+    return inherited
+
+
+def __calculate_inherited_nfs4(theacl, isdir):
+    """
+    Create a new ACL based on what a file or directory would receive if it
+    were created within a directory that had `theacl` set on it.
+    """
+    inherited = []
+    for entry in theacl['acl']:
+        if not (flags := entry.get('flags', {}).copy()):
+            continue
+
+        if (basic := flags.get('BASIC')) == NFS4ACE_FlagSimple.NOINHERIT:
+            continue
+        elif basic == NFS4ACE_FlagSimple.INHERIT:
+            flags[NFS4ACE_Flag.INHERITED] = True
+            inherited.append(entry)
+            continue
+        elif not flags.get(NFS4ACE_Flag.FILE_INHERIT, False) and not flags.get(NFS4ACE_Flag.DIRECTORY_INHERIT, False):
+            # Entry has no inherit flags
+            continue
+        elif not isdir and not flags.get(NFS4ACE_Flag.FILE_INHERIT):
+            # File and this entry doesn't inherit on files
+            continue
+
+        if isdir:
+            if not flags.get(NFS4ACE_Flag.DIRECTORY_INHERIT, False):
+                if flags[NFS4ACE_Flag.NO_PROPAGATE_INHERIT]:
+                    # doesn't apply to this dir and shouldn't apply to contents.
+                    continue
+
+                # This is a directory ACL and we have entry that only applies to files.
+                flags[NFS4ACE_Flag.INHERIT_ONLY] = True
+            elif flags.get(NFS4ACE_Flag.INHERIT_ONLY, False):
+                flags[NFS4ACE_Flag.INHERIT_ONLY] = False
+            elif flags.get(NFS4ACE_Flag.NO_PROPAGATE_INHERIT):
+                flags[NFS4ACE_Flag.DIRECTORY_INHERIT] = False
+                flags[NFS4ACE_Flag.FILE_INHERIT] = False
+                flags[NFS4ACE_Flag.NO_PROPAGATE_INHERIT] = False
+        else:
+            flags[NFS4ACE_Flag.DIRECTORY_INHERIT] = False
+            flags[NFS4ACE_Flag.FILE_INHERIT] = False
+            flags[NFS4ACE_Flag.NO_PROPAGATE_INHERIT] = False
+            flags[NFS4ACE_Flag.INHERIT_ONLY] = False
+
+        inherited.append({
+            'tag': entry['tag'],
+            'id': entry['id'],
+            'type': entry['type'],
+            'perms': entry['perms'],
+            'flags': flags | {NFS4ACE_Flag.INHERITED: True}
+        })
+
+    return inherited
+
+
+def calculate_inherited_acl(theacl, isdir=True):
+    """
+    Create a new ACL based on what a file or directory would receive if it
+    were created within a directory that had `theacl` set on it.
+
+    This is intended to be used for determining new ACL to set on a dataset
+    that is created (in certain scenarios) to meet user expectations of
+    inheritance.
+    """
+    acltype = FS_ACL_Type(theacl['acltype'])
+
+    match acltype:
+        case FS_ACL_Type.POSIX1E:
+            return __calculate_inherited_posix1e(theacl, isdir)
+
+        case FS_ACL_Type.NFS4:
+            return __calculate_inherited_nfs4(theacl, isdir)
+
+        case FS_ACL_Type.DISABLED:
+            ValueError('ACL is disabled')
+
+        case _:
+            TypeError(f'{acltype}: unknown ACL type')
+
+
+def gen_aclstring_posix1e(dacl: list, recursive: bool, verrors: ValidationErrors) -> str:
+    """
+    This method iterates through provided POSIX1e ACL and
+    performs addtional validation before returning the ACL
+    string formatted for the setfacl command. In case
+    of ValidationError, None is returned.
+    """
+    has_tag = {
+        "USER_OBJ": False,
+        "GROUP_OBJ": False,
+        "OTHER": False,
+        "MASK": False,
+        "DEF_USER_OBJ": False,
+        "DEF_GROUP_OBJ": False,
+        "DEF_OTHER": False,
+        "DEF_MASK": False,
+    }
+    required_entries = ["USER_OBJ", "GROUP_OBJ", "OTHER"]
+    has_named = False
+    has_def_named = False
+    has_default = False
+    aclstring = ""
+
+    for idx, ace in enumerate(dacl):
+        if idx != 0:
+            aclstring += ","
+
+        if ace['who'] and ace['id'] not in (None, ACL_UNDEFINED_ID):
+            verrors.add(
+                'filesystem_acl.dacl.{idx}.who',
+                f'Numeric ID {ace["id"]} and account name {ace["who"]} may not be specified simultaneously'
             )
 
-    def validate(self, theacl):
-        errors = []
-        ace_keys = self.value[0]
-
-        if self != ACLType.NFS4 and theacl.get('nfs41flags'):
-            errors.append(f"NFS41 ACL flags are not valid for ACLType [{self.name}]")
-
-        for idx, entry in enumerate(theacl['dacl']):
-            extra = set(entry.keys()) - set(ace_keys)
-            missing = set(ace_keys) - set(entry.keys())
-            if extra:
-                errors.append(
-                    (idx, f"ACL entry contains invalid extra key(s): {extra}", None)
-                )
-            if missing:
-                errors.append(
-                    (idx, f"ACL entry is missing required keys(s): {missing}", None)
-                )
-
-            if extra or missing:
-                continue
-
-            self._validate_entry(idx, entry, errors)
-
-        return {"is_valid": len(errors) == 0, "errors": errors}
-
-    def _is_inherited(self, ace):
-        if ace['flags'].get("BASIC"):
-            return False
-
-        return ace['flags'].get('INHERITED', False)
-
-    def canonicalize(self, theacl):
-        """
-        Order NFS4 ACEs according to MS guidelines:
-        1) Deny ACEs that apply to the object itself (NOINHERIT)
-        2) Allow ACEs that apply to the object itself (NOINHERIT)
-        3) Deny ACEs that apply to a subobject of the object (INHERIT)
-        4) Allow ACEs that apply to a subobject of the object (INHERIT)
-
-        See http://docs.microsoft.com/en-us/windows/desktop/secauthz/order-of-aces-in-a-dacl
-        Logic is simplified here because we do not determine depth from which ACLs are inherited.
-        """
-        if self == ACLType.POSIX1E:
-            return
-
-        out = []
-        acl_groups = {
-            "deny_noinherit": [],
-            "deny_inherit": [],
-            "allow_noinherit": [],
-            "allow_inherit": [],
-        }
-
-        for ace in theacl:
-            key = f'{ace.get("type", "ALLOW").lower()}_{"inherit" if self._is_inherited(ace) else "noinherit"}'
-            acl_groups[key].append(ace)
-
-        for g in acl_groups.values():
-            out.extend(g)
-
-        return out
-
-    def xattr_names():
-        return set([
-            "system.posix_acl_access",
-            "system.posix_acl_default",
-            "system.nfs4_acl_xdr"
-        ])
-
-    def __calculate_inherited_posix1e(self, theacl, isdir):
-        inherited = []
-        for entry in theacl['acl']:
-            if entry['default'] is False:
-                continue
-
-            # add access entry
-            inherited.append(entry.copy() | {'default': False})
-
-            if isdir:
-                # add default entry
-                inherited.append(entry)
-
-        return inherited
-
-    def __calculate_inherited_nfs4(self, theacl, isdir):
-        inherited = []
-        for entry in theacl['acl']:
-            if not (flags := entry.get('flags', {}).copy()):
-                continue
-
-            if (basic := flags.get('BASIC')) == 'NOINHERIT':
-                continue
-            elif basic == 'INHERIT':
-                flags['INHERITED'] = True
-                inherited.append(entry)
-                continue
-            elif not flags.get('FILE_INHERIT', False) and not flags.get('DIRECTORY_INHERIT', False):
-                # Entry has no inherit flags
-                continue
-            elif not isdir and not flags.get('FILE_INHERIT'):
-                # File and this entry doesn't inherit on files
-                continue
-
-            if isdir:
-                if not flags.get('DIRECTORY_INHERIT', False):
-                    if flags['NO_PROPAGATE_INHERIT']:
-                        # doesn't apply to this dir and shouldn't apply to contents.
-                        continue
-
-                    # This is a directory ACL and we have entry that only applies to files.
-                    flags['INHERIT_ONLY'] = True
-                elif flags.get('INHERIT_ONLY', False):
-                    flags['INHERIT_ONLY'] = False
-                elif flags.get('NO_PROPAGATE_INHERIT'):
-                    flags['DIRECTORY_INHERIT'] = False
-                    flags['FILE_INHERIT'] = False
-                    flags['NO_PROPAGATE_INHERIT'] = False
-            else:
-                flags['DIRECTORY_INHERIT'] = False
-                flags['FILE_INHERIT'] = False
-                flags['NO_PROPAGATE_INHERIT'] = False
-                flags['INHERIT_ONLY'] = False
+        if ace['id'] == ACL_UNDEFINED_ID:
+            ace['id'] = ''
 
-            inherited.append({
-                'tag': entry['tag'],
-                'id': entry['id'],
-                'type': entry['type'],
-                'perms': entry['perms'],
-                'flags': flags | {'INHERITED': True}
-            })
+        who = "DEF_" if ace['default'] else ""
+        who += ace['tag']
+        duplicate_who = has_tag.get(who)
 
-        return inherited
+        if duplicate_who is True:
+            verrors.add(
+                'filesystem_acl.dacl.{idx}',
+                f'More than one {"default" if ace["default"] else ""} '
+                f'{ace["tag"]} entry is not permitted'
+            )
 
-    def calculate_inherited(self, theacl, isdir=True):
-        if self.name != theacl['acltype']:
-            raise ValueError('ACLType does not match')
+        elif duplicate_who is False:
+            has_tag[who] = True
 
-        if self == ACLType.POSIX1E:
-            return self.__calculate_inherited_posix1e(theacl, isdir)
+        if ace['tag'] in ["USER", "GROUP"]:
+            if ace['default']:
+                has_def_named = True
+            else:
+                has_named = True
+
+        ace['tag'] = ace['tag'].rstrip('_OBJ').lower()
+
+        if ace['default']:
+            has_default = True
+            aclstring += "default:"
+
+        aclstring += f"{ace['tag']}:{ace['id']}:"
+        aclstring += 'r' if ace['perms']['READ'] else '-'
+        aclstring += 'w' if ace['perms']['WRITE'] else '-'
+        aclstring += 'x' if ace['perms']['EXECUTE'] else '-'
+
+    if has_named and not has_tag['MASK']:
+        verrors.add(
+            'filesystem_acl.dacl',
+            'Named (user or group) POSIX ACL entries '
+            'require a mask entry to be present in the ACL.'
+        )
+
+    elif has_def_named and not has_tag['DEF_MASK']:
+        verrors.add(
+            'filesystem_acl.dacl',
+            'Named default (user or group) POSIX ACL entries '
+            'require a default mask entry to be present in the ACL.'
+        )
+
+    if recursive and not has_default:
+        verrors.add(
+            'filesystem_acl.dacl',
+            'Default ACL entries are required in order to apply '
+            'ACL recursively.'
+        )
+
+    for entry in required_entries:
+        if not has_tag[entry]:
+            verrors.add(
+                'filesystem_acl.dacl',
+                f'Presence of [{entry}] entry is required.'
+            )
 
-        elif self == ACLType.NFS4:
-            return self.__calculate_inherited_nfs4(theacl, isdir)
+        if has_default and not has_tag[f"DEF_{entry}"]:
+            verrors.add(
+                'filesystem_acl.dacl',
+                f'Presence of default [{entry}] entry is required.'
+            )
 
-        raise ValueError('ACLType does not support inheritance')
+    return aclstring
diff --git a/src/middlewared/middlewared/pytest/unit/plugins/test_acl_inherit.py b/src/middlewared/middlewared/pytest/unit/plugins/test_acl_inherit.py
index a241534c4ea25..15d50fd2c139d 100644
--- a/src/middlewared/middlewared/pytest/unit/plugins/test_acl_inherit.py
+++ b/src/middlewared/middlewared/pytest/unit/plugins/test_acl_inherit.py
@@ -1,14 +1,12 @@
-import pytest
-
 from copy import deepcopy
-from middlewared.plugins.filesystem_.utils import ACLType
+from middlewared.plugins.filesystem_.utils import calculate_inherited_acl
 
 
 NFS4_ACL = {'acl': [
     {
         'tag': 'GROUP',
         'id': 100,
-        'perms': { 'BASIC': 'FULL_CONTROL' },
+        'perms': {'BASIC': 'FULL_CONTROL'},
         'flags': {
             'FILE_INHERIT': True,
             'DIRECTORY_INHERIT': True,
@@ -21,7 +19,7 @@
     {
         'tag': 'GROUP',
         'id': 200,
-        'perms': { 'BASIC': 'MODIFY' },
+        'perms': {'BASIC': 'MODIFY'},
         'flags': {
             'FILE_INHERIT': False,
             'DIRECTORY_INHERIT': True,
@@ -34,7 +32,7 @@
     {
         'tag': 'GROUP',
         'id': 300,
-        'perms': { 'BASIC': 'MODIFY' },
+        'perms': {'BASIC': 'MODIFY'},
         'flags': {
             'FILE_INHERIT': True,
             'DIRECTORY_INHERIT': False,
@@ -47,7 +45,7 @@
     {
         'tag': 'GROUP',
         'id': 400,
-        'perms': { 'BASIC': 'MODIFY' },
+        'perms': {'BASIC': 'MODIFY'},
         'flags': {
             'FILE_INHERIT': True,
             'DIRECTORY_INHERIT': True,
@@ -60,7 +58,7 @@
     {
         'tag': 'GROUP',
         'id': 500,
-        'perms': { 'BASIC': 'MODIFY' },
+        'perms': {'BASIC': 'MODIFY'},
         'flags': {
             'FILE_INHERIT': False,
             'DIRECTORY_INHERIT': False,
@@ -73,7 +71,7 @@
     {
         'tag': 'GROUP',
         'id': 600,
-        'perms': { 'BASIC': 'MODIFY' },
+        'perms': {'BASIC': 'MODIFY'},
         'flags': {
             'FILE_INHERIT': True,
             'DIRECTORY_INHERIT': True,
@@ -87,8 +85,8 @@
 
 
 def test__nfs4_acl_inheritance():
-    dir_inherited = ACLType.NFS4.calculate_inherited(deepcopy(NFS4_ACL), True)
-    file_inherited = ACLType.NFS4.calculate_inherited(deepcopy(NFS4_ACL), False)
+    dir_inherited = calculate_inherited_acl(deepcopy(NFS4_ACL), True)
+    file_inherited = calculate_inherited_acl(deepcopy(NFS4_ACL), False)
 
     for entry in dir_inherited:
         match entry['id']:
diff --git a/src/middlewared/middlewared/utils/filesystem/acl.py b/src/middlewared/middlewared/utils/filesystem/acl.py
index f43def7561fa7..e19481659a322 100644
--- a/src/middlewared/middlewared/utils/filesystem/acl.py
+++ b/src/middlewared/middlewared/utils/filesystem/acl.py
@@ -1,16 +1,22 @@
 import enum
+import errno
+import os
 
+from middlewared.service_exception import ValidationErrors
 
-class ACLXattr(enum.Enum):
+ACL_UNDEFINED_ID = -1
+
+
+class ACLXattr(enum.StrEnum):
     POSIX_ACCESS = "system.posix_acl_access"
     POSIX_DEFAULT = "system.posix_acl_default"
     ZFS_NATIVE = "system.nfs4_acl_xdr"
 
 
-ACL_XATTRS = set([xat.value for xat in ACLXattr])
+ACL_XATTRS = frozenset([xat.value for xat in ACLXattr])
 
 # ACCESS_ACL_XATTRS is set of ACLs that control access to the file itself.
-ACCESS_ACL_XATTRS = set([ACLXattr.POSIX_ACCESS.value, ACLXattr.ZFS_NATIVE.value])
+ACCESS_ACL_XATTRS = frozenset([ACLXattr.POSIX_ACCESS.value, ACLXattr.ZFS_NATIVE.value])
 
 
 def acl_is_present(xat_list: list) -> bool:
@@ -21,3 +27,186 @@ def acl_is_present(xat_list: list) -> bool:
     authoritative.
     """
     return bool(set(xat_list) & ACL_XATTRS)
+
+
+class FS_ACL_Type(enum.StrEnum):
+    NFS4 = 'NFS4'
+    POSIX1E = 'POSIX1E'
+    DISABLED = 'DISABLED'
+
+
+class NFS4ACE_Tag(enum.StrEnum):
+    # See RFC-5661 Section 6.2.1.5
+    # https://datatracker.ietf.org/doc/html/rfc5661#section-6.2.1.5
+    #
+    # Combination of NFS4ACE_Tag and id create the ACE Who field
+
+    # Special identifiers
+    SPECIAL_OWNER = 'owner@'  # file owner
+    SPECIAL_GROUP = 'group@'  # file group
+    SPECIAL_EVERYONE = 'everyone@'  # world (including owner and group)
+
+    # Identifiers for regular user / group entries
+    USER = 'USER'
+    GROUP = 'GROUP'
+
+
+class NFS4ACE_Type(enum.StrEnum):
+    # See RFC-5661 Section 6.2.1.1
+    # https://datatracker.ietf.org/doc/html/rfc5661#section-6.2.1.1
+    ALLOW = 'ALLOW'
+    DENY = 'DENY'
+
+
+class NFS4ACE_Mask(enum.StrEnum):
+    # See RFC-5661 Section 6.2.1.3.1
+    # https://datatracker.ietf.org/doc/html/rfc5661#section-6.2.1.3.1
+    READ_DATA = 'READ_DATA'
+    WRITE_DATA = 'WRITE_DATA'
+    APPEND_DATA = 'APPEND_DATA'
+    READ_NAMED_ATTRS = 'READ_NAMED_ATTRS'
+    WRITE_NAMED_ATTRS = 'WRITE_NAMED_ATTRS'
+    EXECUTE = 'EXECUTE'
+    DELETE = 'DELETE'
+    DELETE_CHILD = 'DELETE_CHILD'
+    READ_ATTRIBUTES = 'READ_ATTRIBUTES'
+    WRITE_ATTRIBUTES = 'WRITE_ATTRIBUTES'
+    READ_ACL = 'READ_ACL'
+    WRITE_ACL = 'WRITE_ACL'
+    WRITE_OWNER = 'WRITE_OWNER'
+    SYNCHRONIZE = 'SYNCHRONIZE'
+
+
+class NFS4ACE_MaskSimple(enum.StrEnum):
+    # These are convenience access masks that are a combination of multiple
+    # permissions defined in NFS4ACE_Mask above
+    FULL_CONTROL = 'FULL_CONTROL'  # all perms above
+    MODIFY = 'MODIFY'  # all perms except WRITE_ACL and WRITE_OWNER
+    READ = 'READ'  # READ | READ_NAMED_ATTRS | READ_ATTRIBUTES | EXECUTE
+    TRAVERSE = 'TRAVERSE'  # READ_NAMED_ATTRS | READ_ATTRIBUTES | EXECUTE
+
+
+class NFS4ACE_Flag(enum.StrEnum):
+    # See RFC-5661 Section 6.2.1.4.1
+    # https://datatracker.ietf.org/doc/html/rfc5661#section-6.2.1.4.1
+    FILE_INHERIT = 'FILE_INHERIT'
+    DIRECTORY_INHERIT = 'DIRECTORY_INHERIT'
+    NO_PROPAGATE_INHERIT = 'NO_PROPAGATE_INHERIT'
+    INHERIT_ONLY = 'INHERIT_ONLY'
+    INHERITED = 'INHERITED'
+
+
+class NFS4ACE_FlagSimple(enum.StrEnum):
+    # These are convenience access masks that are a combination of multiple
+    # permissions defined in NFS4ACE_Mask above
+    INHERIT = 'INHERIT'  # FILE_INHERIT | DIRECTORY_INHERIT
+    NOINHERIT = 'NOINHERIT'  # ace flags = 0
+
+
+class NFS4ACL_Flag(enum.StrEnum):
+    # See RFC-5661 Section 6.4.3.2
+    # https://datatracker.ietf.org/doc/html/rfc5661#section-6.4.3.2
+    AUTOINHERIT = 'autoinherit'
+    PROTECTED = 'protected'
+    DEFAULTED = 'defaulted'
+
+
+class POSIXACE_Tag(enum.StrEnum):
+    # UGO entries
+    USER_OBJ = 'USER_OBJ'  # file owner
+    GROUP_OBJ = 'GROUP_OBJ'  # file group
+    OTHER = 'OTHER'  # other
+
+    MASK = 'MASK'  # defines maximum permissions granted to extended entries
+
+    # Identifiers for regular user / group entries
+    USER = 'USER'
+    GROUP = 'GROUP'
+
+
+class POSIXACE_Mask(enum.StrEnum):
+    READ = 'READ'
+    WRITE = 'WRITE'
+    EXECUTE = 'EXECUTE'
+
+
+NFS4_SPECIAL_ENTRIES = frozenset([
+    NFS4ACE_Tag.SPECIAL_OWNER,
+    NFS4ACE_Tag.SPECIAL_GROUP,
+    NFS4ACE_Tag.SPECIAL_EVERYONE,
+])
+
+POSIX_SPECIAL_ENTRIES = frozenset([
+    POSIXACE_Tag.USER_OBJ,
+    POSIXACE_Tag.GROUP_OBJ,
+    POSIXACE_Tag.OTHER,
+    POSIXACE_Tag.MASK,
+])
+
+
+def validate_nfs4_ace_full(ace_in: dict, schema_prefix: str, verrors: ValidationErrors) -> None:
+    """
+    This is further validation that occurs in filesystem.setacl. By this point
+    ACE should have already passed through `validate_nfs4_ace_model` above.
+    """
+    if not isinstance(ace_in, dict):
+        raise TypeError(f'{type(ace_in)}: expected dict')
+
+    if ace_in['tag'] in NFS4_SPECIAL_ENTRIES:
+        if ace_in['type'] == NFS4ACE_Type.DENY:
+            tag = ace_in['tag']
+            verrors.add(
+                f'{schema_prefix}.tag',
+                f'{tag}: DENY entries for specified tag are not permitted.'
+            )
+    else:
+        ace_id = ace_in.get('id', ACL_UNDEFINED_ID)
+        ace_who = ace_in.get('who')
+
+        if ace_id != ACL_UNDEFINED_ID and ace_who:
+            verrors.add(
+                f'{schema_prefix}.who',
+                f'Numeric ID {ace_id} and account name {ace_who} may not be specified simultaneously'
+            )
+
+
+def path_get_acltype(path: str) -> FS_ACL_Type:
+    try:
+        # ACCESS ACL is sufficient to determine POSIX ACL support
+        os.getxattr(path, ACLXattr.POSIX_ACCESS)
+        return FS_ACL_Type.POSIX1E
+
+    except OSError as e:
+        if e.errno == errno.ENODATA:
+            # No ACL set, but zfs acltype is set to POSIX
+            return FS_ACL_Type.POSIX1E
+
+        # EOPNOTSUPP means that ZFS acltype is not set to POSIX
+        if e.errno != errno.EOPNOTSUPP:
+            raise
+
+    try:
+        os.getxattr(path, ACLXattr.ZFS_NATIVE)
+        return FS_ACL_Type.NFS4
+    except OSError as e:
+        # ZFS acltype is not set to NFS4 which means it's disabled
+        if e.errno == errno.EOPNOTSUPP:
+            return FS_ACL_Type.DISABLED
+
+        raise
+
+
+def normalize_acl_ids(setacl_data: dict) -> None:
+    for key in ('uid', 'gid'):
+        if setacl_data[key] is None:
+            setacl_data[key] = ACL_UNDEFINED_ID
+
+    for ace in setacl_data['dacl']:
+        if ace['id'] is None:
+            ace['id'] = ACL_UNDEFINED_ID
+
+
+def strip_acl_path(path: str) -> None:
+    for xat in os.listxattr(path):
+        if xat in ACL_XATTRS:
+            os.removexattr(path, xat)
diff --git a/tests/api2/test_300_nfs.py b/tests/api2/test_300_nfs.py
index b49b3aed3ee41..27ecd61b1bea2 100644
--- a/tests/api2/test_300_nfs.py
+++ b/tests/api2/test_300_nfs.py
@@ -1689,7 +1689,7 @@ def test_nfsv4_acl_support(self, start_nfs):
                                     flag_is_set = acl_flag
 
                                 # Now ensure that only the expected flag is set
-                                nfs41_flags = result['nfs41_flags']
+                                nfs41_flags = result['aclflags']
                                 for flag in ['autoinherit', 'protected', 'defaulted']:
                                     if flag == flag_is_set:
                                         assert nfs41_flags[flag], nfs41_flags
diff --git a/tests/api2/test_344_acl_templates.py b/tests/api2/test_344_acl_templates.py
index 798c8937bdaba..1832fa45920b8 100644
--- a/tests/api2/test_344_acl_templates.py
+++ b/tests/api2/test_344_acl_templates.py
@@ -1,174 +1,127 @@
 #!/usr/bin/env python3
 
-import pytest
-import sys
 import os
-from pytest_dependency import depends
-apifolder = os.getcwd()
-sys.path.append(apifolder)
-from functions import POST, GET, PUT, DELETE
-from auto_config import pool_name
+import pytest
+from contextlib import contextmanager
+from middlewared.test.integration.utils import call
+from middlewared.test.integration.assets.pool import dataset as make_dataset
 
 
-@pytest.mark.dependency(name="ACLTEMPLATE_DATASETS_CREATED")
-@pytest.mark.parametrize('acltype', ['NFSV4', 'POSIX'])
-def test_01_create_test_datasets(request, acltype):
+@pytest.fixture(scope='module')
+def acltemplate_ds():
     """
     Setup of datasets for testing templates.
     This test shouldn't fail unless pool.dataset endpoint is
     thoroughly broken.
     """
-    result = POST(
-        '/pool/dataset/', {
-            'name': f'{pool_name}/acltemplate_{acltype.lower()}',
-            'acltype': acltype,
-            'aclmode': 'DISCARD' if acltype == 'POSIX' else 'PASSTHROUGH'
-        }
-    )
+    with make_dataset('acltemplate_posix', data={
+        'acltype': 'POSIX',
+        'aclmode': 'DISCARD'
+    }) as posix_ds:
+        with make_dataset('acltemplate_nfsv4', data={
+            'acltype': 'NFSV4',
+            'aclmode': 'PASSTHROUGH'
+        }) as nfsv4_ds:
+            yield {'POSIX': posix_ds, 'NFSV4': nfsv4_ds}
+
+
+@contextmanager
+def create_entry_type(acltype):
+    entry = call('filesystem.acltemplate.query', [['name', '=', f'{acltype}_RESTRICTED']], {'get': True})
+    acl = entry['acl']
+
+    payload = {
+        'name': f'{acltype}_TEST',
+        'acl': acl,
+        'acltype': entry['acltype']
+    }
+
+    template = call('filesystem.acltemplate.create', payload)
+
+    try:
+        yield template
+    finally:
+        call('filesystem.acltemplate.delete', template['id'])
+
+    # Verify actually deleted
+    assert call('filesystem.acltemplate.query', [['name', '=', f'{acltype}_TEST']]) == []
 
-    assert result.status_code == 200, result.text
+
+@pytest.fixture(scope='function')
+def tmp_posix_entry():
+    with create_entry_type('POSIX') as entry:
+        yield entry
+
+
+@pytest.fixture(scope='function')
+def tmp_nfs_entry():
+    with create_entry_type('NFS4') as entry:
+        yield entry
+
+
+@pytest.fixture(scope='function')
+def tmp_acltemplates(tmp_posix_entry, tmp_nfs_entry):
+    yield {'POSIX': tmp_posix_entry, 'NFSV4': tmp_nfs_entry}
+
+
+def dataset_path(data, acltype):
+    return os.path.join('/mnt', data[acltype])
 
 
 @pytest.mark.parametrize('acltype', ['NFSV4', 'POSIX'])
-def test_02_check_builtin_types_by_path(request, acltype):
+def test_check_builtin_types_by_path(acltemplate_ds, acltype):
     """
     This test verifies that we can query builtins by paths, and
     that the acltype of the builtins matches that of the
     underlying path.
     """
-    depends(request, ["ACLTEMPLATE_DATASETS_CREATED"], scope="session")
     expected_acltype = 'POSIX1E' if acltype == 'POSIX' else 'NFS4'
-    payload = {
-        'path': f'/mnt/{pool_name}/acltemplate_{acltype.lower()}',
-    }
-    results = POST('/filesystem/acltemplate/by_path', payload)
-    assert results.status_code == 200, results.text
-    for entry in results.json():
-        assert entry['builtin'], results.text
-        assert entry['acltype'] == expected_acltype, results.text
-
-    payload['format-options'] = {
-        'resolve_names': True,
-        'ensure_builtins': True
-    }
+    payload = {'path': dataset_path(acltemplate_ds, acltype)}
+    for entry in call('filesystem.acltemplate.by_path', payload):
+        assert entry['builtin'], str(entry)
+        assert entry['acltype'] == expected_acltype, str(entry)
 
-    results = POST('/filesystem/acltemplate/by_path', payload)
-    assert results.status_code == 200, results.text
-    for entry in results.json():
+    payload['format-options'] = {'resolve_names': True, 'ensure_builtins': True}
+    for entry in call('filesystem.acltemplate.by_path', payload):
         for ace in entry['acl']:
             if ace['tag'] not in ('USER_OBJ', 'GROUP_OBJ', 'USER', 'GROUP'):
                 continue
 
-            assert ace.get('who') is not None, results.text
+            assert ace.get('who') is not None, str(ace)
 
 
-@pytest.mark.dependency(name="NEW_ACLTEMPLATES_CREATED")
-@pytest.mark.parametrize('acltype', ['NFS4', 'POSIX'])
-def test_03_create_new_template(request, acltype):
-    """
-    This method queries an existing builtin and creates a
-    new acltemplate based on the data. Test of new ACL template
-    insertion.
-    """
-    depends(request, ["ACLTEMPLATE_DATASETS_CREATED"], scope="session")
-    results = GET(
-        '/filesystem/acltemplate', payload={
-            'query-filters': [['name', '=', f'{acltype}_RESTRICTED']],
-            'query-options': {'get': True},
-        }
-    )
-    assert results.status_code == 200, results.text
-
-    acl = results.json()['acl']
-    for entry in acl:
-        if entry['id'] is None:
-            entry['id'] = -1
-
-    payload = {
-        'name': f'{acltype}_TEST',
-        'acl': acl,
-        'acltype': results.json()['acltype']
-    }
-
-    results = POST('/filesystem/acltemplate', payload)
-    assert results.status_code == 200, results.text
- 
-
-@pytest.mark.dependency(name="NEW_ACLTEMPLATES_UPDATED")
-@pytest.mark.parametrize('acltype', ['NFS4', 'POSIX'])
-def test_09_update_new_template(request, acltype):
+@pytest.mark.parametrize('acltype', ['NFSV4', 'POSIX'])
+def test_update_new_template(tmp_acltemplates, acltype):
     """
     Rename the template we created to validated that `update`
     method works.
     """
-    depends(request, ["NEW_ACLTEMPLATES_CREATED"], scope="session")
-    results = GET(
-        '/filesystem/acltemplate', payload={
-            'query-filters': [['name', '=', f'{acltype}_TEST']],
-            'query-options': {'get': True},
-        }
-    )
-
-    assert results.status_code == 200, results.text
-
-    payload = results.json()
-    id = payload.pop('id')
-    payload.pop('builtin')
-    payload['name'] = f'{payload["name"]}2'
-
-    results = PUT(f'/filesystem/acltemplate/id/{id}/', payload)
-    assert results.status_code == 200, results.text
-
-
-@pytest.mark.parametrize('acltype', ['NFS4', 'POSIX'])
-def test_10_delete_new_template(request, acltype):
-    depends(request, ["NEW_ACLTEMPLATES_UPDATED"], scope="session")
-    results = GET(
-        '/filesystem/acltemplate', payload={
-            'query-filters': [['name', '=', f'{acltype}_TEST2']],
-            'query-options': {'get': True},
-        }
-    )
-    assert results.status_code == 200, results.text
-
-    results = DELETE(f'/filesystem/acltemplate/id/{results.json()["id"]}')
-    assert results.status_code == 200, results.text
-
-
-def test_40_knownfail_builtin_delete(request):
-    results = GET(
-        '/filesystem/acltemplate', payload={
-            'query-filters': [['builtin', '=', True]],
-            'query-options': {'get': True},
-        }
-    )
-    assert results.status_code == 200, results.text
-    id = results.json()['id']
-
-    results = DELETE(f'/filesystem/acltemplate/id/{id}')
-    assert results.status_code == 422, results.text
-
-
-def test_41_knownfail_builtin_update(request):
-    results = GET(
-        '/filesystem/acltemplate', payload={
-            'query-filters': [['builtin', '=', True]],
-            'query-options': {'get': True},
-        }
-    )
-    assert results.status_code == 200, results.text
-    payload = results.json()
-    id = payload.pop('id')
+    # shallow copy is sufficient since we're not changing nested values
+    payload = tmp_acltemplates[acltype].copy()
+
+    template_id = payload.pop('id')
     payload.pop('builtin')
-    payload['name'] = 'CANARY'
+    orig_name = payload.pop('name')
 
-    results = PUT(f'/filesystem/acltemplate/id/{id}/', payload)
-    assert results.status_code == 422, results.text
+    payload['name'] = f'{orig_name}2'
 
+    result = call('filesystem.acltemplate.update', template_id, payload)
+    assert result['name'] == payload['name']
 
-@pytest.mark.parametrize('acltype', ['NFSV4', 'POSIX'])
-def test_50_delete_test1_dataset(request, acltype):
-    depends(request, ["ACLTEMPLATE_DATASETS_CREATED"], scope="session")
-    dataset_name = f'{pool_name}/acltemplate_{acltype.lower()}'
-    results = DELETE(f'/pool/dataset/id/{dataset_name.replace("/", "%2F")}/')
-    assert results.status_code == 200, results.text
+
+def test_knownfail_builtin_delete(request):
+    builtin_templ = call('filesystem.acltemplate.query', [['builtin', '=', True]], {'get': True})
+
+    with pytest.raises(Exception):
+        call('filesystem.acltemplate.delete', builtin_templ['id'])
+
+
+def test_knownfail_builtin_update(request):
+    payload = call('filesystem.acltemplate.query', [['builtin', '=', True]], {'get': True})
+
+    tmpl_id = payload.pop('id')
+    payload.pop('builtin')
+    payload['name'] = 'CANARY'
+
+    with pytest.raises(Exception):
+        call('filesystem.acltemplate.update', tmpl_id, payload)
diff --git a/tests/api2/test_acl_by_who.py b/tests/api2/test_acl_by_who.py
new file mode 100644
index 0000000000000..04dd2dca9fff1
--- /dev/null
+++ b/tests/api2/test_acl_by_who.py
@@ -0,0 +1,98 @@
+from copy import deepcopy
+import os
+import pytest
+
+from middlewared.test.integration.assets.pool import dataset
+from middlewared.test.integration.utils import call
+from truenas_api_client import ValidationErrors as ClientValidationErrors
+
+permset_posix_full = {"READ": True, "WRITE": True, "EXECUTE": True}
+permset_nfsv4_full = {"BASIC": "FULL_CONTROL"}
+flagset_nfsv4_inherit = {"BASIC": "INHERIT"}
+
+
+@pytest.fixture(scope='module')
+def posix_acl_dataset():
+    with dataset('posix') as ds:
+        yield ds
+
+
+@pytest.fixture(scope='module')
+def nfsv4_acl_dataset():
+    with dataset('nfs4', data={'share_type': 'SMB'}) as ds:
+        yield ds
+
+
+def test__posix_by_who(posix_acl_dataset):
+    target = os.path.join('/mnt', posix_acl_dataset)
+    the_acl = call('filesystem.getacl', target)['acl']
+    the_acl.extend([
+        {'tag': 'MASK', 'id': -1, 'perms': permset_posix_full, 'default': False},
+        {'tag': 'USER', 'who': 'root', 'perms': permset_posix_full, 'default': False},
+        {'tag': 'GROUP', 'who': 'root', 'perms': permset_posix_full, 'default': False},
+    ])
+
+    call('filesystem.setacl', {'path': target, 'dacl': the_acl}, job=True)
+
+    new_acl = call('filesystem.getacl', target)['acl']
+    saw_user = False
+    saw_group = False
+    for entry in new_acl:
+        if entry['tag'] == 'USER':
+            assert entry['id'] == 0
+            assert entry['perms'] == permset_posix_full
+            saw_user = True
+        elif entry['tag'] == 'GROUP':
+            assert entry['id'] == 0
+            assert entry['perms'] == permset_posix_full
+            saw_group = True
+
+    assert saw_user, str(new_acl)
+    assert saw_group, str(new_acl)
+
+
+def test__nfsv4_by_who(nfsv4_acl_dataset):
+    target = os.path.join('/mnt', nfsv4_acl_dataset)
+    the_acl = call('filesystem.getacl', target)['acl']
+    the_acl.extend([
+        {'tag': 'USER', 'who': 'root', 'perms': permset_nfsv4_full, 'flags': flagset_nfsv4_inherit, 'type': 'ALLOW'},
+        {'tag': 'GROUP', 'who': 'root', 'perms': permset_nfsv4_full, 'flags': flagset_nfsv4_inherit, 'type': 'ALLOW'},
+    ])
+
+    call('filesystem.setacl', {'path': target, 'dacl': the_acl}, job=True)
+
+    new_acl = call('filesystem.getacl', target)['acl']
+    saw_user = False
+    saw_group = False
+    for entry in new_acl:
+        if entry['tag'] == 'USER':
+            assert entry['id'] == 0
+            assert entry['perms'] == permset_nfsv4_full
+            saw_user = True
+        elif entry['tag'] == 'GROUP' and entry['id'] == 0:
+            assert entry['perms'] == permset_nfsv4_full
+            saw_group = True
+
+    assert saw_user, str(new_acl)
+    assert saw_group, str(new_acl)
+
+
+def test__acl_validation_errors_posix(posix_acl_dataset):
+    target = os.path.join('/mnt', posix_acl_dataset)
+    the_acl = call('filesystem.getacl', target)['acl']
+
+    new_acl = deepcopy(the_acl)
+    new_acl.extend([
+        {'tag': 'USER', 'perms': permset_posix_full, 'default': False},
+    ])
+
+    with pytest.raises(ClientValidationErrors):
+        call('filesystem.setacl', {'path': target, 'dacl': new_acl}, job=True)
+
+    new_acl = deepcopy(the_acl)
+    new_acl.extend([
+        {'tag': 'USER', 'perms': permset_posix_full, 'default': False, 'who': 'root', 'id': 0},
+    ])
+
+    with pytest.raises(ClientValidationErrors):
+        call('filesystem.setacl', {'path': target, 'dacl': new_acl}, job=True)