001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * <p>
010 * http://www.apache.org/licenses/LICENSE-2.0
011 * <p>
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.querymatcher;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.NavigableMap;
024import java.util.NavigableSet;
025import java.util.SortedMap;
026import java.util.SortedSet;
027import java.util.TreeMap;
028import java.util.TreeSet;
029
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.KeyValue.Type;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
037
038/**
039 * A tracker both implementing ColumnTracker and DeleteTracker, used for mvcc-sensitive scanning.
040 * We should make sure in one QueryMatcher the ColumnTracker and DeleteTracker is the same instance.
041 */
042@InterfaceAudience.Private
043public class NewVersionBehaviorTracker implements ColumnTracker, DeleteTracker {
044
045  private byte[] lastCqArray;
046  private int lastCqLength;
047  private int lastCqOffset;
048  private long lastCqTs;
049  private long lastCqMvcc;
050  private byte lastCqType;
051  private int columnIndex;
052  private int countCurrentCol;
053
054  protected int maxVersions;
055  private int resultMaxVersions;
056  private byte[][] columns;
057  private int minVersions;
058  private long oldestStamp;
059  private CellComparator comparator;
060
061  // These two maps have same structure.
062  // Each node is a versions deletion (DeleteFamily or DeleteColumn). Key is the mvcc of the marker,
063  // value is a data structure which contains infos we need that happens before this node's mvcc and
064  // after the previous node's mvcc. The last node is a special node whose key is max_long that
065  // saves infos after last deletion. See DeleteVersionsNode's comments for details.
066  // The delColMap is constructed and used for each cq, and thedelFamMap is constructed when cq is
067  // null and saving family-level delete markers. Each time the cq is changed, we should
068  // reconstruct delColMap as a deep copy of delFamMap.
069  protected NavigableMap<Long, DeleteVersionsNode> delColMap = new TreeMap<>();
070  protected NavigableMap<Long, DeleteVersionsNode> delFamMap = new TreeMap<>();
071
072  /**
073   * Note maxVersion and minVersion must set according to cf's conf, not user's scan parameter.
074   *
075   * @param columns           columns specified user in query
076   * @param comparartor       the cell comparator
077   * @param minVersion        The minimum number of versions to keep(used when TTL is set).
078   * @param maxVersion        The maximum number of versions in CF's conf
079   * @param resultMaxVersions maximum versions to return per column, which may be different from
080   *                          maxVersion
081   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
082   */
083  public NewVersionBehaviorTracker(NavigableSet<byte[]> columns, CellComparator comparartor,
084      int minVersion, int maxVersion, int resultMaxVersions, long oldestUnexpiredTS) {
085    this.maxVersions = maxVersion;
086    this.minVersions = minVersion;
087    this.resultMaxVersions = resultMaxVersions;
088    this.oldestStamp = oldestUnexpiredTS;
089    if (columns != null && columns.size() > 0) {
090      this.columns = new byte[columns.size()][];
091      int i = 0;
092      for (byte[] column : columns) {
093        this.columns[i++] = column;
094      }
095    }
096    this.comparator = comparartor;
097    reset();
098  }
099
100  @Override
101  public void beforeShipped() throws IOException {
102    // Do nothing
103  }
104
105  /**
106   * A data structure which contains infos we need that happens before this node's mvcc and
107   * after the previous node's mvcc. A node means there is a version deletion at the mvcc and ts.
108   */
109  protected class DeleteVersionsNode {
110    public long ts;
111    public long mvcc;
112
113    // <timestamp, set<mvcc>>
114    // Key is ts of version deletes, value is its mvccs.
115    // We may delete more than one time for a version.
116    private Map<Long, SortedSet<Long>> deletesMap = new HashMap<>();
117
118    // <mvcc, set<mvcc>>
119    // Key is mvcc of version deletes, value is mvcc of visible puts before the delete effect.
120    private NavigableMap<Long, SortedSet<Long>> mvccCountingMap = new TreeMap<>();
121
122    protected DeleteVersionsNode(long ts, long mvcc) {
123      this.ts = ts;
124      this.mvcc = mvcc;
125      mvccCountingMap.put(Long.MAX_VALUE, new TreeSet<Long>());
126    }
127
128    protected DeleteVersionsNode() {
129      this(Long.MIN_VALUE, Long.MAX_VALUE);
130    }
131
132    public void addVersionDelete(Cell cell) {
133      SortedSet<Long> set = deletesMap.get(cell.getTimestamp());
134      if (set == null) {
135        set = new TreeSet<>();
136        deletesMap.put(cell.getTimestamp(), set);
137      }
138      set.add(cell.getSequenceId());
139      // The init set should be the puts whose mvcc is smaller than this Delete. Because
140      // there may be some Puts masked by them. The Puts whose mvcc is larger than this Delete can
141      // not be copied to this node because we may delete one version and the oldest put may not be
142      // masked.
143      SortedSet<Long> nextValue = mvccCountingMap.ceilingEntry(cell.getSequenceId()).getValue();
144      SortedSet<Long> thisValue = new TreeSet<>(nextValue.headSet(cell.getSequenceId()));
145      mvccCountingMap.put(cell.getSequenceId(), thisValue);
146    }
147
148    protected DeleteVersionsNode getDeepCopy() {
149      DeleteVersionsNode node = new DeleteVersionsNode(ts, mvcc);
150      for (Map.Entry<Long, SortedSet<Long>> e : deletesMap.entrySet()) {
151        node.deletesMap.put(e.getKey(), new TreeSet<>(e.getValue()));
152      }
153      for (Map.Entry<Long, SortedSet<Long>> e : mvccCountingMap.entrySet()) {
154        node.mvccCountingMap.put(e.getKey(), new TreeSet<>(e.getValue()));
155      }
156      return node;
157    }
158  }
159
160  /**
161   * Reset the map if it is different with the last Cell.
162   * Save the cq array/offset/length for next Cell.
163   *
164   * @return If this put has duplicate ts with last cell, return the mvcc of last cell.
165   * Else return MAX_VALUE.
166   */
167  protected long prepare(Cell cell) {
168    boolean matchCq =
169        PrivateCellUtil.matchingQualifier(cell, lastCqArray, lastCqOffset, lastCqLength);
170    if (!matchCq) {
171      // The last cell is family-level delete and this is not, or the cq is changed,
172      // we should construct delColMap as a deep copy of delFamMap.
173      delColMap.clear();
174      for (Map.Entry<Long, DeleteVersionsNode> e : delFamMap.entrySet()) {
175        delColMap.put(e.getKey(), e.getValue().getDeepCopy());
176      }
177      countCurrentCol = 0;
178    }
179    if (matchCq && !PrivateCellUtil.isDelete(lastCqType) && lastCqType == cell.getTypeByte()
180        && lastCqTs == cell.getTimestamp()) {
181      // Put with duplicate timestamp, ignore.
182      return lastCqMvcc;
183    }
184    lastCqArray = cell.getQualifierArray();
185    lastCqOffset = cell.getQualifierOffset();
186    lastCqLength = cell.getQualifierLength();
187    lastCqTs = cell.getTimestamp();
188    lastCqMvcc = cell.getSequenceId();
189    lastCqType = cell.getTypeByte();
190    return Long.MAX_VALUE;
191  }
192
193  // DeleteTracker
194  @Override
195  public void add(Cell cell) {
196    prepare(cell);
197    byte type = cell.getTypeByte();
198    switch (Type.codeToType(type)) {
199    // By the order of seen. We put null cq at first.
200    case DeleteFamily: // Delete all versions of all columns of the specified family
201      delFamMap.put(cell.getSequenceId(),
202          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
203      break;
204    case DeleteFamilyVersion: // Delete all columns of the specified family and specified version
205      delFamMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
206      break;
207
208    // These two kinds of markers are mix with Puts.
209    case DeleteColumn: // Delete all versions of the specified column
210      delColMap.put(cell.getSequenceId(),
211          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
212      break;
213    case Delete: // Delete the specified version of the specified column.
214      delColMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
215      break;
216    default:
217      throw new AssertionError("Unknown delete marker type for " + cell);
218    }
219  }
220
221  /**
222   * This method is not idempotent, we will save some info to judge VERSION_MASKED.
223   * @param cell - current cell to check if deleted by a previously seen delete
224   * @return We don't distinguish DeleteColumn and DeleteFamily. We only return code for column.
225   */
226  @Override
227  public DeleteResult isDeleted(Cell cell) {
228    long duplicateMvcc = prepare(cell);
229
230    for (Map.Entry<Long, DeleteVersionsNode> e : delColMap.tailMap(cell.getSequenceId())
231        .entrySet()) {
232      DeleteVersionsNode node = e.getValue();
233      long deleteMvcc = Long.MAX_VALUE;
234      SortedSet<Long> deleteVersionMvccs = node.deletesMap.get(cell.getTimestamp());
235      if (deleteVersionMvccs != null) {
236        SortedSet<Long> tail = deleteVersionMvccs.tailSet(cell.getSequenceId());
237        if (!tail.isEmpty()) {
238          deleteMvcc = tail.first();
239        }
240      }
241      SortedMap<Long, SortedSet<Long>> subMap =
242          node.mvccCountingMap
243              .subMap(cell.getSequenceId(), true, Math.min(duplicateMvcc, deleteMvcc), true);
244      for (Map.Entry<Long, SortedSet<Long>> seg : subMap.entrySet()) {
245        if (seg.getValue().size() >= maxVersions) {
246          return DeleteResult.VERSION_MASKED;
247        }
248        seg.getValue().add(cell.getSequenceId());
249      }
250      if (deleteMvcc < Long.MAX_VALUE) {
251        return DeleteResult.VERSION_DELETED;
252      }
253
254      if (cell.getTimestamp() <= node.ts) {
255        return DeleteResult.COLUMN_DELETED;
256      }
257    }
258    if (duplicateMvcc < Long.MAX_VALUE) {
259      return DeleteResult.VERSION_MASKED;
260    }
261    return DeleteResult.NOT_DELETED;
262  }
263
264  @Override
265  public boolean isEmpty() {
266    return delColMap.size() == 1 && delColMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1
267        && delFamMap.size() == 1 && delFamMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1;
268  }
269
270  @Override
271  public void update() {
272    // ignore
273  }
274
275  //ColumnTracker
276
277  @Override
278  public MatchCode checkColumn(Cell cell, byte type) throws IOException {
279    if (columns == null) {
280        return MatchCode.INCLUDE;
281    }
282
283    while (!done()) {
284      int c = CellUtil.compareQualifiers(cell,
285        columns[columnIndex], 0, columns[columnIndex].length);
286      if (c < 0) {
287        return MatchCode.SEEK_NEXT_COL;
288      }
289
290      if (c == 0) {
291        // We drop old version in #isDeleted, so here we must return INCLUDE.
292        return MatchCode.INCLUDE;
293      }
294
295      columnIndex++;
296    }
297    // No more columns left, we are done with this query
298    return MatchCode.SEEK_NEXT_ROW;
299  }
300
301  @Override
302  public MatchCode checkVersions(Cell cell, long timestamp, byte type,
303      boolean ignoreCount) throws IOException {
304    assert !PrivateCellUtil.isDelete(type);
305    // We drop old version in #isDeleted, so here we won't SKIP because of versioning. But we should
306    // consider TTL.
307    if (ignoreCount) {
308      return MatchCode.INCLUDE;
309    }
310    countCurrentCol++;
311    if (timestamp < this.oldestStamp) {
312      if (countCurrentCol == minVersions) {
313        return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
314      }
315      if (countCurrentCol > minVersions) {
316        // This may not be reached, only for safety.
317        return MatchCode.SEEK_NEXT_COL;
318      }
319    }
320
321    if (countCurrentCol == resultMaxVersions) {
322      // We have enough number of versions for user's requirement.
323      return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
324    }
325    if (countCurrentCol > resultMaxVersions) {
326      // This may not be reached, only for safety
327      return MatchCode.SEEK_NEXT_COL;
328    }
329    return MatchCode.INCLUDE;
330  }
331
332  @Override
333  public void reset() {
334    delColMap.clear();
335    delFamMap.clear();
336    lastCqArray = null;
337    lastCqLength = 0;
338    lastCqOffset = 0;
339    lastCqTs = Long.MIN_VALUE;
340    lastCqMvcc = 0;
341    lastCqType = 0;
342    columnIndex = 0;
343    countCurrentCol = 0;
344    resetInternal();
345  }
346
347  protected void resetInternal(){
348    delFamMap.put(Long.MAX_VALUE, new DeleteVersionsNode());
349  }
350
351  @Override
352  public boolean done() {
353    return columns != null && columnIndex >= columns.length;
354  }
355
356  @Override
357  public ColumnCount getColumnHint() {
358    if (columns != null) {
359      if (columnIndex < columns.length) {
360        return new ColumnCount(columns[columnIndex]);
361      }
362    }
363    return null;
364  }
365
366  @Override
367  public MatchCode getNextRowOrNextColumn(Cell cell) {
368    // TODO maybe we can optimize.
369    return MatchCode.SEEK_NEXT_COL;
370  }
371
372  @Override
373  public boolean isDone(long timestamp) {
374    // We can not skip Cells with small ts.
375    return false;
376  }
377
378  @Override
379  public CellComparator getCellComparator() {
380    return this.comparator;
381  }
382
383}