001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.BlockingQueue;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
032import org.apache.hadoop.hbase.io.hfile.Cacheable;
033import org.apache.hadoop.hbase.io.hfile.HFileBlock;
034import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
035import org.apache.hadoop.hbase.testclassification.IOTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.junit.jupiter.api.AfterEach;
038import org.junit.jupiter.api.BeforeEach;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041import org.mockito.Mockito;
042
043@Tag(IOTests.TAG)
044@Tag(SmallTests.TAG)
045public class TestBucketWriterThread {
046
047  private BucketCache bc;
048  private BucketCache.WriterThread wt;
049  private BlockingQueue<RAMQueueEntry> q;
050  private Cacheable plainCacheable;
051  private BlockCacheKey plainKey;
052
053  /** A BucketCache that does not start its writer threads. */
054  private static class MockBucketCache extends BucketCache {
055
056    public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
057      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
058      throws IOException {
059      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
060        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
061    }
062
063    @Override
064    protected void startWriterThreads() {
065      // intentional noop
066    }
067  }
068
069  /**
070   * Set up variables and get BucketCache and WriterThread into state where tests can manually
071   * control the running of WriterThread and BucketCache is empty.
072   */
073  @BeforeEach
074  public void setUp() throws Exception {
075    // Arbitrary capacity.
076    final int capacity = 16;
077    // Run with one writer thread only. Means there will be one writer queue only too. We depend
078    // on this in below.
079    final int writerThreadsCount = 1;
080    this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount,
081      capacity, null, 100/* Tolerate ioerrors for 100ms */);
082    this.bc.waitForCacheInitialization(10000);
083    assertEquals(writerThreadsCount, bc.writerThreads.length);
084    assertEquals(writerThreadsCount, bc.writerQueues.size());
085    // Get reference to our single WriterThread instance.
086    this.wt = bc.writerThreads[0];
087    this.q = bc.writerQueues.get(0);
088
089    wt.disableWriter();
090    this.plainKey = new BlockCacheKey("f", 0);
091    this.plainCacheable = Mockito.mock(Cacheable.class);
092
093    assertThat(bc.ramCache.isEmpty(), is(true));
094    assertTrue(q.isEmpty());
095  }
096
097  @AfterEach
098  public void tearDown() throws Exception {
099    if (this.bc != null) this.bc.shutdown();
100  }
101
102  /**
103   * Test non-error case just works.
104   */
105  @Test
106  public void testNonErrorCase() throws IOException, InterruptedException {
107    bc.cacheBlock(this.plainKey, this.plainCacheable);
108    doDrainOfOneEntry(this.bc, this.wt, this.q);
109  }
110
111  /**
112   * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run
113   * the WriterThread.
114   */
115  @Test
116  public void testTooBigEntry() throws InterruptedException {
117    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
118    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
119    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
120    doDrainOfOneEntry(this.bc, this.wt, this.q);
121    assertTrue(bc.blocksByHFile.isEmpty());
122    assertTrue(bc.getBackingMap().isEmpty());
123  }
124
125  /**
126   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it
127   * back and process it.
128   */
129  @SuppressWarnings("unchecked")
130  @Test
131  public void testIOE() throws IOException, InterruptedException {
132    this.bc.cacheBlock(this.plainKey, plainCacheable);
133    RAMQueueEntry rqe = q.remove();
134    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
135    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
136      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
137    this.q.add(spiedRqe);
138    doDrainOfOneEntry(bc, wt, q);
139    assertTrue(bc.blocksByHFile.isEmpty());
140    assertTrue(bc.getBackingMap().isEmpty());
141    // Cache disabled when ioes w/o ever healing.
142    assertTrue(!bc.isCacheEnabled());
143  }
144
145  /**
146   * Do Cache full exception
147   */
148  @Test
149  public void testCacheFullException() throws IOException, InterruptedException {
150    this.bc.cacheBlock(this.plainKey, plainCacheable);
151    RAMQueueEntry rqe = q.remove();
152    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
153    final CacheFullException cfe = new CacheFullException(0, 0);
154    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
155    Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
156      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
157    this.q.add(spiedRqe);
158    doDrainOfOneEntry(bc, wt, q);
159  }
160
161  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
162    final BlockingQueue<RAMQueueEntry> q) throws InterruptedException {
163    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
164    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
165    assertTrue(q.isEmpty());
166    assertTrue(bc.ramCache.isEmpty());
167    assertEquals(0, bc.heapSize());
168  }
169}