/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one, which can be on a different cluster. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
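 * <p>
 * Example invocation (mirroring one of the examples printed by {@code --help}):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 \
 *     --endtime=1265878794289 --peer.adr=server1,server2,server3:2181:/hbase \
 *     --families=myOldCf:myNewCf,cf2,cf3 TestTable
 * </pre>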
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

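  /**
   * Creates a unique temporary directory, named with a random UUID, under the "copytable"
   * directory in the current working directory of the filesystem.
   * @param withDirCreated whether to actually create the directory on the filesystem, or only
   *          return its path
   */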
  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

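  /**
   * Configures the map phase of the job: reads from a restored snapshot when --snapshot was
   * specified, otherwise from the live source table. The CellImporter mapper is used when
   * bulkloading, the Importer mapper otherwise.
   */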
  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job, or null if the command line arguments were invalid.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
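    // Map-only job: output goes either to HFiles (when bulkloading) or directly to the
    // destination table via the table output format configured below.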
    job.setNumReduceTasks(0);

    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at {}", this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /**
   * Prints the usage message, preceded by an error message if one is given.
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              if no endtime is given, the range is from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(
      " bulkload     Write input into HFiles and bulk load to the destination table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general options:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
      + "  inaccurate results.\n" + "    -Dmapreduce.map.speculative=false");
  }

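  /**
   * Parses the command line parameters and sets the corresponding fields of this instance.
   * @param args The command line parameters.
   * @return false on failure, in which case the usage message has already been printed.
   */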
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The destination table must be specified with --new.name=<table> when "
          + "copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("A snapshot name must be provided when --snapshot is specified.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      LOG.error("Failed to parse commandLine arguments", e);
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

391
392  /**
393   * Main entry point.
394   * @param args The command line parameters.
395   * @throws Exception When running the job fails.
396   */
397  public static void main(String[] args) throws Exception {
398    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
399    System.exit(ret);
400  }
401
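  /**
   * Runs the copy job and, if --bulkload was specified, bulk loads the generated HFiles into the
   * destination table, removing the temporary output directory on success.
   */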
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      if (bulkload) {
        LOG.error("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: {}", dstTableName);
      LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
        this.bulkloadDir.toString(), this.dstTableName);
      if (
        !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
          .isEmpty()
      ) {
        // bulkloadDir is deleted only if BulkLoadHFiles was successful, so that one can rerun
        // BulkLoadHFiles after a failure.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder {} failed!", bulkloadDir);
          code = 1;
        }
      }
    }
    return code;
  }
}