001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertThrows;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.regionserver.HRegion;
036import org.apache.hadoop.hbase.regionserver.HRegionServer;
037import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.FutureUtils;
042import org.apache.hadoop.hbase.util.JVMClusterUtil;
043import org.junit.jupiter.api.AfterAll;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.junit.jupiter.api.TestInfo;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
054
055@Tag(MediumTests.TAG)
056@Tag(ClientTests.TAG)
057public class TestFlushFromClient {
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class);
060  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061  private static AsyncConnection asyncConn;
062  private static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("7") };
063  private static final List<byte[]> ROWS =
064    Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8"));
065  private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
066  private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
067  public static final byte[][] FAMILIES = { FAMILY_1, FAMILY_2 };
068
069  public TableName tableName;
070
071  @BeforeAll
072  public static void setUpBeforeClass() throws Exception {
073    TEST_UTIL.startMiniCluster(ROWS.size());
074    asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
075  }
076
077  @AfterAll
078  public static void tearDownAfterClass() throws Exception {
079    Closeables.close(asyncConn, true);
080    TEST_UTIL.shutdownMiniCluster();
081  }
082
083  @BeforeEach
084  public void setUp(TestInfo testInfo) throws Exception {
085    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
086    try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
087      List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
088      for (int i = 0; i != 20; ++i) {
089        byte[] value = Bytes.toBytes(i);
090        puts.forEach(p -> {
091          p.addColumn(FAMILY_1, value, value);
092          p.addColumn(FAMILY_2, value, value);
093        });
094      }
095      t.put(puts);
096    }
097    assertFalse(getRegionInfo().isEmpty());
098    assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0));
099  }
100
101  @AfterEach
102  public void tearDown() throws Exception {
103    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
104      LOG.info("Tear down, remove table=" + htd.getTableName());
105      TEST_UTIL.deleteTable(htd.getTableName());
106    }
107  }
108
109  @Test
110  public void testFlushTable() throws Exception {
111    try (Admin admin = TEST_UTIL.getAdmin()) {
112      admin.flush(tableName);
113      assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
114    }
115  }
116
117  @Test
118  public void testFlushTableFamily() throws Exception {
119    try (Admin admin = TEST_UTIL.getAdmin()) {
120      long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
121      admin.flush(tableName, FAMILY_1);
122      assertFalse(
123        getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
124    }
125  }
126
127  @Test
128  public void testAsyncFlushTable() throws Exception {
129    AsyncAdmin admin = asyncConn.getAdmin();
130    admin.flush(tableName).get();
131    assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
132  }
133
134  @Test
135  public void testAsyncFlushTableFamily() throws Exception {
136    AsyncAdmin admin = asyncConn.getAdmin();
137    long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
138    admin.flush(tableName, FAMILY_1).get();
139    assertFalse(
140      getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
141  }
142
143  @Test
144  public void testFlushRegion() throws Exception {
145    try (Admin admin = TEST_UTIL.getAdmin()) {
146      for (HRegion r : getRegionInfo()) {
147        admin.flushRegion(r.getRegionInfo().getRegionName());
148        TimeUnit.SECONDS.sleep(1);
149        assertEquals(0, r.getMemStoreDataSize());
150      }
151    }
152  }
153
154  @Test
155  public void testFlushRegionFamily() throws Exception {
156    try (Admin admin = TEST_UTIL.getAdmin()) {
157      for (HRegion r : getRegionInfo()) {
158        long sizeBeforeFlush = r.getMemStoreDataSize();
159        admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
160        TimeUnit.SECONDS.sleep(1);
161        assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
162      }
163    }
164  }
165
166  @Test
167  public void testAsyncFlushRegion() throws Exception {
168    AsyncAdmin admin = asyncConn.getAdmin();
169    for (HRegion r : getRegionInfo()) {
170      admin.flushRegion(r.getRegionInfo().getRegionName()).get();
171      TimeUnit.SECONDS.sleep(1);
172      assertEquals(0, r.getMemStoreDataSize());
173    }
174  }
175
176  @Test
177  public void testAsyncFlushRegionFamily() throws Exception {
178    AsyncAdmin admin = asyncConn.getAdmin();
179    for (HRegion r : getRegionInfo()) {
180      long sizeBeforeFlush = r.getMemStoreDataSize();
181      admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
182      TimeUnit.SECONDS.sleep(1);
183      assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
184    }
185  }
186
187  @Test
188  public void testAsyncFlushTableWithNonExistingFamilies() throws IOException {
189    AsyncAdmin admin = asyncConn.getAdmin();
190    List<byte[]> families = new ArrayList<>();
191    families.add(FAMILY_1);
192    families.add(FAMILY_2);
193    families.add(Bytes.toBytes("non_family01"));
194    families.add(Bytes.toBytes("non_family02"));
195    CompletableFuture<Void> future = CompletableFuture.allOf(admin.flush(tableName, families));
196    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
197  }
198
199  @Test
200  public void testAsyncFlushRegionWithNonExistingFamily() throws IOException {
201    AsyncAdmin admin = asyncConn.getAdmin();
202    List<HRegion> regions = getRegionInfo();
203    assertNotNull(regions);
204    assertTrue(regions.size() > 0);
205    HRegion region = regions.get(0);
206    CompletableFuture<Void> future = CompletableFuture.allOf(admin
207      .flushRegion(region.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes("non_family")));
208    assertThrows(NoSuchColumnFamilyException.class, () -> FutureUtils.get(future));
209  }
210
211  @Test
212  public void testFlushRegionServer() throws Exception {
213    try (Admin admin = TEST_UTIL.getAdmin()) {
214      for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
215        .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
216        admin.flushRegionServer(rs.getServerName());
217        assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
218      }
219    }
220  }
221
222  @Test
223  public void testAsyncFlushRegionServer() throws Exception {
224    AsyncAdmin admin = asyncConn.getAdmin();
225    for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
226      .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) {
227      admin.flushRegionServer(rs.getServerName()).get();
228      assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
229    }
230  }
231
232  private List<HRegion> getRegionInfo() {
233    return TEST_UTIL.getHBaseCluster().getRegions(tableName);
234  }
235
236  private List<HRegion> getRegionInfo(HRegionServer rs) {
237    return rs.getRegions().stream()
238      .filter(v -> v.getTableDescriptor().getTableName().equals(tableName))
239      .collect(Collectors.toList());
240  }
241}