View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coprocessor.example;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.Coprocessor;
28  import org.apache.hadoop.hbase.CoprocessorEnvironment;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
31  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
32  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36  import org.apache.hadoop.hbase.regionserver.InternalScanner;
37  import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
38  import org.apache.hadoop.hbase.util.Bytes;
39  
40  import com.google.protobuf.RpcCallback;
41  import com.google.protobuf.RpcController;
42  import com.google.protobuf.Service;
43  
44  /**
45   * Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
46   *
47   * <p>
48   * For the protocol buffer definition of the RowCountService, see the source file located under
49   * hbase-server/src/main/protobuf/Examples.proto.
50   * </p>
51   */
52  public class RowCountEndpoint extends ExampleProtos.RowCountService
53      implements Coprocessor, CoprocessorService {
54    private RegionCoprocessorEnvironment env;
55  
56    public RowCountEndpoint() {
57    }
58  
59    /**
60     * Just returns a reference to this object, which implements the RowCounterService interface.
61     */
62    @Override
63    public Service getService() {
64      return this;
65    }
66  
67    /**
68     * Returns a count of the rows in the region where this coprocessor is loaded.
69     */
70    @Override
71    public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
72                            RpcCallback<ExampleProtos.CountResponse> done) {
73      Scan scan = new Scan();
74      scan.setFilter(new FirstKeyOnlyFilter());
75      ExampleProtos.CountResponse response = null;
76      InternalScanner scanner = null;
77      try {
78        scanner = env.getRegion().getScanner(scan);
79        List<Cell> results = new ArrayList<Cell>();
80        boolean hasMore = false;
81        byte[] lastRow = null;
82        long count = 0;
83        do {
84          hasMore = NextState.hasMoreValues(scanner.next(results));
85          for (Cell kv : results) {
86            byte[] currentRow = CellUtil.cloneRow(kv);
87            if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
88              lastRow = currentRow;
89              count++;
90            }
91          }
92          results.clear();
93        } while (hasMore);
94  
95        response = ExampleProtos.CountResponse.newBuilder()
96            .setCount(count).build();
97      } catch (IOException ioe) {
98        ResponseConverter.setControllerException(controller, ioe);
99      } finally {
100       if (scanner != null) {
101         try {
102           scanner.close();
103         } catch (IOException ignored) {}
104       }
105     }
106     done.run(response);
107   }
108 
109   /**
110    * Returns a count of all KeyValues in the region where this coprocessor is loaded.
111    */
112   @Override
113   public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
114                                RpcCallback<ExampleProtos.CountResponse> done) {
115     ExampleProtos.CountResponse response = null;
116     InternalScanner scanner = null;
117     try {
118       scanner = env.getRegion().getScanner(new Scan());
119       List<Cell> results = new ArrayList<Cell>();
120       boolean hasMore = false;
121       long count = 0;
122       do {
123         hasMore = NextState.hasMoreValues(scanner.next(results));
124         for (Cell kv : results) {
125           count++;
126         }
127         results.clear();
128       } while (hasMore);
129 
130       response = ExampleProtos.CountResponse.newBuilder()
131           .setCount(count).build();
132     } catch (IOException ioe) {
133       ResponseConverter.setControllerException(controller, ioe);
134     } finally {
135       if (scanner != null) {
136         try {
137           scanner.close();
138         } catch (IOException ignored) {}
139       }
140     }
141     done.run(response);
142   }
143 
144   /**
145    * Stores a reference to the coprocessor environment provided by the
146    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
147    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
148    * on a table region, so always expects this to be an instance of
149    * {@link RegionCoprocessorEnvironment}.
150    * @param env the environment provided by the coprocessor host
151    * @throws IOException if the provided environment is not an instance of
152    * {@code RegionCoprocessorEnvironment}
153    */
154   @Override
155   public void start(CoprocessorEnvironment env) throws IOException {
156     if (env instanceof RegionCoprocessorEnvironment) {
157       this.env = (RegionCoprocessorEnvironment)env;
158     } else {
159       throw new CoprocessorException("Must be loaded on a table region!");
160     }
161   }
162 
163   @Override
164   public void stop(CoprocessorEnvironment env) throws IOException {
165     // nothing to do
166   }
167 }