001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.Mockito.doReturn;
024import static org.mockito.Mockito.mock;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.RegionInfoBuilder;
030import org.apache.hadoop.hbase.testclassification.RegionServerTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.Threads;
033import org.junit.jupiter.api.BeforeEach;
034import org.junit.jupiter.api.Tag;
035import org.junit.jupiter.api.Test;
036import org.junit.jupiter.api.TestInfo;
037
038@Tag(RegionServerTests.TAG)
039@Tag(SmallTests.TAG)
040public class TestMemStoreFlusher {
041
042  private String name;
043
044  public MemStoreFlusher msf;
045
046  @BeforeEach
047  public void setUp(TestInfo testInfo) throws Exception {
048    this.name = testInfo.getTestMethod().get().getName();
049    Configuration conf = new Configuration();
050    conf.set("hbase.hstore.flusher.count", "0");
051    msf = new MemStoreFlusher(conf, null);
052  }
053
054  @Test
055  public void testReplaceDelayedFlushEntry() {
056    RegionInfo hri =
057      RegionInfoBuilder.newBuilder(TableName.valueOf(name)).setRegionId(1).setReplicaId(0).build();
058    HRegion r = mock(HRegion.class);
059    doReturn(hri).when(r).getRegionInfo();
060
061    // put a delayed task with 30s delay
062    msf.requestDelayedFlush(r, 30000);
063    assertEquals(1, msf.getFlushQueueSize());
064    assertTrue(msf.regionsInQueue.get(r).isDelay());
065
066    // put a non-delayed task, then the delayed one should be replaced
067    assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
068    assertEquals(1, msf.getFlushQueueSize());
069    assertFalse(msf.regionsInQueue.get(r).isDelay());
070  }
071
072  @Test
073  public void testNotReplaceDelayedFlushEntryWhichExpired() {
074    RegionInfo hri =
075      RegionInfoBuilder.newBuilder(TableName.valueOf(name)).setRegionId(1).setReplicaId(0).build();
076    HRegion r = mock(HRegion.class);
077    doReturn(hri).when(r).getRegionInfo();
078
079    // put a delayed task with 100ms delay
080    msf.requestDelayedFlush(r, 100);
081    assertEquals(1, msf.getFlushQueueSize());
082    assertTrue(msf.regionsInQueue.get(r).isDelay());
083
084    Threads.sleep(200);
085
086    // put a non-delayed task, and the delayed one is expired, so it should not be replaced
087    assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
088    assertEquals(1, msf.getFlushQueueSize());
089    assertTrue(msf.regionsInQueue.get(r).isDelay());
090  }
091
092  @Test
093  public void testChangeFlusherCount() {
094    Configuration conf = new Configuration();
095    conf.set("hbase.hstore.flusher.count", "0");
096    HRegionServer rs = mock(HRegionServer.class);
097    doReturn(false).when(rs).isStopped();
098    doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();
099
100    msf = new MemStoreFlusher(conf, rs);
101    msf.start(Threads.LOGGING_EXCEPTION_HANDLER);
102
103    Configuration newConf = new Configuration();
104
105    newConf.set("hbase.hstore.flusher.count", "3");
106    msf.onConfigurationChange(newConf);
107    assertEquals(3, msf.getFlusherCount());
108
109    newConf.set("hbase.hstore.flusher.count", "0");
110    msf.onConfigurationChange(newConf);
111    assertEquals(1, msf.getFlusherCount());
112  }
113}