001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027import java.util.stream.Collectors; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.testclassification.ClientTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.JVMClusterUtil; 037import org.apache.hadoop.io.IOUtils; 038import org.junit.After; 039import org.junit.AfterClass; 040import org.junit.Before; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Category({ MediumTests.class, ClientTests.class }) 051public class TestFlushFromClient { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestFlushFromClient.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class); 058 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 059 private static AsyncConnection asyncConn; 060 private static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("7") }; 061 private static final List<byte[]> ROWS = 062 Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8")); 063 private static final byte[] FAMILY_1 = Bytes.toBytes("f1"); 064 private static final byte[] FAMILY_2 = Bytes.toBytes("f2"); 065 public static final byte[][] FAMILIES = { FAMILY_1, FAMILY_2 }; 066 @Rule 067 public TestName name = new TestName(); 068 069 public TableName tableName; 070 071 @BeforeClass 072 public static void setUpBeforeClass() throws Exception { 073 TEST_UTIL.startMiniCluster(ROWS.size()); 074 asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 075 } 076 077 @AfterClass 078 public static void tearDownAfterClass() throws Exception { 079 IOUtils.cleanup(null, asyncConn); 080 TEST_UTIL.shutdownMiniCluster(); 081 } 082 083 @Before 084 public void setUp() throws Exception { 085 tableName = TableName.valueOf(name.getMethodName()); 086 try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) { 087 List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); 088 for (int i = 0; i != 20; ++i) { 089 byte[] value = Bytes.toBytes(i); 090 puts.forEach(p -> { 091 p.addColumn(FAMILY_1, value, value); 092 p.addColumn(FAMILY_2, value, value); 093 }); 094 } 095 t.put(puts); 096 } 097 assertFalse(getRegionInfo().isEmpty()); 098 assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0)); 099 } 100 101 @After 102 public void tearDown() throws Exception { 103 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 104 LOG.info("Tear down, remove table=" + htd.getTableName()); 105 TEST_UTIL.deleteTable(htd.getTableName()); 106 } 107 } 108 109 @Test 110 public void testFlushTable() throws Exception { 111 try (Admin admin = TEST_UTIL.getAdmin()) { 112 admin.flush(tableName); 113 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 114 } 115 } 116 117 @Test 118 public void testFlushTableFamily() throws Exception { 119 try (Admin admin = TEST_UTIL.getAdmin()) { 120 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 121 admin.flush(tableName, FAMILY_1); 122 assertFalse( 123 getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 124 } 125 } 126 127 @Test 128 public void testAsyncFlushTable() throws Exception { 129 AsyncAdmin admin = asyncConn.getAdmin(); 130 admin.flush(tableName).get(); 131 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 132 } 133 134 @Test 135 public void testAsyncFlushTableFamily() throws Exception { 136 AsyncAdmin admin = asyncConn.getAdmin(); 137 long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); 138 admin.flush(tableName, FAMILY_1).get(); 139 assertFalse( 140 getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); 141 } 142 143 @Test 144 public void testFlushRegion() throws Exception { 145 try (Admin admin = TEST_UTIL.getAdmin()) { 146 for (HRegion r : getRegionInfo()) { 147 admin.flushRegion(r.getRegionInfo().getRegionName()); 148 TimeUnit.SECONDS.sleep(1); 149 assertEquals(0, r.getMemStoreDataSize()); 150 } 151 } 152 } 153 154 @Test 155 public void testFlushRegionFamily() throws Exception { 156 try (Admin admin = TEST_UTIL.getAdmin()) { 157 for (HRegion r : getRegionInfo()) { 158 long sizeBeforeFlush = r.getMemStoreDataSize(); 159 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1); 160 TimeUnit.SECONDS.sleep(1); 161 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 162 } 163 } 164 } 165 166 @Test 167 public void testAsyncFlushRegion() throws Exception { 168 AsyncAdmin admin = asyncConn.getAdmin(); 169 for (HRegion r : getRegionInfo()) { 170 admin.flushRegion(r.getRegionInfo().getRegionName()).get(); 171 TimeUnit.SECONDS.sleep(1); 172 assertEquals(0, r.getMemStoreDataSize()); 173 } 174 } 175 176 @Test 177 public void testAsyncFlushRegionFamily() throws Exception { 178 AsyncAdmin admin = asyncConn.getAdmin(); 179 for (HRegion r : getRegionInfo()) { 180 long sizeBeforeFlush = r.getMemStoreDataSize(); 181 admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get(); 182 TimeUnit.SECONDS.sleep(1); 183 assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); 184 } 185 } 186 187 @Test 188 public void testFlushRegionServer() throws Exception { 189 try (Admin admin = TEST_UTIL.getAdmin()) { 190 for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 191 .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) { 192 admin.flushRegionServer(rs.getServerName()); 193 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 194 } 195 } 196 } 197 198 @Test 199 public void testAsyncFlushRegionServer() throws Exception { 200 AsyncAdmin admin = asyncConn.getAdmin(); 201 for (HRegionServer rs : TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 202 .map(JVMClusterUtil.RegionServerThread::getRegionServer).collect(Collectors.toList())) { 203 admin.flushRegionServer(rs.getServerName()).get(); 204 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 205 } 206 } 207 208 private List<HRegion> getRegionInfo() { 209 return TEST_UTIL.getHBaseCluster().getRegions(tableName); 210 } 211 212 private List<HRegion> getRegionInfo(HRegionServer rs) { 213 return rs.getRegions().stream() 214 .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) 215 .collect(Collectors.toList()); 216 } 217}