001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.KeyValueTestUtil.create; 021import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.NavigableSet; 029import java.util.Random; 030import java.util.TreeSet; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.locks.ReentrantReadWriteLock; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.CellComparator; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HColumnDescriptor; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionInfo; 043import org.apache.hadoop.hbase.HTableDescriptor; 044import org.apache.hadoop.hbase.KeepDeletedCells; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.io.hfile.CacheConfig; 050import org.apache.hadoop.hbase.io.hfile.HFileContext; 051import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 052import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 053import org.apache.hadoop.hbase.testclassification.RegionServerTests; 054import org.apache.hadoop.hbase.testclassification.SmallTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064/** 065 * This test tests whether parallel {@link StoreScanner#close()} and 066 * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring 067 * that there are no references on the existing Storescanner readers. 068 */ 069@Category({ RegionServerTests.class, SmallTests.class }) 070public class TestStoreScannerClosure { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestStoreScannerClosure.class); 075 076 private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class); 077 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 078 @Rule 079 public TestName name = new TestName(); 080 private static final String CF_STR = "cf"; 081 private static HRegion region; 082 private static final byte[] CF = Bytes.toBytes(CF_STR); 083 static Configuration CONF = HBaseConfiguration.create(); 084 private static CacheConfig cacheConf; 085 private static FileSystem fs; 086 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 087 private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString(); 088 private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, 089 KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); 090 private final static byte[] fam = Bytes.toBytes("cf_1"); 091 private static final KeyValue[] kvs = 092 new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), 093 create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"), 094 create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"), 095 create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), 096 create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"), 097 create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"), 098 create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"), 099 create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"), 100 create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"), 101 create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; 102 103 @BeforeClass 104 public static void setUp() throws Exception { 105 CONF = TEST_UTIL.getConfiguration(); 106 cacheConf = new CacheConfig(CONF); 107 fs = TEST_UTIL.getTestFileSystem(); 108 TableName tableName = TableName.valueOf("test"); 109 HTableDescriptor htd = new HTableDescriptor(tableName); 110 htd.addFamily(new HColumnDescriptor(fam)); 111 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 112 Path path = TEST_UTIL.getDataTestDir("test"); 113 region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); 114 } 115 116 @Test 117 public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception { 118 Put p = new Put(Bytes.toBytes("row")); 119 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 120 region.put(p); 121 // create the store scanner here. 122 // for easiness, use Long.MAX_VALUE as read pt 123 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 124 new Scan(), getCols("q1"), Long.MAX_VALUE)) { 125 p = new Put(Bytes.toBytes("row1")); 126 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 127 region.put(p); 128 HStore store = region.getStore(fam); 129 ReentrantReadWriteLock lock = store.lock; 130 // use the lock to manually get a new memstore scanner. this is what 131 // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here 132 //since it is just a testcase). 133 lock.readLock().lock(); 134 final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE); 135 lock.readLock().unlock(); 136 Thread closeThread = new Thread() { 137 public void run() { 138 // close should be completed 139 scan.close(false, true); 140 } 141 }; 142 closeThread.start(); 143 Thread updateThread = new Thread() { 144 public void run() { 145 try { 146 // use the updated memstoreScanners and pass it to updateReaders 147 scan.updateReaders(true, Collections.emptyList(), memScanners); 148 } catch (IOException e) { 149 e.printStackTrace(); 150 } 151 } 152 }; 153 updateThread.start(); 154 // wait for close and updateThread to complete 155 closeThread.join(); 156 updateThread.join(); 157 MemStoreLAB memStoreLAB; 158 for (KeyValueScanner scanner : memScanners) { 159 if (scanner instanceof SegmentScanner) { 160 memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB(); 161 if (memStoreLAB != null) { 162 // There should be no unpooled chunks 163 int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount(); 164 assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0); 165 } 166 } 167 } 168 } 169 } 170 171 @Test 172 public void testScannerCloseAndUpdateReaders1() throws Exception { 173 testScannerCloseAndUpdateReaderInternal(true, false); 174 } 175 176 @Test 177 public void testScannerCloseAndUpdateReaders2() throws Exception { 178 testScannerCloseAndUpdateReaderInternal(false, true); 179 } 180 181 private Path writeStoreFile() throws IOException { 182 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); 183 HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); 184 StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir) 185 .withFileContext(meta).build(); 186 187 final int rowLen = 32; 188 Random RNG = new Random(); 189 for (int i = 0; i < 1000; ++i) { 190 byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i); 191 byte[] v = RandomKeyValueUtil.randomValue(RNG); 192 int cfLen = RNG.nextInt(k.length - rowLen + 1); 193 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 194 k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length); 195 sfw.append(kv); 196 } 197 198 sfw.close(); 199 return sfw.getPath(); 200 } 201 202 private static KeyValue.Type generateKeyType(Random rand) { 203 if (rand.nextBoolean()) { 204 // Let's make half of KVs puts. 205 return KeyValue.Type.Put; 206 } else { 207 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 208 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 209 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 210 + "Probably the layout of KeyValue.Type has changed."); 211 } 212 return keyType; 213 } 214 } 215 216 private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception { 217 // Open the file reader with block cache disabled. 218 HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true); 219 return file; 220 } 221 222 private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose) 223 throws IOException, InterruptedException { 224 // start write to store file. 225 Path path = writeStoreFile(); 226 HStoreFile file = null; 227 List<HStoreFile> files = new ArrayList<HStoreFile>(); 228 try { 229 file = readStoreFile(path, CONF); 230 files.add(file); 231 } catch (Exception e) { 232 // fail test 233 assertTrue(false); 234 } 235 scanFixture(kvs); 236 // scanners.add(storeFileScanner); 237 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 238 new Scan(), getCols("a", "d"), 100L)) { 239 Thread closeThread = new Thread() { 240 public void run() { 241 scan.close(awaitClose, true); 242 } 243 }; 244 closeThread.start(); 245 Thread updateThread = new Thread() { 246 public void run() { 247 try { 248 scan.updateReaders(awaitUpdate, files, Collections.emptyList()); 249 } catch (IOException e) { 250 e.printStackTrace(); 251 } 252 } 253 }; 254 updateThread.start(); 255 // complete both the threads 256 closeThread.join(); 257 // complete both the threads 258 updateThread.join(); 259 if (file.getReader() != null) { 260 // the fileReader is not null when the updateReaders has completed first. 261 // in the other case the fileReader will be null. 262 int refCount = file.getReader().getRefCount(); 263 LOG.info("the store scanner count is " + refCount); 264 assertTrue("The store scanner count should be 0", refCount == 0); 265 } 266 } 267 } 268 269 private static class ExtendedStoreScanner extends StoreScanner { 270 private CountDownLatch latch = new CountDownLatch(1); 271 272 public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, 273 NavigableSet<byte[]> columns, long readPt) throws IOException { 274 super(store, scanInfo, scan, columns, readPt); 275 } 276 277 public void updateReaders(boolean await, List<HStoreFile> sfs, 278 List<KeyValueScanner> memStoreScanners) throws IOException { 279 if (await) { 280 try { 281 latch.await(); 282 } catch (InterruptedException e) { 283 // TODO Auto-generated catch block 284 e.printStackTrace(); 285 } 286 } 287 super.updateReaders(sfs, memStoreScanners); 288 if (!await) { 289 latch.countDown(); 290 } 291 } 292 293 // creating a dummy close 294 public void close(boolean await, boolean dummy) { 295 if (await) { 296 try { 297 latch.await(); 298 } catch (InterruptedException e) { 299 // TODO Auto-generated catch block 300 e.printStackTrace(); 301 } 302 } 303 super.close(); 304 if (!await) { 305 latch.countDown(); 306 } 307 } 308 } 309 310 NavigableSet<byte[]> getCols(String... strCols) { 311 NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR); 312 for (String col : strCols) { 313 byte[] bytes = Bytes.toBytes(col); 314 cols.add(bytes); 315 } 316 return cols; 317 } 318}