001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.hamcrest.CoreMatchers.is; 022import static org.hamcrest.CoreMatchers.not; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertThat; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.List; 031import java.util.Objects; 032import java.util.concurrent.atomic.LongAdder; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.ChoreService; 036import org.apache.hadoop.hbase.CoordinatedStateManager; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.Server; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.SplitLogCounters; 043import org.apache.hadoop.hbase.SplitLogTask; 044import org.apache.hadoop.hbase.Waiter; 045import org.apache.hadoop.hbase.client.ClusterConnection; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 048import org.apache.hadoop.hbase.executor.ExecutorService; 049import org.apache.hadoop.hbase.executor.ExecutorType; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.util.CancelableProgressable; 053import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 054import org.apache.hadoop.hbase.zookeeper.ZKUtil; 055import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 056import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 057import org.apache.zookeeper.CreateMode; 058import org.apache.zookeeper.ZooDefs.Ids; 059import org.junit.After; 060import org.junit.Before; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067@Category({RegionServerTests.class, MediumTests.class}) 068public class TestSplitLogWorker { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestSplitLogWorker.class); 073 074 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class); 075 private static final int WAIT_TIME = 15000; 076 private final ServerName MANAGER = ServerName.valueOf("manager,1,1"); 077 private final static HBaseTestingUtility TEST_UTIL = 078 new HBaseTestingUtility(); 079 private DummyServer ds; 080 private ZKWatcher zkw; 081 private SplitLogWorker slw; 082 private ExecutorService executorService; 083 084 static class DummyServer implements Server { 085 private ZKWatcher zkw; 086 private Configuration conf; 087 private CoordinatedStateManager cm; 088 089 public DummyServer(ZKWatcher zkw, Configuration conf) { 090 this.zkw = zkw; 091 this.conf = conf; 092 cm = new ZkCoordinatedStateManager(this); 093 } 094 095 @Override 096 public void abort(String why, Throwable e) { 097 } 098 099 @Override 100 public boolean isAborted() { 101 return false; 102 } 103 104 @Override 105 public void stop(String why) { 106 } 107 108 @Override 109 public boolean isStopped() { 110 return false; 111 } 112 113 @Override 114 public Configuration getConfiguration() { 115 return conf; 116 } 117 118 @Override 119 public ZKWatcher getZooKeeper() { 120 return zkw; 121 } 122 123 @Override 124 public ServerName getServerName() { 125 return null; 126 } 127 128 @Override 129 public CoordinatedStateManager getCoordinatedStateManager() { 130 return cm; 131 } 132 133 @Override 134 public ClusterConnection getConnection() { 135 return null; 136 } 137 138 @Override 139 public ChoreService getChoreService() { 140 return null; 141 } 142 143 @Override 144 public ClusterConnection getClusterConnection() { 145 // TODO Auto-generated method stub 146 return null; 147 } 148 149 @Override 150 public FileSystem getFileSystem() { 151 return null; 152 } 153 154 @Override 155 public boolean isStopping() { 156 return false; 157 } 158 159 @Override 160 public Connection createConnection(Configuration conf) throws IOException { 161 return null; 162 } 163 } 164 165 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 166 throws Exception { 167 assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval, 168 waitForCounterBoolean(ctr, oldval, newval, timems)); 169 } 170 171 private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval, 172 long timems) throws Exception { 173 174 return waitForCounterBoolean(ctr, oldval, newval, timems, true); 175 } 176 177 private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval, 178 long timems, boolean failIfTimeout) throws Exception { 179 180 long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout, 181 new Waiter.Predicate<Exception>() { 182 @Override 183 public boolean evaluate() throws Exception { 184 return (ctr.sum() >= newval); 185 } 186 }); 187 188 if( timeWaited > 0) { 189 // when not timed out 190 assertEquals(newval, ctr.sum()); 191 } 192 return true; 193 } 194 195 @Before 196 public void setup() throws Exception { 197 TEST_UTIL.startMiniZKCluster(); 198 Configuration conf = TEST_UTIL.getConfiguration(); 199 zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), 200 "split-log-worker-tests", null); 201 ds = new DummyServer(zkw, conf); 202 ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode); 203 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode); 204 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1))); 205 LOG.debug(zkw.getZNodePaths().baseZNode + " created"); 206 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode); 207 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1))); 208 209 LOG.debug(zkw.getZNodePaths().splitLogZNode + " created"); 210 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode); 211 assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1))); 212 213 SplitLogCounters.resetCounters(); 214 executorService = new ExecutorService("TestSplitLogWorker"); 215 executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); 216 } 217 218 @After 219 public void teardown() throws Exception { 220 if (executorService != null) { 221 executorService.shutdown(); 222 } 223 TEST_UTIL.shutdownMiniZKCluster(); 224 } 225 226 SplitLogWorker.TaskExecutor neverEndingTask = 227 new SplitLogWorker.TaskExecutor() { 228 229 @Override 230 public Status exec(String name, CancelableProgressable p) { 231 while (true) { 232 try { 233 Thread.sleep(1000); 234 } catch (InterruptedException e) { 235 return Status.PREEMPTED; 236 } 237 if (!p.progress()) { 238 return Status.PREEMPTED; 239 } 240 } 241 } 242 243 }; 244 245 @Test 246 public void testAcquireTaskAtStartup() throws Exception { 247 LOG.info("testAcquireTaskAtStartup"); 248 SplitLogCounters.resetCounters(); 249 final String TATAS = "tatas"; 250 final ServerName RS = ServerName.valueOf("rs,1,1"); 251 RegionServerServices mockedRS = getRegionServer(RS); 252 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), 253 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), 254 Ids.OPEN_ACL_UNSAFE, 255 CreateMode.PERSISTENT); 256 257 SplitLogWorker slw = 258 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 259 slw.start(); 260 try { 261 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 262 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS)); 263 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 264 assertTrue(slt.isOwned(RS)); 265 } finally { 266 stopSplitLogWorker(slw); 267 } 268 } 269 270 private void stopSplitLogWorker(final SplitLogWorker slw) 271 throws InterruptedException { 272 if (slw != null) { 273 slw.stop(); 274 slw.worker.join(WAIT_TIME); 275 if (slw.worker.isAlive()) { 276 assertTrue(("Could not stop the worker thread slw=" + slw) == null); 277 } 278 } 279 } 280 281 @Test 282 public void testRaceForTask() throws Exception { 283 LOG.info("testRaceForTask"); 284 SplitLogCounters.resetCounters(); 285 final String TRFT = "trft"; 286 final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); 287 final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); 288 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), 289 new SplitLogTask.Unassigned(MANAGER).toByteArray(), 290 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 291 RegionServerServices mockedRS1 = getRegionServer(SVR1); 292 RegionServerServices mockedRS2 = getRegionServer(SVR2); 293 SplitLogWorker slw1 = 294 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); 295 SplitLogWorker slw2 = 296 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); 297 slw1.start(); 298 slw2.start(); 299 try { 300 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 301 // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if 302 // not it, that we fell through to the next counter in line and it was set. 303 assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 304 WAIT_TIME, false) || 305 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1); 306 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); 307 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 308 assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2)); 309 } finally { 310 stopSplitLogWorker(slw1); 311 stopSplitLogWorker(slw2); 312 } 313 } 314 315 @Test 316 public void testPreemptTask() throws Exception { 317 LOG.info("testPreemptTask"); 318 SplitLogCounters.resetCounters(); 319 final ServerName SRV = ServerName.valueOf("tpt_svr,1,1"); 320 final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); 321 RegionServerServices mockedRS = getRegionServer(SRV); 322 SplitLogWorker slw = 323 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 324 slw.start(); 325 try { 326 Thread.yield(); // let the worker start 327 Thread.sleep(1000); 328 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); 329 330 // this time create a task node after starting the splitLogWorker 331 zkw.getRecoverableZooKeeper().create(PATH, 332 new SplitLogTask.Unassigned(MANAGER).toByteArray(), 333 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 334 335 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 336 assertEquals(1, slw.getTaskReadySeq()); 337 byte [] bytes = ZKUtil.getData(zkw, PATH); 338 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 339 assertTrue(slt.isOwned(SRV)); 340 slt = new SplitLogTask.Owned(MANAGER); 341 ZKUtil.setData(zkw, PATH, slt.toByteArray()); 342 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 343 } finally { 344 stopSplitLogWorker(slw); 345 } 346 } 347 348 @Test 349 public void testMultipleTasks() throws Exception { 350 LOG.info("testMultipleTasks"); 351 SplitLogCounters.resetCounters(); 352 final ServerName SRV = ServerName.valueOf("tmt_svr,1,1"); 353 final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); 354 RegionServerServices mockedRS = getRegionServer(SRV); 355 SplitLogWorker slw = 356 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 357 slw.start(); 358 try { 359 Thread.yield(); // let the worker start 360 Thread.sleep(100); 361 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); 362 363 SplitLogTask unassignedManager = 364 new SplitLogTask.Unassigned(MANAGER); 365 zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), 366 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 367 368 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 369 // now the worker is busy doing the above task 370 371 // create another task 372 final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"); 373 zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(), 374 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 375 376 // preempt the first task, have it owned by another worker 377 final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1"); 378 SplitLogTask slt = new SplitLogTask.Owned(anotherWorker); 379 ZKUtil.setData(zkw, PATH1, slt.toByteArray()); 380 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 381 382 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); 383 assertEquals(2, slw.getTaskReadySeq()); 384 byte [] bytes = ZKUtil.getData(zkw, PATH2); 385 slt = SplitLogTask.parseFrom(bytes); 386 assertTrue(slt.isOwned(SRV)); 387 } finally { 388 stopSplitLogWorker(slw); 389 } 390 } 391 392 @Test 393 public void testRescan() throws Exception { 394 LOG.info("testRescan"); 395 SplitLogCounters.resetCounters(); 396 final ServerName SRV = ServerName.valueOf("svr,1,1"); 397 RegionServerServices mockedRS = getRegionServer(SRV); 398 slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); 399 slw.start(); 400 Thread.yield(); // let the worker start 401 Thread.sleep(100); 402 403 String task = ZKSplitLog.getEncodedNodeName(zkw, "task"); 404 SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER); 405 zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 406 CreateMode.PERSISTENT); 407 408 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); 409 // now the worker is busy doing the above task 410 411 // preempt the task, have it owned by another worker 412 ZKUtil.setData(zkw, task, slt.toByteArray()); 413 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); 414 415 // create a RESCAN node 416 String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"); 417 rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 418 CreateMode.PERSISTENT_SEQUENTIAL); 419 420 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); 421 // RESCAN node might not have been processed if the worker became busy 422 // with the above task. preempt the task again so that now the RESCAN 423 // node is processed 424 ZKUtil.setData(zkw, task, slt.toByteArray()); 425 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME); 426 waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME); 427 428 List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode); 429 LOG.debug(Objects.toString(nodes)); 430 int num = 0; 431 for (String node : nodes) { 432 num++; 433 if (node.startsWith("RESCAN")) { 434 String name = ZKSplitLog.getEncodedNodeName(zkw, node); 435 String fn = ZKSplitLog.getFileName(name); 436 byte [] data = ZKUtil.getData(zkw, 437 ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn)); 438 slt = SplitLogTask.parseFrom(data); 439 assertTrue(slt.toString(), slt.isDone(SRV)); 440 } 441 } 442 assertEquals(2, num); 443 } 444 445 @Test 446 public void testAcquireMultiTasks() throws Exception { 447 LOG.info("testAcquireMultiTasks"); 448 SplitLogCounters.resetCounters(); 449 final String TATAS = "tatas"; 450 final ServerName RS = ServerName.valueOf("rs,1,1"); 451 final int maxTasks = 3; 452 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 453 testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); 454 RegionServerServices mockedRS = getRegionServer(RS); 455 for (int i = 0; i < maxTasks; i++) { 456 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), 457 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), 458 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 459 } 460 461 SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); 462 slw.start(); 463 try { 464 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME); 465 for (int i = 0; i < maxTasks; i++) { 466 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); 467 SplitLogTask slt = SplitLogTask.parseFrom(bytes); 468 assertTrue(slt.isOwned(RS)); 469 } 470 } finally { 471 stopSplitLogWorker(slw); 472 } 473 } 474 475 /** 476 * Create a mocked region server service instance 477 */ 478 private RegionServerServices getRegionServer(ServerName name) { 479 480 RegionServerServices mockedServer = mock(RegionServerServices.class); 481 when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); 482 when(mockedServer.getServerName()).thenReturn(name); 483 when(mockedServer.getZooKeeper()).thenReturn(zkw); 484 when(mockedServer.isStopped()).thenReturn(false); 485 when(mockedServer.getExecutorService()).thenReturn(executorService); 486 487 return mockedServer; 488 } 489 490}