001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Set;
029import java.util.TreeSet;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.CoprocessorEnvironment;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Mutation;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
039import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
041import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
042import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
043import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
044import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
045import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
046import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
047import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
048import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
049import org.apache.hadoop.hbase.regionserver.OperationStatus;
050import org.apache.hadoop.hbase.regionserver.Region;
051import org.apache.hadoop.hbase.regionserver.RegionScanner;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or can
 * carry conditions (filters) etc. This can be used to delete rows, column family(s), column
 * qualifier(s) or version(s) of columns. When the delete type is FAMILY or COLUMN, which
 * family(s) or column(s) get deleted is determined by the Scan, so the Scan needs to select all
 * the families/qualifiers that need to be deleted. When the delete type is VERSION, which
 * column(s) and version(s) get deleted is determined by the Scan, so the Scan needs to select all
 * the qualifiers and their versions that need to be deleted. When a timestamp is passed, only the
 * one version at that timestamp will be deleted (even if the Scan fetches many versions). When no
 * timestamp is passed, all the versions which the Scan selects will get deleted.
 *
 * <br> Example: <pre><code>
 * Scan scan = new Scan();
 * // set scan properties(rowkey range, filters, timerange etc).
 * HTable ht = ...;
 * long noOfDeletedRows = 0L;
 * Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt; callable =
 *     new Batch.Call&lt;BulkDeleteService, BulkDeleteResponse&gt;() {
 *   ServerRpcController controller = new ServerRpcController();
 *   BlockingRpcCallback&lt;BulkDeleteResponse&gt; rpcCallback =
 *     new BlockingRpcCallback&lt;BulkDeleteResponse&gt;();
 *
 *   public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
 *     BulkDeleteRequest.Builder builder = BulkDeleteRequest.newBuilder();
 *     builder.setScan(ProtobufUtil.toScan(scan));
 *     builder.setDeleteType(DeleteType.VERSION);
 *     builder.setRowBatchSize(rowBatchSize);
 *     // Set optional timestamp if needed
 *     builder.setTimestamp(timeStamp);
 *     service.delete(controller, builder.build(), rpcCallback);
 *     return rpcCallback.get();
 *   }
 * };
 * Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorService(BulkDeleteService.class,
 *  scan.getStartRow(), scan.getStopRow(), callable);
 * for (BulkDeleteResponse response : result.values()) {
 *   noOfDeletedRows += response.getRowsDeleted();
 * }
 * </code></pre>
 */
097@InterfaceAudience.Private
098public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCoprocessor {
099  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
100  private static final Logger LOG = LoggerFactory.getLogger(BulkDeleteEndpoint.class);
101
102  private RegionCoprocessorEnvironment env;
103
104  @Override
105  public Iterable<Service> getServices() {
106    return Collections.singleton(this);
107  }
108
109  @Override
110  public void delete(RpcController controller, BulkDeleteRequest request,
111      RpcCallback<BulkDeleteResponse> done) {
112    long totalRowsDeleted = 0L;
113    long totalVersionsDeleted = 0L;
114    Region region = env.getRegion();
115    int rowBatchSize = request.getRowBatchSize();
116    Long timestamp = null;
117    if (request.hasTimestamp()) {
118      timestamp = request.getTimestamp();
119    }
120    DeleteType deleteType = request.getDeleteType();
121    boolean hasMore = true;
122    RegionScanner scanner = null;
123    try {
124      Scan scan = ProtobufUtil.toScan(request.getScan());
125      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
126        // What we need is just the rowkeys. So only 1st KV from any row is enough.
127        // Only when it is a row delete, we can apply this filter.
128        // In other types we rely on the scan to know which all columns to be deleted.
129        scan.setFilter(new FirstKeyOnlyFilter());
130      }
131      // Here by assume that the scan is perfect with the appropriate
132      // filter and having necessary column(s).
133      scanner = region.getScanner(scan);
134      while (hasMore) {
135        List<List<Cell>> deleteRows = new ArrayList<>(rowBatchSize);
136        for (int i = 0; i < rowBatchSize; i++) {
137          List<Cell> results = new ArrayList<>();
138          hasMore = scanner.next(results);
139          if (results.size() > 0) {
140            deleteRows.add(results);
141          }
142          if (!hasMore) {
143            // There are no more rows.
144            break;
145          }
146        }
147        if (deleteRows.size() > 0) {
148          Mutation[] deleteArr = new Mutation[deleteRows.size()];
149          int i = 0;
150          for (List<Cell> deleteRow : deleteRows) {
151            deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
152          }
153          OperationStatus[] opStatus = region.batchMutate(deleteArr);
154          for (i = 0; i < opStatus.length; i++) {
155            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
156              break;
157            }
158            totalRowsDeleted++;
159            if (deleteType == DeleteType.VERSION) {
160              byte[] versionsDeleted = deleteArr[i].getAttribute(
161                  NO_OF_VERSIONS_TO_DELETE);
162              if (versionsDeleted != null) {
163                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
164              }
165            }
166          }
167        }
168      }
169    } catch (IOException ioe) {
170      LOG.error(ioe.toString(), ioe);
171      // Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
172      CoprocessorRpcUtils.setControllerException(controller, ioe);
173    } finally {
174      if (scanner != null) {
175        try {
176          scanner.close();
177        } catch (IOException ioe) {
178          LOG.error(ioe.toString(), ioe);
179        }
180      }
181    }
182    Builder responseBuilder = BulkDeleteResponse.newBuilder();
183    responseBuilder.setRowsDeleted(totalRowsDeleted);
184    if (deleteType == DeleteType.VERSION) {
185      responseBuilder.setVersionsDeleted(totalVersionsDeleted);
186    }
187    BulkDeleteResponse result = responseBuilder.build();
188    done.run(result);
189  }
190
191  private Delete createDeleteMutation(List<Cell> deleteRow, DeleteType deleteType,
192      Long timestamp) {
193    long ts;
194    if (timestamp == null) {
195      ts = HConstants.LATEST_TIMESTAMP;
196    } else {
197      ts = timestamp;
198    }
199    // We just need the rowkey. Get it from 1st KV.
200    byte[] row = CellUtil.cloneRow(deleteRow.get(0));
201    Delete delete = new Delete(row, ts);
202    if (deleteType == DeleteType.FAMILY) {
203      Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
204      for (Cell kv : deleteRow) {
205        if (families.add(CellUtil.cloneFamily(kv))) {
206          delete.addFamily(CellUtil.cloneFamily(kv), ts);
207        }
208      }
209    } else if (deleteType == DeleteType.COLUMN) {
210      Set<Column> columns = new HashSet<>();
211      for (Cell kv : deleteRow) {
212        Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
213        if (columns.add(column)) {
214          // Making deleteColumns() calls more than once for the same cf:qualifier is not correct
215          // Every call to deleteColumns() will add a new KV to the familymap which will finally
216          // get written to the memstore as part of delete().
217          delete.addColumns(column.family, column.qualifier, ts);
218        }
219      }
220    } else if (deleteType == DeleteType.VERSION) {
221      // When some timestamp was passed to the delete() call only one version of the column (with
222      // given timestamp) will be deleted. If no timestamp passed, it will delete N versions.
223      // How many versions will get deleted depends on the Scan being passed. All the KVs that
224      // the scan fetched will get deleted.
225      int noOfVersionsToDelete = 0;
226      if (timestamp == null) {
227        for (Cell kv : deleteRow) {
228          delete.addColumn(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
229                  kv.getTimestamp());
230          noOfVersionsToDelete++;
231        }
232      } else {
233        Set<Column> columns = new HashSet<>();
234        for (Cell kv : deleteRow) {
235          Column column = new Column(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv));
236          // Only one version of particular column getting deleted.
237          if (columns.add(column)) {
238            delete.addColumn(column.family, column.qualifier, ts);
239            noOfVersionsToDelete++;
240          }
241        }
242      }
243      delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
244    }
245    return delete;
246  }
247
248  private static class Column {
249    private byte[] family;
250    private byte[] qualifier;
251
252    public Column(byte[] family, byte[] qualifier) {
253      this.family = family;
254      this.qualifier = qualifier;
255    }
256
257    @Override
258    public boolean equals(Object other) {
259      if (!(other instanceof Column)) {
260        return false;
261      }
262      Column column = (Column) other;
263      return Bytes.equals(this.family, column.family)
264          && Bytes.equals(this.qualifier, column.qualifier);
265    }
266
267    @Override
268    public int hashCode() {
269      int h = 31;
270      h = h + 13 * Bytes.hashCode(this.family);
271      h = h + 13 * Bytes.hashCode(this.qualifier);
272      return h;
273    }
274  }
275
276  @Override
277  public void start(CoprocessorEnvironment env) throws IOException {
278    if (env instanceof RegionCoprocessorEnvironment) {
279      this.env = (RegionCoprocessorEnvironment) env;
280    } else {
281      throw new CoprocessorException("Must be loaded on a table region!");
282    }
283  }
284
285  @Override
286  public void stop(CoprocessorEnvironment env) throws IOException {
287    // nothing to do
288  }
289}