001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
069
070/**
 * Tool for loading/unloading regions to/from a given regionserver. This tool can be run directly
 * from the command line as a utility. It supports Ack/No Ack mode for loading/unloading
 * operations. Ack mode acknowledges that regions are online after movement, while noAck mode is a
 * best-effort mode that improves performance but will still move on if a region is stuck/not
 * moved. The motivation behind noAck mode is RS shutdown, where even if a region is stuck, the
 * master will move it anyway upon shutdown. The tool can also be used by constructing an object
 * with the builder and then calling the {@link #load()} or {@link #unload()} methods.
078 */
079@InterfaceAudience.Public
080public class RegionMover extends AbstractHBaseTool implements Closeable {
081  public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
082  public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
083  public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
084  public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
085  public static final int DEFAULT_MOVE_WAIT_MAX = 60;
086  public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
087  static final Logger LOG = LoggerFactory.getLogger(RegionMover.class);
088  private RegionMoverBuilder rmbuilder;
089  private boolean ack = true;
090  private int maxthreads = 1;
091  private int timeout;
092  private String loadUnload;
093  private String hostname;
094  private String filename;
095  private String excludeFile;
096  private int port;
097  private Connection conn;
098  private Admin admin;
099
100  private RegionMover(RegionMoverBuilder builder) throws IOException {
101    this.hostname = builder.hostname;
102    this.filename = builder.filename;
103    this.excludeFile = builder.excludeFile;
104    this.maxthreads = builder.maxthreads;
105    this.ack = builder.ack;
106    this.port = builder.port;
107    this.timeout = builder.timeout;
108    setConf(builder.conf);
109    this.conn = ConnectionFactory.createConnection(conf);
110    this.admin = conn.getAdmin();
111  }
112
113  private RegionMover() {
114  }
115
116  @Override
117  public void close() {
118    IOUtils.closeQuietly(this.admin);
119    IOUtils.closeQuietly(this.conn);
120  }
121
122  /**
123   * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has
124   * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
125   * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
126   */
127  public static class RegionMoverBuilder {
128    private boolean ack = true;
129    private int maxthreads = 1;
130    private int timeout = Integer.MAX_VALUE;
131    private String hostname;
132    private String filename;
133    private String excludeFile = null;
134    private String defaultDir = System.getProperty("java.io.tmpdir");
135    @VisibleForTesting
136    final int port;
137    private final Configuration conf;
138
139    public RegionMoverBuilder(String hostname) {
140      this(hostname, createConf());
141    }
142
143    /**
144     * Creates a new configuration and sets region mover specific overrides
145     */
146    private static Configuration createConf() {
147      Configuration conf = HBaseConfiguration.create();
148      conf.setInt("hbase.client.prefetch.limit", 1);
149      conf.setInt("hbase.client.pause", 500);
150      conf.setInt("hbase.client.retries.number", 100);
151      return conf;
152    }
153
154    /**
155     * @param hostname Hostname to unload regions from or load regions to. Can be either hostname
156     *     or hostname:port.
157     * @param conf Configuration object
158     */
159    public RegionMoverBuilder(String hostname, Configuration conf) {
160      String[] splitHostname = hostname.toLowerCase().split(":");
161      this.hostname = splitHostname[0];
162      if (splitHostname.length == 2) {
163        this.port = Integer.parseInt(splitHostname[1]);
164      } else {
165        this.port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
166      }
167      this.filename = defaultDir + File.separator + System.getProperty("user.name") + this.hostname
168        + ":" + Integer.toString(this.port);
169      this.conf = conf;
170    }
171
172    /**
173     * Path of file where regions will be written to during unloading/read from during loading
174     * @param filename
175     * @return RegionMoverBuilder object
176     */
177    public RegionMoverBuilder filename(String filename) {
178      this.filename = filename;
179      return this;
180    }
181
182    /**
183     * Set the max number of threads that will be used to move regions
184     */
185    public RegionMoverBuilder maxthreads(int threads) {
186      this.maxthreads = threads;
187      return this;
188    }
189
190    /**
191     * Path of file containing hostnames to be excluded during region movement. Exclude file should
192     * have 'host:port' per line. Port is mandatory here as we can have many RS running on a single
193     * host.
194     */
195    public RegionMoverBuilder excludeFile(String excludefile) {
196      this.excludeFile = excludefile;
197      return this;
198    }
199
200    /**
201     * Set ack/noAck mode.
202     * <p>
203     * In ack mode regions are acknowledged before and after moving and the move is retried
204     * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
205     * effort mode,each region movement is tried once.This can be used during graceful shutdown as
206     * even if we have a stuck region,upon shutdown it'll be reassigned anyway.
207     * <p>
208     * @param ack
209     * @return RegionMoverBuilder object
210     */
211    public RegionMoverBuilder ack(boolean ack) {
212      this.ack = ack;
213      return this;
214    }
215
216    /**
217     * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
218     * movers also have a separate time which is hbase.move.wait.max * number of regions to
219     * load/unload
220     * @param timeout in seconds
221     * @return RegionMoverBuilder object
222     */
223    public RegionMoverBuilder timeout(int timeout) {
224      this.timeout = timeout;
225      return this;
226    }
227
228    /**
229     * This method builds the appropriate RegionMover object which can then be used to load/unload
230     * using load and unload methods
231     * @return RegionMover object
232     */
233    public RegionMover build() throws IOException {
234      return new RegionMover(this);
235    }
236  }
237
238  /**
239   * Move Regions and make sure that they are up on the target server.If a region movement fails we
240   * exit as failure
241   */
242  private class MoveWithAck implements Callable<Boolean> {
243    private RegionInfo region;
244    private ServerName targetServer;
245    private List<RegionInfo> movedRegions;
246    private ServerName sourceServer;
247
248    public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer,
249        ServerName targetServer, List<RegionInfo> movedRegions) {
250      this.region = regionInfo;
251      this.targetServer = targetServer;
252      this.movedRegions = movedRegions;
253      this.sourceServer = sourceServer;
254    }
255
256    @Override
257    public Boolean call() throws IOException, InterruptedException {
258      boolean moved = false;
259      int count = 0;
260      int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
261      int maxWaitInSeconds =
262          admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
263      long startTime = EnvironmentEdgeManager.currentTime();
264      boolean sameServer = true;
265      // Assert we can scan the region in its current location
266      isSuccessfulScan(region);
267      LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
268          + targetServer);
269      while (count < retries && sameServer) {
270        if (count > 0) {
271          LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
272        }
273        count = count + 1;
274        admin.move(region.getEncodedNameAsBytes(), targetServer);
275        long maxWait = startTime + (maxWaitInSeconds * 1000);
276        while (EnvironmentEdgeManager.currentTime() < maxWait) {
277          sameServer = isSameServer(region, sourceServer);
278          if (!sameServer) {
279            break;
280          }
281          Thread.sleep(100);
282        }
283      }
284      if (sameServer) {
285        LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
286            + ",newServer=" + this.targetServer);
287      } else {
288        isSuccessfulScan(region);
289        LOG.info("Moved Region "
290            + region.getRegionNameAsString()
291            + " cost:"
292            + String.format("%.3f",
293            (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000));
294        moved = true;
295        movedRegions.add(region);
296      }
297      return moved;
298    }
299  }
300
301  /**
302   * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
303   * RS down anyways and not abort on a stuck region. Improves movement performance
304   */
305  private class MoveWithoutAck implements Callable<Boolean> {
306    private RegionInfo region;
307    private ServerName targetServer;
308    private List<RegionInfo> movedRegions;
309    private ServerName sourceServer;
310
311    public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer,
312        ServerName targetServer, List<RegionInfo> movedRegions) {
313      this.region = regionInfo;
314      this.targetServer = targetServer;
315      this.movedRegions = movedRegions;
316      this.sourceServer = sourceServer;
317    }
318
319    @Override
320    public Boolean call() {
321      try {
322        LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
323            + targetServer);
324        admin.move(region.getEncodedNameAsBytes(), targetServer);
325        LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
326            + targetServer);
327      } catch (Exception e) {
328        LOG.error("Error Moving Region:" + region.getEncodedName(), e);
329      } finally {
330        // we add region to the moved regions list in No Ack Mode since this is best effort
331        movedRegions.add(region);
332      }
333      return true;
334    }
335  }
336
337  /**
338   * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
339   * Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
340   * @return true if loading succeeded, false otherwise
341   */
342  public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
343    ExecutorService loadPool = Executors.newFixedThreadPool(1);
344    Future<Boolean> loadTask = loadPool.submit(() -> {
345      try {
346        List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
347        if (regionsToMove.isEmpty()) {
348          LOG.info("No regions to load.Exiting");
349          return true;
350        }
351        loadRegions(regionsToMove);
352      } catch (Exception e) {
353        LOG.error("Error while loading regions to " + hostname, e);
354        return false;
355      }
356      return true;
357    });
358    return waitTaskToFinish(loadPool, loadTask, "loading");
359  }
360
361  private void loadRegions(List<RegionInfo> regionsToMove)
362      throws Exception {
363    ServerName server = getTargetServer();
364    List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
365    LOG.info(
366        "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads
367            + " threads.Ack mode:" + this.ack);
368
369    ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
370    List<Future<Boolean>> taskList = new ArrayList<>();
371    int counter = 0;
372    while (counter < regionsToMove.size()) {
373      RegionInfo region = regionsToMove.get(counter);
374      ServerName currentServer = getServerNameForRegion(region);
375      if (currentServer == null) {
376        LOG.warn(
377            "Could not get server for Region:" + region.getRegionNameAsString() + " moving on");
378        counter++;
379        continue;
380      } else if (server.equals(currentServer)) {
381        LOG.info(
382            "Region " + region.getRegionNameAsString() + " is already on target server=" + server);
383        counter++;
384        continue;
385      }
386      if (ack) {
387        Future<Boolean> task =
388            moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions));
389        taskList.add(task);
390      } else {
391        Future<Boolean> task =
392            moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions));
393        taskList.add(task);
394      }
395      counter++;
396    }
397
398    moveRegionsPool.shutdown();
399    long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
400        .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
401    waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
402  }
403
404  /**
405   * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
406   * noAck mode we do not make sure that region is successfully online on the target region
407   * server,hence it is best effort.We do not unload regions to hostnames given in
408   * {@link #excludeFile}.
409   * @return true if unloading succeeded, false otherwise
410   */
411  public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
412    deleteFile(this.filename);
413    ExecutorService unloadPool = Executors.newFixedThreadPool(1);
414    Future<Boolean> unloadTask = unloadPool.submit(() -> {
415      List<RegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<>());
416      try {
417        // Get Online RegionServers
418        List<ServerName> regionServers = new ArrayList<>();
419        regionServers.addAll(admin.getRegionServers());
420        // Remove the host Region server from target Region Servers list
421        ServerName server = stripServer(regionServers, hostname, port);
422        if (server == null) {
423          LOG.info("Could not find server '{}:{}' in the set of region servers. giving up.",
424              hostname, port);
425          LOG.debug("List of region servers: {}", regionServers);
426          return false;
427        }
428        // Remove RS present in the exclude file
429        stripExcludes(regionServers);
430        stripMaster(regionServers);
431        if (regionServers.isEmpty()) {
432          LOG.warn("No Regions were moved - no servers available");
433          return false;
434        }
435        unloadRegions(server, regionServers, movedRegions);
436      } catch (Exception e) {
437        LOG.error("Error while unloading regions ", e);
438        return false;
439      } finally {
440        if (movedRegions != null) {
441          writeFile(filename, movedRegions);
442        }
443      }
444      return true;
445    });
446    return waitTaskToFinish(unloadPool, unloadTask, "unloading");
447  }
448
449  private void unloadRegions(ServerName server, List<ServerName> regionServers,
450      List<RegionInfo> movedRegions) throws Exception {
451    while (true) {
452      List<RegionInfo> regionsToMove = admin.getRegions(server);
453      regionsToMove.removeAll(movedRegions);
454      if (regionsToMove.isEmpty()) {
455        LOG.info("No Regions to move....Quitting now");
456        break;
457      }
458      int counter = 0;
459      LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
460          + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
461          + ack);
462      ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
463      List<Future<Boolean>> taskList = new ArrayList<>();
464      int serverIndex = 0;
465      while (counter < regionsToMove.size()) {
466        if (ack) {
467          Future<Boolean> task = moveRegionsPool.submit(
468              new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
469                  movedRegions));
470          taskList.add(task);
471        } else {
472          Future<Boolean> task = moveRegionsPool.submit(
473              new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex),
474                  movedRegions));
475          taskList.add(task);
476        }
477        counter++;
478        serverIndex = (serverIndex + 1) % regionServers.size();
479      }
480      moveRegionsPool.shutdown();
481      long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
482          .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
483      waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
484    }
485  }
486
487  private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
488      throws TimeoutException, InterruptedException, ExecutionException {
489    pool.shutdown();
490    try {
491      if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
492        LOG.warn(
493            "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout
494                + "sec");
495        pool.shutdownNow();
496      }
497    } catch (InterruptedException e) {
498      pool.shutdownNow();
499      Thread.currentThread().interrupt();
500    }
501    try {
502      return task.get(5, TimeUnit.SECONDS);
503    } catch (InterruptedException e) {
504      LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e);
505      throw e;
506    } catch (ExecutionException e) {
507      LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e);
508      throw e;
509    }
510  }
511
512  private void waitMoveTasksToFinish(ExecutorService moveRegionsPool,
513      List<Future<Boolean>> taskList, long timeoutInSeconds) throws Exception {
514    try {
515      if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
516        moveRegionsPool.shutdownNow();
517      }
518    } catch (InterruptedException e) {
519      moveRegionsPool.shutdownNow();
520      Thread.currentThread().interrupt();
521    }
522    for (Future<Boolean> future : taskList) {
523      try {
524        // if even after shutdownNow threads are stuck we wait for 5 secs max
525        if (!future.get(5, TimeUnit.SECONDS)) {
526          LOG.error("Was Not able to move region....Exiting Now");
527          throw new Exception("Could not move region Exception");
528        }
529      } catch (InterruptedException e) {
530        LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
531        throw e;
532      } catch (ExecutionException e) {
533        LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
534        throw e;
535      } catch (CancellationException e) {
536        LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds
537            + "secs", e);
538        throw e;
539      }
540    }
541  }
542
543  private ServerName getTargetServer() throws Exception {
544    ServerName server = null;
545    int maxWaitInSeconds =
546        admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
547    long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000;
548    while (EnvironmentEdgeManager.currentTime() < maxWait) {
549      try {
550        List<ServerName> regionServers = new ArrayList<>();
551        regionServers.addAll(admin.getRegionServers());
552        // Remove the host Region server from target Region Servers list
553        server = stripServer(regionServers, hostname, port);
554        if (server != null) {
555          break;
556        } else {
557          LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting");
558        }
559      } catch (IOException e) {
560        LOG.warn("Could not get list of region servers", e);
561      }
562      Thread.sleep(500);
563    }
564    if (server == null) {
565      LOG.error("Server " + hostname + ":" + port + " is not up. Giving up.");
566      throw new Exception("Server " + hostname + ":" + port + " to load regions not online");
567    }
568    return server;
569  }
570
571  private List<RegionInfo> readRegionsFromFile(String filename) throws IOException {
572    List<RegionInfo> regions = new ArrayList<>();
573    File f = new File(filename);
574    if (!f.exists()) {
575      return regions;
576    }
577    try (DataInputStream dis = new DataInputStream(
578        new BufferedInputStream(new FileInputStream(f)))) {
579      int numRegions = dis.readInt();
580      int index = 0;
581      while (index < numRegions) {
582        regions.add(RegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
583        index++;
584      }
585    } catch (IOException e) {
586      LOG.error("Error while reading regions from file:" + filename, e);
587      throw e;
588    }
589    return regions;
590  }
591
592  /**
593   * Write the number of regions moved in the first line followed by regions moved in subsequent
594   * lines
595   */
596  private void writeFile(String filename, List<RegionInfo> movedRegions) throws IOException {
597    try (DataOutputStream dos = new DataOutputStream(
598        new BufferedOutputStream(new FileOutputStream(filename)))) {
599      dos.writeInt(movedRegions.size());
600      for (RegionInfo region : movedRegions) {
601        Bytes.writeByteArray(dos, RegionInfo.toByteArray(region));
602      }
603    } catch (IOException e) {
604      LOG.error(
605          "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions
606              .size() + " regions", e);
607      throw e;
608    }
609  }
610
611  private void deleteFile(String filename) {
612    File f = new File(filename);
613    if (f.exists()) {
614      f.delete();
615    }
616  }
617
618  /**
619   * @return List of servers from the exclude file in format 'hostname:port'.
620   */
621  private List<String> readExcludes(String excludeFile) throws IOException {
622    List<String> excludeServers = new ArrayList<>();
623    if (excludeFile == null) {
624      return excludeServers;
625    } else {
626      try {
627        Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim)
628            .filter(((Predicate<String>) String::isEmpty).negate()).map(String::toLowerCase)
629            .forEach(excludeServers::add);
630      } catch (IOException e) {
631        LOG.warn("Exception while reading excludes file, continuing anyways", e);
632      }
633      return excludeServers;
634    }
635  }
636
637  /**
638   * Excludes the servername whose hostname and port portion matches the list given in exclude file
639   */
640  private void stripExcludes(List<ServerName> regionServers) throws IOException {
641    if (excludeFile != null) {
642      List<String> excludes = readExcludes(excludeFile);
643      Iterator<ServerName> i = regionServers.iterator();
644      while (i.hasNext()) {
645        String rs = i.next().getServerName();
646        String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs
647            .split(ServerName.SERVERNAME_SEPARATOR)[1];
648        if (excludes.contains(rsPort)) {
649          i.remove();
650        }
651      }
652      LOG.info("Valid Region server targets are:" + regionServers.toString());
653      LOG.info("Excluded Servers are" + excludes.toString());
654    }
655  }
656
657  /**
658   * Exclude master from list of RSs to move regions to
659   */
660  private void stripMaster(List<ServerName> regionServers) throws IOException {
661    ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName();
662    stripServer(regionServers, master.getHostname(), master.getPort());
663  }
664
665  /**
666   * Remove the servername whose hostname and port portion matches from the passed array of servers.
667   * Returns as side-effect the servername removed.
668   * @return server removed from list of Region Servers
669   */
670  private ServerName stripServer(List<ServerName> regionServers, String hostname, int port) {
671    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
672      ServerName server = iter.next();
673      if (server.getAddress().getHostname().equalsIgnoreCase(hostname) &&
674        server.getAddress().getPort() == port) {
675        iter.remove();
676        return server;
677      }
678    }
679    return null;
680  }
681
682  /**
683   * Tries to scan a row from passed region
684   */
685  private void isSuccessfulScan(RegionInfo region) throws IOException {
686    Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
687        .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
688        .setCacheBlocks(false);
689    try (Table table = conn.getTable(region.getTable());
690        ResultScanner scanner = table.getScanner(scan)) {
691      scanner.next();
692    } catch (IOException e) {
693      LOG.error("Could not scan region:" + region.getEncodedName(), e);
694      throw e;
695    }
696  }
697
698  /**
699   * Returns true if passed region is still on serverName when we look at hbase:meta.
700   * @return true if region is hosted on serverName otherwise false
701   */
702  private boolean isSameServer(RegionInfo region, ServerName serverName)
703      throws IOException {
704    ServerName serverForRegion = getServerNameForRegion(region);
705    if (serverForRegion != null && serverForRegion.equals(serverName)) {
706      return true;
707    }
708    return false;
709  }
710
711  /**
712   * Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
713   * startcode comma-delimited. Can return null
714   * @return regionServer hosting the given region
715   */
716  private ServerName getServerNameForRegion(RegionInfo region) throws IOException {
717    if (!admin.isTableEnabled(region.getTable())) {
718      return null;
719    }
720    HRegionLocation loc =
721      conn.getRegionLocator(region.getTable()).getRegionLocation(region.getStartKey(), true);
722    if (loc != null) {
723      return loc.getServerName();
724    } else {
725      return null;
726    }
727  }
728
729  @Override
730  protected void addOptions() {
731    this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
732    this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
733    this.addOptWithArg("m", "maxthreads",
734        "Define the maximum number of threads to use to unload and reload the regions");
735    this.addOptWithArg("x", "excludefile",
736        "File with <hostname:port> per line to exclude as unload targets; default excludes only "
737            + "target host; useful for rack decommisioning.");
738    this.addOptWithArg("f", "filename",
739        "File to save regions list into unloading, or read from loading; "
740            + "default /tmp/<usernamehostname:port>");
741    this.addOptNoArg("n", "noack",
742        "Turn on No-Ack mode(default: false) which won't check if region is online on target "
743            + "RegionServer, hence best effort. This is more performant in unloading and loading "
744            + "but might lead to region being unavailable for some time till master reassigns it "
745            + "in case the move failed");
746    this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
747        + "irrespective of whether it finished or not;default Integer.MAX_VALUE");
748  }
749
750  @Override
751  protected void processOptions(CommandLine cmd) {
752    String hostname = cmd.getOptionValue("r");
753    rmbuilder = new RegionMoverBuilder(hostname);
754    if (cmd.hasOption('m')) {
755      rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
756    }
757    if (cmd.hasOption('n')) {
758      rmbuilder.ack(false);
759    }
760    if (cmd.hasOption('f')) {
761      rmbuilder.filename(cmd.getOptionValue('f'));
762    }
763    if (cmd.hasOption('x')) {
764      rmbuilder.excludeFile(cmd.getOptionValue('x'));
765    }
766    if (cmd.hasOption('t')) {
767      rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
768    }
769    this.loadUnload = cmd.getOptionValue("o").toLowerCase(Locale.ROOT);
770  }
771
772  @Override
773  protected int doWork() throws Exception {
774    boolean success;
775    try (RegionMover rm = rmbuilder.build()) {
776      if (loadUnload.equalsIgnoreCase("load")) {
777        success = rm.load();
778      } else if (loadUnload.equalsIgnoreCase("unload")) {
779        success = rm.unload();
780      } else {
781        printUsage();
782        success = false;
783      }
784    }
785    return (success ? 0 : 1);
786  }
787
788  public static void main(String[] args) {
789    try (RegionMover mover = new RegionMover()) {
790      mover.doStaticMain(args);
791    }
792  }
793}