001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.hamcrest.CoreMatchers.is; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 033import org.apache.hadoop.hbase.io.hfile.Cacheable; 034import org.apache.hadoop.hbase.io.hfile.HFileBlock; 035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 036import org.apache.hadoop.hbase.testclassification.IOTests; 037import org.apache.hadoop.hbase.testclassification.SmallTests; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.mockito.Mockito; 044 045@Category({ IOTests.class, SmallTests.class }) 046public class TestBucketWriterThread { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestBucketWriterThread.class); 051 052 private BucketCache bc; 053 private BucketCache.WriterThread wt; 054 private BlockingQueue<RAMQueueEntry> q; 055 private Cacheable plainCacheable; 056 private BlockCacheKey plainKey; 057 058 /** A BucketCache that does not start its writer threads. */ 059 private static class MockBucketCache extends BucketCache { 060 061 public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 062 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) 063 throws IOException { 064 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 065 persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); 066 } 067 068 @Override 069 protected void startWriterThreads() { 070 // intentional noop 071 } 072 } 073 074 /** 075 * Set up variables and get BucketCache and WriterThread into state where tests can manually 076 * control the running of WriterThread and BucketCache is empty. 077 */ 078 @Before 079 public void setUp() throws Exception { 080 // Arbitrary capacity. 081 final int capacity = 16; 082 // Run with one writer thread only. Means there will be one writer queue only too. We depend 083 // on this in below. 084 final int writerThreadsCount = 1; 085 this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, 086 capacity, null, 100/* Tolerate ioerrors for 100ms */); 087 assertEquals(writerThreadsCount, bc.writerThreads.length); 088 assertEquals(writerThreadsCount, bc.writerQueues.size()); 089 // Get reference to our single WriterThread instance. 090 this.wt = bc.writerThreads[0]; 091 this.q = bc.writerQueues.get(0); 092 093 wt.disableWriter(); 094 this.plainKey = new BlockCacheKey("f", 0); 095 this.plainCacheable = Mockito.mock(Cacheable.class); 096 097 assertThat(bc.ramCache.isEmpty(), is(true)); 098 assertTrue(q.isEmpty()); 099 } 100 101 @After 102 public void tearDown() throws Exception { 103 if (this.bc != null) this.bc.shutdown(); 104 } 105 106 /** 107 * Test non-error case just works. nnn 108 */ 109 @Test 110 public void testNonErrorCase() throws IOException, InterruptedException { 111 bc.cacheBlock(this.plainKey, this.plainCacheable); 112 doDrainOfOneEntry(this.bc, this.wt, this.q); 113 } 114 115 /** 116 * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run 117 * the WriterThread. n 118 */ 119 @Test 120 public void testTooBigEntry() throws InterruptedException { 121 Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); 122 Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); 123 this.bc.cacheBlock(this.plainKey, tooBigCacheable); 124 doDrainOfOneEntry(this.bc, this.wt, this.q); 125 } 126 127 /** 128 * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it 129 * back and process it. nn 130 */ 131 @SuppressWarnings("unchecked") 132 @Test 133 public void testIOE() throws IOException, InterruptedException { 134 this.bc.cacheBlock(this.plainKey, plainCacheable); 135 RAMQueueEntry rqe = q.remove(); 136 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 137 Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(), 138 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 139 this.q.add(spiedRqe); 140 doDrainOfOneEntry(bc, wt, q); 141 // Cache disabled when ioes w/o ever healing. 142 assertTrue(!bc.isCacheEnabled()); 143 } 144 145 /** 146 * Do Cache full exception nn 147 */ 148 @Test 149 public void testCacheFullException() throws IOException, InterruptedException { 150 this.bc.cacheBlock(this.plainKey, plainCacheable); 151 RAMQueueEntry rqe = q.remove(); 152 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 153 final CacheFullException cfe = new CacheFullException(0, 0); 154 BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); 155 Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(), 156 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 157 this.q.add(spiedRqe); 158 doDrainOfOneEntry(bc, wt, q); 159 } 160 161 private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, 162 final BlockingQueue<RAMQueueEntry> q) throws InterruptedException { 163 List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); 164 bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 165 assertTrue(q.isEmpty()); 166 assertTrue(bc.ramCache.isEmpty()); 167 assertEquals(0, bc.heapSize()); 168 } 169}