/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tool used to copy a table to another one which can be on a different setup. It is also
 * configurable with a start and end time, as well as a specification of the region server
 * implementation if different from the local cluster.
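 * <p>
 * A typical invocation, mirroring the examples printed by the tool's own usage text, copies
 * 'TestTable' for a one-hour window to a peer cluster (the table name and peer address below are
 * placeholders):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
 *     --starttime=1265875194289 --endtime=1265878794289 \
 *     --peer.adr=server1,server2,server3:2181:/hbase \
 *     --families=myOldCf:myNewCf,cf2,cf3 TestTable
 * </pre>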
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  boolean readingSnapshot = false;
  String snapshot = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
    Path dir = new Path(fs.getWorkingDirectory(), NAME);
    if (!fs.exists(dir)) {
      fs.mkdirs(dir);
    }
    Path newDir = new Path(dir, UUID.randomUUID().toString());
    if (withDirCreated) {
      fs.mkdirs(newDir);
    }
    return newDir;
  }

  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
    if (readingSnapshot) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
        generateUniqTempDir(true));
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    // Do not fill the server-side block cache with this scan; a full-table copy would churn it.
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.readVersions(versions);
    }

    if (startRow != null) {
      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.withStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
      initCopyTableMapperReducerJob(job, scan);

      // We need to split the inputs by destination table so that the map output can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      bulkloadDir = generateUniqTempDir(false);
      LOG.info("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
          admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      initCopyTableMapperReducerJob(job, scan);
      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
      + "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
      + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err
      .println(" bulkload     Write input into HFiles and bulk load to the destination table");
    System.err.println(" snapshot     Copy the data from snapshot to destination table.");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
    System.err.println(" $ hbase "
      + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 "
      + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--snapshot --new.name=destTable sourceTableSnapshot");
    System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
      + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
    System.err.println("For performance consider the following general option:\n"
      + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
      + "  decreases the round trip time to the server and may increase performance.\n"
      + "    -Dhbase.client.scanner.caching=100\n"
      + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
      + "  inaccurate results.\n"
      + "    -Dmapreduce.map.speculative=false");
  }
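  /**
   * Parses the command line options. Flags may appear in any order; the final non-flag argument
   * is taken as the source table name, or as the snapshot name when --snapshot is given.
   * @param args the command line parameters to parse
   * @return true if the arguments are valid and the job can be created, false otherwise
   */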
  private boolean doCommandLine(final String[] args) {
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (cmd.startsWith("--snapshot")) {
          readingSnapshot = true;
          continue;
        }

        if (i == args.length - 1) {
          if (readingSnapshot) {
            snapshot = cmd;
          } else {
            tableName = cmd;
          }
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      if (readingSnapshot && peerAddress != null) {
        printUsage("Loading data from snapshot to remote peer cluster is not supported.");
        return false;
      }

      if (readingSnapshot && dstTableName == null) {
        printUsage("The --new.name=<table> for the destination table must be "
          + "provided when copying data from a snapshot.");
        return false;
      }

      if (readingSnapshot && snapshot == null) {
        printUsage("A snapshot name must be specified when --snapshot is enabled.");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      LOG.info("Trying to bulk load data to destination table: " + dstTableName);
      LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
        this.bulkloadDir.toString(), this.dstTableName);
      code = new LoadIncrementalHFiles(this.getConf())
        .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that the load
        // can be rerun from the same HFiles after a failure.
        FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}