/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that {@link StoreScanner#close()} and {@link StoreScanner#updateReaders(List, List)}
 * can run in parallel without leaving any references on the existing StoreScanner readers.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestStoreScannerClosure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestStoreScannerClosure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class);
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  @Rule
  public TestName name = new TestName();
  private static final String CF_STR = "cf";
  private static HRegion region;
  private static final byte[] CF = Bytes.toBytes(CF_STR);
  static Configuration CONF = HBaseConfiguration.create();
  private static CacheConfig cacheConf;
  private static FileSystem fs;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString();
  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
  private final static byte[] fam = Bytes.toBytes("cf_1");
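  // Sample KeyValues handed to scanFixture() in the test below.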
  private static final KeyValue[] kvs =
      new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
          create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
          create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), };

  @BeforeClass
  public static void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    cacheConf = new CacheConfig(CONF);
    fs = TEST_UTIL.getTestFileSystem();
    TableName tableName = TableName.valueOf("test");
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(fam));
    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
    Path path = TEST_UTIL.getDataTestDir("test");
    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
  }

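  // close() runs first; updateReaders() waits on the latch until close() has completed.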
  @Test
  public void testScannerCloseAndUpdateReaders1() throws Exception {
    testScannerCloseAndUpdateReaderInternal(true, false);
  }

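  // updateReaders() runs first; close() waits on the latch until the readers have been updated.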
  @Test
  public void testScannerCloseAndUpdateReaders2() throws Exception {
    testScannerCloseAndUpdateReaderInternal(false, true);
  }

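  /**
   * Writes a store file containing 1000 random KeyValues; it is re-opened as an HStoreFile and
   * handed to updateReaders() while close() is running.
   */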
  private Path writeStoreFile() throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile");
    HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir)
        .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build();

    final int rowLen = 32;
    Random RNG = new Random();
    for (int i = 0; i < 1000; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i);
      byte[] v = RandomKeyValueUtil.randomValue(RNG);
      int cfLen = RNG.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
          k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

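  /**
   * Returns a random KeyValue type: half of the generated KeyValues are Puts, the rest are drawn
   * from the remaining valid types (never Minimum or Maximum).
   */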
  private static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
            + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception {
    // Open the file reader with block cache disabled.
    HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true);
    return file;
  }

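  /**
   * Exercises the race between close() and updateReaders(): the two calls run on separate
   * threads, and the ExtendedStoreScanner latch forces one of them to wait until the other has
   * finished, so both orderings are covered by the two tests above.
   */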
  private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose)
      throws IOException, InterruptedException {
    // Write a store file and open it as the HStoreFile that updateReaders() will switch to.
    Path path = writeStoreFile();
    HStoreFile file = null;
    List<HStoreFile> files = new ArrayList<HStoreFile>();
    try {
      file = readStoreFile(path, CONF);
      files.add(file);
    } catch (Exception e) {
      fail("Failed to read the store file " + path + ": " + e);
    }
    scanFixture(kvs);
    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
        new Scan(), getCols("a", "d"), 100L)) {
      Thread closeThread = new Thread() {
        @Override
        public void run() {
          scan.close(awaitClose, true);
        }
      };
      closeThread.start();
      Thread updateThread = new Thread() {
        @Override
        public void run() {
          try {
            scan.updateReaders(awaitUpdate, files, Collections.emptyList());
          } catch (IOException e) {
            LOG.error("Failed to update readers", e);
          }
        }
      };
      updateThread.start();
      // Wait for both threads to complete.
      closeThread.join();
      updateThread.join();
      if (file.getReader() != null) {
        // The file reader is non-null only when updateReaders() completed first;
        // in the other case the file reader will be null.
        int refCount = file.getReader().getRefCount();
        LOG.info("Store file reader ref count is " + refCount);
        assertTrue("The store file reader ref count should be 0", refCount == 0);
      }
    }
  }

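  /**
   * A StoreScanner whose close() and updateReaders() are coordinated through a latch, so the
   * test can force one of the calls to wait until the other has finished.
   */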
  private static class ExtendedStoreScanner extends StoreScanner {
    private CountDownLatch latch = new CountDownLatch(1);

    public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
        NavigableSet<byte[]> columns, long readPt) throws IOException {
      super(store, scanInfo, scan, columns, readPt);
    }

    public void updateReaders(boolean await, List<HStoreFile> sfs,
        List<KeyValueScanner> memStoreScanners) throws IOException {
      if (await) {
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      super.updateReaders(sfs, memStoreScanners);
      if (!await) {
        latch.countDown();
      }
    }

    // Close variant that optionally waits on the latch; the dummy parameter only distinguishes
    // it from StoreScanner#close().
    public void close(boolean await, boolean dummy) {
      if (await) {
        try {
          latch.await();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      super.close();
      if (!await) {
        latch.countDown();
      }
    }
  }

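  // Builds the column qualifier set passed to the StoreScanner constructor.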
  NavigableSet<byte[]> getCols(String... strCols) {
    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    for (String col : strCols) {
      byte[] bytes = Bytes.toBytes(col);
      cols.add(bytes);
    }
    return cols;
  }
}