001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.InetAddress;
030import java.nio.file.Files;
031import java.nio.file.Paths;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.EnumSet;
036import java.util.HashSet;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Locale;
040import java.util.Optional;
041import java.util.Set;
042import java.util.concurrent.Callable;
043import java.util.concurrent.CancellationException;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.TimeoutException;
050import java.util.function.Predicate;
051import org.apache.commons.io.IOUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.ClusterMetrics.Option;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.UnknownRegionException;
058import org.apache.hadoop.hbase.client.Admin;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionFactory;
061import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
062import org.apache.hadoop.hbase.client.RegionInfo;
063import org.apache.hadoop.hbase.master.RackManager;
064import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
065import org.apache.hadoop.hbase.net.Address;
066import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
072import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
073import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
074
075/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a Region is stuck, upon shutdown the
 * master will move it anyway. This can also be used by constructing an Object using the builder
 * and then calling {@link #load()} or {@link #unload()} methods for the desired operations.
083 */
084@InterfaceAudience.Public
085public class RegionMover extends AbstractHBaseTool implements Closeable {
  // Key documented on RegionMoverBuilder#ack(boolean) as the number of times a move is retried in
  // ack mode. NOTE(review): not read in the visible part of this file.
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  // Key for the per-region wait, in seconds; multiplied by the number of regions to bound how long
  // the mover thread pools are awaited.
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  // Key for how long, in seconds, load waits for the target region server to show up.
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // NOTE(review): rmbuilder and loadUnload are not referenced in the visible part of this file;
  // presumably they are used by the command line handling further down.
  private RegionMoverBuilder rmbuilder;
  // true: moves are submitted as MoveWithAck tasks; false: as MoveWithoutAck tasks.
  private boolean ack = true;
  // Size of the fixed thread pool used to move regions.
  private int maxthreads = 1;
  // Overall timeout of a load/unload operation, in seconds.
  private int timeout;
  private String loadUnload;
  // Host of the region server being loaded/unloaded; may be rewritten to an IP in the constructor.
  private String hostname;
  // File the moved regions are written to while unloading and read from while loading.
  private String filename;
  // Optional file of 'host:port' lines that must not be used as unload targets.
  private String excludeFile;
  // Optional file of 'host:port' lines that are the only allowed unload targets.
  private String designatedFile;
  // Port of the region server being loaded/unloaded.
  private int port;
  // Cluster connection and the Admin obtained from it; both are released in close().
  private Connection conn;
  private Admin admin;
  // Used to resolve racks when unloading to servers on a different rack.
  private RackManager rackManager;
108
109  private RegionMover(RegionMoverBuilder builder) throws IOException {
110    this.hostname = builder.hostname;
111    this.filename = builder.filename;
112    this.excludeFile = builder.excludeFile;
113    this.designatedFile = builder.designatedFile;
114    this.maxthreads = builder.maxthreads;
115    this.ack = builder.ack;
116    this.port = builder.port;
117    this.timeout = builder.timeout;
118    setConf(builder.conf);
119    this.conn = ConnectionFactory.createConnection(conf);
120    this.admin = conn.getAdmin();
121
122    // if the hostname of master is ip, it indicates that the master/RS has enabled use-ip, we need
123    // to resolve the current hostname to ip to ensure that the RegionMover logic can be executed
124    // normally, see HBASE-27304 for details.
125    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
126    if (InetAddresses.isInetAddress(master.getHostname())) {
127      if (!InetAddresses.isInetAddress(this.hostname)) {
128        this.hostname = InetAddress.getByName(this.hostname).getHostAddress();
129      }
130    }
131
132    // Only while running unit tests, builder.rackManager will not be null for the convenience of
133    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
134    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
135    // provided as @InterfaceAudience.Private and it is commented that this is just
136    // to be used by unit test.
137    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
138  }
139
  // No-arg constructor: leaves every field at its declared default and opens no connection.
  private RegionMover() {
  }
142
143  @Override
144  public void close() {
145    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
146    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
147  }
148
149  /**
150   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
151   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
152   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
153   * the corresponding options.
154   */
155  public static class RegionMoverBuilder {
156    private boolean ack = true;
157    private int maxthreads = 1;
158    private int timeout = Integer.MAX_VALUE;
159    private String hostname;
160    private String filename;
161    private String excludeFile = null;
162    private String designatedFile = null;
163    private String defaultDir = System.getProperty("java.io.tmpdir");
164    @InterfaceAudience.Private
165    final int port;
166    private final Configuration conf;
167    private RackManager rackManager;
168
169    public RegionMoverBuilder(String hostname) {
170      this(hostname, createConf());
171    }
172
173    /**
174     * Creates a new configuration and sets region mover specific overrides
175     */
176    private static Configuration createConf() {
177      Configuration conf = HBaseConfiguration.create();
178      conf.setInt("hbase.client.prefetch.limit", 1);
179      conf.setInt("hbase.client.pause", 500);
180      conf.setInt("hbase.client.retries.number", 100);
181      return conf;
182    }
183
184    /**
185     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
186     *                 hostname:port.
187     * @param conf     Configuration object
188     */
189    public RegionMoverBuilder(String hostname, Configuration conf) {
190      String[] splitHostname = hostname.toLowerCase().split(":");
191      this.hostname = splitHostname[0];
192      if (splitHostname.length == 2) {
193        this.port = Integer.parseInt(splitHostname[1]);
194      } else {
195        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
196      }
197      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
198        + ":" + Integer.toString(this.port);
199      this.conf = conf;
200    }
201
202    /**
203     * Path of file where regions will be written to during unloading/read from during loading
204     * @return RegionMoverBuilder object
205     */
206    public RegionMoverBuilder filename(String filename) {
207      this.filename = filename;
208      return this;
209    }
210
211    /**
212     * Set the max number of threads that will be used to move regions
213     */
214    public RegionMoverBuilder maxthreads(int threads) {
215      this.maxthreads = threads;
216      return this;
217    }
218
219    /**
220     * Path of file containing hostnames to be excluded during region movement. Exclude file should
221     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
222     * host.
223     */
224    public RegionMoverBuilder excludeFile(String excludefile) {
225      this.excludeFile = excludefile;
226      return this;
227    }
228
229    /**
230     * Set the designated file. Designated file contains hostnames where region moves. Designated
231     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
232     * on a single host.
233     * @param designatedFile The designated file
234     * @return RegionMoverBuilder object
235     */
236    public RegionMoverBuilder designatedFile(String designatedFile) {
237      this.designatedFile = designatedFile;
238      return this;
239    }
240
241    /**
242     * Set ack/noAck mode.
243     * <p>
244     * In ack mode regions are acknowledged before and after moving and the move is retried
245     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
246     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
247     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
248     * <p>
249     * @return RegionMoverBuilder object
250     */
251    public RegionMoverBuilder ack(boolean ack) {
252      this.ack = ack;
253      return this;
254    }
255
256    /**
257     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
258     * movers also have a separate time which is hbase.move.wait.max * number of regions to
259     * load/unload
260     * @param timeout in seconds
261     * @return RegionMoverBuilder object
262     */
263    public RegionMoverBuilder timeout(int timeout) {
264      this.timeout = timeout;
265      return this;
266    }
267
268    /**
269     * Set specific rackManager implementation. This setter method is for testing purpose only.
270     * @param rackManager rackManager impl
271     * @return RegionMoverBuilder object
272     */
273    @InterfaceAudience.Private
274    public RegionMoverBuilder rackManager(RackManager rackManager) {
275      this.rackManager = rackManager;
276      return this;
277    }
278
279    /**
280     * This method builds the appropriate RegionMover object which can then be used to load/unload
281     * using load and unload methods
282     * @return RegionMover object
283     */
284    public RegionMover build() throws IOException {
285      return new RegionMover(this);
286    }
287  }
288
289  /**
290   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
291   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
292   * @return true if loading succeeded, false otherwise
293   */
294  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
295    ExecutorService loadPool = Executors.newFixedThreadPool(1);
296    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
297    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
298    if (!isMetaMoved) {
299      return false;
300    }
301    loadPool = Executors.newFixedThreadPool(1);
302    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
303    return waitTaskToFinish(loadPool, loadTask, "loading");
304  }
305
  /**
   * Returns the load task that moves only the meta region.
   * @return the result of {@code getRegionsMovePlan(true)}
   */
  private Callable<Boolean> getMetaRegionMovePlan() {
    return getRegionsMovePlan(true);
  }
309
  /**
   * Returns the load task that moves every region except the meta region.
   * @return the result of {@code getRegionsMovePlan(false)}
   */
  private Callable<Boolean> getNonMetaRegionsMovePlan() {
    return getRegionsMovePlan(false);
  }
313
314  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
315    return () -> {
316      try {
317        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
318        if (regionsToMove.isEmpty()) {
319          LOG.info("No regions to load.Exiting");
320          return true;
321        }
322        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
323        if (moveMetaRegion) {
324          if (metaRegion.isPresent()) {
325            loadRegions(Collections.singletonList(metaRegion.get()));
326          }
327        } else {
328          metaRegion.ifPresent(regionsToMove::remove);
329          loadRegions(regionsToMove);
330        }
331      } catch (Exception e) {
332        LOG.error("Error while loading regions to " + hostname, e);
333        return false;
334      }
335      return true;
336    };
337  }
338
339  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
340    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
341  }
342
343  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
344    ServerName server = getTargetServer();
345    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
346    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
347      + this.maxthreads + " threads.Ack mode:" + this.ack);
348
349    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
350    List<Future<Boolean>> taskList = new ArrayList<>();
351    int counter = 0;
352    while (counter < regionsToMove.size()) {
353      RegionInfo region = regionsToMove.get(counter);
354      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
355      if (currentServer == null) {
356        LOG
357          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
358        counter++;
359        continue;
360      } else if (server.equals(currentServer)) {
361        LOG.info(
362          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
363        counter++;
364        continue;
365      }
366      if (ack) {
367        Future<Boolean> task = moveRegionsPool
368          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
369        taskList.add(task);
370      } else {
371        Future<Boolean> task = moveRegionsPool
372          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
373        taskList.add(task);
374      }
375      counter++;
376    }
377
378    moveRegionsPool.shutdown();
379    long timeoutInSeconds = regionsToMove.size()
380      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
381    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
382  }
383
  /**
   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}. In
   * noAck mode we do not make sure that region is successfully online on the target region
   * server, hence it is best effort. We do not unload regions to hostnames given in
   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
   * to hostnames provided in {@link #designatedFile}. Target servers are not restricted by rack.
   * @return true if unloading succeeded, false otherwise
   */
  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
    return unloadRegions(false);
  }
395
  /**
   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}. In
   * noAck mode we do not make sure that region is successfully online on the target region
   * server, hence it is best effort. We do not unload regions to hostnames given in
   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
   * RegionServers are selected from a different rack, i.e. regions should not move to any
   * RegionServers that belong to the same rack as the source RegionServer.
   * @return true if unloading succeeded, false otherwise
   */
  public boolean unloadFromRack()
    throws InterruptedException, ExecutionException, TimeoutException {
    return unloadRegions(true);
  }
410
411  private boolean unloadRegions(boolean unloadFromRack)
412    throws InterruptedException, ExecutionException, TimeoutException {
413    deleteFile(this.filename);
414    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
415    Future<Boolean> unloadTask = unloadPool.submit(() -> {
416      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
417      try {
418        // Get Online RegionServers
419        List<ServerName> regionServers = new ArrayList<>();
420        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
421        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
422        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
423        // Remove the host Region server from target Region Servers list
424        ServerName server = stripServer(regionServers, hostname, port);
425        if (server == null) {
426          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
427            hostname, port);
428          LOG.debug("List of region servers: {}", regionServers);
429          return false;
430        }
431        // Remove RS not present in the designated file
432        includeExcludeRegionServers(designatedFile, regionServers, true);
433
434        // Remove RS present in the exclude file
435        includeExcludeRegionServers(excludeFile, regionServers, false);
436
437        if (unloadFromRack) {
438          // remove regionServers that belong to same rack (as source host) since the goal is to
439          // unload regions from source regionServer to destination regionServers
440          // that belong to different rack only.
441          String sourceRack = rackManager.getRack(server);
442          List<String> racks = rackManager.getRack(regionServers);
443          Iterator<ServerName> iterator = regionServers.iterator();
444          int i = 0;
445          while (iterator.hasNext()) {
446            iterator.next();
447            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
448              iterator.remove();
449            }
450            i++;
451          }
452        }
453
454        // Remove decommissioned RS
455        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
456        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
457          regionServers.removeIf(decommissionedRS::contains);
458          LOG.debug("Excluded RegionServers from unloading regions to because they "
459            + "are marked as decommissioned. Servers: {}", decommissionedRS);
460        }
461
462        stripMaster(regionServers);
463        if (regionServers.isEmpty()) {
464          LOG.warn("No Regions were moved - no servers available");
465          return false;
466        } else {
467          LOG.info("Available servers {}", regionServers);
468        }
469        unloadRegions(server, regionServers, movedRegions);
470      } catch (Exception e) {
471        LOG.error("Error while unloading regions ", e);
472        return false;
473      } finally {
474        if (movedRegions != null) {
475          writeFile(filename, movedRegions);
476        }
477      }
478      return true;
479    });
480    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
481  }
482
483  @InterfaceAudience.Private
484  Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
485    Collection<ServerName> onlineServers) {
486    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
487      return onlineServers;
488    }
489    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
490    for (ServerName server : onlineServers) {
491      Address address = Address.fromParts(server.getHostname(), server.getPort());
492      if (rsgroup.containsServer(address)) {
493        serverLists.add(server);
494      }
495    }
496    return serverLists;
497  }
498
499  private void unloadRegions(ServerName server, List<ServerName> regionServers,
500    List<RegionInfo> movedRegions) throws Exception {
501    while (true) {
502      List<RegionInfo> regionsToMove = admin.getRegions(server);
503      regionsToMove.removeAll(movedRegions);
504      if (regionsToMove.isEmpty()) {
505        LOG.info("No Regions to move....Quitting now");
506        break;
507      }
508      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
509        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
510
511      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
512      if (metaRegion.isPresent()) {
513        RegionInfo meta = metaRegion.get();
514        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
515          Collections.singletonList(meta));
516        regionsToMove.remove(meta);
517      }
518      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
519    }
520  }
521
522  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
523    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
524    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
525    List<Future<Boolean>> taskList = new ArrayList<>();
526    int serverIndex = 0;
527    for (RegionInfo regionToMove : regionsToMove) {
528      if (ack) {
529        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
530          regionServers.get(serverIndex), movedRegions));
531        taskList.add(task);
532      } else {
533        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
534          server, regionServers.get(serverIndex), movedRegions));
535        taskList.add(task);
536      }
537      serverIndex = (serverIndex + 1) % regionServers.size();
538    }
539    moveRegionsPool.shutdown();
540    long timeoutInSeconds = regionsToMove.size()
541      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
542    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
543  }
544
545  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
546    throws TimeoutException, InterruptedException, ExecutionException {
547    pool.shutdown();
548    try {
549      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
550        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
551          + this.timeout + "sec");
552        pool.shutdownNow();
553      }
554    } catch (InterruptedException e) {
555      pool.shutdownNow();
556      Thread.currentThread().interrupt();
557    }
558    try {
559      return task.get(5, TimeUnit.SECONDS);
560    } catch (InterruptedException e) {
561      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
562      throw e;
563    } catch (ExecutionException e) {
564      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
565      throw e;
566    }
567  }
568
569  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
570    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
571    try {
572      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
573        moveRegionsPool.shutdownNow();
574      }
575    } catch (InterruptedException e) {
576      moveRegionsPool.shutdownNow();
577      Thread.currentThread().interrupt();
578    }
579    for (Future<Boolean> future : taskList) {
580      try {
581        // if even after shutdownNow threads are stuck we wait for 5 secs max
582        if (!future.get(5, TimeUnit.SECONDS)) {
583          LOG.error("Was Not able to move region....Exiting Now");
584          throw new Exception("Could not move region Exception");
585        }
586      } catch (InterruptedException e) {
587        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
588        throw e;
589      } catch (ExecutionException e) {
590        boolean ignoreFailure = ignoreRegionMoveFailure(e);
591        if (ignoreFailure) {
592          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
593        } else {
594          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
595          throw e;
596        }
597      } catch (CancellationException e) {
598        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
599          + "secs", e);
600        throw e;
601      }
602    }
603  }
604
605  private boolean ignoreRegionMoveFailure(ExecutionException e) {
606    boolean ignoreFailure = false;
607    if (e.getCause() instanceof UnknownRegionException) {
608      // region does not exist anymore
609      ignoreFailure = true;
610    } else if (
611      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
612        && e.getCause().getMessage()
613          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
614    ) {
615      // region is recently split
616      ignoreFailure = true;
617    }
618    return ignoreFailure;
619  }
620
621  private ServerName getTargetServer() throws Exception {
622    ServerName server = null;
623    int maxWaitInSeconds =
624      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
625    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
626    while (EnvironmentEdgeManager.currentTime() < maxWait) {
627      try {
628        List<ServerName> regionServers = new ArrayList<>();
629        regionServers.addAll(admin.getRegionServers());
630        // Remove the host Region server from target Region Servers list
631        server = stripServer(regionServers, hostname, port);
632        if (server != null) {
633          break;
634        } else {
635          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
636        }
637      } catch (IOException e) {
638        LOG.warn("Could not get list of region servers", e);
639      }
640      Thread.sleep(500);
641    }
642    if (server == null) {
643      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
644      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
645    }
646    return server;
647  }
648
649  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
650    List<RegionInfo> regions = new ArrayList<>();
651    File f = new File(filename);
652    if (!f.exists()) {
653      return regions;
654    }
655    try (
656      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
657      int numRegions = dis.readInt();
658      int index = 0;
659      while (index < numRegions) {
660        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
661        index++;
662      }
663    } catch (IOException e) {
664      LOG.error("Error while reading regions from file:" + filename, e);
665      throw e;
666    }
667    return regions;
668  }
669
670  /**
671   * Write the number of regions moved in the first line followed by regions moved in subsequent
672   * lines
673   */
674  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
675    try (DataOutputStream dos =
676      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
677      dos.writeInt(movedRegions.size());
678      for (RegionInfo region : movedRegions) {
679        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
680      }
681    } catch (IOException e) {
682      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
683        + movedRegions.size() + " regions", e);
684      throw e;
685    }
686  }
687
688  private void deleteFile(String filename) {
689    File f = new File(filename);
690    if (f.exists()) {
691      f.delete();
692    }
693  }
694
695  /**
696   * @param filename The file should have 'host:port' per line
697   * @return List of servers from the file in format 'hostname:port'.
698   */
699  private List<String> readServersFromFile(String filename) throws IOException {
700    List<String> servers = new ArrayList<>();
701    if (filename != null) {
702      try {
703        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
704          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
705          .forEach(servers::add);
706      } catch (IOException e) {
707        LOG.error("Exception while reading servers from file,", e);
708        throw e;
709      }
710    }
711    return servers;
712  }
713
714  /**
715   * Designates or excludes the servername whose hostname and port portion matches the list given in
716   * the file. Example:<br>
717   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
718   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
719   * are removed from regionServers list so that regions can move to only RS1. If you want to
720   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
721   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
722   * list so that regions can move to only RS2 and RS3.
723   */
724  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
725    boolean isInclude) throws IOException {
726    if (fileName != null) {
727      List<String> servers = readServersFromFile(fileName);
728      if (servers.isEmpty()) {
729        LOG.warn("No servers provided in the file: {}." + fileName);
730        return;
731      }
732      Iterator<ServerName> i = regionServers.iterator();
733      while (i.hasNext()) {
734        String rs = i.next().getServerName();
735        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
736          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
737        if (isInclude != servers.contains(rsPort)) {
738          i.remove();
739        }
740      }
741    }
742  }
743
744  /**
745   * Exclude master from list of RSs to move regions to
746   */
747  private void stripMaster(List<ServerName> regionServers) throws IOException {
748    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
749    stripServer(regionServers, master.getHostname(), master.getPort());
750  }
751
752  /**
753   * Remove the servername whose hostname and port portion matches from the passed array of servers.
754   * Returns as side-effect the servername removed.
755   * @return server removed from list of Region Servers
756   */
757  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
758    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
759      ServerName server = iter.next();
760      if (
761        server.getAddress().getHostName().equalsIgnoreCase(hostname)
762          && server.getAddress().getPort() == port
763      ) {
764        iter.remove();
765        return server;
766      }
767    }
768    return null;
769  }
770
771  @Override
772  protected void addOptions() {
773    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
774    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
775    this.addOptWithArg("m", "maxthreads",
776      "Define the maximum number of threads to use to unload and reload the regions");
777    this.addOptWithArg("x", "excludefile",
778      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
779        + "target host; useful for rack decommisioning.");
780    this.addOptWithArg("d", "designatedfile",
781      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
782    this.addOptWithArg("f", "filename",
783      "File to save regions list into unloading, or read from loading; "
784        + "default /tmp/<usernamehostname:port>");
785    this.addOptNoArg("n", "noack",
786      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
787        + "RegionServer, hence best effort. This is more performant in unloading and loading "
788        + "but might lead to region being unavailable for some time till master reassigns it "
789        + "in case the move failed");
790    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
791      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
792  }
793
794  @Override
795  protected void processOptions(CommandLine cmd) {
796    String hostname = cmd.getOptionValue("r");
797    rmbuilder = new RegionMoverBuilder(hostname);
798    if (cmd.hasOption('m')) {
799      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
800    }
801    if (cmd.hasOption('n')) {
802      rmbuilder.ack(false);
803    }
804    if (cmd.hasOption('f')) {
805      rmbuilder.filename(cmd.getOptionValue('f'));
806    }
807    if (cmd.hasOption('x')) {
808      rmbuilder.excludeFile(cmd.getOptionValue('x'));
809    }
810    if (cmd.hasOption('d')) {
811      rmbuilder.designatedFile(cmd.getOptionValue('d'));
812    }
813    if (cmd.hasOption('t')) {
814      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
815    }
816    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
817  }
818
819  @Override
820  protected int doWork() throws Exception {
821    boolean success;
822    try (RegionMover rm = rmbuilder.build()) {
823      if (loadUnload.equalsIgnoreCase("load")) {
824        success = rm.load();
825      } else if (loadUnload.equalsIgnoreCase("unload")) {
826        success = rm.unload();
827      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
828        success = rm.unloadFromRack();
829      } else {
830        printUsage();
831        success = false;
832      }
833    }
834    return (success ? 0 : 1);
835  }
836
837  public static void main(String[] args) {
838    try (RegionMover mover = new RegionMover()) {
839      mover.doStaticMain(args);
840    }
841  }
842}