001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027import java.util.stream.Collectors; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegionServer; 033import org.apache.hadoop.hbase.testclassification.ClientTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.JVMClusterUtil; 037import org.apache.hadoop.io.IOUtils; 038import org.junit.After; 039import org.junit.AfterClass; 040import org.junit.Before; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Category({MediumTests.class, ClientTests.class}) 052public class TestFlushFromClient { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestFlushFromClient.class); 057 058 private static final Logger LOG = LoggerFactory.getLogger(TestFlushFromClient.class); 059 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 060 private static AsyncConnection asyncConn; 061 private static final byte[][] SPLITS = new byte[][]{Bytes.toBytes("3"), Bytes.toBytes("7")}; 062 private static final List<byte[]> ROWS = Arrays.asList( 063 Bytes.toBytes("1"), 064 Bytes.toBytes("4"), 065 Bytes.toBytes("8")); 066 private static final byte[] FAMILY = Bytes.toBytes("f1"); 067 068 @Rule 069 public TestName name = new TestName(); 070 071 public TableName tableName; 072 073 @BeforeClass 074 public static void setUpBeforeClass() throws Exception { 075 TEST_UTIL.startMiniCluster(ROWS.size()); 076 asyncConn = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 077 } 078 079 @AfterClass 080 public static void tearDownAfterClass() throws Exception { 081 IOUtils.cleanup(null, asyncConn); 082 TEST_UTIL.shutdownMiniCluster(); 083 } 084 085 @Before 086 public void setUp() throws Exception { 087 tableName = TableName.valueOf(name.getMethodName()); 088 try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) { 089 List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); 090 for (int i = 0; i != 20; ++i) { 091 byte[] value = Bytes.toBytes(i); 092 puts.forEach(p -> p.addColumn(FAMILY, value, value)); 093 } 094 t.put(puts); 095 } 096 assertFalse(getRegionInfo().isEmpty()); 097 assertTrue(getRegionInfo().stream().allMatch(r -> r.getMemStoreDataSize() != 0)); 098 } 099 100 @After 101 public void tearDown() throws Exception { 102 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 103 LOG.info("Tear down, remove table=" + htd.getTableName()); 104 TEST_UTIL.deleteTable(htd.getTableName()); 105 } 106 } 107 108 @Test 109 public void testFlushTable() throws Exception { 110 try (Admin admin = TEST_UTIL.getAdmin()) { 111 admin.flush(tableName); 112 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 113 } 114 } 115 116 @Test 117 public void testAsyncFlushTable() throws Exception { 118 AsyncAdmin admin = asyncConn.getAdmin(); 119 admin.flush(tableName).get(); 120 assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 121 } 122 123 @Test 124 public void testFlushRegion() throws Exception { 125 try (Admin admin = TEST_UTIL.getAdmin()) { 126 for (HRegion r : getRegionInfo()) { 127 admin.flushRegion(r.getRegionInfo().getRegionName()); 128 TimeUnit.SECONDS.sleep(1); 129 assertEquals(0, r.getMemStoreDataSize()); 130 } 131 } 132 } 133 134 @Test 135 public void testAsyncFlushRegion() throws Exception { 136 AsyncAdmin admin = asyncConn.getAdmin(); 137 for (HRegion r : getRegionInfo()) { 138 admin.flushRegion(r.getRegionInfo().getRegionName()).get(); 139 TimeUnit.SECONDS.sleep(1); 140 assertEquals(0, r.getMemStoreDataSize()); 141 } 142 } 143 144 @Test 145 public void testFlushRegionServer() throws Exception { 146 try (Admin admin = TEST_UTIL.getAdmin()) { 147 for (HRegionServer rs : TEST_UTIL.getHBaseCluster() 148 .getLiveRegionServerThreads() 149 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 150 .collect(Collectors.toList())) { 151 admin.flushRegionServer(rs.getServerName()); 152 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 153 } 154 } 155 } 156 157 @Test 158 public void testAsyncFlushRegionServer() throws Exception { 159 AsyncAdmin admin = asyncConn.getAdmin(); 160 for (HRegionServer rs : TEST_UTIL.getHBaseCluster() 161 .getLiveRegionServerThreads() 162 .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) 163 .collect(Collectors.toList())) { 164 admin.flushRegionServer(rs.getServerName()).get(); 165 assertFalse(getRegionInfo(rs).stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); 166 } 167 } 168 169 private List<HRegion> getRegionInfo() { 170 return TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() 171 .map(JVMClusterUtil.RegionServerThread::getRegionServer) 172 .flatMap(r -> r.getRegions().stream()) 173 .filter(r -> r.getTableDescriptor().getTableName().equals(tableName)) 174 .collect(Collectors.toList()); 175 } 176 177 private List<HRegion> getRegionInfo(HRegionServer rs) { 178 return rs.getRegions().stream() 179 .filter(v -> v.getTableDescriptor().getTableName().equals(tableName)) 180 .collect(Collectors.toList()); 181 } 182}