001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.hamcrest.CoreMatchers.is; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertThat; 023import static org.junit.Assert.assertTrue; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 033import org.apache.hadoop.hbase.io.hfile.Cacheable; 034import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 035import org.apache.hadoop.hbase.testclassification.IOTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.junit.After; 038import org.junit.Before; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.mockito.Mockito; 043 044@Category({IOTests.class, SmallTests.class}) 045public class TestBucketWriterThread { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestBucketWriterThread.class); 050 051 private BucketCache bc; 052 private BucketCache.WriterThread wt; 053 private BlockingQueue<RAMQueueEntry> q; 054 private Cacheable plainCacheable; 055 private BlockCacheKey plainKey; 056 057 /** A BucketCache that does not start its writer threads. */ 058 private static class MockBucketCache extends BucketCache { 059 060 public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 061 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) 062 throws IOException { 063 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 064 persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); 065 } 066 067 @Override 068 protected void startWriterThreads() { 069 // intentional noop 070 } 071 } 072 073 /** 074 * Set up variables and get BucketCache and WriterThread into state where tests can manually 075 * control the running of WriterThread and BucketCache is empty. 076 * @throws Exception 077 */ 078 @Before 079 public void setUp() throws Exception { 080 // Arbitrary capacity. 081 final int capacity = 16; 082 // Run with one writer thread only. Means there will be one writer queue only too. We depend 083 // on this in below. 084 final int writerThreadsCount = 1; 085 this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount, 086 capacity, null, 100/*Tolerate ioerrors for 100ms*/); 087 assertEquals(writerThreadsCount, bc.writerThreads.length); 088 assertEquals(writerThreadsCount, bc.writerQueues.size()); 089 // Get reference to our single WriterThread instance. 090 this.wt = bc.writerThreads[0]; 091 this.q = bc.writerQueues.get(0); 092 093 wt.disableWriter(); 094 this.plainKey = new BlockCacheKey("f", 0); 095 this.plainCacheable = Mockito.mock(Cacheable.class); 096 097 assertThat(bc.ramCache.isEmpty(), is(true)); 098 assertTrue(q.isEmpty()); 099 } 100 101 @After 102 public void tearDown() throws Exception { 103 if (this.bc != null) this.bc.shutdown(); 104 } 105 106 /** 107 * Test non-error case just works. 108 * @throws FileNotFoundException 109 * @throws IOException 110 * @throws InterruptedException 111 */ 112 @Test 113 public void testNonErrorCase() throws IOException, InterruptedException { 114 bc.cacheBlock(this.plainKey, this.plainCacheable); 115 doDrainOfOneEntry(this.bc, this.wt, this.q); 116 } 117 118 /** 119 * Pass through a too big entry and ensure it is cleared from queues and ramCache. 120 * Manually run the WriterThread. 121 * @throws InterruptedException 122 */ 123 @Test 124 public void testTooBigEntry() throws InterruptedException { 125 Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); 126 Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); 127 this.bc.cacheBlock(this.plainKey, tooBigCacheable); 128 doDrainOfOneEntry(this.bc, this.wt, this.q); 129 } 130 131 /** 132 * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then 133 * put it back and process it. 134 * @throws IOException 135 * @throws InterruptedException 136 */ 137 @SuppressWarnings("unchecked") 138 @Test 139 public void testIOE() throws IOException, InterruptedException { 140 this.bc.cacheBlock(this.plainKey, plainCacheable); 141 RAMQueueEntry rqe = q.remove(); 142 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 143 Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). 144 writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); 145 this.q.add(spiedRqe); 146 doDrainOfOneEntry(bc, wt, q); 147 // Cache disabled when ioes w/o ever healing. 148 assertTrue(!bc.isCacheEnabled()); 149 } 150 151 /** 152 * Do Cache full exception 153 * @throws IOException 154 * @throws InterruptedException 155 */ 156 @Test 157 public void testCacheFullException() 158 throws IOException, InterruptedException { 159 this.bc.cacheBlock(this.plainKey, plainCacheable); 160 RAMQueueEntry rqe = q.remove(); 161 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 162 final CacheFullException cfe = new CacheFullException(0, 0); 163 BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); 164 Mockito.doThrow(cfe). 165 doReturn(mockedBucketEntry). 166 when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); 167 this.q.add(spiedRqe); 168 doDrainOfOneEntry(bc, wt, q); 169 } 170 171 private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, 172 final BlockingQueue<RAMQueueEntry> q) 173 throws InterruptedException { 174 List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); 175 wt.doDrain(rqes); 176 assertTrue(q.isEmpty()); 177 assertTrue(bc.ramCache.isEmpty()); 178 assertEquals(0, bc.heapSize()); 179 } 180}