/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogWorker}. Each test starts a mini ZooKeeper cluster, creates split-log
 * task znodes by hand and then observes the worker through {@link SplitLogCounters} and the data
 * stored in the task znodes.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestSplitLogWorker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSplitLogWorker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
  /** Upper bound, in milliseconds, for every counter wait and for joining the worker thread. */
  private static final int WAIT_TIME = 15000;
  /** Server name written into task znodes as the creator of the task. */
  private static final ServerName MANAGER = ServerName.valueOf("manager,1,1");
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private DummyServer ds;
  private ZKWatcher zkw;
  private SplitLogWorker slw;
  private ExecutorService executorService;

  /**
   * Minimal {@link Server} stub. Only the configuration, the ZooKeeper watcher and the
   * coordinated state manager are real; every other accessor returns {@code null} or
   * {@code false}, and {@code abort}/{@code stop} do nothing.
   */
  static class DummyServer implements Server {
    private ZKWatcher zkw;
    private Configuration conf;
    private CoordinatedStateManager cm;

    public DummyServer(ZKWatcher zkw, Configuration conf) {
      this.zkw = zkw;
      this.conf = conf;
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public ServerName getServerName() {
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public Connection getConnection() {
      return null;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }

    @Override
    public AsyncClusterConnection getAsyncClusterConnection() {
      return null;
    }
  }

  /**
   * Waits for {@code ctr} to reach {@code newval} and fails the test if it does not get there
   * within {@code timems} milliseconds.
   *
   * @param ctr counter to watch
   * @param oldval value the counter is expected to start from; only used in the failure message
   * @param newval value the counter must reach
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
      throws Exception {
    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
      waitForCounterBoolean(ctr, oldval, newval, timems));
  }

  /**
   * Same as {@link #waitForCounterBoolean(LongAdder, long, long, long, boolean)} with
   * {@code failIfTimeout} set to {@code true}.
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
      long timems) throws Exception {
    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
  }

  /**
   * Polls {@code ctr} every 10 ms until its sum is at least {@code newval} or the timeout expires.
   *
   * @param ctr counter to watch
   * @param oldval unused by the wait itself; kept for symmetry with {@link #waitForCounter}
   * @param newval value the counter must reach
   * @param timems maximum time to wait, in milliseconds
   * @param failIfTimeout passed through to {@code TEST_UTIL.waitFor}; when {@code true} a timeout
   *          fails the test there instead of being reported through the return value
   * @return {@code true} if the counter reached {@code newval}, {@code false} if the wait timed out
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
      long timems, boolean failIfTimeout) throws Exception {
    Waiter.Predicate<Exception> reached = () -> ctr.sum() >= newval;
    long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout, reached);

    if (timeWaited > 0) {
      // when not timed out
      assertEquals(newval, ctr.sum());
    }
    // waitFor reports a negative value when the predicate never became true; returning a constant
    // true here would make callers that pass failIfTimeout=false unable to detect the timeout.
    return timeWaited >= 0;
  }

  /**
   * Starts the mini ZooKeeper cluster, recreates the base, split-log and rs znodes, resets the
   * split-log counters and starts the executor service used by the mocked region servers.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration conf = TEST_UTIL.getConfiguration();
    zkw = new ZKWatcher(conf, "split-log-worker-tests", null);
    ds = new DummyServer(zkw, conf);
    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));

    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));

    SplitLogCounters.resetCounters();
    executorService = new ExecutorService("TestSplitLogWorker");
    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
  }

  /**
   * Shuts down the executor service, closes the ZooKeeper watcher opened in {@link #setup()} and
   * stops the mini ZooKeeper cluster.
   */
  @After
  public void teardown() throws Exception {
    if (executorService != null) {
      executorService.shutdown();
    }
    if (zkw != null) {
      // Release the watcher's ZooKeeper connection while the cluster is still up.
      zkw.close();
    }
    TEST_UTIL.shutdownMiniZKCluster();
  }

  /**
   * Task executor that never finishes on its own: it sleeps in one-second steps and returns
   * {@code PREEMPTED} as soon as it is interrupted or {@code p.progress()} returns {@code false}.
   */
  SplitLogWorker.TaskExecutor neverEndingTask =
    new SplitLogWorker.TaskExecutor() {

      @Override
      public Status exec(String name, CancelableProgressable p) {
        while (true) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return Status.PREEMPTED;
          }
          if (!p.progress()) {
            return Status.PREEMPTED;
          }
        }
      }

    };

  /** A task znode that exists before the worker starts must end up owned by that worker. */
  @Test
  public void testAcquireTaskAtStartup() throws Exception {
    LOG.info("testAcquireTaskAtStartup");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    RegionServerServices mockedRS = getRegionServer(RS);
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(RS));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Stops the given worker, waits up to {@link #WAIT_TIME} ms for its thread to exit and fails the
   * test if the thread is still alive afterwards. A {@code null} worker is ignored.
   */
  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
    if (slw == null) {
      return;
    }
    slw.stop();
    slw.worker.join(WAIT_TIME);
    assertTrue("Could not stop the worker thread slw=" + slw, !slw.worker.isAlive());
  }

  /** Two workers compete for a single task; exactly one acquires it and the other one fails. */
  @Test
  public void testRaceForTask() throws Exception {
    LOG.info("testRaceForTask");
    SplitLogCounters.resetCounters();
    final String TRFT = "trft";
    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
      new SplitLogTask.Unassigned(MANAGER).toByteArray(),
      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    RegionServerServices mockedRS1 = getRegionServer(SVR1);
    RegionServerServices mockedRS2 = getRegionServer(SVR2);
    SplitLogWorker slw1 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
    SplitLogWorker slw2 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
    slw1.start();
    slw2.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set or, if it was
      // not, that we fell through to the next counter in line and that one was set.
      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
        WAIT_TIME, false) ||
        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
    } finally {
      stopSplitLogWorker(slw1);
      stopSplitLogWorker(slw2);
    }
  }

  /**
   * The worker acquires a task created after it started; rewriting the znode as owned by the
   * manager must bump the preempt counter.
   */
  @Test
  public void testPreemptTask() throws Exception {
    LOG.info("testPreemptTask");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(1000);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      // this time create a task node after starting the splitLogWorker
      zkw.getRecoverableZooKeeper().create(PATH,
        new SplitLogTask.Unassigned(MANAGER).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      assertEquals(1, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH);
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
      slt = new SplitLogTask.Owned(MANAGER);
      ZKUtil.setData(zkw, PATH, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * While the worker is busy with one task a second one is created; once the first task is handed
   * to another server the worker must pick up the second one.
   */
  @Test
  public void testMultipleTasks() throws Exception {
    LOG.info("testMultipleTasks");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // create another task
      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      // preempt the first task, have it owned by another worker
      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      assertEquals(2, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH2);
      slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Preempts a running task twice with a RESCAN znode created in between, then checks that the
   * RESCAN znode was acquired and is marked done by the worker's server.
   */
  @Test
  public void testRescan() throws Exception {
    LOG.info("testRescan");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("svr,1,1");
    RegionServerServices mockedRS = getRegionServer(SRV);
    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);

      String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
      SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // preempt the task by rewriting its znode with the unassigned task data
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      // create a RESCAN node
      String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
      rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      // RESCAN node might not have been processed if the worker became busy
      // with the above task. preempt the task again so that now the RESCAN
      // node is processed
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);

      List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
      LOG.debug(Objects.toString(nodes));
      int num = 0;
      for (String node : nodes) {
        num++;
        if (node.startsWith("RESCAN")) {
          String name = ZKSplitLog.getEncodedNodeName(zkw, node);
          String fn = ZKSplitLog.getFileName(name);
          byte[] data = ZKUtil.getData(zkw,
            ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
          slt = SplitLogTask.parseFrom(data);
          assertTrue(slt.toString(), slt.isDone(SRV));
        }
      }
      assertEquals(2, num);
    } finally {
      // Without this the worker thread would outlive the test and the mini ZK cluster.
      stopSplitLogWorker(slw);
    }
  }

  /**
   * With {@code HBASE_SPLIT_WAL_MAX_SPLITTER} set to three, a single worker must acquire all three
   * pre-created tasks.
   */
  @Test
  public void testAcquireMultiTasks() throws Exception {
    LOG.info("testAcquireMultiTasks");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    final int maxTasks = 3;
    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
    RegionServerServices mockedRS = getRegionServer(RS);
    for (int i = 0; i < maxTasks; i++) {
      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
      for (int i = 0; i < maxTasks; i++) {
        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
        assertTrue(slt.isOwned(RS));
      }
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Create a mocked region server service instance that reports the given server name, the shared
   * test configuration, this test's ZooKeeper watcher and executor service, and is never stopped.
   */
  private RegionServerServices getRegionServer(ServerName name) {

    RegionServerServices mockedServer = mock(RegionServerServices.class);
    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
    when(mockedServer.getServerName()).thenReturn(name);
    when(mockedServer.getZooKeeper()).thenReturn(zkw);
    when(mockedServer.isStopped()).thenReturn(false);
    when(mockedServer.getExecutorService()).thenReturn(executorService);

    return mockedServer;
  }

}