/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one which can be on a different setup.
 * It is also configurable with a start and end time as well as a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

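  /**
   * Generates a unique temporary directory path of the form
   * {@code <working dir>/copytable/<random UUID>}. The parent directory is created if it does
   * not exist; the UUID directory itself is only created when {@code withDirCreated} is true.
   */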
  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

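  /**
   * Configures the mapper side of the job: {@link CellImporter} is used when bulk loading and
   * {@link Importer} otherwise. Input comes either from a table snapshot restored into a unique
   * temporary directory or directly from the source table.
   */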
  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

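    // Two output modes: with --bulkload the map output is written as HFiles into a temporary
    // directory (loaded into the destination table later, in run()); otherwise the mappers
    // write directly to the destination table, which may live on a remote peer cluster.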
    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor((TableName.valueOf(dstTableName))));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
      "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
        + " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + " decreases the round trip time to the server and may increase performance.\n"
        + "   -Dhbase.client.scanner.caching=100\n"
        + " The following should always be set to false, to prevent writing data twice, which may produce\n"
        + " inaccurate results.\n"
        + "   -Dmapreduce.map.speculative=false");
  }

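  /**
   * Parses the command line arguments and populates the corresponding fields. Prints usage and
   * returns {@code false} when the arguments are missing, malformed, or mutually inconsistent
   * (for example, --bulkload combined with --peer.adr).
   */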
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for destination table should be "
            + "provided when copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

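  /**
   * Runs the copy job and waits for completion. When --bulkload was requested, the generated
   * HFiles are then loaded into the destination table with {@link LoadIncrementalHFiles} and the
   * temporary output directory is removed if the load succeeds.
   */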
  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) return 1;
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
        this.bulkloadDir.toString(), this.dstTableName);
      code = new LoadIncrementalHFiles(this.getConf())
          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}