001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.Arrays;
025import java.util.List;
026import java.util.concurrent.TimeUnit;
027import java.util.stream.Collectors;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.regionserver.HRegion;
032import org.apache.hadoop.hbase.regionserver.HRegionServer;
033import org.apache.hadoop.hbase.testclassification.ClientTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.JVMClusterUtil;
037import org.apache.hadoop.io.IOUtils;
038import org.junit.After;
039import org.junit.AfterClass;
040import org.junit.Before;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({MediumTests.class, ClientTests.class})
052public class TestFlushFromClient {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestFlushFromClient.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class);
059  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
060  private static AsyncConnection asyncConn;
061  private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")};
062  private static final List<byte[]> ROWS = Arrays.asList(
063    Bytes.toBytes("1"),
064    Bytes.toBytes("4"),
065    Bytes.toBytes("8"));
066  private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
067  private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
068  public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2};
069  @Rule
070  public TestName name = new TestName();
071
072  public TableName tableName;
073
074  @BeforeClass
075  public static void setUpBeforeClass() throws Exception {
076    TEST_UTIL.startMiniCluster(ROWS.size());
077    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
078  }
079
080  @AfterClass
081  public static void tearDownAfterClass() throws Exception {
082    IOUtils.cleanup(null, asyncConn);
083    TEST_UTIL.shutdownMiniCluster();
084  }
085
086  @Before
087  public void setUp() throws Exception {
088    tableName = TableName.valueOf(name.getMethodName());
089    try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
090      List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
091      for (int i = 0; i != 20; ++i) {
092        byte[] value = Bytes.toBytes(i);
093        puts.forEach(p -> {
094          p.addColumn(FAMILY_1, value, value);
095          p.addColumn(FAMILY_2, value, value);
096        });
097      }
098      t.put(puts);
099    }
100    assertFalse(getRegionInfo().isEmpty());
101    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0));
102  }
103
104  @After
105  public void tearDown() throws Exception {
106    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
107      LOG.info("Tear down, remove table=" + htd.getTableName());
108      TEST_UTIL.deleteTable(htd.getTableName());
109    }
110  }
111
112  @Test
113  public void testFlushTable() throws Exception {
114    try (Admin admin = TEST_UTIL.getAdmin()) {
115      admin.flush(tableName);
116      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
117    }
118  }
119
120  @Test
121  public void testFlushTableFamily() throws Exception {
122    try (Admin admin = TEST_UTIL.getAdmin()) {
123      long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
124      admin.flush(tableName, FAMILY_1);
125      assertFalse(getRegionInfo().stream().
126        anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
127    }
128  }
129
130  @Test
131  public void testAsyncFlushTable() throws Exception {
132    AsyncAdmin admin = asyncConn.getAdmin();
133    admin.flush(tableName).get();
134    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
135  }
136
137  @Test
138  public void testAsyncFlushTableFamily() throws Exception {
139    AsyncAdmin admin = asyncConn.getAdmin();
140    long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
141    admin.flush(tableName, FAMILY_1).get();
142    assertFalse(getRegionInfo().stream().
143      anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
144  }
145
146  @Test
147  public void testFlushRegion() throws Exception {
148    try (Admin admin = TEST_UTIL.getAdmin()) {
149      for (HRegion r : getRegionInfo()) {
150        admin.flushRegion(r.getRegionInfo().getRegionName());
151        TimeUnit.SECONDS.sleep(1);
152        assertEquals(0, r.getMemStoreDataSize());
153      }
154    }
155  }
156
157  @Test
158  public void testFlushRegionFamily() throws Exception {
159    try (Admin admin = TEST_UTIL.getAdmin()) {
160      for (HRegion r : getRegionInfo()) {
161        long sizeBeforeFlush = r.getMemStoreDataSize();
162        admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
163        TimeUnit.SECONDS.sleep(1);
164        assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
165      }
166    }
167  }
168
169  @Test
170  public void testAsyncFlushRegion() throws Exception {
171    AsyncAdmin admin = asyncConn.getAdmin();
172    for (HRegion r : getRegionInfo()) {
173      admin.flushRegion(r.getRegionInfo().getRegionName()).get();
174      TimeUnit.SECONDS.sleep(1);
175      assertEquals(0, r.getMemStoreDataSize());
176    }
177  }
178
179  @Test
180  public void testAsyncFlushRegionFamily() throws Exception {
181    AsyncAdmin admin = asyncConn.getAdmin();
182    for (HRegion r : getRegionInfo()) {
183      long sizeBeforeFlush = r.getMemStoreDataSize();
184      admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
185      TimeUnit.SECONDS.sleep(1);
186      assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
187    }
188  }
189
190  @Test
191  public void testFlushRegionServer() throws Exception {
192    try (Admin admin = TEST_UTIL.getAdmin()) {
193      for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
194            .getLiveRegionServerThreads()
195            .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
196            .collect(Collectors.toList())) {
197        admin.flushRegionServer(rs.getServerName());
198        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
199      }
200    }
201  }
202
203  @Test
204  public void testAsyncFlushRegionServer() throws Exception {
205    AsyncAdmin admin = asyncConn.getAdmin();
206    for (HRegionServer rs : TEST_UTIL.getHBaseCluster()
207      .getLiveRegionServerThreads()
208      .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
209      .collect(Collectors.toList())) {
210      admin.flushRegionServer(rs.getServerName()).get();
211      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
212    }
213  }
214
215  private List<HRegion> getRegionInfo() {
216    return TEST_UTIL.getHBaseCluster().getRegions(tableName);
217  }
218
219  private List<HRegion> getRegionInfo(HRegionServer rs) {
220    return rs.getRegions().stream()
221      .filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
222      .collect(Collectors.toList());
223  }
224}