1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.net.ConnectException;
25 import java.net.SocketTimeoutException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Comparator;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.NavigableMap;
32 import java.util.Random;
33 import java.util.Set;
34 import java.util.UUID;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.PriorityBlockingQueue;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.classification.InterfaceAudience;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.KeyValue;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.Stoppable;
51 import org.apache.hadoop.hbase.client.HConnection;
52 import org.apache.hadoop.hbase.client.HConnectionManager;
53 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
54 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
56 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
57 import org.apache.hadoop.hbase.regionserver.wal.HLog;
58 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
59 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
60 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
61 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.util.Threads;
64 import org.apache.hadoop.ipc.RemoteException;
65 import org.apache.zookeeper.KeeperException;
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @InterfaceAudience.Private
80 public class ReplicationSource extends Thread
81 implements ReplicationSourceInterface {
82
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);

// Logs waiting to be replicated; filled by enqueueLog(), drained by getNextPath(),
// ordered by LogsComparator (see init()).
private PriorityBlockingQueue<Path> queue;

// Pre-allocated entries (replicationQueueNbCapacity of them) that hold the batch being read/shipped.
private HLog.Entry[] entriesArray;
// Connection obtained from HConnectionManager in init(); used by getRS() and closed when run() exits.
private HConnection conn;

// Replication ZooKeeper helper, obtained from the manager in init().
private ReplicationZookeeper zkHelper;
private Configuration conf;

// Fraction of the peer's region servers picked as sinks ("replication.source.ratio", default 0.1).
private float ratio;
// Source of randomness for picking sinks (chooseSinks(), getRS()).
private Random random;

// Flag supplied by the caller of init(); edits are only batched, and the peer only counts as
// enabled, while it is true.
private AtomicBoolean replicating;
// Built from peerClusterZnode; provides the peer id, the recovered-queue flag and dead servers.
private ReplicationQueueInfo replicationQueueInfo;

// Peer id taken from replicationQueueInfo.
private String peerId;

// Manager that receives log positions and recovered-queue completion notices.
private ReplicationSourceManager manager;

// Checked by isActive() to know whether this source has to stop.
private Stoppable stopper;

// Sinks currently selected by chooseSinks().
private List<ServerName> currentPeers;

// Base sleep in ms between retries ("replication.source.sleepforretries", default 1000).
private long sleepForRetries;

// Batch size limit ("replication.source.size.capacity", default 1024*1024*64).
private long replicationQueueSizeCapacity;

// Batch entry-count limit ("replication.source.nb.capacity", default 25000).
private int replicationQueueNbCapacity;

// Reader on the current log; set by openReader() and nulled after every read pass in run().
private HLog.Reader reader;

// Last position handed to manager.logPositionAndCleanOldLogs().
private long lastLoggedPosition = -1;

// Log currently being processed, null when the next one must be polled from the queue.
private volatile Path currentPath;
private FileSystem fs;

// UUID of the local cluster, read through zkHelper in init().
private UUID clusterId;

// UUID of the peer cluster, resolved in run().
private UUID peerClusterId;

// Running total of entries shipped, reported by getStats().
private long totalReplicatedEdits = 0;

// Queue identifier passed to init(); used for logging, position tracking and the thread name.
private String peerClusterZnode;

// Upper bound for the sleep multiplier ("replication.source.maxretriesmultiplier", default 10).
private int maxRetriesMultiplier;

// maxRetriesMultiplier squared; sleep multiplier applied after a SocketTimeoutException.
private int socketTimeoutMultiplier;

// Number of entries in the batch being built.
private int currentNbEntries = 0;

// Sum of countDistinctRowKeys() over the entries of the batch being built.
private int currentNbOperations = 0;

// Sum of WALEdit.size() over the entries of the batch being built.
private int currentSize = 0;

// Cleared by terminate() and by processEndOfFile() once a recovered queue is finished.
private volatile boolean running = true;

// Metrics for this source.
private MetricsSource metrics;

// Manages opening, seeking, reading and closing the HLog reader.
private ReplicationHLogReaderManager repLogReader;
143
144
145
146
147
148
149
150
151
152
153
154
155 public void init(final Configuration conf,
156 final FileSystem fs,
157 final ReplicationSourceManager manager,
158 final Stoppable stopper,
159 final AtomicBoolean replicating,
160 final String peerClusterZnode)
161 throws IOException {
162 this.stopper = stopper;
163 this.conf = conf;
164 this.replicationQueueSizeCapacity =
165 this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
166 this.replicationQueueNbCapacity =
167 this.conf.getInt("replication.source.nb.capacity", 25000);
168 this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
169 for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
170 this.entriesArray[i] = new HLog.Entry();
171 }
172 this.maxRetriesMultiplier =
173 this.conf.getInt("replication.source.maxretriesmultiplier", 10);
174 this.socketTimeoutMultiplier = maxRetriesMultiplier * maxRetriesMultiplier;
175 this.queue =
176 new PriorityBlockingQueue<Path>(
177 conf.getInt("hbase.regionserver.maxlogs", 32),
178 new LogsComparator());
179 this.conn = HConnectionManager.getConnection(conf);
180 this.zkHelper = manager.getRepZkWrapper();
181 this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
182 this.currentPeers = new ArrayList<ServerName>();
183 this.random = new Random();
184 this.replicating = replicating;
185 this.manager = manager;
186 this.sleepForRetries =
187 this.conf.getLong("replication.source.sleepforretries", 1000);
188 this.fs = fs;
189 this.metrics = new MetricsSource(peerClusterZnode);
190 this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
191 try {
192 this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
193 } catch (KeeperException ke) {
194 throw new IOException("Could not read cluster id", ke);
195 }
196 this.peerClusterZnode = peerClusterZnode;
197 this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
198
199 this.peerId = this.replicationQueueInfo.getPeerId();
200 }
201
202
203
204
205 private void chooseSinks() {
206 this.currentPeers.clear();
207 List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
208 Set<ServerName> setOfAddr = new HashSet<ServerName>();
209 int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
210 LOG.info("Getting " + nbPeers +
211 " rs from peer cluster # " + this.peerId);
212 for (int i = 0; i < nbPeers; i++) {
213 ServerName sn;
214
215 do {
216 sn = addresses.get(this.random.nextInt(addresses.size()));
217 } while (setOfAddr.contains(sn));
218 LOG.info("Choosing peer " + sn);
219 setOfAddr.add(sn);
220 }
221 this.currentPeers.addAll(setOfAddr);
222 }
223
224 @Override
225 public void enqueueLog(Path log) {
226 this.queue.put(log);
227 this.metrics.setSizeOfLogQueue(queue.size());
228 }
229
230 @Override
231 public void run() {
232 connectToPeers();
233
234 if (!this.isActive()) {
235 metrics.clear();
236 return;
237 }
238 int sleepMultiplier = 1;
239
240 while (this.peerClusterId == null) {
241 this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
242 if (this.peerClusterId == null) {
243 if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
244 sleepMultiplier++;
245 }
246 }
247 }
248
249 sleepMultiplier = 1;
250
251 LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
252
253
254
255 if (this.replicationQueueInfo.isQueueRecovered()) {
256 try {
257 this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
258 this.peerClusterZnode, this.queue.peek().getName()));
259 } catch (KeeperException e) {
260 this.terminate("Couldn't get the position of this recovered queue " +
261 this.peerClusterZnode, e);
262 }
263 }
264
265 while (isActive()) {
266
267 if (!isPeerEnabled()) {
268 if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
269 sleepMultiplier++;
270 }
271 continue;
272 }
273 Path oldPath = getCurrentPath();
274
275
276
277 boolean hasCurrentPath = getNextPath();
278 if (getCurrentPath() != null && oldPath == null) {
279 sleepMultiplier = 1;
280 }
281 if (!hasCurrentPath) {
282 if (sleepForRetries("No log to process", sleepMultiplier)) {
283 sleepMultiplier++;
284 }
285 continue;
286 }
287 boolean currentWALisBeingWrittenTo = false;
288
289
290
291
292
293
294
295
296 if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
297 currentWALisBeingWrittenTo = true;
298 }
299
300 if (!openReader(sleepMultiplier)) {
301
302 sleepMultiplier = 1;
303 continue;
304 }
305
306
307 if (this.reader == null) {
308 if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
309 sleepMultiplier++;
310 }
311 continue;
312 }
313
314 boolean gotIOE = false;
315 currentNbOperations = 0;
316 currentNbEntries = 0;
317 currentSize = 0;
318 try {
319 if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
320 continue;
321 }
322 } catch (IOException ioe) {
323 LOG.warn(this.peerClusterZnode + " Got: ", ioe);
324 gotIOE = true;
325 if (ioe.getCause() instanceof EOFException) {
326
327 boolean considerDumping = false;
328 if (this.replicationQueueInfo.isQueueRecovered()) {
329 try {
330 FileStatus stat = this.fs.getFileStatus(this.currentPath);
331 if (stat.getLen() == 0) {
332 LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
333 }
334 considerDumping = true;
335 } catch (IOException e) {
336 LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
337 }
338 } else if (currentNbEntries != 0) {
339 LOG.warn(this.peerClusterZnode +
340 " Got EOF while reading, " + "looks like this file is broken? " + currentPath);
341 considerDumping = true;
342 currentNbEntries = 0;
343 }
344
345 if (considerDumping &&
346 sleepMultiplier == this.maxRetriesMultiplier &&
347 processEndOfFile()) {
348 continue;
349 }
350 }
351 } finally {
352 try {
353 this.reader = null;
354 this.repLogReader.closeReader();
355 } catch (IOException e) {
356 gotIOE = true;
357 LOG.warn("Unable to finalize the tailing of a file", e);
358 }
359 }
360
361
362
363
364 if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
365 if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
366 this.manager.logPositionAndCleanOldLogs(this.currentPath,
367 this.peerClusterZnode, this.repLogReader.getPosition(),
368 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
369 this.lastLoggedPosition = this.repLogReader.getPosition();
370 }
371 if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
372 sleepMultiplier++;
373 }
374 continue;
375 }
376 sleepMultiplier = 1;
377 shipEdits(currentWALisBeingWrittenTo);
378
379 }
380 if (this.conn != null) {
381 try {
382 this.conn.close();
383 } catch (IOException e) {
384 LOG.debug("Attempt to close connection failed", e);
385 }
386 }
387 LOG.debug("Source exiting " + this.peerId);
388 metrics.clear();
389 }
390
391
392
393
394
395
396
397
398
399 protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
400 throws IOException{
401 long seenEntries = 0;
402 this.repLogReader.seek();
403 HLog.Entry entry =
404 this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
405 while (entry != null) {
406 WALEdit edit = entry.getEdit();
407 this.metrics.incrLogEditsRead();
408 seenEntries++;
409
410 HLogKey logKey = entry.getKey();
411
412 if (!logKey.getClusterId().equals(peerClusterId)) {
413 removeNonReplicableEdits(entry);
414
415
416 if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
417 Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
418 edit.size() != 0 && replicating.get()) {
419
420
421
422
423 if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
424 logKey.setClusterId(this.clusterId);
425 }
426 currentNbOperations += countDistinctRowKeys(edit);
427 currentNbEntries++;
428 currentSize += entry.getEdit().size();
429 } else {
430 this.metrics.incrLogEditsFiltered();
431 }
432 }
433
434 if (currentSize >= this.replicationQueueSizeCapacity ||
435 currentNbEntries >= this.replicationQueueNbCapacity) {
436 break;
437 }
438 try {
439 entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
440 } catch (IOException ie) {
441 LOG.debug("Break on IOE: " + ie.getMessage());
442 break;
443 }
444 }
445 if (currentWALisBeingWrittenTo) {
446 return false;
447 }
448
449
450 return seenEntries == 0 && processEndOfFile();
451 }
452
453 private void connectToPeers() {
454
455 while (this.isActive() && this.currentPeers.size() == 0) {
456
457 try {
458 chooseSinks();
459 Thread.sleep(this.sleepForRetries);
460 } catch (InterruptedException e) {
461 LOG.error("Interrupted while trying to connect to sinks", e);
462 }
463 }
464 }
465
466
467
468
469
470 protected boolean getNextPath() {
471 try {
472 if (this.currentPath == null) {
473 this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
474 this.metrics.setSizeOfLogQueue(queue.size());
475 }
476 } catch (InterruptedException e) {
477 LOG.warn("Interrupted while reading edits", e);
478 }
479 return this.currentPath != null;
480 }
481
482
483
484
485
486
487
488 protected boolean openReader(int sleepMultiplier) {
489 try {
490 try {
491 this.reader = repLogReader.openReader(this.currentPath);
492 } catch (FileNotFoundException fnfe) {
493 if (this.replicationQueueInfo.isQueueRecovered()) {
494
495
496
497 List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
498 LOG.info("NB dead servers : " + deadRegionServers.size());
499 for (String curDeadServerName : deadRegionServers) {
500 Path deadRsDirectory =
501 new Path(manager.getLogDir().getParent(), curDeadServerName);
502 Path[] locs = new Path[] {
503 new Path(deadRsDirectory, currentPath.getName()),
504 new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
505 currentPath.getName()),
506 };
507 for (Path possibleLogLocation : locs) {
508 LOG.info("Possible location " + possibleLogLocation.toUri().toString());
509 if (this.manager.getFs().exists(possibleLogLocation)) {
510
511 LOG.info("Log " + this.currentPath + " still exists at " +
512 possibleLogLocation);
513
514 return true;
515 }
516 }
517 }
518
519
520
521
522
523
524
525 throw new IOException("File from recovered queue is " +
526 "nowhere to be found", fnfe);
527 } else {
528
529 Path archivedLogLocation =
530 new Path(manager.getOldLogDir(), currentPath.getName());
531 if (this.manager.getFs().exists(archivedLogLocation)) {
532 currentPath = archivedLogLocation;
533 LOG.info("Log " + this.currentPath + " was moved to " +
534 archivedLogLocation);
535
536 this.openReader(sleepMultiplier);
537
538 }
539
540 }
541 }
542 } catch (IOException ioe) {
543 if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
544 LOG.warn(this.peerClusterZnode + " Got: ", ioe);
545 this.reader = null;
546 if (ioe.getCause() instanceof NullPointerException) {
547
548
549
550 LOG.warn("Got NPE opening reader, will retry.");
551 } else if (sleepMultiplier == this.maxRetriesMultiplier) {
552
553
554 LOG.warn("Waited too long for this file, considering dumping");
555 return !processEndOfFile();
556 }
557 }
558 return true;
559 }
560
561
562
563
564
565
566
567 private boolean isCurrentLogEmpty() {
568 return (this.repLogReader.getPosition() == 0 &&
569 !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
570 }
571
572
573
574
575
576
577
578 protected boolean sleepForRetries(String msg, int sleepMultiplier) {
579 try {
580 LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
581 Thread.sleep(this.sleepForRetries * sleepMultiplier);
582 } catch (InterruptedException e) {
583 LOG.debug("Interrupted while sleeping between retries");
584 }
585 return sleepMultiplier < maxRetriesMultiplier;
586 }
587
588
589
590
591
592 protected void removeNonReplicableEdits(HLog.Entry entry) {
593 NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
594 List<KeyValue> kvs = entry.getEdit().getKeyValues();
595 for (int i = kvs.size()-1; i >= 0; i--) {
596 KeyValue kv = kvs.get(i);
597
598
599 if (scopes == null || !scopes.containsKey(kv.getFamily())) {
600 kvs.remove(i);
601 }
602 }
603 }
604
605
606
607
608
609
610
611 private int countDistinctRowKeys(WALEdit edit) {
612 List<KeyValue> kvs = edit.getKeyValues();
613 int distinctRowKeys = 1;
614 KeyValue lastKV = kvs.get(0);
615 for (int i = 0; i < edit.size(); i++) {
616 if (!kvs.get(i).matchingRow(lastKV)) {
617 distinctRowKeys++;
618 }
619 }
620 return distinctRowKeys;
621 }
622
623
624
625
626
627
628 protected void shipEdits(boolean currentWALisBeingWrittenTo) {
629 int sleepMultiplier = 1;
630 if (this.currentNbEntries == 0) {
631 LOG.warn("Was given 0 edits to ship");
632 return;
633 }
634 while (this.isActive()) {
635 if (!isPeerEnabled()) {
636 if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
637 sleepMultiplier++;
638 }
639 continue;
640 }
641 try {
642 AdminService.BlockingInterface rrs = getRS();
643 ReplicationProtbufUtil.replicateWALEntry(rrs,
644 Arrays.copyOf(this.entriesArray, currentNbEntries));
645 if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
646 this.manager.logPositionAndCleanOldLogs(this.currentPath,
647 this.peerClusterZnode, this.repLogReader.getPosition(),
648 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
649 this.lastLoggedPosition = this.repLogReader.getPosition();
650 }
651 this.totalReplicatedEdits += currentNbEntries;
652 this.metrics.shipBatch(this.currentNbOperations);
653 this.metrics.setAgeOfLastShippedOp(
654 this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
655 break;
656
657 } catch (IOException ioe) {
658
659 this.metrics.refreshAgeOfLastShippedOp();
660 if (ioe instanceof RemoteException) {
661 ioe = ((RemoteException) ioe).unwrapRemoteException();
662 LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
663 if (ioe instanceof TableNotFoundException) {
664 if (sleepForRetries("A table is missing in the peer cluster. "
665 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
666 sleepMultiplier++;
667 }
668 }
669 } else {
670 if (ioe instanceof SocketTimeoutException) {
671
672
673
674 sleepForRetries("Encountered a SocketTimeoutException. Since the " +
675 "call to the remote cluster timed out, which is usually " +
676 "caused by a machine failure or a massive slowdown",
677 this.socketTimeoutMultiplier);
678 } else if (ioe instanceof ConnectException) {
679 LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
680 chooseSinks();
681 } else {
682 LOG.warn("Can't replicate because of a local or network error: ", ioe);
683 }
684 }
685
686 try {
687 boolean down;
688
689 do {
690 down = isSlaveDown();
691 if (down) {
692 if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
693 sleepMultiplier++;
694 } else {
695 chooseSinks();
696 }
697 }
698 } while (this.isActive() && down );
699 } catch (InterruptedException e) {
700 LOG.debug("Interrupted while trying to contact the peer cluster");
701 }
702 }
703 }
704 }
705
706
707
708
709
710
711 protected boolean isPeerEnabled() {
712 return this.replicating.get() &&
713 this.zkHelper.getPeerEnabled(this.peerId);
714 }
715
716
717
718
719
720
721
722
723 protected boolean processEndOfFile() {
724 if (this.queue.size() != 0) {
725 this.currentPath = null;
726 this.repLogReader.finishCurrentFile();
727 this.reader = null;
728 return true;
729 } else if (this.replicationQueueInfo.isQueueRecovered()) {
730 this.manager.closeRecoveredQueue(this);
731 LOG.info("Finished recovering the queue");
732 this.running = false;
733 return true;
734 }
735 return false;
736 }
737
738 public void startup() {
739 String n = Thread.currentThread().getName();
740 Thread.UncaughtExceptionHandler handler =
741 new Thread.UncaughtExceptionHandler() {
742 public void uncaughtException(final Thread t, final Throwable e) {
743 LOG.error("Unexpected exception in ReplicationSource," +
744 " currentPath=" + currentPath, e);
745 }
746 };
747 Threads.setDaemonThreadRunning(
748 this, n + ".replicationSource," +
749 this.peerClusterZnode, handler);
750 }
751
752 public void terminate(String reason) {
753 terminate(reason, null);
754 }
755
756 public void terminate(String reason, Exception cause) {
757 if (cause == null) {
758 LOG.info("Closing source "
759 + this.peerClusterZnode + " because: " + reason);
760
761 } else {
762 LOG.error("Closing source " + this.peerClusterZnode
763 + " because an error occurred: " + reason, cause);
764 }
765 this.running = false;
766 Threads.shutdown(this, this.sleepForRetries);
767 }
768
769
770
771
772
773
774 private AdminService.BlockingInterface getRS() throws IOException {
775 if (this.currentPeers.size() == 0) {
776 throw new IOException(this.peerClusterZnode + " has 0 region servers");
777 }
778 ServerName address =
779 currentPeers.get(random.nextInt(this.currentPeers.size()));
780 return this.conn.getAdmin(address);
781 }
782
783
784
785
786
787
788 public boolean isSlaveDown() throws InterruptedException {
789 final CountDownLatch latch = new CountDownLatch(1);
790 Thread pingThread = new Thread() {
791 public void run() {
792 try {
793 AdminService.BlockingInterface rrs = getRS();
794
795 ProtobufUtil.getServerInfo(rrs);
796 latch.countDown();
797 } catch (IOException ex) {
798 if (ex instanceof RemoteException) {
799 ex = ((RemoteException) ex).unwrapRemoteException();
800 }
801 LOG.info("Slave cluster looks down: " + ex.getMessage());
802 }
803 }
804 };
805 pingThread.start();
806
807 boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
808 pingThread.interrupt();
809 return down;
810 }
811
812 public String getPeerClusterZnode() {
813 return this.peerClusterZnode;
814 }
815
816 public String getPeerClusterId() {
817 return this.peerId;
818 }
819
820 public Path getCurrentPath() {
821 return this.currentPath;
822 }
823
824 private boolean isActive() {
825 return !this.stopper.isStopped() && this.running;
826 }
827
828
829
830
831 public static class LogsComparator implements Comparator<Path> {
832
833 @Override
834 public int compare(Path o1, Path o2) {
835 return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
836 }
837
838
839
840
841
842
843
844 private long getTS(Path p) {
845 String[] parts = p.getName().split("\\.");
846 return Long.parseLong(parts[parts.length-1]);
847 }
848 }
849
850 @Override
851 public String getStats() {
852 String position = "N/A";
853 try {
854 if (this.reader != null) {
855 position = this.reader.getPosition()+"";
856 }
857 } catch (IOException ioe) {
858 }
859 return "Total replicated edits: " + totalReplicatedEdits +
860 ", currently replicating from: " + this.currentPath +
861 " at position: " + position;
862 }
863 }