001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
075
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.
 * Ack mode acknowledges if regions are online after movement while noAck mode is a best effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a Region is stuck, upon shutdown the
 * master will move it anyways. This can also be used by constructing an Object using the builder
 * and then calling {@link #load()} or {@link #unload()} methods for the desired operations.
 */
085@InterfaceAudience.Public
086public class RegionMover extends AbstractHBaseTool implements Closeable {
  /** Configuration key: maximum number of move attempts per region in ack mode. */
  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  /** Configuration key: maximum time, in seconds, to wait for a region to leave its server. */
  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  /** Configuration key: maximum time, in seconds, to wait for the target server to be online. */
  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  /** Default for {@link #MOVE_RETRIES_MAX_KEY}. */
  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  /** Default for {@link #MOVE_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
  /** Default for {@link #SERVERSTART_WAIT_MAX_KEY}, in seconds. */
  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // NOTE(review): not referenced in the visible part of this class; presumably populated by the
  // command-line path - confirm.
  private RegionMoverBuilder rmbuilder;
  // true = verify every move (MoveWithAck), false = best effort (MoveWithoutAck)
  private boolean ack = true;
  // size of the thread pool that executes the individual region moves
  private int maxthreads = 1;
  // overall load/unload timeout, in seconds
  private int timeout;
  // NOTE(review): not referenced in the visible part of this class; presumably holds the
  // requested operation (load/unload) for the command-line path - confirm.
  private String loadUnload;
  // host (and port, below) of the region server being loaded/unloaded
  private String hostname;
  // file the moved regions are written to on unload and read from on load
  private String filename;
  // optional file of host:port entries that must not receive regions on unload
  private String excludeFile;
  // optional file of host:port entries that are the only allowed unload targets
  private String designatedFile;
  private int port;
  // cluster connection and admin handle; both are released in close()
  private Connection conn;
  private Admin admin;
108
109  private RegionMover(RegionMoverBuilder builder) throws IOException {
110    this.hostname = builder.hostname;
111    this.filename = builder.filename;
112    this.excludeFile = builder.excludeFile;
113    this.designatedFile = builder.designatedFile;
114    this.maxthreads = builder.maxthreads;
115    this.ack = builder.ack;
116    this.port = builder.port;
117    this.timeout = builder.timeout;
118    setConf(builder.conf);
119    this.conn = ConnectionFactory.createConnection(conf);
120    this.admin = conn.getAdmin();
121  }
122
123  private RegionMover() {
124  }
125
126  @Override
127  public void close() {
128    IOUtils.closeQuietly(this.admin);
129    IOUtils.closeQuietly(this.conn);
130  }
131
132  /**
133   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
134   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
135   * {@link #ack(boolean)}, {@link #timeout(int)}, {@link #designatedFile(String)} methods to set
136   * the corresponding options.
137   */
138  public static class RegionMoverBuilder {
139    private boolean ack = true;
140    private int maxthreads = 1;
141    private int timeout = Integer.MAX_VALUE;
142    private String hostname;
143    private String filename;
144    private String excludeFile = null;
145    private String designatedFile = null;
146    private String defaultDir = System.getProperty("java.io.tmpdir");
147    @VisibleForTesting
148    final int port;
149    private final Configuration conf;
150
151    public RegionMoverBuilder(String hostname) {
152      this(hostname, createConf());
153    }
154
155    /**
156     * Creates a new configuration and sets region mover specific overrides
157     */
158    private static Configuration createConf() {
159      Configuration conf = HBaseConfiguration.create();
160      conf.setInt("hbase.client.prefetch.limit", 1);
161      conf.setInt("hbase.client.pause", 500);
162      conf.setInt("hbase.client.retries.number", 100);
163      return conf;
164    }
165
166    /**
167     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
168     *     or hostname:port.
169     * @param conf Configuration object
170     */
171    public RegionMoverBuilder(String hostname, Configuration conf) {
172      String[] splitHostname = hostname.toLowerCase().split(":");
173      this.hostname = splitHostname[0];
174      if (splitHostname.length == 2) {
175        this.port = Integer.parseInt(splitHostname[1]);
176      } else {
177        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
178      }
179      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
180        + ":" + Integer.toString(this.port);
181      this.conf = conf;
182    }
183
184    /**
185     * Path of file where regions will be written to during unloading/read from during loading
186     * @param filename
187     * @return RegionMoverBuilder object
188     */
189    public RegionMoverBuilder filename(String filename) {
190      this.filename = filename;
191      return this;
192    }
193
194    /**
195     * Set the max number of threads that will be used to move regions
196     */
197    public RegionMoverBuilder maxthreads(int threads) {
198      this.maxthreads = threads;
199      return this;
200    }
201
202    /**
203     * Path of file containing hostnames to be excluded during region movement. Exclude file should
204     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
205     * host.
206     */
207    public RegionMoverBuilder excludeFile(String excludefile) {
208      this.excludeFile = excludefile;
209      return this;
210    }
211
212    /**
213     * Set the designated file. Designated file contains hostnames where region moves. Designated
214     * file should have 'host:port' per line. Port is mandatory here as we can have many RS running
215     * on a single host.
216     * @param designatedFile The designated file
217     * @return RegionMoverBuilder object
218     */
219    public RegionMoverBuilder designatedFile(String designatedFile) {
220      this.designatedFile = designatedFile;
221      return this;
222    }
223
224    /**
225     * Set ack/noAck mode.
226     * <p>
227     * In ack mode regions are acknowledged before and after moving and the move is retried
228     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
229     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
230     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
231     * <p>
232     * @param ack
233     * @return RegionMoverBuilder object
234     */
235    public RegionMoverBuilder ack(boolean ack) {
236      this.ack = ack;
237      return this;
238    }
239
240    /**
241     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
242     * movers also have a separate time which is hbase.move.wait.max * number of regions to
243     * load/unload
244     * @param timeout in seconds
245     * @return RegionMoverBuilder object
246     */
247    public RegionMoverBuilder timeout(int timeout) {
248      this.timeout = timeout;
249      return this;
250    }
251
252    /**
253     * This method builds the appropriate RegionMover object which can then be used to load/unload
254     * using load and unload methods
255     * @return RegionMover object
256     */
257    public RegionMover build() throws IOException {
258      return new RegionMover(this);
259    }
260  }
261
262  /**
263   * Move Regions and make sure that they are up on the target server.If a region movement fails we
264   * exit as failure
265   */
266  private class MoveWithAck implements Callable<Boolean> {
267    private RegionInfo region;
268    private ServerName targetServer;
269    private List<RegionInfo> movedRegions;
270    private ServerName sourceServer;
271
272    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
273        ServerName targetServer, List<RegionInfo> movedRegions) {
274      this.region = regionInfo;
275      this.targetServer = targetServer;
276      this.movedRegions = movedRegions;
277      this.sourceServer = sourceServer;
278    }
279
280    @Override
281    public Boolean call() throws IOException, InterruptedException {
282      boolean moved = false;
283      int count = 0;
284      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
285      int maxWaitInSeconds =
286          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
287      long startTime = EnvironmentEdgeManager.currentTime();
288      boolean sameServer = true;
289      // Assert we can scan the region in its current location
290      isSuccessfulScan(region);
291      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
292          + targetServer);
293      while (count < retries && sameServer) {
294        if (count > 0) {
295          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
296        }
297        count = count + 1;
298        admin.move(region.getEncodedNameAsBytes(), targetServer);
299        long maxWait = startTime + (maxWaitInSeconds * 1000);
300        while (EnvironmentEdgeManager.currentTime() < maxWait) {
301          sameServer = isSameServer(region, sourceServer);
302          if (!sameServer) {
303            break;
304          }
305          Thread.sleep(100);
306        }
307      }
308      if (sameServer) {
309        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
310            + ",newServer=" + this.targetServer);
311      } else {
312        isSuccessfulScan(region);
313        LOG.info("Moved Region "
314            + region.getRegionNameAsString()
315            + " cost:"
316            + String.format("%.3f",
317            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
318        moved = true;
319        movedRegions.add(region);
320      }
321      return moved;
322    }
323  }
324
325  /**
326   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
327   * RS down anyways and not abort on a stuck region. Improves movement performance
328   */
329  private class MoveWithoutAck implements Callable<Boolean> {
330    private RegionInfo region;
331    private ServerName targetServer;
332    private List<RegionInfo> movedRegions;
333    private ServerName sourceServer;
334
335    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
336        ServerName targetServer, List<RegionInfo> movedRegions) {
337      this.region = regionInfo;
338      this.targetServer = targetServer;
339      this.movedRegions = movedRegions;
340      this.sourceServer = sourceServer;
341    }
342
343    @Override
344    public Boolean call() {
345      try {
346        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
347            + targetServer);
348        admin.move(region.getEncodedNameAsBytes(), targetServer);
349        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
350            + targetServer);
351      } catch (Exception e) {
352        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
353      } finally {
354        // we add region to the moved regions list in No Ack Mode since this is best effort
355        movedRegions.add(region);
356      }
357      return true;
358    }
359  }
360
361  /**
362   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
363   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
364   * @return true if loading succeeded, false otherwise
365   */
366  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
367    ExecutorService loadPool = Executors.newFixedThreadPool(1);
368    Future<Boolean> loadTask = loadPool.submit(() -> {
369      try {
370        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
371        if (regionsToMove.isEmpty()) {
372          LOG.info("No regions to load.Exiting");
373          return true;
374        }
375        loadRegions(regionsToMove);
376      } catch (Exception e) {
377        LOG.error("Error while loading regions to " + hostname, e);
378        return false;
379      }
380      return true;
381    });
382    return waitTaskToFinish(loadPool, loadTask, "loading");
383  }
384
385  private void loadRegions(List<RegionInfo> regionsToMove)
386      throws Exception {
387    ServerName server = getTargetServer();
388    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
389    LOG.info(
390        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
391            + " threads.Ack mode:" + this.ack);
392
393    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
394    List<Future<Boolean>> taskList = new ArrayList<>();
395    int counter = 0;
396    while (counter < regionsToMove.size()) {
397      RegionInfo region = regionsToMove.get(counter);
398      ServerName currentServer = getServerNameForRegion(region);
399      if (currentServer == null) {
400        LOG.warn(
401            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
402        counter++;
403        continue;
404      } else if (server.equals(currentServer)) {
405        LOG.info(
406            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
407        counter++;
408        continue;
409      }
410      if (ack) {
411        Future<Boolean> task =
412            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
413        taskList.add(task);
414      } else {
415        Future<Boolean> task =
416            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
417        taskList.add(task);
418      }
419      counter++;
420    }
421
422    moveRegionsPool.shutdown();
423    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
424        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
425    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
426  }
427
428  /**
429   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
430   * noAck mode we do not make sure that region is successfully online on the target region
431   * server,hence it is best effort.We do not unload regions to hostnames given in
432   * {@link #excludeFile}. If designatedFile is present with some contents, we will unload regions
433   * to hostnames provided in {@link #designatedFile}
434   * @return true if unloading succeeded, false otherwise
435   */
436  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
437    deleteFile(this.filename);
438    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
439    Future<Boolean> unloadTask = unloadPool.submit(() -> {
440      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
441      try {
442        // Get Online RegionServers
443        List<ServerName> regionServers = new ArrayList<>();
444        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
445        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
446        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
447        // Remove the host Region server from target Region Servers list
448        ServerName server = stripServer(regionServers, hostname, port);
449        if (server == null) {
450          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
451              hostname, port);
452          LOG.debug("List of region servers: {}", regionServers);
453          return false;
454        }
455        // Remove RS not present in the designated file
456        includeExcludeRegionServers(designatedFile, regionServers, true);
457
458        // Remove RS present in the exclude file
459        includeExcludeRegionServers(excludeFile, regionServers, false);
460
461        // Remove decommissioned RS
462        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
463        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
464          regionServers.removeIf(decommissionedRS::contains);
465          LOG.debug("Excluded RegionServers from unloading regions to because they " +
466            "are marked as decommissioned. Servers: {}", decommissionedRS);
467        }
468
469        stripMaster(regionServers);
470        if (regionServers.isEmpty()) {
471          LOG.warn("No Regions were moved - no servers available");
472          return false;
473        } else {
474          LOG.info("Available servers {}", regionServers);
475        }
476        unloadRegions(server, regionServers, movedRegions);
477      } catch (Exception e) {
478        LOG.error("Error while unloading regions ", e);
479        return false;
480      } finally {
481        if (movedRegions != null) {
482          writeFile(filename, movedRegions);
483        }
484      }
485      return true;
486    });
487    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
488  }
489
490  @VisibleForTesting
491   Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
492      Collection<ServerName> onlineServers) {
493    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
494      return onlineServers;
495    }
496    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
497    for (ServerName server : onlineServers) {
498      Address address = Address.fromParts(server.getHostname(), server.getPort());
499      if (rsgroup.containsServer(address)) {
500        serverLists.add(server);
501      }
502    }
503    return serverLists;
504  }
505
506  private void unloadRegions(ServerName server, List<ServerName> regionServers,
507      List<RegionInfo> movedRegions) throws Exception {
508    while (true) {
509      List<RegionInfo> regionsToMove = admin.getRegions(server);
510      regionsToMove.removeAll(movedRegions);
511      if (regionsToMove.isEmpty()) {
512        LOG.info("No Regions to move....Quitting now");
513        break;
514      }
515      int counter = 0;
516      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
517          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
518          + ack);
519      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
520      List<Future<Boolean>> taskList = new ArrayList<>();
521      int serverIndex = 0;
522      while (counter < regionsToMove.size()) {
523        if (ack) {
524          Future<Boolean> task = moveRegionsPool.submit(
525              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
526                  movedRegions));
527          taskList.add(task);
528        } else {
529          Future<Boolean> task = moveRegionsPool.submit(
530              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
531                  movedRegions));
532          taskList.add(task);
533        }
534        counter++;
535        serverIndex = (serverIndex + 1) % regionServers.size();
536      }
537      moveRegionsPool.shutdown();
538      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
539          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
540      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
541    }
542  }
543
544  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
545      throws TimeoutException, InterruptedException, ExecutionException {
546    pool.shutdown();
547    try {
548      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
549        LOG.warn(
550            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
551                + "sec");
552        pool.shutdownNow();
553      }
554    } catch (InterruptedException e) {
555      pool.shutdownNow();
556      Thread.currentThread().interrupt();
557    }
558    try {
559      return task.get(5, TimeUnit.SECONDS);
560    } catch (InterruptedException e) {
561      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
562      throw e;
563    } catch (ExecutionException e) {
564      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
565      throw e;
566    }
567  }
568
569  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
570      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
571    try {
572      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
573        moveRegionsPool.shutdownNow();
574      }
575    } catch (InterruptedException e) {
576      moveRegionsPool.shutdownNow();
577      Thread.currentThread().interrupt();
578    }
579    for (Future<Boolean> future : taskList) {
580      try {
581        // if even after shutdownNow threads are stuck we wait for 5 secs max
582        if (!future.get(5, TimeUnit.SECONDS)) {
583          LOG.error("Was Not able to move region....Exiting Now");
584          throw new Exception("Could not move region Exception");
585        }
586      } catch (InterruptedException e) {
587        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
588        throw e;
589      } catch (ExecutionException e) {
590        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
591        throw e;
592      } catch (CancellationException e) {
593        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
594            + "secs", e);
595        throw e;
596      }
597    }
598  }
599
600  private ServerName getTargetServer() throws Exception {
601    ServerName server = null;
602    int maxWaitInSeconds =
603        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
604    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
605    while (EnvironmentEdgeManager.currentTime() < maxWait) {
606      try {
607        List<ServerName> regionServers = new ArrayList<>();
608        regionServers.addAll(admin.getRegionServers());
609        // Remove the host Region server from target Region Servers list
610        server = stripServer(regionServers, hostname, port);
611        if (server != null) {
612          break;
613        } else {
614          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
615        }
616      } catch (IOException e) {
617        LOG.warn("Could not get list of region servers", e);
618      }
619      Thread.sleep(500);
620    }
621    if (server == null) {
622      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
623      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
624    }
625    return server;
626  }
627
628  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
629    List<RegionInfo> regions = new ArrayList<>();
630    File f = new File(filename);
631    if (!f.exists()) {
632      return regions;
633    }
634    try (DataInputStream dis = new DataInputStream(
635        new BufferedInputStream(new FileInputStream(f)))) {
636      int numRegions = dis.readInt();
637      int index = 0;
638      while (index < numRegions) {
639        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
640        index++;
641      }
642    } catch (IOException e) {
643      LOG.error("Error while reading regions from file:" + filename, e);
644      throw e;
645    }
646    return regions;
647  }
648
649  /**
650   * Write the number of regions moved in the first line followed by regions moved in subsequent
651   * lines
652   */
653  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
654    try (DataOutputStream dos = new DataOutputStream(
655        new BufferedOutputStream(new FileOutputStream(filename)))) {
656      dos.writeInt(movedRegions.size());
657      for (RegionInfo region : movedRegions) {
658        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
659      }
660    } catch (IOException e) {
661      LOG.error(
662          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
663              .size() + " regions", e);
664      throw e;
665    }
666  }
667
668  private void deleteFile(String filename) {
669    File f = new File(filename);
670    if (f.exists()) {
671      f.delete();
672    }
673  }
674
675  /**
676   * @param filename The file should have 'host:port' per line
677   * @return List of servers from the file in format 'hostname:port'.
678   */
679  private List<String> readServersFromFile(String filename) throws IOException {
680    List<String> servers = new ArrayList<>();
681    if (filename != null) {
682      try {
683        Files.readAllLines(Paths.get(filename)).stream().map(String::trim)
684          .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
685          .forEach(servers::add);
686      } catch (IOException e) {
687        LOG.error("Exception while reading servers from file,", e);
688        throw e;
689      }
690    }
691    return servers;
692  }
693
694  /**
695   * Designates or excludes the servername whose hostname and port portion matches the list given
696   * in the file.
697   * Example:<br>
698   * If you want to designated RSs, suppose designatedFile has RS1, regionServers has RS1, RS2 and
699   * RS3. When we call includeExcludeRegionServers(designatedFile, regionServers, true), RS2 and
700   * RS3 are removed from regionServers list so that regions can move to only RS1.
701   * If you want to exclude RSs, suppose excludeFile has RS1, regionServers has RS1, RS2 and RS3.
702   * When we call includeExcludeRegionServers(excludeFile, servers, false), RS1 is removed from
703   * regionServers list so that regions can move to only RS2 and RS3.
704   */
705  private void includeExcludeRegionServers(String fileName, List<ServerName> regionServers,
706      boolean isInclude) throws IOException {
707    if (fileName != null) {
708      List<String> servers = readServersFromFile(fileName);
709      if (servers.isEmpty()) {
710        LOG.warn("No servers provided in the file: {}." + fileName);
711        return;
712      }
713      Iterator<ServerName> i = regionServers.iterator();
714      while (i.hasNext()) {
715        String rs = i.next().getServerName();
716        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
717          .split(ServerName.SERVERNAME_SEPARATOR)[1];
718        if (isInclude != servers.contains(rsPort)) {
719          i.remove();
720        }
721      }
722    }
723  }
724
725  /**
726   * Exclude master from list of RSs to move regions to
727   */
728  private void stripMaster(List<ServerName> regionServers) throws IOException {
729    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
730    stripServer(regionServers, master.getHostname(), master.getPort());
731  }
732
733  /**
734   * Remove the servername whose hostname and port portion matches from the passed array of servers.
735   * Returns as side-effect the servername removed.
736   * @return server removed from list of Region Servers
737   */
738  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
739    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
740      ServerName server = iter.next();
741      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
742        server.getAddress().getPort() == port) {
743        iter.remove();
744        return server;
745      }
746    }
747    return null;
748  }
749
750  /**
751   * Tries to scan a row from passed region
752   */
753  private void isSuccessfulScan(RegionInfo region) throws IOException {
754    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
755        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
756        .setCacheBlocks(false);
757    try (Table table = conn.getTable(region.getTable());
758        ResultScanner scanner = table.getScanner(scan)) {
759      scanner.next();
760    } catch (IOException e) {
761      LOG.error("Could not scan region:" + region.getEncodedName(), e);
762      throw e;
763    }
764  }
765
766  /**
767   * Returns true if passed region is still on serverName when we look at hbase:meta.
768   * @return true if region is hosted on serverName otherwise false
769   */
770  private boolean isSameServer(RegionInfo region, ServerName serverName)
771      throws IOException {
772    ServerName serverForRegion = getServerNameForRegion(region);
773    if (serverForRegion != null && serverForRegion.equals(serverName)) {
774      return true;
775    }
776    return false;
777  }
778
779  /**
780   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
781   * startcode comma-delimited. Can return null
782   * @return regionServer hosting the given region
783   */
784  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
785    if (!admin.isTableEnabled(region.getTable())) {
786      return null;
787    }
788    HRegionLocation loc =
789      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
790        region.getReplicaId(),true);
791    if (loc != null) {
792      return loc.getServerName();
793    } else {
794      return null;
795    }
796  }
797
798  @Override
799  protected void addOptions() {
800    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
801    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
802    this.addOptWithArg("m", "maxthreads",
803        "Define the maximum number of threads to use to unload and reload the regions");
804    this.addOptWithArg("x", "excludefile",
805        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
806            + "target host; useful for rack decommisioning.");
807    this.addOptWithArg("d","designatedfile","File with <hostname:port> per line as unload targets;"
808            + "default is all online hosts");
809    this.addOptWithArg("f", "filename",
810        "File to save regions list into unloading, or read from loading; "
811            + "default /tmp/<usernamehostname:port>");
812    this.addOptNoArg("n", "noack",
813        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
814            + "RegionServer, hence best effort. This is more performant in unloading and loading "
815            + "but might lead to region being unavailable for some time till master reassigns it "
816            + "in case the move failed");
817    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
818        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
819  }
820
821  @Override
822  protected void processOptions(CommandLine cmd) {
823    String hostname = cmd.getOptionValue("r");
824    rmbuilder = new RegionMoverBuilder(hostname);
825    if (cmd.hasOption('m')) {
826      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
827    }
828    if (cmd.hasOption('n')) {
829      rmbuilder.ack(false);
830    }
831    if (cmd.hasOption('f')) {
832      rmbuilder.filename(cmd.getOptionValue('f'));
833    }
834    if (cmd.hasOption('x')) {
835      rmbuilder.excludeFile(cmd.getOptionValue('x'));
836    }
837    if (cmd.hasOption('d')) {
838      rmbuilder.designatedFile(cmd.getOptionValue('d'));
839    }
840    if (cmd.hasOption('t')) {
841      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
842    }
843    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
844  }
845
846  @Override
847  protected int doWork() throws Exception {
848    boolean success;
849    try (RegionMover rm = rmbuilder.build()) {
850      if (loadUnload.equalsIgnoreCase("load")) {
851        success = rm.load();
852      } else if (loadUnload.equalsIgnoreCase("unload")) {
853        success = rm.unload();
854      } else {
855        printUsage();
856        success = false;
857      }
858    }
859    return (success ? 0 : 1);
860  }
861
862  public static void main(String[] args) {
863    try (RegionMover mover = new RegionMover()) {
864      mover.doStaticMain(args);
865    }
866  }
867}