001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.BlockingQueue;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
033import org.apache.hadoop.hbase.io.hfile.Cacheable;
034import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
035import org.apache.hadoop.hbase.testclassification.IOTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.junit.After;
038import org.junit.Before;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.mockito.Mockito;
043
044@Category({IOTests.class, SmallTests.class})
045public class TestBucketWriterThread {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestBucketWriterThread.class);
050
051  private BucketCache bc;
052  private BucketCache.WriterThread wt;
053  private BlockingQueue<RAMQueueEntry> q;
054  private Cacheable plainCacheable;
055  private BlockCacheKey plainKey;
056
057  /** A BucketCache that does not start its writer threads. */
058  private static class MockBucketCache extends BucketCache {
059
060    public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
061        int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
062        throws IOException {
063      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
064          persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
065    }
066
067    @Override
068    protected void startWriterThreads() {
069      // intentional noop
070    }
071  }
072
073  /**
074   * Set up variables and get BucketCache and WriterThread into state where tests can  manually
075   * control the running of WriterThread and BucketCache is empty.
076   * @throws Exception
077   */
078  @Before
079  public void setUp() throws Exception {
080    // Arbitrary capacity.
081    final int capacity = 16;
082    // Run with one writer thread only. Means there will be one writer queue only too.  We depend
083    // on this in below.
084    final int writerThreadsCount = 1;
085    this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount,
086      capacity, null, 100/*Tolerate ioerrors for 100ms*/);
087    assertEquals(writerThreadsCount, bc.writerThreads.length);
088    assertEquals(writerThreadsCount, bc.writerQueues.size());
089    // Get reference to our single WriterThread instance.
090    this.wt = bc.writerThreads[0];
091    this.q = bc.writerQueues.get(0);
092
093    wt.disableWriter();
094    this.plainKey = new BlockCacheKey("f", 0);
095    this.plainCacheable = Mockito.mock(Cacheable.class);
096
097    assertThat(bc.ramCache.isEmpty(), is(true));
098    assertTrue(q.isEmpty());
099  }
100
101  @After
102  public void tearDown() throws Exception {
103    if (this.bc != null) this.bc.shutdown();
104  }
105
106  /**
107   * Test non-error case just works.
108   * @throws FileNotFoundException
109   * @throws IOException
110   * @throws InterruptedException
111   */
112  @Test
113  public void testNonErrorCase() throws IOException, InterruptedException {
114    bc.cacheBlock(this.plainKey, this.plainCacheable);
115    doDrainOfOneEntry(this.bc, this.wt, this.q);
116  }
117
118  /**
119   * Pass through a too big entry and ensure it is cleared from queues and ramCache.
120   * Manually run the WriterThread.
121   * @throws InterruptedException
122   */
123  @Test
124  public void testTooBigEntry() throws InterruptedException {
125    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
126    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
127    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
128    doDrainOfOneEntry(this.bc, this.wt, this.q);
129  }
130
131  /**
132   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
133   * put it back and process it.
134   * @throws IOException
135   * @throws InterruptedException
136   */
137  @SuppressWarnings("unchecked")
138  @Test
139  public void testIOE() throws IOException, InterruptedException {
140    this.bc.cacheBlock(this.plainKey, plainCacheable);
141    RAMQueueEntry rqe = q.remove();
142    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
143    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
144      writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
145    this.q.add(spiedRqe);
146    doDrainOfOneEntry(bc, wt, q);
147    // Cache disabled when ioes w/o ever healing.
148    assertTrue(!bc.isCacheEnabled());
149  }
150
151  /**
152   * Do Cache full exception
153   * @throws IOException
154   * @throws InterruptedException
155   */
156  @Test
157  public void testCacheFullException()
158      throws IOException, InterruptedException {
159    this.bc.cacheBlock(this.plainKey, plainCacheable);
160    RAMQueueEntry rqe = q.remove();
161    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
162    final CacheFullException cfe = new CacheFullException(0, 0);
163    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
164    Mockito.doThrow(cfe).
165      doReturn(mockedBucketEntry).
166      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
167    this.q.add(spiedRqe);
168    doDrainOfOneEntry(bc, wt, q);
169  }
170
171  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
172      final BlockingQueue<RAMQueueEntry> q)
173  throws InterruptedException {
174    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
175    wt.doDrain(rqes);
176    assertTrue(q.isEmpty());
177    assertTrue(bc.ramCache.isEmpty());
178    assertEquals(0, bc.heapSize());
179  }
180}