001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.Arrays;
025import java.util.List;
026import java.util.concurrent.TimeUnit;
027import java.util.stream.Collectors;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.testclassification.ClientTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.JVMClusterUtil;
037import org.junit.After;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Rule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.rules.TestName;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
051@Category({ MediumTests.class, ClientTests.class })
052public class TestFlushFromClient {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestFlushFromClient.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class);
059  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  private static AsyncConnection asyncConn;
061  private static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("7") };
062  private static final List<byte[]> ROWS =
063    Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8"));
064  private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
065  private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
066  public static final byte[][] FAMILIES = { FAMILY_1, FAMILY_2 };
067  @Rule
068  public TestName name = new TestName();
069
070  public TableName tableName;
071
072  @BeforeClass
073  public static void setUpBeforeClass() throws Exception {
074    TEST_UTIL.startMiniCluster(ROWS.size());
075    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
076  }
077
078  @AfterClass
079  public static void tearDownAfterClass() throws Exception {
080    Closeables.close(asyncConn, true);
081    TEST_UTIL.shutdownMiniCluster();
082  }
083
084  @Before
085  public void setUp() throws Exception {
086    tableName = TableName.valueOf(name.getMethodName());
087    try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
088      List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
089      for (int i = 0; i != 20; ++i) {
090        byte[] value = Bytes.toBytes(i);
091        puts.forEach(p -> {
092          p.addColumn(FAMILY_1, value, value);
093          p.addColumn(FAMILY_2, value, value);
094        });
095      }
096      t.put(puts);
097    }
098    assertFalse(getRegionInfo().isEmpty());
099    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0));
100  }
101
102  @After
103  public void tearDown() throws Exception {
104    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
105      LOG.info("Tear down, remove table=" + htd.getTableName());
106      TEST_UTIL.deleteTable(htd.getTableName());
107    }
108  }
109
110  @Test
111  public void testFlushTable() throws Exception {
112    try (Admin admin = TEST_UTIL.getAdmin()) {
113      admin.flush(tableName);
114      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
115    }
116  }
117
118  @Test
119  public void testFlushTableFamily() throws Exception {
120    try (Admin admin = TEST_UTIL.getAdmin()) {
121      long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
122      admin.flush(tableName, FAMILY_1);
123      assertFalse(
124        getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
125    }
126  }
127
128  @Test
129  public void testAsyncFlushTable() throws Exception {
130    AsyncAdmin admin = asyncConn.getAdmin();
131    admin.flush(tableName).get();
132    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
133  }
134
135  @Test
136  public void testAsyncFlushTableFamily() throws Exception {
137    AsyncAdmin admin = asyncConn.getAdmin();
138    long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
139    admin.flush(tableName, FAMILY_1).get();
140    assertFalse(
141      getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
142  }
143
144  @Test
145  public void testFlushRegion() throws Exception {
146    try (Admin admin = TEST_UTIL.getAdmin()) {
147      for (HRegion r : getRegionInfo()) {
148        admin.flushRegion(r.getRegionInfo().getRegionName());
149        TimeUnit.SECONDS.sleep(1);
150        assertEquals(0, r.getMemStoreDataSize());
151      }
152    }
153  }
154
155  @Test
156  public void testFlushRegionFamily() throws Exception {
157    try (Admin admin = TEST_UTIL.getAdmin()) {
158      for (HRegion r : getRegionInfo()) {
159        long sizeBeforeFlush = r.getMemStoreDataSize();
160        admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
161        TimeUnit.SECONDS.sleep(1);
162        assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
163      }
164    }
165  }
166
167  @Test
168  public void testAsyncFlushRegion() throws Exception {
169    AsyncAdmin admin = asyncConn.getAdmin();
170    for (HRegion r : getRegionInfo()) {
171      admin.flushRegion(r.getRegionInfo().getRegionName()).get();
172      TimeUnit.SECONDS.sleep(1);
173      assertEquals(0, r.getMemStoreDataSize());
174    }
175  }
176
177  @Test
178  public void testAsyncFlushRegionFamily() throws Exception {
179    AsyncAdmin admin = asyncConn.getAdmin();
180    for (HRegion r : getRegionInfo()) {
181      long sizeBeforeFlush = r.getMemStoreDataSize();
182      admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
183      TimeUnit.SECONDS.sleep(1);
184      assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
185    }
186  }
187
188  @Test
189  public void testFlushRegionServer() throws Exception {
190    try (Admin admin = TEST_UTIL.getAdmin()) {
191      for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
192        .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
193        admin.flushRegionServer(rs.getServerName());
194        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
195      }
196    }
197  }
198
199  @Test
200  public void testAsyncFlushRegionServer() throws Exception {
201    AsyncAdmin admin = asyncConn.getAdmin();
202    for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
203      .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
204      admin.flushRegionServer(rs.getServerName()).get();
205      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
206    }
207  }
208
209  private List<HRegion> getRegionInfo() {
210    return TEST_UTIL.getHBaseCluster().getRegions(tableName);
211  }
212
213  private List<HRegion> getRegionInfo(HRegionServer rs) {
214    return rs.getRegions().stream()
215      .filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
216      .collect(Collectors.toList());
217  }
218}