001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.mockito.Mockito.doReturn; 024import static org.mockito.Mockito.mock; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.client.RegionInfoBuilder; 030import org.apache.hadoop.hbase.testclassification.RegionServerTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.apache.hadoop.hbase.util.Threads; 033import org.junit.jupiter.api.BeforeEach; 034import org.junit.jupiter.api.Tag; 035import org.junit.jupiter.api.Test; 036import org.junit.jupiter.api.TestInfo; 037 038@Tag(RegionServerTests.TAG) 039@Tag(SmallTests.TAG) 040public class TestMemStoreFlusher { 041 042 private String name; 043 044 public MemStoreFlusher msf; 045 046 @BeforeEach 047 public void setUp(TestInfo testInfo) throws Exception { 048 this.name = testInfo.getTestMethod().get().getName(); 049 Configuration conf = new Configuration(); 050 conf.set("hbase.hstore.flusher.count", "0"); 051 msf = new MemStoreFlusher(conf, null); 052 } 053 054 @Test 055 public void testReplaceDelayedFlushEntry() { 056 RegionInfo hri = 057 RegionInfoBuilder.newBuilder(TableName.valueOf(name)).setRegionId(1).setReplicaId(0).build(); 058 HRegion r = mock(HRegion.class); 059 doReturn(hri).when(r).getRegionInfo(); 060 061 // put a delayed task with 30s delay 062 msf.requestDelayedFlush(r, 30000); 063 assertEquals(1, msf.getFlushQueueSize()); 064 assertTrue(msf.regionsInQueue.get(r).isDelay()); 065 066 // put a non-delayed task, then the delayed one should be replaced 067 assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); 068 assertEquals(1, msf.getFlushQueueSize()); 069 assertFalse(msf.regionsInQueue.get(r).isDelay()); 070 } 071 072 @Test 073 public void testNotReplaceDelayedFlushEntryWhichExpired() { 074 RegionInfo hri = 075 RegionInfoBuilder.newBuilder(TableName.valueOf(name)).setRegionId(1).setReplicaId(0).build(); 076 HRegion r = mock(HRegion.class); 077 doReturn(hri).when(r).getRegionInfo(); 078 079 // put a delayed task with 100ms delay 080 msf.requestDelayedFlush(r, 100); 081 assertEquals(1, msf.getFlushQueueSize()); 082 assertTrue(msf.regionsInQueue.get(r).isDelay()); 083 084 Threads.sleep(200); 085 086 // put a non-delayed task, and the delayed one is expired, so it should not be replaced 087 assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); 088 assertEquals(1, msf.getFlushQueueSize()); 089 assertTrue(msf.regionsInQueue.get(r).isDelay()); 090 } 091 092 @Test 093 public void testChangeFlusherCount() { 094 Configuration conf = new Configuration(); 095 conf.set("hbase.hstore.flusher.count", "0"); 096 HRegionServer rs = mock(HRegionServer.class); 097 doReturn(false).when(rs).isStopped(); 098 doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting(); 099 100 msf = new MemStoreFlusher(conf, rs); 101 msf.start(Threads.LOGGING_EXCEPTION_HANDLER); 102 103 Configuration newConf = new Configuration(); 104 105 newConf.set("hbase.hstore.flusher.count", "3"); 106 msf.onConfigurationChange(newConf); 107 assertEquals(3, msf.getFlusherCount()); 108 109 newConf.set("hbase.hstore.flusher.count", "0"); 110 msf.onConfigurationChange(newConf); 111 assertEquals(1, msf.getFlusherCount()); 112 } 113}