/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

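/**
 * Verifies that data blocks pinned by in-flight gets, multi-gets and scans are reference-counted
 * in the block cache (bucket cache or combined cache) and that every reference is released once
 * the operation finishes, so the blocks become evictable again. The tests coordinate with the
 * server side through {@link CustomInnerRegionObserver} latches to hold RPCs open while the
 * cache is inspected.
 */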
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
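  // Latches shared with CustomInnerRegionObserver to hold gets/scans/compactions open while the
  // block cache is inspected; tearDown() drains whatever a failing test leaves counted up.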
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a mini cluster configured with an off-heap bucket cache and the MultiRowMutation
   * coprocessor; client retries are disabled.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * Shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Resets the observer flags and counters before each test.
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * Releases any latches the test left counted up and drops all non-system tables.
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // The data is still in the memstore, so no cache changes are expected yet.
      // Flush the data; this should create one HFile with two blocks.
      region.flush(true);
      // Load the cache by starting NO_OF_THREADS scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Read the data and expect the same blocks: one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Insert a second column and read the row: no new blocks, three new hits
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Compact: net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Read the row; this should be a cache miss because data blocks are not
      // cached on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with two blocks
      region.flush(true);
      // Start NO_OF_THREADS scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start NO_OF_THREADS get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scans should also have released their blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The row's cells are now spread across three HFiles.
      // Start NO_OF_THREADS get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The row's cells are now spread across several HFiles.
      // Start NO_OF_THREADS get threads with an explicit column tracker
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
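      // Walk the cache; any block pinned by the in-flight gets should carry one RPC reference
      // per reader thread.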
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per reader thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024 and ten column families
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The row's cells are now spread across several HFiles.
      // Start NO_OF_THREADS get threads across multiple column families
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per reader thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the RPC refcount of cached data blocks to be 0 after the split. After a
      // split, two daughter regions are opened and a compaction is scheduled to get rid of the
      // references to the parent region's hfiles. That compaction would increase the refcount of
      // cached data blocks by 1 and, since it can kick in at any time, make the test flaky. To
      // avoid this, the table is created with compaction disabled.
      table = TEST_UTIL.createTable(
        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
        null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for the split to finish
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("{}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the firstkey and lastkey scanners must be
      // closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The cells are now spread across three HFiles.
      // Start NO_OF_THREADS multi-get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented; the iterator above is
      // already exhausted, so get a fresh one
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024 and ten column families
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table =
        TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      System.out.println("Flushing cache");
      // The row's cells are now spread across several HFiles.
      // Start NO_OF_THREADS reversed scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per scanner thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

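  /**
   * Enables cache-on-write and evict-on-close on every store of the region and returns the
   * region's block cache.
   */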
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that the gets release their blocks as soon as they close
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with two blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Start NO_OF_THREADS scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start NO_OF_THREADS get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // For the scan case the block ref counts have already been decremented, since the scanner
      // was wrapped before the postNext hook even gets executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with two blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column and read the row: no new blocks, three new hits
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start NO_OF_THREADS scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per scanner thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done some blocks cannot be evicted, because the scans are
      // still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced once per scanner thread, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
    throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // Insert data; two rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with two blocks
      region.flush(true);
      int refCount = 0;
      // Insert a second column and read the row: no new blocks, three new hits
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start NO_OF_THREADS scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per scanner thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make another put and do a flush while the scans are open
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after the compaction is done some blocks cannot be evicted, because the scans are
      // still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced once per scanner thread, as they are not yet released
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // By this time all blocks should have been evicted. Since a flush and a compaction happened
      // after the scans started, we need to ensure that all the original blocks of the compacted
      // files are also removed.
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KVs that will span two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // Insert data; two rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with two blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // For the scan case the block ref counts have already been decremented, since the scanner
      // was wrapped before the postNext hook even gets executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Each pinned block is referenced once per scanner thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // A ref count of 0 should always be seen here: since HBASE-16604 the scanner is always
      // recreated on an exception, which releases the blocks the failed scanner held
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

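  /**
   * Asserts that no block left in the cache is still referenced by an RPC, i.e. every get/scan
   * has released its blocks.
   */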
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

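  /**
   * Adds ROW and ROW1 under QUALIFIER, plus a second, wider column (QUALIFIER2/data2) on ROW.
   */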
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

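  /**
   * Starts NO_OF_THREADS scan threads, optionally scanning in reverse.
   */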
  private ScanThread[] initiateScan(Table table, boolean reverse)
    throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
    throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

  private MultiGetThread[] initiateMultiGet(Table table) throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

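  /**
   * Waits until all reader threads have reached the observer hook, then walks the cache and
   * checks the RPC ref count of every block. With {@code getClosed} the gets are expected to
   * have already released their references, so only the scanners' references should remain;
   * with {@code expectOnlyZero} every block must have a ref count of zero.
   */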
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
    throws InterruptedException {
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Only one row is selected, so the gets touch just two blocks.
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      LOG.info("Block {} has refCount {}", cacheKey, refCount);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scan would also have touched these blocks, but it would have
          // touched all three of them.
          if (getClosed) {
            // Once the gets have closed, only the scan's references remain.
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The get would also have touched these blocks, but only two of them
        // in addition to the scan's.
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

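  /** Issues one batch get for ROW and ROW1 and verifies that both rows come back. */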
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

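  /**
   * Issues a single get for ROW, optionally restricted to specific columns, and verifies the
   * returned values.
   */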
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request a subset of the written columns so the explicit column tracker is exercised.
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
            data2));
        }
      }
    }
  }

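  /** Runs one full (optionally reversed) scan over the table and asserts the row order. */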
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        // Rows must come back in (reverse) row order.
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i += reverse ? -1 : 1;
      }
      assertTrue(resultFound);
    }
  }

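  /**
   * Polls until the store reports the expected number of store files or the timeout elapses, then
   * asserts the final count.
   */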
  private void waitForStoreFileCount(HStore store, int count, int timeout)
    throws InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    while (
      start + timeout > EnvironmentEdgeManager.currentTime() && store.getStorefilesCount() != count
    ) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, cur={}", start, EnvironmentEdgeManager.currentTime(),
      store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

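  /**
   * RegionScanner wrapper that can park inside {@link #nextRaw(List, ScannerContext)} on the
   * compaction or exception latches, holding block references open while the test inspects the
   * cache.
   */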
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<? super ExtendedCell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
      throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<? super ExtendedCell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<? super ExtendedCell> result, ScannerContext context)
      throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      // Park here until the test releases the compaction latch, keeping block references open.
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          throw new IOException("Test-injected scanner exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated: this test scanner always reports an unsuccessful reseek.
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

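  /** Wraps every scanner opened on the region in a {@link CustomScanner} so tests can stall it. */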
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

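  /**
   * Coprocessor that counts gets and scanner nexts and can block them on a shared latch, so the
   * test can observe block reference counts while reads are still in flight.
   */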
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl =
      new AtomicReference<>(new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> e,
      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          Thread.currentThread().interrupt();
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
      List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count is {} (isGet={})", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch to be released");
          latch.await(2, TimeUnit.MINUTES); // Bounded wait so a stuck test can still finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Timed out waiting for the latch to be released");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}