/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

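/**
 * Test for HBASE-16372: cells returned to the client should not keep references into blocks that
 * have already been shipped, so those blocks can be evicted or reused safely. The tests below
 * evict cached blocks while a compaction (write path) and a scan (read path) are in progress and
 * verify that the expected rows are still returned.
 */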
@Category({ LargeTests.class, ClientTests.class })
public class TestAvoidCellReferencesIntoShippedBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] ROW4 = Bytes.toBytes("testRow4");
  private static byte[] ROW5 = Bytes.toBytes("testRow5");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
  private static byte[] data = new byte[1000];
  protected static int SLAVES = 1;
  private CountDownLatch latch = new CountDownLatch(1);
  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
  private static AtomicBoolean doScan = new AtomicBoolean(false);

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a mini cluster configured with an off-heap bucket cache and the
   * MultiRowMutationEndpoint coprocessor loaded on all regions.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", true); // enable for the tests below
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }

  /**
   * Shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

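  /**
   * Write path: a major compaction is paused by {@link CompactorRegionObserver} once it reaches
   * ROW2; while it is paused, {@link ScannerThread} evicts the cached blocks and runs a scan from
   * ROW4 up to ROW5. The test then verifies that all six rows are still readable.
   */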
  @Test
  public void testHBase16372InCompactionWritePath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024 bytes
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
      CompactorRegionObserver.class.getName());
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache();
      // insert data: 6 rows (ROW through ROW5) are added, with flushes in between to create
      // multiple store files
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the memstore to create a new store file
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore to create a new store file
      region.flush(true);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the memstore to create a new store file
      region.flush(true);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore to create a new store file
      region.flush(true);
      // Load cache
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);
      // All blocks are now in the cache. Start the scanner thread (it waits for the paused
      // compaction to signal it) and then trigger a major compaction.
      ScannerThread scannerThread = new ScannerThread(table, cache);
      scannerThread.start();
      region.compact(true);
      s = new Scan();
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);
    } finally {
      table.close();
    }
  }

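  /**
   * Waits until the paused compaction sets {@code doScan}, then evicts every cached block and
   * scans from ROW4 up to ROW5, finally counting down {@code compactReadLatch} so the compaction
   * can resume.
   */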
  private static class ScannerThread extends Thread {
    private final Table table;
    private final BlockCache cache;

    public ScannerThread(Table table, BlockCache cache) {
      this.table = table;
      this.cache = cache;
    }

    @Override
    public void run() {
      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
      try {
        while (!doScan.get()) {
          try {
            // wait until the paused compaction signals that the scan should start
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore and keep polling
          }
        }
        List<BlockCacheKey> cacheList = new ArrayList<>();
        Iterator<CachedBlock> iterator = cache.iterator();
        // evict all the blocks
        while (iterator.hasNext()) {
          CachedBlock next = iterator.next();
          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
          cacheList.add(cacheKey);
          // evict whatever is available
          cache.evictBlock(cacheKey);
        }
        try (ResultScanner scanner = table.getScanner(s)) {
          while (scanner.next() != null) {
          }
        }
      } catch (IOException e) {
        // ignored; the finally block still releases the paused compaction
      } finally {
        // let the compaction resume even if the scan failed, so the test cannot hang here
        compactReadLatch.countDown();
      }
    }
  }

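  /**
   * Region observer that wraps the compaction scanner in a {@link CompactorInternalScanner} so
   * the compaction can be paused mid-flight.
   */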
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }

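  /**
   * Compaction scanner that, when it reaches ROW2, signals {@link ScannerThread} to start
   * scanning and then blocks on {@code compactReadLatch} until that scan has finished.
   */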
  private static final class CompactorInternalScanner extends DelegatingInternalScanner {

    public CompactorInternalScanner(InternalScanner scanner) {
      super(scanner);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      boolean next = scanner.next(result, scannerContext);
      for (Cell cell : result) {
        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
          try {
            // signal the scanner thread to start (set doScan to true), then hold the
            // compaction until that scan completes
            doScan.compareAndSet(false, true);
            compactReadLatch.await();
          } catch (InterruptedException e) {
            // ignored; let the compaction continue
          }
        }
      }
      return next;
    }
  }

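  /**
   * Read path: a partial-result scan is started, all cached blocks are evicted after two results
   * have been returned, and a second scan repopulates the cache. The test verifies that exactly
   * one block (the one still referenced by the first scanner) survives the eviction and that the
   * first scan still returns every cell.
   */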
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024 bytes
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null);
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName)
          .getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache();
      // insert data: 6 rows (ROW through ROW5) are added, each with two columns
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore so all the data ends up in store files
      region.flush(true);
      // Load cache
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows ", 6, count);

      // Scan from cache
      s = new Scan();
      // Start a scan from ROW1
      s.setCaching(1);
      s.withStartRow(ROW1);
      // allow partial results so a row can be returned split across multiple Results
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            List<BlockCacheKey> cacheList = new ArrayList<>();
            Iterator<CachedBlock> iterator = cache.iterator();
            // evict all the blocks
            while (iterator.hasNext()) {
              CachedBlock next = iterator.next();
              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
              cacheList.add(cacheKey);
              cache.evictBlock(cacheKey);
            }
            try {
              // give the evictions a moment to settle before re-counting the cached blocks
              Thread.sleep(1);
            } catch (InterruptedException e1) {
              // ignored
            }
            // only the block still referenced by the in-progress outer scanner should survive
            // the eviction above
            iterator = cache.iterator();
            int refBlockCount = 0;
            while (iterator.hasNext()) {
              iterator.next();
              refBlockCount++;
            }
            assertEquals("One block should be there ", 1, refBlockCount);
            // Rescan to repopulate the cache. This scan starts at ROW3 and stops before ROW5,
            // pulling those rows' blocks back into the cache.
            Scan s1 = new Scan();
            s1.withStartRow(ROW3);
            s1.withStopRow(ROW5);
            s1.setCaching(1);
            try (ResultScanner scanner = table.getScanner(s1)) {
              int count = Iterables.size(scanner);
              assertEquals("Count the rows", 2, count);
              int newBlockRefCount = 0;
              List<BlockCacheKey> newCacheList = new ArrayList<>();
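              // spin until six of the originally cached blocks have been loaded back into the
              // cache by the rescan above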
              while (true) {
                newBlockRefCount = 0;
                newCacheList.clear();
                iterator = cache.iterator();
                while (iterator.hasNext()) {
                  CachedBlock next = iterator.next();
                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
                  newCacheList.add(cacheKey);
                }
                for (BlockCacheKey key : cacheList) {
                  if (newCacheList.contains(key)) {
                    newBlockRefCount++;
                  }
                }
                if (newBlockRefCount == 6) {
                  break;
                }
              }
            } catch (IOException e) {
              // ignored; the finally block below still releases the latch
            } finally {
              // release the main scanner thread even if the rescan failed, so the test
              // cannot hang waiting on the latch
              latch.countDown();
            }
          }
        };
        count = 0;
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            evictorThread.start();
            latch.await();
          }
        }
      }
      // 5 scanned rows x 2 cells each, returned as partial results
      assertEquals("Count should give all the partial results", 10, count);
    } finally {
      table.close();
    }
  }
}