Package org.apache.hadoop.hbase.coprocessor


Coprocessors are code that runs in-process on each region server. Regions contain references to the coprocessor implementation classes associated with them. Coprocessor classes can be loaded either from local jars on the region server's classpath or via the HDFS classloader.

Multiple types of coprocessors are provided to cover the range of potential use cases. Right now there are three: the Coprocessor interface itself, which tracks region lifecycle events; RegionObserver, which observes and mediates client actions on a region; and Endpoint, which adds dynamic RPC methods callable from clients.


A coprocessor is required to implement the Coprocessor interface so that the coprocessor framework can manage it internally.

Another design goal of this interface is to provide simple features that make coprocessors useful, while exposing no more internal state or control of the region server than necessary, and never exposing it directly.

Over the lifecycle of a region, the methods of this interface are invoked when the corresponding events happen. The master transitions regions through the following states:

    unassigned -> pendingOpen -> open -> pendingClose -> closed.

Coprocessors have the opportunity to intercept and handle events in the pendingOpen, open, and pendingClose states.
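The lifecycle above can be sketched as a small state machine. This is an illustrative model only (the enum and transition table below are hypothetical, not HBase's actual region state implementation):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of the region lifecycle described above.
public class RegionLifecycle {
  public enum State { UNASSIGNED, PENDING_OPEN, OPEN, PENDING_CLOSE, CLOSED }

  // unassigned -> pendingOpen -> open -> pendingClose -> closed
  private static final Map<State, Set<State>> TRANSITIONS =
      new EnumMap<State, Set<State>>(State.class);
  static {
    TRANSITIONS.put(State.UNASSIGNED, EnumSet.of(State.PENDING_OPEN));
    TRANSITIONS.put(State.PENDING_OPEN, EnumSet.of(State.OPEN));
    TRANSITIONS.put(State.OPEN, EnumSet.of(State.PENDING_CLOSE));
    TRANSITIONS.put(State.PENDING_CLOSE, EnumSet.of(State.CLOSED));
    TRANSITIONS.put(State.CLOSED, EnumSet.noneOf(State.class));
  }

  private State current = State.UNASSIGNED;

  public State current() { return current; }

  // Advance to the next state, rejecting any transition the master
  // would never perform.
  public void transition(State next) {
    if (!TRANSITIONS.get(current).contains(next)) {
      throw new IllegalStateException(current + " -> " + next + " not allowed");
    }
    current = next;
  }
}
```

A coprocessor hook would fire on each of these transitions; the pendingOpen, open, and pendingClose states are the ones a coprocessor can intercept.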


In the pendingOpen state, the region server is opening a region to bring it online. Coprocessors can piggyback on this process or cause it to fail.


In the open state, the region is open on the region server and is processing both client requests (get, put, scan, etc.) and administrative actions (flush, compact, split, etc.). Coprocessors can piggyback administrative actions via hooks such as preFlush/postFlush, preCompact/postCompact, and preSplit/postSplit.


In the pendingClose state, the region server is closing the region. This can happen as part of normal operations, or when the region server is aborting due to fatal conditions such as OOME, health check failure, or fatal filesystem problems. Coprocessors can piggyback this event. If the server is aborting, an indication to this effect will be passed as an argument.


If the coprocessor implements the RegionObserver interface it can observe and mediate client actions on the region, via hooks such as preGet()/postGet(), prePut()/postPut(), preDelete()/postDelete(), and preScannerOpen()/postScannerClose().

You can also extend the abstract class BaseRegionObserverCoprocessor, which implements both Coprocessor and RegionObserver. In addition, it provides default (no-op) implementations of all hook methods, so you only need to override the ones you care about.

Here's an example of what a simple RegionObserver might look like. It shows how to implement basic access control for HBase: the coprocessor checks user information for a given client request (e.g., Get/Put/Delete/Scan) by injecting code at certain RegionObserver preXXX hooks. If the user is not allowed to access the resource, a CoprocessorException is thrown, and the client request is denied by receiving this exception.

package org.apache.hadoop.hbase.coprocessor;

import org.apache.hadoop.hbase.client.Get;

// Sample access-control coprocessor. It utilizes RegionObserver
// and intercepts preXXX() methods to check user privileges for the given
// table and column family.
public class AccessControlCoprocessor extends BaseRegionObserverCoprocessor {
  @Override
  public Get preGet(CoprocessorEnvironment e, Get get)
      throws CoprocessorException {
    // check permissions; accessNotAllowed is a placeholder for the
    // actual permission check
    if (accessNotAllowed) {
      throw new AccessDeniedException("User is not allowed to access.");
    }
    return get;
  }

  // override prePut(), preDelete(), etc.
}

Coprocessor and RegionObserver provide hooks for injecting user code that runs at each region. This user code is triggered by existing HTable and HBaseAdmin operations at certain hook points.

Through Endpoint and the dynamic RPC protocol, you can define your own interface between client and region server: you can create a new method, specifying its parameters and return type. The new Endpoint methods are then triggered by calling client-side dynamic RPC functions such as HTable.coprocessorExec(...).
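The dynamic-dispatch idea behind this — a client-defined protocol interface whose calls are routed at runtime — can be illustrated with a plain java.lang.reflect.Proxy. This is only a conceptual sketch: a real coprocessorProxy would serialize the call and send it to the region server rather than invoke a local handler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProtocolProxyDemo {
  // A client-defined protocol interface, analogous to a CoprocessorProtocol.
  public interface SumProtocol {
    int sum(int a, int b);
  }

  // Create a proxy that forwards calls to a handler. HBase's dynamic RPC
  // dispatches in a conceptually similar way, but over the network.
  public static SumProtocol proxy() {
    return (SumProtocol) Proxy.newProxyInstance(
        SumProtocol.class.getClassLoader(),
        new Class<?>[] { SumProtocol.class },
        new InvocationHandler() {
          public Object invoke(Object p, Method m, Object[] args) {
            if (m.getName().equals("sum")) {
              // locally computed here; remotely computed in real dynamic RPC
              return (Integer) args[0] + (Integer) args[1];
            }
            throw new UnsupportedOperationException(m.getName());
          }
        });
  }
}
```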

To implement an Endpoint, you need to define a protocol interface that extends CoprocessorProtocol, and then implement it in an Endpoint class (typically by extending BaseEndpointCoprocessor); the implementation is loaded into and executed from the region server.

Here's an example of performing column aggregation at region server:

// A sample protocol for performing aggregation at regions.
public static interface ColumnAggregationProtocol
extends CoprocessorProtocol {
  // Perform aggregation for a given column at the region. The aggregation
  // will include all the rows inside the region. It can be extended to
  // allow passing start and end rows for a fine-grained aggregation.
  public int sum(byte[] family, byte[] qualifier) throws IOException;
}

// Aggregation implementation at a region.
public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
  // Scan the region by the given family and qualifier. Return the aggregation
  // result.
  @Override
  public int sum(byte[] family, byte[] qualifier)
  throws IOException {
    // aggregate at each region
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);
    int sumResult = 0;
    // use an internal scanner to perform scanning.
    InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
    try {
      List<KeyValue> curVals = new ArrayList<KeyValue>();
      boolean done = false;
      do {
        curVals.clear();
        done = scanner.next(curVals);
        if (!curVals.isEmpty()) {
          sumResult += Bytes.toInt(curVals.get(0).getValue());
        }
      } while (done);
    } finally {
      scanner.close();
    }
    return sumResult;
  }
}

Client invocations are performed through HTable, which has the following methods added by dynamic RPC protocol:

public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, Row row)

public <T extends CoprocessorProtocol, R> void coprocessorExec(
    Class<T> protocol, List<? extends Row> rows,
    BatchCall<T,R> callable, BatchCallback<R> callback)

public <T extends CoprocessorProtocol, R> void coprocessorExec(
    Class<T> protocol, RowRange range,
    BatchCall<T,R> callable, BatchCallback<R> callback)

Here is a client side example of invoking ColumnAggregationEndpoint:

HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map<byte[], Integer> results;

// scan: for all regions
scan = new Scan();
results = table.coprocessorExec(ColumnAggregationProtocol.class, scan,
    new BatchCall<ColumnAggregationProtocol,Integer>() {
      public Integer call(ColumnAggregationProtocol instance) throws IOException {
        return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
      }
    });
int sumResult = 0;
for (Map.Entry<byte[], Integer> e : results.entrySet()) {
  sumResult += e.getValue();
}

Coprocessor loading

A customized coprocessor can be loaded in two different ways: via configuration, or via the HTableDescriptor of a newly created table.

(Currently there is no on-demand coprocessor loading mechanism for already opened regions.)

Load from configuration

Whenever a region is opened, it reads coprocessor class names from the hbase.coprocessor.region.classes property of the Configuration. The coprocessor framework automatically loads the configured classes as default coprocessors. The classes must already be included in the classpath.

    <property>
      <name>hbase.coprocessor.region.classes</name>
      <value>org.apache.hadoop.hbase.coprocessor.AccessControlCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol</value>
      <description>A comma-separated list of Coprocessors that are loaded by
      default. For any overridden coprocessor method from RegionObserver or
      Coprocessor, these classes' implementations will be called
      in order. After implementing your own
      Coprocessor, just put it in HBase's classpath and add the fully
      qualified class name here.
      </description>
    </property>

The first defined coprocessor is assigned Coprocessor.Priority.SYSTEM as its priority, and each following coprocessor's priority is incremented by one. Coprocessors are executed in order according to the natural ordering of the int.
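The assignment rule can be sketched as follows. SYSTEM_PRIORITY here is a hypothetical stand-in for Coprocessor.Priority.SYSTEM, whose actual value is internal to HBase:

```java
import java.util.ArrayList;
import java.util.List;

public class ConfiguredPriorities {
  // Hypothetical stand-in for Coprocessor.Priority.SYSTEM.
  static final int SYSTEM_PRIORITY = 0;

  // First configured class gets SYSTEM priority; each following class
  // gets the previous priority plus one, so execution order (natural int
  // ordering) matches configuration order.
  public static List<String> inExecutionOrder(String classList) {
    List<String> ordered = new ArrayList<String>();
    int priority = SYSTEM_PRIORITY;
    for (String name : classList.split(",")) {
      ordered.add(name.trim() + " @ " + priority++);
    }
    return ordered;
  }
}
```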

Load from table attribute

Coprocessor classes can also be configured as a table attribute. The attribute key must start with "COPROCESSOR" and the value must be of the form <path>:<class>:<priority>, so that the framework can recognize and load it.

'COPROCESSOR$1' => 'hdfs://localhost:8020/hbase/coprocessors/test.jar:Test:1000'
'COPROCESSOR$2' => '/hbase/coprocessors/test2.jar:AnotherTest:1001'

<path> must point to a jar, which can be on any filesystem supported by the Hadoop FileSystem object.

<class> is the coprocessor implementation class. A jar can contain more than one coprocessor implementation, but only one can be specified at a time in each table attribute.

<priority> is an integer. Coprocessors are executed in order according to the natural ordering of the int. Coprocessors can optionally abort actions. So typically one would want to put authoritative CPs (security policy implementations, perhaps) ahead of observers.
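A minimal parser for the <path>:<class>:<priority> form could look like the sketch below (illustrative only; HBase's actual parsing lives in the region loading code). Note that the priority and class must be taken from the right-hand end of the string, since the path itself may contain colons (e.g. hdfs://localhost:8020/...):

```java
public class CoprocessorSpec {
  public final String path;
  public final String className;
  public final int priority;

  public CoprocessorSpec(String path, String className, int priority) {
    this.path = path;
    this.className = className;
    this.priority = priority;
  }

  // Split from the right so a path like hdfs://localhost:8020/... keeps
  // its own colons intact.
  public static CoprocessorSpec parse(String value) {
    int p2 = value.lastIndexOf(':');
    int p1 = p2 < 0 ? -1 : value.lastIndexOf(':', p2 - 1);
    if (p1 < 0) {
      throw new IllegalArgumentException("expected <path>:<class>:<priority>");
    }
    return new CoprocessorSpec(
        value.substring(0, p1),
        value.substring(p1 + 1, p2),
        Integer.parseInt(value.substring(p2 + 1)));
  }
}
```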

  // path to the coprocessor jar; the jar name here is illustrative
  Path path = new Path(fs.getUri() + Path.SEPARATOR + "coprocessor.jar");

  // create a table that references the jar
  HTableDescriptor htd = new HTableDescriptor(getClass().getName());
  htd.addFamily(new HColumnDescriptor("test"));
  htd.setValue("COPROCESSOR$1",
    path.toString() +
    ":" + classFullName +
    ":" + Coprocessor.Priority.USER);
  HBaseAdmin admin = new HBaseAdmin(this.conf);
  admin.createTable(htd);

Chain of RegionObservers

As described above, multiple coprocessors can be loaded at one region at the same time. In the case of RegionObserver, you can have more than one RegionObserver registered at the same hook point, e.g., preGet(). When a region reaches the hook point, the framework invokes each registered RegionObserver in order of assigned priority.
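The chained invocation can be sketched as follows. The GetObserver interface and the deny-by-return-value convention here are illustrative, not the actual RegionObserver contract (which uses exceptions and richer argument types):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ObserverChain {
  // Hypothetical observer: returns false to deny the request.
  public interface GetObserver {
    int priority();
    boolean preGet(String row);
  }

  private final List<GetObserver> observers = new ArrayList<GetObserver>();

  public void register(GetObserver o) {
    observers.add(o);
    // keep observers sorted so invocation follows assigned priority
    // (natural int ordering, lowest first)
    Collections.sort(observers, new Comparator<GetObserver>() {
      public int compare(GetObserver a, GetObserver b) {
        return a.priority() - b.priority();
      }
    });
  }

  // Invoke each registered observer in priority order; an early observer
  // (e.g. a security policy) can deny the action before later ones run.
  public boolean preGet(String row) {
    for (GetObserver o : observers) {
      if (!o.preGet(row)) {
        return false;
      }
    }
    return true;
  }
}
```

This also shows why an authoritative coprocessor (such as a security policy) should be given a lower priority value than plain observers: it runs first and can veto the action.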

Copyright © 2015 The Apache Software Foundation. All rights reserved.