/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one, which can be on a different setup.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

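  /**
   * Generates a unique temporary directory under the file system's current working directory,
   * named after this tool plus a random UUID.
   * @param withDirCreated whether the new directory should also be created on the file system
   * @return the path of the temporary directory
   * @throws IOException when interacting with the file system fails
   */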
  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

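  /**
   * Configures the map phase of the job. When bulk loading, {@link CellImporter} is used so that
   * the map output consists of Cells suitable for {@link HFileOutputFormat2}; otherwise
   * {@link Importer} emits the mutations for the destination table. When reading from a snapshot,
   * the snapshot is restored into a unique temporary directory first.
   */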
  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

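    // Disable block caching for this full scan so the job does not evict hot data from the
    // region servers' block cache; batch and caching only bound how much each scanner RPC returns.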
    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

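    // Two output paths: with --bulkload the mappers emit Cells that HFileOutputFormat2 writes
    // as HFiles into a temporary directory for a later bulk load; otherwise the mappers write
    // their mutations straight to the destination table, possibly on the peer cluster.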
    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }

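  /**
   * Parses the command line arguments and populates the corresponding fields of this tool.
   * @param args the command line parameters to parse
   * @return true if the arguments are valid and the job can be created, false otherwise
   */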
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for the destination table should be "
            + "provided when copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

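  /**
   * Runs the copy job. When {@code --bulkload} was requested, the generated HFiles are then
   * bulk loaded into the destination table and the temporary output directory is removed on
   * success.
   */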
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
        this.bulkloadDir.toString(), this.dstTableName);
      code = new LoadIncrementalHFiles(this.getConf())
          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles after a failure.
        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}