001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicInteger; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtility; 027import org.apache.hadoop.hbase.MiniHBaseCluster; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 035import org.apache.hadoop.hbase.regionserver.HRegion; 036import org.apache.hadoop.hbase.regionserver.Region; 037import org.apache.hadoop.hbase.testclassification.MasterTests; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.JVMClusterUtil; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Rule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047import org.junit.rules.TestName; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Category({MasterTests.class, MediumTests.class}) 052public class TestAssignmentListener { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestAssignmentListener.class); 057 058 private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentListener.class); 059 060 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 061 062 @Rule 063 public TestName name = new TestName(); 064 065 static class DummyListener { 066 protected AtomicInteger modified = new AtomicInteger(0); 067 068 public void awaitModifications(int count) throws InterruptedException { 069 while (!modified.compareAndSet(count, 0)) { 070 Thread.sleep(100); 071 } 072 } 073 } 074 075 static class DummyAssignmentListener extends DummyListener implements AssignmentListener { 076 private AtomicInteger closeCount = new AtomicInteger(0); 077 private AtomicInteger openCount = new AtomicInteger(0); 078 079 public DummyAssignmentListener() { 080 } 081 082 @Override 083 public void regionOpened(final RegionInfo regionInfo, final ServerName serverName) { 084 LOG.info("Assignment open region=" + regionInfo + " server=" + serverName); 085 openCount.incrementAndGet(); 086 modified.incrementAndGet(); 087 } 088 089 @Override 090 public void regionClosed(final RegionInfo regionInfo) { 091 LOG.info("Assignment close region=" + regionInfo); 092 closeCount.incrementAndGet(); 093 modified.incrementAndGet(); 094 } 095 096 public void reset() { 097 openCount.set(0); 098 closeCount.set(0); 099 } 100 101 public int getLoadCount() { 102 return openCount.get(); 103 } 104 105 public int getCloseCount() { 106 return closeCount.get(); 107 } 108 } 109 110 static class DummyServerListener extends DummyListener implements ServerListener { 111 private AtomicInteger removedCount = new AtomicInteger(0); 112 private AtomicInteger addedCount = new AtomicInteger(0); 113 114 public DummyServerListener() { 115 } 116 117 @Override 118 public void serverAdded(final ServerName serverName) { 119 LOG.info("Server added " + serverName); 120 addedCount.incrementAndGet(); 121 modified.incrementAndGet(); 122 } 123 124 @Override 125 public void serverRemoved(final ServerName serverName) { 126 LOG.info("Server removed " + serverName); 127 removedCount.incrementAndGet(); 128 modified.incrementAndGet(); 129 } 130 131 public void reset() { 132 addedCount.set(0); 133 removedCount.set(0); 134 } 135 136 public int getAddedCount() { 137 return addedCount.get(); 138 } 139 140 public int getRemovedCount() { 141 return removedCount.get(); 142 } 143 } 144 145 @BeforeClass 146 public static void beforeAllTests() throws Exception { 147 TEST_UTIL.startMiniCluster(2); 148 } 149 150 @AfterClass 151 public static void afterAllTests() throws Exception { 152 TEST_UTIL.shutdownMiniCluster(); 153 } 154 155 @Test 156 public void testServerListener() throws IOException, InterruptedException { 157 ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager(); 158 159 DummyServerListener listener = new DummyServerListener(); 160 serverManager.registerListener(listener); 161 try { 162 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster(); 163 164 // Start a new Region Server 165 miniCluster.startRegionServer(); 166 listener.awaitModifications(1); 167 assertEquals(1, listener.getAddedCount()); 168 assertEquals(0, listener.getRemovedCount()); 169 170 // Start another Region Server 171 listener.reset(); 172 miniCluster.startRegionServer(); 173 listener.awaitModifications(1); 174 assertEquals(1, listener.getAddedCount()); 175 assertEquals(0, listener.getRemovedCount()); 176 177 int nrs = miniCluster.getRegionServerThreads().size(); 178 179 // Stop a Region Server 180 listener.reset(); 181 miniCluster.stopRegionServer(nrs - 1); 182 listener.awaitModifications(1); 183 assertEquals(0, listener.getAddedCount()); 184 assertEquals(1, listener.getRemovedCount()); 185 186 // Stop another Region Server 187 listener.reset(); 188 miniCluster.stopRegionServer(nrs - 2); 189 listener.awaitModifications(1); 190 assertEquals(0, listener.getAddedCount()); 191 assertEquals(1, listener.getRemovedCount()); 192 } finally { 193 serverManager.unregisterListener(listener); 194 } 195 } 196 197 @Test 198 public void testAssignmentListener() throws IOException, InterruptedException { 199 AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager(); 200 Admin admin = TEST_UTIL.getAdmin(); 201 202 DummyAssignmentListener listener = new DummyAssignmentListener(); 203 am.registerListener(listener); 204 try { 205 final TableName tableName = TableName.valueOf(name.getMethodName()); 206 final byte[] FAMILY = Bytes.toBytes("cf"); 207 208 // Create a new table, with a single region 209 LOG.info("Create Table"); 210 TEST_UTIL.createTable(tableName, FAMILY); 211 listener.awaitModifications(1); 212 assertEquals(1, listener.getLoadCount()); 213 assertEquals(0, listener.getCloseCount()); 214 215 // Add some data 216 Table table = TEST_UTIL.getConnection().getTable(tableName); 217 try { 218 for (int i = 0; i < 10; ++i) { 219 byte[] key = Bytes.toBytes("row-" + i); 220 Put put = new Put(key); 221 put.addColumn(FAMILY, null, key); 222 table.put(put); 223 } 224 } finally { 225 table.close(); 226 } 227 228 // Split the table in two 229 LOG.info("Split Table"); 230 listener.reset(); 231 admin.split(tableName, Bytes.toBytes("row-3")); 232 listener.awaitModifications(3); 233 assertEquals(2, listener.getLoadCount()); // daughters added 234 assertEquals(1, listener.getCloseCount()); // parent removed 235 236 // Wait for the Regions to be mergeable 237 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster(); 238 int mergeable = 0; 239 while (mergeable < 2) { 240 Thread.sleep(100); 241 admin.majorCompact(tableName); 242 mergeable = 0; 243 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) { 244 for (Region region: regionThread.getRegionServer().getRegions(tableName)) { 245 mergeable += ((HRegion)region).isMergeable() ? 1 : 0; 246 } 247 } 248 } 249 250 // Merge the two regions 251 LOG.info("Merge Regions"); 252 listener.reset(); 253 List<RegionInfo> regions = admin.getRegions(tableName); 254 assertEquals(2, regions.size()); 255 boolean sameServer = areAllRegionsLocatedOnSameServer(tableName); 256 // If the regions are located by different server, we need to move 257 // regions to same server before merging. So the expected modifications 258 // will increaes to 5. (open + close) 259 final int expectedModifications = sameServer ? 3 : 5; 260 final int expectedLoadCount = sameServer ? 1 : 2; 261 final int expectedCloseCount = sameServer ? 2 : 3; 262 admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(), 263 regions.get(1).getEncodedNameAsBytes(), true); 264 listener.awaitModifications(expectedModifications); 265 assertEquals(1, admin.getRegions(tableName).size()); 266 assertEquals(expectedLoadCount, listener.getLoadCount()); // new merged region added 267 assertEquals(expectedCloseCount, listener.getCloseCount()); // daughters removed 268 269 // Delete the table 270 LOG.info("Drop Table"); 271 listener.reset(); 272 TEST_UTIL.deleteTable(tableName); 273 listener.awaitModifications(1); 274 assertEquals(0, listener.getLoadCount()); 275 assertEquals(1, listener.getCloseCount()); 276 } finally { 277 am.unregisterListener(listener); 278 } 279 } 280 281 private boolean areAllRegionsLocatedOnSameServer(TableName TABLE_NAME) { 282 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster(); 283 int serverCount = 0; 284 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) { 285 if (!regionThread.getRegionServer().getRegions(TABLE_NAME).isEmpty()) { 286 ++serverCount; 287 } 288 if (serverCount > 1) { 289 return false; 290 } 291 } 292 return serverCount == 1; 293 } 294}