001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.BlockingQueue;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
033import org.apache.hadoop.hbase.io.hfile.Cacheable;
034import org.apache.hadoop.hbase.io.hfile.HFileBlock;
035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
036import org.apache.hadoop.hbase.testclassification.IOTests;
037import org.apache.hadoop.hbase.testclassification.SmallTests;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044
045@Category({ IOTests.class, SmallTests.class })
046public class TestBucketWriterThread {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestBucketWriterThread.class);
051
052  private BucketCache bc;
053  private BucketCache.WriterThread wt;
054  private BlockingQueue<RAMQueueEntry> q;
055  private Cacheable plainCacheable;
056  private BlockCacheKey plainKey;
057
058  /** A BucketCache that does not start its writer threads. */
059  private static class MockBucketCache extends BucketCache {
060
061    public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
062      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
063      throws IOException {
064      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
065        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
066    }
067
068    @Override
069    protected void startWriterThreads() {
070      // intentional noop
071    }
072  }
073
074  /**
075   * Set up variables and get BucketCache and WriterThread into state where tests can manually
076   * control the running of WriterThread and BucketCache is empty.
077   */
078  @Before
079  public void setUp() throws Exception {
080    // Arbitrary capacity.
081    final int capacity = 16;
082    // Run with one writer thread only. Means there will be one writer queue only too. We depend
083    // on this in below.
084    final int writerThreadsCount = 1;
085    this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount,
086      capacity, null, 100/* Tolerate ioerrors for 100ms */);
087    assertEquals(writerThreadsCount, bc.writerThreads.length);
088    assertEquals(writerThreadsCount, bc.writerQueues.size());
089    // Get reference to our single WriterThread instance.
090    this.wt = bc.writerThreads[0];
091    this.q = bc.writerQueues.get(0);
092
093    wt.disableWriter();
094    this.plainKey = new BlockCacheKey("f", 0);
095    this.plainCacheable = Mockito.mock(Cacheable.class);
096
097    assertThat(bc.ramCache.isEmpty(), is(true));
098    assertTrue(q.isEmpty());
099  }
100
101  @After
102  public void tearDown() throws Exception {
103    if (this.bc != null) this.bc.shutdown();
104  }
105
106  /**
107   * Test non-error case just works.
108   */
109  @Test
110  public void testNonErrorCase() throws IOException, InterruptedException {
111    bc.cacheBlock(this.plainKey, this.plainCacheable);
112    doDrainOfOneEntry(this.bc, this.wt, this.q);
113  }
114
115  /**
116   * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run
117   * the WriterThread.
118   */
119  @Test
120  public void testTooBigEntry() throws InterruptedException {
121    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
122    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
123    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
124    doDrainOfOneEntry(this.bc, this.wt, this.q);
125    assertTrue(bc.blocksByHFile.isEmpty());
126    assertTrue(bc.getBackingMap().isEmpty());
127  }
128
129  /**
130   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it
131   * back and process it.
132   */
133  @SuppressWarnings("unchecked")
134  @Test
135  public void testIOE() throws IOException, InterruptedException {
136    this.bc.cacheBlock(this.plainKey, plainCacheable);
137    RAMQueueEntry rqe = q.remove();
138    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
139    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
140      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
141    this.q.add(spiedRqe);
142    doDrainOfOneEntry(bc, wt, q);
143    assertTrue(bc.blocksByHFile.isEmpty());
144    assertTrue(bc.getBackingMap().isEmpty());
145    // Cache disabled when ioes w/o ever healing.
146    assertTrue(!bc.isCacheEnabled());
147  }
148
149  /**
150   * Do Cache full exception
151   */
152  @Test
153  public void testCacheFullException() throws IOException, InterruptedException {
154    this.bc.cacheBlock(this.plainKey, plainCacheable);
155    RAMQueueEntry rqe = q.remove();
156    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
157    final CacheFullException cfe = new CacheFullException(0, 0);
158    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
159    Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
160      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
161    this.q.add(spiedRqe);
162    doDrainOfOneEntry(bc, wt, q);
163  }
164
165  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
166    final BlockingQueue<RAMQueueEntry> q) throws InterruptedException {
167    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
168    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
169    assertTrue(q.isEmpty());
170    assertTrue(bc.ramCache.isEmpty());
171    assertEquals(0, bc.heapSize());
172  }
173}