/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

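/**
 * Tests the interaction between prefetch-on-open and the file-backed {@link BucketCache}:
 * prefetch must not redo work for blocks already cached, must resolve reference files to the
 * original file's blocks, and must honor cache capacity limits.
 */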
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

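  /**
   * Reads the same file three times: the first read prefetches all six DATA blocks; the second
   * must leave the bucket cache untouched (only the evicted meta block is fetched back); the
   * third, after one block is removed by hand, must re-cache only that missing block.
   */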
  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
    assertEquals(1, l1.getBlockCount());
    // Removes the meta block from the L1 cache
    l1.clearCache();
    // Reads the file again. Checks that we are not prefetching data blocks again,
    // but that the meta block is fetched back
    LOG.debug("Second read, prefetch should run, without altering bucket cache state,"
      + " only the meta block should be fetched again.");
    readStoreFile(storeFile);
    // Makes sure the bucket cache entries have not changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    assertEquals(1, l1.getBlockCount());
    // Forcibly removes the first block from the bucket cache backing map, so that it gets
    // cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

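  /**
   * Splits a store file and prefetches through the resulting reference file. The ref's block
   * keys must resolve to the original file's cached blocks, with no duplicate entries added to
   * the bucket cache backing map.
   */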
  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // Splits the file and returns a reference to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // Starts a reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // The ref file block keys should actually resolve to the referred file blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

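  /**
   * Prefetches a large file into a deliberately small bucket cache. Once capacity is reached,
   * prefetch should get interrupted rather than evict, while an explicit full scan afterwards
   * is still allowed to trigger evictions.
   */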
  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // Do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // The scan should have triggered evictions beyond those from the first prefetch,
    // as it tries to cache each block without interruption.
    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
  }

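  /**
   * Same capacity setup as above, but for an in-memory column family. Once the cache is full,
   * a second prefetch run must not trigger any additional evictions.
   */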
  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
    long evictions = bc.getStats().getEvictedCount();
    LOG.debug("Total evicted at this point: {}", evictions);
    // Creates another reader. Now that the cache is full, no block should fit, and prefetch
    // should not trigger any new evictions.
    createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictions, bc.getStats().getEvictedCount());
  }

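  /**
   * With a non-zero QUEUE_ADDITION_WAIT_TIME, prefetch should back off when the cache reaches
   * capacity, so the whole run must complete without evictions.
   */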
  @Test
  public void testPrefetchRunNoEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // With the wait time configuration, prefetch should trigger no evictions once it reaches
    // cache capacity
    assertEquals(0, bc.getStats().getEvictedCount());
  }

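  /**
   * With QUEUE_ADDITION_WAIT_TIME set to zero, prefetch keeps pushing blocks after the cache
   * fills up, so evictions are expected, unless insert failures kept usage below capacity.
   */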
  @Test
  public void testPrefetchRunTriggersEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    if (bc.getStats().getFailedInserts() == 0) {
      // With no wait time configuration, prefetch should trigger evictions once it reaches
      // cache capacity
      assertNotEquals(0, bc.getStats().getEvictedCount());
    } else {
      LOG.info("We had {} cache insert failures, which may cause cache usage "
        + "to never reach capacity.", bc.getStats().getFailedInserts());
    }
  }

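  /**
   * Verifies that the per-region cached size metric grows monotonically while prefetch caches
   * the file's blocks.
   */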
  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached,
    // checking that the cached size for the region never decreases along the way
    Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

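  /**
   * Reads every block of the file up to the load-on-open section, asserting that DATA and
   * index blocks were cached by prefetch.
   */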
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

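  /**
   * Waits for prefetch to complete, then walks all blocks of the file with readFunction and
   * applies validationFunction to each block and its cache key.
   */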
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

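  /**
   * Opens a reader and waits for prefetch to finish, asserting that no file was marked as
   * fully cached, i.e. that prefetch was interrupted before caching the whole file.
   */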
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

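  /**
   * Writes a store file with numKVs random KeyValues under the default test data dir, using
   * the 2KB test block size.
   */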
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

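  /**
   * Writes numKVs random KeyValues into a new store file under regionCFDir, carving row,
   * family and qualifier out of a single random ordered key.
   */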
  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

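  /**
   * Picks a random valid KeyValue type: half of the generated KVs are Puts, the rest are drawn
   * from the remaining non-boundary types.
   */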
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}