/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.CountRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.CountResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.RowCountProtos.RowCountService;

/**
 * Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
 * <p>
 * For the protocol buffer definition of the RowCountService, see the source file located under
 * hbase-examples/src/main/protobuf/Examples.proto.
 * </p>
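 * <p>
 * An illustrative client-side sketch of calling this endpoint, assuming {@code counter} is the
 * {@code RowCountService} stub handed to a {@code Batch.Call} by the HBase client's
 * {@code Table#coprocessorService(Class, byte[], byte[], Batch.Call)}; the variable names and
 * stub-acquisition step are assumptions, only the request and response types come from this file:
 * </p>
 *
 * <pre>
 * // RpcCallback has a single method, so a lambda can capture the asynchronous response.
 * final CountResponse[] result = new CountResponse[1];
 * counter.getRowCount(null, CountRequest.getDefaultInstance(),
 *   response -&gt; result[0] = response);
 * long rowsInRegion = result[0] == null ? 0L : result[0].getCount();
 * </pre>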
 */
@InterfaceAudience.Private
public class RowCountEndpoint extends RowCountService implements RegionCoprocessor {
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /**
   * Just returns a reference to this object, which implements the RowCountService interface.
   */
  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Returns a count of the rows in the region where this coprocessor is loaded.
   */
  @Override
  public void getRowCount(RpcController controller, CountRequest request,
    RpcCallback<CountResponse> done) {
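    // FirstKeyOnlyFilter limits the scan to the first cell of each row, so counting distinct
    // row keys below yields the number of rows in this region.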
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<>();
      boolean hasMore = false;
      byte[] lastRow = null;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          byte[] currentRow = CellUtil.cloneRow(kv);
          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
            lastRow = currentRow;
            count++;
          }
        }
        results.clear();
      } while (hasMore);

      response = CountResponse.newBuilder().setCount(count).build();
    } catch (IOException ioe) {
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        IOUtils.closeQuietly(scanner);
      }
    }
    done.run(response);
  }

  /**
   * Returns a count of all KeyValues in the region where this coprocessor is loaded.
   */
  @Override
  public void getKeyValueCount(RpcController controller, CountRequest request,
    RpcCallback<CountResponse> done) {
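    // Unlike getRowCount, scan with no filter and count every cell (KeyValue) returned.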
    CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(new Scan());
      List<Cell> results = new ArrayList<>();
      boolean hasMore = false;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        count += Iterables.size(results);
        results.clear();
      } while (hasMore);
      response = CountResponse.newBuilder().setCount(count).build();
    } catch (IOException ioe) {
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        IOUtils.closeQuietly(scanner);
      }
    }
    done.run(response);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} of the region where this
   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
   * a table region, so the environment passed in is always expected to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *                     {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}