001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.hamcrest.CoreMatchers.is; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertThat; 023import static org.junit.Assert.assertTrue; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import java.util.concurrent.atomic.LongAdder; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 034import org.apache.hadoop.hbase.io.hfile.Cacheable; 035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; 036import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 037import org.apache.hadoop.hbase.testclassification.IOTests; 038import org.apache.hadoop.hbase.testclassification.SmallTests; 039import org.junit.After; 040import org.junit.Before; 041import org.junit.ClassRule; 042import org.junit.Test; 043import org.junit.experimental.categories.Category; 044import org.mockito.Mockito; 045 046@Category({IOTests.class, SmallTests.class}) 047public class TestBucketWriterThread { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestBucketWriterThread.class); 052 053 private BucketCache bc; 054 private BucketCache.WriterThread wt; 055 private BlockingQueue<RAMQueueEntry> q; 056 private Cacheable plainCacheable; 057 private BlockCacheKey plainKey; 058 059 /** A BucketCache that does not start its writer threads. */ 060 private static class MockBucketCache extends BucketCache { 061 062 public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 063 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) 064 throws FileNotFoundException, IOException { 065 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 066 persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); 067 } 068 069 @Override 070 protected void startWriterThreads() { 071 // intentional noop 072 } 073 } 074 075 /** 076 * Set up variables and get BucketCache and WriterThread into state where tests can manually 077 * control the running of WriterThread and BucketCache is empty. 078 * @throws Exception 079 */ 080 @Before 081 public void setUp() throws Exception { 082 // Arbitrary capacity. 083 final int capacity = 16; 084 // Run with one writer thread only. Means there will be one writer queue only too. We depend 085 // on this in below. 086 final int writerThreadsCount = 1; 087 this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount, 088 capacity, null, 100/*Tolerate ioerrors for 100ms*/); 089 assertEquals(writerThreadsCount, bc.writerThreads.length); 090 assertEquals(writerThreadsCount, bc.writerQueues.size()); 091 // Get reference to our single WriterThread instance. 092 this.wt = bc.writerThreads[0]; 093 this.q = bc.writerQueues.get(0); 094 095 wt.disableWriter(); 096 this.plainKey = new BlockCacheKey("f", 0); 097 this.plainCacheable = Mockito.mock(Cacheable.class); 098 099 assertThat(bc.ramCache.isEmpty(), is(true)); 100 assertTrue(q.isEmpty()); 101 } 102 103 @After 104 public void tearDown() throws Exception { 105 if (this.bc != null) this.bc.shutdown(); 106 } 107 108 /** 109 * Test non-error case just works. 110 * @throws FileNotFoundException 111 * @throws IOException 112 * @throws InterruptedException 113 */ 114 @Test 115 public void testNonErrorCase() throws IOException, InterruptedException { 116 bc.cacheBlock(this.plainKey, this.plainCacheable); 117 doDrainOfOneEntry(this.bc, this.wt, this.q); 118 } 119 120 /** 121 * Pass through a too big entry and ensure it is cleared from queues and ramCache. 122 * Manually run the WriterThread. 123 * @throws InterruptedException 124 */ 125 @Test 126 public void testTooBigEntry() throws InterruptedException { 127 Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); 128 Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); 129 this.bc.cacheBlock(this.plainKey, tooBigCacheable); 130 doDrainOfOneEntry(this.bc, this.wt, this.q); 131 } 132 133 /** 134 * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then 135 * put it back and process it. 136 * @throws IOException 137 * @throws InterruptedException 138 */ 139 @SuppressWarnings("unchecked") 140 @Test 141 public void testIOE() throws IOException, InterruptedException { 142 this.bc.cacheBlock(this.plainKey, plainCacheable); 143 RAMQueueEntry rqe = q.remove(); 144 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 145 Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). 146 writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), 147 (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any()); 148 this.q.add(spiedRqe); 149 doDrainOfOneEntry(bc, wt, q); 150 // Cache disabled when ioes w/o ever healing. 151 assertTrue(!bc.isCacheEnabled()); 152 } 153 154 /** 155 * Do Cache full exception 156 * @throws IOException 157 * @throws InterruptedException 158 */ 159 @Test 160 public void testCacheFullException() 161 throws IOException, InterruptedException { 162 this.bc.cacheBlock(this.plainKey, plainCacheable); 163 RAMQueueEntry rqe = q.remove(); 164 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 165 final CacheFullException cfe = new CacheFullException(0, 0); 166 BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); 167 Mockito.doThrow(cfe). 168 doReturn(mockedBucketEntry). 169 when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), 170 (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any()); 171 this.q.add(spiedRqe); 172 doDrainOfOneEntry(bc, wt, q); 173 } 174 175 private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, 176 final BlockingQueue<RAMQueueEntry> q) 177 throws InterruptedException { 178 List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); 179 wt.doDrain(rqes); 180 assertTrue(q.isEmpty()); 181 assertTrue(bc.ramCache.isEmpty()); 182 assertEquals(0, bc.heapSize()); 183 } 184}