001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.KeyValueTestUtil.create; 021import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.NavigableSet; 029import java.util.Random; 030import java.util.TreeSet; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ThreadLocalRandom; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.CellComparator; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HColumnDescriptor; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionInfo; 043import org.apache.hadoop.hbase.HTableDescriptor; 044import org.apache.hadoop.hbase.KeepDeletedCells; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.io.hfile.CacheConfig; 050import org.apache.hadoop.hbase.io.hfile.HFileContext; 051import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 052import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 053import org.apache.hadoop.hbase.testclassification.RegionServerTests; 054import org.apache.hadoop.hbase.testclassification.SmallTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * This test tests whether parallel {@link StoreScanner#close()} and 067 * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring that there are no 068 * references on the existing Storescanner readers. 069 */ 070@Category({ RegionServerTests.class, SmallTests.class }) 071public class TestStoreScannerClosure { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestStoreScannerClosure.class); 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class); 078 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 079 @Rule 080 public TestName name = new TestName(); 081 private static final String CF_STR = "cf"; 082 private static HRegion region; 083 private static final byte[] CF = Bytes.toBytes(CF_STR); 084 static Configuration CONF = HBaseConfiguration.create(); 085 private static CacheConfig cacheConf; 086 private static FileSystem fs; 087 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 088 private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, 089 KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); 090 private final static byte[] fam = Bytes.toBytes("cf_1"); 091 private static final KeyValue[] kvs = 092 new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), 093 create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"), 094 create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"), 095 create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), 096 create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"), 097 create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"), 098 create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"), 099 create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"), 100 create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"), 101 create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; 102 103 @BeforeClass 104 public static void setUp() throws Exception { 105 CONF = TEST_UTIL.getConfiguration(); 106 cacheConf = new CacheConfig(CONF); 107 fs = TEST_UTIL.getTestFileSystem(); 108 TableName tableName = TableName.valueOf("test"); 109 HTableDescriptor htd = new HTableDescriptor(tableName); 110 htd.addFamily(new HColumnDescriptor(fam)); 111 HRegionInfo info = new HRegionInfo(tableName, null, null, false); 112 Path path = TEST_UTIL.getDataTestDir("test"); 113 region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); 114 } 115 116 @Test 117 public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception { 118 Put p = new Put(Bytes.toBytes("row")); 119 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 120 region.put(p); 121 // create the store scanner here. 122 // for easiness, use Long.MAX_VALUE as read pt 123 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 124 new Scan(), getCols("q1"), Long.MAX_VALUE)) { 125 p = new Put(Bytes.toBytes("row1")); 126 p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val")); 127 region.put(p); 128 HStore store = region.getStore(fam); 129 // use the lock to manually get a new memstore scanner. this is what 130 // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here 131 // since it is just a testcase). 132 store.getStoreEngine().readLock(); 133 final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE); 134 store.getStoreEngine().readUnlock(); 135 Thread closeThread = new Thread() { 136 public void run() { 137 // close should be completed 138 scan.close(false, true); 139 } 140 }; 141 closeThread.start(); 142 Thread updateThread = new Thread() { 143 public void run() { 144 try { 145 // use the updated memstoreScanners and pass it to updateReaders 146 scan.updateReaders(true, Collections.emptyList(), memScanners); 147 } catch (IOException e) { 148 e.printStackTrace(); 149 } 150 } 151 }; 152 updateThread.start(); 153 // wait for close and updateThread to complete 154 closeThread.join(); 155 updateThread.join(); 156 MemStoreLAB memStoreLAB; 157 for (KeyValueScanner scanner : memScanners) { 158 if (scanner instanceof SegmentScanner) { 159 memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB(); 160 if (memStoreLAB != null) { 161 // There should be no unpooled chunks 162 int refCount = ((MemStoreLABImpl) memStoreLAB).getRefCntValue(); 163 assertTrue("The memstore should not have unpooled chunks", refCount == 0); 164 } 165 } 166 } 167 } 168 } 169 170 @Test 171 public void testScannerCloseAndUpdateReaders1() throws Exception { 172 testScannerCloseAndUpdateReaderInternal(true, false); 173 } 174 175 @Test 176 public void testScannerCloseAndUpdateReaders2() throws Exception { 177 testScannerCloseAndUpdateReaderInternal(false, true); 178 } 179 180 private Path writeStoreFile() throws IOException { 181 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); 182 HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); 183 StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir) 184 .withFileContext(meta).build(); 185 186 final int rowLen = 32; 187 Random rand = ThreadLocalRandom.current(); 188 for (int i = 0; i < 1000; ++i) { 189 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 190 byte[] v = RandomKeyValueUtil.randomValue(rand); 191 int cfLen = rand.nextInt(k.length - rowLen + 1); 192 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 193 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 194 sfw.append(kv); 195 } 196 197 sfw.close(); 198 return sfw.getPath(); 199 } 200 201 private static KeyValue.Type generateKeyType(Random rand) { 202 if (rand.nextBoolean()) { 203 // Let's make half of KVs puts. 204 return KeyValue.Type.Put; 205 } else { 206 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 207 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 208 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 209 + "Probably the layout of KeyValue.Type has changed."); 210 } 211 return keyType; 212 } 213 } 214 215 private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception { 216 // Open the file reader with block cache disabled. 217 HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true); 218 return file; 219 } 220 221 private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose) 222 throws IOException, InterruptedException { 223 // start write to store file. 224 Path path = writeStoreFile(); 225 HStoreFile file = null; 226 List<HStoreFile> files = new ArrayList<HStoreFile>(); 227 try { 228 file = readStoreFile(path, CONF); 229 files.add(file); 230 } catch (Exception e) { 231 // fail test 232 assertTrue(false); 233 } 234 scanFixture(kvs); 235 // scanners.add(storeFileScanner); 236 try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, 237 new Scan(), getCols("a", "d"), 100L)) { 238 Thread closeThread = new Thread() { 239 public void run() { 240 scan.close(awaitClose, true); 241 } 242 }; 243 closeThread.start(); 244 Thread updateThread = new Thread() { 245 public void run() { 246 try { 247 scan.updateReaders(awaitUpdate, files, Collections.emptyList()); 248 } catch (IOException e) { 249 e.printStackTrace(); 250 } 251 } 252 }; 253 updateThread.start(); 254 // complete both the threads 255 closeThread.join(); 256 // complete both the threads 257 updateThread.join(); 258 if (file.getReader() != null) { 259 // the fileReader is not null when the updateReaders has completed first. 260 // in the other case the fileReader will be null. 261 int refCount = file.getReader().getRefCount(); 262 LOG.info("the store scanner count is " + refCount); 263 assertTrue("The store scanner count should be 0", refCount == 0); 264 } 265 } 266 } 267 268 private static class ExtendedStoreScanner extends StoreScanner { 269 private CountDownLatch latch = new CountDownLatch(1); 270 271 public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, 272 NavigableSet<byte[]> columns, long readPt) throws IOException { 273 super(store, scanInfo, scan, columns, readPt); 274 } 275 276 public void updateReaders(boolean await, List<HStoreFile> sfs, 277 List<KeyValueScanner> memStoreScanners) throws IOException { 278 if (await) { 279 try { 280 latch.await(); 281 } catch (InterruptedException e) { 282 // TODO Auto-generated catch block 283 e.printStackTrace(); 284 } 285 } 286 super.updateReaders(sfs, memStoreScanners); 287 if (!await) { 288 latch.countDown(); 289 } 290 } 291 292 // creating a dummy close 293 public void close(boolean await, boolean dummy) { 294 if (await) { 295 try { 296 latch.await(); 297 } catch (InterruptedException e) { 298 // TODO Auto-generated catch block 299 e.printStackTrace(); 300 } 301 } 302 super.close(); 303 if (!await) { 304 latch.countDown(); 305 } 306 } 307 } 308 309 NavigableSet<byte[]> getCols(String... strCols) { 310 NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR); 311 for (String col : strCols) { 312 byte[] bytes = Bytes.toBytes(col); 313 cols.add(bytes); 314 } 315 return cols; 316 } 317}