/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.mapreduce.replication;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.HConnectable;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.HConnectionManager;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
39 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
40 import org.apache.hadoop.hbase.mapreduce.TableMapper;
41 import org.apache.hadoop.hbase.mapreduce.TableSplit;
42 import org.apache.hadoop.hbase.replication.ReplicationException;
43 import org.apache.hadoop.hbase.replication.ReplicationFactory;
44 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
45 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46 import org.apache.hadoop.hbase.replication.ReplicationPeers;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
51 import org.apache.hadoop.mapreduce.Job;
52 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
53 import org.apache.hadoop.util.Tool;
54 import org.apache.hadoop.util.ToolRunner;
55
56
57
58
59
60
61
62
63
64
65
66 public class VerifyReplication extends Configured implements Tool {
67
68 private static final Log LOG =
69 LogFactory.getLog(VerifyReplication.class);
70
71 public final static String NAME = "verifyrep";
72 static long startTime = 0;
73 static long endTime = Long.MAX_VALUE;
74 static int versions = -1;
75 static String tableName = null;
76 static String families = null;
77 static String peerId = null;
78
79
80
81
82 public static class Verifier
83 extends TableMapper<ImmutableBytesWritable, Put> {
84
85 public static enum Counters {
86 GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
87
88 private ResultScanner replicatedScanner;
89 private Result currentCompareRowInPeerTable;
90 private Table replicatedTable;
91
92
93
94
95
96
97
98
99
100 @Override
101 public void map(ImmutableBytesWritable row, final Result value,
102 Context context)
103 throws IOException {
104 if (replicatedScanner == null) {
105 Configuration conf = context.getConfiguration();
106 final Scan scan = new Scan();
107 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
108 long startTime = conf.getLong(NAME + ".startTime", 0);
109 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
110 String families = conf.get(NAME + ".families", null);
111 if(families != null) {
112 String[] fams = families.split(",");
113 for(String fam : fams) {
114 scan.addFamily(Bytes.toBytes(fam));
115 }
116 }
117 scan.setTimeRange(startTime, endTime);
118 if (versions >= 0) {
119 scan.setMaxVersions(versions);
120 }
121
122 final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
123 HConnectionManager.execute(new HConnectable<Void>(conf) {
124 @Override
125 public Void connect(HConnection conn) throws IOException {
126 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
127 Configuration peerConf = HBaseConfiguration.create(conf);
128 ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
129
130 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
131 replicatedTable = new HTable(peerConf, tableName);
132 scan.setStartRow(value.getRow());
133 scan.setStopRow(tableSplit.getEndRow());
134 replicatedScanner = replicatedTable.getScanner(scan);
135 return null;
136 }
137 });
138 currentCompareRowInPeerTable = replicatedScanner.next();
139 }
140 while (true) {
141 if (currentCompareRowInPeerTable == null) {
142
143 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
144 break;
145 }
146 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
147 if (rowCmpRet == 0) {
148
149 try {
150 Result.compareResults(value, currentCompareRowInPeerTable);
151 context.getCounter(Counters.GOODROWS).increment(1);
152 } catch (Exception e) {
153 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
154 LOG.error("Exception while comparing row : " + e);
155 }
156 currentCompareRowInPeerTable = replicatedScanner.next();
157 break;
158 } else if (rowCmpRet < 0) {
159
160 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
161 break;
162 } else {
163
164 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
165 currentCompareRowInPeerTable);
166 currentCompareRowInPeerTable = replicatedScanner.next();
167 }
168 }
169 }
170
171 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
172 context.getCounter(counter).increment(1);
173 context.getCounter(Counters.BADROWS).increment(1);
174 LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
175 }
176
177 @Override
178 protected void cleanup(Context context) {
179 if (replicatedScanner != null) {
180 try {
181 while (currentCompareRowInPeerTable != null) {
182 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
183 currentCompareRowInPeerTable);
184 currentCompareRowInPeerTable = replicatedScanner.next();
185 }
186 } catch (Exception e) {
187 LOG.error("fail to scan peer table in cleanup", e);
188 } finally {
189 replicatedScanner.close();
190 replicatedScanner = null;
191 }
192 }
193 if (replicatedTable != null) {
194 TableName tableName = replicatedTable.getName();
195 try {
196 replicatedTable.close();
197 } catch (IOException ioe) {
198 LOG.warn("Exception closing " + tableName, ioe);
199 }
200 }
201 }
202 }
203
204 private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
205 ZooKeeperWatcher localZKW = null;
206 ReplicationPeerZKImpl peer = null;
207 try {
208 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
209 new Abortable() {
210 @Override public void abort(String why, Throwable e) {}
211 @Override public boolean isAborted() {return false;}
212 });
213
214 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
215 rp.init();
216
217 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
218 if (pair == null) {
219 throw new IOException("Couldn't get peer conf!");
220 }
221 Configuration peerConf = rp.getPeerConf(peerId).getSecond();
222 return ZKUtil.getZooKeeperClusterKey(peerConf);
223 } catch (ReplicationException e) {
224 throw new IOException(
225 "An error occured while trying to connect to the remove peer cluster", e);
226 } finally {
227 if (peer != null) {
228 peer.close();
229 }
230 if (localZKW != null) {
231 localZKW.close();
232 }
233 }
234 }
235
236
237
238
239
240
241
242
243
244 public static Job createSubmittableJob(Configuration conf, String[] args)
245 throws IOException {
246 if (!doCommandLine(args)) {
247 return null;
248 }
249 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
250 HConstants.REPLICATION_ENABLE_DEFAULT)) {
251 throw new IOException("Replication needs to be enabled to verify it.");
252 }
253 conf.set(NAME+".peerId", peerId);
254 conf.set(NAME+".tableName", tableName);
255 conf.setLong(NAME+".startTime", startTime);
256 conf.setLong(NAME+".endTime", endTime);
257 if (families != null) {
258 conf.set(NAME+".families", families);
259 }
260
261 String peerQuorumAddress = getPeerQuorumAddress(conf);
262 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
263 LOG.info("Peer Quorum Address: " + peerQuorumAddress);
264
265 Job job = new Job(conf, NAME + "_" + tableName);
266 job.setJarByClass(VerifyReplication.class);
267
268 Scan scan = new Scan();
269 scan.setTimeRange(startTime, endTime);
270 if (versions >= 0) {
271 scan.setMaxVersions(versions);
272 }
273 if(families != null) {
274 String[] fams = families.split(",");
275 for(String fam : fams) {
276 scan.addFamily(Bytes.toBytes(fam));
277 }
278 }
279 TableMapReduceUtil.initTableMapperJob(tableName, scan,
280 Verifier.class, null, null, job);
281
282
283 TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
284
285 job.setOutputFormatClass(NullOutputFormat.class);
286 job.setNumReduceTasks(0);
287 return job;
288 }
289
290 private static boolean doCommandLine(final String[] args) {
291 if (args.length < 2) {
292 printUsage(null);
293 return false;
294 }
295 try {
296 for (int i = 0; i < args.length; i++) {
297 String cmd = args[i];
298 if (cmd.equals("-h") || cmd.startsWith("--h")) {
299 printUsage(null);
300 return false;
301 }
302
303 final String startTimeArgKey = "--starttime=";
304 if (cmd.startsWith(startTimeArgKey)) {
305 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
306 continue;
307 }
308
309 final String endTimeArgKey = "--endtime=";
310 if (cmd.startsWith(endTimeArgKey)) {
311 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
312 continue;
313 }
314
315 final String versionsArgKey = "--versions=";
316 if (cmd.startsWith(versionsArgKey)) {
317 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
318 continue;
319 }
320
321 final String familiesArgKey = "--families=";
322 if (cmd.startsWith(familiesArgKey)) {
323 families = cmd.substring(familiesArgKey.length());
324 continue;
325 }
326
327 if (i == args.length-2) {
328 peerId = cmd;
329 }
330
331 if (i == args.length-1) {
332 tableName = cmd;
333 }
334 }
335 } catch (Exception e) {
336 e.printStackTrace();
337 printUsage("Can't start because " + e.getMessage());
338 return false;
339 }
340 return true;
341 }
342
343
344
345
346 private static void printUsage(final String errorMsg) {
347 if (errorMsg != null && errorMsg.length() > 0) {
348 System.err.println("ERROR: " + errorMsg);
349 }
350 System.err.println("Usage: verifyrep [--starttime=X]" +
351 " [--stoptime=Y] [--families=A] <peerid> <tablename>");
352 System.err.println();
353 System.err.println("Options:");
354 System.err.println(" starttime beginning of the time range");
355 System.err.println(" without endtime means from starttime to forever");
356 System.err.println(" endtime end of the time range");
357 System.err.println(" versions number of cell versions to verify");
358 System.err.println(" families comma-separated list of families to copy");
359 System.err.println();
360 System.err.println("Args:");
361 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
362 System.err.println(" tablename Name of the table to verify");
363 System.err.println();
364 System.err.println("Examples:");
365 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
366 System.err.println(" $ bin/hbase " +
367 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
368 " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
369 }
370
371 @Override
372 public int run(String[] args) throws Exception {
373 Configuration conf = this.getConf();
374 Job job = createSubmittableJob(conf, args);
375 if (job != null) {
376 return job.waitForCompletion(true) ? 0 : 1;
377 }
378 return 1;
379 }
380
381
382
383
384
385
386
387 public static void main(String[] args) throws Exception {
388 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
389 System.exit(res);
390 }
391 }