1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import org.apache.hadoop.hbase.classification.InterfaceAudience;
22 import org.apache.hadoop.hbase.classification.InterfaceStability;
23
24 import java.io.IOException;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Random;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Connection;
39 import org.apache.hadoop.hbase.client.ConnectionFactory;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.FSUtils;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.util.GenericOptionsParser;
46 import org.apache.hadoop.util.Tool;
47 import org.apache.hadoop.util.ToolRunner;
48
49
50
51
52
53
54 @InterfaceAudience.Public
55 @InterfaceStability.Stable
56 public class CopyTable extends Configured implements Tool {
57 private static final Log LOG = LogFactory.getLog(CopyTable.class);
58
59 final static String NAME = "copytable";
60 long startTime = 0;
61 long endTime = 0;
62 int versions = -1;
63 String tableName = null;
64 String startRow = null;
65 String stopRow = null;
66 String dstTableName = null;
67 String peerAddress = null;
68 String families = null;
69 boolean allCells = false;
70 static boolean shuffle = false;
71
72 boolean bulkload = false;
73 Path bulkloadDir = null;
74
75 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
76
77 public CopyTable(Configuration conf) {
78 super(conf);
79 }
80
81
82
83
84
85
86
87 public Job createSubmittableJob(String[] args)
88 throws IOException {
89 if (!doCommandLine(args)) {
90 return null;
91 }
92
93 Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
94
95 job.setJarByClass(CopyTable.class);
96 Scan scan = new Scan();
97 scan.setCacheBlocks(false);
98 if (startTime != 0) {
99 scan.setTimeRange(startTime,
100 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
101 }
102 if (allCells) {
103 scan.setRaw(true);
104 }
105 if (shuffle) {
106 job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
107 }
108 if (versions >= 0) {
109 scan.setMaxVersions(versions);
110 }
111
112 if (startRow != null) {
113 scan.setStartRow(Bytes.toBytes(startRow));
114 }
115
116 if (stopRow != null) {
117 scan.setStopRow(Bytes.toBytes(stopRow));
118 }
119
120 if(families != null) {
121 String[] fams = families.split(",");
122 Map<String,String> cfRenameMap = new HashMap<String,String>();
123 for(String fam : fams) {
124 String sourceCf;
125 if(fam.contains(":")) {
126
127 String[] srcAndDest = fam.split(":", 2);
128 sourceCf = srcAndDest[0];
129 String destCf = srcAndDest[1];
130 cfRenameMap.put(sourceCf, destCf);
131 } else {
132
133 sourceCf = fam;
134 }
135 scan.addFamily(Bytes.toBytes(sourceCf));
136 }
137 Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
138 }
139 job.setNumReduceTasks(0);
140
141 if (bulkload) {
142 TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
143 null, job);
144
145
146 TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
147
148 FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
149 Random rand = new Random();
150 Path root = new Path(fs.getWorkingDirectory(), "copytable");
151 fs.mkdirs(root);
152 while (true) {
153 bulkloadDir = new Path(root, "" + rand.nextLong());
154 if (!fs.exists(bulkloadDir)) {
155 break;
156 }
157 }
158
159 System.out.println("HFiles will be stored at " + this.bulkloadDir);
160 HFileOutputFormat2.setOutputPath(job, bulkloadDir);
161 try (Connection conn = ConnectionFactory.createConnection(getConf());
162 Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
163 HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
164 }
165 } else {
166 TableMapReduceUtil.initTableMapperJob(tableName, scan,
167 Import.Importer.class, null, null, job);
168
169 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
170 null);
171 }
172
173 return job;
174 }
175
176
177
178
179 private static void printUsage(final String errorMsg) {
180 if (errorMsg != null && errorMsg.length() > 0) {
181 System.err.println("ERROR: " + errorMsg);
182 }
183 System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
184 "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
185 System.err.println();
186 System.err.println("Options:");
187 System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
188 System.err.println(" specify if different from current cluster");
189 System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster");
190 System.err.println(" startrow the start row");
191 System.err.println(" stoprow the stop row");
192 System.err.println(" starttime beginning of the time range (unixtime in millis)");
193 System.err.println(" without endtime means from starttime to forever");
194 System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
195 System.err.println(" versions number of cell versions to copy");
196 System.err.println(" new.name new table's name");
197 System.err.println(" peer.adr Address of the peer cluster given in the format");
198 System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
199 System.err.println(" families comma-separated list of families to copy");
200 System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
201 System.err.println(" To keep the same name, just give \"cfName\"");
202 System.err.println(" all.cells also copy delete markers and deleted cells");
203 System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
204 + "table");
205 System.err.println();
206 System.err.println("Args:");
207 System.err.println(" tablename Name of the table to copy");
208 System.err.println();
209 System.err.println("Examples:");
210 System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
211 System.err.println(" $ bin/hbase " +
212 "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
213 "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
214 System.err.println("For performance consider the following general option:\n"
215 + " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
216 + " decreases the round trip time to the server and may increase performance.\n"
217 + " -Dhbase.client.scanner.caching=100\n"
218 + " The following should always be set to false, to prevent writing data twice, which may produce \n"
219 + " inaccurate results.\n"
220 + " -Dmapreduce.map.speculative=false");
221 }
222
223 private boolean doCommandLine(final String[] args) {
224
225
226 if (args.length < 1) {
227 printUsage(null);
228 return false;
229 }
230 try {
231 for (int i = 0; i < args.length; i++) {
232 String cmd = args[i];
233 if (cmd.equals("-h") || cmd.startsWith("--h")) {
234 printUsage(null);
235 return false;
236 }
237
238 final String startRowArgKey = "--startrow=";
239 if (cmd.startsWith(startRowArgKey)) {
240 startRow = cmd.substring(startRowArgKey.length());
241 continue;
242 }
243
244 final String stopRowArgKey = "--stoprow=";
245 if (cmd.startsWith(stopRowArgKey)) {
246 stopRow = cmd.substring(stopRowArgKey.length());
247 continue;
248 }
249
250 final String startTimeArgKey = "--starttime=";
251 if (cmd.startsWith(startTimeArgKey)) {
252 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
253 continue;
254 }
255
256 final String endTimeArgKey = "--endtime=";
257 if (cmd.startsWith(endTimeArgKey)) {
258 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
259 continue;
260 }
261
262 final String versionsArgKey = "--versions=";
263 if (cmd.startsWith(versionsArgKey)) {
264 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
265 continue;
266 }
267
268 final String newNameArgKey = "--new.name=";
269 if (cmd.startsWith(newNameArgKey)) {
270 dstTableName = cmd.substring(newNameArgKey.length());
271 continue;
272 }
273
274 final String peerAdrArgKey = "--peer.adr=";
275 if (cmd.startsWith(peerAdrArgKey)) {
276 peerAddress = cmd.substring(peerAdrArgKey.length());
277 continue;
278 }
279
280 final String familiesArgKey = "--families=";
281 if (cmd.startsWith(familiesArgKey)) {
282 families = cmd.substring(familiesArgKey.length());
283 continue;
284 }
285
286 if (cmd.startsWith("--all.cells")) {
287 allCells = true;
288 continue;
289 }
290
291 if (cmd.startsWith("--bulkload")) {
292 bulkload = true;
293 continue;
294 }
295
296 if (cmd.startsWith("--shuffle")) {
297 shuffle = true;
298 continue;
299 }
300
301 if (i == args.length-1) {
302 tableName = cmd;
303 } else {
304 printUsage("Invalid argument '" + cmd + "'" );
305 return false;
306 }
307 }
308 if (dstTableName == null && peerAddress == null) {
309 printUsage("At least a new table name or a " +
310 "peer address must be specified");
311 return false;
312 }
313 if ((endTime != 0) && (startTime > endTime)) {
314 printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
315 return false;
316 }
317
318 if (bulkload && peerAddress != null) {
319 printUsage("Remote bulkload is not supported!");
320 return false;
321 }
322
323
324 if (dstTableName == null) {
325 dstTableName = tableName;
326 }
327 } catch (Exception e) {
328 e.printStackTrace();
329 printUsage("Can't start because " + e.getMessage());
330 return false;
331 }
332 return true;
333 }
334
335
336
337
338
339
340
341 public static void main(String[] args) throws Exception {
342 int ret = ToolRunner.run(new CopyTable(HBaseConfiguration.create()), args);
343 System.exit(ret);
344 }
345
346 @Override
347 public int run(String[] args) throws Exception {
348 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
349 Job job = createSubmittableJob(otherArgs);
350 if (job == null) return 1;
351 if (!job.waitForCompletion(true)) {
352 LOG.info("Map-reduce job failed!");
353 if (bulkload) {
354 LOG.info("Files are not bulkloaded!");
355 }
356 return 1;
357 }
358 int code = 0;
359 if (bulkload) {
360 code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
361 this.dstTableName});
362 if (code == 0) {
363
364
365 FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
366 if (!fs.delete(this.bulkloadDir, true)) {
367 LOG.error("Deleting folder " + bulkloadDir + " failed!");
368 code = 1;
369 }
370 }
371 }
372 return code;
373 }
374 }