001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.ArgumentMatchers.anyBoolean;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.Collections;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.concurrent.atomic.AtomicLong;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.Waiter;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.regionserver.HStore;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.junit.jupiter.api.AfterEach;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046
047@Tag(MasterTests.TAG)
048@Tag(MediumTests.TAG)
049public class TestMasterRegionFlush {
050
051  private Configuration conf;
052
053  private HRegion region;
054
055  private MasterRegionFlusherAndCompactor flusher;
056
057  private AtomicInteger flushCalled;
058
059  private AtomicLong memstoreHeapSize;
060
061  private AtomicLong memstoreOffHeapSize;
062
063  @BeforeEach
064  public void setUp() throws IOException {
065    conf = HBaseConfiguration.create();
066    region = mock(HRegion.class);
067    HStore store = mock(HStore.class);
068    when(store.getStorefilesCount()).thenReturn(1);
069    when(region.getStores()).thenReturn(Collections.singletonList(store));
070    when(region.getRegionInfo())
071      .thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build());
072    flushCalled = new AtomicInteger(0);
073    memstoreHeapSize = new AtomicLong(0);
074    memstoreOffHeapSize = new AtomicLong(0);
075    when(region.getMemStoreHeapSize()).thenAnswer(invocation -> memstoreHeapSize.get());
076    when(region.getMemStoreOffHeapSize()).thenAnswer(invocation -> memstoreOffHeapSize.get());
077    when(region.flush(anyBoolean())).thenAnswer(invocation -> {
078      assertTrue(invocation.getArgument(0, Boolean.class));
079      memstoreHeapSize.set(0);
080      memstoreOffHeapSize.set(0);
081      flushCalled.incrementAndGet();
082      // Return a mock FlushResult since FlushResultImpl constructor is package-private
083      HRegion.FlushResult mockResult = mock(HRegion.FlushResult.class);
084      when(mockResult.getResult())
085        .thenReturn(HRegion.FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED);
086      when(mockResult.isFlushSucceeded()).thenReturn(true);
087      when(mockResult.isCompactionNeeded()).thenReturn(false);
088      return mockResult;
089    });
090  }
091
092  @AfterEach
093  public void tearDown() {
094    if (flusher != null) {
095      flusher.close();
096      flusher = null;
097    }
098  }
099
100  private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
101    flusher = new MasterRegionFlusherAndCompactor(conf, new Abortable() {
102
103      @Override
104      public boolean isAborted() {
105        return false;
106      }
107
108      @Override
109      public void abort(String why, Throwable e) {
110      }
111    }, region, flushSize, flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), "");
112  }
113
114  @Test
115  public void testTriggerFlushBySize() throws IOException, InterruptedException {
116    initFlusher(1024 * 1024, 1_000_000, TimeUnit.MINUTES.toMillis(15));
117    memstoreHeapSize.set(1000 * 1024);
118    flusher.onUpdate();
119    Thread.sleep(1000);
120    assertEquals(0, flushCalled.get());
121    memstoreOffHeapSize.set(1000 * 1024);
122    flusher.onUpdate();
123    Waiter.waitFor(conf, 2000, () -> flushCalled.get() == 1);
124  }
125
126  private void assertTriggerFlushByChanges(int changes) throws InterruptedException {
127    int currentFlushCalled = flushCalled.get();
128    for (int i = 0; i < changes; i++) {
129      flusher.onUpdate();
130    }
131    Thread.sleep(1000);
132    assertEquals(currentFlushCalled, flushCalled.get());
133    flusher.onUpdate();
134    Waiter.waitFor(conf, 5000, () -> flushCalled.get() == currentFlushCalled + 1);
135  }
136
137  @Test
138  public void testTriggerFlushByChanges() throws InterruptedException {
139    initFlusher(128 * 1024 * 1024, 10, TimeUnit.MINUTES.toMillis(15));
140    assertTriggerFlushByChanges(10);
141    assertTriggerFlushByChanges(10);
142  }
143
144  @Test
145  public void testPeriodicalFlush() throws InterruptedException {
146    initFlusher(128 * 1024 * 1024, 1_000_000, TimeUnit.SECONDS.toMillis(1));
147    assertEquals(0, flushCalled.get());
148    Thread.sleep(1500);
149    assertEquals(1, flushCalled.get());
150    Thread.sleep(1000);
151    assertEquals(2, flushCalled.get());
152  }
153}