/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests {@link SplitLogWorker} against a mini ZooKeeper cluster. Each test creates split-log task
 * znodes by hand, starts one or more workers whose task executor never finishes on its own, and
 * then observes the {@link SplitLogCounters} and the task znode contents.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestSplitLogWorker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSplitLogWorker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
  /** Upper bound, in milliseconds, for every counter wait and worker-thread join. */
  private static final int WAIT_TIME = 15000;
  /** Server name written into task znodes as the split-log manager. */
  private static final ServerName MANAGER = ServerName.valueOf("manager,1,1");
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private DummyServer ds;
  private ZKWatcher zkw;
  private ExecutorService executorService;

  /**
   * Minimal {@link Server} stub. It only exposes the ZooKeeper watcher, the configuration and a
   * {@link ZkCoordinatedStateManager}; every other accessor returns {@code null} or {@code false}
   * and {@code abort}/{@code stop} are no-ops.
   */
  static class DummyServer implements Server {
    private final ZKWatcher zkw;
    private final Configuration conf;
    private final CoordinatedStateManager cm;

    public DummyServer(ZKWatcher zkw, Configuration conf) {
      this.zkw = zkw;
      this.conf = conf;
      // Built last: the manager receives this server, so zkw and conf must already be assigned.
      this.cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public ServerName getServerName() {
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public MetaTableLocator getMetaTableLocator() {
      return null;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }

  /**
   * Waits for {@code ctr} to reach {@code newval} and fails the test if it does not do so within
   * {@code timems} milliseconds.
   *
   * @param ctr counter to poll
   * @param oldval value the counter is expected to start from; only used in the failure message
   * @param newval value the counter must reach
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
      throws Exception {
    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
      waitForCounterBoolean(ctr, oldval, newval, timems));
  }

  /**
   * Same as {@link #waitForCounterBoolean(LongAdder, long, long, long, boolean)} with
   * {@code failIfTimeout} set to {@code true}.
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
      long timems) throws Exception {
    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
  }

  /**
   * Polls {@code ctr} every 10 ms until its sum is at least {@code newval} or the timeout elapses.
   * When the wait did not time out, the counter must be exactly {@code newval}.
   *
   * @param ctr counter to poll
   * @param oldval unused; kept so that call sites document the expected starting value
   * @param newval value the counter must reach
   * @param timems maximum time to wait, in milliseconds
   * @param failIfTimeout passed through to {@code waitFor}; when {@code true} a timeout fails the
   *          test inside the wait itself
   * @return {@code true} if the wait ended without timing out, {@code false} if it timed out
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
      long timems, boolean failIfTimeout) throws Exception {
    long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
      new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ctr.sum() >= newval;
        }
      });

    // waitFor reports a timeout with a negative value. Returning false here is what makes the
    // "counter A or else counter B" assertion in testRaceForTask meaningful.
    if (timeWaited < 0) {
      return false;
    }
    // Not timed out (including the case where the predicate was already true at 0 ms).
    assertEquals(newval, ctr.sum());
    return true;
  }

  /**
   * Starts a mini ZooKeeper cluster, creates the base, split-log and region-server znodes, resets
   * the split-log counters and starts the executor service handed to the mocked region servers.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration conf = TEST_UTIL.getConfiguration();
    zkw = new ZKWatcher(conf, "split-log-worker-tests", null);
    ds = new DummyServer(zkw, conf);

    final String baseZNode = zkw.getZNodePaths().baseZNode;
    ZKUtil.deleteChildrenRecursively(zkw, baseZNode);
    ZKUtil.createAndFailSilent(zkw, baseZNode);
    assertThat(ZKUtil.checkExists(zkw, baseZNode), not(is(-1)));
    LOG.debug("{} created", baseZNode);

    final String splitLogZNode = zkw.getZNodePaths().splitLogZNode;
    ZKUtil.createAndFailSilent(zkw, splitLogZNode);
    assertThat(ZKUtil.checkExists(zkw, splitLogZNode), not(is(-1)));
    LOG.debug("{} created", splitLogZNode);

    final String rsZNode = zkw.getZNodePaths().rsZNode;
    ZKUtil.createAndFailSilent(zkw, rsZNode);
    assertThat(ZKUtil.checkExists(zkw, rsZNode), not(is(-1)));

    SplitLogCounters.resetCounters();
    executorService = new ExecutorService("TestSplitLogWorker");
    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
  }

  /**
   * Shuts down the executor service, closes the ZooKeeper watcher created in {@link #setup()} and
   * stops the mini ZooKeeper cluster.
   */
  @After
  public void teardown() throws Exception {
    if (executorService != null) {
      executorService.shutdown();
    }
    // Close the watcher before its server goes away so the client is not left behind.
    if (zkw != null) {
      zkw.close();
    }
    TEST_UTIL.shutdownMiniZKCluster();
  }

  /**
   * Task executor that never completes on its own: it sleeps in one-second steps and returns
   * {@code PREEMPTED} once it is interrupted or once the progress callback returns {@code false}.
   */
  private final SplitLogWorker.TaskExecutor neverEndingTask =
    new SplitLogWorker.TaskExecutor() {

      @Override
      public Status exec(String name, CancelableProgressable p) {
        while (true) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return Status.PREEMPTED;
          }
          if (!p.progress()) {
            return Status.PREEMPTED;
          }
        }
      }

    };

  /**
   * A task znode that exists before the worker starts must be acquired by that worker.
   */
  @Test
  public void testAcquireTaskAtStartup() throws Exception {
    LOG.info("testAcquireTaskAtStartup");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    RegionServerServices mockedRS = getRegionServer(RS);
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
      Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    SplitLogWorker slw =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(RS));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Stops the given worker and fails the test if its thread is still alive after
   * {@link #WAIT_TIME} milliseconds. Does nothing when {@code slw} is {@code null}.
   */
  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
    if (slw == null) {
      return;
    }
    slw.stop();
    slw.worker.join(WAIT_TIME);
    assertFalse("Could not stop the worker thread slw=" + slw, slw.worker.isAlive());
  }

  /**
   * Two workers compete for a single task: exactly one acquisition must be counted and the task
   * must end up owned by one of the two servers.
   */
  @Test
  public void testRaceForTask() throws Exception {
    LOG.info("testRaceForTask");
    SplitLogCounters.resetCounters();
    final String TRFT = "trft";
    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
      new SplitLogTask.Unassigned(MANAGER).toByteArray(),
      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    RegionServerServices mockedRS1 = getRegionServer(SVR1);
    RegionServerServices mockedRS2 = getRegionServer(SVR2);
    SplitLogWorker slw1 =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
    SplitLogWorker slw2 =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
    slw1.start();
    slw2.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set or, if it was
      // not, that we fell through to the next counter in line and that one was set.
      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
          WAIT_TIME, false) ||
        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
    } finally {
      stopSplitLogWorker(slw1);
      stopSplitLogWorker(slw2);
    }
  }

  /**
   * A task created after the worker started is acquired; rewriting its znode as owned by a
   * different server must be counted as a preemption.
   */
  @Test
  public void testPreemptTask() throws Exception {
    LOG.info("testPreemptTask");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(1000);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      // this time create a task node after starting the splitLogWorker
      zkw.getRecoverableZooKeeper().create(PATH,
        new SplitLogTask.Unassigned(MANAGER).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      assertEquals(1, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH);
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
      // hand the task to the manager's name so the worker no longer owns it
      slt = new SplitLogTask.Owned(MANAGER);
      ZKUtil.setData(zkw, PATH, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * While the worker is busy with a first task, a second one is created and the first is handed
   * to another server; the worker must then acquire the second task.
   */
  @Test
  public void testMultipleTasks() throws Exception {
    LOG.info("testMultipleTasks");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // create another task
      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      // preempt the first task, have it owned by another worker
      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      assertEquals(2, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH2);
      slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Creates a RESCAN znode while the worker is busy and checks that, once the worker gets to it,
   * the RESCAN znode is marked done by this server and exactly two znodes exist under the
   * split-log znode.
   */
  @Test
  public void testRescan() throws Exception {
    LOG.info("testRescan");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("svr,1,1");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);

      String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
      SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // preempt the task by writing the unassigned state back over the worker's ownership
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      // create a RESCAN node
      String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
      rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      // RESCAN node might not have been processed if the worker became busy
      // with the above task. preempt the task again so that now the RESCAN
      // node is processed
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);

      List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
      LOG.debug(Objects.toString(nodes));
      for (String node : nodes) {
        if (node.startsWith("RESCAN")) {
          String name = ZKSplitLog.getEncodedNodeName(zkw, node);
          String fn = ZKSplitLog.getFileName(name);
          byte[] data = ZKUtil.getData(zkw,
            ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
          slt = SplitLogTask.parseFrom(data);
          assertTrue(slt.toString(), slt.isDone(SRV));
        }
      }
      // the original task plus the RESCAN node
      assertEquals(2, nodes.size());
    } finally {
      // every other test stops its worker; without this the worker thread outlives the test
      stopSplitLogWorker(slw);
    }
  }

  /**
   * With {@code hbase.regionserver.wal.max.splitters} set to three, a single worker must acquire
   * all three pre-created tasks.
   */
  @Test
  public void testAcquireMultiTasks() throws Exception {
    LOG.info("testAcquireMultiTasks");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    final int maxTasks = 3;
    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
    RegionServerServices mockedRS = getRegionServer(RS);
    for (int i = 0; i < maxTasks; i++) {
      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
      for (int i = 0; i < maxTasks; i++) {
        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
        assertTrue(slt.isOwned(RS));
      }
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Creates a mocked region server services instance that reports the given server name, the
   * shared test configuration, this test's ZooKeeper watcher and executor service, and is never
   * stopped.
   */
  private RegionServerServices getRegionServer(ServerName name) {
    RegionServerServices mockedServer = mock(RegionServerServices.class);
    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
    when(mockedServer.getServerName()).thenReturn(name);
    when(mockedServer.getZooKeeper()).thenReturn(zkw);
    when(mockedServer.isStopped()).thenReturn(false);
    when(mockedServer.getExecutorService()).thenReturn(executorService);

    return mockedServer;
  }

}