/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan, or it
 * can carry conditions (filters) etc. This can be used to delete rows, column family(s), column
 * qualifier(s) or version(s) of columns. When the delete type is FAMILY or COLUMN, which
 * family(s) or column(s) get deleted is determined by the Scan.
 * The Scan needs to select all the families/qualifiers which are to be deleted. When the delete
 * type is VERSION, which column(s) and version(s) get deleted is also determined by the Scan: it
 * needs to select all the qualifiers and the versions which are to be deleted. When a timestamp
 * is passed, only the single version at that timestamp will be deleted (even if the Scan fetches
 * many versions). When the timestamp is passed as null, all the versions which the Scan selects
 * will get deleted. <br>
 * Example:
 *
 * <pre>
 * <code>
 * Scan scan = new Scan();
 * // set scan properties (rowkey range, filters, timerange etc.)
 * HTable ht = ...;
 * long noOfDeletedRows = 0L;
 * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
 *   new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
 *     ServerRpcController controller = new ServerRpcController();
 *     BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
 *       new BlockingRpcCallback<BulkDeleteResponse>();
 *
 *     public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
 *       Builder builder = BulkDeleteRequest.newBuilder();
 *       builder.setScan(ProtobufUtil.toScan(scan));
 *       builder.setDeleteType(DeleteType.VERSION);
 *       builder.setRowBatchSize(rowBatchSize);
 *       // Set optional timestamp if needed
 *       builder.setTimestamp(timeStamp);
 *       service.delete(controller, builder.build(), rpcCallback);
 *       return rpcCallback.get();
 *     }
 *   };
 * Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class,
 *   scan.getStartRow(), scan.getStopRow(), callable);
 * for (BulkDeleteResponse response : result.values()) {
 *   noOfDeletedRows += response.getRowsDeleted();
 * }
 * </code>
 * </pre>
 */
@InterfaceAudience.Private
public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
  private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class);

  private RegionCoprocessorEnvironment env;

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  @Override
  public void delete(RpcController controller, BulkDeleteRequest request,
    RpcCallback<BulkDeleteResponse> done) {
    long totalRowsDeleted = 0L;
    long totalVersionsDeleted = 0L;
    Region region = env.getRegion();
    int rowBatchSize = request.getRowBatchSize();
    Long timestamp = null;
    if (request.hasTimestamp()) {
      timestamp = request.getTimestamp();
    }
    DeleteType deleteType = request.getDeleteType();
    boolean hasMore = true;
    RegionScanner scanner = null;
    try {
      Scan scan = ProtobufUtil.toScan(request.getScan());
      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
        // What we need is just the rowkeys. So the 1st KV from any row is enough.
        // Only when it is a row delete can we apply this filter.
        // For the other delete types we rely on the scan to know which columns are to be deleted.
        scan.setFilter(new FirstKeyOnlyFilter());
      }
      // Here we assume that the scan is well-formed, with the appropriate
      // filter and the necessary column(s) selected.
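      // Illustrative sketch (an assumption for clarity, not part of the original logic): for a
      // COLUMN delete the client would typically have built the Scan as, e.g.,
      //   Scan scan = new Scan();
      //   scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
      // (family "cf" and qualifier "q1" are hypothetical), so that every Cell returned by the
      // scanner below names a cf:qualifier that should be deleted.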
      scanner = region.getScanner(scan);
      while (hasMore) {
        List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize);
        for (int i = 0; i < rowBatchSize; i++) {
          List<Cell> results = new ArrayList<>();
          hasMore = scanner.next(results);
          if (results.size() > 0) {
            deleteRows.add(results);
          }
          if (!hasMore) {
            // There are no more rows.
            break;
          }
        }
        if (deleteRows.size() > 0) {
          Mutation[] deleteArr = new Mutation[deleteRows.size()];
          int i = 0;
          for (List<Cell> deleteRow : deleteRows) {
            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
          }
          OperationStatus[] opStatus = region.batchMutate(deleteArr);
          for (i = 0; i < opStatus.length; i++) {
            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
              break;
            }
            totalRowsDeleted++;
            if (deleteType == DeleteType.VERSION) {
              byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
              if (versionsDeleted != null) {
                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
              }
            }
          }
        }
      }
    } catch (IOException ioe) {
      LOG.error(ioe.toString(), ioe);
      // Call ServerRpcController#getFailedOn() to retrieve this IOException at the client side.
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ioe) {
          LOG.error(ioe.toString(), ioe);
        }
      }
    }
    Builder responseBuilder = BulkDeleteResponse.newBuilder();
    responseBuilder.setRowsDeleted(totalRowsDeleted);
    if (deleteType == DeleteType.VERSION) {
      responseBuilder.setVersionsDeleted(totalVersionsDeleted);
    }
    BulkDeleteResponse result = responseBuilder.build();
    done.run(result);
  }

  private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType, Long timestamp) {
    long ts;
    if (timestamp == null) {
      ts = HConstants.LATEST_TIMESTAMP;
    } else {
      ts = timestamp;
    }
    // We just need the rowkey. Get it from the 1st KV.
    byte[] row = CellUtil.cloneRow(deleteRow.get(0));
    Delete delete = new Delete(row, ts);
    if (deleteType == DeleteType.FAMILY) {
      Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      for (Cell kv : deleteRow) {
        if (families.add(CellUtil.cloneFamily(kv))) {
          delete.addFamily(CellUtil.cloneFamily(kv), ts);
        }
      }
    } else if (deleteType == DeleteType.COLUMN) {
      Set<Column> columns = new HashSet<>();
      for (Cell kv : deleteRow) {
        Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
        if (columns.add(column)) {
          // Calling addColumns() more than once for the same cf:qualifier is not correct.
          // Every call to addColumns() will add a new KV to the familymap which will finally
          // get written to the memstore as part of delete().
          delete.addColumns(column.family, column.qualifier, ts);
        }
      }
    } else if (deleteType == DeleteType.VERSION) {
      // When a timestamp was passed to the delete() call, only the one version of the column at
      // that timestamp will be deleted. If no timestamp is passed, N versions get deleted.
      // How many versions get deleted depends on the Scan being passed: all the KVs that
      // the scan fetched will get deleted.
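      // Worked example (illustrative, with hypothetical cells): if the Scan returned (cf:q, ts=5)
      // and (cf:q, ts=3) for this row and no timestamp was passed, two version markers are added
      // below, one per fetched version; with a timestamp passed, a single marker at that
      // timestamp is added for cf:q.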
      int noOfVersionsToDelete = 0;
      if (timestamp == null) {
        for (Cell kv : deleteRow) {
          delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
            kv.getTimestamp());
          noOfVersionsToDelete++;
        }
      } else {
        Set<Column> columns = new HashSet<>();
        for (Cell kv : deleteRow) {
          Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
          // Only one version of each particular column gets deleted.
          if (columns.add(column)) {
            delete.addColumn(column.family, column.qualifier, ts);
            noOfVersionsToDelete++;
          }
        }
      }
      delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
    }
    return delete;
  }

  private static class Column {
    private byte[] family;
    private byte[] qualifier;

    public Column(byte[] family, byte[] qualifier) {
      this.family = family;
      this.qualifier = qualifier;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Column)) {
        return false;
      }
      Column column = (Column) other;
      return Bytes.equals(this.family, column.family)
        && Bytes.equals(this.qualifier, column.qualifier);
    }

    @Override
    public int hashCode() {
      int h = 31;
      h = h + 13 * Bytes.hashCode(this.family);
      h = h + 13 * Bytes.hashCode(this.qualifier);
      return h;
    }
  }

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}
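// Deployment sketch (not part of the original class; the table name and Admin handle are
// hypothetical, and this assumes the HBase 2.x client API): the endpoint can be attached to an
// existing table through its descriptor, e.g.
//
//   TableName tn = TableName.valueOf("mytable");
//   TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tn))
//     .setCoprocessor(BulkDeleteEndpoint.class.getName())
//     .build();
//   admin.modifyTable(td);
//
// or, equivalently, via the HBase shell:
//
//   alter 'mytable', METHOD => 'table_att',
//     'coprocessor' => '|org.apache.hadoop.hbase.coprocessor.example.BulkDeleteEndpoint||'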