/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

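/**
 * Verifies RPC reference counting of cached blocks: while gets, multi-gets and scans are held
 * open (via CustomInnerRegionObserver latches), the blocks they touch must show a non-zero
 * refCount in the block cache, and once the operations finish the refCount must drop back to
 * zero so the blocks can be evicted. Covers flushes, compactions, splits and scans that throw.
 */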
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
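  // Latches used to coordinate with CustomInnerRegionObserver: they hold gets, scans and
  // compactions open so the tests can inspect block reference counts mid-operation.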
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  /**
   * Configure an off-heap bucket cache and the test coprocessor, then start the mini cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * Shut down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Reset the observer state shared across tests.
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * Release any latches still held and drop all non-system tables.
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }

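  /**
   * Runs parallel scans against a single-region table across puts, a flush and a major
   * compaction, checking after each step that no cached block is left with a stray RPC
   * reference count.
   */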
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Load cache
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

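  /**
   * Runs gets and scans concurrently against the same blocks and verifies that each kind of
   * operation releases its block references independently: first when the gets complete, then
   * when the scans complete.
   */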
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have released the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

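  /**
   * Spreads the cells of one row across several HFiles via repeated flushes, then verifies
   * that parallel gets release the references on every file's blocks once they finish.
   */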
  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

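  /**
   * Gets with an explicit set of columns (so an explicit column tracker is used) across many
   * flushed files; while the gets are held open, every touched block must carry one reference
   * per get thread.
   */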
  @Test
  // TODO: check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per get thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

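  /**
   * Same as the explicit-tracker get test, but with the columns spread over ten column
   * families; verifies the expected number of blocks stays referenced while the gets are open.
   */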
  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per get thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

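  /**
   * Splits a region and verifies that, once the daughters open and the parent closes, no cached
   * block of the parent's files is left with a non-zero RPC reference count.
   */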
  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects the rpc refcount of cached data blocks to be 0 after split. After a
      // split, two daughter regions are opened and a compaction is scheduled to get rid of
      // references to the parent region's HFiles. Compaction will increase the refcount of
      // cached data blocks by 1. The test is flaky since compaction can kick in at any time.
      // To solve this issue, the table is created with compaction disabled.
      table = TEST_UTIL.createTable(
        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
        null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("{}", r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split created HalfStoreFileReaders, the first-key and last-key scanners
      // should be closed in order to release those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

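  /**
   * Issues batch (multi) gets from several threads and verifies the touched blocks are
   * referenced once per thread while in flight and fully released afterwards.
   */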
  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Start three multi-get threads
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      iterator = cache.iterator(); // get a fresh iterator; the one above is exhausted
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have released the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
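
  /**
   * Scans over ten column families spread across several HFiles; while the scans are held open,
   * the expected number of blocks must be referenced, and all must be released when done.
   */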
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per scan thread
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

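  /**
   * Turns on cache-on-write and evict-on-close for every store of the region and returns the
   * block cache of the last store (all stores share the region server's cache).
   */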
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

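  /**
   * Same parallel get/scan check, but with the region scanner wrapped by the coprocessor
   * (CustomInnerRegionObserverWrapper), so scan references are released before the postNext
   * hook runs.
   */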
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check that get() releases its blocks on close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Start three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Start three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block refs would already have been decremented for the scan case, as the scanner
      // was wrapped before even the postNext hook gets executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

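  /**
   * Core of the (reverse) scan-with-compaction tests: starts scans, major-compacts the store
   * underneath them, and verifies the scanned blocks stay referenced until the scans finish,
   * after which everything must be evictable.
   */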
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per scan thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced once per scan thread, as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

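  /**
   * HBASE-13082 scenario: flush and compact while scans are open, then verify that the blocks
   * of the compacted-away files are released once the scans complete.
   */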
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Start three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per scan thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted, because the scans are still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still referenced once per scan thread, as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

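  /**
   * Makes the coprocessor throw mid-scan and verifies that the failed scanners release their
   * block references (after HBASE-16604 the scanner is recreated, so ref counts return to 0).
   */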
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block refs would already have been decremented for the scan case, as the scanner
      // was wrapped before even the postNext hook gets executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks in use should be referenced once per scan thread
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // No block should still be referenced at this point
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // You should always see 0 ref count, since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

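  /**
   * Walks every block in the cache and asserts that its RPC reference count is zero.
   */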
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

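  /**
   * Loads the standard test data: ROW and ROW1 with QUALIFIER, plus a second, wider column on
   * ROW.
   */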
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

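  /**
   * Starts NO_OF_THREADS scan threads (optionally reversed) and returns them for joining.
   */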
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

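  /**
   * Starts NO_OF_THREADS get threads; 'tracker' selects explicit columns, 'multipleCFs'
   * spreads them over several families.
   */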
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

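  /**
   * Starts NO_OF_THREADS threads that each issue one batch get for ROW and ROW1.
   */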
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }

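  /**
   * Waits for the expected number of gets or nexts to be observed, then walks the cache and
   * checks each block's ref count against the number of operations still holding it.
   */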
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, the gets touch only 2 blocks
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      LOG.info("The refCount is {} for block {}", refCount, cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          // The scan would also have touched these blocks, but it would have touched all 3
          if (getClosed) {
            // If the gets have closed, only the scan's references remain
            assertEquals(CustomInnerRegionObserver.countOfGets.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfGets.get() + NO_OF_THREADS, refCount);
          }
        }
      } else {
        // The gets would also have touched these blocks, but only 2 of them additionally
        if (expectOnlyZero) {
          assertEquals(0, refCount);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(CustomInnerRegionObserver.countOfNext.get(), refCount);
          } else {
            assertEquals(CustomInnerRegionObserver.countOfNext.get() + NO_OF_THREADS, refCount);
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

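  /** Issues a single batch get for {@code ROW} and {@code ROW1} and asserts both come back. */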
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

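  /**
   * Fetches {@code ROW}, either as a whole row or, when {@code tracker} is set, as an explicit
   * list of columns that includes one qualifier which does not exist, then verifies the values.
   */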
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Request explicit columns so the column tracker is exercised
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      LOG.info("Get result: {}", r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

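  /** Scans the whole table, optionally reversed, asserting the rows arrive in order. */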
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        LOG.info("Scan result: {}", result);
        assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
        i = reverse ? i - 1 : i + 1;
      }
      assertTrue(resultFound);
    }
  }

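  /**
   * Polls every 100ms until the store holds the expected number of store files or the timeout
   * (in milliseconds) elapses, then asserts on the final count.
   */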
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    while (start + timeout > EnvironmentEdgeManager.currentTime() &&
        store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    LOG.info("start={}, now={}, cur={}", start, EnvironmentEdgeManager.currentTime(),
      store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }

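  /**
   * A pass-through RegionScanner whose {@code nextRaw} can stall on {@code compactionLatch}, or
   * wait on {@code exceptionLatch} and then throw a deliberate IOException, giving tests control
   * over exactly when a scan progresses or fails.
   */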
  private static class CustomScanner implements RegionScanner {

    private final RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // do nothing
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            // do nothing
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      // Not delegated; this test scanner does not support reseek
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }

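  /**
   * Same as CustomInnerRegionObserver, but additionally wraps every scanner opened on the region
   * in a CustomScanner.
   */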
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

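  /**
   * Region observer that counts get/next invocations and parks them on a shared CountDownLatch,
   * so a test can keep blocks referenced in the cache while it inspects ref counts. Attaching the
   * observer happens elsewhere in this test; a minimal sketch, assuming the table is created
   * through TableDescriptorBuilder, would be:
   *
   * <pre>
   * TableDescriptorBuilder.newBuilder(tableName)
   *   .setCoprocessor(CustomInnerRegionObserver.class.getName());
   * </pre>
   */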
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // do nothing
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        LOG.info("Latch count={}, isGet={}", latch.getCount(), isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}