1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce.replication;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.HConnectable;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.HConnectionManager;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
39 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
40 import org.apache.hadoop.hbase.mapreduce.TableMapper;
41 import org.apache.hadoop.hbase.mapreduce.TableSplit;
42 import org.apache.hadoop.hbase.replication.ReplicationException;
43 import org.apache.hadoop.hbase.replication.ReplicationFactory;
44 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
45 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46 import org.apache.hadoop.hbase.replication.ReplicationPeers;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.apache.hadoop.mapreduce.Job;
51 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
52 import org.apache.hadoop.util.Tool;
53 import org.apache.hadoop.util.ToolRunner;
54
55
56
57
58
59
60
61
62
63
64
65 public class VerifyReplication extends Configured implements Tool {
66
67 private static final Log LOG =
68 LogFactory.getLog(VerifyReplication.class);
69
70 public final static String NAME = "verifyrep";
71 private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
72 static long startTime = 0;
73 static long endTime = Long.MAX_VALUE;
74 static int versions = -1;
75 static String tableName = null;
76 static String families = null;
77 static String peerId = null;
78
79
80
81
82 public static class Verifier
83 extends TableMapper<ImmutableBytesWritable, Put> {
84
85 public static enum Counters {
86 GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
87
88 private ResultScanner replicatedScanner;
89 private Result currentCompareRowInPeerTable;
90 private Table replicatedTable;
91
92
93
94
95
96
97
98
99
100 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_ALWAYS_NULL",
101 justification="The replicatedScanner will be assigned when connect() is called")
102 @Override
103 public void map(ImmutableBytesWritable row, final Result value,
104 Context context)
105 throws IOException {
106 if (replicatedScanner == null) {
107 Configuration conf = context.getConfiguration();
108 final Scan scan = new Scan();
109 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
110 long startTime = conf.getLong(NAME + ".startTime", 0);
111 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
112 String families = conf.get(NAME + ".families", null);
113 if(families != null) {
114 String[] fams = families.split(",");
115 for(String fam : fams) {
116 scan.addFamily(Bytes.toBytes(fam));
117 }
118 }
119 scan.setTimeRange(startTime, endTime);
120 int versions = conf.getInt(NAME+".versions", -1);
121 LOG.info("Setting number of version inside map as: " + versions);
122 if (versions >= 0) {
123 scan.setMaxVersions(versions);
124 }
125
126 final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
127 HConnectionManager.execute(new HConnectable<Void>(conf) {
128 @Override
129 public Void connect(HConnection conn) throws IOException {
130 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
131 Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
132 zkClusterKey, PEER_CONFIG_PREFIX);
133
134 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
135 replicatedTable = new HTable(peerConf, tableName);
136 scan.setStartRow(value.getRow());
137 scan.setStopRow(tableSplit.getEndRow());
138 replicatedScanner = replicatedTable.getScanner(scan);
139 return null;
140 }
141 });
142 currentCompareRowInPeerTable = replicatedScanner.next();
143 }
144 while (true) {
145 if (currentCompareRowInPeerTable == null) {
146
147 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
148 break;
149 }
150 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
151 if (rowCmpRet == 0) {
152
153 try {
154 Result.compareResults(value, currentCompareRowInPeerTable);
155 context.getCounter(Counters.GOODROWS).increment(1);
156 } catch (Exception e) {
157 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
158 LOG.error("Exception while comparing row : " + e);
159 }
160 currentCompareRowInPeerTable = replicatedScanner.next();
161 break;
162 } else if (rowCmpRet < 0) {
163
164 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
165 break;
166 } else {
167
168 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
169 currentCompareRowInPeerTable);
170 currentCompareRowInPeerTable = replicatedScanner.next();
171 }
172 }
173 }
174
175 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
176 context.getCounter(counter).increment(1);
177 context.getCounter(Counters.BADROWS).increment(1);
178 LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
179 }
180
181 @Override
182 protected void cleanup(Context context) {
183 if (replicatedScanner != null) {
184 try {
185 while (currentCompareRowInPeerTable != null) {
186 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
187 currentCompareRowInPeerTable);
188 currentCompareRowInPeerTable = replicatedScanner.next();
189 }
190 } catch (Exception e) {
191 LOG.error("fail to scan peer table in cleanup", e);
192 } finally {
193 replicatedScanner.close();
194 replicatedScanner = null;
195 }
196 }
197 if (replicatedTable != null) {
198 TableName tableName = replicatedTable.getName();
199 try {
200 replicatedTable.close();
201 } catch (IOException ioe) {
202 LOG.warn("Exception closing " + tableName, ioe);
203 }
204 }
205 }
206 }
207
208 private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
209 final Configuration conf) throws IOException {
210 ZooKeeperWatcher localZKW = null;
211 ReplicationPeerZKImpl peer = null;
212 try {
213 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
214 new Abortable() {
215 @Override public void abort(String why, Throwable e) {}
216 @Override public boolean isAborted() {return false;}
217 });
218
219 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
220 rp.init();
221
222 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
223 if (pair == null) {
224 throw new IOException("Couldn't get peer conf!");
225 }
226
227 return pair;
228 } catch (ReplicationException e) {
229 throw new IOException(
230 "An error occured while trying to connect to the remove peer cluster", e);
231 } finally {
232 if (peer != null) {
233 peer.close();
234 }
235 if (localZKW != null) {
236 localZKW.close();
237 }
238 }
239 }
240
241
242
243
244
245
246
247
248
249 public static Job createSubmittableJob(Configuration conf, String[] args)
250 throws IOException {
251 if (!doCommandLine(args)) {
252 return null;
253 }
254 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
255 HConstants.REPLICATION_ENABLE_DEFAULT)) {
256 throw new IOException("Replication needs to be enabled to verify it.");
257 }
258 conf.set(NAME+".peerId", peerId);
259 conf.set(NAME+".tableName", tableName);
260 conf.setLong(NAME+".startTime", startTime);
261 conf.setLong(NAME+".endTime", endTime);
262 if (families != null) {
263 conf.set(NAME+".families", families);
264 }
265
266 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
267 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
268 String peerQuorumAddress = peerConfig.getClusterKey();
269 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
270 peerConfig.getConfiguration());
271 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
272 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
273 peerConfig.getConfiguration().entrySet());
274
275 conf.setInt(NAME + ".versions", versions);
276 LOG.info("Number of version: " + versions);
277
278 Job job = new Job(conf, NAME + "_" + tableName);
279 job.setJarByClass(VerifyReplication.class);
280
281 Scan scan = new Scan();
282 scan.setTimeRange(startTime, endTime);
283 if (versions >= 0) {
284 scan.setMaxVersions(versions);
285 LOG.info("Number of versions set to " + versions);
286 }
287 if(families != null) {
288 String[] fams = families.split(",");
289 for(String fam : fams) {
290 scan.addFamily(Bytes.toBytes(fam));
291 }
292 }
293 TableMapReduceUtil.initTableMapperJob(tableName, scan,
294 Verifier.class, null, null, job);
295
296 Configuration peerClusterConf = peerConfigPair.getSecond();
297
298 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
299
300 job.setOutputFormatClass(NullOutputFormat.class);
301 job.setNumReduceTasks(0);
302 return job;
303 }
304
305 private static boolean doCommandLine(final String[] args) {
306 if (args.length < 2) {
307 printUsage(null);
308 return false;
309 }
310 try {
311 for (int i = 0; i < args.length; i++) {
312 String cmd = args[i];
313 if (cmd.equals("-h") || cmd.startsWith("--h")) {
314 printUsage(null);
315 return false;
316 }
317
318 final String startTimeArgKey = "--starttime=";
319 if (cmd.startsWith(startTimeArgKey)) {
320 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
321 continue;
322 }
323
324 final String endTimeArgKey = "--endtime=";
325 if (cmd.startsWith(endTimeArgKey)) {
326 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
327 continue;
328 }
329
330 final String versionsArgKey = "--versions=";
331 if (cmd.startsWith(versionsArgKey)) {
332 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
333 continue;
334 }
335
336 final String familiesArgKey = "--families=";
337 if (cmd.startsWith(familiesArgKey)) {
338 families = cmd.substring(familiesArgKey.length());
339 continue;
340 }
341
342 if (i == args.length-2) {
343 peerId = cmd;
344 }
345
346 if (i == args.length-1) {
347 tableName = cmd;
348 }
349 }
350 } catch (Exception e) {
351 e.printStackTrace();
352 printUsage("Can't start because " + e.getMessage());
353 return false;
354 }
355 return true;
356 }
357
358
359
360
361 private static void printUsage(final String errorMsg) {
362 if (errorMsg != null && errorMsg.length() > 0) {
363 System.err.println("ERROR: " + errorMsg);
364 }
365 System.err.println("Usage: verifyrep [--starttime=X]" +
366 " [--endtime=Y] [--families=A] <peerid> <tablename>");
367 System.err.println();
368 System.err.println("Options:");
369 System.err.println(" starttime beginning of the time range");
370 System.err.println(" without endtime means from starttime to forever");
371 System.err.println(" endtime end of the time range");
372 System.err.println(" versions number of cell versions to verify");
373 System.err.println(" families comma-separated list of families to copy");
374 System.err.println();
375 System.err.println("Args:");
376 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
377 System.err.println(" tablename Name of the table to verify");
378 System.err.println();
379 System.err.println("Examples:");
380 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
381 System.err.println(" $ bin/hbase " +
382 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
383 " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
384 }
385
386 @Override
387 public int run(String[] args) throws Exception {
388 Configuration conf = this.getConf();
389 Job job = createSubmittableJob(conf, args);
390 if (job != null) {
391 return job.waitForCompletion(true) ? 0 : 1;
392 }
393 return 1;
394 }
395
396
397
398
399
400
401
402 public static void main(String[] args) throws Exception {
403 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
404 System.exit(res);
405 }
406 }