001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.io.BufferedInputStream; 023import java.io.BufferedOutputStream; 024import java.io.Closeable; 025import java.io.DataInputStream; 026import java.io.DataOutputStream; 027import java.io.File; 028import java.io.FileInputStream; 029import java.io.FileOutputStream; 030import java.io.IOException; 031import java.nio.file.Files; 032import java.nio.file.Paths; 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.EnumSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Locale; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CancellationException; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.Executors; 044import java.util.concurrent.Future; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.TimeoutException; 047import java.util.function.Predicate; 048import org.apache.commons.io.IOUtils; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.ClusterMetrics.Option; 051import org.apache.hadoop.hbase.HBaseConfiguration; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HRegionLocation; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.hadoop.hbase.client.Admin; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.ResultScanner; 060import org.apache.hadoop.hbase.client.Scan; 061import org.apache.hadoop.hbase.client.Table; 062import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 063import org.apache.yetus.audience.InterfaceAudience; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 068import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 069 070/** 071 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 072 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 073 * acknowledges if regions are online after movement while noAck mode is best effort mode that 074 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 075 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 076 * anyways. This can also be used by constructiong an Object using the builder and then calling 077 * {@link #load()} or {@link #unload()} methods for the desired operations. 078 */ 079@InterfaceAudience.Public 080public class RegionMover extends AbstractHBaseTool implements Closeable { 081 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 082 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 083 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 084 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 085 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 086 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 087 static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 088 private RegionMoverBuilder rmbuilder; 089 private boolean ack = true; 090 private int maxthreads = 1; 091 private int timeout; 092 private String loadUnload; 093 private String hostname; 094 private String filename; 095 private String excludeFile; 096 private int port; 097 private Connection conn; 098 private Admin admin; 099 100 private RegionMover(RegionMoverBuilder builder) throws IOException { 101 this.hostname = builder.hostname; 102 this.filename = builder.filename; 103 this.excludeFile = builder.excludeFile; 104 this.maxthreads = builder.maxthreads; 105 this.ack = builder.ack; 106 this.port = builder.port; 107 this.timeout = builder.timeout; 108 setConf(builder.conf); 109 this.conn = ConnectionFactory.createConnection(conf); 110 this.admin = conn.getAdmin(); 111 } 112 113 private RegionMover() { 114 } 115 116 @Override 117 public void close() { 118 IOUtils.closeQuietly(this.admin); 119 IOUtils.closeQuietly(this.conn); 120 } 121 122 /** 123 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 124 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 125 * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options 126 */ 127 public static class RegionMoverBuilder { 128 private boolean ack = true; 129 private int maxthreads = 1; 130 private int timeout = Integer.MAX_VALUE; 131 private String hostname; 132 private String filename; 133 private String excludeFile = null; 134 private String defaultDir = System.getProperty("java.io.tmpdir"); 135 @VisibleForTesting 136 final int port; 137 private final Configuration conf; 138 139 public RegionMoverBuilder(String hostname) { 140 this(hostname, createConf()); 141 } 142 143 /** 144 * Creates a new configuration and sets region mover specific overrides 145 */ 146 private static Configuration createConf() { 147 Configuration conf = HBaseConfiguration.create(); 148 conf.setInt("hbase.client.prefetch.limit", 1); 149 conf.setInt("hbase.client.pause", 500); 150 conf.setInt("hbase.client.retries.number", 100); 151 return conf; 152 } 153 154 /** 155 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname 156 * or hostname:port. 157 * @param conf Configuration object 158 */ 159 public RegionMoverBuilder(String hostname, Configuration conf) { 160 String[] splitHostname = hostname.toLowerCase().split(":"); 161 this.hostname = splitHostname[0]; 162 if (splitHostname.length == 2) { 163 this.port = Integer.parseInt(splitHostname[1]); 164 } else { 165 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 166 } 167 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 168 + ":" + Integer.toString(this.port); 169 this.conf = conf; 170 } 171 172 /** 173 * Path of file where regions will be written to during unloading/read from during loading 174 * @param filename 175 * @return RegionMoverBuilder object 176 */ 177 public RegionMoverBuilder filename(String filename) { 178 this.filename = filename; 179 return this; 180 } 181 182 /** 183 * Set the max number of threads that will be used to move regions 184 */ 185 public RegionMoverBuilder maxthreads(int threads) { 186 this.maxthreads = threads; 187 return this; 188 } 189 190 /** 191 * Path of file containing hostnames to be excluded during region movement. Exclude file should 192 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 193 * host. 194 */ 195 public RegionMoverBuilder excludeFile(String excludefile) { 196 this.excludeFile = excludefile; 197 return this; 198 } 199 200 /** 201 * Set ack/noAck mode. 202 * <p> 203 * In ack mode regions are acknowledged before and after moving and the move is retried 204 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 205 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 206 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 207 * <p> 208 * @param ack 209 * @return RegionMoverBuilder object 210 */ 211 public RegionMoverBuilder ack(boolean ack) { 212 this.ack = ack; 213 return this; 214 } 215 216 /** 217 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 218 * movers also have a separate time which is hbase.move.wait.max * number of regions to 219 * load/unload 220 * @param timeout in seconds 221 * @return RegionMoverBuilder object 222 */ 223 public RegionMoverBuilder timeout(int timeout) { 224 this.timeout = timeout; 225 return this; 226 } 227 228 /** 229 * This method builds the appropriate RegionMover object which can then be used to load/unload 230 * using load and unload methods 231 * @return RegionMover object 232 */ 233 public RegionMover build() throws IOException { 234 return new RegionMover(this); 235 } 236 } 237 238 /** 239 * Move Regions and make sure that they are up on the target server.If a region movement fails we 240 * exit as failure 241 */ 242 private class MoveWithAck implements Callable<Boolean> { 243 private RegionInfo region; 244 private ServerName targetServer; 245 private List<RegionInfo> movedRegions; 246 private ServerName sourceServer; 247 248 public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer, 249 ServerName targetServer, List<RegionInfo> movedRegions) { 250 this.region = regionInfo; 251 this.targetServer = targetServer; 252 this.movedRegions = movedRegions; 253 this.sourceServer = sourceServer; 254 } 255 256 @Override 257 public Boolean call() throws IOException, InterruptedException { 258 boolean moved = false; 259 int count = 0; 260 int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); 261 int maxWaitInSeconds = 262 admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 263 long startTime = EnvironmentEdgeManager.currentTime(); 264 boolean sameServer = true; 265 // Assert we can scan the region in its current location 266 isSuccessfulScan(region); 267 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 268 + targetServer); 269 while (count < retries && sameServer) { 270 if (count > 0) { 271 LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); 272 } 273 count = count + 1; 274 admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName())); 275 long maxWait = startTime + (maxWaitInSeconds * 1000); 276 while (EnvironmentEdgeManager.currentTime() < maxWait) { 277 sameServer = isSameServer(region, sourceServer); 278 if (!sameServer) { 279 break; 280 } 281 Thread.sleep(100); 282 } 283 } 284 if (sameServer) { 285 LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer 286 + ",newServer=" + this.targetServer); 287 } else { 288 isSuccessfulScan(region); 289 LOG.info("Moved Region " 290 + region.getRegionNameAsString() 291 + " cost:" 292 + String.format("%.3f", 293 (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); 294 moved = true; 295 movedRegions.add(region); 296 } 297 return moved; 298 } 299 } 300 301 /** 302 * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the 303 * RS down anyways and not abort on a stuck region. Improves movement performance 304 */ 305 private class MoveWithoutAck implements Callable<Boolean> { 306 private RegionInfo region; 307 private ServerName targetServer; 308 private List<RegionInfo> movedRegions; 309 private ServerName sourceServer; 310 311 public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer, 312 ServerName targetServer, List<RegionInfo> movedRegions) { 313 this.region = regionInfo; 314 this.targetServer = targetServer; 315 this.movedRegions = movedRegions; 316 this.sourceServer = sourceServer; 317 } 318 319 @Override 320 public Boolean call() { 321 try { 322 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 323 + targetServer); 324 admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName())); 325 LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " 326 + targetServer); 327 } catch (Exception e) { 328 LOG.error("Error Moving Region:" + region.getEncodedName(), e); 329 } finally { 330 // we add region to the moved regions list in No Ack Mode since this is best effort 331 movedRegions.add(region); 332 } 333 return true; 334 } 335 } 336 337 /** 338 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 339 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 340 * @return true if loading succeeded, false otherwise 341 */ 342 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 343 ExecutorService loadPool = Executors.newFixedThreadPool(1); 344 Future<Boolean> loadTask = loadPool.submit(() -> { 345 try { 346 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 347 if (regionsToMove.isEmpty()) { 348 LOG.info("No regions to load.Exiting"); 349 return true; 350 } 351 loadRegions(regionsToMove); 352 } catch (Exception e) { 353 LOG.error("Error while loading regions to " + hostname, e); 354 return false; 355 } 356 return true; 357 }); 358 return waitTaskToFinish(loadPool, loadTask, "loading"); 359 } 360 361 private void loadRegions(List<RegionInfo> regionsToMove) 362 throws Exception { 363 ServerName server = getTargetServer(); 364 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 365 LOG.info( 366 "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads 367 + " threads.Ack mode:" + this.ack); 368 369 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 370 List<Future<Boolean>> taskList = new ArrayList<>(); 371 int counter = 0; 372 while (counter < regionsToMove.size()) { 373 RegionInfo region = regionsToMove.get(counter); 374 ServerName currentServer = getServerNameForRegion(region); 375 if (currentServer == null) { 376 LOG.warn( 377 "Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 378 counter++; 379 continue; 380 } else if (server.equals(currentServer)) { 381 LOG.info( 382 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 383 counter++; 384 continue; 385 } 386 if (ack) { 387 Future<Boolean> task = 388 moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions)); 389 taskList.add(task); 390 } else { 391 Future<Boolean> task = 392 moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions)); 393 taskList.add(task); 394 } 395 counter++; 396 } 397 398 moveRegionsPool.shutdown(); 399 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 400 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 401 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 402 } 403 404 /** 405 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 406 * noAck mode we do not make sure that region is successfully online on the target region 407 * server,hence it is best effort.We do not unload regions to hostnames given in 408 * {@link #excludeFile}. 409 * @return true if unloading succeeded, false otherwise 410 */ 411 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 412 deleteFile(this.filename); 413 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 414 Future<Boolean> unloadTask = unloadPool.submit(() -> { 415 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 416 try { 417 // Get Online RegionServers 418 List<ServerName> regionServers = new ArrayList<>(); 419 regionServers.addAll( 420 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics() 421 .keySet()); 422 // Remove the host Region server from target Region Servers list 423 ServerName server = stripServer(regionServers, hostname, port); 424 // Remove RS present in the exclude file 425 stripExcludes(regionServers); 426 stripMaster(regionServers); 427 if (regionServers.isEmpty()) { 428 LOG.warn("No Regions were moved - no servers available"); 429 return false; 430 } 431 unloadRegions(server, regionServers, movedRegions); 432 } catch (Exception e) { 433 LOG.error("Error while unloading regions ", e); 434 return false; 435 } finally { 436 if (movedRegions != null) { 437 writeFile(filename, movedRegions); 438 } 439 } 440 return true; 441 }); 442 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 443 } 444 445 private void unloadRegions(ServerName server, List<ServerName> regionServers, 446 List<RegionInfo> movedRegions) throws Exception { 447 while (true) { 448 List<RegionInfo> regionsToMove = admin.getRegions(server); 449 regionsToMove.removeAll(movedRegions); 450 if (regionsToMove.isEmpty()) { 451 LOG.info("No Regions to move....Quitting now"); 452 break; 453 } 454 int counter = 0; 455 LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " 456 + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" 457 + ack); 458 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 459 List<Future<Boolean>> taskList = new ArrayList<>(); 460 int serverIndex = 0; 461 while (counter < regionsToMove.size()) { 462 if (ack) { 463 Future<Boolean> task = moveRegionsPool.submit( 464 new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 465 movedRegions)); 466 taskList.add(task); 467 } else { 468 Future<Boolean> task = moveRegionsPool.submit( 469 new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 470 movedRegions)); 471 taskList.add(task); 472 } 473 counter++; 474 serverIndex = (serverIndex + 1) % regionServers.size(); 475 } 476 moveRegionsPool.shutdown(); 477 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 478 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 479 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 480 } 481 } 482 483 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 484 throws TimeoutException, InterruptedException, ExecutionException { 485 pool.shutdown(); 486 try { 487 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 488 LOG.warn( 489 "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout 490 + "sec"); 491 pool.shutdownNow(); 492 } 493 } catch (InterruptedException e) { 494 pool.shutdownNow(); 495 Thread.currentThread().interrupt(); 496 } 497 try { 498 return task.get(5, TimeUnit.SECONDS); 499 } catch (InterruptedException e) { 500 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 501 throw e; 502 } catch (ExecutionException e) { 503 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 504 throw e; 505 } 506 } 507 508 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 509 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 510 try { 511 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 512 moveRegionsPool.shutdownNow(); 513 } 514 } catch (InterruptedException e) { 515 moveRegionsPool.shutdownNow(); 516 Thread.currentThread().interrupt(); 517 } 518 for (Future<Boolean> future : taskList) { 519 try { 520 // if even after shutdownNow threads are stuck we wait for 5 secs max 521 if (!future.get(5, TimeUnit.SECONDS)) { 522 LOG.error("Was Not able to move region....Exiting Now"); 523 throw new Exception("Could not move region Exception"); 524 } 525 } catch (InterruptedException e) { 526 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 527 throw e; 528 } catch (ExecutionException e) { 529 LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); 530 throw e; 531 } catch (CancellationException e) { 532 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 533 + "secs", e); 534 throw e; 535 } 536 } 537 } 538 539 private ServerName getTargetServer() throws Exception { 540 ServerName server = null; 541 int maxWaitInSeconds = 542 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 543 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 544 while (EnvironmentEdgeManager.currentTime() < maxWait) { 545 try { 546 List<ServerName> regionServers = new ArrayList<>(); 547 regionServers.addAll( 548 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics() 549 .keySet()); 550 // Remove the host Region server from target Region Servers list 551 server = stripServer(regionServers, hostname, port); 552 if (server != null) { 553 break; 554 } else { 555 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 556 } 557 } catch (IOException e) { 558 LOG.warn("Could not get list of region servers", e); 559 } 560 Thread.sleep(500); 561 } 562 if (server == null) { 563 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 564 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 565 } 566 return server; 567 } 568 569 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 570 List<RegionInfo> regions = new ArrayList<>(); 571 File f = new File(filename); 572 if (!f.exists()) { 573 return regions; 574 } 575 try (DataInputStream dis = new DataInputStream( 576 new BufferedInputStream(new FileInputStream(f)))) { 577 int numRegions = dis.readInt(); 578 int index = 0; 579 while (index < numRegions) { 580 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 581 index++; 582 } 583 } catch (IOException e) { 584 LOG.error("Error while reading regions from file:" + filename, e); 585 throw e; 586 } 587 return regions; 588 } 589 590 /** 591 * Write the number of regions moved in the first line followed by regions moved in subsequent 592 * lines 593 */ 594 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 595 try (DataOutputStream dos = new DataOutputStream( 596 new BufferedOutputStream(new FileOutputStream(filename)))) { 597 dos.writeInt(movedRegions.size()); 598 for (RegionInfo region : movedRegions) { 599 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 600 } 601 } catch (IOException e) { 602 LOG.error( 603 "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions 604 .size() + " regions", e); 605 throw e; 606 } 607 } 608 609 private void deleteFile(String filename) { 610 File f = new File(filename); 611 if (f.exists()) { 612 f.delete(); 613 } 614 } 615 616 /** 617 * @return List of servers from the exclude file in format 'hostname:port'. 618 */ 619 private List<String> readExcludes(String excludeFile) throws IOException { 620 List<String> excludeServers = new ArrayList<>(); 621 if (excludeFile == null) { 622 return excludeServers; 623 } else { 624 try { 625 Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim) 626 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 627 .forEach(excludeServers::add); 628 } catch (IOException e) { 629 LOG.warn("Exception while reading excludes file, continuing anyways", e); 630 } 631 return excludeServers; 632 } 633 } 634 635 /** 636 * Excludes the servername whose hostname and port portion matches the list given in exclude file 637 */ 638 private void stripExcludes(List<ServerName> regionServers) throws IOException { 639 if (excludeFile != null) { 640 List<String> excludes = readExcludes(excludeFile); 641 Iterator<ServerName> i = regionServers.iterator(); 642 while (i.hasNext()) { 643 String rs = i.next().getServerName(); 644 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs 645 .split(ServerName.SERVERNAME_SEPARATOR)[1]; 646 if (excludes.contains(rsPort)) { 647 i.remove(); 648 } 649 } 650 LOG.info("Valid Region server targets are:" + regionServers.toString()); 651 LOG.info("Excluded Servers are" + excludes.toString()); 652 } 653 } 654 655 /** 656 * Exclude master from list of RSs to move regions to 657 */ 658 private void stripMaster(List<ServerName> regionServers) throws IOException { 659 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 660 stripServer(regionServers, master.getHostname(), master.getPort()); 661 } 662 663 /** 664 * Remove the servername whose hostname and port portion matches from the passed array of servers. 665 * Returns as side-effect the servername removed. 666 * @return server removed from list of Region Servers 667 */ 668 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 669 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 670 ServerName server = iter.next(); 671 if (server.getAddress().getHostname().equalsIgnoreCase(hostname) && 672 server.getAddress().getPort() == port) { 673 iter.remove(); 674 return server; 675 } 676 } 677 return null; 678 } 679 680 /** 681 * Tries to scan a row from passed region 682 */ 683 private void isSuccessfulScan(RegionInfo region) throws IOException { 684 Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit() 685 .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter()) 686 .setCacheBlocks(false); 687 try (Table table = conn.getTable(region.getTable()); 688 ResultScanner scanner = table.getScanner(scan)) { 689 scanner.next(); 690 } catch (IOException e) { 691 LOG.error("Could not scan region:" + region.getEncodedName(), e); 692 throw e; 693 } 694 } 695 696 /** 697 * Returns true if passed region is still on serverName when we look at hbase:meta. 698 * @return true if region is hosted on serverName otherwise false 699 */ 700 private boolean isSameServer(RegionInfo region, ServerName serverName) 701 throws IOException { 702 ServerName serverForRegion = getServerNameForRegion(region); 703 if (serverForRegion != null && serverForRegion.equals(serverName)) { 704 return true; 705 } 706 return false; 707 } 708 709 /** 710 * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + 711 * startcode comma-delimited. Can return null 712 * @return regionServer hosting the given region 713 */ 714 private ServerName getServerNameForRegion(RegionInfo region) throws IOException { 715 if (!admin.isTableEnabled(region.getTable())) { 716 return null; 717 } 718 HRegionLocation loc = 719 conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), true); 720 if (loc != null) { 721 return loc.getServerName(); 722 } else { 723 return null; 724 } 725 } 726 727 @Override 728 protected void addOptions() { 729 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 730 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload"); 731 this.addOptWithArg("m", "maxthreads", 732 "Define the maximum number of threads to use to unload and reload the regions"); 733 this.addOptWithArg("x", "excludefile", 734 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 735 + "target host; useful for rack decommisioning."); 736 this.addOptWithArg("f", "filename", 737 "File to save regions list into unloading, or read from loading; " 738 + "default /tmp/<usernamehostname:port>"); 739 this.addOptNoArg("n", "noack", 740 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 741 + "RegionServer, hence best effort. This is more performant in unloading and loading " 742 + "but might lead to region being unavailable for some time till master reassigns it " 743 + "in case the move failed"); 744 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 745 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 746 } 747 748 @Override 749 protected void processOptions(CommandLine cmd) { 750 String hostname = cmd.getOptionValue("r"); 751 rmbuilder = new RegionMoverBuilder(hostname); 752 if (cmd.hasOption('m')) { 753 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 754 } 755 if (cmd.hasOption('n')) { 756 rmbuilder.ack(false); 757 } 758 if (cmd.hasOption('f')) { 759 rmbuilder.filename(cmd.getOptionValue('f')); 760 } 761 if (cmd.hasOption('x')) { 762 rmbuilder.excludeFile(cmd.getOptionValue('x')); 763 } 764 if (cmd.hasOption('t')) { 765 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 766 } 767 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 768 } 769 770 @Override 771 protected int doWork() throws Exception { 772 boolean success; 773 try (RegionMover rm = rmbuilder.build()) { 774 if (loadUnload.equalsIgnoreCase("load")) { 775 success = rm.load(); 776 } else if (loadUnload.equalsIgnoreCase("unload")) { 777 success = rm.unload(); 778 } else { 779 printUsage(); 780 success = false; 781 } 782 } 783 return (success ? 0 : 1); 784 } 785 786 public static void main(String[] args) { 787 try (RegionMover mover = new RegionMover()) { 788 mover.doStaticMain(args); 789 } 790 } 791}