/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one, which can be on a different setup. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  URI peerUri = null;
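  // Example peer URI forms for the field above (an assumption based on the HBase connection
  // registry URI schemes; the hosts below are illustrative, not from this source):
  //   hbase+zk://zk1.example.com:2181,zk2.example.com:2181/hbase  (ZooKeeper registry)
  //   hbase+rpc://rs1.example.com:16020,rs2.example.com:16020     (RPC registry)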
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
   */
  @Deprecated
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

  private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
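    // Worked example of the block above (hypothetical values): "--families=cf1:cf1_copy,cf2"
    // puts {cf1 -> cf1_copy} into cfRenameMap and adds families cf1 and cf2 to the scan;
    // cf2 keeps its name on the destination table.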
    job.setNumReduceTasks(0);

    if (bulkload) {
      initCopyTableMapperJob(job, scan);

      // We need to split the inputs by destination tables so that the output of the map tasks
      // can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperJob(job, scan);
      // Map-only job: with zero reduce tasks the mappers write Puts/Deletes directly to the
      // destination (or peer) table through the configured output format.
      if (peerUri != null) {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
      } else if (peerAddress != null) {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
      } else {
        TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
      }
    }

    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" batch        the scan batch (maximum number of cells returned per call)");
    System.err.println(" cacheRow     the scan caching (number of rows fetched per RPC)");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.uri     The URI of the peer cluster");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println("              Does not take effect if peer.uri is specified");
    System.err.println("              Deprecated, please use peer.uri instead");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" shuffle      shuffle the order of the input splits");
    System.err.println(
      " bulkload     Write input into HFiles and bulk load them into the destination table");
    System.err.println(" snapshot     Copy the data from a snapshot to the destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println();
    System.err.println(
      " To copy the data of 'TestTable' from the secured local cluster to a non-secured peer"
        + " cluster (cluster-b):");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "-Dhbase.mapred.output.hbase.security.authentication=simple "
      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
      + "2181:/cluster-b" + " TestTable");
    System.err.println();
    System.err.println(
      " To copy the data of 'TestTable' from the local secured cluster to a peer secured cluster"
        + " in a different Kerberos realm:");
    System.err.println(" Assume cluster-b uses a different Kerberos principal "
      + "(cluster-b/_HOST@EXAMPLE.COM) for its master and region servers.");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "-Dhbase.mapred.output.hbase.regionserver.kerberos.principal="
      + "cluster-b/_HOST@EXAMPLE.COM "
      + "-Dhbase.mapred.output.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM "
      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
      + "2181:/cluster-b" + " TestTable");
    System.err.println();
    System.err.println(
      " To copy the data of 'TestTable' from a non-secured local cluster to a secured peer cluster"
        + " (cluster-b):");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "-Dhbase.mapred.output.hbase.security.authentication=kerberos "
      + "-Dhbase.mapred.output.hbase.regionserver.kerberos.principal="
      + "cluster-b/_HOST@EXAMPLE.COM "
      + "-Dhbase.mapred.output.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM "
      + "--peer.adr=cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
      + "2181:/cluster-b" + " TestTable");
    System.err.println();
    System.err.println("For performance consider the following general options:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
      + "  inaccurate results.\n"
      + "    -Dmapreduce.map.speculative=false");
  }

  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        printUsage(null);
        return false;
      }

      final String startRowArgKey = "--startrow=";
      if (cmd.startsWith(startRowArgKey)) {
        startRow = cmd.substring(startRowArgKey.length());
        continue;
      }

      final String stopRowArgKey = "--stoprow=";
      if (cmd.startsWith(stopRowArgKey)) {
        stopRow = cmd.substring(stopRowArgKey.length());
        continue;
      }

      final String startTimeArgKey = "--starttime=";
      if (cmd.startsWith(startTimeArgKey)) {
        startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
        continue;
      }

      final String endTimeArgKey = "--endtime=";
      if (cmd.startsWith(endTimeArgKey)) {
        endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
        continue;
      }

      final String batchArgKey = "--batch=";
      if (cmd.startsWith(batchArgKey)) {
        batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
        continue;
      }

      final String cacheRowArgKey = "--cacheRow=";
      if (cmd.startsWith(cacheRowArgKey)) {
        cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
        continue;
      }

      final String versionsArgKey = "--versions=";
      if (cmd.startsWith(versionsArgKey)) {
        versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
        continue;
      }

      final String newNameArgKey = "--new.name=";
      if (cmd.startsWith(newNameArgKey)) {
        dstTableName = cmd.substring(newNameArgKey.length());
        continue;
      }

      final String peerUriArgKey = "--peer.uri=";
      if (cmd.startsWith(peerUriArgKey)) {
        try {
          peerUri = new URI(cmd.substring(peerUriArgKey.length()));
        } catch (URISyntaxException e) {
          LOG.error("Malformed peer uri specified: {}", cmd, e);
          return false;
        }
        continue;
      }

      final String peerAdrArgKey = "--peer.adr=";
      if (cmd.startsWith(peerAdrArgKey)) {
        peerAddress = cmd.substring(peerAdrArgKey.length());
        continue;
      }

      final String familiesArgKey = "--families=";
      if (cmd.startsWith(familiesArgKey)) {
        families = cmd.substring(familiesArgKey.length());
        continue;
      }

      if (cmd.startsWith("--all.cells")) {
        allCells = true;
        continue;
      }

      if (cmd.startsWith("--bulkload")) {
        bulkload = true;
        continue;
      }

      if (cmd.startsWith("--shuffle")) {
        shuffle = true;
        continue;
      }

      if (cmd.startsWith("--snapshot")) {
        readingSnapshot = true;
        continue;
      }

      if (i == args.length - 1) {
        if (readingSnapshot) {
          snapshot = cmd;
        } else {
          tableName = cmd;
        }
      } else {
        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    }
    if (dstTableName == null && peerAddress == null) {
      printUsage("At least a new table name or a peer address must be specified");
      return false;
    }
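    // Note on the check below: with the field defaults (startTime = 0,
    // endTime = HConstants.LATEST_TIMESTAMP) it only rejects an explicitly inverted range,
    // e.g. "--starttime=200 --endtime=100".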
    if ((endTime != 0) && (startTime > endTime)) {
      printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
      return false;
    }

    if (bulkload && (peerUri != null || peerAddress != null)) {
      printUsage("Remote bulkload is not supported!");
      return false;
    }

    if (readingSnapshot && (peerUri != null || peerAddress != null)) {
      printUsage("Loading data from snapshot to remote peer cluster is not supported.");
      return false;
    }

    if (readingSnapshot && dstTableName == null) {
      printUsage("The --new.name=<table> for destination table should be "
        + "provided when copying data from a snapshot.");
      return false;
    }

    if (readingSnapshot && snapshot == null) {
      printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
      return false;
    }

    // set dstTableName if necessary
    if (dstTableName == null) {
      dstTableName = tableName;
    }
    return true;
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
        this.bulkloadDir.toString(), this.dstTableName);
      if (
        !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
          .isEmpty()
      ) {
        // bulkloadDir is deleted only if BulkLoadHFiles was successful, so that one can rerun
        // BulkLoadHFiles after a failure.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}
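
// A minimal programmatic usage sketch, mirroring main() above (the table names are
// hypothetical, not part of this tool):
//
//   Configuration conf = HBaseConfiguration.create();
//   int exit = ToolRunner.run(conf, new CopyTable(),
//     new String[] { "--new.name=TestTableCopy", "TestTable" });
//   System.exit(exit);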