001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.Closeable; 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.nio.file.Files; 030import java.nio.file.Paths; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.EnumSet; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Locale; 038import java.util.Optional; 039import java.util.Set; 040import java.util.concurrent.Callable; 041import java.util.concurrent.CancellationException; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Executors; 045import java.util.concurrent.Future; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.TimeoutException; 048import java.util.function.Predicate; 049import org.apache.commons.io.IOUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.ClusterMetrics.Option; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.hadoop.hbase.UnknownRegionException; 056import org.apache.hadoop.hbase.client.Admin; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.DoNotRetryRegionException; 060import org.apache.hadoop.hbase.client.RegionInfo; 061import org.apache.hadoop.hbase.master.RackManager; 062import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 063import org.apache.yetus.audience.InterfaceAudience; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 068import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 069 070/** 071 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 072 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 073 * acknowledges if regions are online after movement while noAck mode is best effort mode that 074 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 075 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 076 * anyways. This can also be used by constructiong an Object using the builder and then calling 077 * {@link #load()} or {@link #unload()} methods for the desired operations. 078 */ 079@InterfaceAudience.Public 080public class RegionMover extends AbstractHBaseTool implements Closeable { 081 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 082 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 083 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 084 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 085 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 086 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 087 088 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 089 090 private RegionMoverBuilder rmbuilder; 091 private boolean ack = true; 092 private int maxthreads = 1; 093 private int timeout; 094 private String loadUnload; 095 private String hostname; 096 private String filename; 097 private String excludeFile; 098 private String designatedFile; 099 private int port; 100 private Connection conn; 101 private Admin admin; 102 private RackManager rackManager; 103 104 private RegionMover(RegionMoverBuilder builder) throws IOException { 105 this.hostname = builder.hostname; 106 this.filename = builder.filename; 107 this.excludeFile = builder.excludeFile; 108 this.designatedFile = builder.designatedFile; 109 this.maxthreads = builder.maxthreads; 110 this.ack = builder.ack; 111 this.port = builder.port; 112 this.timeout = builder.timeout; 113 setConf(builder.conf); 114 this.conn = ConnectionFactory.createConnection(conf); 115 this.admin = conn.getAdmin(); 116 // Only while running unit tests, builder.rackManager will not be null for the convenience of 117 // providing custom rackManager. Otherwise for regular workflow/user triggered action, 118 // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is 119 // provided as @InterfaceAudience.Private and it is commented that this is just 120 // to be used by unit test. 121 rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager; 122 } 123 124 private RegionMover() { 125 } 126 127 @Override 128 public void close() { 129 IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e)); 130 IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e)); 131 } 132 133 /** 134 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 135 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 136 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 137 * the corresponding options. 138 */ 139 public static class RegionMoverBuilder { 140 private boolean ack = true; 141 private int maxthreads = 1; 142 private int timeout = Integer.MAX_VALUE; 143 private String hostname; 144 private String filename; 145 private String excludeFile = null; 146 private String designatedFile = null; 147 private String defaultDir = System.getProperty("java.io.tmpdir"); 148 @InterfaceAudience.Private 149 final int port; 150 private final Configuration conf; 151 private RackManager rackManager; 152 153 public RegionMoverBuilder(String hostname) { 154 this(hostname, createConf()); 155 } 156 157 /** 158 * Creates a new configuration and sets region mover specific overrides 159 */ 160 private static Configuration createConf() { 161 Configuration conf = HBaseConfiguration.create(); 162 conf.setInt("hbase.client.prefetch.limit", 1); 163 conf.setInt("hbase.client.pause", 500); 164 conf.setInt("hbase.client.retries.number", 100); 165 return conf; 166 } 167 168 /** 169 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or 170 * hostname:port. 171 * @param conf Configuration object 172 */ 173 public RegionMoverBuilder(String hostname, Configuration conf) { 174 String[] splitHostname = hostname.toLowerCase().split(":"); 175 this.hostname = splitHostname[0]; 176 if (splitHostname.length == 2) { 177 this.port = Integer.parseInt(splitHostname[1]); 178 } else { 179 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 180 } 181 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 182 + ":" + Integer.toString(this.port); 183 this.conf = conf; 184 } 185 186 /** 187 * Path of file where regions will be written to during unloading/read from during loading n 188 * * @return RegionMoverBuilder object 189 */ 190 public RegionMoverBuilder filename(String filename) { 191 this.filename = filename; 192 return this; 193 } 194 195 /** 196 * Set the max number of threads that will be used to move regions 197 */ 198 public RegionMoverBuilder maxthreads(int threads) { 199 this.maxthreads = threads; 200 return this; 201 } 202 203 /** 204 * Path of file containing hostnames to be excluded during region movement. Exclude file should 205 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 206 * host. 207 */ 208 public RegionMoverBuilder excludeFile(String excludefile) { 209 this.excludeFile = excludefile; 210 return this; 211 } 212 213 /** 214 * Set the designated file. Designated file contains hostnames where region moves. Designated 215 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 216 * on a single host. 217 * @param designatedFile The designated file 218 * @return RegionMoverBuilder object 219 */ 220 public RegionMoverBuilder designatedFile(String designatedFile) { 221 this.designatedFile = designatedFile; 222 return this; 223 } 224 225 /** 226 * Set ack/noAck mode. 227 * <p> 228 * In ack mode regions are acknowledged before and after moving and the move is retried 229 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 230 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 231 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 232 * <p> 233 * n * @return RegionMoverBuilder object 234 */ 235 public RegionMoverBuilder ack(boolean ack) { 236 this.ack = ack; 237 return this; 238 } 239 240 /** 241 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 242 * movers also have a separate time which is hbase.move.wait.max * number of regions to 243 * load/unload 244 * @param timeout in seconds 245 * @return RegionMoverBuilder object 246 */ 247 public RegionMoverBuilder timeout(int timeout) { 248 this.timeout = timeout; 249 return this; 250 } 251 252 /** 253 * Set specific rackManager implementation. This setter method is for testing purpose only. 254 * @param rackManager rackManager impl 255 * @return RegionMoverBuilder object 256 */ 257 @InterfaceAudience.Private 258 public RegionMoverBuilder rackManager(RackManager rackManager) { 259 this.rackManager = rackManager; 260 return this; 261 } 262 263 /** 264 * This method builds the appropriate RegionMover object which can then be used to load/unload 265 * using load and unload methods 266 * @return RegionMover object 267 */ 268 public RegionMover build() throws IOException { 269 return new RegionMover(this); 270 } 271 } 272 273 /** 274 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 275 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 276 * @return true if loading succeeded, false otherwise 277 */ 278 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 279 ExecutorService loadPool = Executors.newFixedThreadPool(1); 280 Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan()); 281 boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading"); 282 if (!isMetaMoved) { 283 return false; 284 } 285 loadPool = Executors.newFixedThreadPool(1); 286 loadTask = loadPool.submit(getNonMetaRegionsMovePlan()); 287 return waitTaskToFinish(loadPool, loadTask, "loading"); 288 } 289 290 private Callable<Boolean> getMetaRegionMovePlan() { 291 return getRegionsMovePlan(true); 292 } 293 294 private Callable<Boolean> getNonMetaRegionsMovePlan() { 295 return getRegionsMovePlan(false); 296 } 297 298 private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) { 299 return () -> { 300 try { 301 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 302 if (regionsToMove.isEmpty()) { 303 LOG.info("No regions to load.Exiting"); 304 return true; 305 } 306 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 307 if (moveMetaRegion) { 308 if (metaRegion.isPresent()) { 309 loadRegions(Collections.singletonList(metaRegion.get())); 310 } 311 } else { 312 metaRegion.ifPresent(regionsToMove::remove); 313 loadRegions(regionsToMove); 314 } 315 } catch (Exception e) { 316 LOG.error("Error while loading regions to " + hostname, e); 317 return false; 318 } 319 return true; 320 }; 321 } 322 323 private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) { 324 return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst(); 325 } 326 327 private void loadRegions(List<RegionInfo> regionsToMove) throws Exception { 328 ServerName server = getTargetServer(); 329 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 330 LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " 331 + this.maxthreads + " threads.Ack mode:" + this.ack); 332 333 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 334 List<Future<Boolean>> taskList = new ArrayList<>(); 335 int counter = 0; 336 while (counter < regionsToMove.size()) { 337 RegionInfo region = regionsToMove.get(counter); 338 ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn); 339 if (currentServer == null) { 340 LOG 341 .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 342 counter++; 343 continue; 344 } else if (server.equals(currentServer)) { 345 LOG.info( 346 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 347 counter++; 348 continue; 349 } 350 if (ack) { 351 Future<Boolean> task = moveRegionsPool 352 .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions)); 353 taskList.add(task); 354 } else { 355 Future<Boolean> task = moveRegionsPool 356 .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions)); 357 taskList.add(task); 358 } 359 counter++; 360 } 361 362 moveRegionsPool.shutdown(); 363 long timeoutInSeconds = regionsToMove.size() 364 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 365 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 366 } 367 368 /** 369 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 370 * noAck mode we do not make sure that region is successfully online on the target region 371 * server,hence it is best effort.We do not unload regions to hostnames given in 372 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 373 * to hostnames provided in {@link #designatedFile} 374 * @return true if unloading succeeded, false otherwise 375 */ 376 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 377 return unloadRegions(false); 378 } 379 380 /** 381 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 382 * noAck mode we do not make sure that region is successfully online on the target region 383 * server,hence it is best effort.We do not unload regions to hostnames given in 384 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 385 * to hostnames provided in {@link #designatedFile}. While unloading regions, destination 386 * RegionServers are selected from different rack i.e regions should not move to any RegionServers 387 * that belong to same rack as source RegionServer. 388 * @return true if unloading succeeded, false otherwise 389 */ 390 public boolean unloadFromRack() 391 throws InterruptedException, ExecutionException, TimeoutException { 392 return unloadRegions(true); 393 } 394 395 private boolean unloadRegions(boolean unloadFromRack) 396 throws InterruptedException, ExecutionException, TimeoutException { 397 deleteFile(this.filename); 398 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 399 Future<Boolean> unloadTask = unloadPool.submit(() -> { 400 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 401 try { 402 // Get Online RegionServers 403 List<ServerName> regionServers = new ArrayList<>(); 404 regionServers.addAll(admin.getRegionServers()); 405 // Remove the host Region server from target Region Servers list 406 ServerName server = stripServer(regionServers, hostname, port); 407 if (server == null) { 408 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 409 hostname, port); 410 LOG.debug("List of region servers: {}", regionServers); 411 return false; 412 } 413 // Remove RS not present in the designated file 414 includeExcludeRegionServers(designatedFile, regionServers, true); 415 416 // Remove RS present in the exclude file 417 includeExcludeRegionServers(excludeFile, regionServers, false); 418 419 if (unloadFromRack) { 420 // remove regionServers that belong to same rack (as source host) since the goal is to 421 // unload regions from source regionServer to destination regionServers 422 // that belong to different rack only. 423 String sourceRack = rackManager.getRack(server); 424 List<String> racks = rackManager.getRack(regionServers); 425 Iterator<ServerName> iterator = regionServers.iterator(); 426 int i = 0; 427 while (iterator.hasNext()) { 428 iterator.next(); 429 if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) { 430 iterator.remove(); 431 } 432 i++; 433 } 434 } 435 436 // Remove decommissioned RS 437 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 438 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 439 regionServers.removeIf(decommissionedRS::contains); 440 LOG.debug("Excluded RegionServers from unloading regions to because they " 441 + "are marked as decommissioned. Servers: {}", decommissionedRS); 442 } 443 444 stripMaster(regionServers); 445 if (regionServers.isEmpty()) { 446 LOG.warn("No Regions were moved - no servers available"); 447 return false; 448 } 449 unloadRegions(server, regionServers, movedRegions); 450 } catch (Exception e) { 451 LOG.error("Error while unloading regions ", e); 452 return false; 453 } finally { 454 if (movedRegions != null) { 455 writeFile(filename, movedRegions); 456 } 457 } 458 return true; 459 }); 460 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 461 } 462 463 private void unloadRegions(ServerName server, List<ServerName> regionServers, 464 List<RegionInfo> movedRegions) throws Exception { 465 while (true) { 466 List<RegionInfo> regionsToMove = admin.getRegions(server); 467 regionsToMove.removeAll(movedRegions); 468 if (regionsToMove.isEmpty()) { 469 LOG.info("No Regions to move....Quitting now"); 470 break; 471 } 472 LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}", 473 regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack); 474 475 Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove); 476 if (metaRegion.isPresent()) { 477 RegionInfo meta = metaRegion.get(); 478 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, 479 Collections.singletonList(meta)); 480 regionsToMove.remove(meta); 481 } 482 submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove); 483 } 484 } 485 486 private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers, 487 List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception { 488 final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 489 List<Future<Boolean>> taskList = new ArrayList<>(); 490 int serverIndex = 0; 491 for (RegionInfo regionToMove : regionsToMove) { 492 if (ack) { 493 Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server, 494 regionServers.get(serverIndex), movedRegions)); 495 taskList.add(task); 496 } else { 497 Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove, 498 server, regionServers.get(serverIndex), movedRegions)); 499 taskList.add(task); 500 } 501 serverIndex = (serverIndex + 1) % regionServers.size(); 502 } 503 moveRegionsPool.shutdown(); 504 long timeoutInSeconds = regionsToMove.size() 505 * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 506 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 507 } 508 509 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 510 throws TimeoutException, InterruptedException, ExecutionException { 511 pool.shutdown(); 512 try { 513 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 514 LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: " 515 + this.timeout + "sec"); 516 pool.shutdownNow(); 517 } 518 } catch (InterruptedException e) { 519 pool.shutdownNow(); 520 Thread.currentThread().interrupt(); 521 } 522 try { 523 return task.get(5, TimeUnit.SECONDS); 524 } catch (InterruptedException e) { 525 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 526 throw e; 527 } catch (ExecutionException e) { 528 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 529 throw e; 530 } 531 } 532 533 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 534 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 535 try { 536 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 537 moveRegionsPool.shutdownNow(); 538 } 539 } catch (InterruptedException e) { 540 moveRegionsPool.shutdownNow(); 541 Thread.currentThread().interrupt(); 542 } 543 for (Future<Boolean> future : taskList) { 544 try { 545 // if even after shutdownNow threads are stuck we wait for 5 secs max 546 if (!future.get(5, TimeUnit.SECONDS)) { 547 LOG.error("Was Not able to move region....Exiting Now"); 548 throw new Exception("Could not move region Exception"); 549 } 550 } catch (InterruptedException e) { 551 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 552 throw e; 553 } catch (ExecutionException e) { 554 boolean ignoreFailure = ignoreRegionMoveFailure(e); 555 if (ignoreFailure) { 556 LOG.debug("Ignore region move failure, it might have been split/merged.", e); 557 } else { 558 LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e); 559 throw e; 560 } 561 } catch (CancellationException e) { 562 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 563 + "secs", e); 564 throw e; 565 } 566 } 567 } 568 569 private boolean ignoreRegionMoveFailure(ExecutionException e) { 570 boolean ignoreFailure = false; 571 if (e.getCause() instanceof UnknownRegionException) { 572 // region does not exist anymore 573 ignoreFailure = true; 574 } else if ( 575 e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null 576 && e.getCause().getMessage() 577 .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,") 578 ) { 579 // region is recently split 580 ignoreFailure = true; 581 } 582 return ignoreFailure; 583 } 584 585 private ServerName getTargetServer() throws Exception { 586 ServerName server = null; 587 int maxWaitInSeconds = 588 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 589 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 590 while (EnvironmentEdgeManager.currentTime() < maxWait) { 591 try { 592 List<ServerName> regionServers = new ArrayList<>(); 593 regionServers.addAll(admin.getRegionServers()); 594 // Remove the host Region server from target Region Servers list 595 server = stripServer(regionServers, hostname, port); 596 if (server != null) { 597 break; 598 } else { 599 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 600 } 601 } catch (IOException e) { 602 LOG.warn("Could not get list of region servers", e); 603 } 604 Thread.sleep(500); 605 } 606 if (server == null) { 607 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 608 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 609 } 610 return server; 611 } 612 613 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 614 List<RegionInfo> regions = new ArrayList<>(); 615 File f = new File(filename); 616 if (!f.exists()) { 617 return regions; 618 } 619 try ( 620 DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) { 621 int numRegions = dis.readInt(); 622 int index = 0; 623 while (index < numRegions) { 624 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 625 index++; 626 } 627 } catch (IOException e) { 628 LOG.error("Error while reading regions from file:" + filename, e); 629 throw e; 630 } 631 return regions; 632 } 633 634 /** 635 * Write the number of regions moved in the first line followed by regions moved in subsequent 636 * lines 637 */ 638 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 639 try (DataOutputStream dos = 640 new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) { 641 dos.writeInt(movedRegions.size()); 642 for (RegionInfo region : movedRegions) { 643 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 644 } 645 } catch (IOException e) { 646 LOG.error("ERROR: Was Not able to write regions moved to output file but moved " 647 + movedRegions.size() + " regions", e); 648 throw e; 649 } 650 } 651 652 private void deleteFile(String filename) { 653 File f = new File(filename); 654 if (f.exists()) { 655 f.delete(); 656 } 657 } 658 659 /** 660 * @param filename The file should have 'host:port' per line 661 * @return List of servers from the file in format 'hostname:port'. 662 */ 663 private List<String> readServersFromFile(String filename) throws IOException { 664 List<String> servers = new ArrayList<>(); 665 if (filename != null) { 666 try { 667 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 668 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 669 .forEach(servers::add); 670 } catch (IOException e) { 671 LOG.error("Exception while reading servers from file,", e); 672 throw e; 673 } 674 } 675 return servers; 676 } 677 678 /** 679 * Designates or excludes the servername whose hostname and port portion matches the list given in 680 * the file. Example:<br> 681 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 682 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3 683 * are removed from regionServers list so that regions can move to only RS1. If you want to 684 * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call 685 * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers 686 * list so that regions can move to only RS2 and RS3. 687 */ 688 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 689 boolean isInclude) throws IOException { 690 if (fileName != null) { 691 List<String> servers = readServersFromFile(fileName); 692 if (servers.isEmpty()) { 693 LOG.warn("No servers provided in the file: {}." + fileName); 694 return; 695 } 696 Iterator<ServerName> i = regionServers.iterator(); 697 while (i.hasNext()) { 698 String rs = i.next().getServerName(); 699 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" 700 + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; 701 if (isInclude != servers.contains(rsPort)) { 702 i.remove(); 703 } 704 } 705 } 706 } 707 708 /** 709 * Exclude master from list of RSs to move regions to 710 */ 711 private void stripMaster(List<ServerName> regionServers) throws IOException { 712 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 713 stripServer(regionServers, master.getHostname(), master.getPort()); 714 } 715 716 /** 717 * Remove the servername whose hostname and port portion matches from the passed array of servers. 718 * Returns as side-effect the servername removed. 719 * @return server removed from list of Region Servers 720 */ 721 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 722 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 723 ServerName server = iter.next(); 724 if ( 725 server.getAddress().getHostName().equalsIgnoreCase(hostname) 726 && server.getAddress().getPort() == port 727 ) { 728 iter.remove(); 729 return server; 730 } 731 } 732 return null; 733 } 734 735 @Override 736 protected void addOptions() { 737 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 738 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack"); 739 this.addOptWithArg("m", "maxthreads", 740 "Define the maximum number of threads to use to unload and reload the regions"); 741 this.addOptWithArg("x", "excludefile", 742 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 743 + "target host; useful for rack decommisioning."); 744 this.addOptWithArg("d", "designatedfile", 745 "File with <hostname:port> per line as unload targets;" + "default is all online hosts"); 746 this.addOptWithArg("f", "filename", 747 "File to save regions list into unloading, or read from loading; " 748 + "default /tmp/<usernamehostname:port>"); 749 this.addOptNoArg("n", "noack", 750 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 751 + "RegionServer, hence best effort. This is more performant in unloading and loading " 752 + "but might lead to region being unavailable for some time till master reassigns it " 753 + "in case the move failed"); 754 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 755 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 756 } 757 758 @Override 759 protected void processOptions(CommandLine cmd) { 760 String hostname = cmd.getOptionValue("r"); 761 rmbuilder = new RegionMoverBuilder(hostname); 762 if (cmd.hasOption('m')) { 763 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 764 } 765 if (cmd.hasOption('n')) { 766 rmbuilder.ack(false); 767 } 768 if (cmd.hasOption('f')) { 769 rmbuilder.filename(cmd.getOptionValue('f')); 770 } 771 if (cmd.hasOption('x')) { 772 rmbuilder.excludeFile(cmd.getOptionValue('x')); 773 } 774 if (cmd.hasOption('d')) { 775 rmbuilder.designatedFile(cmd.getOptionValue('d')); 776 } 777 if (cmd.hasOption('t')) { 778 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 779 } 780 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 781 } 782 783 @Override 784 protected int doWork() throws Exception { 785 boolean success; 786 try (RegionMover rm = rmbuilder.build()) { 787 if (loadUnload.equalsIgnoreCase("load")) { 788 success = rm.load(); 789 } else if (loadUnload.equalsIgnoreCase("unload")) { 790 success = rm.unload(); 791 } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) { 792 success = rm.unloadFromRack(); 793 } else { 794 printUsage(); 795 success = false; 796 } 797 } 798 return (success ? 0 : 1); 799 } 800 801 public static void main(String[] args) { 802 try (RegionMover mover = new RegionMover()) { 803 mover.doStaticMain(args); 804 } 805 } 806}