001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.nio.file.Files;
030import java.nio.file.Paths;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.Collections;
034import java.util.EnumSet;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Locale;
039import java.util.Optional;
040import java.util.Set;
041import java.util.concurrent.Callable;
042import java.util.concurrent.CancellationException;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049import java.util.function.Predicate;
050import org.apache.commons.io.IOUtils;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.hbase.ClusterMetrics.Option;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.UnknownRegionException;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.Connection;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
061import org.apache.hadoop.hbase.client.RegionInfo;
062import org.apache.hadoop.hbase.master.RackManager;
063import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
064import org.apache.hadoop.hbase.net.Address;
065import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
071import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
072
/**
 * Tool for loading/unloading regions to/from a given region server. This tool can be run from the
 * command line directly as a utility. It supports Ack/No Ack mode for loading/unloading
 * operations. Ack mode acknowledges that regions are online after movement, while noAck mode is a
 * best-effort mode that improves performance but still moves on if a region is stuck/not moved.
 * The motivation behind noAck mode is RS shutdown, where even if a region is stuck, the master
 * will move it anyway upon shutdown. This can also be used by constructing an object with the
 * builder and then calling the {@link #load()} or {@link #unload()} methods for the desired
 * operations.
 */
082@InterfaceAudience.Public
083public class RegionMover extends AbstractHBaseTool implements Closeable {
  /**
   * Configuration key for the number of times a region move is retried in ack mode, per
   * {@link RegionMoverBuilder#ack(boolean)}. Not read directly in this class.
   */
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  /**
   * Configuration key for the per-region move wait in seconds. A batch of moves gets this value
   * multiplied by the number of regions in the batch before its thread pool is forcibly stopped.
   */
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  /**
   * Configuration key for how many seconds {@link #load()} waits for the target region server to
   * appear among the cluster's region servers.
   */
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  /** Presumably the default for {@link #MOVE_RETRIES_MAX_KEY}; not read in this class. */
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  /** Default for {@link #MOVE_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  /** Default for {@link #SERVERSTART_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // Populated from the command line by processOptions() and consumed by doWork().
  private RegionMoverBuilder rmbuilder;
  // true: ack mode (moves are acknowledged); false: best-effort no-ack mode.
  private boolean ack = true;
  // Maximum number of concurrent region-move threads.
  private int maxthreads = 1;
  // Global timeout, in seconds, for a whole load/unload operation.
  private int timeout;
  // Operation requested with -o, lower-cased: load, unload or unload_from_rack.
  private String loadUnload;
  // Host (lower-cased by the builder) and port of the server to load regions to / unload from.
  private String hostname;
  // File the moved regions are written to by unload and read from by load.
  private String filename;
  // Optional file of host:port entries that must not receive regions.
  private String excludeFile;
  // Optional file of host:port entries that are the only allowed unload targets.
  private String designatedFile;
  private int port;
  // Opened by the builder-based constructor; null for the command-line instance.
  private Connection conn;
  private Admin admin;
  // Resolves racks for unloadFromRack(); may be a test-supplied implementation.
  private RackManager rackManager;
106
107  private RegionMover(RegionMoverBuilder builder) throws IOException {
108    this.hostname = builder.hostname;
109    this.filename = builder.filename;
110    this.excludeFile = builder.excludeFile;
111    this.designatedFile = builder.designatedFile;
112    this.maxthreads = builder.maxthreads;
113    this.ack = builder.ack;
114    this.port = builder.port;
115    this.timeout = builder.timeout;
116    setConf(builder.conf);
117    this.conn = ConnectionFactory.createConnection(conf);
118    this.admin = conn.getAdmin();
119    // Only while running unit tests, builder.rackManager will not be null for the convenience of
120    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
121    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
122    // provided as @InterfaceAudience.Private and it is commented that this is just
123    // to be used by unit test.
124    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
125  }
126
  /**
   * Creates an unconfigured instance used only by {@link #main(String[])} to parse command-line
   * options. It opens no connection; {@link #doWork()} builds a separate, configured mover from
   * the parsed options.
   */
  private RegionMover() {
  }
129
130  @Override
131  public void close() {
132    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
133    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
134  }
135
136  /**
137   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
138   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
139   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
140   * the corresponding options.
141   */
142  public static class RegionMoverBuilder {
143    private boolean ack = true;
144    private int maxthreads = 1;
145    private int timeout = Integer.MAX_VALUE;
146    private String hostname;
147    private String filename;
148    private String excludeFile = null;
149    private String designatedFile = null;
150    private String defaultDir = System.getProperty("java.io.tmpdir");
151    @InterfaceAudience.Private
152    final int port;
153    private final Configuration conf;
154    private RackManager rackManager;
155
156    public RegionMoverBuilder(String hostname) {
157      this(hostname, createConf());
158    }
159
160    /**
161     * Creates a new configuration and sets region mover specific overrides
162     */
163    private static Configuration createConf() {
164      Configuration conf = HBaseConfiguration.create();
165      conf.setInt("hbase.client.prefetch.limit", 1);
166      conf.setInt("hbase.client.pause", 500);
167      conf.setInt("hbase.client.retries.number", 100);
168      return conf;
169    }
170
171    /**
172     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
173     *                 hostname:port.
174     * @param conf     Configuration object
175     */
176    public RegionMoverBuilder(String hostname, Configuration conf) {
177      String[] splitHostname = hostname.toLowerCase().split(":");
178      this.hostname = splitHostname[0];
179      if (splitHostname.length == 2) {
180        this.port = Integer.parseInt(splitHostname[1]);
181      } else {
182        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
183      }
184      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
185        + ":" + Integer.toString(this.port);
186      this.conf = conf;
187    }
188
189    /**
190     * Path of file where regions will be written to during unloading/read from during loading n
191     * * @return RegionMoverBuilder object
192     */
193    public RegionMoverBuilder filename(String filename) {
194      this.filename = filename;
195      return this;
196    }
197
198    /**
199     * Set the max number of threads that will be used to move regions
200     */
201    public RegionMoverBuilder maxthreads(int threads) {
202      this.maxthreads = threads;
203      return this;
204    }
205
206    /**
207     * Path of file containing hostnames to be excluded during region movement. Exclude file should
208     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
209     * host.
210     */
211    public RegionMoverBuilder excludeFile(String excludefile) {
212      this.excludeFile = excludefile;
213      return this;
214    }
215
216    /**
217     * Set the designated file. Designated file contains hostnames where region moves. Designated
218     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
219     * on a single host.
220     * @param designatedFile The designated file
221     * @return RegionMoverBuilder object
222     */
223    public RegionMoverBuilder designatedFile(String designatedFile) {
224      this.designatedFile = designatedFile;
225      return this;
226    }
227
228    /**
229     * Set ack/noAck mode.
230     * <p>
231     * In ack mode regions are acknowledged before and after moving and the move is retried
232     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
233     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
234     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
235     * <p>
236     * n * @return RegionMoverBuilder object
237     */
238    public RegionMoverBuilder ack(boolean ack) {
239      this.ack = ack;
240      return this;
241    }
242
243    /**
244     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
245     * movers also have a separate time which is hbase.move.wait.max * number of regions to
246     * load/unload
247     * @param timeout in seconds
248     * @return RegionMoverBuilder object
249     */
250    public RegionMoverBuilder timeout(int timeout) {
251      this.timeout = timeout;
252      return this;
253    }
254
255    /**
256     * Set specific rackManager implementation. This setter method is for testing purpose only.
257     * @param rackManager rackManager impl
258     * @return RegionMoverBuilder object
259     */
260    @InterfaceAudience.Private
261    public RegionMoverBuilder rackManager(RackManager rackManager) {
262      this.rackManager = rackManager;
263      return this;
264    }
265
266    /**
267     * This method builds the appropriate RegionMover object which can then be used to load/unload
268     * using load and unload methods
269     * @return RegionMover object
270     */
271    public RegionMover build() throws IOException {
272      return new RegionMover(this);
273    }
274  }
275
276  /**
277   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
278   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
279   * @return true if loading succeeded, false otherwise
280   */
281  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
282    ExecutorService loadPool = Executors.newFixedThreadPool(1);
283    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
284    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
285    if (!isMetaMoved) {
286      return false;
287    }
288    loadPool = Executors.newFixedThreadPool(1);
289    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
290    return waitTaskToFinish(loadPool, loadTask, "loading");
291  }
292
293  private Callable<Boolean> getMetaRegionMovePlan() {
294    return getRegionsMovePlan(true);
295  }
296
297  private Callable<Boolean> getNonMetaRegionsMovePlan() {
298    return getRegionsMovePlan(false);
299  }
300
301  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
302    return () -> {
303      try {
304        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
305        if (regionsToMove.isEmpty()) {
306          LOG.info("No regions to load.Exiting");
307          return true;
308        }
309        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
310        if (moveMetaRegion) {
311          if (metaRegion.isPresent()) {
312            loadRegions(Collections.singletonList(metaRegion.get()));
313          }
314        } else {
315          metaRegion.ifPresent(regionsToMove::remove);
316          loadRegions(regionsToMove);
317        }
318      } catch (Exception e) {
319        LOG.error("Error while loading regions to " + hostname, e);
320        return false;
321      }
322      return true;
323    };
324  }
325
326  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
327    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
328  }
329
330  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
331    ServerName server = getTargetServer();
332    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
333    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
334      + this.maxthreads + " threads.Ack mode:" + this.ack);
335
336    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
337    List<Future<Boolean>> taskList = new ArrayList<>();
338    int counter = 0;
339    while (counter < regionsToMove.size()) {
340      RegionInfo region = regionsToMove.get(counter);
341      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
342      if (currentServer == null) {
343        LOG
344          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
345        counter++;
346        continue;
347      } else if (server.equals(currentServer)) {
348        LOG.info(
349          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
350        counter++;
351        continue;
352      }
353      if (ack) {
354        Future<Boolean> task = moveRegionsPool
355          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
356        taskList.add(task);
357      } else {
358        Future<Boolean> task = moveRegionsPool
359          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
360        taskList.add(task);
361      }
362      counter++;
363    }
364
365    moveRegionsPool.shutdown();
366    long timeoutInSeconds = regionsToMove.size()
367      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
368    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
369  }
370
371  /**
372   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
373   * noAck mode we do not make sure that region is successfully online on the target region
374   * server,hence it is best effort.We do not unload regions to hostnames given in
375   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
376   * to hostnames provided in {@link #designatedFile}
377   * @return true if unloading succeeded, false otherwise
378   */
379  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
380    return unloadRegions(false);
381  }
382
383  /**
384   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
385   * noAck mode we do not make sure that region is successfully online on the target region
386   * server,hence it is best effort.We do not unload regions to hostnames given in
387   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
388   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
389   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
390   * that belong to same rack as source RegionServer.
391   * @return true if unloading succeeded, false otherwise
392   */
393  public boolean unloadFromRack()
394    throws InterruptedException, ExecutionException, TimeoutException {
395    return unloadRegions(true);
396  }
397
398  private boolean unloadRegions(boolean unloadFromRack)
399    throws InterruptedException, ExecutionException, TimeoutException {
400    deleteFile(this.filename);
401    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
402    Future<Boolean> unloadTask = unloadPool.submit(() -> {
403      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
404      try {
405        // Get Online RegionServers
406        List<ServerName> regionServers = new ArrayList<>();
407        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
408        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
409        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
410        // Remove the host Region server from target Region Servers list
411        ServerName server = stripServer(regionServers, hostname, port);
412        if (server == null) {
413          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
414            hostname, port);
415          LOG.debug("List of region servers: {}", regionServers);
416          return false;
417        }
418        // Remove RS not present in the designated file
419        includeExcludeRegionServers(designatedFile, regionServers, true);
420
421        // Remove RS present in the exclude file
422        includeExcludeRegionServers(excludeFile, regionServers, false);
423
424        if (unloadFromRack) {
425          // remove regionServers that belong to same rack (as source host) since the goal is to
426          // unload regions from source regionServer to destination regionServers
427          // that belong to different rack only.
428          String sourceRack = rackManager.getRack(server);
429          List<String> racks = rackManager.getRack(regionServers);
430          Iterator<ServerName> iterator = regionServers.iterator();
431          int i = 0;
432          while (iterator.hasNext()) {
433            iterator.next();
434            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
435              iterator.remove();
436            }
437            i++;
438          }
439        }
440
441        // Remove decommissioned RS
442        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
443        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
444          regionServers.removeIf(decommissionedRS::contains);
445          LOG.debug("Excluded RegionServers from unloading regions to because they "
446            + "are marked as decommissioned. Servers: {}", decommissionedRS);
447        }
448
449        stripMaster(regionServers);
450        if (regionServers.isEmpty()) {
451          LOG.warn("No Regions were moved - no servers available");
452          return false;
453        } else {
454          LOG.info("Available servers {}", regionServers);
455        }
456        unloadRegions(server, regionServers, movedRegions);
457      } catch (Exception e) {
458        LOG.error("Error while unloading regions ", e);
459        return false;
460      } finally {
461        if (movedRegions != null) {
462          writeFile(filename, movedRegions);
463        }
464      }
465      return true;
466    });
467    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
468  }
469
470  @InterfaceAudience.Private
471  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
472    Collection<ServerName> onlineServers) {
473    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
474      return onlineServers;
475    }
476    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
477    for (ServerName server : onlineServers) {
478      Address address = Address.fromParts(server.getHostname(), server.getPort());
479      if (rsgroup.containsServer(address)) {
480        serverLists.add(server);
481      }
482    }
483    return serverLists;
484  }
485
486  private void unloadRegions(ServerName server, List<ServerName> regionServers,
487    List<RegionInfo> movedRegions) throws Exception {
488    while (true) {
489      List<RegionInfo> regionsToMove = admin.getRegions(server);
490      regionsToMove.removeAll(movedRegions);
491      if (regionsToMove.isEmpty()) {
492        LOG.info("No Regions to move....Quitting now");
493        break;
494      }
495      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
496        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
497
498      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
499      if (metaRegion.isPresent()) {
500        RegionInfo meta = metaRegion.get();
501        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
502          Collections.singletonList(meta));
503        regionsToMove.remove(meta);
504      }
505      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
506    }
507  }
508
509  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
510    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
511    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
512    List<Future<Boolean>> taskList = new ArrayList<>();
513    int serverIndex = 0;
514    for (RegionInfo regionToMove : regionsToMove) {
515      if (ack) {
516        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
517          regionServers.get(serverIndex), movedRegions));
518        taskList.add(task);
519      } else {
520        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
521          server, regionServers.get(serverIndex), movedRegions));
522        taskList.add(task);
523      }
524      serverIndex = (serverIndex + 1) % regionServers.size();
525    }
526    moveRegionsPool.shutdown();
527    long timeoutInSeconds = regionsToMove.size()
528      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
529    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
530  }
531
532  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
533    throws TimeoutException, InterruptedException, ExecutionException {
534    pool.shutdown();
535    try {
536      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
537        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
538          + this.timeout + "sec");
539        pool.shutdownNow();
540      }
541    } catch (InterruptedException e) {
542      pool.shutdownNow();
543      Thread.currentThread().interrupt();
544    }
545    try {
546      return task.get(5, TimeUnit.SECONDS);
547    } catch (InterruptedException e) {
548      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
549      throw e;
550    } catch (ExecutionException e) {
551      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
552      throw e;
553    }
554  }
555
556  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
557    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
558    try {
559      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
560        moveRegionsPool.shutdownNow();
561      }
562    } catch (InterruptedException e) {
563      moveRegionsPool.shutdownNow();
564      Thread.currentThread().interrupt();
565    }
566    for (Future<Boolean> future : taskList) {
567      try {
568        // if even after shutdownNow threads are stuck we wait for 5 secs max
569        if (!future.get(5, TimeUnit.SECONDS)) {
570          LOG.error("Was Not able to move region....Exiting Now");
571          throw new Exception("Could not move region Exception");
572        }
573      } catch (InterruptedException e) {
574        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
575        throw e;
576      } catch (ExecutionException e) {
577        boolean ignoreFailure = ignoreRegionMoveFailure(e);
578        if (ignoreFailure) {
579          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
580        } else {
581          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
582          throw e;
583        }
584      } catch (CancellationException e) {
585        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
586          + "secs", e);
587        throw e;
588      }
589    }
590  }
591
592  private boolean ignoreRegionMoveFailure(ExecutionException e) {
593    boolean ignoreFailure = false;
594    if (e.getCause() instanceof UnknownRegionException) {
595      // region does not exist anymore
596      ignoreFailure = true;
597    } else if (
598      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
599        && e.getCause().getMessage()
600          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
601    ) {
602      // region is recently split
603      ignoreFailure = true;
604    }
605    return ignoreFailure;
606  }
607
608  private ServerName getTargetServer() throws Exception {
609    ServerName server = null;
610    int maxWaitInSeconds =
611      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
612    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
613    while (EnvironmentEdgeManager.currentTime() < maxWait) {
614      try {
615        List<ServerName> regionServers = new ArrayList<>();
616        regionServers.addAll(admin.getRegionServers());
617        // Remove the host Region server from target Region Servers list
618        server = stripServer(regionServers, hostname, port);
619        if (server != null) {
620          break;
621        } else {
622          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
623        }
624      } catch (IOException e) {
625        LOG.warn("Could not get list of region servers", e);
626      }
627      Thread.sleep(500);
628    }
629    if (server == null) {
630      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
631      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
632    }
633    return server;
634  }
635
636  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
637    List<RegionInfo> regions = new ArrayList<>();
638    File f = new File(filename);
639    if (!f.exists()) {
640      return regions;
641    }
642    try (
643      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
644      int numRegions = dis.readInt();
645      int index = 0;
646      while (index < numRegions) {
647        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
648        index++;
649      }
650    } catch (IOException e) {
651      LOG.error("Error while reading regions from file:" + filename, e);
652      throw e;
653    }
654    return regions;
655  }
656
657  /**
658   * Write the number of regions moved in the first line followed by regions moved in subsequent
659   * lines
660   */
661  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
662    try (DataOutputStream dos =
663      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
664      dos.writeInt(movedRegions.size());
665      for (RegionInfo region : movedRegions) {
666        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
667      }
668    } catch (IOException e) {
669      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
670        + movedRegions.size() + " regions", e);
671      throw e;
672    }
673  }
674
675  private void deleteFile(String filename) {
676    File f = new File(filename);
677    if (f.exists()) {
678      f.delete();
679    }
680  }
681
682  /**
683   * @param filename The file should have 'host:port' per line
684   * @return List of servers from the file in format 'hostname:port'.
685   */
686  private List<String> readServersFromFile(String filename) throws IOException {
687    List<String> servers = new ArrayList<>();
688    if (filename != null) {
689      try {
690        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
691          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
692          .forEach(servers::add);
693      } catch (IOException e) {
694        LOG.error("Exception while reading servers from file,", e);
695        throw e;
696      }
697    }
698    return servers;
699  }
700
701  /**
702   * Designates or excludes the servername whose hostname and port portion matches the list given in
703   * the file. Example:<br>
704   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
705   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
706   * are removed from regionServers list so that regions can move to only RS1. If you want to
707   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
708   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
709   * list so that regions can move to only RS2 and RS3.
710   */
711  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
712    boolean isInclude) throws IOException {
713    if (fileName != null) {
714      List<String> servers = readServersFromFile(fileName);
715      if (servers.isEmpty()) {
716        LOG.warn("No servers provided in the file: {}." + fileName);
717        return;
718      }
719      Iterator<ServerName> i = regionServers.iterator();
720      while (i.hasNext()) {
721        String rs = i.next().getServerName();
722        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
723          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
724        if (isInclude != servers.contains(rsPort)) {
725          i.remove();
726        }
727      }
728    }
729  }
730
731  /**
732   * Exclude master from list of RSs to move regions to
733   */
734  private void stripMaster(List<ServerName> regionServers) throws IOException {
735    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
736    stripServer(regionServers, master.getHostname(), master.getPort());
737  }
738
739  /**
740   * Remove the servername whose hostname and port portion matches from the passed array of servers.
741   * Returns as side-effect the servername removed.
742   * @return server removed from list of Region Servers
743   */
744  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
745    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
746      ServerName server = iter.next();
747      if (
748        server.getAddress().getHostName().equalsIgnoreCase(hostname)
749          && server.getAddress().getPort() == port
750      ) {
751        iter.remove();
752        return server;
753      }
754    }
755    return null;
756  }
757
758  @Override
759  protected void addOptions() {
760    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
761    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
762    this.addOptWithArg("m", "maxthreads",
763      "Define the maximum number of threads to use to unload and reload the regions");
764    this.addOptWithArg("x", "excludefile",
765      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
766        + "target host; useful for rack decommisioning.");
767    this.addOptWithArg("d", "designatedfile",
768      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
769    this.addOptWithArg("f", "filename",
770      "File to save regions list into unloading, or read from loading; "
771        + "default /tmp/<usernamehostname:port>");
772    this.addOptNoArg("n", "noack",
773      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
774        + "RegionServer, hence best effort. This is more performant in unloading and loading "
775        + "but might lead to region being unavailable for some time till master reassigns it "
776        + "in case the move failed");
777    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
778      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
779  }
780
781  @Override
782  protected void processOptions(CommandLine cmd) {
783    String hostname = cmd.getOptionValue("r");
784    rmbuilder = new RegionMoverBuilder(hostname);
785    if (cmd.hasOption('m')) {
786      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
787    }
788    if (cmd.hasOption('n')) {
789      rmbuilder.ack(false);
790    }
791    if (cmd.hasOption('f')) {
792      rmbuilder.filename(cmd.getOptionValue('f'));
793    }
794    if (cmd.hasOption('x')) {
795      rmbuilder.excludeFile(cmd.getOptionValue('x'));
796    }
797    if (cmd.hasOption('d')) {
798      rmbuilder.designatedFile(cmd.getOptionValue('d'));
799    }
800    if (cmd.hasOption('t')) {
801      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
802    }
803    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
804  }
805
806  @Override
807  protected int doWork() throws Exception {
808    boolean success;
809    try (RegionMover rm = rmbuilder.build()) {
810      if (loadUnload.equalsIgnoreCase("load")) {
811        success = rm.load();
812      } else if (loadUnload.equalsIgnoreCase("unload")) {
813        success = rm.unload();
814      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
815        success = rm.unloadFromRack();
816      } else {
817        printUsage();
818        success = false;
819      }
820    }
821    return (success ? 0 : 1);
822  }
823
824  public static void main(String[] args) {
825    try (RegionMover mover = new RegionMover()) {
826      mover.doStaticMain(args);
827    }
828  }
829}