HBase 协处理器以 Google BigTable 的协处理器实现为模型( http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf 第 41-42 页。)。
协处理器框架提供了直接在管理数据的 RegionServers 上运行自定义代码的机制。正在努力弥合 HBase 实施与 BigTable 架构之间的差距。有关更多信息,请参阅 HBASE-4047 。
本章中的信息主要来源于以下资源并大量重用:
-
赖明杰的博文协处理器介绍。
-
Gaurav Bhardwaj 的博客文章 HBase 协处理器的方法。
使用协处理器是您自己的风险
协处理器是 HBase 的高级功能,仅供系统开发人员使用。由于协处理器代码直接在 RegionServer 上运行并且可以直接访问您的数据,因此会带来数据损坏,中间人攻击或其他恶意数据访问的风险。目前,虽然 HBASE-4047 正在开展工作,但没有机制可以防止协处理器的数据损坏。
- 此外,没有资源隔离,因此善意但行为不当的协处理器会严重降低集群性能和稳定性。
在 HBase 中,使用Get
或Scan
获取数据,而在 RDBMS 中使用 SQL 查询。为了仅获取相关数据,使用 HBase 过滤器进行过滤,而在 RDBMS 中使用WHERE
谓词。
获取数据后,您可以对其执行计算。这种范例适用于具有几千行和多列的“小数据”。但是,当您扩展到数十亿行和数百万列时,在网络中移动大量数据将在网络层产生瓶颈,客户端需要足够强大并且有足够的内存来处理大量数据和计算。此外,客户端代码可能变得庞大而复杂。
在这种情况下,协处理器可能有意义。您可以将业务计算代码放入在 RegionServer 上运行的协处理器中,与数据位于同一位置,并将结果返回给客户端。
这只是使用协处理器可以带来好处的一种情况。以下是一些类比,可能有助于解释协处理器的一些好处。
触发器和存储过程
Observer 协处理器类似于 RDBMS 中的触发器,因为它在特定事件(例如Get
或Put
)发生之前或之后执行代码。端点协处理器类似于 RDBMS 中的存储过程,因为它允许您对 RegionServer 本身而不是客户端上的数据执行自定义计算。
MapReduce 的
MapReduce 的工作原理是将计算移动到数据的位置。协处理器在相同的主体上运行。
AOP
如果您熟悉面向方面编程(AOP),您可以将协处理器视为通过拦截请求然后运行一些自定义代码来应用建议,然后将请求传递到其最终目标(甚至更改目标)。
-
你的类应该实现一个协处理器接口 - 协处理器, RegionObserver , CoprocessorService - 仅举几例。
-
使用 HBase Shell 静态(从配置)或动态加载协处理器。有关详细信息,请参阅加载协处理器。
-
从客户端代码调用协处理器。 HBase 透明地处理协处理器。
框架 API 在协处理器包中提供。
在特定事件发生之前或之后触发观察者协处理器。在事件之前发生的观察者使用以pre
前缀开头的方法,例如 prePut
。在事件之后发生的观察者会覆盖以post
前缀开头的方法,例如 postPut
。
安全
在执行Get
或Put
操作之前,您可以使用preGet
或prePut
方法检查权限。
参照完整性
HBase 不直接支持 RDBMS 的反射完整性概念,也称为外键。您可以使用协处理器来强制执行此类完整性。例如,如果您有一个业务规则,users
表的每个插入必须后跟user_daily_attendance
表中的相应条目,您可以实现协处理器以使用user
上的prePut
方法插入记录到user_daily_attendance
。
二级索引
您可以使用协处理器来维护二级索引。有关更多信息,请参阅 SecondaryIndexing 。
RegionObserver
RegionObserver 协处理器允许您观察区域上的事件,例如Get
和Put
操作。见 RegionObserver 。
RegionServerObserver
RegionServerObserver 允许您观察与 RegionServer 操作相关的事件,例如启动,停止或执行合并,提交或回滚。请参见 RegionServerObserver 。
MasterObserver
MasterObserver 允许您观察与 HBase Master 相关的事件,例如表创建,删除或架构修改。见 MasterObserver 。
WalObserver
WalObserver 允许您观察与写入预写日志(WAL)相关的事件。见 WALObserver 。
示例提供了观察者协处理器的工作示例。
端点处理器允许您在数据位置执行计算。参见协处理器类比。一个例子是需要计算跨越数百个区域的整个表的运行平均值或总和。
与您的代码透明运行的观察者协处理器相比,必须使用表或 HTable 中提供的 CoprocessorService()方法显式调用端点协处理器。
从 HBase 0.96 开始,端点协处理器使用 Google Protocol Buffers(protobuf)实现。有关 protobuf 的更多详细信息,请参阅 Google 的协议缓冲指南。端点以 0.94 版本编写的协处理器与 0.96 或更高版本不兼容。参见 HBASE-5448 )。要将 HBase 群集从 0.94 或更早版本升级到 0.96 或更高版本,您需要重新实现协处理器。
协处理器端点不应使用 HBase 内部,只能使用公共 API;理想情况下,CPEP 应仅依赖于接口和数据结构。这并不总是可行,但要注意这样做会使端点变脆,随着 HBase 内部发展而易于破损。注释为私有或演进的 HBase 内部 API 在删除之前不必遵守语义版本控制规则或关于弃用的一般 Java 规则。虽然生成的 protobuf 文件没有 hbase 受众注释 - 它们是由 protobuf protoc 工具创建的,它不知道 HBase 是如何工作的 - 它们应该被考虑@InterfaceAudience.Private
因此容易改变。
示例提供了端点协处理器的工作示例。
要使协处理器可用于 HBase,必须静态(通过 HBase 配置)或动态(使用 HBase Shell 或 Java API)加载。
请按照以下步骤静态加载协处理器。请记住,必须重新启动 HBase 才能卸载已静态加载的协处理器。
-
在 hbase-site.xml 中定义协处理器,<property>元素包含<name>和<value>子元素。 <name>应为以下之一:</name></value></name></property>
-
RegionObservers 和 Endpoints 的
hbase.coprocessor.region.classes
。 -
WALObservers 的
hbase.coprocessor.wal.classes
。 -
MasterObservers 的
hbase.coprocessor.master.classes
。<value>必须包含协处理器实现类的完全限定类名。</value>
例如,要加载协处理器(在类 SumEndPoint.java 中实现),您必须在 RegionServer 的'hbase-site.xml'文件中创建以下条目(通常位于'conf'目录下):
<property> <name>hbase.coprocessor.region.classes</name> <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> </property>
如果为加载指定了多个类,则类名必须以逗号分隔。框架尝试使用默认的类加载器加载所有已配置的类。因此,jar 文件必须驻留在服务器端 HBase 类路径中。
以这种方式加载的协处理器将在所有表的所有区域上处于活动状态。这些也称为系统协处理器。将为第一个列出的协处理器分配优先级
Coprocessor.Priority.SYSTEM
。列表中的每个后续协处理器的优先级值都会增加 1(这会降低其优先级,因为优先级具有整数的自然排序顺序)。当调用注册的观察者时,框架以其优先级的排序顺序执行其回调方法。关系是任意破坏的。
-
-
将您的代码放在 HBase 的类路径上。一种简单的方法是将 jar(包含代码和所有依赖项)放入 HBase 安装的
lib/
目录中。 -
重启 HBase。
-
从
hbase-site.xml
中删除协处理器的<property>元素,包括子元素。</property> -
Restart HBase.
-
(可选)从类路径或 HBase 的
lib/
目录中删除协处理器的 JAR 文件。
您也可以动态加载协处理器,而无需重新启动 HBase。这似乎比静态加载更好,但动态加载的协处理器是基于每个表加载的,并且只能用于加载它们的表。因此,动态加载的表有时称为表协处理器。
此外,动态加载协处理器充当表上的模式更改,并且必须使表脱机以加载协处理器。
有三种方法可以动态加载协处理器。
假设
以下提到的说明做出以下假设:
名为
coprocessor.jar
的 JAR 包含协处理器实现及其所有依赖项。JAR 在
hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar
等某些位置可用于 HDFS。
-
使用 HBase Shell 禁用表:
hbase> disable 'users'
-
使用如下命令加载协处理器:
hbase alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/ user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| arg1=1,arg2=2'
协处理器框架将尝试从协处理器表属性值中读取类信息。该值包含由管道(
|
)字符分隔的四条信息。-
文件路径:包含协处理器实现的 jar 文件必须位于所有区域服务器都可以读取它的位置。您可以将文件复制到每个区域服务器上的本地磁盘上,但建议将其存储在 HDFS 中。 HBASE-14548 允许指定包含 jar 或某些通配符的目录,例如:hdfs:// <namenode>: <port>/ user / <hadoop-user>/或 hdfs:// <namenode>: <port>/ user / <hadoop-user>/*.jar。请注意,如果指定了目录,则会添加目录中的所有 jar 文件(.jar)。它不搜索子目录中的文件。如果要指定目录,请不要使用通配符。此增强功能也适用于通过 JAVA API 的用法。</hadoop-user></port></namenode></hadoop-user></port></namenode>
-
类名:协处理器的完整类名。
-
优先级:整数。该框架将使用优先级确定在同一个钩子上注册的所有已配置观察者的执行顺序。此字段可以留空。在这种情况下,框架将分配默认优先级值。
-
参数(可选):此字段传递给协处理器实现。这是可选的。
-
-
启用表格。
hbase(main):003:0> enable 'users'
-
验证协处理器已加载:
hbase(main):04:0> describe 'users'
协处理器应列在
TABLE_ATTRIBUTES
中。
以下 Java 代码显示如何使用HTableDescriptor
的setValue()
方法在users
表上加载协处理器。
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
在 HBase 0.96 及更新版本中,HTableDescriptor
的addCoprocessor()
方法提供了一种动态加载协处理器的简便方法。
TableName tableName = TableName.valueOf("users");
Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar");
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
无法保证框架将成功加载给定的协处理器。例如,shell 命令既不保证特定位置存在 jar 文件,也不验证给定类是否实际包含在 jar 文件中。
-
禁用该表。
hbase> disable 'users'
-
更改表以删除协处理器。
hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
-
Enable the table.
hbase> enable 'users'
通过使用setValue()
或addCoprocessor()
方法重新加载表定义而不设置协处理器的值。这将删除附加到表的任何协处理器。
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
在 HBase 0.96 及更高版本中,您可以改为使用HTableDescriptor
类的removeCoprocessor()
方法。
HBase 提供了 Observer Coprocessor 的示例。
下面给出更详细的例子。
这些示例假设一个名为users
的表,它有两个列族personalDet
和salaryDet
,包含个人和工资详细信息。下面是users
表的图形表示。
personalDet | salaryDet | |
---|---|---|
jverne | 儒勒 | 凡尔纳 |
rowkey | 名称 | 姓氏 |
管理 | 管理员 | Admin |
cdickens | 查尔斯 | 狄更斯 |
以下 Observer 协处理器可防止在users
表的Get
或Scan
中返回用户admin
的详细信息。
-
编写一个实现 RegionObserver 类的类。
-
覆盖
preGetOp()
方法(不推荐使用preGet()
方法)以检查客户端是否已使用值admin
查询 rowkey。如果是,则返回空结果。否则,正常处理请求。 -
将您的代码和依赖项放在 JAR 文件中。
-
将 JAR 放在 HDFS 中,HBase 可以在其中找到它。
-
加载协处理器。
-
写一个简单的程序来测试它。
以下是上述步骤的实施:
public class RegionObserverExample implements RegionObserver {
private static final byte[] ADMIN = Bytes.toBytes("admin");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
throws IOException {
if (Bytes.equals(get.getRow(),ADMIN)) {
Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
System.currentTimeMillis(), (byte)4, VALUE);
results.add(c);
e.bypass();
}
}
}
覆盖preGetOp()
仅适用于Get
操作。您还需要覆盖preScannerOpen()
方法以从扫描结果中过滤admin
行。
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {
Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
scan.setFilter(filter);
return s;
}
这种方法有效,但有 _ 副作用 _。如果客户端在其扫描中使用了过滤器,则该过滤器将替换该过滤器。相反,您可以显式删除扫描中的任何admin
结果:
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
Result result = null;
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
result = iterator.next();
if (Bytes.equals(result.getRow(), ROWKEY)) {
iterator.remove();
break;
}
}
return hasMore;
}
仍然使用users
表,此示例使用端点协处理器实现协处理器以计算所有员工工资的总和。
-
创建一个定义服务的'.proto'文件。
option java_package = "org.myname.hbase.coprocessor.autogenerated"; option java_outer_classname = "Sum"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; message SumRequest { required string family = 1; required string column = 2; } message SumResponse { required int64 sum = 1 [default = 0]; } service SumService { rpc getSum(SumRequest) returns (SumResponse); }
-
执行
protoc
命令从上面的.proto'文件生成 Java 代码。$ mkdir src $ protoc --java_out=src ./sum.proto
这将生成一个类调用
Sum.java
。 -
编写一个扩展生成的服务类的类,实现
Coprocessor
和CoprocessorService
类,并覆盖服务方法。> 如果从
hbase-site.xml
加载协处理器,然后使用 HBase Shell 再次加载同一个协处理器,它将再次加载。同一个类将存在两次,第二个实例将具有更高的 ID(因此具有更低的优先级)。结果是有效地忽略了重复的协处理器。public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; @Override public Service getService() { return this; } @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment)env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { // do nothing } @Override public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(request.getFamily())); scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn())); Sum.SumResponse response = null; InternalScanner scanner = null; try { scanner = env.getRegion().getScanner(scan); List<Cell> results = new ArrayList<>(); boolean hasMore = false; long sum = 0L; do { hasMore = scanner.next(results); for (Cell cell : results) { sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); } results.clear(); } while (hasMore); response = Sum.SumResponse.newBuilder().setSum(sum).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException ignored) {} } } done.run(response); } }
Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); TableName tableName = TableName.valueOf("users"); Table table = connection.getTable(tableName); final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build(); try { Map<byte[], Long> results = table.coprocessorService( Sum.SumService.class, null, /* start key */ null, /* end key */ new Batch.Call<Sum.SumService, Long>() { @Override public Long call(Sum.SumService aggregate) throws IOException { BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>(); aggregate.getSum(null, request, rpcCallback); Sum.SumResponse response = rpcCallback.get(); return response.hasSum() ? response.getSum() : 0L; } } ); for (Long sum : results.values()) { System.out.println("Sum = " + sum); } } catch (ServiceException e) { e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); }
-
Load the Coprocessor.
-
编写客户端代码以调用协处理器。
捆绑协处理器
您可以将协处理器的所有类捆绑到 RegionServer 的类路径上的单个 JAR 中,以便于部署。否则,将所有依赖项放在 RegionServer 的类路径中,以便在 RegionServer 启动期间加载它们。 RegionServer 的类路径在 RegionServer 的hbase-env.sh
文件中设置。
自动化部署
您可以使用 Puppet,Chef 或 Ansible 等工具将协处理器的 JAR 发送到 RegionServers 文件系统上的所需位置,然后重新启动每个 RegionServer,以自动执行协处理器部署。此类设置的详细信息超出了本文档的范围。
更新协处理器
部署新版本的给定协处理器并不像禁用它,更换 JAR 和重新启用协处理器那么简单。这是因为除非删除对它的所有当前引用,否则无法在 JVM 中重新加载类。由于当前 JVM 引用了现有的协处理器,因此必须通过重新启动 RegionServer 来重新启动 JVM,以便替换它。预计此行为不会更改。
协处理器日志记录
协处理器框架不提供用于超出标准 Java 日志记录的 API。
协处理器配置
如果您不想从 HBase Shell 加载协处理器,可以将其配置属性添加到hbase-site.xml
。在中使用 HBase Shell ,设置了两个参数:arg1=1,arg2=2
。这些可以添加到hbase-site.xml
中,如下所示:
<property>
<name>arg1</name>
<value>1</value>
</property>
<property>
<name>arg2</name>
<value>2</value>
</property>
然后,您可以使用以下代码读取配置:
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
Get get = new Get(Bytes.toBytes("admin"));
Result result = table.get(get);
for (Cell c : result.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}");
}
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result res : scanner) {
for (Cell c : res.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ " {" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toLong(CellUtil.cloneValue(c))
+ "}");
}
}
在多租户环境中,限制任意用户协处理器可能是一个大问题。 HBase 提供了连续的选项,以确保只有预期的协处理器运行:
-
hbase.coprocessor.enabled
:启用或禁用所有协处理器。这将限制 HBase 的功能,因为禁用所有协处理器将禁用某些安全提供程序。受影响的示例 coproccessor 是org.apache.hadoop.hbase.security.access.AccessController
。-
hbase.coprocessor.user.enabled
:启用或禁用在表(即用户协处理器)上加载协处理器。 -
可以通过
hbase-site.xml
中的以下可调参数静态加载协处理器:-
hbase.coprocessor.regionserver.classes
:由区域服务器加载的以逗号分隔的协处理器列表 -
hbase.coprocessor.region.classes
:RegionObserver 和 Endpoint 协处理器的逗号分隔列表 -
hbase.coprocessor.user.region.classes
:由所有区域加载的以逗号分隔的协处理器列表 -
hbase.coprocessor.master.classes
:由主服务器(MasterObserver 协处理器)加载的以逗号分隔的协处理器列表 -
hbase.coprocessor.wal.classes
:要加载的以逗号分隔的 WALObserver 协处理器列表
-
-
hbase.coprocessor.abortonerror
:如果协处理器应该出错而不是IOError
,是否中止已加载协处理器的守护进程。如果将此设置为 false 并且访问控制器协处理器应该有致命错误,则将绕过协处理器,因此在安全安装中,这被建议为true
;但是,可以在每个表的基础上为用户协处理器重写此操作,以确保它们不会中止其运行区域服务器,而是在出错时卸载。 -
hbase.coprocessor.region.whitelist.paths
:可用于加载org.apache.hadoop.hbase.security.access.CoprocessorWhitelistMasterObserver
的逗号分隔列表,从而可以使用以下选项列出可以加载协处理器的路径的白名单。-
类路径上的协处理器隐式列入白名单
-
*
到通配符所有协处理器路径 -
整个文件系统(例如
hdfs://my-cluster/
) -
由 FilenameUtils.wildcardMatch 评估的通配符路径
-
注意:路径可以指定方案与否(例如
[file:///usr/hbase/lib/coprocessors](file:///usr/hbase/lib/coprocessors)
或所有文件系统/usr/hbase/lib/coprocessors
)
-
-