/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool used to copy a table to another one, which can be on a different setup.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
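 * <p>
 * A typical invocation, mirroring the examples printed by {@code printUsage}, copies a one-hour
 * window of 'TestTable' to a peer cluster and renames one column family (the timestamps and the
 * ZooKeeper quorum below are placeholders):
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 \
 *     --endtime=1265878794289 --peer.adr=server1,server2,server3:2181:/hbase \
 *     --families=myOldCf:myNewCf,cf2,cf3 TestTable
 * </pre>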
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
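
    // Name the job after the source table or snapshot unless mapreduce.job.name overrides it,
    // then build the Scan that drives the copy. The Scan carries all source-side filtering
    // (time range, row range, families, versions) and is serialized into the job configuration
    // by TableMapReduceUtil below.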
    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);
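
    // Two output paths: with --bulkload the map output is written as HFiles under a temporary
    // directory that run() later passes to LoadIncrementalHFiles; otherwise the reducer-less job
    // writes directly to the destination table through TableOutputFormat, optionally on the peer
    // cluster given by --peer.adr.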
    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor((TableName.valueOf(dstTableName))));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy a 1-hour window of 'TestTable' to a cluster that uses replication:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
        + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }
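
  /**
   * Parses the command line parameters and fills in the corresponding fields. Prints usage and
   * returns {@code false} when arguments are missing, unrecognized, or form an unsupported
   * combination (e.g. --bulkload or --snapshot together with --peer.adr), so the caller can abort.
   */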
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
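
      // Cross-check the parsed options: combinations that cannot work (remote bulk load, reading
      // a snapshot into a remote peer, --snapshot without --new.name) are rejected before the job
      // is built.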
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for destination table should be "
            + "provided when copying data from snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
        this.bulkloadDir.toString(), this.dstTableName);
      code = new LoadIncrementalHFiles(this.getConf())
          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles.
        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}