1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce.replication;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.*;
27 import org.apache.hadoop.hbase.client.HConnectable;
28 import org.apache.hadoop.hbase.client.HConnection;
29 import org.apache.hadoop.hbase.client.HConnectionManager;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.ResultScanner;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
37 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
38 import org.apache.hadoop.hbase.mapreduce.TableMapper;
39 import org.apache.hadoop.hbase.replication.ReplicationPeer;
40 import org.apache.hadoop.hbase.replication.ReplicationPeers;
41 import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
42 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
47 import org.apache.zookeeper.KeeperException;
48
49
50
51
52
53
54
55
56
57
58
59 public class VerifyReplication {
60
61 private static final Log LOG =
62 LogFactory.getLog(VerifyReplication.class);
63
64 public final static String NAME = "verifyrep";
65 static long startTime = 0;
66 static long endTime = 0;
67 static String tableName = null;
68 static String families = null;
69 static String peerId = null;
70
71
72
73
74 public static class Verifier
75 extends TableMapper<ImmutableBytesWritable, Put> {
76
77 public static enum Counters {GOODROWS, BADROWS}
78
79 private ResultScanner replicatedScanner;
80
81
82
83
84
85
86
87
88
89 @Override
90 public void map(ImmutableBytesWritable row, final Result value,
91 Context context)
92 throws IOException {
93 if (replicatedScanner == null) {
94 Configuration conf = context.getConfiguration();
95 final Scan scan = new Scan();
96 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
97 long startTime = conf.getLong(NAME + ".startTime", 0);
98 long endTime = conf.getLong(NAME + ".endTime", 0);
99 String families = conf.get(NAME + ".families", null);
100 if(families != null) {
101 String[] fams = families.split(",");
102 for(String fam : fams) {
103 scan.addFamily(Bytes.toBytes(fam));
104 }
105 }
106 if (startTime != 0) {
107 scan.setTimeRange(startTime,
108 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
109 }
110 HConnectionManager.execute(new HConnectable<Void>(conf) {
111 @Override
112 public Void connect(HConnection conn) throws IOException {
113 ZooKeeperWatcher localZKW = null;
114 ReplicationZookeeper zk = null;
115 ReplicationPeer peer = null;
116 try {
117 localZKW = new ZooKeeperWatcher(
118 conf, "VerifyReplication", new Abortable() {
119 @Override public void abort(String why, Throwable e) {}
120 @Override public boolean isAborted() {return false;}
121 });
122 ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
123 rp.init();
124 Configuration peerConf = rp.getPeerConf(peerId);
125 if (peerConf == null) {
126 throw new IOException("Couldn't get peer conf!");
127 }
128 HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
129 scan.setStartRow(value.getRow());
130 replicatedScanner = replicatedTable.getScanner(scan);
131 } catch (KeeperException e) {
132 throw new IOException("Got a ZK exception", e);
133 } finally {
134 if (peer != null) {
135 peer.close();
136 }
137 if (zk != null) {
138 zk.close();
139 }
140 if (localZKW != null) {
141 localZKW.close();
142 }
143 }
144 return null;
145 }
146 });
147 }
148 Result res = replicatedScanner.next();
149 try {
150 Result.compareResults(value, res);
151 context.getCounter(Counters.GOODROWS).increment(1);
152 } catch (Exception e) {
153 LOG.warn("Bad row", e);
154 context.getCounter(Counters.BADROWS).increment(1);
155 }
156 }
157
158 protected void cleanup(Context context) {
159 if (replicatedScanner != null) {
160 replicatedScanner.close();
161 replicatedScanner = null;
162 }
163 }
164 }
165
166
167
168
169
170
171
172
173
174 public static Job createSubmittableJob(Configuration conf, String[] args)
175 throws IOException {
176 if (!doCommandLine(args)) {
177 return null;
178 }
179 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
180 throw new IOException("Replication needs to be enabled to verify it.");
181 }
182 conf.set(NAME+".peerId", peerId);
183 conf.set(NAME+".tableName", tableName);
184 conf.setLong(NAME+".startTime", startTime);
185 conf.setLong(NAME+".endTime", endTime);
186 if (families != null) {
187 conf.set(NAME+".families", families);
188 }
189 Job job = new Job(conf, NAME + "_" + tableName);
190 job.setJarByClass(VerifyReplication.class);
191
192 Scan scan = new Scan();
193 if (startTime != 0) {
194 scan.setTimeRange(startTime,
195 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
196 }
197 if(families != null) {
198 String[] fams = families.split(",");
199 for(String fam : fams) {
200 scan.addFamily(Bytes.toBytes(fam));
201 }
202 }
203 TableMapReduceUtil.initTableMapperJob(tableName, scan,
204 Verifier.class, null, null, job);
205 job.setOutputFormatClass(NullOutputFormat.class);
206 job.setNumReduceTasks(0);
207 return job;
208 }
209
210 private static boolean doCommandLine(final String[] args) {
211 if (args.length < 2) {
212 printUsage(null);
213 return false;
214 }
215 try {
216 for (int i = 0; i < args.length; i++) {
217 String cmd = args[i];
218 if (cmd.equals("-h") || cmd.startsWith("--h")) {
219 printUsage(null);
220 return false;
221 }
222
223 final String startTimeArgKey = "--starttime=";
224 if (cmd.startsWith(startTimeArgKey)) {
225 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
226 continue;
227 }
228
229 final String endTimeArgKey = "--endtime=";
230 if (cmd.startsWith(endTimeArgKey)) {
231 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
232 continue;
233 }
234
235 final String familiesArgKey = "--families=";
236 if (cmd.startsWith(familiesArgKey)) {
237 families = cmd.substring(familiesArgKey.length());
238 continue;
239 }
240
241 if (i == args.length-2) {
242 peerId = cmd;
243 }
244
245 if (i == args.length-1) {
246 tableName = cmd;
247 }
248 }
249 } catch (Exception e) {
250 e.printStackTrace();
251 printUsage("Can't start because " + e.getMessage());
252 return false;
253 }
254 return true;
255 }
256
257
258
259
260 private static void printUsage(final String errorMsg) {
261 if (errorMsg != null && errorMsg.length() > 0) {
262 System.err.println("ERROR: " + errorMsg);
263 }
264 System.err.println("Usage: verifyrep [--starttime=X]" +
265 " [--stoptime=Y] [--families=A] <peerid> <tablename>");
266 System.err.println();
267 System.err.println("Options:");
268 System.err.println(" starttime beginning of the time range");
269 System.err.println(" without endtime means from starttime to forever");
270 System.err.println(" stoptime end of the time range");
271 System.err.println(" families comma-separated list of families to copy");
272 System.err.println();
273 System.err.println("Args:");
274 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
275 System.err.println(" tablename Name of the table to verify");
276 System.err.println();
277 System.err.println("Examples:");
278 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
279 System.err.println(" $ bin/hbase " +
280 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
281 " --starttime=1265875194289 --stoptime=1265878794289 5 TestTable ");
282 }
283
284
285
286
287
288
289
290 public static void main(String[] args) throws Exception {
291 Configuration conf = HBaseConfiguration.create();
292 Job job = createSubmittableJob(conf, args);
293 if (job != null) {
294 System.exit(job.waitForCompletion(true) ? 0 : 1);
295 }
296 }
297 }