View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coprocessor.example;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.TreeSet;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.client.Delete;
34  import org.apache.hadoop.hbase.client.Mutation;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
37  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
38  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
39  import org.apache.hadoop.hbase.regionserver.HRegion;
40  import org.apache.hadoop.hbase.regionserver.OperationStatus;
41  import org.apache.hadoop.hbase.regionserver.RegionScanner;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.Pair;
44  
45  public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
46    private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
47    private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
48    
49    @Override
50    public BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp,
51        int rowBatchSize) {
52      long totalRowsDeleted = 0L;
53      long totalVersionsDeleted = 0L;
54      BulkDeleteResponse response = new BulkDeleteResponse();
55      HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
56      boolean hasMore = true;
57      RegionScanner scanner = null;
58      if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
59        // What we need is just the rowkeys. So only 1st KV from any row is enough.
60        // Only when it is a row delete, we can apply this filter
61        // In other types we rely on the scan to know which all columns to be deleted.
62        scan.setFilter(new FirstKeyOnlyFilter());
63      }
64      // When the delete is based on some conditions so that Filters are available in the scan,
65      // we assume that the scan is perfect having necessary column(s) only.
66      try {
67        scanner = region.getScanner(scan);
68        while (hasMore) {
69          List<List<KeyValue>> deleteRows = new ArrayList<List<KeyValue>>(rowBatchSize);
70          for (int i = 0; i < rowBatchSize; i++) {
71            List<KeyValue> results = new ArrayList<KeyValue>();
72            hasMore = scanner.next(results);
73            if (results.size() > 0) {
74              deleteRows.add(results);
75            }
76            if (!hasMore) {
77              // There are no more rows.
78              break;
79            }
80          }
81          if (deleteRows.size() > 0) {
82            Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
83            int i = 0;
84            for (List<KeyValue> deleteRow : deleteRows) {
85              Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
86              deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
87            }
88            OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
89            for (i = 0; i < opStatus.length; i++) {
90              if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
91                break;
92              }
93              totalRowsDeleted++;
94              if (deleteType == DeleteType.VERSION) {
95                byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
96                    NO_OF_VERSIONS_TO_DELETE);
97                if (versionsDeleted != null) {
98                  totalVersionsDeleted += Bytes.toInt(versionsDeleted);
99                }
100             }
101           }
102         } 
103       }
104     } catch (IOException ioe) {
105       LOG.error(ioe);
106       response.setIoException(ioe);
107     } finally {
108       if (scanner != null) {
109         try {
110           scanner.close();
111         } catch (IOException ioe) {
112           LOG.error(ioe);
113         }
114       }
115     }
116     response.setRowsDeleted(totalRowsDeleted);
117     response.setVersionsDeleted(totalVersionsDeleted);
118     return response;
119   }
120 
121   private Delete createDeleteMutation(List<KeyValue> deleteRow, byte deleteType, Long timestamp) {
122     long ts;
123     if (timestamp == null) {
124       ts = HConstants.LATEST_TIMESTAMP;
125     } else {
126       ts = timestamp;
127     }
128     // We just need the rowkey. Get it from 1st KV.
129     byte[] row = deleteRow.get(0).getRow();
130     Delete delete = new Delete(row, ts, null);
131     if (deleteType != DeleteType.ROW) {
132       switch (deleteType) {
133       case DeleteType.FAMILY:
134         Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
135         for (KeyValue kv : deleteRow) {
136           if (families.add(kv.getFamily())) {
137             delete.deleteFamily(kv.getFamily(), ts);
138           }
139         }
140         break;
141 
142       case DeleteType.COLUMN:
143         Set<Column> columns = new HashSet<Column>();
144         for (KeyValue kv : deleteRow) {
145           Column column = new Column(kv.getFamily(), kv.getQualifier());
146           if (columns.add(column)) {
147             // Making deleteColumns() calls more than once for the same cf:qualifier is not correct
148             // Every call to deleteColumns() will add a new KV to the familymap which will finally
149             // get written to the memstore as part of delete().
150             delete.deleteColumns(column.family, column.qualifier, ts);
151           }
152         }
153         break;
154 
155       case DeleteType.VERSION:
156         // When some timestamp was passed to the delete() call only one version of the column (with
157         // given timestamp) will be deleted. If no timestamp passed, it will delete N versions.
158         // How many versions will get deleted depends on the Scan being passed. All the KVs that
159         // the scan fetched will get deleted.
160         int noOfVersionsToDelete = 0;
161         if (timestamp == null) {
162           for (KeyValue kv : deleteRow) {
163             delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
164             noOfVersionsToDelete++;
165           }
166         } else {
167           columns = new HashSet<Column>();
168           for (KeyValue kv : deleteRow) {
169             Column column = new Column(kv.getFamily(), kv.getQualifier());
170             // Only one version of particular column getting deleted.
171             if (columns.add(column)) {
172               delete.deleteColumn(column.family, column.qualifier, ts);
173               noOfVersionsToDelete++;
174             }
175           }
176         }
177         delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
178       }
179     }
180     return delete;
181   }
182   
183   private static class Column {
184     private byte[] family;
185     private byte[] qualifier;
186 
187     public Column(byte[] family, byte[] qualifier) {
188       this.family = family;
189       this.qualifier = qualifier;
190     }
191 
192     @Override
193     public boolean equals(Object other) {
194       if (!(other instanceof Column)) {
195         return false;
196       }
197       Column column = (Column) other;
198       return Bytes.equals(this.family, column.family)
199           && Bytes.equals(this.qualifier, column.qualifier);
200     }
201 
202     @Override
203     public int hashCode() {
204       int h = 31;
205       h = h + 13 * Bytes.hashCode(this.family);
206       h = h + 13 * Bytes.hashCode(this.qualifier);
207       return h;
208     }
209   }
210 }