View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coprocessor.example;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.Coprocessor;
28  import org.apache.hadoop.hbase.CoprocessorEnvironment;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
31  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
32  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36  import org.apache.hadoop.hbase.regionserver.InternalScanner;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  import com.google.protobuf.RpcCallback;
40  import com.google.protobuf.RpcController;
41  import com.google.protobuf.Service;
42  
43  /**
44   * Sample coprocessor endpoint exposing a Service interface for counting rows and key values.
45   *
46   * <p>
47   * For the protocol buffer definition of the RowCountService, see the source file located under
48   * hbase-server/src/main/protobuf/Examples.proto.
49   * </p>
50   */
51  public class RowCountEndpoint extends ExampleProtos.RowCountService
52      implements Coprocessor, CoprocessorService {
53    private RegionCoprocessorEnvironment env;
54  
55    public RowCountEndpoint() {
56    }
57  
58    /**
59     * Just returns a reference to this object, which implements the RowCounterService interface.
60     */
61    @Override
62    public Service getService() {
63      return this;
64    }
65  
66    /**
67     * Returns a count of the rows in the region where this coprocessor is loaded.
68     */
69    @Override
70    public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
71                            RpcCallback<ExampleProtos.CountResponse> done) {
72      Scan scan = new Scan();
73      scan.setFilter(new FirstKeyOnlyFilter());
74      ExampleProtos.CountResponse response = null;
75      InternalScanner scanner = null;
76      try {
77        scanner = env.getRegion().getScanner(scan);
78        List<Cell> results = new ArrayList<Cell>();
79        boolean hasMore = false;
80        byte[] lastRow = null;
81        long count = 0;
82        do {
83          hasMore = scanner.next(results);
84          for (Cell kv : results) {
85            byte[] currentRow = CellUtil.cloneRow(kv);
86            if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
87              lastRow = currentRow;
88              count++;
89            }
90          }
91          results.clear();
92        } while (hasMore);
93  
94        response = ExampleProtos.CountResponse.newBuilder()
95            .setCount(count).build();
96      } catch (IOException ioe) {
97        ResponseConverter.setControllerException(controller, ioe);
98      } finally {
99        if (scanner != null) {
100         try {
101           scanner.close();
102         } catch (IOException ignored) {}
103       }
104     }
105     done.run(response);
106   }
107 
108   /**
109    * Returns a count of all KeyValues in the region where this coprocessor is loaded.
110    */
111   @Override
112   public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
113                                RpcCallback<ExampleProtos.CountResponse> done) {
114     ExampleProtos.CountResponse response = null;
115     InternalScanner scanner = null;
116     try {
117       scanner = env.getRegion().getScanner(new Scan());
118       List<Cell> results = new ArrayList<Cell>();
119       boolean hasMore = false;
120       long count = 0;
121       do {
122         hasMore = scanner.next(results);
123         for (Cell kv : results) {
124           count++;
125         }
126         results.clear();
127       } while (hasMore);
128 
129       response = ExampleProtos.CountResponse.newBuilder()
130           .setCount(count).build();
131     } catch (IOException ioe) {
132       ResponseConverter.setControllerException(controller, ioe);
133     } finally {
134       if (scanner != null) {
135         try {
136           scanner.close();
137         } catch (IOException ignored) {}
138       }
139     }
140     done.run(response);
141   }
142 
143   /**
144    * Stores a reference to the coprocessor environment provided by the
145    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
146    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
147    * on a table region, so always expects this to be an instance of
148    * {@link RegionCoprocessorEnvironment}.
149    * @param env the environment provided by the coprocessor host
150    * @throws IOException if the provided environment is not an instance of
151    * {@code RegionCoprocessorEnvironment}
152    */
153   @Override
154   public void start(CoprocessorEnvironment env) throws IOException {
155     if (env instanceof RegionCoprocessorEnvironment) {
156       this.env = (RegionCoprocessorEnvironment)env;
157     } else {
158       throw new CoprocessorException("Must be loaded on a table region!");
159     }
160   }
161 
162   @Override
163   public void stop(CoprocessorEnvironment env) throws IOException {
164     // nothing to do
165   }
166 }