001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
069
070/**
071 * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command
072 * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode
073 * acknowledges if regions are online after movement while noAck mode is best effort mode that
074 * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck
075 * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it
076 * anyways. This can also be used by constructiong an Object using the builder and then calling
077 * {@link #load()} or {@link #unload()} methods for the desired operations.
078 */
079@InterfaceAudience.Public
080public class RegionMover extends AbstractHBaseTool implements Closeable {
081  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
082  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
083  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
084  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
085  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
086  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
087  static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
088  private RegionMoverBuilder rmbuilder;
089  private boolean ack = true;
090  private int maxthreads = 1;
091  private int timeout;
092  private String loadUnload;
093  private String hostname;
094  private String filename;
095  private String excludeFile;
096  private int port;
097  private Connection conn;
098  private Admin admin;
099
100  private RegionMover(RegionMoverBuilder builder) throws IOException {
101    this.hostname = builder.hostname;
102    this.filename = builder.filename;
103    this.excludeFile = builder.excludeFile;
104    this.maxthreads = builder.maxthreads;
105    this.ack = builder.ack;
106    this.port = builder.port;
107    this.timeout = builder.timeout;
108    setConf(builder.conf);
109    this.conn = ConnectionFactory.createConnection(conf);
110    this.admin = conn.getAdmin();
111  }
112
113  private RegionMover() {
114  }
115
116  @Override
117  public void close() {
118    IOUtils.closeQuietly(this.admin);
119    IOUtils.closeQuietly(this.conn);
120  }
121
122  /**
123   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
124   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
125   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
126   */
127  public static class RegionMoverBuilder {
128    private boolean ack = true;
129    private int maxthreads = 1;
130    private int timeout = Integer.MAX_VALUE;
131    private String hostname;
132    private String filename;
133    private String excludeFile = null;
134    private String defaultDir = System.getProperty("java.io.tmpdir");
135    @VisibleForTesting
136    final int port;
137    private final Configuration conf;
138
139    public RegionMoverBuilder(String hostname) {
140      this(hostname, createConf());
141    }
142
143    /**
144     * Creates a new configuration and sets region mover specific overrides
145     */
146    private static Configuration createConf() {
147      Configuration conf = HBaseConfiguration.create();
148      conf.setInt("hbase.client.prefetch.limit", 1);
149      conf.setInt("hbase.client.pause", 500);
150      conf.setInt("hbase.client.retries.number", 100);
151      return conf;
152    }
153
154    /**
155     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
156     *     or hostname:port.
157     * @param conf Configuration object
158     */
159    public RegionMoverBuilder(String hostname, Configuration conf) {
160      String[] splitHostname = hostname.toLowerCase().split(":");
161      this.hostname = splitHostname[0];
162      if (splitHostname.length == 2) {
163        this.port = Integer.parseInt(splitHostname[1]);
164      } else {
165        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
166      }
167      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
168        + ":" + Integer.toString(this.port);
169      this.conf = conf;
170    }
171
172    /**
173     * Path of file where regions will be written to during unloading/read from during loading
174     * @param filename
175     * @return RegionMoverBuilder object
176     */
177    public RegionMoverBuilder filename(String filename) {
178      this.filename = filename;
179      return this;
180    }
181
182    /**
183     * Set the max number of threads that will be used to move regions
184     */
185    public RegionMoverBuilder maxthreads(int threads) {
186      this.maxthreads = threads;
187      return this;
188    }
189
190    /**
191     * Path of file containing hostnames to be excluded during region movement. Exclude file should
192     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
193     * host.
194     */
195    public RegionMoverBuilder excludeFile(String excludefile) {
196      this.excludeFile = excludefile;
197      return this;
198    }
199
200    /**
201     * Set ack/noAck mode.
202     * <p>
203     * In ack mode regions are acknowledged before and after moving and the move is retried
204     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
205     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
206     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
207     * <p>
208     * @param ack
209     * @return RegionMoverBuilder object
210     */
211    public RegionMoverBuilder ack(boolean ack) {
212      this.ack = ack;
213      return this;
214    }
215
216    /**
217     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
218     * movers also have a separate time which is hbase.move.wait.max * number of regions to
219     * load/unload
220     * @param timeout in seconds
221     * @return RegionMoverBuilder object
222     */
223    public RegionMoverBuilder timeout(int timeout) {
224      this.timeout = timeout;
225      return this;
226    }
227
228    /**
229     * This method builds the appropriate RegionMover object which can then be used to load/unload
230     * using load and unload methods
231     * @return RegionMover object
232     */
233    public RegionMover build() throws IOException {
234      return new RegionMover(this);
235    }
236  }
237
238  /**
239   * Move Regions and make sure that they are up on the target server.If a region movement fails we
240   * exit as failure
241   */
242  private class MoveWithAck implements Callable<Boolean> {
243    private RegionInfo region;
244    private ServerName targetServer;
245    private List<RegionInfo> movedRegions;
246    private ServerName sourceServer;
247
248    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
249        ServerName targetServer, List<RegionInfo> movedRegions) {
250      this.region = regionInfo;
251      this.targetServer = targetServer;
252      this.movedRegions = movedRegions;
253      this.sourceServer = sourceServer;
254    }
255
256    @Override
257    public Boolean call() throws IOException, InterruptedException {
258      boolean moved = false;
259      int count = 0;
260      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
261      int maxWaitInSeconds =
262          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
263      long startTime = EnvironmentEdgeManager.currentTime();
264      boolean sameServer = true;
265      // Assert we can scan the region in its current location
266      isSuccessfulScan(region);
267      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
268          + targetServer);
269      while (count < retries && sameServer) {
270        if (count > 0) {
271          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
272        }
273        count = count + 1;
274        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName()));
275        long maxWait = startTime + (maxWaitInSeconds * 1000);
276        while (EnvironmentEdgeManager.currentTime() < maxWait) {
277          sameServer = isSameServer(region, sourceServer);
278          if (!sameServer) {
279            break;
280          }
281          Thread.sleep(100);
282        }
283      }
284      if (sameServer) {
285        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
286            + ",newServer=" + this.targetServer);
287      } else {
288        isSuccessfulScan(region);
289        LOG.info("Moved Region "
290            + region.getRegionNameAsString()
291            + " cost:"
292            + String.format("%.3f",
293            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
294        moved = true;
295        movedRegions.add(region);
296      }
297      return moved;
298    }
299  }
300
301  /**
302   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
303   * RS down anyways and not abort on a stuck region. Improves movement performance
304   */
305  private class MoveWithoutAck implements Callable<Boolean> {
306    private RegionInfo region;
307    private ServerName targetServer;
308    private List<RegionInfo> movedRegions;
309    private ServerName sourceServer;
310
311    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
312        ServerName targetServer, List<RegionInfo> movedRegions) {
313      this.region = regionInfo;
314      this.targetServer = targetServer;
315      this.movedRegions = movedRegions;
316      this.sourceServer = sourceServer;
317    }
318
319    @Override
320    public Boolean call() {
321      try {
322        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
323            + targetServer);
324        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName()));
325        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
326            + targetServer);
327      } catch (Exception e) {
328        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
329      } finally {
330        // we add region to the moved regions list in No Ack Mode since this is best effort
331        movedRegions.add(region);
332      }
333      return true;
334    }
335  }
336
337  /**
338   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
339   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
340   * @return true if loading succeeded, false otherwise
341   */
342  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
343    ExecutorService loadPool = Executors.newFixedThreadPool(1);
344    Future<Boolean> loadTask = loadPool.submit(() -> {
345      try {
346        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
347        if (regionsToMove.isEmpty()) {
348          LOG.info("No regions to load.Exiting");
349          return true;
350        }
351        loadRegions(regionsToMove);
352      } catch (Exception e) {
353        LOG.error("Error while loading regions to " + hostname, e);
354        return false;
355      }
356      return true;
357    });
358    return waitTaskToFinish(loadPool, loadTask, "loading");
359  }
360
361  private void loadRegions(List<RegionInfo> regionsToMove)
362      throws Exception {
363    ServerName server = getTargetServer();
364    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
365    LOG.info(
366        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
367            + " threads.Ack mode:" + this.ack);
368
369    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
370    List<Future<Boolean>> taskList = new ArrayList<>();
371    int counter = 0;
372    while (counter < regionsToMove.size()) {
373      RegionInfo region = regionsToMove.get(counter);
374      ServerName currentServer = getServerNameForRegion(region);
375      if (currentServer == null) {
376        LOG.warn(
377            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
378        counter++;
379        continue;
380      } else if (server.equals(currentServer)) {
381        LOG.info(
382            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
383        counter++;
384        continue;
385      }
386      if (ack) {
387        Future<Boolean> task =
388            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
389        taskList.add(task);
390      } else {
391        Future<Boolean> task =
392            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
393        taskList.add(task);
394      }
395      counter++;
396    }
397
398    moveRegionsPool.shutdown();
399    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
400        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
401    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
402  }
403
404  /**
405   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
406   * noAck mode we do not make sure that region is successfully online on the target region
407   * server,hence it is best effort.We do not unload regions to hostnames given in
408   * {@link #excludeFile}.
409   * @return true if unloading succeeded, false otherwise
410   */
411  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
412    deleteFile(this.filename);
413    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
414    Future<Boolean> unloadTask = unloadPool.submit(() -> {
415      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
416      try {
417        // Get Online RegionServers
418        List<ServerName> regionServers = new ArrayList<>();
419        regionServers.addAll(
420            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
421                .keySet());
422        // Remove the host Region server from target Region Servers list
423        ServerName server = stripServer(regionServers, hostname, port);
424        if (server == null) {
425          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
426              hostname, port);
427          LOG.debug("List of region servers: {}", regionServers);
428          return false;
429        }
430        // Remove RS present in the exclude file
431        stripExcludes(regionServers);
432        stripMaster(regionServers);
433        if (regionServers.isEmpty()) {
434          LOG.warn("No Regions were moved - no servers available");
435          return false;
436        }
437        unloadRegions(server, regionServers, movedRegions);
438      } catch (Exception e) {
439        LOG.error("Error while unloading regions ", e);
440        return false;
441      } finally {
442        if (movedRegions != null) {
443          writeFile(filename, movedRegions);
444        }
445      }
446      return true;
447    });
448    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
449  }
450
451  private void unloadRegions(ServerName server, List<ServerName> regionServers,
452      List<RegionInfo> movedRegions) throws Exception {
453    while (true) {
454      List<RegionInfo> regionsToMove = admin.getRegions(server);
455      regionsToMove.removeAll(movedRegions);
456      if (regionsToMove.isEmpty()) {
457        LOG.info("No Regions to move....Quitting now");
458        break;
459      }
460      int counter = 0;
461      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
462          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
463          + ack);
464      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
465      List<Future<Boolean>> taskList = new ArrayList<>();
466      int serverIndex = 0;
467      while (counter < regionsToMove.size()) {
468        if (ack) {
469          Future<Boolean> task = moveRegionsPool.submit(
470              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
471                  movedRegions));
472          taskList.add(task);
473        } else {
474          Future<Boolean> task = moveRegionsPool.submit(
475              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
476                  movedRegions));
477          taskList.add(task);
478        }
479        counter++;
480        serverIndex = (serverIndex + 1) % regionServers.size();
481      }
482      moveRegionsPool.shutdown();
483      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
484          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
485      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
486    }
487  }
488
489  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
490      throws TimeoutException, InterruptedException, ExecutionException {
491    pool.shutdown();
492    try {
493      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
494        LOG.warn(
495            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
496                + "sec");
497        pool.shutdownNow();
498      }
499    } catch (InterruptedException e) {
500      pool.shutdownNow();
501      Thread.currentThread().interrupt();
502    }
503    try {
504      return task.get(5, TimeUnit.SECONDS);
505    } catch (InterruptedException e) {
506      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
507      throw e;
508    } catch (ExecutionException e) {
509      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
510      throw e;
511    }
512  }
513
514  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
515      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
516    try {
517      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
518        moveRegionsPool.shutdownNow();
519      }
520    } catch (InterruptedException e) {
521      moveRegionsPool.shutdownNow();
522      Thread.currentThread().interrupt();
523    }
524    for (Future<Boolean> future : taskList) {
525      try {
526        // if even after shutdownNow threads are stuck we wait for 5 secs max
527        if (!future.get(5, TimeUnit.SECONDS)) {
528          LOG.error("Was Not able to move region....Exiting Now");
529          throw new Exception("Could not move region Exception");
530        }
531      } catch (InterruptedException e) {
532        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
533        throw e;
534      } catch (ExecutionException e) {
535        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
536        throw e;
537      } catch (CancellationException e) {
538        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
539            + "secs", e);
540        throw e;
541      }
542    }
543  }
544
545  private ServerName getTargetServer() throws Exception {
546    ServerName server = null;
547    int maxWaitInSeconds =
548        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
549    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
550    while (EnvironmentEdgeManager.currentTime() < maxWait) {
551      try {
552        List<ServerName> regionServers = new ArrayList<>();
553        regionServers.addAll(
554            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
555                .keySet());
556        // Remove the host Region server from target Region Servers list
557        server = stripServer(regionServers, hostname, port);
558        if (server != null) {
559          break;
560        } else {
561          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
562        }
563      } catch (IOException e) {
564        LOG.warn("Could not get list of region servers", e);
565      }
566      Thread.sleep(500);
567    }
568    if (server == null) {
569      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
570      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
571    }
572    return server;
573  }
574
575  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
576    List<RegionInfo> regions = new ArrayList<>();
577    File f = new File(filename);
578    if (!f.exists()) {
579      return regions;
580    }
581    try (DataInputStream dis = new DataInputStream(
582        new BufferedInputStream(new FileInputStream(f)))) {
583      int numRegions = dis.readInt();
584      int index = 0;
585      while (index < numRegions) {
586        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
587        index++;
588      }
589    } catch (IOException e) {
590      LOG.error("Error while reading regions from file:" + filename, e);
591      throw e;
592    }
593    return regions;
594  }
595
596  /**
597   * Write the number of regions moved in the first line followed by regions moved in subsequent
598   * lines
599   */
600  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
601    try (DataOutputStream dos = new DataOutputStream(
602        new BufferedOutputStream(new FileOutputStream(filename)))) {
603      dos.writeInt(movedRegions.size());
604      for (RegionInfo region : movedRegions) {
605        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
606      }
607    } catch (IOException e) {
608      LOG.error(
609          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
610              .size() + " regions", e);
611      throw e;
612    }
613  }
614
615  private void deleteFile(String filename) {
616    File f = new File(filename);
617    if (f.exists()) {
618      f.delete();
619    }
620  }
621
622  /**
623   * @return List of servers from the exclude file in format 'hostname:port'.
624   */
625  private List<String> readExcludes(String excludeFile) throws IOException {
626    List<String> excludeServers = new ArrayList<>();
627    if (excludeFile == null) {
628      return excludeServers;
629    } else {
630      try {
631        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
632            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
633            .forEach(excludeServers::add);
634      } catch (IOException e) {
635        LOG.warn("Exception while reading excludes file, continuing anyways", e);
636      }
637      return excludeServers;
638    }
639  }
640
641  /**
642   * Excludes the servername whose hostname and port portion matches the list given in exclude file
643   */
644  private void stripExcludes(List<ServerName> regionServers) throws IOException {
645    if (excludeFile != null) {
646      List<String> excludes = readExcludes(excludeFile);
647      Iterator<ServerName> i = regionServers.iterator();
648      while (i.hasNext()) {
649        String rs = i.next().getServerName();
650        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
651            .split(ServerName.SERVERNAME_SEPARATOR)[1];
652        if (excludes.contains(rsPort)) {
653          i.remove();
654        }
655      }
656      LOG.info("Valid Region server targets are:" + regionServers.toString());
657      LOG.info("Excluded Servers are" + excludes.toString());
658    }
659  }
660
661  /**
662   * Exclude master from list of RSs to move regions to
663   */
664  private void stripMaster(List<ServerName> regionServers) throws IOException {
665    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
666    stripServer(regionServers, master.getHostname(), master.getPort());
667  }
668
669  /**
670   * Remove the servername whose hostname and port portion matches from the passed array of servers.
671   * Returns as side-effect the servername removed.
672   * @return server removed from list of Region Servers
673   */
674  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
675    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
676      ServerName server = iter.next();
677      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
678        server.getAddress().getPort() == port) {
679        iter.remove();
680        return server;
681      }
682    }
683    return null;
684  }
685
686  /**
687   * Tries to scan a row from passed region
688   */
689  private void isSuccessfulScan(RegionInfo region) throws IOException {
690    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
691        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
692        .setCacheBlocks(false);
693    try (Table table = conn.getTable(region.getTable());
694        ResultScanner scanner = table.getScanner(scan)) {
695      scanner.next();
696    } catch (IOException e) {
697      LOG.error("Could not scan region:" + region.getEncodedName(), e);
698      throw e;
699    }
700  }
701
702  /**
703   * Returns true if passed region is still on serverName when we look at hbase:meta.
704   * @return true if region is hosted on serverName otherwise false
705   */
706  private boolean isSameServer(RegionInfo region, ServerName serverName)
707      throws IOException {
708    ServerName serverForRegion = getServerNameForRegion(region);
709    if (serverForRegion != null && serverForRegion.equals(serverName)) {
710      return true;
711    }
712    return false;
713  }
714
715  /**
716   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
717   * startcode comma-delimited. Can return null
718   * @return regionServer hosting the given region
719   */
720  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
721    if (!admin.isTableEnabled(region.getTable())) {
722      return null;
723    }
724    HRegionLocation loc =
725      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), true);
726    if (loc != null) {
727      return loc.getServerName();
728    } else {
729      return null;
730    }
731  }
732
733  @Override
734  protected void addOptions() {
735    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
736    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
737    this.addOptWithArg("m", "maxthreads",
738        "Define the maximum number of threads to use to unload and reload the regions");
739    this.addOptWithArg("x", "excludefile",
740        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
741            + "target host; useful for rack decommisioning.");
742    this.addOptWithArg("f", "filename",
743        "File to save regions list into unloading, or read from loading; "
744            + "default /tmp/<usernamehostname:port>");
745    this.addOptNoArg("n", "noack",
746        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
747            + "RegionServer, hence best effort. This is more performant in unloading and loading "
748            + "but might lead to region being unavailable for some time till master reassigns it "
749            + "in case the move failed");
750    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
751        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
752  }
753
754  @Override
755  protected void processOptions(CommandLine cmd) {
756    String hostname = cmd.getOptionValue("r");
757    rmbuilder = new RegionMoverBuilder(hostname);
758    if (cmd.hasOption('m')) {
759      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
760    }
761    if (cmd.hasOption('n')) {
762      rmbuilder.ack(false);
763    }
764    if (cmd.hasOption('f')) {
765      rmbuilder.filename(cmd.getOptionValue('f'));
766    }
767    if (cmd.hasOption('x')) {
768      rmbuilder.excludeFile(cmd.getOptionValue('x'));
769    }
770    if (cmd.hasOption('t')) {
771      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
772    }
773    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
774  }
775
776  @Override
777  protected int doWork() throws Exception {
778    boolean success;
779    try (RegionMover rm = rmbuilder.build()) {
780      if (loadUnload.equalsIgnoreCase("load")) {
781        success = rm.load();
782      } else if (loadUnload.equalsIgnoreCase("unload")) {
783        success = rm.unload();
784      } else {
785        printUsage();
786        success = false;
787      }
788    }
789    return (success ? 0 : 1);
790  }
791
792  public static void main(String[] args) {
793    try (RegionMover mover = new RegionMover()) {
794      mover.doStaticMain(args);
795    }
796  }
797}