1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor.example;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.Coprocessor;
28 import org.apache.hadoop.hbase.CoprocessorEnvironment;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
31 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
32 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33 import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
34 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
36 import org.apache.hadoop.hbase.regionserver.InternalScanner;
37 import org.apache.hadoop.hbase.util.Bytes;
38
39 import com.google.protobuf.RpcCallback;
40 import com.google.protobuf.RpcController;
41 import com.google.protobuf.Service;
42
43
44
45
46
47
48
49
50
51 public class RowCountEndpoint extends ExampleProtos.RowCountService
52 implements Coprocessor, CoprocessorService {
53 private RegionCoprocessorEnvironment env;
54
55 public RowCountEndpoint() {
56 }
57
58
59
60
61 @Override
62 public Service getService() {
63 return this;
64 }
65
66
67
68
69 @Override
70 public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
71 RpcCallback<ExampleProtos.CountResponse> done) {
72 Scan scan = new Scan();
73 scan.setFilter(new FirstKeyOnlyFilter());
74 ExampleProtos.CountResponse response = null;
75 InternalScanner scanner = null;
76 try {
77 scanner = env.getRegion().getScanner(scan);
78 List<Cell> results = new ArrayList<Cell>();
79 boolean hasMore = false;
80 byte[] lastRow = null;
81 long count = 0;
82 do {
83 hasMore = scanner.next(results);
84 for (Cell kv : results) {
85 byte[] currentRow = CellUtil.cloneRow(kv);
86 if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
87 lastRow = currentRow;
88 count++;
89 }
90 }
91 results.clear();
92 } while (hasMore);
93
94 response = ExampleProtos.CountResponse.newBuilder()
95 .setCount(count).build();
96 } catch (IOException ioe) {
97 ResponseConverter.setControllerException(controller, ioe);
98 } finally {
99 if (scanner != null) {
100 try {
101 scanner.close();
102 } catch (IOException ignored) {}
103 }
104 }
105 done.run(response);
106 }
107
108
109
110
111 @Override
112 public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
113 RpcCallback<ExampleProtos.CountResponse> done) {
114 ExampleProtos.CountResponse response = null;
115 InternalScanner scanner = null;
116 try {
117 scanner = env.getRegion().getScanner(new Scan());
118 List<Cell> results = new ArrayList<Cell>();
119 boolean hasMore = false;
120 long count = 0;
121 do {
122 hasMore = scanner.next(results);
123 for (Cell kv : results) {
124 count++;
125 }
126 results.clear();
127 } while (hasMore);
128
129 response = ExampleProtos.CountResponse.newBuilder()
130 .setCount(count).build();
131 } catch (IOException ioe) {
132 ResponseConverter.setControllerException(controller, ioe);
133 } finally {
134 if (scanner != null) {
135 try {
136 scanner.close();
137 } catch (IOException ignored) {}
138 }
139 }
140 done.run(response);
141 }
142
143
144
145
146
147
148
149
150
151
152
153 @Override
154 public void start(CoprocessorEnvironment env) throws IOException {
155 if (env instanceof RegionCoprocessorEnvironment) {
156 this.env = (RegionCoprocessorEnvironment)env;
157 } else {
158 throw new CoprocessorException("Must be loaded on a table region!");
159 }
160 }
161
162 @Override
163 public void stop(CoprocessorEnvironment env) throws IOException {
164
165 }
166 }