001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.List;
027import org.apache.commons.io.IOUtils;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.CoprocessorEnvironment;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
035import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
036import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
037import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.yetus.audience.InterfaceAudience;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
043
044/**
045 * Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
046 * <p>
047 * For the protocol buffer definition of the RowCountService, see the source file located under
048 * hbase-examples/src/main/protobuf/Examples.proto.
049 * </p>
050 */
051@InterfaceAudience.Private
052public class RowCountEndpoint extends ExampleProtos.RowCountService implements RegionCoprocessor {
053  private RegionCoprocessorEnvironment env;
054
055  public RowCountEndpoint() {
056  }
057
058  /**
059   * Just returns a reference to this object, which implements the RowCounterService interface.
060   */
061  @Override
062  public Iterable<Service> getServices() {
063    return Collections.singleton(this);
064  }
065
066  /**
067   * Returns a count of the rows in the region where this coprocessor is loaded.
068   */
069  @Override
070  public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
071    RpcCallback<ExampleProtos.CountResponse> done) {
072    Scan scan = new Scan();
073    scan.setFilter(new FirstKeyOnlyFilter());
074    ExampleProtos.CountResponse response = null;
075    InternalScanner scanner = null;
076    try {
077      scanner = env.getRegion().getScanner(scan);
078      List<Cell> results = new ArrayList<>();
079      boolean hasMore = false;
080      byte[] lastRow = null;
081      long count = 0;
082      do {
083        hasMore = scanner.next(results);
084        for (Cell kv : results) {
085          byte[] currentRow = CellUtil.cloneRow(kv);
086          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
087            lastRow = currentRow;
088            count++;
089          }
090        }
091        results.clear();
092      } while (hasMore);
093
094      response = ExampleProtos.CountResponse.newBuilder().setCount(count).build();
095    } catch (IOException ioe) {
096      CoprocessorRpcUtils.setControllerException(controller, ioe);
097    } finally {
098      if (scanner != null) {
099        IOUtils.closeQuietly(scanner);
100      }
101    }
102    done.run(response);
103  }
104
105  /**
106   * Returns a count of all KeyValues in the region where this coprocessor is loaded.
107   */
108  @Override
109  public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
110    RpcCallback<ExampleProtos.CountResponse> done) {
111    ExampleProtos.CountResponse response = null;
112    InternalScanner scanner = null;
113    try {
114      scanner = env.getRegion().getScanner(new Scan());
115      List<Cell> results = new ArrayList<>();
116      boolean hasMore = false;
117      long count = 0;
118      do {
119        hasMore = scanner.next(results);
120        count += Iterables.size(results);
121        results.clear();
122      } while (hasMore);
123
124      response = ExampleProtos.CountResponse.newBuilder().setCount(count).build();
125    } catch (IOException ioe) {
126      CoprocessorRpcUtils.setControllerException(controller, ioe);
127    } finally {
128      if (scanner != null) {
129        IOUtils.closeQuietly(scanner);
130      }
131    }
132    done.run(response);
133  }
134
135  /**
136   * Stores a reference to the coprocessor environment provided by the
137   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
138   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
139   * a table region, so always expects this to be an instance of
140   * {@link RegionCoprocessorEnvironment}.
141   * @param env the environment provided by the coprocessor host
142   * @throws IOException if the provided environment is not an instance of
143   *                     {@code RegionCoprocessorEnvironment}
144   */
145  @Override
146  public void start(CoprocessorEnvironment env) throws IOException {
147    if (env instanceof RegionCoprocessorEnvironment) {
148      this.env = (RegionCoprocessorEnvironment) env;
149    } else {
150      throw new CoprocessorException("Must be loaded on a table region!");
151    }
152  }
153
154  @Override
155  public void stop(CoprocessorEnvironment env) throws IOException {
156    // nothing to do
157  }
158}