001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.BlockingQueue;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
034import org.apache.hadoop.hbase.io.hfile.Cacheable;
035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
036import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
037import org.apache.hadoop.hbase.testclassification.IOTests;
038import org.apache.hadoop.hbase.testclassification.SmallTests;
039import org.junit.After;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.mockito.Mockito;
045
046@Category({IOTests.class, SmallTests.class})
047public class TestBucketWriterThread {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestBucketWriterThread.class);
052
053  private BucketCache bc;
054  private BucketCache.WriterThread wt;
055  private BlockingQueue<RAMQueueEntry> q;
056  private Cacheable plainCacheable;
057  private BlockCacheKey plainKey;
058
059  /** A BucketCache that does not start its writer threads. */
060  private static class MockBucketCache extends BucketCache {
061
062    public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
063      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
064      throws FileNotFoundException, IOException {
065      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
066        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
067    }
068
069    @Override
070    protected void startWriterThreads() {
071      // intentional noop
072    }
073  }
074
075  /**
076   * Set up variables and get BucketCache and WriterThread into state where tests can  manually
077   * control the running of WriterThread and BucketCache is empty.
078   * @throws Exception
079   */
080  @Before
081  public void setUp() throws Exception {
082    // Arbitrary capacity.
083    final int capacity = 16;
084    // Run with one writer thread only. Means there will be one writer queue only too.  We depend
085    // on this in below.
086    final int writerThreadsCount = 1;
087    this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount,
088      capacity, null, 100/*Tolerate ioerrors for 100ms*/);
089    assertEquals(writerThreadsCount, bc.writerThreads.length);
090    assertEquals(writerThreadsCount, bc.writerQueues.size());
091    // Get reference to our single WriterThread instance.
092    this.wt = bc.writerThreads[0];
093    this.q = bc.writerQueues.get(0);
094
095    wt.disableWriter();
096    this.plainKey = new BlockCacheKey("f", 0);
097    this.plainCacheable = Mockito.mock(Cacheable.class);
098
099    assertThat(bc.ramCache.isEmpty(), is(true));
100    assertTrue(q.isEmpty());
101  }
102
103  @After
104  public void tearDown() throws Exception {
105    if (this.bc != null) this.bc.shutdown();
106  }
107
108  /**
109   * Test non-error case just works.
110   * @throws FileNotFoundException
111   * @throws IOException
112   * @throws InterruptedException
113   */
114  @Test
115  public void testNonErrorCase() throws IOException, InterruptedException {
116    bc.cacheBlock(this.plainKey, this.plainCacheable);
117    doDrainOfOneEntry(this.bc, this.wt, this.q);
118  }
119
120  /**
121   * Pass through a too big entry and ensure it is cleared from queues and ramCache.
122   * Manually run the WriterThread.
123   * @throws InterruptedException
124   */
125  @Test
126  public void testTooBigEntry() throws InterruptedException {
127    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
128    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
129    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
130    doDrainOfOneEntry(this.bc, this.wt, this.q);
131  }
132
133  /**
134   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
135   * put it back and process it.
136   * @throws IOException
137   * @throws InterruptedException
138   */
139  @SuppressWarnings("unchecked")
140  @Test
141  public void testIOE() throws IOException, InterruptedException {
142    this.bc.cacheBlock(this.plainKey, plainCacheable);
143    RAMQueueEntry rqe = q.remove();
144    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
145    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
146      writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
147        (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
148    this.q.add(spiedRqe);
149    doDrainOfOneEntry(bc, wt, q);
150    // Cache disabled when ioes w/o ever healing.
151    assertTrue(!bc.isCacheEnabled());
152  }
153
154  /**
155   * Do Cache full exception
156   * @throws IOException
157   * @throws InterruptedException
158   */
159  @Test
160  public void testCacheFullException()
161      throws IOException, InterruptedException {
162    this.bc.cacheBlock(this.plainKey, plainCacheable);
163    RAMQueueEntry rqe = q.remove();
164    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
165    final CacheFullException cfe = new CacheFullException(0, 0);
166    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
167    Mockito.doThrow(cfe).
168      doReturn(mockedBucketEntry).
169      when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
170        (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
171    this.q.add(spiedRqe);
172    doDrainOfOneEntry(bc, wt, q);
173  }
174
175  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
176      final BlockingQueue<RAMQueueEntry> q)
177  throws InterruptedException {
178    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
179    wt.doDrain(rqes);
180    assertTrue(q.isEmpty());
181    assertTrue(bc.ramCache.isEmpty());
182    assertEquals(0, bc.heapSize());
183  }
184}