/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one which can be on a different setup. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
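 * <p>
 * As a quick illustration (table names here are purely illustrative; run the tool with
 * {@code --help} for the full usage text), a plain table-to-table copy within the same cluster
 * might look like:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=destTable sourceTable
 * </pre>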
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

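  /**
   * Returns a unique temporary directory, {@code <working dir>/copytable/<random UUID>}, on the
   * current file system; the directory itself is only created when {@code withDirCreated} is
   * true. Used as restore space for snapshot scans and as the bulkload output directory.
   */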
  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

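  /**
   * Wires up the map side of the copy job: when reading from a snapshot, the snapshot is restored
   * into a fresh temporary directory and scanned from there; otherwise the source table is
   * scanned directly. The bulkload path uses {@link CellImporter}, the direct-write path uses
   * {@link Importer}.
   */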
  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
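   * <p>
   * A rough sketch of programmatic use (the caller code below is hypothetical, omits exception
   * handling, and uses illustrative argument values):
   *
   * <pre>
   * CopyTable copy = new CopyTable();
   * copy.setConf(HBaseConfiguration.create());
   * Job job = copy.createSubmittableJob(new String[] { "--new.name=destTable", "sourceTable" });
   * if (job != null) {
   *   job.waitForCompletion(true);
   * }
   * </pre>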
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }


  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err
      .println(" bulkload     Write input into HFiles and bulk load to the destination table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
      + "  inaccurate results.\n" + "    -Dmapreduce.map.speculative=false");
  }

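  /**
   * Parses the command-line arguments into the fields of this instance, printing usage and
   * returning false when the arguments are missing, malformed, or mutually inconsistent.
   */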
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for destination table should be "
          + "provided when copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
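   * <p>
   * For instance, mirroring the examples in the usage text (names are illustrative):
   *
   * <pre>
   * hbase org.apache.hadoop.hbase.mapreduce.CopyTable --snapshot --new.name=destTable sourceTableSnapshot
   * </pre>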
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

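  /**
   * Runs the copy job and, if {@code --bulkload} was requested, bulk loads the generated HFiles
   * into the destination table, removing the temporary output directory once the load succeeds.
   */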
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
        this.bulkloadDir.toString(), this.dstTableName);
      code = new LoadIncrementalHFiles(this.getConf())
        .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles succeeds, so that the bulk load
        // can be rerun from the same directory if it fails.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}