001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.coprocessor.example; 020 021import com.google.protobuf.RpcCallback; 022import com.google.protobuf.RpcController; 023import com.google.protobuf.Service; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.CoprocessorEnvironment; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 035import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos; 036import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 037import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.yetus.audience.InterfaceAudience; 041 042/** 043 * Sample coprocessor endpoint exposing a Service interface for counting rows and key values. 044 * 045 * <p> 046 * For the protocol buffer definition of the RowCountService, see the source file located under 047 * hbase-examples/src/main/protobuf/Examples.proto. 048 * </p> 049 */ 050@InterfaceAudience.Private 051public class RowCountEndpoint extends ExampleProtos.RowCountService implements RegionCoprocessor { 052 private RegionCoprocessorEnvironment env; 053 054 public RowCountEndpoint() { 055 } 056 057 /** 058 * Just returns a reference to this object, which implements the RowCounterService interface. 059 */ 060 @Override 061 public Iterable<Service> getServices() { 062 return Collections.singleton(this); 063 } 064 065 /** 066 * Returns a count of the rows in the region where this coprocessor is loaded. 067 */ 068 @Override 069 public void getRowCount(RpcController controller, ExampleProtos.CountRequest request, 070 RpcCallback<ExampleProtos.CountResponse> done) { 071 Scan scan = new Scan(); 072 scan.setFilter(new FirstKeyOnlyFilter()); 073 ExampleProtos.CountResponse response = null; 074 InternalScanner scanner = null; 075 try { 076 scanner = env.getRegion().getScanner(scan); 077 List<Cell> results = new ArrayList<>(); 078 boolean hasMore = false; 079 byte[] lastRow = null; 080 long count = 0; 081 do { 082 hasMore = scanner.next(results); 083 for (Cell kv : results) { 084 byte[] currentRow = CellUtil.cloneRow(kv); 085 if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { 086 lastRow = currentRow; 087 count++; 088 } 089 } 090 results.clear(); 091 } while (hasMore); 092 093 response = ExampleProtos.CountResponse.newBuilder() 094 .setCount(count).build(); 095 } catch (IOException ioe) { 096 CoprocessorRpcUtils.setControllerException(controller, ioe); 097 } finally { 098 if (scanner != null) { 099 try { 100 scanner.close(); 101 } catch (IOException ignored) {} 102 } 103 } 104 done.run(response); 105 } 106 107 /** 108 * Returns a count of all KeyValues in the region where this coprocessor is loaded. 109 */ 110 @Override 111 public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request, 112 RpcCallback<ExampleProtos.CountResponse> done) { 113 ExampleProtos.CountResponse response = null; 114 InternalScanner scanner = null; 115 try { 116 scanner = env.getRegion().getScanner(new Scan()); 117 List<Cell> results = new ArrayList<>(); 118 boolean hasMore = false; 119 long count = 0; 120 do { 121 hasMore = scanner.next(results); 122 for (Cell kv : results) { 123 count++; 124 } 125 results.clear(); 126 } while (hasMore); 127 128 response = ExampleProtos.CountResponse.newBuilder() 129 .setCount(count).build(); 130 } catch (IOException ioe) { 131 CoprocessorRpcUtils.setControllerException(controller, ioe); 132 } finally { 133 if (scanner != null) { 134 try { 135 scanner.close(); 136 } catch (IOException ignored) {} 137 } 138 } 139 done.run(response); 140 } 141 142 /** 143 * Stores a reference to the coprocessor environment provided by the 144 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 145 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded 146 * on a table region, so always expects this to be an instance of 147 * {@link RegionCoprocessorEnvironment}. 148 * @param env the environment provided by the coprocessor host 149 * @throws IOException if the provided environment is not an instance of 150 * {@code RegionCoprocessorEnvironment} 151 */ 152 @Override 153 public void start(CoprocessorEnvironment env) throws IOException { 154 if (env instanceof RegionCoprocessorEnvironment) { 155 this.env = (RegionCoprocessorEnvironment)env; 156 } else { 157 throw new CoprocessorException("Must be loaded on a table region!"); 158 } 159 } 160 161 @Override 162 public void stop(CoprocessorEnvironment env) throws IOException { 163 // nothing to do 164 } 165}