/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or can
 * carry conditions (filters), etc. This can be used to delete rows, column families, column
 * qualifiers or versions of columns.
 * <p>
 * When the delete type is FAMILY or COLUMN, the families or columns that get deleted are
 * determined by the Scan, so the Scan needs to select all the families/qualifiers which need to
 * be deleted. When the delete type is VERSION, the columns and versions to be deleted are
 * determined by the Scan, so the Scan needs to select all the qualifiers and their versions which
 * need to be deleted. When a timestamp is passed, only one version (the one at that timestamp) of
 * each selected column is deleted, even if the Scan fetches many versions. When no timestamp is
 * passed, all the versions which the Scan selects get deleted.
 * <p>
 * The request's {@code rowBatchSize} must be positive: it is the number of scanned rows that are
 * collected and then deleted together in one batch.
 *
 * <br> Example: <pre><code>
 * Scan scan = new Scan();
 * // set scan properties (rowkey range, filters, timerange etc).
 * Table table = ...;
 * long noOfDeletedRows = 0L;
 * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
 *     new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
 *   ServerRpcController controller = new ServerRpcController();
 *   BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
 *     new BlockingRpcCallback<BulkDeleteResponse>();
 *
 *   public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
 *     BulkDeleteRequest.Builder builder = BulkDeleteRequest.newBuilder();
 *     builder.setScan(ProtobufUtil.toScan(scan));
 *     builder.setDeleteType(DeleteType.VERSION);
 *     builder.setRowBatchSize(rowBatchSize);
 *     // Set optional timestamp if needed
 *     builder.setTimestamp(timeStamp);
 *     service.delete(controller, builder.build(), rpcCallback);
 *     return rpcCallback.get();
 *   }
 * };
 * Map<byte[], BulkDeleteResponse> result = table.coprocessorService(BulkDeleteService.class,
 *     scan.getStartRow(), scan.getStopRow(), callable);
 * for (BulkDeleteResponse response : result.values()) {
 *   noOfDeletedRows += response.getRowsDeleted();
 * }
 * </code></pre>
 */
@InterfaceAudience.Private
public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
  /**
   * Mutation attribute used to carry, from {@link #createDeleteMutation} back to
   * {@link #delete}, the number of versions a VERSION-type Delete removes.
   */
  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
  private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class);

  /** Region environment captured in {@link #start}. */
  private RegionCoprocessorEnvironment env;

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Scans this region with the scan carried by {@code request} and deletes whatever it selects,
   * {@code request.getRowBatchSize()} rows per batch. The number of deleted rows (and, for
   * {@link DeleteType#VERSION}, the number of deleted versions) is always reported through
   * {@code done}. If an {@link IOException} occurs, it is also set on {@code controller}, and the
   * reported counts cover only the batches that completed before the failure.
   *
   * @param controller RPC controller on which a failure is reported
   * @param request the scan, delete type, batch size and optional timestamp
   * @param done callback that receives the response
   */
  @Override
  public void delete(RpcController controller, BulkDeleteRequest request,
      RpcCallback<BulkDeleteResponse> done) {
    long totalRowsDeleted = 0L;
    long totalVersionsDeleted = 0L;
    Region region = env.getRegion();
    int rowBatchSize = request.getRowBatchSize();
    Long timestamp = null;
    if (request.hasTimestamp()) {
      timestamp = request.getTimestamp();
    }
    DeleteType deleteType = request.getDeleteType();
    boolean hasMore = true;
    RegionScanner scanner = null;
    try {
      if (rowBatchSize <= 0) {
        // With 0 the batching loop below would never call scanner.next() and so would spin
        // forever; a negative value would make new ArrayList<>(rowBatchSize) throw an unchecked
        // exception that bypasses the error reporting below. Reject both up front.
        throw new CoprocessorException(
          "rowBatchSize must be a positive number, but was " + rowBatchSize);
      }
      Scan scan = ProtobufUtil.toScan(request.getScan());
      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
        // What we need is just the rowkeys. So only the 1st KV from any row is enough.
        // Only when it is a row delete can we apply this filter.
        // For the other types we rely on the scan to know which columns are to be deleted.
        scan.setFilter(new FirstKeyOnlyFilter());
      }
      // Here we assume that the scan is correct, with the appropriate
      // filter and the necessary column(s).
      scanner = region.getScanner(scan);
      while (hasMore) {
        List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize);
        for (int i = 0; i < rowBatchSize; i++) {
          List<Cell> results = new ArrayList<>();
          hasMore = scanner.next(results);
          if (!results.isEmpty()) {
            deleteRows.add(results);
          }
          if (!hasMore) {
            // There are no more rows.
            break;
          }
        }
        if (!deleteRows.isEmpty()) {
          Mutation[] deleteArr = new Mutation[deleteRows.size()];
          int i = 0;
          for (List<Cell> deleteRow : deleteRows) {
            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
          }
          OperationStatus[] opStatus = region.batchMutate(deleteArr);
          for (i = 0; i < opStatus.length; i++) {
            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
              // Skip this row but keep going: later rows of the same batch may have been deleted
              // successfully and must still be counted.
              LOG.warn("Delete of row {} did not succeed, status {}",
                Bytes.toStringBinary(deleteArr[i].getRow()), opStatus[i].getOperationStatusCode());
              continue;
            }
            totalRowsDeleted++;
            if (deleteType == DeleteType.VERSION) {
              byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
              if (versionsDeleted != null) {
                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
              }
            }
          }
        }
      }
    } catch (IOException ioe) {
      LOG.error(ioe.toString(), ioe);
      // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ioe) {
          // Closing failures are logged only; the response is still sent below.
          LOG.error(ioe.toString(), ioe);
        }
      }
    }
    Builder responseBuilder = BulkDeleteResponse.newBuilder();
    responseBuilder.setRowsDeleted(totalRowsDeleted);
    if (deleteType == DeleteType.VERSION) {
      responseBuilder.setVersionsDeleted(totalVersionsDeleted);
    }
    BulkDeleteResponse result = responseBuilder.build();
    done.run(result);
  }

  /**
   * Builds the {@link Delete} for one row returned by the scan.
   *
   * @param deleteRow the cells the scan returned for a single row; must not be empty (the row
   *          key is taken from its first cell)
   * @param deleteType ROW adds no columns to the Delete; FAMILY, COLUMN and VERSION add one
   *          entry per distinct family, column or (column, version) found in {@code deleteRow}
   * @param timestamp the client-supplied timestamp, or {@code null} if none was given (then
   *          {@link HConstants#LATEST_TIMESTAMP} is used for the Delete itself)
   * @return the delete; for VERSION deletes it carries the number of versions it removes in the
   *         {@code NO_OF_VERSIONS_TO_DELETE} attribute
   */
  private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType,
      Long timestamp) {
    long ts;
    if (timestamp == null) {
      ts = HConstants.LATEST_TIMESTAMP;
    } else {
      ts = timestamp;
    }
    // We just need the rowkey. Get it from the 1st KV.
    byte[] row = CellUtil.cloneRow(deleteRow.get(0));
    Delete delete = new Delete(row, ts);
    if (deleteType == DeleteType.FAMILY) {
      Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      for (Cell kv : deleteRow) {
        byte[] family = CellUtil.cloneFamily(kv);
        if (families.add(family)) {
          delete.addFamily(family, ts);
        }
      }
    } else if (deleteType == DeleteType.COLUMN) {
      Set<Column> columns = new HashSet<>();
      for (Cell kv : deleteRow) {
        Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
        if (columns.add(column)) {
          // Making addColumns() calls more than once for the same cf:qualifier is not correct.
          // Every call to addColumns() adds a new KV to the family map, which finally
          // gets written to the memstore as part of delete().
          delete.addColumns(column.family, column.qualifier, ts);
        }
      }
    } else if (deleteType == DeleteType.VERSION) {
      // When a timestamp was passed to the delete() call, only one version of each column (the
      // one with the given timestamp) will be deleted. If no timestamp was passed, N versions are
      // deleted. How many versions get deleted depends on the Scan being passed: all the KVs that
      // the scan fetched will get deleted.
      int noOfVersionsToDelete = 0;
      if (timestamp == null) {
        for (Cell kv : deleteRow) {
          delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
            kv.getTimestamp());
          noOfVersionsToDelete++;
        }
      } else {
        Set<Column> columns = new HashSet<>();
        for (Cell kv : deleteRow) {
          Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
          // Only one version of a particular column gets deleted.
          if (columns.add(column)) {
            delete.addColumn(column.family, column.qualifier, ts);
            noOfVersionsToDelete++;
          }
        }
      }
      delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
    }
    return delete;
  }

  /**
   * A family/qualifier pair with content-based equality, used to de-duplicate the columns of a
   * single row.
   */
  private static class Column {
    private final byte[] family;
    private final byte[] qualifier;

    public Column(byte[] family, byte[] qualifier) {
      this.family = family;
      this.qualifier = qualifier;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof Column)) {
        return false;
      }
      Column column = (Column) other;
      return Bytes.equals(this.family, column.family)
          && Bytes.equals(this.qualifier, column.qualifier);
    }

    @Override
    public int hashCode() {
      // Order-sensitive combination so that (a, b) and (b, a) do not always collide.
      return 31 * Bytes.hashCode(this.family) + Bytes.hashCode(this.qualifier);
    }
  }

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}