001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.Collection; 024import java.util.List; 025import java.util.Optional; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.function.Predicate; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Durability; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 043import org.apache.hadoop.hbase.coprocessor.ObserverContext; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 046import org.apache.hadoop.hbase.coprocessor.RegionObserver; 047import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 049import org.apache.hadoop.hbase.regionserver.InternalScanner; 050import org.apache.hadoop.hbase.regionserver.Region; 051import org.apache.hadoop.hbase.regionserver.ScanType; 052import org.apache.hadoop.hbase.regionserver.ScannerContext; 053import org.apache.hadoop.hbase.regionserver.Store; 054import org.apache.hadoop.hbase.regionserver.StoreScanner; 055import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 056import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.testclassification.MiscTests; 059import org.apache.hadoop.hbase.wal.WALEdit; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.junit.runner.RunWith; 066import org.junit.runners.Parameterized; 067import org.junit.runners.Parameterized.Parameters; 068 069@Category({ MiscTests.class, MediumTests.class }) 070@RunWith(Parameterized.class) 071public class TestCoprocessorScanPolicy { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class); 076 077 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 078 private static final byte[] F = Bytes.toBytes("fam"); 079 private static final byte[] Q = Bytes.toBytes("qual"); 080 private static final byte[] R = Bytes.toBytes("row"); 081 082 @BeforeClass 083 public static void setUpBeforeClass() throws Exception { 084 Configuration conf = TEST_UTIL.getConfiguration(); 085 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); 086 TEST_UTIL.startMiniCluster(); 087 } 088 089 @AfterClass 090 public static void tearDownAfterClass() throws Exception { 091 TEST_UTIL.shutdownMiniCluster(); 092 } 093 094 @Parameters 095 public static Collection<Object[]> parameters() { 096 return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED; 097 } 098 099 public TestCoprocessorScanPolicy(boolean parallelSeekEnable) { 100 TEST_UTIL.getMiniHBaseCluster().getConf() 101 .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable); 102 } 103 104 @Test 105 public void testBaseCases() throws Exception { 106 TableName tableName = TableName.valueOf("baseCases"); 107 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 108 TEST_UTIL.deleteTable(tableName); 109 } 110 Table t = TEST_UTIL.createTable(tableName, F, 10); 111 // insert 3 versions 112 long now = EnvironmentEdgeManager.currentTime(); 113 Put p = new Put(R); 114 p.addColumn(F, Q, now, Q); 115 t.put(p); 116 p = new Put(R); 117 p.addColumn(F, Q, now + 1, Q); 118 t.put(p); 119 p = new Put(R); 120 p.addColumn(F, Q, now + 2, Q); 121 t.put(p); 122 123 Get g = new Get(R); 124 g.readVersions(10); 125 Result r = t.get(g); 126 assertEquals(3, r.size()); 127 128 TEST_UTIL.flush(tableName); 129 TEST_UTIL.compact(tableName, true); 130 131 // still visible after a flush/compaction 132 r = t.get(g); 133 assertEquals(3, r.size()); 134 135 // set the version override to 2 136 p = new Put(R); 137 p.setAttribute("versions", new byte[] {}); 138 p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); 139 t.put(p); 140 141 // only 2 versions now 142 r = t.get(g); 143 assertEquals(2, r.size()); 144 145 TEST_UTIL.flush(tableName); 146 TEST_UTIL.compact(tableName, true); 147 148 // still 2 versions after a flush/compaction 149 r = t.get(g); 150 assertEquals(2, r.size()); 151 152 // insert a new version 153 p.addColumn(F, Q, now + 3, Q); 154 t.put(p); 155 156 // still 2 versions 157 r = t.get(g); 158 assertEquals(2, r.size()); 159 160 t.close(); 161 } 162 163 @Test 164 public void testTTL() throws Exception { 165 TableName tableName = TableName.valueOf("testTTL"); 166 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 167 TEST_UTIL.deleteTable(tableName); 168 } 169 Table t = TEST_UTIL.createTable(tableName, F, 10); 170 long now = EnvironmentEdgeManager.currentTime(); 171 ManualEnvironmentEdge me = new ManualEnvironmentEdge(); 172 me.setValue(now); 173 EnvironmentEdgeManagerTestHelper.injectEdge(me); 174 // 2s in the past 175 long ts = now - 2000; 176 177 Put p = new Put(R); 178 p.addColumn(F, Q, ts, Q); 179 t.put(p); 180 p = new Put(R); 181 p.addColumn(F, Q, ts + 1, Q); 182 t.put(p); 183 184 // Set the TTL override to 3s 185 p = new Put(R); 186 p.setAttribute("ttl", new byte[] {}); 187 p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); 188 t.put(p); 189 // these two should still be there 190 Get g = new Get(R); 191 g.readAllVersions(); 192 Result r = t.get(g); 193 // still there? 194 assertEquals(2, r.size()); 195 196 TEST_UTIL.flush(tableName); 197 TEST_UTIL.compact(tableName, true); 198 199 g = new Get(R); 200 g.readAllVersions(); 201 r = t.get(g); 202 // still there? 203 assertEquals(2, r.size()); 204 205 // roll time forward 2s. 206 me.setValue(now + 2000); 207 // now verify that data eventually does expire 208 g = new Get(R); 209 g.readAllVersions(); 210 r = t.get(g); 211 // should be gone now 212 assertEquals(0, r.size()); 213 t.close(); 214 EnvironmentEdgeManager.reset(); 215 } 216 217 public static class ScanObserver implements RegionCoprocessor, RegionObserver { 218 private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>(); 219 private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>(); 220 221 @Override 222 public Optional<RegionObserver> getRegionObserver() { 223 return Optional.of(this); 224 } 225 226 // lame way to communicate with the coprocessor, 227 // since it is loaded by a different class loader 228 @Override 229 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, 230 final WALEdit edit, final Durability durability) throws IOException { 231 if (put.getAttribute("ttl") != null) { 232 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 233 ttls.put( 234 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 235 cell.getQualifierLength())), 236 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 237 c.bypass(); 238 } else if (put.getAttribute("versions") != null) { 239 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 240 versions.put( 241 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 242 cell.getQualifierLength())), 243 Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 244 c.bypass(); 245 } 246 } 247 248 private InternalScanner wrap(Store store, InternalScanner scanner) { 249 Long ttl = this.ttls.get(store.getTableName()); 250 Integer version = this.versions.get(store.getTableName()); 251 return new DelegatingInternalScanner(scanner) { 252 253 private byte[] row; 254 255 private byte[] qualifier; 256 257 private int count; 258 259 private Predicate<Cell> checkTtl(long now, long ttl) { 260 return c -> now - c.getTimestamp() > ttl; 261 } 262 263 private Predicate<Cell> checkVersion(Cell firstCell, int version) { 264 if (version == 0) { 265 return c -> true; 266 } else { 267 if (row == null || !CellUtil.matchingRows(firstCell, row)) { 268 row = CellUtil.cloneRow(firstCell); 269 // reset qualifier as there is a row change 270 qualifier = null; 271 } 272 return c -> { 273 if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) { 274 if (count >= version) { 275 return true; 276 } 277 count++; 278 return false; 279 } else { // qualifier switch 280 qualifier = CellUtil.cloneQualifier(c); 281 count = 1; 282 return false; 283 } 284 }; 285 } 286 } 287 288 @Override 289 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 290 boolean moreRows = scanner.next(result, scannerContext); 291 if (result.isEmpty()) { 292 return moreRows; 293 } 294 long now = EnvironmentEdgeManager.currentTime(); 295 Predicate<Cell> predicate = null; 296 if (ttl != null) { 297 predicate = checkTtl(now, ttl); 298 } 299 if (version != null) { 300 Predicate<Cell> vp = checkVersion(result.get(0), version); 301 if (predicate != null) { 302 predicate = predicate.and(vp); 303 } else { 304 predicate = vp; 305 } 306 } 307 if (predicate != null) { 308 result.removeIf(predicate); 309 } 310 return moreRows; 311 } 312 }; 313 } 314 315 @Override 316 public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 317 InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 318 return wrap(store, scanner); 319 } 320 321 @Override 322 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 323 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 324 CompactionRequest request) throws IOException { 325 return wrap(store, scanner); 326 } 327 328 @Override 329 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, 330 List<Cell> result) throws IOException { 331 TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); 332 Long ttl = this.ttls.get(tableName); 333 if (ttl != null) { 334 get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax()); 335 } 336 Integer version = this.versions.get(tableName); 337 if (version != null) { 338 get.readVersions(version); 339 } 340 } 341 342 @Override 343 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) 344 throws IOException { 345 Region region = c.getEnvironment().getRegion(); 346 TableName tableName = region.getTableDescriptor().getTableName(); 347 Long ttl = this.ttls.get(tableName); 348 if (ttl != null) { 349 scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax()); 350 } 351 Integer version = this.versions.get(tableName); 352 if (version != null) { 353 scan.readVersions(version); 354 } 355 } 356 } 357}