/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

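/**
 * Test for HBASE-16372: cells returned to clients must not hold references into blocks that have
 * already been shipped back to the block cache (and possibly evicted or reused), both on the
 * compaction write path and on the read path.
 */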
@Category({ LargeTests.class, ClientTests.class })
public class TestAvoidCellReferencesIntoShippedBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] ROW4 = Bytes.toBytes("testRow4");
  private static byte[] ROW5 = Bytes.toBytes("testRow5");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
  private static byte[] data = new byte[1000];
  protected static int SLAVES = 1;
  // Released by the evictor thread in the read path test once the cache has been re-populated.
  private CountDownLatch latch = new CountDownLatch(1);
  // Released by the scanner thread once its scan has finished; the paused compaction waits on it.
  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
  // Set by the compaction observer to tell the scanner thread to start scanning.
  private static AtomicBoolean doScan = new AtomicBoolean(false);

  @Rule
  public TestName name = new TestName();
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    // use an off-heap bucket cache so the memory of evicted blocks can actually be reused
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

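  /**
   * Write path variant: a major compaction is paused at ROW2 by {@link CompactorRegionObserver}
   * while {@link ScannerThread} evicts all cached blocks and re-reads a row range; full scans
   * then verify that every row is still returned intact.
   */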
  @Test
  public void testHBASE16372InCompactionWritePath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
      CompactorRegionObserver.class.getName());
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data: six rows, two cells each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the memstore so the data is written to an HFile
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush again to create a second HFile
      region.flush(true);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush again to create a third HFile
      region.flush(true);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush again to create a fourth HFile, so the compaction has work to do
      region.flush(true);
      // Load the block cache by scanning everything once
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);
      // the cache is now loaded; trigger a major compaction, which the observer
      // pauses at ROW2 while the scanner thread evicts and re-reads blocks
      ScannerThread scannerThread = new ScannerThread(table, cache);
      scannerThread.start();
      region.compact(true);
      s = new Scan();
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);
    } finally {
      table.close();
    }
  }

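  /**
   * Waits until the paused compaction signals {@code doScan}, then evicts every cached block and
   * scans ROW4..ROW5, forcing re-reads while the compaction still holds cell references into the
   * evicted blocks; finally releases the compaction via {@code compactReadLatch}.
   */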
  private static class ScannerThread extends Thread {
    private final Table table;
    private final BlockCache cache;

    public ScannerThread(Table table, BlockCache cache) {
      this.table = table;
      this.cache = cache;
    }

    @Override
    public void run() {
      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
      try {
        while (!doScan.get()) {
          try {
            // busy-wait until the compaction observer signals that the scan should start
            Thread.sleep(1);
          } catch (InterruptedException e) {
          }
        }
        List<BlockCacheKey> cacheList = new ArrayList<>();
        Iterator<CachedBlock> iterator = cache.iterator();
        // evict all the blocks
        while (iterator.hasNext()) {
          CachedBlock next = iterator.next();
          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
          cacheList.add(cacheKey);
          // evict whatever is available
          cache.evictBlock(cacheKey);
        }
        // scan a narrow range so the evicted blocks are read back in
        try (ResultScanner scanner = table.getScanner(s)) {
          while (scanner.next() != null) {
          }
        }
        // release the paused compaction
        compactReadLatch.countDown();
      } catch (IOException e) {
      }
    }
  }

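  /**
   * Region observer that wraps the compaction scanner with {@link CompactorInternalScanner} so
   * the compaction can be paused at a known row while a concurrent client scan runs.
   */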
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }

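  /**
   * Pauses the compaction when it reaches ROW2: signals the scanner thread to start and blocks
   * on {@code compactReadLatch} until that scan has completed.
   */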
  private static final class CompactorInternalScanner extends DelegatingInternalScanner {

    public CompactorInternalScanner(InternalScanner scanner) {
      super(scanner);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      boolean next = scanner.next(result, scannerContext);
      for (Cell cell : result) {
        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
          try {
            // hold the compaction here: signal the scanner thread to start, then
            // wait until its scan has completed
            doScan.compareAndSet(false, true);
            compactReadLatch.await();
          } catch (InterruptedException e) {
          }
        }
      }
      return next;
    }
  }

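  /**
   * Read path variant: while a partial-result scan holds an RPC reference to one cached block, a
   * background thread evicts every block without an RPC reference, asserts that exactly one block
   * survives, and re-populates the cache before the outer scan is allowed to resume.
   */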
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null);
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data: six rows, two cells each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore so the data is written to an HFile
      region.flush(true);
      // Load the block cache by scanning everything once
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);

      // Scan from the cache, starting at ROW1
      s = new Scan();
      s.setCaching(1);
      s.withStartRow(ROW1);
      // allow partial results so the scan can return partial rows (and thus hold an
      // RPC reference to one block at a time)
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            List<BlockCacheKey> cacheList = new ArrayList<>();
            Iterator<CachedBlock> iterator = cache.iterator();
            // evict all the blocks that are not referenced by an RPC
            while (iterator.hasNext()) {
              CachedBlock next = iterator.next();
              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
              cacheList.add(cacheKey);
              // only one block is referenced by the ongoing RPC; evict only the
              // blocks that have no RPC reference
              evictBlock(cache, cacheKey);
            }
            try {
              Thread.sleep(1);
            } catch (InterruptedException e1) {
            }
            iterator = cache.iterator();
            int refBlockCount = 0;
            while (iterator.hasNext()) {
              iterator.next();
              refBlockCount++;
            }
            // the block held by the outer scan's RPC must have survived the eviction
            assertEquals("One block should be there", 1, refBlockCount);
            // Rescan to re-populate the cache with the evicted blocks
            Scan s1 = new Scan();
            // this scan runs from ROW3 to ROW5 and reloads those blocks into the cache
            s1.withStartRow(ROW3);
            s1.withStopRow(ROW5);
            s1.setCaching(1);
            ResultScanner scanner;
            try {
              scanner = table.getScanner(s1);
              int count = Iterables.size(scanner);
              assertEquals("Count the rows", 2, count);
              int newBlockRefCount = 0;
              List<BlockCacheKey> newCacheList = new ArrayList<>();
              // wait until six of the originally cached blocks are back in the cache
              while (true) {
                newBlockRefCount = 0;
                newCacheList.clear();
                iterator = cache.iterator();
                while (iterator.hasNext()) {
                  CachedBlock next = iterator.next();
                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
                  newCacheList.add(cacheKey);
                }
                for (BlockCacheKey key : cacheList) {
                  if (newCacheList.contains(key)) {
                    newBlockRefCount++;
                  }
                }
                if (newBlockRefCount == 6) {
                  break;
                }
              }
              // let the outer scan resume
              latch.countDown();
            } catch (IOException e) {
            }
          }
        };
        count = 0;
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            // after two partial results, evict and re-populate the cache in the background
            evictorThread.start();
            latch.await();
          }
        }
      }
      // five rows from ROW1 onwards, returned as two partial results each
      assertEquals("Count should give all rows", 10, count);
    } finally {
      table.close();
    }
  }

  /**
   * For {@link BucketCache}, evict a block only if it is not referenced by an RPC; for other
   * caches, evict unconditionally.
   */
  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
    assertTrue(blockCache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = blockCache.getBlockCaches();
    for (BlockCache currentBlockCache : blockCaches) {
      if (currentBlockCache instanceof BucketCache) {
        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
      } else {
        currentBlockCache.evictBlock(blockCacheKey);
      }
    }
  }
}