/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be range scan or with
 * conditions(filters) etc.This can be used to delete rows, column family(s), column qualifier(s) or
 * version(s) of columns.When delete type is FAMILY or COLUMN, which all family(s) or column(s)
 * getting deleted will be determined by the Scan. Scan need to select all the families/qualifiers
 * which need to be deleted.When delete type is VERSION, Which column(s) and version(s) to be
 * deleted will be determined by the Scan. Scan need to select all the qualifiers and its versions
 * which needs to be deleted.When a timestamp is passed only one version at that timestamp will be
 * deleted(even if Scan fetches many versions). When timestamp passed as null, all the versions
 * which the Scan selects will get deleted. <br>
 * Example:
 *
 * <pre>
 * <code>
 * Scan scan = new Scan();
 * // set scan properties(rowkey range, filters, timerange etc).
073 * HTable ht = ...; 074 * long noOfDeletedRows = 0L; 075 * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable = 076 * new Batch.Call<BulkDeleteService, BulkDeleteResponse>() { 077 * ServerRpcController controller = new ServerRpcController(); 078 * BlockingRpcCallback<BulkDeleteResponse> rpcCallback = 079 * new BlockingRpcCallback<BulkDeleteResponse>(); 080 * 081 * public BulkDeleteResponse call(BulkDeleteService service) throws IOException { 082 * Builder builder = BulkDeleteRequest.newBuilder(); 083 * builder.setScan(ProtobufUtil.toScan(scan)); 084 * builder.setDeleteType(DeleteType.VERSION); 085 * builder.setRowBatchSize(rowBatchSize); 086 * // Set optional timestamp if needed 087 * builder.setTimestamp(timeStamp); 088 * service.delete(controller, builder.build(), rpcCallback); 089 * return rpcCallback.get(); 090 * } 091 * }; 092 * Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, 093 * scan.getStartRow(), scan.getStopRow(), callable); 094 * for (BulkDeleteResponse response : result.values()) { 095 * noOfDeletedRows += response.getRowsDeleted(); 096 * } 097 * </code> 098 * </pre> 099 */ 100@InterfaceAudience.Private 101public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor { 102 private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete"; 103 private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class); 104 105 private RegionCoprocessorEnvironment env; 106 107 @Override 108 public Iterable<Service> getServices() { 109 return Collections.singleton(this); 110 } 111 112 @Override 113 public void delete(RpcController controller, BulkDeleteRequest request, 114 RpcCallback<BulkDeleteResponse> done) { 115 long totalRowsDeleted = 0L; 116 long totalVersionsDeleted = 0L; 117 Region region = env.getRegion(); 118 int rowBatchSize = request.getRowBatchSize(); 119 Long timestamp = null; 120 if (request.hasTimestamp()) { 121 timestamp = 
request.getTimestamp(); 122 } 123 DeleteType deleteType = request.getDeleteType(); 124 boolean hasMore = true; 125 RegionScanner scanner = null; 126 try { 127 Scan scan = ProtobufUtil.toScan(request.getScan()); 128 if (scan.getFilter() == null && deleteType == DeleteType.ROW) { 129 // What we need is just the rowkeys. So only 1st KV from any row is enough. 130 // Only when it is a row delete, we can apply this filter. 131 // In other types we rely on the scan to know which all columns to be deleted. 132 scan.setFilter(new FirstKeyOnlyFilter()); 133 } 134 // Here by assume that the scan is perfect with the appropriate 135 // filter and having necessary column(s). 136 scanner = region.getScanner(scan); 137 while (hasMore) { 138 List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize); 139 for (int i = 0; i < rowBatchSize; i++) { 140 List<Cell> results = new ArrayList<>(); 141 hasMore = scanner.next(results); 142 if (results.size() > 0) { 143 deleteRows.add(results); 144 } 145 if (!hasMore) { 146 // There are no more rows. 147 break; 148 } 149 } 150 if (deleteRows.size() > 0) { 151 Mutation[] deleteArr = new Mutation[deleteRows.size()]; 152 int i = 0; 153 for (List<Cell> deleteRow : deleteRows) { 154 deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp); 155 } 156 OperationStatus[] opStatus = region.batchMutate(deleteArr); 157 for (i = 0; i < opStatus.length; i++) { 158 if (opStatus[i].getOperationStatusCode() != HConstants.OperationStatusCode.SUCCESS) { 159 break; 160 } 161 totalRowsDeleted++; 162 if (deleteType == DeleteType.VERSION) { 163 byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE); 164 if (versionsDeleted != null) { 165 totalVersionsDeleted += Bytes.toInt(versionsDeleted); 166 } 167 } 168 } 169 } 170 } 171 } catch (IOException ioe) { 172 LOG.error(ioe.toString(), ioe); 173 // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side. 
174 CoprocessorRpcUtils.setControllerException(controller, ioe); 175 } finally { 176 if (scanner != null) { 177 try { 178 scanner.close(); 179 } catch (IOException ioe) { 180 LOG.error(ioe.toString(), ioe); 181 } 182 } 183 } 184 BulkDeleteResponse.Builder responseBuilder = BulkDeleteResponse.newBuilder(); 185 responseBuilder.setRowsDeleted(totalRowsDeleted); 186 if (deleteType == DeleteType.VERSION) { 187 responseBuilder.setVersionsDeleted(totalVersionsDeleted); 188 } 189 BulkDeleteResponse result = responseBuilder.build(); 190 done.run(result); 191 } 192 193 private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType, Long timestamp) { 194 long ts; 195 if (timestamp == null) { 196 ts = HConstants.LATEST_TIMESTAMP; 197 } else { 198 ts = timestamp; 199 } 200 // We just need the rowkey. Get it from 1st KV. 201 byte[] row = CellUtil.cloneRow(deleteRow.get(0)); 202 Delete delete = new Delete(row, ts); 203 if (deleteType == DeleteType.FAMILY) { 204 Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR); 205 for (Cell kv : deleteRow) { 206 if (families.add(CellUtil.cloneFamily(kv))) { 207 delete.addFamily(CellUtil.cloneFamily(kv), ts); 208 } 209 } 210 } else if (deleteType == DeleteType.COLUMN) { 211 Set<Column> columns = new HashSet<>(); 212 for (Cell kv : deleteRow) { 213 Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)); 214 if (columns.add(column)) { 215 // Making deleteColumns() calls more than once for the same cf:qualifier is not correct 216 // Every call to deleteColumns() will add a new KV to the familymap which will finally 217 // get written to the memstore as part of delete(). 218 delete.addColumns(column.family, column.qualifier, ts); 219 } 220 } 221 } else if (deleteType == DeleteType.VERSION) { 222 // When some timestamp was passed to the delete() call only one version of the column (with 223 // given timestamp) will be deleted. If no timestamp passed, it will delete N versions. 
224 // How many versions will get deleted depends on the Scan being passed. All the KVs that 225 // the scan fetched will get deleted. 226 int noOfVersionsToDelete = 0; 227 if (timestamp == null) { 228 for (Cell kv : deleteRow) { 229 delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), 230 kv.getTimestamp()); 231 noOfVersionsToDelete++; 232 } 233 } else { 234 Set<Column> columns = new HashSet<>(); 235 for (Cell kv : deleteRow) { 236 Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)); 237 // Only one version of particular column getting deleted. 238 if (columns.add(column)) { 239 delete.addColumn(column.family, column.qualifier, ts); 240 noOfVersionsToDelete++; 241 } 242 } 243 } 244 delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete)); 245 } 246 return delete; 247 } 248 249 private static class Column { 250 private byte[] family; 251 private byte[] qualifier; 252 253 public Column(byte[] family, byte[] qualifier) { 254 this.family = family; 255 this.qualifier = qualifier; 256 } 257 258 @Override 259 public boolean equals(Object other) { 260 if (!(other instanceof Column)) { 261 return false; 262 } 263 Column column = (Column) other; 264 return Bytes.equals(this.family, column.family) 265 && Bytes.equals(this.qualifier, column.qualifier); 266 } 267 268 @Override 269 public int hashCode() { 270 int h = 31; 271 h = h + 13 * Bytes.hashCode(this.family); 272 h = h + 13 * Bytes.hashCode(this.qualifier); 273 return h; 274 } 275 } 276 277 @Override 278 public void start(CoprocessorEnvironment env) throws IOException { 279 if (env instanceof RegionCoprocessorEnvironment) { 280 this.env = (RegionCoprocessorEnvironment) env; 281 } else { 282 throw new CoprocessorException("Must be loaded on a table region!"); 283 } 284 } 285 286 @Override 287 public void stop(CoprocessorEnvironment env) throws IOException { 288 // nothing to do 289 } 290}