001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.querymatcher; 019 020import java.io.IOException; 021import java.util.NavigableSet; 022 023import org.apache.hadoop.hbase.Cell; 024import org.apache.hadoop.hbase.CellUtil; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.PrivateCellUtil; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; 029 030/** 031 * This class is used for the tracking and enforcement of columns and numbers of versions during the 032 * course of a Get or Scan operation, when explicit column qualifiers have been asked for in the 033 * query. With a little magic (see {@link ScanQueryMatcher}), we can use this matcher for both scans 034 * and gets. The main difference is 'next' and 'done' collapse for the scan case (since we see all 035 * columns in order), and we only reset between rows. 036 * <p> 037 * This class is utilized by {@link ScanQueryMatcher} mainly through two methods: 038 * <ul> 039 * <li>{@link #checkColumn} is called when a Put satisfies all other conditions of the query.</li> 040 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher believes that the current 041 * column should be skipped (by timestamp, filter etc.)</li> 042 * </ul> 043 * <p> 044 * These two methods returns a 045 * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} to define 046 * what action should be taken. 047 * <p> 048 * This class is NOT thread-safe as queries are never multi-threaded 049 */ 050@InterfaceAudience.Private 051public class ExplicitColumnTracker implements ColumnTracker { 052 053 private final int maxVersions; 054 private final int minVersions; 055 056 /** 057 * Contains the list of columns that the ExplicitColumnTracker is tracking. Each ColumnCount 058 * instance also tracks how many versions of the requested column have been returned. 059 */ 060 private final ColumnCount[] columns; 061 private int index; 062 private ColumnCount column; 063 /** 064 * Keeps track of the latest timestamp included for current column. Used to eliminate duplicates. 065 */ 066 private long latestTSOfCurrentColumn; 067 private long oldestStamp; 068 069 /** 070 * Default constructor. 071 * @param columns columns specified user in query 072 * @param minVersions minimum number of versions to keep 073 * @param maxVersions maximum versions to return per column 074 * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL 075 */ 076 public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, int maxVersions, 077 long oldestUnexpiredTS) { 078 this.maxVersions = maxVersions; 079 this.minVersions = minVersions; 080 this.oldestStamp = oldestUnexpiredTS; 081 this.columns = new ColumnCount[columns.size()]; 082 int i = 0; 083 for (byte[] column : columns) { 084 this.columns[i++] = new ColumnCount(column); 085 } 086 reset(); 087 } 088 089 /** 090 * Done when there are no more columns to match against. 091 */ 092 @Override 093 public boolean done() { 094 return this.index >= columns.length; 095 } 096 097 @Override 098 public ColumnCount getColumnHint() { 099 return this.column; 100 } 101 102 /** 103 * {@inheritDoc} 104 */ 105 @Override 106 public ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) { 107 // delete markers should never be passed to an 108 // *Explicit*ColumnTracker 109 assert !PrivateCellUtil.isDelete(type); 110 do { 111 // No more columns left, we are done with this query 112 if (done()) { 113 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 114 } 115 116 // No more columns to match against, done with storefile 117 if (this.column == null) { 118 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 119 } 120 121 // Compare specific column to current column 122 int ret = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(), 123 column.getLength()); 124 125 // Column Matches. Return include code. The caller would call checkVersions 126 // to limit the number of versions. 127 if (ret == 0) { 128 return ScanQueryMatcher.MatchCode.INCLUDE; 129 } 130 131 resetTS(); 132 133 if (ret < 0) { 134 // The current KV is smaller than the column the ExplicitColumnTracker 135 // is interested in, so seek to that column of interest. 136 return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; 137 } 138 139 // The current KV is bigger than the column the ExplicitColumnTracker 140 // is interested in. That means there is no more data for the column 141 // of interest. Advance the ExplicitColumnTracker state to next 142 // column of interest, and check again. 143 if (ret > 0) { 144 ++this.index; 145 if (done()) { 146 // No more to match, do not include, done with this row. 147 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 148 } 149 // This is the recursive case. 150 this.column = this.columns[this.index]; 151 } 152 } while (true); 153 } 154 155 @Override 156 public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type, 157 boolean ignoreCount) throws IOException { 158 assert !PrivateCellUtil.isDelete(type); 159 if (ignoreCount) { 160 return ScanQueryMatcher.MatchCode.INCLUDE; 161 } 162 // Check if it is a duplicate timestamp 163 if (sameAsPreviousTS(timestamp)) { 164 // If duplicate, skip this Key 165 return ScanQueryMatcher.MatchCode.SKIP; 166 } 167 int count = this.column.increment(); 168 if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { 169 // Done with versions for this column 170 ++this.index; 171 resetTS(); 172 if (done()) { 173 // We have served all the requested columns. 174 this.column = null; 175 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; 176 } 177 // We are done with current column; advance to next column 178 // of interest. 179 this.column = this.columns[this.index]; 180 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; 181 } 182 setTS(timestamp); 183 return ScanQueryMatcher.MatchCode.INCLUDE; 184 } 185 186 // Called between every row. 187 @Override 188 public void reset() { 189 this.index = 0; 190 this.column = this.columns[this.index]; 191 for (ColumnCount col : this.columns) { 192 col.setCount(0); 193 } 194 resetTS(); 195 } 196 197 private void resetTS() { 198 latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP; 199 } 200 201 private void setTS(long timestamp) { 202 latestTSOfCurrentColumn = timestamp; 203 } 204 205 private boolean sameAsPreviousTS(long timestamp) { 206 return timestamp == latestTSOfCurrentColumn; 207 } 208 209 private boolean isExpired(long timestamp) { 210 return timestamp < oldestStamp; 211 } 212 213 @Override 214 public void doneWithColumn(Cell cell) { 215 while (this.column != null) { 216 int compare = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(), 217 column.getLength()); 218 resetTS(); 219 if (compare >= 0) { 220 ++this.index; 221 if (done()) { 222 // Will not hit any more columns in this storefile 223 this.column = null; 224 } else { 225 this.column = this.columns[this.index]; 226 } 227 if (compare > 0) { 228 continue; 229 } 230 } 231 return; 232 } 233 } 234 235 @Override 236 public MatchCode getNextRowOrNextColumn(Cell cell) { 237 doneWithColumn(cell); 238 239 if (getColumnHint() == null) { 240 return MatchCode.SEEK_NEXT_ROW; 241 } else { 242 return MatchCode.SEEK_NEXT_COL; 243 } 244 } 245 246 @Override 247 public boolean isDone(long timestamp) { 248 return minVersions <= 0 && isExpired(timestamp); 249 } 250 251 @Override 252 public void beforeShipped() throws IOException { 253 // do nothing 254 } 255}