001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; 021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; 022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertNotEquals; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.File; 032import java.io.IOException; 033import java.util.Map; 034import java.util.Random; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.function.BiConsumer; 038import java.util.function.BiFunction; 039import org.apache.commons.lang3.mutable.MutableLong; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.Waiter; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.RegionInfo; 051import org.apache.hadoop.hbase.client.RegionInfoBuilder; 052import org.apache.hadoop.hbase.fs.HFileSystem; 053import org.apache.hadoop.hbase.io.ByteBuffAllocator; 054import org.apache.hadoop.hbase.io.hfile.BlockCache; 055import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 056import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 057import org.apache.hadoop.hbase.io.hfile.BlockType; 058import org.apache.hadoop.hbase.io.hfile.CacheConfig; 059import org.apache.hadoop.hbase.io.hfile.Cacheable; 060import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 061import org.apache.hadoop.hbase.io.hfile.HFile; 062import org.apache.hadoop.hbase.io.hfile.HFileBlock; 063import org.apache.hadoop.hbase.io.hfile.HFileContext; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.io.hfile.HFileScanner; 066import org.apache.hadoop.hbase.io.hfile.LruBlockCache; 067import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; 068import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 069import org.apache.hadoop.hbase.regionserver.BloomType; 070import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 071import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 072import org.apache.hadoop.hbase.regionserver.HStoreFile; 073import org.apache.hadoop.hbase.regionserver.StoreContext; 074import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 075import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 076import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 077import org.apache.hadoop.hbase.testclassification.IOTests; 078import org.apache.hadoop.hbase.testclassification.MediumTests; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.junit.After; 081import org.junit.Before; 082import org.junit.ClassRule; 083import org.junit.Rule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.junit.rules.TestName; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 091 092@Category({ IOTests.class, MediumTests.class }) 093public class TestPrefetchWithBucketCache { 094 095 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class); 096 097 @ClassRule 098 public static final HBaseClassTestRule CLASS_RULE = 099 HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class); 100 101 @Rule 102 public TestName name = new TestName(); 103 104 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 105 106 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 107 private static final int DATA_BLOCK_SIZE = 2048; 108 private Configuration conf; 109 private CacheConfig cacheConf; 110 private FileSystem fs; 111 private BlockCache blockCache; 112 113 @Before 114 public void setUp() throws IOException { 115 conf = TEST_UTIL.getConfiguration(); 116 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 117 fs = HFileSystem.get(conf); 118 File testDir = new File(name.getMethodName()); 119 testDir.mkdir(); 120 conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache"); 121 } 122 123 @After 124 public void tearDown() { 125 File cacheFile = new File(name.getMethodName() + "/bucket.cache"); 126 File dir = new File(name.getMethodName()); 127 cacheFile.delete(); 128 dir.delete(); 129 } 130 131 @Test 132 public void testPrefetchDoesntOverwork() throws Exception { 133 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 134 blockCache = BlockCacheFactory.createBlockCache(conf); 135 cacheConf = new CacheConfig(conf, blockCache); 136 Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100); 137 // Prefetches the file blocks 138 LOG.debug("First read should prefetch the blocks."); 139 readStoreFile(storeFile); 140 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 141 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 142 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 143 Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap()); 144 LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache(); 145 assertEquals(1, l1.getBlockCount()); 146 // Removes the meta block from L1 cache 147 l1.clearCache(); 148 // Reads file again. Checks we are not prefetching data blocks again, 149 // but fetch back the meta block 150 LOG.debug("Second read, prefetch should run, without altering bucket cache state," 151 + " only the meta block should be fetched again."); 152 readStoreFile(storeFile); 153 // Makes sure the bucketcache entries have not changed 154 snapshot.entrySet().forEach(e -> { 155 BucketEntry entry = bc.getBackingMap().get(e.getKey()); 156 assertNotNull(entry); 157 assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); 158 }); 159 assertEquals(1, l1.getBlockCount()); 160 // forcibly removes first block from the bc backing map, in order to cause it to be cached again 161 BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); 162 LOG.debug("removing block {}", key); 163 bc.getBackingMap().remove(key); 164 bc.getFullyCachedFiles().get().remove(storeFile.getName()); 165 assertTrue(snapshot.size() > bc.getBackingMap().size()); 166 LOG.debug("Third read should prefetch again, as we removed one block for the file."); 167 readStoreFile(storeFile); 168 Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size()); 169 assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); 170 } 171 172 @Test 173 public void testPrefetchRefsAfterSplit() throws Exception { 174 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 175 blockCache = BlockCacheFactory.createBlockCache(conf); 176 cacheConf = new CacheConfig(conf, blockCache); 177 178 Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit"); 179 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 180 Path regionDir = new Path(tableDir, region.getEncodedName()); 181 Path cfDir = new Path(regionDir, "cf"); 182 HRegionFileSystem regionFS = 183 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); 184 Path storeFile = writeStoreFile(100, cfDir); 185 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, 186 StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir) 187 .withCacheConfig(cacheConf).build()); 188 // Prefetches the file blocks 189 LOG.debug("First read should prefetch the blocks."); 190 readStoreFile(storeFile); 191 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 192 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 193 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 194 195 // split the file and return references to the original file 196 Random rand = ThreadLocalRandom.current(); 197 byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50); 198 HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft); 199 Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false, 200 new ConstantSizeRegionSplitPolicy(), sft); 201 HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft); 202 // starts reader for the ref. The ref should resolve to the original file blocks 203 // and not duplicate blocks in the cache. 204 refHsf.initReader(); 205 HFile.Reader reader = refHsf.getReader().getHFileReader(); 206 while (!reader.prefetchComplete()) { 207 // Sleep for a bit 208 Thread.sleep(1000); 209 } 210 // the ref file blocks keys should actually resolve to the referred file blocks, 211 // so we should not see additional blocks in the cache. 212 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 213 214 BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0); 215 Cacheable result = bc.getBlock(refCacheKey, true, false, true); 216 assertNotNull(result); 217 BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0); 218 assertEquals(result, bc.getBlock(fileCacheKey, true, false, true)); 219 assertNull(bc.getBackingMap().get(refCacheKey)); 220 assertNotNull(bc.getBlockForReference(refCacheKey)); 221 } 222 223 @Test 224 public void testPrefetchInterruptOnCapacity() throws Exception { 225 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 226 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 227 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 228 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 229 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 230 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 231 blockCache = BlockCacheFactory.createBlockCache(conf); 232 cacheConf = new CacheConfig(conf, blockCache); 233 Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000); 234 // Prefetches the file blocks 235 LOG.debug("First read should prefetch the blocks."); 236 createReaderAndWaitForPrefetchInterruption(storeFile); 237 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 238 () -> PrefetchExecutor.isCompleted(storeFile)); 239 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 240 long evictedFirstPrefetch = bc.getStats().getEvictedCount(); 241 HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile); 242 assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount()); 243 HFileScanner scanner = reader.getScanner(conf, true, true); 244 scanner.seekTo(); 245 while (scanner.next()) { 246 // do a full scan to force some evictions 247 LOG.trace("Iterating the full scan to evict some blocks"); 248 } 249 scanner.close(); 250 Waiter.waitFor(conf, 5000, () -> { 251 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 252 if (!queue.isEmpty()) { 253 return false; 254 } 255 } 256 return true; 257 }); 258 // The scanner should had triggered at least 3x evictions from the prefetch, 259 // as we try cache each block without interruption. 260 assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch); 261 } 262 263 @Test 264 public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception { 265 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 266 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 267 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 268 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 269 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 270 blockCache = BlockCacheFactory.createBlockCache(conf); 271 ColumnFamilyDescriptor family = 272 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build(); 273 cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP); 274 Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000); 275 // Prefetches the file blocks 276 LOG.debug("First read should prefetch the blocks."); 277 createReaderAndWaitForPrefetchInterruption(storeFile); 278 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 279 Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile)); 280 long evictions = bc.getStats().getEvictedCount(); 281 LOG.debug("Total evicted at this point: {}", evictions); 282 // creates another reader, now that cache is full, no block would fit and prefetch should not 283 // trigger any new evictions 284 createReaderAndWaitForPrefetchInterruption(storeFile); 285 assertEquals(evictions, bc.getStats().getEvictedCount()); 286 } 287 288 @Test 289 public void testPrefetchRunNoEvictions() throws Exception { 290 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 291 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 292 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 293 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 294 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 295 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 296 blockCache = BlockCacheFactory.createBlockCache(conf); 297 cacheConf = new CacheConfig(conf, blockCache); 298 Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000); 299 // Prefetches the file blocks 300 createReaderAndWaitForPrefetchInterruption(storeFile); 301 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 302 () -> PrefetchExecutor.isCompleted(storeFile)); 303 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 304 // Wait until all cache writer queues are empty 305 Waiter.waitFor(conf, 5000, () -> { 306 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 307 if (!queue.isEmpty()) { 308 return false; 309 } 310 } 311 return true; 312 }); 313 // With the wait time configuration, prefetch should trigger no evictions once it reaches 314 // cache capacity 315 assertEquals(0, bc.getStats().getEvictedCount()); 316 } 317 318 @Test 319 public void testPrefetchRunTriggersEvictions() throws Exception { 320 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 321 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 322 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 323 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 324 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 325 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0); 326 blockCache = BlockCacheFactory.createBlockCache(conf); 327 cacheConf = new CacheConfig(conf, blockCache); 328 Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000); 329 // Prefetches the file blocks 330 createReaderAndWaitForPrefetchInterruption(storeFile); 331 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 332 () -> PrefetchExecutor.isCompleted(storeFile)); 333 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 334 // Wait until all cache writer queues are empty 335 Waiter.waitFor(conf, 5000, () -> { 336 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 337 if (!queue.isEmpty()) { 338 return false; 339 } 340 } 341 return true; 342 }); 343 if (bc.getStats().getFailedInserts() == 0) { 344 // With no wait time configuration, prefetch should trigger evictions once it reaches 345 // cache capacity 346 assertNotEquals(0, bc.getStats().getEvictedCount()); 347 } else { 348 LOG.info("We had {} cache insert failures, which may cause cache usage " 349 + "to never reach capacity.", bc.getStats().getFailedInserts()); 350 } 351 } 352 353 @Test 354 public void testPrefetchMetricProgress() throws Exception { 355 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 356 blockCache = BlockCacheFactory.createBlockCache(conf); 357 cacheConf = new CacheConfig(conf, blockCache); 358 Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100); 359 // Prefetches the file blocks 360 LOG.debug("First read should prefetch the blocks."); 361 readStoreFile(storeFile); 362 String regionName = storeFile.getParent().getParent().getName(); 363 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 364 MutableLong regionCachedSize = new MutableLong(0); 365 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 366 long waitedTime = Waiter.waitFor(conf, 300, () -> { 367 if (bc.getBackingMap().size() > 0) { 368 long currentSize = bc.getRegionCachedInfo().get().get(regionName); 369 assertTrue(regionCachedSize.getValue() <= currentSize); 370 LOG.debug("Logging progress of region caching: {}", currentSize); 371 regionCachedSize.setValue(currentSize); 372 } 373 return bc.getBackingMap().size() == 6; 374 }); 375 } 376 377 private void readStoreFile(Path storeFilePath) throws Exception { 378 readStoreFile(storeFilePath, (r, o) -> { 379 HFileBlock block = null; 380 try { 381 block = r.readBlock(o, -1, false, true, false, true, null, null); 382 } catch (IOException e) { 383 fail(e.getMessage()); 384 } 385 return block; 386 }, (key, block) -> { 387 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 388 if ( 389 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 390 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 391 ) { 392 assertTrue(isCached); 393 } 394 }); 395 } 396 397 private void readStoreFile(Path storeFilePath, 398 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 399 BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { 400 // Open the file 401 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 402 403 while (!reader.prefetchComplete()) { 404 // Sleep for a bit 405 Thread.sleep(1000); 406 } 407 long offset = 0; 408 long sizeForDataBlocks = 0; 409 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 410 HFileBlock block = readFunction.apply(reader, offset); 411 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 412 validationFunction.accept(blockCacheKey, block); 413 offset += block.getOnDiskSizeWithHeader(); 414 } 415 } 416 417 private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath) 418 throws Exception { 419 // Open the file 420 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 421 422 while (!reader.prefetchComplete()) { 423 // Sleep for a bit 424 Thread.sleep(1000); 425 } 426 assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles() 427 .get().size()); 428 429 return reader; 430 } 431 432 private Path writeStoreFile(String fname, int numKVs) throws IOException { 433 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 434 return writeStoreFile(fname, meta, numKVs); 435 } 436 437 private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException { 438 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 439 return writeStoreFile(meta, numKVs, regionCFDir); 440 } 441 442 private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException { 443 return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname)); 444 } 445 446 private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) 447 throws IOException { 448 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 449 .withOutputDir(regionCFDir).withFileContext(context).build(); 450 Random rand = ThreadLocalRandom.current(); 451 final int rowLen = 32; 452 for (int i = 0; i < numKVs; ++i) { 453 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 454 byte[] v = RandomKeyValueUtil.randomValue(rand); 455 int cfLen = rand.nextInt(k.length - rowLen + 1); 456 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 457 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 458 sfw.append(kv); 459 } 460 461 sfw.close(); 462 return sfw.getPath(); 463 } 464 465 public static KeyValue.Type generateKeyType(Random rand) { 466 if (rand.nextBoolean()) { 467 // Let's make half of KVs puts. 468 return KeyValue.Type.Put; 469 } else { 470 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 471 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 472 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 473 + "Probably the layout of KeyValue.Type has changed."); 474 } 475 return keyType; 476 } 477 } 478}