001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.io.BufferedInputStream; 023import java.io.BufferedOutputStream; 024import java.io.Closeable; 025import java.io.DataInputStream; 026import java.io.DataOutputStream; 027import java.io.File; 028import java.io.FileInputStream; 029import java.io.FileOutputStream; 030import java.io.IOException; 031import java.nio.file.Files; 032import java.nio.file.Paths; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.EnumSet; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.List; 040import java.util.Locale; 041import java.util.Set; 042import java.util.concurrent.Callable; 043import java.util.concurrent.CancellationException; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.TimeoutException; 050import java.util.function.Predicate; 051import org.apache.commons.io.IOUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.ClusterMetrics.Option; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HRegionLocation; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.client.Admin; 059import org.apache.hadoop.hbase.client.Connection; 060import org.apache.hadoop.hbase.client.ConnectionFactory; 061import org.apache.hadoop.hbase.client.RegionInfo; 062import org.apache.hadoop.hbase.client.ResultScanner; 063import org.apache.hadoop.hbase.client.Scan; 064import org.apache.hadoop.hbase.client.Table; 065import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 066import org.apache.hadoop.hbase.net.Address; 067import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; 068import org.apache.yetus.audience.InterfaceAudience; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 073import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 074import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 075 076/** 077 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command 078 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode 079 * acknowledges if regions are online after movement while noAck mode is best effort mode that 080 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck 081 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it 082 * anyways. This can also be used by constructiong an Object using the builder and then calling 083 * {@link #load()} or {@link #unload()} methods for the desired operations. 084 */ 085@InterfaceAudience.Public 086public class RegionMover extends AbstractHBaseTool implements Closeable { 087 public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; 088 public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; 089 public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; 090 public static final int DEFAULT_MOVE_RETRIES_MAX = 5; 091 public static final int DEFAULT_MOVE_WAIT_MAX = 60; 092 public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; 093 094 private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class); 095 096 private RegionMoverBuilder rmbuilder; 097 private boolean ack = true; 098 private int maxthreads = 1; 099 private int timeout; 100 private String loadUnload; 101 private String hostname; 102 private String filename; 103 private String excludeFile; 104 private String designatedFile; 105 private int port; 106 private Connection conn; 107 private Admin admin; 108 109 private RegionMover(RegionMoverBuilder builder) throws IOException { 110 this.hostname = builder.hostname; 111 this.filename = builder.filename; 112 this.excludeFile = builder.excludeFile; 113 this.designatedFile = builder.designatedFile; 114 this.maxthreads = builder.maxthreads; 115 this.ack = builder.ack; 116 this.port = builder.port; 117 this.timeout = builder.timeout; 118 setConf(builder.conf); 119 this.conn = ConnectionFactory.createConnection(conf); 120 this.admin = conn.getAdmin(); 121 } 122 123 private RegionMover() { 124 } 125 126 @Override 127 public void close() { 128 IOUtils.closeQuietly(this.admin); 129 IOUtils.closeQuietly(this.conn); 130 } 131 132 /** 133 * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has 134 * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, 135 * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set 136 * the corresponding options. 137 */ 138 public static class RegionMoverBuilder { 139 private boolean ack = true; 140 private int maxthreads = 1; 141 private int timeout = Integer.MAX_VALUE; 142 private String hostname; 143 private String filename; 144 private String excludeFile = null; 145 private String designatedFile = null; 146 private String defaultDir = System.getProperty("java.io.tmpdir"); 147 @VisibleForTesting 148 final int port; 149 private final Configuration conf; 150 151 public RegionMoverBuilder(String hostname) { 152 this(hostname, createConf()); 153 } 154 155 /** 156 * Creates a new configuration and sets region mover specific overrides 157 */ 158 private static Configuration createConf() { 159 Configuration conf = HBaseConfiguration.create(); 160 conf.setInt("hbase.client.prefetch.limit", 1); 161 conf.setInt("hbase.client.pause", 500); 162 conf.setInt("hbase.client.retries.number", 100); 163 return conf; 164 } 165 166 /** 167 * @param hostname Hostname to unload regions from or load regions to. Can be either hostname 168 * or hostname:port. 169 * @param conf Configuration object 170 */ 171 public RegionMoverBuilder(String hostname, Configuration conf) { 172 String[] splitHostname = hostname.toLowerCase().split(":"); 173 this.hostname = splitHostname[0]; 174 if (splitHostname.length == 2) { 175 this.port = Integer.parseInt(splitHostname[1]); 176 } else { 177 this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 178 } 179 this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname 180 + ":" + Integer.toString(this.port); 181 this.conf = conf; 182 } 183 184 /** 185 * Path of file where regions will be written to during unloading/read from during loading 186 * @param filename 187 * @return RegionMoverBuilder object 188 */ 189 public RegionMoverBuilder filename(String filename) { 190 this.filename = filename; 191 return this; 192 } 193 194 /** 195 * Set the max number of threads that will be used to move regions 196 */ 197 public RegionMoverBuilder maxthreads(int threads) { 198 this.maxthreads = threads; 199 return this; 200 } 201 202 /** 203 * Path of file containing hostnames to be excluded during region movement. Exclude file should 204 * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single 205 * host. 206 */ 207 public RegionMoverBuilder excludeFile(String excludefile) { 208 this.excludeFile = excludefile; 209 return this; 210 } 211 212 /** 213 * Set the designated file. Designated file contains hostnames where region moves. Designated 214 * file should have 'host:port' per line. Port is mandatory here as we can have many RS running 215 * on a single host. 216 * @param designatedFile The designated file 217 * @return RegionMoverBuilder object 218 */ 219 public RegionMoverBuilder designatedFile(String designatedFile) { 220 this.designatedFile = designatedFile; 221 return this; 222 } 223 224 /** 225 * Set ack/noAck mode. 226 * <p> 227 * In ack mode regions are acknowledged before and after moving and the move is retried 228 * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best 229 * effort mode,each region movement is tried once.This can be used during graceful shutdown as 230 * even if we have a stuck region,upon shutdown it'll be reassigned anyway. 231 * <p> 232 * @param ack 233 * @return RegionMoverBuilder object 234 */ 235 public RegionMoverBuilder ack(boolean ack) { 236 this.ack = ack; 237 return this; 238 } 239 240 /** 241 * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for 242 * movers also have a separate time which is hbase.move.wait.max * number of regions to 243 * load/unload 244 * @param timeout in seconds 245 * @return RegionMoverBuilder object 246 */ 247 public RegionMoverBuilder timeout(int timeout) { 248 this.timeout = timeout; 249 return this; 250 } 251 252 /** 253 * This method builds the appropriate RegionMover object which can then be used to load/unload 254 * using load and unload methods 255 * @return RegionMover object 256 */ 257 public RegionMover build() throws IOException { 258 return new RegionMover(this); 259 } 260 } 261 262 /** 263 * Move Regions and make sure that they are up on the target server.If a region movement fails we 264 * exit as failure 265 */ 266 private class MoveWithAck implements Callable<Boolean> { 267 private RegionInfo region; 268 private ServerName targetServer; 269 private List<RegionInfo> movedRegions; 270 private ServerName sourceServer; 271 272 public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer, 273 ServerName targetServer, List<RegionInfo> movedRegions) { 274 this.region = regionInfo; 275 this.targetServer = targetServer; 276 this.movedRegions = movedRegions; 277 this.sourceServer = sourceServer; 278 } 279 280 @Override 281 public Boolean call() throws IOException, InterruptedException { 282 boolean moved = false; 283 int count = 0; 284 int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); 285 int maxWaitInSeconds = 286 admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 287 long startTime = EnvironmentEdgeManager.currentTime(); 288 boolean sameServer = true; 289 // Assert we can scan the region in its current location 290 isSuccessfulScan(region); 291 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 292 + targetServer); 293 while (count < retries && sameServer) { 294 if (count > 0) { 295 LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); 296 } 297 count = count + 1; 298 admin.move(region.getEncodedNameAsBytes(), targetServer); 299 long maxWait = startTime + (maxWaitInSeconds * 1000); 300 while (EnvironmentEdgeManager.currentTime() < maxWait) { 301 sameServer = isSameServer(region, sourceServer); 302 if (!sameServer) { 303 break; 304 } 305 Thread.sleep(100); 306 } 307 } 308 if (sameServer) { 309 LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer 310 + ",newServer=" + this.targetServer); 311 } else { 312 isSuccessfulScan(region); 313 LOG.info("Moved Region " 314 + region.getRegionNameAsString() 315 + " cost:" 316 + String.format("%.3f", 317 (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); 318 moved = true; 319 movedRegions.add(region); 320 } 321 return moved; 322 } 323 } 324 325 /** 326 * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the 327 * RS down anyways and not abort on a stuck region. Improves movement performance 328 */ 329 private class MoveWithoutAck implements Callable<Boolean> { 330 private RegionInfo region; 331 private ServerName targetServer; 332 private List<RegionInfo> movedRegions; 333 private ServerName sourceServer; 334 335 public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer, 336 ServerName targetServer, List<RegionInfo> movedRegions) { 337 this.region = regionInfo; 338 this.targetServer = targetServer; 339 this.movedRegions = movedRegions; 340 this.sourceServer = sourceServer; 341 } 342 343 @Override 344 public Boolean call() { 345 try { 346 LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " 347 + targetServer); 348 admin.move(region.getEncodedNameAsBytes(), targetServer); 349 LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " 350 + targetServer); 351 } catch (Exception e) { 352 LOG.error("Error Moving Region:" + region.getEncodedName(), e); 353 } finally { 354 // we add region to the moved regions list in No Ack Mode since this is best effort 355 movedRegions.add(region); 356 } 357 return true; 358 } 359 } 360 361 /** 362 * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover 363 * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} 364 * @return true if loading succeeded, false otherwise 365 */ 366 public boolean load() throws ExecutionException, InterruptedException, TimeoutException { 367 ExecutorService loadPool = Executors.newFixedThreadPool(1); 368 Future<Boolean> loadTask = loadPool.submit(() -> { 369 try { 370 List<RegionInfo> regionsToMove = readRegionsFromFile(filename); 371 if (regionsToMove.isEmpty()) { 372 LOG.info("No regions to load.Exiting"); 373 return true; 374 } 375 loadRegions(regionsToMove); 376 } catch (Exception e) { 377 LOG.error("Error while loading regions to " + hostname, e); 378 return false; 379 } 380 return true; 381 }); 382 return waitTaskToFinish(loadPool, loadTask, "loading"); 383 } 384 385 private void loadRegions(List<RegionInfo> regionsToMove) 386 throws Exception { 387 ServerName server = getTargetServer(); 388 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 389 LOG.info( 390 "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads 391 + " threads.Ack mode:" + this.ack); 392 393 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 394 List<Future<Boolean>> taskList = new ArrayList<>(); 395 int counter = 0; 396 while (counter < regionsToMove.size()) { 397 RegionInfo region = regionsToMove.get(counter); 398 ServerName currentServer = getServerNameForRegion(region); 399 if (currentServer == null) { 400 LOG.warn( 401 "Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); 402 counter++; 403 continue; 404 } else if (server.equals(currentServer)) { 405 LOG.info( 406 "Region " + region.getRegionNameAsString() + " is already on target server=" + server); 407 counter++; 408 continue; 409 } 410 if (ack) { 411 Future<Boolean> task = 412 moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions)); 413 taskList.add(task); 414 } else { 415 Future<Boolean> task = 416 moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions)); 417 taskList.add(task); 418 } 419 counter++; 420 } 421 422 moveRegionsPool.shutdown(); 423 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 424 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 425 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 426 } 427 428 /** 429 * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In 430 * noAck mode we do not make sure that region is successfully online on the target region 431 * server,hence it is best effort.We do not unload regions to hostnames given in 432 * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions 433 * to hostnames provided in {@link #designatedFile} 434 * @return true if unloading succeeded, false otherwise 435 */ 436 public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { 437 deleteFile(this.filename); 438 ExecutorService unloadPool = Executors.newFixedThreadPool(1); 439 Future<Boolean> unloadTask = unloadPool.submit(() -> { 440 List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>()); 441 try { 442 // Get Online RegionServers 443 List<ServerName> regionServers = new ArrayList<>(); 444 RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port)); 445 LOG.info("{} belongs to {}", hostname, rsgroup.getName()); 446 regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers())); 447 // Remove the host Region server from target Region Servers list 448 ServerName server = stripServer(regionServers, hostname, port); 449 if (server == null) { 450 LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.", 451 hostname, port); 452 LOG.debug("List of region servers: {}", regionServers); 453 return false; 454 } 455 // Remove RS not present in the designated file 456 includeExcludeRegionServers(designatedFile, regionServers, true); 457 458 // Remove RS present in the exclude file 459 includeExcludeRegionServers(excludeFile, regionServers, false); 460 461 // Remove decommissioned RS 462 Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers()); 463 if (CollectionUtils.isNotEmpty(decommissionedRS)) { 464 regionServers.removeIf(decommissionedRS::contains); 465 LOG.debug("Excluded RegionServers from unloading regions to because they " + 466 "are marked as decommissioned. Servers: {}", decommissionedRS); 467 } 468 469 stripMaster(regionServers); 470 if (regionServers.isEmpty()) { 471 LOG.warn("No Regions were moved - no servers available"); 472 return false; 473 } else { 474 LOG.info("Available servers {}", regionServers); 475 } 476 unloadRegions(server, regionServers, movedRegions); 477 } catch (Exception e) { 478 LOG.error("Error while unloading regions ", e); 479 return false; 480 } finally { 481 if (movedRegions != null) { 482 writeFile(filename, movedRegions); 483 } 484 } 485 return true; 486 }); 487 return waitTaskToFinish(unloadPool, unloadTask, "unloading"); 488 } 489 490 @VisibleForTesting 491 Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup, 492 Collection<ServerName> onlineServers) { 493 if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 494 return onlineServers; 495 } 496 List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size()); 497 for (ServerName server : onlineServers) { 498 Address address = Address.fromParts(server.getHostname(), server.getPort()); 499 if (rsgroup.containsServer(address)) { 500 serverLists.add(server); 501 } 502 } 503 return serverLists; 504 } 505 506 private void unloadRegions(ServerName server, List<ServerName> regionServers, 507 List<RegionInfo> movedRegions) throws Exception { 508 while (true) { 509 List<RegionInfo> regionsToMove = admin.getRegions(server); 510 regionsToMove.removeAll(movedRegions); 511 if (regionsToMove.isEmpty()) { 512 LOG.info("No Regions to move....Quitting now"); 513 break; 514 } 515 int counter = 0; 516 LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " 517 + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" 518 + ack); 519 ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); 520 List<Future<Boolean>> taskList = new ArrayList<>(); 521 int serverIndex = 0; 522 while (counter < regionsToMove.size()) { 523 if (ack) { 524 Future<Boolean> task = moveRegionsPool.submit( 525 new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 526 movedRegions)); 527 taskList.add(task); 528 } else { 529 Future<Boolean> task = moveRegionsPool.submit( 530 new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), 531 movedRegions)); 532 taskList.add(task); 533 } 534 counter++; 535 serverIndex = (serverIndex + 1) % regionServers.size(); 536 } 537 moveRegionsPool.shutdown(); 538 long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() 539 .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); 540 waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); 541 } 542 } 543 544 private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation) 545 throws TimeoutException, InterruptedException, ExecutionException { 546 pool.shutdown(); 547 try { 548 if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { 549 LOG.warn( 550 "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout 551 + "sec"); 552 pool.shutdownNow(); 553 } 554 } catch (InterruptedException e) { 555 pool.shutdownNow(); 556 Thread.currentThread().interrupt(); 557 } 558 try { 559 return task.get(5, TimeUnit.SECONDS); 560 } catch (InterruptedException e) { 561 LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); 562 throw e; 563 } catch (ExecutionException e) { 564 LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); 565 throw e; 566 } 567 } 568 569 private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, 570 List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception { 571 try { 572 if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { 573 moveRegionsPool.shutdownNow(); 574 } 575 } catch (InterruptedException e) { 576 moveRegionsPool.shutdownNow(); 577 Thread.currentThread().interrupt(); 578 } 579 for (Future<Boolean> future : taskList) { 580 try { 581 // if even after shutdownNow threads are stuck we wait for 5 secs max 582 if (!future.get(5, TimeUnit.SECONDS)) { 583 LOG.error("Was Not able to move region....Exiting Now"); 584 throw new Exception("Could not move region Exception"); 585 } 586 } catch (InterruptedException e) { 587 LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); 588 throw e; 589 } catch (ExecutionException e) { 590 LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); 591 throw e; 592 } catch (CancellationException e) { 593 LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds 594 + "secs", e); 595 throw e; 596 } 597 } 598 } 599 600 private ServerName getTargetServer() throws Exception { 601 ServerName server = null; 602 int maxWaitInSeconds = 603 admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); 604 long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; 605 while (EnvironmentEdgeManager.currentTime() < maxWait) { 606 try { 607 List<ServerName> regionServers = new ArrayList<>(); 608 regionServers.addAll(admin.getRegionServers()); 609 // Remove the host Region server from target Region Servers list 610 server = stripServer(regionServers, hostname, port); 611 if (server != null) { 612 break; 613 } else { 614 LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); 615 } 616 } catch (IOException e) { 617 LOG.warn("Could not get list of region servers", e); 618 } 619 Thread.sleep(500); 620 } 621 if (server == null) { 622 LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); 623 throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); 624 } 625 return server; 626 } 627 628 private List<RegionInfo> readRegionsFromFile(String filename) throws IOException { 629 List<RegionInfo> regions = new ArrayList<>(); 630 File f = new File(filename); 631 if (!f.exists()) { 632 return regions; 633 } 634 try (DataInputStream dis = new DataInputStream( 635 new BufferedInputStream(new FileInputStream(f)))) { 636 int numRegions = dis.readInt(); 637 int index = 0; 638 while (index < numRegions) { 639 regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); 640 index++; 641 } 642 } catch (IOException e) { 643 LOG.error("Error while reading regions from file:" + filename, e); 644 throw e; 645 } 646 return regions; 647 } 648 649 /** 650 * Write the number of regions moved in the first line followed by regions moved in subsequent 651 * lines 652 */ 653 private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException { 654 try (DataOutputStream dos = new DataOutputStream( 655 new BufferedOutputStream(new FileOutputStream(filename)))) { 656 dos.writeInt(movedRegions.size()); 657 for (RegionInfo region : movedRegions) { 658 Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); 659 } 660 } catch (IOException e) { 661 LOG.error( 662 "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions 663 .size() + " regions", e); 664 throw e; 665 } 666 } 667 668 private void deleteFile(String filename) { 669 File f = new File(filename); 670 if (f.exists()) { 671 f.delete(); 672 } 673 } 674 675 /** 676 * @param filename The file should have 'host:port' per line 677 * @return List of servers from the file in format 'hostname:port'. 678 */ 679 private List<String> readServersFromFile(String filename) throws IOException { 680 List<String> servers = new ArrayList<>(); 681 if (filename != null) { 682 try { 683 Files.readAllLines(Paths.get(filename)).stream().map(String::trim) 684 .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase) 685 .forEach(servers::add); 686 } catch (IOException e) { 687 LOG.error("Exception while reading servers from file,", e); 688 throw e; 689 } 690 } 691 return servers; 692 } 693 694 /** 695 * Designates or excludes the servername whose hostname and port portion matches the list given 696 * in the file. 697 * Example:<br> 698 * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and 699 * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and 700 * RS3 are removed from regionServers list so that regions can move to only RS1. 701 * If you want to exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. 702 * When we call includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from 703 * regionServers list so that regions can move to only RS2 and RS3. 704 */ 705 private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers, 706 boolean isInclude) throws IOException { 707 if (fileName != null) { 708 List<String> servers = readServersFromFile(fileName); 709 if (servers.isEmpty()) { 710 LOG.warn("No servers provided in the file: {}." + fileName); 711 return; 712 } 713 Iterator<ServerName> i = regionServers.iterator(); 714 while (i.hasNext()) { 715 String rs = i.next().getServerName(); 716 String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs 717 .split(ServerName.SERVERNAME_SEPARATOR)[1]; 718 if (isInclude != servers.contains(rsPort)) { 719 i.remove(); 720 } 721 } 722 } 723 } 724 725 /** 726 * Exclude master from list of RSs to move regions to 727 */ 728 private void stripMaster(List<ServerName> regionServers) throws IOException { 729 ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); 730 stripServer(regionServers, master.getHostname(), master.getPort()); 731 } 732 733 /** 734 * Remove the servername whose hostname and port portion matches from the passed array of servers. 735 * Returns as side-effect the servername removed. 736 * @return server removed from list of Region Servers 737 */ 738 private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) { 739 for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) { 740 ServerName server = iter.next(); 741 if (server.getAddress().getHostname().equalsIgnoreCase(hostname) && 742 server.getAddress().getPort() == port) { 743 iter.remove(); 744 return server; 745 } 746 } 747 return null; 748 } 749 750 /** 751 * Tries to scan a row from passed region 752 */ 753 private void isSuccessfulScan(RegionInfo region) throws IOException { 754 Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit() 755 .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter()) 756 .setCacheBlocks(false); 757 try (Table table = conn.getTable(region.getTable()); 758 ResultScanner scanner = table.getScanner(scan)) { 759 scanner.next(); 760 } catch (IOException e) { 761 LOG.error("Could not scan region:" + region.getEncodedName(), e); 762 throw e; 763 } 764 } 765 766 /** 767 * Returns true if passed region is still on serverName when we look at hbase:meta. 768 * @return true if region is hosted on serverName otherwise false 769 */ 770 private boolean isSameServer(RegionInfo region, ServerName serverName) 771 throws IOException { 772 ServerName serverForRegion = getServerNameForRegion(region); 773 if (serverForRegion != null && serverForRegion.equals(serverName)) { 774 return true; 775 } 776 return false; 777 } 778 779 /** 780 * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + 781 * startcode comma-delimited. Can return null 782 * @return regionServer hosting the given region 783 */ 784 private ServerName getServerNameForRegion(RegionInfo region) throws IOException { 785 if (!admin.isTableEnabled(region.getTable())) { 786 return null; 787 } 788 HRegionLocation loc = 789 conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), 790 region.getReplicaId(),true); 791 if (loc != null) { 792 return loc.getServerName(); 793 } else { 794 return null; 795 } 796 } 797 798 @Override 799 protected void addOptions() { 800 this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>"); 801 this.addRequiredOptWithArg("o", "operation", "Expected: load/unload"); 802 this.addOptWithArg("m", "maxthreads", 803 "Define the maximum number of threads to use to unload and reload the regions"); 804 this.addOptWithArg("x", "excludefile", 805 "File with <hostname:port> per line to exclude as unload targets; default excludes only " 806 + "target host; useful for rack decommisioning."); 807 this.addOptWithArg("d","designatedfile","File with <hostname:port> per line as unload targets;" 808 + "default is all online hosts"); 809 this.addOptWithArg("f", "filename", 810 "File to save regions list into unloading, or read from loading; " 811 + "default /tmp/<usernamehostname:port>"); 812 this.addOptNoArg("n", "noack", 813 "Turn on No-Ack mode(default: false) which won't check if region is online on target " 814 + "RegionServer, hence best effort. This is more performant in unloading and loading " 815 + "but might lead to region being unavailable for some time till master reassigns it " 816 + "in case the move failed"); 817 this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " 818 + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); 819 } 820 821 @Override 822 protected void processOptions(CommandLine cmd) { 823 String hostname = cmd.getOptionValue("r"); 824 rmbuilder = new RegionMoverBuilder(hostname); 825 if (cmd.hasOption('m')) { 826 rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); 827 } 828 if (cmd.hasOption('n')) { 829 rmbuilder.ack(false); 830 } 831 if (cmd.hasOption('f')) { 832 rmbuilder.filename(cmd.getOptionValue('f')); 833 } 834 if (cmd.hasOption('x')) { 835 rmbuilder.excludeFile(cmd.getOptionValue('x')); 836 } 837 if (cmd.hasOption('d')) { 838 rmbuilder.designatedFile(cmd.getOptionValue('d')); 839 } 840 if (cmd.hasOption('t')) { 841 rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); 842 } 843 this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT); 844 } 845 846 @Override 847 protected int doWork() throws Exception { 848 boolean success; 849 try (RegionMover rm = rmbuilder.build()) { 850 if (loadUnload.equalsIgnoreCase("load")) { 851 success = rm.load(); 852 } else if (loadUnload.equalsIgnoreCase("unload")) { 853 success = rm.unload(); 854 } else { 855 printUsage(); 856 success = false; 857 } 858 } 859 return (success ? 0 : 1); 860 } 861 862 public static void main(String[] args) { 863 try (RegionMover mover = new RegionMover()) { 864 mover.doStaticMain(args); 865 } 866 } 867}