001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
075
/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run from the
 * command line directly as a utility. Supports ack/noAck mode for loading/unloading operations.
 * Ack mode acknowledges that regions are online after movement, while noAck mode is a best-effort
 * mode that improves performance but will still move on if a region is stuck/not moved. The
 * motivation behind noAck mode is RS shutdown, where even if a region is stuck, the master will
 * move it anyway upon shutdown. This can also be used by constructing an object using the builder
 * and then calling the {@link #load()} or {@link #unload()} methods for the desired operations.
 */
085@InterfaceAudience.Public
086public class RegionMover extends AbstractHBaseTool implements Closeable {
  // Configuration key: maximum number of move attempts per region in ack mode.
  private static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
  // Configuration key: wait budget in seconds for one region move; it is also multiplied by the
  // number of regions to bound how long the mover thread pool is awaited.
  private static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
  // Configuration key: how long, in seconds, loading waits for the target server to be online.
  static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
  // Defaults applied when the corresponding key above is not set.
  private static final int DEFAULT_MOVE_RETRIES_MAX = 5;
  private static final int DEFAULT_MOVE_WAIT_MAX = 60;
  private static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;

  private static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);

  // Builder populated from the command line in processOptions() and consumed by doWork().
  private RegionMoverBuilder rmbuilder;
  // true: verify each move (ack mode); false: best-effort moves (noAck mode).
  private boolean ack = true;
  // Size of the thread pool used to move regions.
  private int maxthreads = 1;
  // Global timeout, in seconds, for a whole load/unload operation.
  private int timeout;
  // Operation requested on the command line ("load" or "unload"), lower-cased.
  private String loadUnload;
  // Host name and port of the region server being loaded/unloaded.
  private String hostname;
  // File the moved regions are written to on unload and read back from on load.
  private String filename;
  // Optional file listing 'host:port' entries that must not receive regions on unload.
  private String excludeFile;
  private int port;
  // Cluster connection and admin handle; opened by the builder constructor, released by close().
  private Connection conn;
  private Admin admin;
107
108  private RegionMover(RegionMoverBuilder builder) throws IOException {
109    this.hostname = builder.hostname;
110    this.filename = builder.filename;
111    this.excludeFile = builder.excludeFile;
112    this.maxthreads = builder.maxthreads;
113    this.ack = builder.ack;
114    this.port = builder.port;
115    this.timeout = builder.timeout;
116    setConf(builder.conf);
117    this.conn = ConnectionFactory.createConnection(conf);
118    this.admin = conn.getAdmin();
119  }
120
  /**
   * Creates an unconfigured instance that only serves as the command-line entry point:
   * {@link #main(String[])} uses it to parse options, and {@link #doWork()} then builds the real
   * mover through the builder. No connection is opened here.
   */
  private RegionMover() {
  }
123
124  @Override
125  public void close() {
126    IOUtils.closeQuietly(this.admin);
127    IOUtils.closeQuietly(this.conn);
128  }
129
130  /**
131   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
132   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
133   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
134   */
135  public static class RegionMoverBuilder {
136    private boolean ack = true;
137    private int maxthreads = 1;
138    private int timeout = Integer.MAX_VALUE;
139    private String hostname;
140    private String filename;
141    private String excludeFile = null;
142    private String defaultDir = System.getProperty("java.io.tmpdir");
143    @VisibleForTesting
144    final int port;
145    private final Configuration conf;
146
147    public RegionMoverBuilder(String hostname) {
148      this(hostname, createConf());
149    }
150
151    /**
152     * Creates a new configuration and sets region mover specific overrides
153     */
154    private static Configuration createConf() {
155      Configuration conf = HBaseConfiguration.create();
156      conf.setInt("hbase.client.prefetch.limit", 1);
157      conf.setInt("hbase.client.pause", 500);
158      conf.setInt("hbase.client.retries.number", 100);
159      return conf;
160    }
161
162    /**
163     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
164     *     or hostname:port.
165     * @param conf Configuration object
166     */
167    public RegionMoverBuilder(String hostname, Configuration conf) {
168      String[] splitHostname = hostname.toLowerCase().split(":");
169      this.hostname = splitHostname[0];
170      if (splitHostname.length == 2) {
171        this.port = Integer.parseInt(splitHostname[1]);
172      } else {
173        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
174      }
175      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
176        + ":" + Integer.toString(this.port);
177      this.conf = conf;
178    }
179
180    /**
181     * Path of file where regions will be written to during unloading/read from during loading
182     * @param filename
183     * @return RegionMoverBuilder object
184     */
185    public RegionMoverBuilder filename(String filename) {
186      this.filename = filename;
187      return this;
188    }
189
190    /**
191     * Set the max number of threads that will be used to move regions
192     */
193    public RegionMoverBuilder maxthreads(int threads) {
194      this.maxthreads = threads;
195      return this;
196    }
197
198    /**
199     * Path of file containing hostnames to be excluded during region movement. Exclude file should
200     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
201     * host.
202     */
203    public RegionMoverBuilder excludeFile(String excludefile) {
204      this.excludeFile = excludefile;
205      return this;
206    }
207
208    /**
209     * Set ack/noAck mode.
210     * <p>
211     * In ack mode regions are acknowledged before and after moving and the move is retried
212     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
213     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
214     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
215     * <p>
216     * @param ack
217     * @return RegionMoverBuilder object
218     */
219    public RegionMoverBuilder ack(boolean ack) {
220      this.ack = ack;
221      return this;
222    }
223
224    /**
225     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
226     * movers also have a separate time which is hbase.move.wait.max * number of regions to
227     * load/unload
228     * @param timeout in seconds
229     * @return RegionMoverBuilder object
230     */
231    public RegionMoverBuilder timeout(int timeout) {
232      this.timeout = timeout;
233      return this;
234    }
235
236    /**
237     * This method builds the appropriate RegionMover object which can then be used to load/unload
238     * using load and unload methods
239     * @return RegionMover object
240     */
241    public RegionMover build() throws IOException {
242      return new RegionMover(this);
243    }
244  }
245
246  /**
247   * Move Regions and make sure that they are up on the target server.If a region movement fails we
248   * exit as failure
249   */
250  private class MoveWithAck implements Callable<Boolean> {
251    private RegionInfo region;
252    private ServerName targetServer;
253    private List<RegionInfo> movedRegions;
254    private ServerName sourceServer;
255
256    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
257        ServerName targetServer, List<RegionInfo> movedRegions) {
258      this.region = regionInfo;
259      this.targetServer = targetServer;
260      this.movedRegions = movedRegions;
261      this.sourceServer = sourceServer;
262    }
263
264    @Override
265    public Boolean call() throws IOException, InterruptedException {
266      boolean moved = false;
267      int count = 0;
268      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
269      int maxWaitInSeconds =
270          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
271      long startTime = EnvironmentEdgeManager.currentTime();
272      boolean sameServer = true;
273      // Assert we can scan the region in its current location
274      isSuccessfulScan(region);
275      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
276          + targetServer);
277      while (count < retries && sameServer) {
278        if (count > 0) {
279          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
280        }
281        count = count + 1;
282        admin.move(region.getEncodedNameAsBytes(), targetServer);
283        long maxWait = startTime + (maxWaitInSeconds * 1000);
284        while (EnvironmentEdgeManager.currentTime() < maxWait) {
285          sameServer = isSameServer(region, sourceServer);
286          if (!sameServer) {
287            break;
288          }
289          Thread.sleep(100);
290        }
291      }
292      if (sameServer) {
293        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
294            + ",newServer=" + this.targetServer);
295      } else {
296        isSuccessfulScan(region);
297        LOG.info("Moved Region "
298            + region.getRegionNameAsString()
299            + " cost:"
300            + String.format("%.3f",
301            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
302        moved = true;
303        movedRegions.add(region);
304      }
305      return moved;
306    }
307  }
308
309  /**
310   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
311   * RS down anyways and not abort on a stuck region. Improves movement performance
312   */
313  private class MoveWithoutAck implements Callable<Boolean> {
314    private RegionInfo region;
315    private ServerName targetServer;
316    private List<RegionInfo> movedRegions;
317    private ServerName sourceServer;
318
319    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
320        ServerName targetServer, List<RegionInfo> movedRegions) {
321      this.region = regionInfo;
322      this.targetServer = targetServer;
323      this.movedRegions = movedRegions;
324      this.sourceServer = sourceServer;
325    }
326
327    @Override
328    public Boolean call() {
329      try {
330        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
331            + targetServer);
332        admin.move(region.getEncodedNameAsBytes(), targetServer);
333        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
334            + targetServer);
335      } catch (Exception e) {
336        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
337      } finally {
338        // we add region to the moved regions list in No Ack Mode since this is best effort
339        movedRegions.add(region);
340      }
341      return true;
342    }
343  }
344
345  /**
346   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
347   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
348   * @return true if loading succeeded, false otherwise
349   */
350  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
351    ExecutorService loadPool = Executors.newFixedThreadPool(1);
352    Future<Boolean> loadTask = loadPool.submit(() -> {
353      try {
354        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
355        if (regionsToMove.isEmpty()) {
356          LOG.info("No regions to load.Exiting");
357          return true;
358        }
359        loadRegions(regionsToMove);
360      } catch (Exception e) {
361        LOG.error("Error while loading regions to " + hostname, e);
362        return false;
363      }
364      return true;
365    });
366    return waitTaskToFinish(loadPool, loadTask, "loading");
367  }
368
369  private void loadRegions(List<RegionInfo> regionsToMove)
370      throws Exception {
371    ServerName server = getTargetServer();
372    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
373    LOG.info(
374        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
375            + " threads.Ack mode:" + this.ack);
376
377    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
378    List<Future<Boolean>> taskList = new ArrayList<>();
379    int counter = 0;
380    while (counter < regionsToMove.size()) {
381      RegionInfo region = regionsToMove.get(counter);
382      ServerName currentServer = getServerNameForRegion(region);
383      if (currentServer == null) {
384        LOG.warn(
385            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
386        counter++;
387        continue;
388      } else if (server.equals(currentServer)) {
389        LOG.info(
390            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
391        counter++;
392        continue;
393      }
394      if (ack) {
395        Future<Boolean> task =
396            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
397        taskList.add(task);
398      } else {
399        Future<Boolean> task =
400            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
401        taskList.add(task);
402      }
403      counter++;
404    }
405
406    moveRegionsPool.shutdown();
407    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
408        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
409    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
410  }
411
412  /**
413   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
414   * noAck mode we do not make sure that region is successfully online on the target region
415   * server,hence it is best effort.We do not unload regions to hostnames given in
416   * {@link #excludeFile}.
417   * @return true if unloading succeeded, false otherwise
418   */
419  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
420    deleteFile(this.filename);
421    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
422    Future<Boolean> unloadTask = unloadPool.submit(() -> {
423      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
424      try {
425        // Get Online RegionServers
426        List<ServerName> regionServers = new ArrayList<>();
427        RSGroupInfo rsgroup = admin.getRSGroup(Address.fromParts(hostname, port));
428        LOG.info("{} belongs to {}", hostname, rsgroup.getName());
429        regionServers.addAll(filterRSGroupServers(rsgroup, admin.getRegionServers()));
430        // Remove the host Region server from target Region Servers list
431        ServerName server = stripServer(regionServers, hostname, port);
432        if (server == null) {
433          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
434              hostname, port);
435          LOG.debug("List of region servers: {}", regionServers);
436          return false;
437        }
438        // Remove RS present in the exclude file
439        stripExcludes(regionServers);
440
441        // Remove decommissioned RS
442        Set<ServerName> decommissionedRS = new HashSet<>(admin.listDecommissionedRegionServers());
443        if (CollectionUtils.isNotEmpty(decommissionedRS)) {
444          regionServers.removeIf(decommissionedRS::contains);
445          LOG.debug("Excluded RegionServers from unloading regions to because they " +
446            "are marked as decommissioned. Servers: {}", decommissionedRS);
447        }
448
449        stripMaster(regionServers);
450        if (regionServers.isEmpty()) {
451          LOG.warn("No Regions were moved - no servers available");
452          return false;
453        } else {
454          LOG.info("Available servers {}", regionServers);
455        }
456        unloadRegions(server, regionServers, movedRegions);
457      } catch (Exception e) {
458        LOG.error("Error while unloading regions ", e);
459        return false;
460      } finally {
461        if (movedRegions != null) {
462          writeFile(filename, movedRegions);
463        }
464      }
465      return true;
466    });
467    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
468  }
469
470  @VisibleForTesting
471   Collection<ServerName> filterRSGroupServers(RSGroupInfo rsgroup,
472      Collection<ServerName> onlineServers) {
473    if (rsgroup.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
474      return onlineServers;
475    }
476    List<ServerName> serverLists = new ArrayList<>(rsgroup.getServers().size());
477    for (ServerName server : onlineServers) {
478      Address address = Address.fromParts(server.getHostname(), server.getPort());
479      if (rsgroup.containsServer(address)) {
480        serverLists.add(server);
481      }
482    }
483    return serverLists;
484  }
485
486  private void unloadRegions(ServerName server, List<ServerName> regionServers,
487      List<RegionInfo> movedRegions) throws Exception {
488    while (true) {
489      List<RegionInfo> regionsToMove = admin.getRegions(server);
490      regionsToMove.removeAll(movedRegions);
491      if (regionsToMove.isEmpty()) {
492        LOG.info("No Regions to move....Quitting now");
493        break;
494      }
495      int counter = 0;
496      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
497          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
498          + ack);
499      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
500      List<Future<Boolean>> taskList = new ArrayList<>();
501      int serverIndex = 0;
502      while (counter < regionsToMove.size()) {
503        if (ack) {
504          Future<Boolean> task = moveRegionsPool.submit(
505              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
506                  movedRegions));
507          taskList.add(task);
508        } else {
509          Future<Boolean> task = moveRegionsPool.submit(
510              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
511                  movedRegions));
512          taskList.add(task);
513        }
514        counter++;
515        serverIndex = (serverIndex + 1) % regionServers.size();
516      }
517      moveRegionsPool.shutdown();
518      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
519          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
520      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
521    }
522  }
523
524  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
525      throws TimeoutException, InterruptedException, ExecutionException {
526    pool.shutdown();
527    try {
528      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
529        LOG.warn(
530            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
531                + "sec");
532        pool.shutdownNow();
533      }
534    } catch (InterruptedException e) {
535      pool.shutdownNow();
536      Thread.currentThread().interrupt();
537    }
538    try {
539      return task.get(5, TimeUnit.SECONDS);
540    } catch (InterruptedException e) {
541      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
542      throw e;
543    } catch (ExecutionException e) {
544      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
545      throw e;
546    }
547  }
548
549  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
550      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
551    try {
552      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
553        moveRegionsPool.shutdownNow();
554      }
555    } catch (InterruptedException e) {
556      moveRegionsPool.shutdownNow();
557      Thread.currentThread().interrupt();
558    }
559    for (Future<Boolean> future : taskList) {
560      try {
561        // if even after shutdownNow threads are stuck we wait for 5 secs max
562        if (!future.get(5, TimeUnit.SECONDS)) {
563          LOG.error("Was Not able to move region....Exiting Now");
564          throw new Exception("Could not move region Exception");
565        }
566      } catch (InterruptedException e) {
567        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
568        throw e;
569      } catch (ExecutionException e) {
570        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
571        throw e;
572      } catch (CancellationException e) {
573        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
574            + "secs", e);
575        throw e;
576      }
577    }
578  }
579
580  private ServerName getTargetServer() throws Exception {
581    ServerName server = null;
582    int maxWaitInSeconds =
583        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
584    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
585    while (EnvironmentEdgeManager.currentTime() < maxWait) {
586      try {
587        List<ServerName> regionServers = new ArrayList<>();
588        regionServers.addAll(admin.getRegionServers());
589        // Remove the host Region server from target Region Servers list
590        server = stripServer(regionServers, hostname, port);
591        if (server != null) {
592          break;
593        } else {
594          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
595        }
596      } catch (IOException e) {
597        LOG.warn("Could not get list of region servers", e);
598      }
599      Thread.sleep(500);
600    }
601    if (server == null) {
602      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
603      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
604    }
605    return server;
606  }
607
608  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
609    List<RegionInfo> regions = new ArrayList<>();
610    File f = new File(filename);
611    if (!f.exists()) {
612      return regions;
613    }
614    try (DataInputStream dis = new DataInputStream(
615        new BufferedInputStream(new FileInputStream(f)))) {
616      int numRegions = dis.readInt();
617      int index = 0;
618      while (index < numRegions) {
619        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
620        index++;
621      }
622    } catch (IOException e) {
623      LOG.error("Error while reading regions from file:" + filename, e);
624      throw e;
625    }
626    return regions;
627  }
628
629  /**
630   * Write the number of regions moved in the first line followed by regions moved in subsequent
631   * lines
632   */
633  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
634    try (DataOutputStream dos = new DataOutputStream(
635        new BufferedOutputStream(new FileOutputStream(filename)))) {
636      dos.writeInt(movedRegions.size());
637      for (RegionInfo region : movedRegions) {
638        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
639      }
640    } catch (IOException e) {
641      LOG.error(
642          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
643              .size() + " regions", e);
644      throw e;
645    }
646  }
647
648  private void deleteFile(String filename) {
649    File f = new File(filename);
650    if (f.exists()) {
651      f.delete();
652    }
653  }
654
655  /**
656   * @return List of servers from the exclude file in format 'hostname:port'.
657   */
658  private List<String> readExcludes(String excludeFile) throws IOException {
659    List<String> excludeServers = new ArrayList<>();
660    if (excludeFile == null) {
661      return excludeServers;
662    } else {
663      try {
664        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
665            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
666            .forEach(excludeServers::add);
667      } catch (IOException e) {
668        LOG.warn("Exception while reading excludes file, continuing anyways", e);
669      }
670      return excludeServers;
671    }
672  }
673
674  /**
675   * Excludes the servername whose hostname and port portion matches the list given in exclude file
676   */
677  private void stripExcludes(List<ServerName> regionServers) throws IOException {
678    if (excludeFile != null) {
679      List<String> excludes = readExcludes(excludeFile);
680      Iterator<ServerName> i = regionServers.iterator();
681      while (i.hasNext()) {
682        String rs = i.next().getServerName();
683        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
684            .split(ServerName.SERVERNAME_SEPARATOR)[1];
685        if (excludes.contains(rsPort)) {
686          i.remove();
687        }
688      }
689      LOG.info("Valid Region server targets are:" + regionServers.toString());
690      LOG.info("Excluded Servers are" + excludes.toString());
691    }
692  }
693
694  /**
695   * Exclude master from list of RSs to move regions to
696   */
697  private void stripMaster(List<ServerName> regionServers) throws IOException {
698    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
699    stripServer(regionServers, master.getHostname(), master.getPort());
700  }
701
702  /**
703   * Remove the servername whose hostname and port portion matches from the passed array of servers.
704   * Returns as side-effect the servername removed.
705   * @return server removed from list of Region Servers
706   */
707  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
708    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
709      ServerName server = iter.next();
710      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
711        server.getAddress().getPort() == port) {
712        iter.remove();
713        return server;
714      }
715    }
716    return null;
717  }
718
719  /**
720   * Tries to scan a row from passed region
721   */
722  private void isSuccessfulScan(RegionInfo region) throws IOException {
723    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
724        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
725        .setCacheBlocks(false);
726    try (Table table = conn.getTable(region.getTable());
727        ResultScanner scanner = table.getScanner(scan)) {
728      scanner.next();
729    } catch (IOException e) {
730      LOG.error("Could not scan region:" + region.getEncodedName(), e);
731      throw e;
732    }
733  }
734
735  /**
736   * Returns true if passed region is still on serverName when we look at hbase:meta.
737   * @return true if region is hosted on serverName otherwise false
738   */
739  private boolean isSameServer(RegionInfo region, ServerName serverName)
740      throws IOException {
741    ServerName serverForRegion = getServerNameForRegion(region);
742    if (serverForRegion != null && serverForRegion.equals(serverName)) {
743      return true;
744    }
745    return false;
746  }
747
748  /**
749   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
750   * startcode comma-delimited. Can return null
751   * @return regionServer hosting the given region
752   */
753  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
754    if (!admin.isTableEnabled(region.getTable())) {
755      return null;
756    }
757    HRegionLocation loc =
758      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(),
759        region.getReplicaId(),true);
760    if (loc != null) {
761      return loc.getServerName();
762    } else {
763      return null;
764    }
765  }
766
767  @Override
768  protected void addOptions() {
769    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
770    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
771    this.addOptWithArg("m", "maxthreads",
772        "Define the maximum number of threads to use to unload and reload the regions");
773    this.addOptWithArg("x", "excludefile",
774        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
775            + "target host; useful for rack decommisioning.");
776    this.addOptWithArg("f", "filename",
777        "File to save regions list into unloading, or read from loading; "
778            + "default /tmp/<usernamehostname:port>");
779    this.addOptNoArg("n", "noack",
780        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
781            + "RegionServer, hence best effort. This is more performant in unloading and loading "
782            + "but might lead to region being unavailable for some time till master reassigns it "
783            + "in case the move failed");
784    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
785        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
786  }
787
788  @Override
789  protected void processOptions(CommandLine cmd) {
790    String hostname = cmd.getOptionValue("r");
791    rmbuilder = new RegionMoverBuilder(hostname);
792    if (cmd.hasOption('m')) {
793      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
794    }
795    if (cmd.hasOption('n')) {
796      rmbuilder.ack(false);
797    }
798    if (cmd.hasOption('f')) {
799      rmbuilder.filename(cmd.getOptionValue('f'));
800    }
801    if (cmd.hasOption('x')) {
802      rmbuilder.excludeFile(cmd.getOptionValue('x'));
803    }
804    if (cmd.hasOption('t')) {
805      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
806    }
807    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
808  }
809
810  @Override
811  protected int doWork() throws Exception {
812    boolean success;
813    try (RegionMover rm = rmbuilder.build()) {
814      if (loadUnload.equalsIgnoreCase("load")) {
815        success = rm.load();
816      } else if (loadUnload.equalsIgnoreCase("unload")) {
817        success = rm.unload();
818      } else {
819        printUsage();
820        success = false;
821      }
822    }
823    return (success ? 0 : 1);
824  }
825
826  public static void main(String[] args) {
827    try (RegionMover mover = new RegionMover()) {
828      mover.doStaticMain(args);
829    }
830  }
831}