/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that parallel invocations of {@link StoreScanner#close()} and
 * {@link StoreScanner#updateReaders(List, List)} work correctly, ensuring that no references
 * are left on the existing StoreScanner readers.
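 * <p>
 * {@link ExtendedStoreScanner} below uses a {@link CountDownLatch} to control which of the two
 * calls runs first, so that both orderings can be exercised.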
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestStoreScannerClosure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestStoreScannerClosure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  @Rule
  public TestName name = new TestName();
  private static final String CF_STR = "cf";
  private static HRegion region;
  private static final byte[] CF = Bytes.toBytes(CF_STR);
  static Configuration CONF = HBaseConfiguration.create();
  private static CacheConfig cacheConf;
  private static FileSystem fs;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
  private final static byte[] fam = Bytes.toBytes("cf_1");
  private static final KeyValue[] kvs =
      new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
        create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
        create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };

  @BeforeClass
  public static void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    cacheConf = new CacheConfig(CONF);
    fs = TEST_UTIL.getTestFileSystem();
    TableName tableName = TableName.valueOf("test");
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(fam));
    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
    Path path = TEST_UTIL.getDataTestDir("test");
    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
  }

  @Test
  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
    region.put(p);
    // Create the store scanner here. For simplicity, use Long.MAX_VALUE as the read point.
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
        new Scan(), getCols("q1"), Long.MAX_VALUE)) {
      p = new Put(Bytes.toBytes("row1"));
      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
      region.put(p);
      HStore store = region.getStore(fam);
      ReentrantReadWriteLock lock = store.lock;
      // Use the lock to manually get a new memstore scanner. This is what
      // HStore#notifyChangedReadersObservers does under the lock (the lock is not strictly
      // needed here since this is just a test case).
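      // The scanners obtained below are handed to updateReaders() on one thread while another
      // thread closes the StoreScanner, mirroring a flush racing with a scanner close.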
      lock.readLock().lock();
      final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
      lock.readLock().unlock();
      Thread closeThread = new Thread() {
        @Override
        public void run() {
          // close should be completed
          scan.close(false, true);
        }
      };
      closeThread.start();
      Thread updateThread = new Thread() {
        @Override
        public void run() {
          try {
            // use the updated memstore scanners and pass them to updateReaders
            scan.updateReaders(true, Collections.emptyList(), memScanners);
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      };
      updateThread.start();
      // wait for both the close and update threads to complete
      closeThread.join();
      updateThread.join();
      MemStoreLAB memStoreLAB;
      for (KeyValueScanner scanner : memScanners) {
        if (scanner instanceof SegmentScanner) {
          memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
          if (memStoreLAB != null) {
            // all scanners should be closed so that no MemStoreLAB chunks are left unpooled
            int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
            assertTrue("There should be no open scanners left on the MemStoreLAB",
              openScannerCount == 0);
          }
        }
      }
    }
  }

  @Test
  public void testScannerCloseAndUpdateReaders1() throws Exception {
    testScannerCloseAndUpdateReaderInternal(true, false);
  }

  @Test
  public void testScannerCloseAndUpdateReaders2() throws Exception {
    testScannerCloseAndUpdateReaderInternal(false, true);
  }

  private Path writeStoreFile() throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
        .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build();

    final int rowLen = 32;
    Random rng = new Random();
    for (int i = 0; i < 1000; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rng, i);
      byte[] v = RandomKeyValueUtil.randomValue(rng);
      int cfLen = rng.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
          k.length - rowLen - cfLen, rng.nextLong(), generateKeyType(rng), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  private static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of the KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception {
    // Open the file reader with block cache disabled.
    HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true);
    return file;
  }

  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
      throws IOException, InterruptedException {
    // start by writing a store file.
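    // The file is re-opened as an HStoreFile so that updateReaders() can pick up its reader
    // while close() runs concurrently on another thread.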
    Path path = writeStoreFile();
    HStoreFile file = null;
    List<HStoreFile> files = new ArrayList<>();
    try {
      file = readStoreFile(path, CONF);
      files.add(file);
    } catch (Exception e) {
      fail("Failed to open the store file reader: " + e);
    }
    scanFixture(kvs);
    // scanners.add(storeFileScanner);
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
        new Scan(), getCols("a", "d"), 100L)) {
      Thread closeThread = new Thread() {
        @Override
        public void run() {
          scan.close(awaitClose, true);
        }
      };
      closeThread.start();
      Thread updateThread = new Thread() {
        @Override
        public void run() {
          try {
            scan.updateReaders(awaitUpdate, files, Collections.emptyList());
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      };
      updateThread.start();
      // wait for both threads to complete
      closeThread.join();
      updateThread.join();
      if (file.getReader() != null) {
        // the file reader is non-null when updateReaders has completed first;
        // in the other case the file reader will be null.
        int refCount = file.getReader().getRefCount();
        LOG.info("the store file reader ref count is " + refCount);
        assertTrue("The store file reader ref count should be 0", refCount == 0);
      }
    }
  }

  private static class ExtendedStoreScanner extends StoreScanner {
    private CountDownLatch latch = new CountDownLatch(1);

    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
        NavigableSet<byte[]> columns, long readPt) throws IOException {
      super(store, scanInfo, scan, columns, readPt);
    }

    public void updateReaders(boolean await, List<HStoreFile> sfs,
        List<KeyValueScanner> memStoreScanners) throws IOException {
      if (await) {
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      super.updateReaders(sfs, memStoreScanners);
      if (!await) {
        latch.countDown();
      }
    }

    // a close variant that can optionally wait on the latch before closing
    public void close(boolean await, boolean dummy) {
      if (await) {
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      super.close();
      if (!await) {
        latch.countDown();
      }
    }
  }

  NavigableSet<byte[]> getCols(String... strCols) {
    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String col : strCols) {
      byte[] bytes = Bytes.toBytes(col);
      cols.add(bytes);
    }
    return cols;
  }
}