001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.TreeSet;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.CoprocessorEnvironment;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
032import org.apache.hadoop.hbase.client.Delete;
033import org.apache.hadoop.hbase.client.Mutation;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
037import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
038import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
039import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
040import org.apache.hadoop.hbase.regionserver.OperationStatus;
041import org.apache.hadoop.hbase.regionserver.Region;
042import org.apache.hadoop.hbase.regionserver.RegionScanner;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
049import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
050import org.apache.hbase.thirdparty.com.google.protobuf.Service;
051
052import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
053import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
054import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
055import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
056import org.apache.hadoop.hbase.shaded.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058
059/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or a
 * scan with conditions (filters) etc. This can be used to delete rows, column family(s), column
 * qualifier(s) or version(s) of columns. When the delete type is FAMILY or COLUMN, the families or
 * columns to be deleted are determined by the Scan; the Scan needs to select all the
 * families/qualifiers which need to be deleted. When the delete type is VERSION, the column(s) and
 * version(s) to be deleted are determined by the Scan; the Scan needs to select all the qualifiers
 * and the versions which need to be deleted. When a timestamp is passed, only the one version at
 * that timestamp will be deleted (even if the Scan fetches many versions). When the timestamp is
 * passed as null, all the versions which the Scan selects will get deleted. <br>
069 * Example:
070 *
071 * <pre>
072 * <code>
073 * Scan scan = new Scan();
074 * // set scan properties(rowkey range, filters, timerange etc).
075 * HTable ht = ...;
076 * long noOfDeletedRows = 0L;
077 * Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt; callable =
078 *     new Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt;() {
079 *   ServerRpcController controller = new ServerRpcController();
080 *   BlockingRpcCallback&lt;BulkDeleteResponse&gt; rpcCallback =
081 *     new BlockingRpcCallback&lt;BulkDeleteResponse&gt;();
082 *
083 *   public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
084 *     Builder builder = BulkDeleteRequest.newBuilder();
085 *     builder.setScan(ProtobufUtil.toScan(scan));
086 *     builder.setDeleteType(DeleteType.VERSION);
087 *     builder.setRowBatchSize(rowBatchSize);
088 *     // Set optional timestamp if needed
089 *     builder.setTimestamp(timeStamp);
090 *     service.delete(controller, builder.build(), rpcCallback);
091 *     return rpcCallback.get();
092 *   }
093 * };
094 * Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorService(BulkDeleteService.class,
095 *  scan.getStartRow(), scan.getStopRow(), callable);
096 * for (BulkDeleteResponse response : result.values()) {
097 *   noOfDeletedRows += response.getRowsDeleted();
098 * }
099 * </code>
100 * </pre>
101 */
102@InterfaceAudience.Private
103public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
104  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
105  private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class);
106
107  private RegionCoprocessorEnvironment env;
108
109  @Override
110  public Iterable<Service> getServices() {
111    return Collections.singleton(this);
112  }
113
114  @Override
115  public void delete(RpcController controller, BulkDeleteRequest request,
116    RpcCallback<BulkDeleteResponse> done) {
117    long totalRowsDeleted = 0L;
118    long totalVersionsDeleted = 0L;
119    Region region = env.getRegion();
120    int rowBatchSize = request.getRowBatchSize();
121    Long timestamp = null;
122    if (request.hasTimestamp()) {
123      timestamp = request.getTimestamp();
124    }
125    DeleteType deleteType = request.getDeleteType();
126    boolean hasMore = true;
127    RegionScanner scanner = null;
128    try {
129      Scan scan = ProtobufUtil.toScan(request.getScan());
130      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
131        // What we need is just the rowkeys. So only 1st KV from any row is enough.
132        // Only when it is a row delete, we can apply this filter.
133        // In other types we rely on the scan to know which all columns to be deleted.
134        scan.setFilter(new FirstKeyOnlyFilter());
135      }
136      // Here by assume that the scan is perfect with the appropriate
137      // filter and having necessary column(s).
138      scanner = region.getScanner(scan);
139      while (hasMore) {
140        List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize);
141        for (int i = 0; i < rowBatchSize; i++) {
142          List<Cell> results = new ArrayList<>();
143          hasMore = scanner.next(results);
144          if (results.size() > 0) {
145            deleteRows.add(results);
146          }
147          if (!hasMore) {
148            // There are no more rows.
149            break;
150          }
151        }
152        if (deleteRows.size() > 0) {
153          Mutation[] deleteArr = new Mutation[deleteRows.size()];
154          int i = 0;
155          for (List<Cell> deleteRow : deleteRows) {
156            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
157          }
158          OperationStatus[] opStatus = region.batchMutate(deleteArr);
159          for (i = 0; i < opStatus.length; i++) {
160            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
161              break;
162            }
163            totalRowsDeleted++;
164            if (deleteType == DeleteType.VERSION) {
165              byte[] versionsDeleted = deleteArr[i].getAttribute(NO_OF_VERSIONS_TO_DELETE);
166              if (versionsDeleted != null) {
167                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
168              }
169            }
170          }
171        }
172      }
173    } catch (IOException ioe) {
174      LOG.error(ioe.toString(), ioe);
175      // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
176      CoprocessorRpcUtils.setControllerException(controller, ioe);
177    } finally {
178      if (scanner != null) {
179        try {
180          scanner.close();
181        } catch (IOException ioe) {
182          LOG.error(ioe.toString(), ioe);
183        }
184      }
185    }
186    Builder responseBuilder = BulkDeleteResponse.newBuilder();
187    responseBuilder.setRowsDeleted(totalRowsDeleted);
188    if (deleteType == DeleteType.VERSION) {
189      responseBuilder.setVersionsDeleted(totalVersionsDeleted);
190    }
191    BulkDeleteResponse result = responseBuilder.build();
192    done.run(result);
193  }
194
195  private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType, Long timestamp) {
196    long ts;
197    if (timestamp == null) {
198      ts = HConstants.LATEST_TIMESTAMP;
199    } else {
200      ts = timestamp;
201    }
202    // We just need the rowkey. Get it from 1st KV.
203    byte[] row = CellUtil.cloneRow(deleteRow.get(0));
204    Delete delete = new Delete(row, ts);
205    if (deleteType == DeleteType.FAMILY) {
206      Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
207      for (Cell kv : deleteRow) {
208        if (families.add(CellUtil.cloneFamily(kv))) {
209          delete.addFamily(CellUtil.cloneFamily(kv), ts);
210        }
211      }
212    } else if (deleteType == DeleteType.COLUMN) {
213      Set<Column> columns = new HashSet<>();
214      for (Cell kv : deleteRow) {
215        Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
216        if (columns.add(column)) {
217          // Making deleteColumns() calls more than once for the same cf:qualifier is not correct
218          // Every call to deleteColumns() will add a new KV to the familymap which will finally
219          // get written to the memstore as part of delete().
220          delete.addColumns(column.family, column.qualifier, ts);
221        }
222      }
223    } else if (deleteType == DeleteType.VERSION) {
224      // When some timestamp was passed to the delete() call only one version of the column (with
225      // given timestamp) will be deleted. If no timestamp passed, it will delete N versions.
226      // How many versions will get deleted depends on the Scan being passed. All the KVs that
227      // the scan fetched will get deleted.
228      int noOfVersionsToDelete = 0;
229      if (timestamp == null) {
230        for (Cell kv : deleteRow) {
231          delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
232            kv.getTimestamp());
233          noOfVersionsToDelete++;
234        }
235      } else {
236        Set<Column> columns = new HashSet<>();
237        for (Cell kv : deleteRow) {
238          Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
239          // Only one version of particular column getting deleted.
240          if (columns.add(column)) {
241            delete.addColumn(column.family, column.qualifier, ts);
242            noOfVersionsToDelete++;
243          }
244        }
245      }
246      delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
247    }
248    return delete;
249  }
250
251  private static class Column {
252    private byte[] family;
253    private byte[] qualifier;
254
255    public Column(byte[] family, byte[] qualifier) {
256      this.family = family;
257      this.qualifier = qualifier;
258    }
259
260    @Override
261    public boolean equals(Object other) {
262      if (!(other instanceof Column)) {
263        return false;
264      }
265      Column column = (Column) other;
266      return Bytes.equals(this.family, column.family)
267        && Bytes.equals(this.qualifier, column.qualifier);
268    }
269
270    @Override
271    public int hashCode() {
272      int h = 31;
273      h = h + 13 * Bytes.hashCode(this.family);
274      h = h + 13 * Bytes.hashCode(this.qualifier);
275      return h;
276    }
277  }
278
279  @Override
280  public void start(CoprocessorEnvironment env) throws IOException {
281    if (env instanceof RegionCoprocessorEnvironment) {
282      this.env = (RegionCoprocessorEnvironment) env;
283    } else {
284      throw new CoprocessorException("Must be loaded on a table region!");
285    }
286  }
287
288  @Override
289  public void stop(CoprocessorEnvironment env) throws IOException {
290    // nothing to do
291  }
292}