001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.function.Predicate; 028import java.util.stream.Stream; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Durability; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Scan; 042import org.apache.hadoop.hbase.client.Table; 043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 044import org.apache.hadoop.hbase.coprocessor.ObserverContext; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.RegionObserver; 048import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 049import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 050import org.apache.hadoop.hbase.regionserver.InternalScanner; 051import org.apache.hadoop.hbase.regionserver.Region; 052import org.apache.hadoop.hbase.regionserver.ScanType; 053import org.apache.hadoop.hbase.regionserver.ScannerContext; 054import org.apache.hadoop.hbase.regionserver.Store; 055import org.apache.hadoop.hbase.regionserver.StoreScanner; 056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.testclassification.MiscTests; 060import org.apache.hadoop.hbase.wal.WALEdit; 061import org.junit.jupiter.api.AfterAll; 062import org.junit.jupiter.api.BeforeAll; 063import org.junit.jupiter.api.Tag; 064import org.junit.jupiter.api.TestTemplate; 065import org.junit.jupiter.params.provider.Arguments; 066 067@Tag(MiscTests.TAG) 068@Tag(MediumTests.TAG) 069@HBaseParameterizedTestTemplate 070public class TestCoprocessorScanPolicy { 071 072 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 073 private static final byte[] F = Bytes.toBytes("fam"); 074 private static final byte[] Q = Bytes.toBytes("qual"); 075 private static final byte[] R = Bytes.toBytes("row"); 076 077 @BeforeAll 078 public static void setUpBeforeClass() throws Exception { 079 Configuration conf = TEST_UTIL.getConfiguration(); 080 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); 081 TEST_UTIL.startMiniCluster(); 082 } 083 084 @AfterAll 085 public static void tearDownAfterClass() throws Exception { 086 TEST_UTIL.shutdownMiniCluster(); 087 } 088 089 public static Stream<Arguments> parameters() { 090 return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED.stream().map(arr -> Arguments.of(arr)); 091 } 092 093 public TestCoprocessorScanPolicy(boolean parallelSeekEnable) { 094 TEST_UTIL.getMiniHBaseCluster().getConf() 095 .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable); 096 } 097 098 @TestTemplate 099 public void testBaseCases() throws Exception { 100 TableName tableName = TableName.valueOf("baseCases"); 101 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 102 TEST_UTIL.deleteTable(tableName); 103 } 104 Table t = TEST_UTIL.createTable(tableName, F, 10); 105 // insert 3 versions 106 long now = EnvironmentEdgeManager.currentTime(); 107 Put p = new Put(R); 108 p.addColumn(F, Q, now, Q); 109 t.put(p); 110 p = new Put(R); 111 p.addColumn(F, Q, now + 1, Q); 112 t.put(p); 113 p = new Put(R); 114 p.addColumn(F, Q, now + 2, Q); 115 t.put(p); 116 117 Get g = new Get(R); 118 g.readVersions(10); 119 Result r = t.get(g); 120 assertEquals(3, r.size()); 121 122 TEST_UTIL.flush(tableName); 123 TEST_UTIL.compact(tableName, true); 124 125 // still visible after a flush/compaction 126 r = t.get(g); 127 assertEquals(3, r.size()); 128 129 // set the version override to 2 130 p = new Put(R); 131 p.setAttribute("versions", new byte[] {}); 132 p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); 133 t.put(p); 134 135 // only 2 versions now 136 r = t.get(g); 137 assertEquals(2, r.size()); 138 139 TEST_UTIL.flush(tableName); 140 TEST_UTIL.compact(tableName, true); 141 142 // still 2 versions after a flush/compaction 143 r = t.get(g); 144 assertEquals(2, r.size()); 145 146 // insert a new version 147 p.addColumn(F, Q, now + 3, Q); 148 t.put(p); 149 150 // still 2 versions 151 r = t.get(g); 152 assertEquals(2, r.size()); 153 154 t.close(); 155 } 156 157 @TestTemplate 158 public void testTTL() throws Exception { 159 TableName tableName = TableName.valueOf("testTTL"); 160 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 161 TEST_UTIL.deleteTable(tableName); 162 } 163 Table t = TEST_UTIL.createTable(tableName, F, 10); 164 long now = EnvironmentEdgeManager.currentTime(); 165 ManualEnvironmentEdge me = new ManualEnvironmentEdge(); 166 me.setValue(now); 167 EnvironmentEdgeManagerTestHelper.injectEdge(me); 168 // 2s in the past 169 long ts = now - 2000; 170 171 Put p = new Put(R); 172 p.addColumn(F, Q, ts, Q); 173 t.put(p); 174 p = new Put(R); 175 p.addColumn(F, Q, ts + 1, Q); 176 t.put(p); 177 178 // Set the TTL override to 3s 179 p = new Put(R); 180 p.setAttribute("ttl", new byte[] {}); 181 p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); 182 t.put(p); 183 // these two should still be there 184 Get g = new Get(R); 185 g.readAllVersions(); 186 Result r = t.get(g); 187 // still there? 188 assertEquals(2, r.size()); 189 190 TEST_UTIL.flush(tableName); 191 TEST_UTIL.compact(tableName, true); 192 193 g = new Get(R); 194 g.readAllVersions(); 195 r = t.get(g); 196 // still there? 197 assertEquals(2, r.size()); 198 199 // roll time forward 2s. 200 me.setValue(now + 2000); 201 // now verify that data eventually does expire 202 g = new Get(R); 203 g.readAllVersions(); 204 r = t.get(g); 205 // should be gone now 206 assertEquals(0, r.size()); 207 t.close(); 208 EnvironmentEdgeManager.reset(); 209 } 210 211 public static class ScanObserver implements RegionCoprocessor, RegionObserver { 212 private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>(); 213 private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>(); 214 215 @Override 216 public Optional<RegionObserver> getRegionObserver() { 217 return Optional.of(this); 218 } 219 220 // lame way to communicate with the coprocessor, 221 // since it is loaded by a different class loader 222 @Override 223 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 224 final Put put, final WALEdit edit, final Durability durability) throws IOException { 225 if (put.getAttribute("ttl") != null) { 226 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 227 ttls.put( 228 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 229 cell.getQualifierLength())), 230 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 231 c.bypass(); 232 } else if (put.getAttribute("versions") != null) { 233 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 234 versions.put( 235 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 236 cell.getQualifierLength())), 237 Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 238 c.bypass(); 239 } 240 } 241 242 private InternalScanner wrap(Store store, InternalScanner scanner) { 243 Long ttl = this.ttls.get(store.getTableName()); 244 Integer version = this.versions.get(store.getTableName()); 245 return new DelegatingInternalScanner(scanner) { 246 247 private byte[] row; 248 249 private byte[] qualifier; 250 251 private int count; 252 253 private Predicate<Cell> checkTtl(long now, long ttl) { 254 return c -> now - c.getTimestamp() > ttl; 255 } 256 257 private Predicate<Cell> checkVersion(Cell firstCell, int version) { 258 if (version == 0) { 259 return c -> true; 260 } else { 261 if (row == null || !CellUtil.matchingRows(firstCell, row)) { 262 row = CellUtil.cloneRow(firstCell); 263 // reset qualifier as there is a row change 264 qualifier = null; 265 } 266 return c -> { 267 if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) { 268 if (count >= version) { 269 return true; 270 } 271 count++; 272 return false; 273 } else { // qualifier switch 274 qualifier = CellUtil.cloneQualifier(c); 275 count = 1; 276 return false; 277 } 278 }; 279 } 280 } 281 282 @Override 283 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 284 throws IOException { 285 boolean moreRows = scanner.next(result, scannerContext); 286 if (result.isEmpty()) { 287 return moreRows; 288 } 289 long now = EnvironmentEdgeManager.currentTime(); 290 Predicate<Cell> predicate = null; 291 if (ttl != null) { 292 predicate = checkTtl(now, ttl); 293 } 294 if (version != null) { 295 Predicate<Cell> vp = checkVersion((Cell) result.get(0), version); 296 if (predicate != null) { 297 predicate = predicate.and(vp); 298 } else { 299 predicate = vp; 300 } 301 } 302 if (predicate != null) { 303 ((List<Cell>) result).removeIf(predicate); 304 } 305 return moreRows; 306 } 307 }; 308 } 309 310 @Override 311 public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 312 Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 313 return wrap(store, scanner); 314 } 315 316 @Override 317 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 318 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 319 CompactionRequest request) throws IOException { 320 return wrap(store, scanner); 321 } 322 323 @Override 324 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 325 List<Cell> result) throws IOException { 326 TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); 327 Long ttl = this.ttls.get(tableName); 328 if (ttl != null) { 329 get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax()); 330 } 331 Integer version = this.versions.get(tableName); 332 if (version != null) { 333 get.readVersions(version); 334 } 335 } 336 337 @Override 338 public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan) 339 throws IOException { 340 Region region = c.getEnvironment().getRegion(); 341 TableName tableName = region.getTableDescriptor().getTableName(); 342 Long ttl = this.ttls.get(tableName); 343 if (ttl != null) { 344 scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax()); 345 } 346 Integer version = this.versions.get(tableName); 347 if (version != null) { 348 scan.readVersions(version); 349 } 350 } 351 } 352}