001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.Closeable;
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.nio.file.Files;
030import java.nio.file.Paths;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.EnumSet;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Locale;
038import java.util.Optional;
039import java.util.Set;
040import java.util.concurrent.Callable;
041import java.util.concurrent.CancellationException;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Executors;
045import java.util.concurrent.Future;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.TimeoutException;
048import java.util.function.Predicate;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.ClusterMetrics.Option;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.UnknownRegionException;
056import org.apache.hadoop.hbase.client.Admin;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.master.RackManager;
062import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
068import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
069
070/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement, while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a region is stuck, the master will
 * move it anyway upon shutdown. This can also be used by constructing an object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
078 */
079@InterfaceAudience.Public
080public class RegionMover extends AbstractHBaseTool implements Closeable {
081  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
082  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
083  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
084  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
085  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
086  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
087
088  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
089
090  private RegionMoverBuilder rmbuilder;
091  private boolean ack = true;
092  private int maxthreads = 1;
093  private int timeout;
094  private String loadUnload;
095  private String hostname;
096  private String filename;
097  private String excludeFile;
098  private String designatedFile;
099  private int port;
100  private Connection conn;
101  private Admin admin;
102  private RackManager rackManager;
103
104  private RegionMover(RegionMoverBuilder builder) throws IOException {
105    this.hostname = builder.hostname;
106    this.filename = builder.filename;
107    this.excludeFile = builder.excludeFile;
108    this.designatedFile = builder.designatedFile;
109    this.maxthreads = builder.maxthreads;
110    this.ack = builder.ack;
111    this.port = builder.port;
112    this.timeout = builder.timeout;
113    setConf(builder.conf);
114    this.conn = ConnectionFactory.createConnection(conf);
115    this.admin = conn.getAdmin();
116    // Only while running unit tests, builder.rackManager will not be null for the convenience of
117    // providing custom rackManager. Otherwise for regular workflow/user triggered action,
118    // builder.rackManager is supposed to be null. Hence, setter of builder.rackManager is
119    // provided as @InterfaceAudience.Private and it is commented that this is just
120    // to be used by unit test.
121    rackManager = builder.rackManager == null ? new RackManager(conf) : builder.rackManager;
122  }
123
124  private RegionMover() {
125  }
126
127  @Override
128  public void close() {
129    IOUtils.closeQuietly(this.admin, e -> LOG.warn("failed to close admin", e));
130    IOUtils.closeQuietly(this.conn, e -> LOG.warn("failed to close conn", e));
131  }
132
133  /**
134   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
135   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
136   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
137   * the corresponding options.
138   */
139  public static class RegionMoverBuilder {
140    private boolean ack = true;
141    private int maxthreads = 1;
142    private int timeout = Integer.MAX_VALUE;
143    private String hostname;
144    private String filename;
145    private String excludeFile = null;
146    private String designatedFile = null;
147    private String defaultDir = System.getProperty("java.io.tmpdir");
148    @InterfaceAudience.Private
149    final int port;
150    private final Configuration conf;
151    private RackManager rackManager;
152
153    public RegionMoverBuilder(String hostname) {
154      this(hostname, createConf());
155    }
156
157    /**
158     * Creates a new configuration and sets region mover specific overrides
159     */
160    private static Configuration createConf() {
161      Configuration conf = HBaseConfiguration.create();
162      conf.setInt("hbase.client.prefetch.limit", 1);
163      conf.setInt("hbase.client.pause", 500);
164      conf.setInt("hbase.client.retries.number", 100);
165      return conf;
166    }
167
168    /**
169     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname or
170     *                 hostname:port.
171     * @param conf     Configuration object
172     */
173    public RegionMoverBuilder(String hostname, Configuration conf) {
174      String[] splitHostname = hostname.toLowerCase().split(":");
175      this.hostname = splitHostname[0];
176      if (splitHostname.length == 2) {
177        this.port = Integer.parseInt(splitHostname[1]);
178      } else {
179        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
180      }
181      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
182        + ":" + Integer.toString(this.port);
183      this.conf = conf;
184    }
185
186    /**
187     * Path of file where regions will be written to during unloading/read from during loading n
188     * * @return RegionMoverBuilder object
189     */
190    public RegionMoverBuilder filename(String filename) {
191      this.filename = filename;
192      return this;
193    }
194
195    /**
196     * Set the max number of threads that will be used to move regions
197     */
198    public RegionMoverBuilder maxthreads(int threads) {
199      this.maxthreads = threads;
200      return this;
201    }
202
203    /**
204     * Path of file containing hostnames to be excluded during region movement. Exclude file should
205     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
206     * host.
207     */
208    public RegionMoverBuilder excludeFile(String excludefile) {
209      this.excludeFile = excludefile;
210      return this;
211    }
212
213    /**
214     * Set the designated file. Designated file contains hostnames where region moves. Designated
215     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
216     * on a single host.
217     * @param designatedFile The designated file
218     * @return RegionMoverBuilder object
219     */
220    public RegionMoverBuilder designatedFile(String designatedFile) {
221      this.designatedFile = designatedFile;
222      return this;
223    }
224
225    /**
226     * Set ack/noAck mode.
227     * <p>
228     * In ack mode regions are acknowledged before and after moving and the move is retried
229     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
230     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
231     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
232     * <p>
233     * n * @return RegionMoverBuilder object
234     */
235    public RegionMoverBuilder ack(boolean ack) {
236      this.ack = ack;
237      return this;
238    }
239
240    /**
241     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
242     * movers also have a separate time which is hbase.move.wait.max * number of regions to
243     * load/unload
244     * @param timeout in seconds
245     * @return RegionMoverBuilder object
246     */
247    public RegionMoverBuilder timeout(int timeout) {
248      this.timeout = timeout;
249      return this;
250    }
251
252    /**
253     * Set specific rackManager implementation. This setter method is for testing purpose only.
254     * @param rackManager rackManager impl
255     * @return RegionMoverBuilder object
256     */
257    @InterfaceAudience.Private
258    public RegionMoverBuilder rackManager(RackManager rackManager) {
259      this.rackManager = rackManager;
260      return this;
261    }
262
263    /**
264     * This method builds the appropriate RegionMover object which can then be used to load/unload
265     * using load and unload methods
266     * @return RegionMover object
267     */
268    public RegionMover build() throws IOException {
269      return new RegionMover(this);
270    }
271  }
272
273  /**
274   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
275   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
276   * @return true if loading succeeded, false otherwise
277   */
278  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
279    ExecutorService loadPool = Executors.newFixedThreadPool(1);
280    Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
281    boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
282    if (!isMetaMoved) {
283      return false;
284    }
285    loadPool = Executors.newFixedThreadPool(1);
286    loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
287    return waitTaskToFinish(loadPool, loadTask, "loading");
288  }
289
290  private Callable<Boolean> getMetaRegionMovePlan() {
291    return getRegionsMovePlan(true);
292  }
293
294  private Callable<Boolean> getNonMetaRegionsMovePlan() {
295    return getRegionsMovePlan(false);
296  }
297
298  private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
299    return () -> {
300      try {
301        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
302        if (regionsToMove.isEmpty()) {
303          LOG.info("No regions to load.Exiting");
304          return true;
305        }
306        Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
307        if (moveMetaRegion) {
308          if (metaRegion.isPresent()) {
309            loadRegions(Collections.singletonList(metaRegion.get()));
310          }
311        } else {
312          metaRegion.ifPresent(regionsToMove::remove);
313          loadRegions(regionsToMove);
314        }
315      } catch (Exception e) {
316        LOG.error("Error while loading regions to " + hostname, e);
317        return false;
318      }
319      return true;
320    };
321  }
322
323  private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
324    return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
325  }
326
327  private void loadRegions(List<RegionInfo> regionsToMove) throws Exception {
328    ServerName server = getTargetServer();
329    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
330    LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
331      + this.maxthreads + " threads.Ack mode:" + this.ack);
332
333    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
334    List<Future<Boolean>> taskList = new ArrayList<>();
335    int counter = 0;
336    while (counter < regionsToMove.size()) {
337      RegionInfo region = regionsToMove.get(counter);
338      ServerName currentServer = MoveWithAck.getServerNameForRegion(region, admin, conn);
339      if (currentServer == null) {
340        LOG
341          .warn("Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
342        counter++;
343        continue;
344      } else if (server.equals(currentServer)) {
345        LOG.info(
346          "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
347        counter++;
348        continue;
349      }
350      if (ack) {
351        Future<Boolean> task = moveRegionsPool
352          .submit(new MoveWithAck(conn, region, currentServer, server, movedRegions));
353        taskList.add(task);
354      } else {
355        Future<Boolean> task = moveRegionsPool
356          .submit(new MoveWithoutAck(admin, region, currentServer, server, movedRegions));
357        taskList.add(task);
358      }
359      counter++;
360    }
361
362    moveRegionsPool.shutdown();
363    long timeoutInSeconds = regionsToMove.size()
364      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
365    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
366  }
367
368  /**
369   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
370   * noAck mode we do not make sure that region is successfully online on the target region
371   * server,hence it is best effort.We do not unload regions to hostnames given in
372   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
373   * to hostnames provided in {@link #designatedFile}
374   * @return true if unloading succeeded, false otherwise
375   */
376  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
377    return unloadRegions(false);
378  }
379
380  /**
381   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
382   * noAck mode we do not make sure that region is successfully online on the target region
383   * server,hence it is best effort.We do not unload regions to hostnames given in
384   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
385   * to hostnames provided in {@link #designatedFile}. While unloading regions, destination
386   * RegionServers are selected from different rack i.e regions should not move to any RegionServers
387   * that belong to same rack as source RegionServer.
388   * @return true if unloading succeeded, false otherwise
389   */
390  public boolean unloadFromRack()
391    throws InterruptedException, ExecutionException, TimeoutException {
392    return unloadRegions(true);
393  }
394
395  private boolean unloadRegions(boolean unloadFromRack)
396    throws InterruptedException, ExecutionException, TimeoutException {
397    deleteFile(this.filename);
398    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
399    Future<Boolean> unloadTask = unloadPool.submit(() -> {
400      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
401      try {
402        // Get Online RegionServers
403        List<ServerName> regionServers = new ArrayList<>();
404        regionServers.addAll(admin.getRegionServers());
405        // Remove the host Region server from target Region Servers list
406        ServerName server = stripServer(regionServers, hostname, port);
407        if (server == null) {
408          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
409            hostname, port);
410          LOG.debug("List of region servers: {}", regionServers);
411          return false;
412        }
413        // Remove RS not present in the designated file
414        includeExcludeRegionServers(designatedFile, regionServers, true);
415
416        // Remove RS present in the exclude file
417        includeExcludeRegionServers(excludeFile, regionServers, false);
418
419        if (unloadFromRack) {
420          // remove regionServers that belong to same rack (as source host) since the goal is to
421          // unload regions from source regionServer to destination regionServers
422          // that belong to different rack only.
423          String sourceRack = rackManager.getRack(server);
424          List<String> racks = rackManager.getRack(regionServers);
425          Iterator<ServerName> iterator = regionServers.iterator();
426          int i = 0;
427          while (iterator.hasNext()) {
428            iterator.next();
429            if (racks.size() > i && racks.get(i) != null && racks.get(i).equals(sourceRack)) {
430              iterator.remove();
431            }
432            i++;
433          }
434        }
435
436        // Remove decommissioned RS
437        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
438        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
439          regionServers.removeIf(decommissionedRS::contains);
440          LOG.debug("Excluded RegionServers from unloading regions to because they "
441            + "are marked as decommissioned. Servers: {}", decommissionedRS);
442        }
443
444        stripMaster(regionServers);
445        if (regionServers.isEmpty()) {
446          LOG.warn("No Regions were moved - no servers available");
447          return false;
448        }
449        unloadRegions(server, regionServers, movedRegions);
450      } catch (Exception e) {
451        LOG.error("Error while unloading regions ", e);
452        return false;
453      } finally {
454        if (movedRegions != null) {
455          writeFile(filename, movedRegions);
456        }
457      }
458      return true;
459    });
460    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
461  }
462
463  private void unloadRegions(ServerName server, List<ServerName> regionServers,
464    List<RegionInfo> movedRegions) throws Exception {
465    while (true) {
466      List<RegionInfo> regionsToMove = admin.getRegions(server);
467      regionsToMove.removeAll(movedRegions);
468      if (regionsToMove.isEmpty()) {
469        LOG.info("No Regions to move....Quitting now");
470        break;
471      }
472      LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
473        regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
474
475      Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
476      if (metaRegion.isPresent()) {
477        RegionInfo meta = metaRegion.get();
478        submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
479          Collections.singletonList(meta));
480        regionsToMove.remove(meta);
481      }
482      submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
483    }
484  }
485
486  private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
487    List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
488    final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
489    List<Future<Boolean>> taskList = new ArrayList<>();
490    int serverIndex = 0;
491    for (RegionInfo regionToMove : regionsToMove) {
492      if (ack) {
493        Future<Boolean> task = moveRegionsPool.submit(new MoveWithAck(conn, regionToMove, server,
494          regionServers.get(serverIndex), movedRegions));
495        taskList.add(task);
496      } else {
497        Future<Boolean> task = moveRegionsPool.submit(new MoveWithoutAck(admin, regionToMove,
498          server, regionServers.get(serverIndex), movedRegions));
499        taskList.add(task);
500      }
501      serverIndex = (serverIndex + 1) % regionServers.size();
502    }
503    moveRegionsPool.shutdown();
504    long timeoutInSeconds = regionsToMove.size()
505      * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
506    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
507  }
508
509  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
510    throws TimeoutException, InterruptedException, ExecutionException {
511    pool.shutdown();
512    try {
513      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
514        LOG.warn("Timed out before finishing the " + operation + " operation. Timeout: "
515          + this.timeout + "sec");
516        pool.shutdownNow();
517      }
518    } catch (InterruptedException e) {
519      pool.shutdownNow();
520      Thread.currentThread().interrupt();
521    }
522    try {
523      return task.get(5, TimeUnit.SECONDS);
524    } catch (InterruptedException e) {
525      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
526      throw e;
527    } catch (ExecutionException e) {
528      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
529      throw e;
530    }
531  }
532
533  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
534    List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
535    try {
536      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
537        moveRegionsPool.shutdownNow();
538      }
539    } catch (InterruptedException e) {
540      moveRegionsPool.shutdownNow();
541      Thread.currentThread().interrupt();
542    }
543    for (Future<Boolean> future : taskList) {
544      try {
545        // if even after shutdownNow threads are stuck we wait for 5 secs max
546        if (!future.get(5, TimeUnit.SECONDS)) {
547          LOG.error("Was Not able to move region....Exiting Now");
548          throw new Exception("Could not move region Exception");
549        }
550      } catch (InterruptedException e) {
551        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
552        throw e;
553      } catch (ExecutionException e) {
554        boolean ignoreFailure = ignoreRegionMoveFailure(e);
555        if (ignoreFailure) {
556          LOG.debug("Ignore region move failure, it might have been split/merged.", e);
557        } else {
558          LOG.error("Got Exception From Thread While moving region {}", e.getMessage(), e);
559          throw e;
560        }
561      } catch (CancellationException e) {
562        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
563          + "secs", e);
564        throw e;
565      }
566    }
567  }
568
569  private boolean ignoreRegionMoveFailure(ExecutionException e) {
570    boolean ignoreFailure = false;
571    if (e.getCause() instanceof UnknownRegionException) {
572      // region does not exist anymore
573      ignoreFailure = true;
574    } else if (
575      e.getCause() instanceof DoNotRetryRegionException && e.getCause().getMessage() != null
576        && e.getCause().getMessage()
577          .contains(AssignmentManager.UNEXPECTED_STATE_REGION + "state=SPLIT,")
578    ) {
579      // region is recently split
580      ignoreFailure = true;
581    }
582    return ignoreFailure;
583  }
584
585  private ServerName getTargetServer() throws Exception {
586    ServerName server = null;
587    int maxWaitInSeconds =
588      admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
589    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
590    while (EnvironmentEdgeManager.currentTime() < maxWait) {
591      try {
592        List<ServerName> regionServers = new ArrayList<>();
593        regionServers.addAll(admin.getRegionServers());
594        // Remove the host Region server from target Region Servers list
595        server = stripServer(regionServers, hostname, port);
596        if (server != null) {
597          break;
598        } else {
599          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
600        }
601      } catch (IOException e) {
602        LOG.warn("Could not get list of region servers", e);
603      }
604      Thread.sleep(500);
605    }
606    if (server == null) {
607      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
608      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
609    }
610    return server;
611  }
612
613  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
614    List<RegionInfo> regions = new ArrayList<>();
615    File f = new File(filename);
616    if (!f.exists()) {
617      return regions;
618    }
619    try (
620      DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
621      int numRegions = dis.readInt();
622      int index = 0;
623      while (index < numRegions) {
624        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
625        index++;
626      }
627    } catch (IOException e) {
628      LOG.error("Error while reading regions from file:" + filename, e);
629      throw e;
630    }
631    return regions;
632  }
633
634  /**
635   * Write the number of regions moved in the first line followed by regions moved in subsequent
636   * lines
637   */
638  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
639    try (DataOutputStream dos =
640      new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filename)))) {
641      dos.writeInt(movedRegions.size());
642      for (RegionInfo region : movedRegions) {
643        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
644      }
645    } catch (IOException e) {
646      LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
647        + movedRegions.size() + " regions", e);
648      throw e;
649    }
650  }
651
652  private void deleteFile(String filename) {
653    File f = new File(filename);
654    if (f.exists()) {
655      f.delete();
656    }
657  }
658
659  /**
660   * @param filename The file should have 'host:port' per line
661   * @return List of servers from the file in format 'hostname:port'.
662   */
663  private List<String> readServersFromFile(String filename) throws IOException {
664    List<String> servers = new ArrayList<>();
665    if (filename != null) {
666      try {
667        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
668          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
669          .forEach(servers::add);
670      } catch (IOException e) {
671        LOG.error("Exception while reading servers from file,", e);
672        throw e;
673      }
674    }
675    return servers;
676  }
677
678  /**
679   * Designates or excludes the servername whose hostname and port portion matches the list given in
680   * the file. Example:<br>
681   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
682   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and RS3
683   * are removed from regionServers list so that regions can move to only RS1. If you want to
684   * exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3. When we call
685   * includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from regionServers
686   * list so that regions can move to only RS2 and RS3.
687   */
688  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
689    boolean isInclude) throws IOException {
690    if (fileName != null) {
691      List<String> servers = readServersFromFile(fileName);
692      if (servers.isEmpty()) {
693        LOG.warn("No servers provided in the file: {}." + fileName);
694        return;
695      }
696      Iterator<ServerName> i = regionServers.iterator();
697      while (i.hasNext()) {
698        String rs = i.next().getServerName();
699        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":"
700          + rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
701        if (isInclude != servers.contains(rsPort)) {
702          i.remove();
703        }
704      }
705    }
706  }
707
708  /**
709   * Exclude master from list of RSs to move regions to
710   */
711  private void stripMaster(List<ServerName> regionServers) throws IOException {
712    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
713    stripServer(regionServers, master.getHostname(), master.getPort());
714  }
715
716  /**
717   * Remove the servername whose hostname and port portion matches from the passed array of servers.
718   * Returns as side-effect the servername removed.
719   * @return server removed from list of Region Servers
720   */
721  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
722    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
723      ServerName server = iter.next();
724      if (
725        server.getAddress().getHostName().equalsIgnoreCase(hostname)
726          && server.getAddress().getPort() == port
727      ) {
728        iter.remove();
729        return server;
730      }
731    }
732    return null;
733  }
734
735  @Override
736  protected void addOptions() {
737    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
738    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload/unload_from_rack");
739    this.addOptWithArg("m", "maxthreads",
740      "Define the maximum number of threads to use to unload and reload the regions");
741    this.addOptWithArg("x", "excludefile",
742      "File with <hostname:port> per line to exclude as unload targets; default excludes only "
743        + "target host; useful for rack decommisioning.");
744    this.addOptWithArg("d", "designatedfile",
745      "File with <hostname:port> per line as unload targets;" + "default is all online hosts");
746    this.addOptWithArg("f", "filename",
747      "File to save regions list into unloading, or read from loading; "
748        + "default /tmp/<usernamehostname:port>");
749    this.addOptNoArg("n", "noack",
750      "Turn on No-Ack mode(default: false) which won't check if region is online on target "
751        + "RegionServer, hence best effort. This is more performant in unloading and loading "
752        + "but might lead to region being unavailable for some time till master reassigns it "
753        + "in case the move failed");
754    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
755      + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
756  }
757
758  @Override
759  protected void processOptions(CommandLine cmd) {
760    String hostname = cmd.getOptionValue("r");
761    rmbuilder = new RegionMoverBuilder(hostname);
762    if (cmd.hasOption('m')) {
763      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
764    }
765    if (cmd.hasOption('n')) {
766      rmbuilder.ack(false);
767    }
768    if (cmd.hasOption('f')) {
769      rmbuilder.filename(cmd.getOptionValue('f'));
770    }
771    if (cmd.hasOption('x')) {
772      rmbuilder.excludeFile(cmd.getOptionValue('x'));
773    }
774    if (cmd.hasOption('d')) {
775      rmbuilder.designatedFile(cmd.getOptionValue('d'));
776    }
777    if (cmd.hasOption('t')) {
778      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
779    }
780    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
781  }
782
783  @Override
784  protected int doWork() throws Exception {
785    boolean success;
786    try (RegionMover rm = rmbuilder.build()) {
787      if (loadUnload.equalsIgnoreCase("load")) {
788        success = rm.load();
789      } else if (loadUnload.equalsIgnoreCase("unload")) {
790        success = rm.unload();
791      } else if (loadUnload.equalsIgnoreCase("unload_from_rack")) {
792        success = rm.unloadFromRack();
793      } else {
794        printUsage();
795        success = false;
796      }
797    }
798    return (success ? 0 : 1);
799  }
800
801  public static void main(String[] args) {
802    try (RegionMover mover = new RegionMover()) {
803      mover.doStaticMain(args);
804    }
805  }
806}