View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor.example;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.TreeSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.Coprocessor;
32  import org.apache.hadoop.hbase.CoprocessorEnvironment;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
35  import org.apache.hadoop.hbase.client.Delete;
36  import org.apache.hadoop.hbase.client.Mutation;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
39  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
40  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
41  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
42  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
43  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
44  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
45  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
46  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
47  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
49  import org.apache.hadoop.hbase.regionserver.OperationStatus;
50  import org.apache.hadoop.hbase.regionserver.Region;
51  import org.apache.hadoop.hbase.regionserver.RegionScanner;
52  import org.apache.hadoop.hbase.util.Bytes;
53  
54  import com.google.protobuf.RpcCallback;
55  import com.google.protobuf.RpcController;
56  import com.google.protobuf.Service;
57  
58  /**
59   * Defines a protocol to delete data in bulk based on a scan. The scan can be range scan or with
60   * conditions(filters) etc.This can be used to delete rows, column family(s), column qualifier(s) 
61   * or version(s) of columns.When delete type is FAMILY or COLUMN, which all family(s) or column(s)
62   * getting deleted will be determined by the Scan. Scan need to select all the families/qualifiers
63   * which need to be deleted.When delete type is VERSION, Which column(s) and version(s) to be
64   * deleted will be determined by the Scan. Scan need to select all the qualifiers and its versions
65   * which needs to be deleted.When a timestamp is passed only one version at that timestamp will be
66   * deleted(even if Scan fetches many versions). When timestamp passed as null, all the versions
67   * which the Scan selects will get deleted.
68   * 
69   * </br> Example: <code><pre>
70   * Scan scan = new Scan();
71   * // set scan properties(rowkey range, filters, timerange etc).
72   * HTable ht = ...;
73   * long noOfDeletedRows = 0L;
74   * Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt; callable = 
75   *     new Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt;() {
76   *   ServerRpcController controller = new ServerRpcController();
77   *   BlockingRpcCallback&lt;BulkDeleteResponse&gt; rpcCallback = 
78   *     new BlockingRpcCallback&lt;BulkDeleteResponse&gt;();
79   *
80   *   public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
81   *     Builder builder = BulkDeleteRequest.newBuilder();
82   *     builder.setScan(ProtobufUtil.toScan(scan));
83   *     builder.setDeleteType(DeleteType.VERSION);
84   *     builder.setRowBatchSize(rowBatchSize);
85   *     // Set optional timestamp if needed
86   *     builder.setTimestamp(timeStamp);
87   *     service.delete(controller, builder.build(), rpcCallback);
88   *     return rpcCallback.get();
89   *   }
90   * };
91   * Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorService(BulkDeleteService.class, scan
92   *     .getStartRow(), scan.getStopRow(), callable);
93   * for (BulkDeleteResponse response : result.values()) {
94   *   noOfDeletedRows += response.getRowsDeleted();
95   * }
96   * </pre></code>
97   */
98  public class BulkDeleteEndpoint extends BulkDeleteService implements CoprocessorService,
99      Coprocessor {
100   private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
101   private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
102 
103   private RegionCoprocessorEnvironment env;
104 
105   @Override
106   public Service getService() {
107     return this;
108   }
109 
110   @Override
111   public void delete(RpcController controller, BulkDeleteRequest request,
112       RpcCallback<BulkDeleteResponse> done) {
113     long totalRowsDeleted = 0L;
114     long totalVersionsDeleted = 0L;
115     Region region = env.getRegion();
116     int rowBatchSize = request.getRowBatchSize();
117     Long timestamp = null;
118     if (request.hasTimestamp()) {
119       timestamp = request.getTimestamp();
120     }
121     DeleteType deleteType = request.getDeleteType();
122     boolean hasMore = true;
123     RegionScanner scanner = null;
124     try {
125       Scan scan = ProtobufUtil.toScan(request.getScan());
126       if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
127         // What we need is just the rowkeys. So only 1st KV from any row is enough.
128         // Only when it is a row delete, we can apply this filter.
129         // In other types we rely on the scan to know which all columns to be deleted.
130         scan.setFilter(new FirstKeyOnlyFilter());
131       }
132       // Here by assume that the scan is perfect with the appropriate
133       // filter and having necessary column(s).
134       scanner = region.getScanner(scan);
135       while (hasMore) {
136         List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
137         for (int i = 0; i < rowBatchSize; i++) {
138           List<Cell> results = new ArrayList<Cell>();
139           hasMore = scanner.next(results);
140           if (results.size() > 0) {
141             deleteRows.add(results);
142           }
143           if (!hasMore) {
144             // There are no more rows.
145             break;
146           }
147         }
148         if (deleteRows.size() > 0) {
149           Mutation[] deleteArr = new Mutation[deleteRows.size()];
150           int i = 0;
151           for (List<Cell> deleteRow : deleteRows) {
152             deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
153           }
154           OperationStatus[] opStatus = region.batchMutate(deleteArr, HConstants.NO_NONCE,
155             HConstants.NO_NONCE);
156           for (i = 0; i < opStatus.length; i++) {
157             if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
158               break;
159             }
160             totalRowsDeleted++;
161             if (deleteType == DeleteType.VERSION) {
162               byte[] versionsDeleted = deleteArr[i].getAttribute(
163                   NO_OF_VERSIONS_TO_DELETE);
164               if (versionsDeleted != null) {
165                 totalVersionsDeleted += Bytes.toInt(versionsDeleted);
166               }
167             }
168           }
169         }
170       }
171     } catch (IOException ioe) {
172       LOG.error(ioe);
173       // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
174       ResponseConverter.setControllerException(controller, ioe);
175     } finally {
176       if (scanner != null) {
177         try {
178           scanner.close();
179         } catch (IOException ioe) {
180           LOG.error(ioe);
181         }
182       }
183     }
184     Builder responseBuilder = BulkDeleteResponse.newBuilder();
185     responseBuilder.setRowsDeleted(totalRowsDeleted);
186     if (deleteType == DeleteType.VERSION) {
187       responseBuilder.setVersionsDeleted(totalVersionsDeleted);
188     }
189     BulkDeleteResponse result = responseBuilder.build();
190     done.run(result);
191   }
192 
193   private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType,
194       Long timestamp) {
195     long ts;
196     if (timestamp == null) {
197       ts = HConstants.LATEST_TIMESTAMP;
198     } else {
199       ts = timestamp;
200     }
201     // We just need the rowkey. Get it from 1st KV.
202     byte[] row = CellUtil.cloneRow(deleteRow.get(0));
203     Delete delete = new Delete(row, ts);
204     if (deleteType == DeleteType.FAMILY) {
205       Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
206       for (Cell kv : deleteRow) {
207         if (families.add(CellUtil.cloneFamily(kv))) {
208           delete.deleteFamily(CellUtil.cloneFamily(kv), ts);
209         }
210       }
211     } else if (deleteType == DeleteType.COLUMN) {
212       Set<Column> columns = new HashSet<Column>();
213       for (Cell kv : deleteRow) {
214         Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
215         if (columns.add(column)) {
216           // Making deleteColumns() calls more than once for the same cf:qualifier is not correct
217           // Every call to deleteColumns() will add a new KV to the familymap which will finally
218           // get written to the memstore as part of delete().
219           delete.deleteColumns(column.family, column.qualifier, ts);
220         }
221       }
222     } else if (deleteType == DeleteType.VERSION) {
223       // When some timestamp was passed to the delete() call only one version of the column (with
224       // given timestamp) will be deleted. If no timestamp passed, it will delete N versions.
225       // How many versions will get deleted depends on the Scan being passed. All the KVs that
226       // the scan fetched will get deleted.
227       int noOfVersionsToDelete = 0;
228       if (timestamp == null) {
229         for (Cell kv : deleteRow) {
230           delete.deleteColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp());
231           noOfVersionsToDelete++;
232         }
233       } else {
234         Set<Column> columns = new HashSet<Column>();
235         for (Cell kv : deleteRow) {
236           Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
237           // Only one version of particular column getting deleted.
238           if (columns.add(column)) {
239             delete.deleteColumn(column.family, column.qualifier, ts);
240             noOfVersionsToDelete++;
241           }
242         }
243       }
244       delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
245     }
246     return delete;
247   }
248 
249   private static class Column {
250     private byte[] family;
251     private byte[] qualifier;
252 
253     public Column(byte[] family, byte[] qualifier) {
254       this.family = family;
255       this.qualifier = qualifier;
256     }
257 
258     @Override
259     public boolean equals(Object other) {
260       if (!(other instanceof Column)) {
261         return false;
262       }
263       Column column = (Column) other;
264       return Bytes.equals(this.family, column.family)
265           && Bytes.equals(this.qualifier, column.qualifier);
266     }
267 
268     @Override
269     public int hashCode() {
270       int h = 31;
271       h = h + 13 * Bytes.hashCode(this.family);
272       h = h + 13 * Bytes.hashCode(this.qualifier);
273       return h;
274     }
275   }
276 
277   @Override
278   public void start(CoprocessorEnvironment env) throws IOException {
279     if (env instanceof RegionCoprocessorEnvironment) {
280       this.env = (RegionCoprocessorEnvironment) env;
281     } else {
282       throw new CoprocessorException("Must be loaded on a table region!");
283     }
284   }
285 
286   @Override
287   public void stop(CoprocessorEnvironment env) throws IOException {
288     // nothing to do
289   }
290 }