001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import com.google.protobuf.RpcCallback; 021import com.google.protobuf.RpcController; 022import com.google.protobuf.Service; 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.List; 027import org.apache.commons.io.IOUtils; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.CoprocessorEnvironment; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 035import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos; 036import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 037import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.yetus.audience.InterfaceAudience; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 043 044/** 045 * Sample coprocessor endpoint exposing a Service interface for counting rows and key values. 046 * <p> 047 * For the protocol buffer definition of the RowCountService, see the source file located under 048 * hbase-examples/src/main/protobuf/Examples.proto. 049 * </p> 050 */ 051@InterfaceAudience.Private 052public class RowCountEndpoint extends ExampleProtos.RowCountService implements RegionCoprocessor { 053 private RegionCoprocessorEnvironment env; 054 055 public RowCountEndpoint() { 056 } 057 058 /** 059 * Just returns a reference to this object, which implements the RowCounterService interface. 060 */ 061 @Override 062 public Iterable<Service> getServices() { 063 return Collections.singleton(this); 064 } 065 066 /** 067 * Returns a count of the rows in the region where this coprocessor is loaded. 068 */ 069 @Override 070 public void getRowCount(RpcController controller, ExampleProtos.CountRequest request, 071 RpcCallback<ExampleProtos.CountResponse> done) { 072 Scan scan = new Scan(); 073 scan.setFilter(new FirstKeyOnlyFilter()); 074 ExampleProtos.CountResponse response = null; 075 InternalScanner scanner = null; 076 try { 077 scanner = env.getRegion().getScanner(scan); 078 List<Cell> results = new ArrayList<>(); 079 boolean hasMore = false; 080 byte[] lastRow = null; 081 long count = 0; 082 do { 083 hasMore = scanner.next(results); 084 for (Cell kv : results) { 085 byte[] currentRow = CellUtil.cloneRow(kv); 086 if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { 087 lastRow = currentRow; 088 count++; 089 } 090 } 091 results.clear(); 092 } while (hasMore); 093 094 response = ExampleProtos.CountResponse.newBuilder().setCount(count).build(); 095 } catch (IOException ioe) { 096 CoprocessorRpcUtils.setControllerException(controller, ioe); 097 } finally { 098 if (scanner != null) { 099 IOUtils.closeQuietly(scanner); 100 } 101 } 102 done.run(response); 103 } 104 105 /** 106 * Returns a count of all KeyValues in the region where this coprocessor is loaded. 107 */ 108 @Override 109 public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request, 110 RpcCallback<ExampleProtos.CountResponse> done) { 111 ExampleProtos.CountResponse response = null; 112 InternalScanner scanner = null; 113 try { 114 scanner = env.getRegion().getScanner(new Scan()); 115 List<Cell> results = new ArrayList<>(); 116 boolean hasMore = false; 117 long count = 0; 118 do { 119 hasMore = scanner.next(results); 120 count += Iterables.size(results); 121 results.clear(); 122 } while (hasMore); 123 124 response = ExampleProtos.CountResponse.newBuilder().setCount(count).build(); 125 } catch (IOException ioe) { 126 CoprocessorRpcUtils.setControllerException(controller, ioe); 127 } finally { 128 if (scanner != null) { 129 IOUtils.closeQuietly(scanner); 130 } 131 } 132 done.run(response); 133 } 134 135 /** 136 * Stores a reference to the coprocessor environment provided by the 137 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 138 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on 139 * a table region, so always expects this to be an instance of 140 * {@link RegionCoprocessorEnvironment}. 141 * @param env the environment provided by the coprocessor host 142 * @throws IOException if the provided environment is not an instance of 143 * {@code RegionCoprocessorEnvironment} 144 */ 145 @Override 146 public void start(CoprocessorEnvironment env) throws IOException { 147 if (env instanceof RegionCoprocessorEnvironment) { 148 this.env = (RegionCoprocessorEnvironment) env; 149 } else { 150 throw new CoprocessorException("Must be loaded on a table region!"); 151 } 152 } 153 154 @Override 155 public void stop(CoprocessorEnvironment env) throws IOException { 156 // nothing to do 157 } 158}