/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool used to copy a table to another one, which can be on a different setup.
 * It is also configurable with a start and end time, as well as a specification
 * of the region server implementation if different from the local cluster.
 */
@InterfaceAudience.Public
public class CopyTable extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(CopyTable.class);

  final static String NAME = "copytable";
  long startTime = 0;
  long endTime = HConstants.LATEST_TIMESTAMP;
  int batch = Integer.MAX_VALUE;
  int cacheRow = -1;
  int versions = -1;
  String tableName = null;
  String startRow = null;
  String stopRow = null;
  String dstTableName = null;
  String peerAddress = null;
  String families = null;
  boolean allCells = false;
  static boolean shuffle = false;

  boolean bulkload = false;
  Path bulkloadDir = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args)
  throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }

    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(CopyTable.class);
    Scan scan = new Scan();

    scan.setBatch(batch);
    scan.setCacheBlocks(false);

    if (cacheRow > 0) {
      scan.setCaching(cacheRow);
    } else {
      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
    }

    scan.setTimeRange(startTime, endTime);

    if (allCells) {
      scan.setRaw(true);
    }
    if (shuffle) {
      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }

    if (startRow != null) {
      scan.setStartRow(Bytes.toBytesBinary(startRow));
    }

    if (stopRow != null) {
      scan.setStopRow(Bytes.toBytesBinary(stopRow));
    }

    if (families != null) {
      String[] fams = families.split(",");
      Map<String, String> cfRenameMap = new HashMap<>();
      for (String fam : fams) {
        String sourceCf;
        if (fam.contains(":")) {
          // fam looks like "sourceCfName:destCfName"
          String[] srcAndDest = fam.split(":", 2);
          sourceCf = srcAndDest[0];
          String destCf = srcAndDest[1];
          cfRenameMap.put(sourceCf, destCf);
        } else {
          // fam is just "sourceCf"
          sourceCf = fam;
        }
        scan.addFamily(Bytes.toBytes(sourceCf));
      }
      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
    }
    job.setNumReduceTasks(0);

    if (bulkload) {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
        job);

      // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));

      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
      Random rand = new Random();
      Path root = new Path(fs.getWorkingDirectory(), "copytable");
      fs.mkdirs(root);
      while (true) {
        bulkloadDir = new Path(root, "" + rand.nextLong());
        if (!fs.exists(bulkloadDir)) {
          break;
        }
      }

      System.out.println("HFiles will be stored at " + this.bulkloadDir);
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        HFileOutputFormat2.configureIncrementalLoadMap(job,
            admin.getDescriptor(TableName.valueOf(dstTableName)));
      }
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Import.Importer.class, null, null, job);

      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
        null);
    }

    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
    System.err.println("              specify if different from current cluster");
    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
    System.err.println(" startrow     the start row");
    System.err.println(" stoprow      the stop row");
    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range. Ignored if no starttime specified.");
    System.err.println(" versions     number of cell versions to copy");
    System.err.println(" new.name     new table's name");
    System.err.println(" peer.adr     Address of the peer cluster given in the format");
    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
        + ".port:zookeeper.znode.parent");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName.");
    System.err.println("              To keep the same name, just give \"cfName\"");
    System.err.println(" all.cells    also copy delete markers and deleted cells");
    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
        + "table");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" tablename    Name of the table to copy");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1-hour window:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
    System.err.println("For performance consider the following general options:\n"
        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
        + "  decreases the round trip time to the server and may increase performance.\n"
        + "    -Dhbase.client.scanner.caching=100\n"
        + "  The following should always be set to false, to prevent writing data twice, which may produce\n"
        + "  inaccurate results.\n"
        + "    -Dmapreduce.map.speculative=false");
  }

  private boolean doCommandLine(final String[] args) {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
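    // Options are matched by string prefix (e.g. "--starttime=") and take the remainder
    // of the token as their value. The only positional argument is the table name, which
    // is accepted only when it is the last argument; any other unrecognized token is
    // rejected as invalid.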
    if (args.length < 1) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startRowArgKey = "--startrow=";
        if (cmd.startsWith(startRowArgKey)) {
          startRow = cmd.substring(startRowArgKey.length());
          continue;
        }

        final String stopRowArgKey = "--stoprow=";
        if (cmd.startsWith(stopRowArgKey)) {
          stopRow = cmd.substring(stopRowArgKey.length());
          continue;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String cacheRowArgKey = "--cacheRow=";
        if (cmd.startsWith(cacheRowArgKey)) {
          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String newNameArgKey = "--new.name=";
        if (cmd.startsWith(newNameArgKey)) {
          dstTableName = cmd.substring(newNameArgKey.length());
          continue;
        }

        final String peerAdrArgKey = "--peer.adr=";
        if (cmd.startsWith(peerAdrArgKey)) {
          peerAddress = cmd.substring(peerAdrArgKey.length());
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (cmd.startsWith("--all.cells")) {
          allCells = true;
          continue;
        }

        if (cmd.startsWith("--bulkload")) {
          bulkload = true;
          continue;
        }

        if (cmd.startsWith("--shuffle")) {
          shuffle = true;
          continue;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        } else {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
      }
      if (dstTableName == null && peerAddress == null) {
        printUsage("At least a new table name or a " +
            "peer address must be specified");
        return false;
      }
      if ((endTime != 0) && (startTime > endTime)) {
        printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
        return false;
      }

      if (bulkload && peerAddress != null) {
        printUsage("Remote bulkload is not supported!");
        return false;
      }

      // set dstTableName if necessary
      if (dstTableName == null) {
        dstTableName = tableName;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = createSubmittableJob(args);
    if (job == null) {
      return 1;
    }
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      if (bulkload) {
        LOG.info("Files are not bulkloaded!");
      }
      return 1;
    }
    int code = 0;
    if (bulkload) {
      code = new LoadIncrementalHFiles(this.getConf())
          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
      if (code == 0) {
        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
        // rerun LoadIncrementalHFiles.
        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
        if (!fs.delete(this.bulkloadDir, true)) {
          LOG.error("Deleting folder " + bulkloadDir + " failed!");
          code = 1;
        }
      }
    }
    return code;
  }
}