/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan, a scan
 * with filters, etc. It can be used to delete rows, column families, column qualifiers or
 * versions of columns. When the delete type is FAMILY or COLUMN, the families or columns to be
 * deleted are determined by the Scan: the Scan must select all the families/qualifiers that need
 * to be deleted. When the delete type is VERSION, the columns and versions to be deleted are
 * likewise determined by the Scan: the Scan must select all the qualifiers and the versions that
 * need to be deleted. When a timestamp is passed, only the single version at that timestamp is
 * deleted (even if the Scan fetches many versions). When the timestamp is passed as null, all the
 * versions which the Scan selects will get deleted. <br>
 * Example:
 *
 * <pre>
 * <code>
 * Scan scan = new Scan();
 * // set scan properties (rowkey range, filters, timerange, etc.)
 * HTable ht = ...;
 * long noOfDeletedRows = 0L;
 * Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
 *   new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
 *     ServerRpcController controller = new ServerRpcController();
 *     BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
 *       new BlockingRpcCallback<BulkDeleteResponse>();
 *
 *     public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
 *       Builder builder = BulkDeleteRequest.newBuilder();
 *       builder.setScan(ProtobufUtil.toScan(scan));
 *       builder.setDeleteType(DeleteType.VERSION);
 *       builder.setRowBatchSize(rowBatchSize);
 *       // Set optional timestamp if needed
 *       builder.setTimestamp(timeStamp);
 *       service.delete(controller, builder.build(), rpcCallback);
 *       return rpcCallback.get();
 *     }
 *   };
 * Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class,
 *   scan.getStartRow(), scan.getStopRow(), callable);
 * for (BulkDeleteResponse response : result.values()) {
 *   noOfDeletedRows += response.getRowsDeleted();
 * }
 * </code>
 * </pre>
 */
@InterfaceAudience.Private
public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
  private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class);

  private RegionCoprocessorEnvironment env;

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }
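
  /**
   * Handles the bulk delete RPC. Opens a scanner on the region using the Scan from the request,
   * collects up to {@code rowBatchSize} rows at a time, converts each row into a {@link Delete}
   * via {@link #createDeleteMutation(List, DeleteType, Long)}, and applies each batch with
   * {@link Region#batchMutate(Mutation[])}. Counts of deleted rows (and, for VERSION deletes,
   * deleted versions) are returned in the {@link BulkDeleteResponse}; any {@link IOException} is
   * passed back to the client through the controller.
   */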
  @Override
  public void delete(RpcController controller, BulkDeleteRequest request,
    RpcCallback<BulkDeleteResponse> done) {
    long totalRowsDeleted = 0L;
    long totalVersionsDeleted = 0L;
    Region region = env.getRegion();
    int rowBatchSize = request.getRowBatchSize();
    Long timestamp = null;
    if (request.hasTimestamp()) {
      timestamp = request.getTimestamp();
    }
    DeleteType deleteType = request.getDeleteType();
    boolean hasMore = true;
    RegionScanner scanner = null;
    try {
      Scan scan = ProtobufUtil.toScan(request.getScan());
      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
        // What we need is just the rowkeys. So only 1st KV from any row is enough.
        // Only when it is a row delete, we can apply this filter.
        // In other types we rely on the scan to know which all columns to be deleted.
        scan.setFilter(new FirstKeyOnlyFilter());
      }
      // Here we assume that the scan is perfect, with the appropriate
      // filter and the necessary column(s) selected.
      scanner = region.getScanner(scan);
      while (hasMore) {
        List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize);
        for (int i = 0; i < rowBatchSize; i++) {
          List<Cell> results = new ArrayList<>();
          hasMore = scanner.next(results);
          if (results.size() > 0) {
            deleteRows.add(results);
          }
          if (!hasMore) {
            // There are no more rows.
            break;
          }
        }
        if (deleteRows.size() > 0) {
          Mutation[] deleteArr = new Mutation[deleteRows.size()];
          int i = 0;
          for (List<Cell> deleteRow : deleteRows) {
            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
          }
          OperationStatus[] opStatus = region.batchMutate(deleteArr);
          for (i = 0; i < opStatus.length; i++) {
            if (opStatus[i].getOperationStatusCode() != HConstants.OperationStatusCode.SUCCESS) {
              break;
            }
            totalRowsDeleted++;
            if (deleteType == DeleteType.VERSION) {
              byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
              if (versionsDeleted != null) {
                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
              }
            }
          }
        }
      }
    } catch (IOException ioe) {
      LOG.error(ioe.toString(), ioe);
      // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
      CoprocessorRpcUtils.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ioe) {
          LOG.error(ioe.toString(), ioe);
        }
      }
    }
    BulkDeleteResponse.Builder responseBuilder = BulkDeleteResponse.newBuilder();
    responseBuilder.setRowsDeleted(totalRowsDeleted);
    if (deleteType == DeleteType.VERSION) {
      responseBuilder.setVersionsDeleted(totalVersionsDeleted);
    }
    BulkDeleteResponse result = responseBuilder.build();
    done.run(result);
  }
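
  /**
   * Builds the {@link Delete} for one row of scan results. For FAMILY deletes, it marks each
   * distinct family the Scan fetched; for COLUMN deletes, each distinct family:qualifier pair;
   * for VERSION deletes, either exactly the versions the Scan fetched (when no timestamp is
   * given) or the single version at the given timestamp. For VERSION deletes, the number of
   * versions marked is stored in the {@code NO_OF_VERSIONS_TO_DELETE} attribute so that
   * delete() can total it afterwards.
   */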
  private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType, Long timestamp) {
    long ts;
    if (timestamp == null) {
      ts = HConstants.LATEST_TIMESTAMP;
    } else {
      ts = timestamp;
    }
    // We just need the rowkey. Get it from 1st KV.
    byte[] row = CellUtil.cloneRow(deleteRow.get(0));
    Delete delete = new Delete(row, ts);
    if (deleteType == DeleteType.FAMILY) {
      Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      for (Cell kv : deleteRow) {
        if (families.add(CellUtil.cloneFamily(kv))) {
          delete.addFamily(CellUtil.cloneFamily(kv), ts);
        }
      }
    } else if (deleteType == DeleteType.COLUMN) {
      Set<Column> columns = new HashSet<>();
      for (Cell kv : deleteRow) {
        Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
        if (columns.add(column)) {
          // Calling deleteColumns() more than once for the same cf:qualifier is not correct.
          // Every call to deleteColumns() will add a new KV to the familymap which will finally
          // get written to the memstore as part of delete().
          delete.addColumns(column.family, column.qualifier, ts);
        }
      }
    } else if (deleteType == DeleteType.VERSION) {
      // When a timestamp was passed to the delete() call, only one version of the column (with
      // the given timestamp) will be deleted. If no timestamp was passed, it will delete N
      // versions. How many versions will get deleted depends on the Scan being passed. All the
      // KVs that the scan fetched will get deleted.
      int noOfVersionsToDelete = 0;
      if (timestamp == null) {
        for (Cell kv : deleteRow) {
          delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
            kv.getTimestamp());
          noOfVersionsToDelete++;
        }
      } else {
        Set<Column> columns = new HashSet<>();
        for (Cell kv : deleteRow) {
          Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
          // Only one version of a particular column is getting deleted.
          if (columns.add(column)) {
            delete.addColumn(column.family, column.qualifier, ts);
            noOfVersionsToDelete++;
          }
        }
      }
      delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
    }
    return delete;
  }
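
  /**
   * A simple family:qualifier holder. Java arrays use identity-based equals()/hashCode(), so this
   * wrapper compares the family and qualifier by content, letting a {@link HashSet} deduplicate
   * columns within a row.
   */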
  private static class Column {
    private byte[] family;
    private byte[] qualifier;

    public Column(byte[] family, byte[] qualifier) {
      this.family = family;
      this.qualifier = qualifier;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Column)) {
        return false;
      }
      Column column = (Column) other;
      return Bytes.equals(this.family, column.family)
        && Bytes.equals(this.qualifier, column.qualifier);
    }

    @Override
    public int hashCode() {
      int h = 31;
      h = h + 13 * Bytes.hashCode(this.family);
      h = h + 13 * Bytes.hashCode(this.qualifier);
      return h;
    }
  }

  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}