1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor.example;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.TreeSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.Coprocessor;
32 import org.apache.hadoop.hbase.CoprocessorEnvironment;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
39 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
40 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
41 import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
42 import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
43 import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
44 import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
45 import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
46 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
47 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
49 import org.apache.hadoop.hbase.regionserver.OperationStatus;
50 import org.apache.hadoop.hbase.regionserver.Region;
51 import org.apache.hadoop.hbase.regionserver.RegionScanner;
52 import org.apache.hadoop.hbase.util.Bytes;
53
54 import com.google.protobuf.RpcCallback;
55 import com.google.protobuf.RpcController;
56 import com.google.protobuf.Service;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public class BulkDeleteEndpoint extends BulkDeleteService implements CoprocessorService,
99 Coprocessor {
100 private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
101 private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
102
103 private RegionCoprocessorEnvironment env;
104
105 @Override
106 public Service getService() {
107 return this;
108 }
109
110 @Override
111 public void delete(RpcController controller, BulkDeleteRequest request,
112 RpcCallback<BulkDeleteResponse> done) {
113 long totalRowsDeleted = 0L;
114 long totalVersionsDeleted = 0L;
115 Region region = env.getRegion();
116 int rowBatchSize = request.getRowBatchSize();
117 Long timestamp = null;
118 if (request.hasTimestamp()) {
119 timestamp = request.getTimestamp();
120 }
121 DeleteType deleteType = request.getDeleteType();
122 boolean hasMore = true;
123 RegionScanner scanner = null;
124 try {
125 Scan scan = ProtobufUtil.toScan(request.getScan());
126 if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
127
128
129
130 scan.setFilter(new FirstKeyOnlyFilter());
131 }
132
133
134 scanner = region.getScanner(scan);
135 while (hasMore) {
136 List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
137 for (int i = 0; i < rowBatchSize; i++) {
138 List<Cell> results = new ArrayList<Cell>();
139 hasMore = scanner.next(results);
140 if (results.size() > 0) {
141 deleteRows.add(results);
142 }
143 if (!hasMore) {
144
145 break;
146 }
147 }
148 if (deleteRows.size() > 0) {
149 Mutation[] deleteArr = new Mutation[deleteRows.size()];
150 int i = 0;
151 for (List<Cell> deleteRow : deleteRows) {
152 deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
153 }
154 OperationStatus[] opStatus = region.batchMutate(deleteArr, HConstants.NO_NONCE,
155 HConstants.NO_NONCE);
156 for (i = 0; i < opStatus.length; i++) {
157 if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
158 break;
159 }
160 totalRowsDeleted++;
161 if (deleteType == DeleteType.VERSION) {
162 byte[] versionsDeleted = deleteArr[i].getAttribute(
163 NO_OF_VERSIONS_TO_DELETE);
164 if (versionsDeleted != null) {
165 totalVersionsDeleted += Bytes.toInt(versionsDeleted);
166 }
167 }
168 }
169 }
170 }
171 } catch (IOException ioe) {
172 LOG.error(ioe);
173
174 ResponseConverter.setControllerException(controller, ioe);
175 } finally {
176 if (scanner != null) {
177 try {
178 scanner.close();
179 } catch (IOException ioe) {
180 LOG.error(ioe);
181 }
182 }
183 }
184 Builder responseBuilder = BulkDeleteResponse.newBuilder();
185 responseBuilder.setRowsDeleted(totalRowsDeleted);
186 if (deleteType == DeleteType.VERSION) {
187 responseBuilder.setVersionsDeleted(totalVersionsDeleted);
188 }
189 BulkDeleteResponse result = responseBuilder.build();
190 done.run(result);
191 }
192
193 private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType,
194 Long timestamp) {
195 long ts;
196 if (timestamp == null) {
197 ts = HConstants.LATEST_TIMESTAMP;
198 } else {
199 ts = timestamp;
200 }
201
202 byte[] row = CellUtil.cloneRow(deleteRow.get(0));
203 Delete delete = new Delete(row, ts);
204 if (deleteType == DeleteType.FAMILY) {
205 Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
206 for (Cell kv : deleteRow) {
207 if (families.add(CellUtil.cloneFamily(kv))) {
208 delete.deleteFamily(CellUtil.cloneFamily(kv), ts);
209 }
210 }
211 } else if (deleteType == DeleteType.COLUMN) {
212 Set<Column> columns = new HashSet<Column>();
213 for (Cell kv : deleteRow) {
214 Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
215 if (columns.add(column)) {
216
217
218
219 delete.deleteColumns(column.family, column.qualifier, ts);
220 }
221 }
222 } else if (deleteType == DeleteType.VERSION) {
223
224
225
226
227 int noOfVersionsToDelete = 0;
228 if (timestamp == null) {
229 for (Cell kv : deleteRow) {
230 delete.deleteColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp());
231 noOfVersionsToDelete++;
232 }
233 } else {
234 Set<Column> columns = new HashSet<Column>();
235 for (Cell kv : deleteRow) {
236 Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
237
238 if (columns.add(column)) {
239 delete.deleteColumn(column.family, column.qualifier, ts);
240 noOfVersionsToDelete++;
241 }
242 }
243 }
244 delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
245 }
246 return delete;
247 }
248
249 private static class Column {
250 private byte[] family;
251 private byte[] qualifier;
252
253 public Column(byte[] family, byte[] qualifier) {
254 this.family = family;
255 this.qualifier = qualifier;
256 }
257
258 @Override
259 public boolean equals(Object other) {
260 if (!(other instanceof Column)) {
261 return false;
262 }
263 Column column = (Column) other;
264 return Bytes.equals(this.family, column.family)
265 && Bytes.equals(this.qualifier, column.qualifier);
266 }
267
268 @Override
269 public int hashCode() {
270 int h = 31;
271 h = h + 13 * Bytes.hashCode(this.family);
272 h = h + 13 * Bytes.hashCode(this.qualifier);
273 return h;
274 }
275 }
276
277 @Override
278 public void start(CoprocessorEnvironment env) throws IOException {
279 if (env instanceof RegionCoprocessorEnvironment) {
280 this.env = (RegionCoprocessorEnvironment) env;
281 } else {
282 throw new CoprocessorException("Must be loaded on a table region!");
283 }
284 }
285
286 @Override
287 public void stop(CoprocessorEnvironment env) throws IOException {
288
289 }
290 }