/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogWorker}. Each test starts a mini ZooKeeper cluster, creates split-log
 * task znodes by hand and then checks the {@link SplitLogCounters} and the znode contents to see
 * how the worker reacted.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSplitLogWorker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSplitLogWorker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
  /** Upper bound, in milliseconds, for every counter wait and worker-thread join in this class. */
  private static final int WAIT_TIME = 15000;
  /** Server name written into task znodes as the task's manager. */
  private static final ServerName MANAGER = ServerName.valueOf("manager,1,1");
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private DummyServer ds;
  private ZKWatcher zkw;
  private ExecutorService executorService;

  /**
   * Minimal {@link Server} that only supplies a configuration, a ZooKeeper watcher and a
   * {@link ZkCoordinatedStateManager}. Every other accessor returns {@code null} or {@code false}
   * and the lifecycle methods are no-ops.
   */
  static class DummyServer implements Server {
    private final ZKWatcher zkw;
    private final Configuration conf;
    private final CoordinatedStateManager cm;

    public DummyServer(ZKWatcher zkw, Configuration conf) {
      this.zkw = zkw;
      this.conf = conf;
      // zkw and conf are assigned first because the state manager receives `this`.
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public ServerName getServerName() {
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }

  /**
   * Waits until {@code ctr} reaches at least {@code newval}, failing the test on timeout.
   * @param ctr    counter to poll
   * @param oldval value the counter is expected to start from; only used in the failure message
   * @param newval value to wait for
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
    throws Exception {
    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
      waitForCounterBoolean(ctr, oldval, newval, timems));
  }

  /**
   * Same as {@link #waitForCounterBoolean(LongAdder, long, long, long, boolean)} with
   * {@code failIfTimeout} set to {@code true}.
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
    long timems) throws Exception {

    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
  }

  /**
   * Polls {@code ctr} every 10 ms until its sum is at least {@code newval} or {@code timems}
   * elapses. When the wait reports a positive elapsed time the counter is additionally required
   * to equal {@code newval} exactly.
   * <p>
   * NOTE(review): this method returns {@code true} on every path, including a timeout with
   * {@code failIfTimeout == false}, so an assertion on its return value can never fail. The
   * behavior is kept as-is because tightening it may change which tests pass; confirm the intent
   * before changing it.
   * @param ctr           counter to poll
   * @param oldval        not read by this method
   * @param newval        value to wait for
   * @param timems        maximum time to wait, in milliseconds
   * @param failIfTimeout forwarded to {@code TEST_UTIL.waitFor}
   * @return always {@code true}
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
    long timems, boolean failIfTimeout) throws Exception {

    long timeWaited =
      TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return (ctr.sum() >= newval);
        }
      });

    if (timeWaited > 0) {
      // when not timed out (presumably waitFor reports a timeout as a non-positive value)
      assertEquals(newval, ctr.sum());
    }
    return true;
  }

  /**
   * Starts the mini ZooKeeper cluster, recreates the base, split-log and region-server znodes,
   * resets the split-log counters and starts the executor service handed to the mocked region
   * servers.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration conf = TEST_UTIL.getConfiguration();
    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
    ds = new DummyServer(zkw, conf);
    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));

    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));

    SplitLogCounters.resetCounters();
    executorService = new ExecutorService("TestSplitLogWorker");
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
  }

  /**
   * Shuts down the executor service, closes the ZooKeeper watcher created in {@link #setup()} and
   * stops the mini ZooKeeper cluster.
   */
  @After
  public void teardown() throws Exception {
    if (executorService != null) {
      executorService.shutdown();
    }
    // setup() opens a new watcher for every test; close it while the cluster is still up so the
    // client does not leak from one test into the next.
    if (zkw != null) {
      zkw.close();
    }
    TEST_UTIL.shutdownMiniZKCluster();
  }

  /**
   * Task executor that never finishes on its own: it sleeps in one-second steps and returns
   * {@code PREEMPTED} as soon as it is interrupted or {@code progress()} returns {@code false}.
   */
  SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() {

    @Override
    public Status exec(String name, CancelableProgressable p) {
      while (true) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          return Status.PREEMPTED;
        }
        if (!p.progress()) {
          return Status.PREEMPTED;
        }
      }
    }

  };

  /** A task znode that exists before the worker starts is acquired and owned by that worker. */
  @Test
  public void testAcquireTaskAtStartup() throws Exception {
    LOG.info("testAcquireTaskAtStartup");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    RegionServerServices mockedRS = getRegionServer(RS);
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(RS));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Stops {@code slw} and waits up to {@link #WAIT_TIME} ms for its worker thread to exit,
   * failing the test with a descriptive message if the thread is still alive afterwards.
   * @param slw worker to stop; {@code null} is ignored
   */
  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
    if (slw == null) {
      return;
    }
    slw.stop();
    slw.worker.join(WAIT_TIME);
    assertTrue("Could not stop the worker thread slw=" + slw, !slw.worker.isAlive());
  }

  /** Two workers compete for a single task; exactly one acquisition is expected. */
  @Test
  public void testRaceForTask() throws Exception {
    LOG.info("testRaceForTask");
    SplitLogCounters.resetCounters();
    final String TRFT = "trft";
    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
    RegionServerServices mockedRS1 = getRegionServer(SVR1);
    RegionServerServices mockedRS2 = getRegionServer(SVR2);
    SplitLogWorker slw1 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
    SplitLogWorker slw2 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
    slw1.start();
    slw2.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set or, if not,
      // that we fell through to the next counter in line and it was set.
      // NOTE(review): waitForCounterBoolean always returns true, so this assertion cannot
      // currently fail; it only delays the test until the counter is set or the wait times out.
      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
        WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
    } finally {
      stopSplitLogWorker(slw1);
      stopSplitLogWorker(slw2);
    }
  }

  /** Rewriting an acquired task's znode with a different owner preempts the running task. */
  @Test
  public void testPreemptTask() throws Exception {
    LOG.info("testPreemptTask");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(1000);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      // this time create a task node after starting the splitLogWorker
      zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      assertEquals(1, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH);
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
      // hand ownership of the znode to the manager so the worker's task is preempted
      slt = new SplitLogTask.Owned(MANAGER);
      ZKUtil.setData(zkw, PATH, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /** After its first task is preempted, the worker picks up a second, still-unassigned task. */
  @Test
  public void testMultipleTasks() throws Exception {
    LOG.info("testMultipleTasks");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // create another task
      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      // preempt the first task, have it owned by another worker
      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      assertEquals(2, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH2);
      slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /** A RESCAN znode is eventually acquired by the worker and marked done. */
  @Test
  public void testRescan() throws Exception {
    LOG.info("testRescan");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("svr,1,1");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);

      String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
      SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // preempt the task by rewriting its znode as unassigned again
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      // create a RESCAN node
      String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
      rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT_SEQUENTIAL);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      // RESCAN node might not have been processed if the worker became busy
      // with the above task. preempt the task again so that now the RESCAN
      // node is processed
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);

      List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
      LOG.debug(Objects.toString(nodes));
      for (String node : nodes) {
        if (node.startsWith("RESCAN")) {
          String name = ZKSplitLog.getEncodedNodeName(zkw, node);
          String fn = ZKSplitLog.getFileName(name);
          byte[] data =
            ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
          slt = SplitLogTask.parseFrom(data);
          assertTrue(slt.toString(), slt.isDone(SRV));
        }
      }
      // exactly the task node and the RESCAN node are expected under the split-log znode
      assertEquals(2, nodes.size());
    } finally {
      // stop the worker like every other test does so its thread does not outlive the test
      stopSplitLogWorker(slw);
    }
  }

  /**
   * With {@code HBASE_SPLIT_WAL_MAX_SPLITTER} set to 3, a single worker acquires three pre-created
   * tasks.
   */
  @Test
  public void testAcquireMultiTasks() throws Exception {
    LOG.info("testAcquireMultiTasks");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    final int maxTasks = 3;
    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
    RegionServerServices mockedRS = getRegionServer(RS);
    for (int i = 0; i < maxTasks; i++) {
      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
      for (int i = 0; i < maxTasks; i++) {
        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
        assertTrue(slt.isOwned(RS));
      }
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Create a mocked region server service instance that reports the given server name, is never
   * stopped, and hands out this test's configuration, ZooKeeper watcher and executor service.
   * @param name server name the mock returns from {@code getServerName()}
   * @return the configured Mockito mock
   */
  private RegionServerServices getRegionServer(ServerName name) {

    RegionServerServices mockedServer = mock(RegionServerServices.class);
    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
    when(mockedServer.getServerName()).thenReturn(name);
    when(mockedServer.getZooKeeper()).thenReturn(zkw);
    when(mockedServer.isStopped()).thenReturn(false);
    when(mockedServer.getExecutorService()).thenReturn(executorService);

    return mockedServer;
  }

}