001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.hamcrest.CoreMatchers.is; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 032import org.apache.hadoop.hbase.io.hfile.Cacheable; 033import org.apache.hadoop.hbase.io.hfile.HFileBlock; 034import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 035import org.apache.hadoop.hbase.testclassification.IOTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.junit.jupiter.api.AfterEach; 038import org.junit.jupiter.api.BeforeEach; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.Test; 041import org.mockito.Mockito; 042 043@Tag(IOTests.TAG) 044@Tag(SmallTests.TAG) 045public class TestBucketWriterThread { 046 047 private BucketCache bc; 048 private BucketCache.WriterThread wt; 049 private BlockingQueue<RAMQueueEntry> q; 050 private Cacheable plainCacheable; 051 private BlockCacheKey plainKey; 052 053 /** A BucketCache that does not start its writer threads. */ 054 private static class MockBucketCache extends BucketCache { 055 056 public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 057 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) 058 throws IOException { 059 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 060 persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); 061 } 062 063 @Override 064 protected void startWriterThreads() { 065 // intentional noop 066 } 067 } 068 069 /** 070 * Set up variables and get BucketCache and WriterThread into state where tests can manually 071 * control the running of WriterThread and BucketCache is empty. 072 */ 073 @BeforeEach 074 public void setUp() throws Exception { 075 // Arbitrary capacity. 076 final int capacity = 16; 077 // Run with one writer thread only. Means there will be one writer queue only too. We depend 078 // on this in below. 079 final int writerThreadsCount = 1; 080 this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, 081 capacity, null, 100/* Tolerate ioerrors for 100ms */); 082 this.bc.waitForCacheInitialization(10000); 083 assertEquals(writerThreadsCount, bc.writerThreads.length); 084 assertEquals(writerThreadsCount, bc.writerQueues.size()); 085 // Get reference to our single WriterThread instance. 086 this.wt = bc.writerThreads[0]; 087 this.q = bc.writerQueues.get(0); 088 089 wt.disableWriter(); 090 this.plainKey = new BlockCacheKey("f", 0); 091 this.plainCacheable = Mockito.mock(Cacheable.class); 092 093 assertThat(bc.ramCache.isEmpty(), is(true)); 094 assertTrue(q.isEmpty()); 095 } 096 097 @AfterEach 098 public void tearDown() throws Exception { 099 if (this.bc != null) this.bc.shutdown(); 100 } 101 102 /** 103 * Test non-error case just works. 104 */ 105 @Test 106 public void testNonErrorCase() throws IOException, InterruptedException { 107 bc.cacheBlock(this.plainKey, this.plainCacheable); 108 doDrainOfOneEntry(this.bc, this.wt, this.q); 109 } 110 111 /** 112 * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run 113 * the WriterThread. 114 */ 115 @Test 116 public void testTooBigEntry() throws InterruptedException { 117 Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); 118 Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); 119 this.bc.cacheBlock(this.plainKey, tooBigCacheable); 120 doDrainOfOneEntry(this.bc, this.wt, this.q); 121 assertTrue(bc.blocksByHFile.isEmpty()); 122 assertTrue(bc.getBackingMap().isEmpty()); 123 } 124 125 /** 126 * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it 127 * back and process it. 128 */ 129 @SuppressWarnings("unchecked") 130 @Test 131 public void testIOE() throws IOException, InterruptedException { 132 this.bc.cacheBlock(this.plainKey, plainCacheable); 133 RAMQueueEntry rqe = q.remove(); 134 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 135 Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(), 136 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 137 this.q.add(spiedRqe); 138 doDrainOfOneEntry(bc, wt, q); 139 assertTrue(bc.blocksByHFile.isEmpty()); 140 assertTrue(bc.getBackingMap().isEmpty()); 141 // Cache disabled when ioes w/o ever healing. 142 assertTrue(!bc.isCacheEnabled()); 143 } 144 145 /** 146 * Do Cache full exception 147 */ 148 @Test 149 public void testCacheFullException() throws IOException, InterruptedException { 150 this.bc.cacheBlock(this.plainKey, plainCacheable); 151 RAMQueueEntry rqe = q.remove(); 152 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 153 final CacheFullException cfe = new CacheFullException(0, 0); 154 BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); 155 Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(), 156 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 157 this.q.add(spiedRqe); 158 doDrainOfOneEntry(bc, wt, q); 159 } 160 161 private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, 162 final BlockingQueue<RAMQueueEntry> q) throws InterruptedException { 163 List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); 164 bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 165 assertTrue(q.isEmpty()); 166 assertTrue(bc.ramCache.isEmpty()); 167 assertEquals(0, bc.heapSize()); 168 } 169}