001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import org.apache.hadoop.hbase.HBaseClassTestRule; 021import org.junit.ClassRule; 022 023// this is deliberately not in the o.a.h.h.regionserver package 024 025// in order to make sure all required classes/method are available 026 027import static org.junit.Assert.assertEquals; 028 029import java.io.IOException; 030import java.util.Collection; 031import java.util.List; 032import java.util.Optional; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035import java.util.function.Predicate; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Durability; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.coprocessor.RegionObserver; 054import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 055import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 056import org.apache.hadoop.hbase.regionserver.InternalScanner; 057import org.apache.hadoop.hbase.regionserver.Region; 058import org.apache.hadoop.hbase.regionserver.ScanType; 059import org.apache.hadoop.hbase.regionserver.ScannerContext; 060import org.apache.hadoop.hbase.regionserver.Store; 061import org.apache.hadoop.hbase.regionserver.StoreScanner; 062import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 063import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 064import org.apache.hadoop.hbase.testclassification.MediumTests; 065import org.apache.hadoop.hbase.testclassification.MiscTests; 066import org.apache.hadoop.hbase.wal.WALEdit; 067import org.junit.AfterClass; 068import org.junit.BeforeClass; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.junit.runner.RunWith; 072import org.junit.runners.Parameterized; 073import org.junit.runners.Parameterized.Parameters; 074 075@Category({ MiscTests.class, MediumTests.class }) 076@RunWith(Parameterized.class) 077public class TestCoprocessorScanPolicy { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class); 082 083 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 084 private static final byte[] F = Bytes.toBytes("fam"); 085 private static final byte[] Q = Bytes.toBytes("qual"); 086 private static final byte[] R = Bytes.toBytes("row"); 087 088 @BeforeClass 089 public static void setUpBeforeClass() throws Exception { 090 Configuration conf = TEST_UTIL.getConfiguration(); 091 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); 092 TEST_UTIL.startMiniCluster(); 093 } 094 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 @Parameters 101 public static Collection<Object[]> parameters() { 102 return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED; 103 } 104 105 public TestCoprocessorScanPolicy(boolean parallelSeekEnable) { 106 TEST_UTIL.getMiniHBaseCluster().getConf() 107 .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable); 108 } 109 110 @Test 111 public void testBaseCases() throws Exception { 112 TableName tableName = TableName.valueOf("baseCases"); 113 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 114 TEST_UTIL.deleteTable(tableName); 115 } 116 Table t = TEST_UTIL.createTable(tableName, F, 10); 117 // insert 3 versions 118 long now = EnvironmentEdgeManager.currentTime(); 119 Put p = new Put(R); 120 p.addColumn(F, Q, now, Q); 121 t.put(p); 122 p = new Put(R); 123 p.addColumn(F, Q, now + 1, Q); 124 t.put(p); 125 p = new Put(R); 126 p.addColumn(F, Q, now + 2, Q); 127 t.put(p); 128 129 Get g = new Get(R); 130 g.readVersions(10); 131 Result r = t.get(g); 132 assertEquals(3, r.size()); 133 134 TEST_UTIL.flush(tableName); 135 TEST_UTIL.compact(tableName, true); 136 137 // still visible after a flush/compaction 138 r = t.get(g); 139 assertEquals(3, r.size()); 140 141 // set the version override to 2 142 p = new Put(R); 143 p.setAttribute("versions", new byte[] {}); 144 p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); 145 t.put(p); 146 147 // only 2 versions now 148 r = t.get(g); 149 assertEquals(2, r.size()); 150 151 TEST_UTIL.flush(tableName); 152 TEST_UTIL.compact(tableName, true); 153 154 // still 2 versions after a flush/compaction 155 r = t.get(g); 156 assertEquals(2, r.size()); 157 158 // insert a new version 159 p.addColumn(F, Q, now + 3, Q); 160 t.put(p); 161 162 // still 2 versions 163 r = t.get(g); 164 assertEquals(2, r.size()); 165 166 t.close(); 167 } 168 169 @Test 170 public void testTTL() throws Exception { 171 TableName tableName = TableName.valueOf("testTTL"); 172 if (TEST_UTIL.getAdmin().tableExists(tableName)) { 173 TEST_UTIL.deleteTable(tableName); 174 } 175 Table t = TEST_UTIL.createTable(tableName, F, 10); 176 long now = EnvironmentEdgeManager.currentTime(); 177 ManualEnvironmentEdge me = new ManualEnvironmentEdge(); 178 me.setValue(now); 179 EnvironmentEdgeManagerTestHelper.injectEdge(me); 180 // 2s in the past 181 long ts = now - 2000; 182 183 Put p = new Put(R); 184 p.addColumn(F, Q, ts, Q); 185 t.put(p); 186 p = new Put(R); 187 p.addColumn(F, Q, ts + 1, Q); 188 t.put(p); 189 190 // Set the TTL override to 3s 191 p = new Put(R); 192 p.setAttribute("ttl", new byte[] {}); 193 p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); 194 t.put(p); 195 // these two should still be there 196 Get g = new Get(R); 197 g.readAllVersions(); 198 Result r = t.get(g); 199 // still there? 200 assertEquals(2, r.size()); 201 202 TEST_UTIL.flush(tableName); 203 TEST_UTIL.compact(tableName, true); 204 205 g = new Get(R); 206 g.readAllVersions(); 207 r = t.get(g); 208 // still there? 209 assertEquals(2, r.size()); 210 211 // roll time forward 2s. 212 me.setValue(now + 2000); 213 // now verify that data eventually does expire 214 g = new Get(R); 215 g.readAllVersions(); 216 r = t.get(g); 217 // should be gone now 218 assertEquals(0, r.size()); 219 t.close(); 220 EnvironmentEdgeManager.reset(); 221 } 222 223 public static class ScanObserver implements RegionCoprocessor, RegionObserver { 224 private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>(); 225 private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>(); 226 227 @Override 228 public Optional<RegionObserver> getRegionObserver() { 229 return Optional.of(this); 230 } 231 232 // lame way to communicate with the coprocessor, 233 // since it is loaded by a different class loader 234 @Override 235 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, 236 final WALEdit edit, final Durability durability) throws IOException { 237 if (put.getAttribute("ttl") != null) { 238 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 239 ttls.put( 240 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 241 cell.getQualifierLength())), 242 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 243 c.bypass(); 244 } else if (put.getAttribute("versions") != null) { 245 Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); 246 versions.put( 247 TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), 248 cell.getQualifierLength())), 249 Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 250 c.bypass(); 251 } 252 } 253 254 private InternalScanner wrap(Store store, InternalScanner scanner) { 255 Long ttl = this.ttls.get(store.getTableName()); 256 Integer version = this.versions.get(store.getTableName()); 257 return new DelegatingInternalScanner(scanner) { 258 259 private byte[] row; 260 261 private byte[] qualifier; 262 263 private int count; 264 265 private Predicate<Cell> checkTtl(long now, long ttl) { 266 return c -> now - c.getTimestamp() > ttl; 267 } 268 269 private Predicate<Cell> checkVersion(Cell firstCell, int version) { 270 if (version == 0) { 271 return c -> true; 272 } else { 273 if (row == null || !CellUtil.matchingRows(firstCell, row)) { 274 row = CellUtil.cloneRow(firstCell); 275 // reset qualifier as there is a row change 276 qualifier = null; 277 } 278 return c -> { 279 if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) { 280 if (count >= version) { 281 return true; 282 } 283 count++; 284 return false; 285 } else { // qualifier switch 286 qualifier = CellUtil.cloneQualifier(c); 287 count = 1; 288 return false; 289 } 290 }; 291 } 292 } 293 294 @Override 295 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { 296 boolean moreRows = scanner.next(result, scannerContext); 297 if (result.isEmpty()) { 298 return moreRows; 299 } 300 long now = EnvironmentEdgeManager.currentTime(); 301 Predicate<Cell> predicate = null; 302 if (ttl != null) { 303 predicate = checkTtl(now, ttl); 304 } 305 if (version != null) { 306 Predicate<Cell> vp = checkVersion(result.get(0), version); 307 if (predicate != null) { 308 predicate = predicate.and(vp); 309 } else { 310 predicate = vp; 311 } 312 } 313 if (predicate != null) { 314 result.removeIf(predicate); 315 } 316 return moreRows; 317 } 318 }; 319 } 320 321 @Override 322 public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 323 InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 324 return wrap(store, scanner); 325 } 326 327 @Override 328 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 329 InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 330 CompactionRequest request) throws IOException { 331 return wrap(store, scanner); 332 } 333 334 @Override 335 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, 336 List<Cell> result) throws IOException { 337 TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); 338 Long ttl = this.ttls.get(tableName); 339 if (ttl != null) { 340 get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax()); 341 } 342 Integer version = this.versions.get(tableName); 343 if (version != null) { 344 get.readVersions(version); 345 } 346 } 347 348 @Override 349 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) 350 throws IOException { 351 Region region = c.getEnvironment().getRegion(); 352 TableName tableName = region.getTableDescriptor().getTableName(); 353 Long ttl = this.ttls.get(tableName); 354 if (ttl != null) { 355 scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax()); 356 } 357 Integer version = this.versions.get(tableName); 358 if (version != null) { 359 scan.readVersions(version); 360 } 361 } 362 } 363}