001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.UUID; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.mapreduce.Import.CellImporter; 037import org.apache.hadoop.hbase.mapreduce.Import.Importer; 038import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 039import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.util.Tool; 044import org.apache.hadoop.util.ToolRunner; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Tool used to copy a table to another one which can be on a different setup. It is also 051 * configurable with a start and time as well as a specification of the region server implementation 052 * if different from the local cluster. 053 */ 054@InterfaceAudience.Public 055public class CopyTable extends Configured implements Tool { 056 private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class); 057 058 final static String NAME = "copytable"; 059 long startTime = 0; 060 long endTime = HConstants.LATEST_TIMESTAMP; 061 int batch = Integer.MAX_VALUE; 062 int cacheRow = -1; 063 int versions = -1; 064 String tableName = null; 065 String startRow = null; 066 String stopRow = null; 067 String dstTableName = null; 068 URI peerUri = null; 069 /** 070 * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead. 071 */ 072 @Deprecated 073 String peerAddress = null; 074 String families = null; 075 boolean allCells = false; 076 static boolean shuffle = false; 077 078 boolean bulkload = false; 079 Path bulkloadDir = null; 080 081 boolean readingSnapshot = false; 082 String snapshot = null; 083 084 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 085 086 private Path generateUniqTempDir(boolean withDirCreated) throws IOException { 087 FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf()); 088 Path dir = new Path(fs.getWorkingDirectory(), NAME); 089 if (!fs.exists(dir)) { 090 fs.mkdirs(dir); 091 } 092 Path newDir = new Path(dir, UUID.randomUUID().toString()); 093 if (withDirCreated) { 094 fs.mkdirs(newDir); 095 } 096 return newDir; 097 } 098 099 private void initCopyTableMapperJob(Job job, Scan scan) throws IOException { 100 Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class; 101 if (readingSnapshot) { 102 TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true, 103 generateUniqTempDir(true)); 104 } else { 105 TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job); 106 } 107 } 108 109 /** 110 * Sets up the actual job. 111 * @param args The command line parameters. 112 * @return The newly created job. 113 * @throws IOException When setting up the job fails. 114 */ 115 public Job createSubmittableJob(String[] args) throws IOException { 116 if (!doCommandLine(args)) { 117 return null; 118 } 119 120 String jobName = NAME + "_" + (tableName == null ? snapshot : tableName); 121 Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName)); 122 job.setJarByClass(CopyTable.class); 123 Scan scan = new Scan(); 124 125 scan.setBatch(batch); 126 scan.setCacheBlocks(false); 127 128 if (cacheRow > 0) { 129 scan.setCaching(cacheRow); 130 } else { 131 scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100)); 132 } 133 134 scan.setTimeRange(startTime, endTime); 135 136 if (allCells) { 137 scan.setRaw(true); 138 } 139 if (shuffle) { 140 job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true"); 141 } 142 if (versions >= 0) { 143 scan.readVersions(versions); 144 } 145 146 if (startRow != null) { 147 scan.withStartRow(Bytes.toBytesBinary(startRow)); 148 } 149 150 if (stopRow != null) { 151 scan.withStopRow(Bytes.toBytesBinary(stopRow)); 152 } 153 154 if (families != null) { 155 String[] fams = families.split(","); 156 Map<String, String> cfRenameMap = new HashMap<>(); 157 for (String fam : fams) { 158 String sourceCf; 159 if (fam.contains(":")) { 160 // fam looks like "sourceCfName:destCfName" 161 String[] srcAndDest = fam.split(":", 2); 162 sourceCf = srcAndDest[0]; 163 String destCf = srcAndDest[1]; 164 cfRenameMap.put(sourceCf, destCf); 165 } else { 166 // fam is just "sourceCf" 167 sourceCf = fam; 168 } 169 scan.addFamily(Bytes.toBytes(sourceCf)); 170 } 171 Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); 172 } 173 job.setNumReduceTasks(0); 174 175 if (bulkload) { 176 initCopyTableMapperJob(job, scan); 177 178 // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. 179 TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); 180 181 bulkloadDir = generateUniqTempDir(false); 182 LOG.info("HFiles will be stored at " + this.bulkloadDir); 183 HFileOutputFormat2.setOutputPath(job, bulkloadDir); 184 try (Connection conn = ConnectionFactory.createConnection(getConf()); 185 Admin admin = conn.getAdmin()) { 186 HFileOutputFormat2.configureIncrementalLoadMap(job, 187 admin.getDescriptor((TableName.valueOf(dstTableName)))); 188 } 189 } else { 190 initCopyTableMapperJob(job, scan); 191 if (peerUri != null) { 192 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri); 193 } else if (peerAddress != null) { 194 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress); 195 } else { 196 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job); 197 } 198 199 } 200 201 return job; 202 } 203 204 /* 205 * @param errorMsg Error message. Can be null. 206 */ 207 private static void printUsage(final String errorMsg) { 208 if (errorMsg != null && errorMsg.length() > 0) { 209 System.err.println("ERROR: " + errorMsg); 210 } 211 System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " 212 + "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>"); 213 System.err.println(); 214 System.err.println("Options:"); 215 System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); 216 System.err.println(" specify if different from current cluster"); 217 System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster"); 218 System.err.println(" startrow the start row"); 219 System.err.println(" stoprow the stop row"); 220 System.err.println(" starttime beginning of the time range (unixtime in millis)"); 221 System.err.println(" without endtime means from starttime to forever"); 222 System.err.println(" endtime end of the time range. Ignored if no starttime specified."); 223 System.err.println(" versions number of cell versions to copy"); 224 System.err.println(" new.name new table's name"); 225 System.err.println(" peer.uri The URI of the peer cluster"); 226 System.err.println(" peer.adr Address of the peer cluster given in the format"); 227 System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client" 228 + ".port:zookeeper.znode.parent"); 229 System.err.println(" Do not take effect if peer.uri is specified"); 230 System.err.println(" Deprecated, please use peer.uri instead"); 231 System.err.println(" families comma-separated list of families to copy"); 232 System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); 233 System.err.println(" To keep the same name, just give \"cfName\""); 234 System.err.println(" all.cells also copy delete markers and deleted cells"); 235 System.err 236 .println(" bulkload Write input into HFiles and bulk load to the destination " + "table"); 237 System.err.println(" snapshot Copy the data from snapshot to destination table."); 238 System.err.println(); 239 System.err.println("Args:"); 240 System.err.println(" tablename Name of the table to copy"); 241 System.err.println(); 242 System.err.println("Examples:"); 243 System.err 244 .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:"); 245 System.err.println(" $ hbase " 246 + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " 247 + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable "); 248 System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': "); 249 System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable " 250 + "--snapshot --new.name=destTable sourceTableSnapshot"); 251 System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': "); 252 System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable " 253 + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot"); 254 System.err.println("For performance consider the following general option:\n" 255 + " It is recommended that you set the following to >=100. A higher value uses more memory but\n" 256 + " decreases the round trip time to the server and may increase performance.\n" 257 + " -Dhbase.client.scanner.caching=100\n" 258 + " The following should always be set to false, to prevent writing data twice, which may produce \n" 259 + " inaccurate results.\n" + " -Dmapreduce.map.speculative=false"); 260 } 261 262 private boolean doCommandLine(final String[] args) { 263 if (args.length < 1) { 264 printUsage(null); 265 return false; 266 } 267 for (int i = 0; i < args.length; i++) { 268 String cmd = args[i]; 269 if (cmd.equals("-h") || cmd.startsWith("--h")) { 270 printUsage(null); 271 return false; 272 } 273 274 final String startRowArgKey = "--startrow="; 275 if (cmd.startsWith(startRowArgKey)) { 276 startRow = cmd.substring(startRowArgKey.length()); 277 continue; 278 } 279 280 final String stopRowArgKey = "--stoprow="; 281 if (cmd.startsWith(stopRowArgKey)) { 282 stopRow = cmd.substring(stopRowArgKey.length()); 283 continue; 284 } 285 286 final String startTimeArgKey = "--starttime="; 287 if (cmd.startsWith(startTimeArgKey)) { 288 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); 289 continue; 290 } 291 292 final String endTimeArgKey = "--endtime="; 293 if (cmd.startsWith(endTimeArgKey)) { 294 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); 295 continue; 296 } 297 298 final String batchArgKey = "--batch="; 299 if (cmd.startsWith(batchArgKey)) { 300 batch = Integer.parseInt(cmd.substring(batchArgKey.length())); 301 continue; 302 } 303 304 final String cacheRowArgKey = "--cacheRow="; 305 if (cmd.startsWith(cacheRowArgKey)) { 306 cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length())); 307 continue; 308 } 309 310 final String versionsArgKey = "--versions="; 311 if (cmd.startsWith(versionsArgKey)) { 312 versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); 313 continue; 314 } 315 316 final String newNameArgKey = "--new.name="; 317 if (cmd.startsWith(newNameArgKey)) { 318 dstTableName = cmd.substring(newNameArgKey.length()); 319 continue; 320 } 321 322 final String peerUriArgKey = "--peer.uri="; 323 if (cmd.startsWith(peerUriArgKey)) { 324 try { 325 peerUri = new URI(cmd.substring(peerUriArgKey.length())); 326 } catch (URISyntaxException e) { 327 LOG.error("Malformed peer uri specified: {}", cmd, e); 328 return false; 329 } 330 continue; 331 } 332 333 final String peerAdrArgKey = "--peer.adr="; 334 if (cmd.startsWith(peerAdrArgKey)) { 335 peerAddress = cmd.substring(peerAdrArgKey.length()); 336 continue; 337 } 338 339 final String familiesArgKey = "--families="; 340 if (cmd.startsWith(familiesArgKey)) { 341 families = cmd.substring(familiesArgKey.length()); 342 continue; 343 } 344 345 if (cmd.startsWith("--all.cells")) { 346 allCells = true; 347 continue; 348 } 349 350 if (cmd.startsWith("--bulkload")) { 351 bulkload = true; 352 continue; 353 } 354 355 if (cmd.startsWith("--shuffle")) { 356 shuffle = true; 357 continue; 358 } 359 360 if (cmd.startsWith("--snapshot")) { 361 readingSnapshot = true; 362 continue; 363 } 364 365 if (i == args.length - 1) { 366 if (readingSnapshot) { 367 snapshot = cmd; 368 } else { 369 tableName = cmd; 370 } 371 } else { 372 printUsage("Invalid argument '" + cmd + "'"); 373 return false; 374 } 375 } 376 if (dstTableName == null && peerAddress == null) { 377 printUsage("At least a new table name or a peer address must be specified"); 378 return false; 379 } 380 if ((endTime != 0) && (startTime > endTime)) { 381 printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); 382 return false; 383 } 384 385 if (bulkload && (peerUri != null || peerAddress != null)) { 386 printUsage("Remote bulkload is not supported!"); 387 return false; 388 } 389 390 if (readingSnapshot && (peerUri != null || peerAddress != null)) { 391 printUsage("Loading data from snapshot to remote peer cluster is not supported."); 392 return false; 393 } 394 395 if (readingSnapshot && dstTableName == null) { 396 printUsage("The --new.name=<table> for destination table should be " 397 + "provided when copying data from snapshot ."); 398 return false; 399 } 400 401 if (readingSnapshot && snapshot == null) { 402 printUsage("Snapshot shouldn't be null when --snapshot is enabled."); 403 return false; 404 } 405 406 // set dstTableName if necessary 407 if (dstTableName == null) { 408 dstTableName = tableName; 409 } 410 return true; 411 } 412 413 /** 414 * Main entry point. 415 * @param args The command line parameters. 416 * @throws Exception When running the job fails. 417 */ 418 public static void main(String[] args) throws Exception { 419 int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args); 420 System.exit(ret); 421 } 422 423 @Override 424 public int run(String[] args) throws Exception { 425 Job job = createSubmittableJob(args); 426 if (job == null) { 427 return 1; 428 } 429 if (!job.waitForCompletion(true)) { 430 LOG.info("Map-reduce job failed!"); 431 if (bulkload) { 432 LOG.info("Files are not bulkloaded!"); 433 } 434 return 1; 435 } 436 int code = 0; 437 if (bulkload) { 438 LOG.info("Trying to bulk load data to destination table: " + dstTableName); 439 LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME, 440 this.bulkloadDir.toString(), this.dstTableName); 441 if ( 442 !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir) 443 .isEmpty() 444 ) { 445 // bulkloadDir is deleted only BulkLoadHFiles was successful so that one can rerun 446 // BulkLoadHFiles. 447 FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf()); 448 if (!fs.delete(this.bulkloadDir, true)) { 449 LOG.error("Deleting folder " + bulkloadDir + " failed!"); 450 code = 1; 451 } 452 } 453 } 454 return code; 455 } 456}