001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicInteger;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseTestingUtility;
027import org.apache.hadoop.hbase.MiniHBaseCluster;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
035import org.apache.hadoop.hbase.regionserver.HRegion;
036import org.apache.hadoop.hbase.regionserver.Region;
037import org.apache.hadoop.hbase.testclassification.MasterTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Rule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.junit.rules.TestName;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({MasterTests.class, MediumTests.class})
052public class TestAssignmentListener {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestAssignmentListener.class);
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentListener.class);
059
060  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
061
062  @Rule
063  public TestName name = new TestName();
064
065  static class DummyListener {
066    protected AtomicInteger modified = new AtomicInteger(0);
067
068    public void awaitModifications(int count) throws InterruptedException {
069      while (!modified.compareAndSet(count, 0)) {
070        Thread.sleep(100);
071      }
072    }
073  }
074
075  static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
076    private AtomicInteger closeCount = new AtomicInteger(0);
077    private AtomicInteger openCount = new AtomicInteger(0);
078
079    public DummyAssignmentListener() {
080    }
081
082    @Override
083    public void regionOpened(final RegionInfo regionInfo, final ServerName serverName) {
084      LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
085      openCount.incrementAndGet();
086      modified.incrementAndGet();
087    }
088
089    @Override
090    public void regionClosed(final RegionInfo regionInfo) {
091      LOG.info("Assignment close region=" + regionInfo);
092      closeCount.incrementAndGet();
093      modified.incrementAndGet();
094    }
095
096    public void reset() {
097      openCount.set(0);
098      closeCount.set(0);
099    }
100
101    public int getLoadCount() {
102      return openCount.get();
103    }
104
105    public int getCloseCount() {
106      return closeCount.get();
107    }
108  }
109
110  static class DummyServerListener extends DummyListener implements ServerListener {
111    private AtomicInteger removedCount = new AtomicInteger(0);
112    private AtomicInteger addedCount = new AtomicInteger(0);
113
114    public DummyServerListener() {
115    }
116
117    @Override
118    public void serverAdded(final ServerName serverName) {
119      LOG.info("Server added " + serverName);
120      addedCount.incrementAndGet();
121      modified.incrementAndGet();
122    }
123
124    @Override
125    public void serverRemoved(final ServerName serverName) {
126      LOG.info("Server removed " + serverName);
127      removedCount.incrementAndGet();
128      modified.incrementAndGet();
129    }
130
131    public void reset() {
132      addedCount.set(0);
133      removedCount.set(0);
134    }
135
136    public int getAddedCount() {
137      return addedCount.get();
138    }
139
140    public int getRemovedCount() {
141      return removedCount.get();
142    }
143  }
144
145  @BeforeClass
146  public static void beforeAllTests() throws Exception {
147    TEST_UTIL.startMiniCluster(2);
148  }
149
150  @AfterClass
151  public static void afterAllTests() throws Exception {
152    TEST_UTIL.shutdownMiniCluster();
153  }
154
155  @Test
156  public void testServerListener() throws IOException, InterruptedException {
157    ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
158
159    DummyServerListener listener = new DummyServerListener();
160    serverManager.registerListener(listener);
161    try {
162      MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
163
164      // Start a new Region Server
165      miniCluster.startRegionServer();
166      listener.awaitModifications(1);
167      assertEquals(1, listener.getAddedCount());
168      assertEquals(0, listener.getRemovedCount());
169
170      // Start another Region Server
171      listener.reset();
172      miniCluster.startRegionServer();
173      listener.awaitModifications(1);
174      assertEquals(1, listener.getAddedCount());
175      assertEquals(0, listener.getRemovedCount());
176
177      int nrs = miniCluster.getRegionServerThreads().size();
178
179      // Stop a Region Server
180      listener.reset();
181      miniCluster.stopRegionServer(nrs - 1);
182      listener.awaitModifications(1);
183      assertEquals(0, listener.getAddedCount());
184      assertEquals(1, listener.getRemovedCount());
185
186      // Stop another Region Server
187      listener.reset();
188      miniCluster.stopRegionServer(nrs - 2);
189      listener.awaitModifications(1);
190      assertEquals(0, listener.getAddedCount());
191      assertEquals(1, listener.getRemovedCount());
192    } finally {
193      serverManager.unregisterListener(listener);
194    }
195  }
196
197  @Test
198  public void testAssignmentListener() throws IOException, InterruptedException {
199    AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
200    Admin admin = TEST_UTIL.getAdmin();
201
202    DummyAssignmentListener listener = new DummyAssignmentListener();
203    am.registerListener(listener);
204    try {
205      final TableName tableName = TableName.valueOf(name.getMethodName());
206      final byte[] FAMILY = Bytes.toBytes("cf");
207
208      // Create a new table, with a single region
209      LOG.info("Create Table");
210      TEST_UTIL.createTable(tableName, FAMILY);
211      listener.awaitModifications(1);
212      assertEquals(1, listener.getLoadCount());
213      assertEquals(0, listener.getCloseCount());
214
215      // Add some data
216      Table table = TEST_UTIL.getConnection().getTable(tableName);
217      try {
218        for (int i = 0; i < 10; ++i) {
219          byte[] key = Bytes.toBytes("row-" + i);
220          Put put = new Put(key);
221          put.addColumn(FAMILY, null, key);
222          table.put(put);
223        }
224      } finally {
225        table.close();
226      }
227
228      // Split the table in two
229      LOG.info("Split Table");
230      listener.reset();
231      admin.split(tableName, Bytes.toBytes("row-3"));
232      listener.awaitModifications(3);
233      assertEquals(2, listener.getLoadCount());     // daughters added
234      assertEquals(1, listener.getCloseCount());    // parent removed
235
236      // Wait for the Regions to be mergeable
237      MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
238      int mergeable = 0;
239      while (mergeable < 2) {
240        Thread.sleep(100);
241        admin.majorCompact(tableName);
242        mergeable = 0;
243        for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
244          for (Region region: regionThread.getRegionServer().getRegions(tableName)) {
245            mergeable += ((HRegion)region).isMergeable() ? 1 : 0;
246          }
247        }
248      }
249
250      // Merge the two regions
251      LOG.info("Merge Regions");
252      listener.reset();
253      List<RegionInfo> regions = admin.getRegions(tableName);
254      assertEquals(2, regions.size());
255      boolean sameServer = areAllRegionsLocatedOnSameServer(tableName);
256      // If the regions are located by different server, we need to move
257      // regions to same server before merging. So the expected modifications
258      // will increaes to 5. (open + close)
259      final int expectedModifications = sameServer ? 3 : 5;
260      final int expectedLoadCount = sameServer ? 1 : 2;
261      final int expectedCloseCount = sameServer ? 2 : 3;
262      admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
263        regions.get(1).getEncodedNameAsBytes(), true);
264      listener.awaitModifications(expectedModifications);
265      assertEquals(1, admin.getRegions(tableName).size());
266      assertEquals(expectedLoadCount, listener.getLoadCount());     // new merged region added
267      assertEquals(expectedCloseCount, listener.getCloseCount());    // daughters removed
268
269      // Delete the table
270      LOG.info("Drop Table");
271      listener.reset();
272      TEST_UTIL.deleteTable(tableName);
273      listener.awaitModifications(1);
274      assertEquals(0, listener.getLoadCount());
275      assertEquals(1, listener.getCloseCount());
276    } finally {
277      am.unregisterListener(listener);
278    }
279  }
280
281  private boolean areAllRegionsLocatedOnSameServer(TableName TABLE_NAME) {
282    MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
283    int serverCount = 0;
284    for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
285      if (!regionThread.getRegionServer().getRegions(TABLE_NAME).isEmpty()) {
286        ++serverCount;
287      }
288      if (serverCount > 1) {
289        return false;
290      }
291    }
292    return serverCount == 1;
293  }
294}