/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
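// Typical command-line invocations, mirroring the examples printed by printUsage() below
// (table and snapshot names are illustrative):
//
//   $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=destTable sourceTable
//   $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
//       --snapshot --new.name=destTable --bulkload sourceTableSnapshot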
/**
 * Tool used to copy a table to another one, which can be on a different cluster setup. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }
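  // Note on the mapper wiring above (a summary of existing behavior, not new configuration):
  //  - With --bulkload, Import.CellImporter emits Cells that HFileOutputFormat2 turns into HFiles.
  //  - Otherwise, Import.Importer emits Mutations for the table sink that initTableReducerJob
  //    configures below (the job runs with zero reduce tasks, so map output goes straight there).
  //  - With --snapshot, input comes from a temporary restore directory created by
  //    generateUniqTempDir(true) rather than from the live table.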
  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be
      // bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }
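  // Illustrative --families values as parsed above (family names are hypothetical):
  //   "--families=cf1"              copies cf1 under its own name
  //   "--families=cf1:cf1_new"      copies cf1, renamed to cf1_new on the destination
  //   "--families=cf1:cf1_new,cf2"  mixes both forms; the rename map is handed to
  //                                 Import.configureCfRenaming while the scan reads cf1 and cf2.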
  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err
      .println(" bulkload     Write input into HFiles and bulk load to the destination table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable':");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
      + "  inaccurate results.\n"
      + "    -Dmapreduce.map.speculative=false");
  }
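  // A sketch of an argument vector doCommandLine() accepts (all values hypothetical). Row keys
  // are decoded with Bytes.toBytesBinary, so non-printable bytes can be written as \xNN escapes:
  //   { "--starttime=1265875194289", "--endtime=1265878794289", "--startrow=row\\x00",
  //     "--new.name=destTable", "--families=cf1:cf1_new,cf2", "sourceTable" }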
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for destination table should be "
          + "provided when copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      LOG.error("Failed to parse commandLine arguments", e);
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }
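  // Minimal sketch of embedding this tool in another application (configuration values and
  // argument strings are illustrative); main() below does the same for the command line:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   int exitCode = ToolRunner.run(conf, new CopyTable(),
  //     new String[] { "--new.name=destTable", "sourceTable" });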
  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
        this.bulkloadDir.toString(), this.dstTableName);
      if (
        !BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
          .isEmpty()
      ) {
        // bulkloadDir is deleted only if the bulk load succeeded, so that a failed load can be
        // rerun from the HFiles left in place.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}