001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027import java.util.stream.Collectors; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.testclassification.ClientTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.JVMClusterUtil; 037import org.apache.hadoop.io.IOUtils; 038import org.junit.After; 039import org.junit.AfterClass; 040import org.junit.Before; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Category({MediumTests.class, ClientTests.class}) 052public class TestFlushFromClient { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestFlushFromClient.class); 057 058 private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class); 059 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 060 private static AsyncConnection asyncConn; 061 private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")}; 062 private static final List<byte[]> ROWS = Arrays.asList( 063 Bytes.toBytes("1"), 064 Bytes.toBytes("4"), 065 Bytes.toBytes("8")); 066 private static final byte[] FAMILY_1 = Bytes.toBytes("f1"); 067 private static final byte[] FAMILY_2 = Bytes.toBytes("f2"); 068 public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2}; 069 @Rule 070 public TestName name = new TestName(); 071 072 public TableName tableName; 073 074 @BeforeClass 075 public static void setUpBeforeClass() throws Exception { 076 TEST_UTIL.startMiniCluster(ROWS.size()); 077 asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 078 } 079 080 @AfterClass 081 public static void tearDownAfterClass() throws Exception { 082 IOUtils.cleanup(null, asyncConn); 083 TEST_UTIL.shutdownMiniCluster(); 084 } 085 086 @Before 087 public void setUp() throws Exception { 088 tableName = TableName.valueOf(name.getMethodName()); 089 try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) { 090 List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); 091 for (int i = 0; i != 20; ++i) { 092 byte[] value = Bytes.toBytes(i); 093 puts.forEach(p -> { 094 p.addColumn(FAMILY_1, value, value); 095 p.addColumn(FAMILY_2, value, value); 096 }); 097 } 098 t.put(puts); 099 } 100 assertFalse(getRegionInfo().isEmpty()); 101 assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0)); 102 } 103 104 @After 105 public void tearDown() throws Exception { 106 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 107 LOG.info("Tear down, remove table=" + htd.getTableName()); 108 TEST_UTIL.deleteTable(htd.getTableName()); 109 } 110 } 111 112 @Test 113 public void testFlushTable() throws Exception { 114 try (Admin admin = TEST_UTIL.getAdmin()) { 115 admin.flush(tableName); 116 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 117 } 118 } 119 120 @Test 121 public void testFlushTableFamily() throws Exception { 122 try (Admin admin = TEST_UTIL.getAdmin()) { 123 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 124 admin.flush(tableName, FAMILY_1); 125 assertFalse(getRegionInfo().stream(). 126 anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 127 } 128 } 129 130 @Test 131 public void testAsyncFlushTable() throws Exception { 132 AsyncAdmin admin = asyncConn.getAdmin(); 133 admin.flush(tableName).get(); 134 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 135 } 136 137 @Test 138 public void testAsyncFlushTableFamily() throws Exception { 139 AsyncAdmin admin = asyncConn.getAdmin(); 140 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 141 admin.flush(tableName, FAMILY_1).get(); 142 assertFalse(getRegionInfo().stream(). 143 anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 144 } 145 146 @Test 147 public void testFlushRegion() throws Exception { 148 try (Admin admin = TEST_UTIL.getAdmin()) { 149 for (HRegion r : getRegionInfo()) { 150 admin.flushRegion(r.getRegionInfo().getRegionName()); 151 TimeUnit.SECONDS.sleep(1); 152 assertEquals(0, r.getMemStoreDataSize()); 153 } 154 } 155 } 156 157 @Test 158 public void testFlushRegionFamily() throws Exception { 159 try (Admin admin = TEST_UTIL.getAdmin()) { 160 for (HRegion r : getRegionInfo()) { 161 long sizeBeforeFlush = r.getMemStoreDataSize(); 162 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1); 163 TimeUnit.SECONDS.sleep(1); 164 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 165 } 166 } 167 } 168 169 @Test 170 public void testAsyncFlushRegion() throws Exception { 171 AsyncAdmin admin = asyncConn.getAdmin(); 172 for (HRegion r : getRegionInfo()) { 173 admin.flushRegion(r.getRegionInfo().getRegionName()).get(); 174 TimeUnit.SECONDS.sleep(1); 175 assertEquals(0, r.getMemStoreDataSize()); 176 } 177 } 178 179 @Test 180 public void testAsyncFlushRegionFamily() throws Exception { 181 AsyncAdmin admin = asyncConn.getAdmin(); 182 for (HRegion r : getRegionInfo()) { 183 long sizeBeforeFlush = r.getMemStoreDataSize(); 184 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get(); 185 TimeUnit.SECONDS.sleep(1); 186 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 187 } 188 } 189 190 @Test 191 public void testFlushRegionServer() throws Exception { 192 try (Admin admin = TEST_UTIL.getAdmin()) { 193 for (HRegionServer rs : TEST_UTIL.getHBaseCluster() 194 .getLiveRegionServerThreads() 195 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 196 .collect(Collectors.toList())) { 197 admin.flushRegionServer(rs.getServerName()); 198 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 199 } 200 } 201 } 202 203 @Test 204 public void testAsyncFlushRegionServer() throws Exception { 205 AsyncAdmin admin = asyncConn.getAdmin(); 206 for (HRegionServer rs : TEST_UTIL.getHBaseCluster() 207 .getLiveRegionServerThreads() 208 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 209 .collect(Collectors.toList())) { 210 admin.flushRegionServer(rs.getServerName()).get(); 211 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 212 } 213 } 214 215 private List<HRegion> getRegionInfo() { 216 return TEST_UTIL.getHBaseCluster().getRegions(tableName); 217 } 218 219 private List<HRegion> getRegionInfo(HRegionServer rs) { 220 return rs.getRegions().stream() 221 .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) 222 .collect(Collectors.toList()); 223 } 224}