View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.regionserver.HRegionServer;
48  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationListener;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeers;
55  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  
59  import com.google.common.util.concurrent.ThreadFactoryBuilder;
60  
61  /**
62   * This class is responsible to manage all the replication
63   * sources. There are two classes of sources:
64   * <li> Normal sources are persistent and one per peer cluster</li>
65   * <li> Old sources are recovered from a failed region server and our
66   * only goal is to finish replicating the HLog queue it had up in ZK</li>
67   *
68   * When a region server dies, this class uses a watcher to get notified and it
69   * tries to grab a lock in order to transfer all the queues in a local
70   * old source.
71   *
72   * This class implements the ReplicationListener interface so that it can track changes in
73   * replication state.
74   */
75  @InterfaceAudience.Private
76  public class ReplicationSourceManager implements ReplicationListener {
77    private static final Log LOG =
78        LogFactory.getLog(ReplicationSourceManager.class);
79    // List of all the sources that read this RS's logs
80    private final List<ReplicationSourceInterface> sources;
81    // List of all the sources we got from died RSs
82    private final List<ReplicationSourceInterface> oldsources;
83    private final ReplicationQueues replicationQueues;
84    private final ReplicationTracker replicationTracker;
85    private final ReplicationPeers replicationPeers;
86    // UUID for this cluster
87    private final UUID clusterId;
88    // All about stopping
89    private final Server server;
90    // All logs we are currently tracking
91    private final Map<String, SortedSet<String>> hlogsById;
92    // Logs for recovered sources we are currently tracking
93    private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
94    private final Configuration conf;
95    private final FileSystem fs;
96    // The path to the latest log we saw, for new coming sources
97    private Path latestPath;
98    // Path to the hlogs directories
99    private final Path logDir;
100   // Path to the hlog archive
101   private final Path oldLogDir;
102   // The number of ms that we wait before moving znodes, HBASE-3596
103   private final long sleepBeforeFailover;
104   // Homemade executor service for replication
105   private final ThreadPoolExecutor executor;
106 
107   private final Random rand;
108 
109 
110   /**
111    * Creates a replication manager and sets the watch on all the other registered region servers
112    * @param replicationQueues the interface for manipulating replication queues
113    * @param replicationPeers
114    * @param replicationTracker
115    * @param conf the configuration to use
116    * @param server the server for this region server
117    * @param fs the file system to use
118    * @param logDir the directory that contains all hlog directories of live RSs
119    * @param oldLogDir the directory where old logs are archived
120    * @param clusterId
121    */
122   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
123       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
124       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
125       final Path oldLogDir, final UUID clusterId) {
126     //CopyOnWriteArrayList is thread-safe.
127     //Generally, reading is more than modifying.
128     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
129     this.replicationQueues = replicationQueues;
130     this.replicationPeers = replicationPeers;
131     this.replicationTracker = replicationTracker;
132     this.server = server;
133     this.hlogsById = new HashMap<String, SortedSet<String>>();
134     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
135     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
136     this.conf = conf;
137     this.fs = fs;
138     this.logDir = logDir;
139     this.oldLogDir = oldLogDir;
140     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
141     this.clusterId = clusterId;
142     this.replicationTracker.registerListener(this);
143     this.replicationPeers.getAllPeerIds();
144     // It's preferable to failover 1 RS at a time, but with good zk servers
145     // more could be processed at the same time.
146     int nbWorkers = conf.getInt("replication.executor.workers", 1);
147     // use a short 100ms sleep since this could be done inline with a RS startup
148     // even if we fail, other region servers can take care of it
149     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
150         100, TimeUnit.MILLISECONDS,
151         new LinkedBlockingQueue<Runnable>());
152     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
153     tfb.setNameFormat("ReplicationExecutor-%d");
154     this.executor.setThreadFactory(tfb.build());
155     this.rand = new Random();
156   }
157 
178 
179   /**
180    * Cleans a log file and all older files from ZK. Called when we are sure that a
181    * log file is closed and has no more entries.
182    * @param key Path to the log
183    * @param id id of the peer cluster
184    * @param queueRecovered Whether this is a recovered queue
185    */
186   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
187     if (queueRecovered) {
188       SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
189       if (hlogs != null && !hlogs.first().equals(key)) {
190         cleanOldLogs(hlogs, key, id);
191       }
192     } else {
193       synchronized (this.hlogsById) {
194         SortedSet<String> hlogs = hlogsById.get(id);
195         if (!hlogs.first().equals(key)) {
196           cleanOldLogs(hlogs, key, id);
197         }
198       }
199     }
200  }
201   
202   private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
203     SortedSet<String> hlogSet = hlogs.headSet(key);
204     LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
205     for (String hlog : hlogSet) {
206       this.replicationQueues.removeLog(id, hlog);
207     }
208     hlogSet.clear();
209   }
210 
211   /**
212    * Adds a normal source per registered peer cluster and tries to process all
213    * old region server hlog queues
214    */
215   protected void init() throws IOException, ReplicationException {
216     for (String id : this.replicationPeers.getPeerIds()) {
217       addSource(id);
218     }
219     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
220     if (currentReplicators == null || currentReplicators.size() == 0) {
221       return;
222     }
223     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
224     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
225         + otherRegionServers);
226 
227     // Look if there's anything to process after a restart
228     for (String rs : currentReplicators) {
229       if (!otherRegionServers.contains(rs)) {
230         transferQueues(rs);
231       }
232     }
233   }
234 
235   /**
236    * Add a new normal source to this region server
237    * @param id the id of the peer cluster
238    * @return the source that was created
239    * @throws IOException
240    */
241   protected ReplicationSourceInterface addSource(String id) throws IOException,
242       ReplicationException {
243     ReplicationPeerConfig peerConfig
244       = replicationPeers.getReplicationPeerConfig(id);
245     ReplicationPeer peer = replicationPeers.getPeer(id);
246     ReplicationSourceInterface src =
247         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
248           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
249     synchronized (this.hlogsById) {
250       this.sources.add(src);
251       this.hlogsById.put(id, new TreeSet<String>());
252       // Add the latest hlog to that source's queue
253       if (this.latestPath != null) {
254         String name = this.latestPath.getName();
255         this.hlogsById.get(id).add(name);
256         try {
257           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
258         } catch (ReplicationException e) {
259           String message =
260               "Cannot add log to queue when creating a new source, queueId="
261                   + src.getPeerClusterZnode() + ", filename=" + name;
262           server.stop(message);
263           throw e;
264         }
265         src.enqueueLog(this.latestPath);
266       }
267     }
268     src.startup();
269     return src;
270   }
271 
272   /**
273    * Delete a complete queue of hlogs associated with a peer cluster
274    * @param peerId Id of the peer cluster queue of hlogs to delete
275    */
276   public void deleteSource(String peerId, boolean closeConnection) {
277     this.replicationQueues.removeQueue(peerId);
278     if (closeConnection) {
279       this.replicationPeers.peerRemoved(peerId);
280     }
281   }
282 
283   /**
284    * Terminate the replication on this region server
285    */
286   public void join() {
287     this.executor.shutdown();
288     if (this.sources.size() == 0) {
289       this.replicationQueues.removeAllQueues();
290     }
291     for (ReplicationSourceInterface source : this.sources) {
292       source.terminate("Region server is closing");
293     }
294   }
295 
296   /**
297    * Get a copy of the hlogs of the first source on this rs
298    * @return a sorted set of hlog names
299    */
300   protected Map<String, SortedSet<String>> getHLogs() {
301     return Collections.unmodifiableMap(hlogsById);
302   }
303   
304   /**
305    * Get a copy of the hlogs of the recovered sources on this rs
306    * @return a sorted set of hlog names
307    */
308   protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
309     return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
310   }
311 
312   /**
313    * Get a list of all the normal sources of this rs
314    * @return lis of all sources
315    */
316   public List<ReplicationSourceInterface> getSources() {
317     return this.sources;
318   }
319 
320   /**
321    * Get a list of all the old sources of this rs
322    * @return list of all old sources
323    */
324   public List<ReplicationSourceInterface> getOldSources() {
325     return this.oldsources;
326   }
327 
328   void preLogRoll(Path newLog) throws IOException {
329     synchronized (this.hlogsById) {
330       String name = newLog.getName();
331       for (ReplicationSourceInterface source : this.sources) {
332         try {
333           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
334         } catch (ReplicationException e) {
335           throw new IOException("Cannot add log to replication queue with id="
336               + source.getPeerClusterZnode() + ", filename=" + name, e);
337         }
338       }
339       for (SortedSet<String> hlogs : this.hlogsById.values()) {
340         if (this.sources.isEmpty()) {
341           // If there's no slaves, don't need to keep the old hlogs since
342           // we only consider the last one when a new slave comes in
343           hlogs.clear();
344         }
345         hlogs.add(name);
346       }
347     }
348 
349     this.latestPath = newLog;
350   }
351 
352   void postLogRoll(Path newLog) throws IOException {
353     // This only updates the sources we own, not the recovered ones
354     for (ReplicationSourceInterface source : this.sources) {
355       source.enqueueLog(newLog);
356     }
357   }
358 
359   /**
360    * Factory method to create a replication source
361    * @param conf the configuration to use
362    * @param fs the file system to use
363    * @param manager the manager to use
364    * @param server the server object for this region server
365    * @param peerId the id of the peer cluster
366    * @return the created source
367    * @throws IOException
368    */
369   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
370       final FileSystem fs, final ReplicationSourceManager manager,
371       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
372       final Server server, final String peerId, final UUID clusterId,
373       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
374           throws IOException {
375     RegionServerCoprocessorHost rsServerHost = null;
376     if (server instanceof HRegionServer) {
377       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
378     }
379     ReplicationSourceInterface src;
380     try {
381       @SuppressWarnings("rawtypes")
382       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
383           ReplicationSource.class.getCanonicalName()));
384       src = (ReplicationSourceInterface) c.newInstance();
385     } catch (Exception e) {
386       LOG.warn("Passed replication source implementation throws errors, " +
387           "defaulting to ReplicationSource", e);
388       src = new ReplicationSource();
389     }
390 
391     ReplicationEndpoint replicationEndpoint = null;
392     try {
393       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
394       if (replicationEndpointImpl == null) {
395         // Default to HBase inter-cluster replication endpoint
396         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
397       }
398       @SuppressWarnings("rawtypes")
399       Class c = Class.forName(replicationEndpointImpl);
400       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
401       if(rsServerHost != null) {
402         ReplicationEndpoint newReplicationEndPoint = rsServerHost
403             .postCreateReplicationEndPoint(replicationEndpoint);
404         if(newReplicationEndPoint != null) {
405           // Override the newly created endpoint from the hook with configured end point
406           replicationEndpoint = newReplicationEndPoint;
407         }
408       }
409     } catch (Exception e) {
410       LOG.warn("Passed replication endpoint implementation throws errors", e);
411       throw new IOException(e);
412     }
413 
414     MetricsSource metrics = new MetricsSource(peerId);
415     // init replication source
416     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
417       clusterId, replicationEndpoint, metrics);
418 
419     // init replication endpoint
420     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
421       fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
422 
423     return src;
424   }
425 
426   /**
427    * Transfer all the queues of the specified to this region server.
428    * First it tries to grab a lock and if it works it will move the
429    * znodes and finally will delete the old znodes.
430    *
431    * It creates one old source for any type of source of the old rs.
432    * @param rsZnode
433    */
434   private void transferQueues(String rsZnode) {
435     NodeFailoverWorker transfer =
436         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
437             this.clusterId);
438     try {
439       this.executor.execute(transfer);
440     } catch (RejectedExecutionException ex) {
441       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
442     }
443   }
444 
445   /**
446    * Clear the references to the specified old source
447    * @param src source to clear
448    */
449   public void closeRecoveredQueue(ReplicationSourceInterface src) {
450     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
451     this.oldsources.remove(src);
452     deleteSource(src.getPeerClusterZnode(), false);
453     this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
454   }
455 
456   /**
457    * Thie method first deletes all the recovered sources for the specified
458    * id, then deletes the normal source (deleting all related data in ZK).
459    * @param id The id of the peer cluster
460    */
461   public void removePeer(String id) {
462     LOG.info("Closing the following queue " + id + ", currently have "
463         + sources.size() + " and another "
464         + oldsources.size() + " that were recovered");
465     String terminateMessage = "Replication stream was removed by a user";
466     ReplicationSourceInterface srcToRemove = null;
467     List<ReplicationSourceInterface> oldSourcesToDelete =
468         new ArrayList<ReplicationSourceInterface>();
469     // First close all the recovered sources for this peer
470     for (ReplicationSourceInterface src : oldsources) {
471       if (id.equals(src.getPeerClusterId())) {
472         oldSourcesToDelete.add(src);
473       }
474     }
475     for (ReplicationSourceInterface src : oldSourcesToDelete) {
476       src.terminate(terminateMessage);
477       closeRecoveredQueue((src));
478     }
479     LOG.info("Number of deleted recovered sources for " + id + ": "
480         + oldSourcesToDelete.size());
481     // Now look for the one on this cluster
482     for (ReplicationSourceInterface src : this.sources) {
483       if (id.equals(src.getPeerClusterId())) {
484         srcToRemove = src;
485         break;
486       }
487     }
488     if (srcToRemove == null) {
489       LOG.error("The queue we wanted to close is missing " + id);
490       return;
491     }
492     srcToRemove.terminate(terminateMessage);
493     this.sources.remove(srcToRemove);
494     deleteSource(id, true);
495   }
496 
497   @Override
498   public void regionServerRemoved(String regionserver) {
499     transferQueues(regionserver);
500   }
501 
502   @Override
503   public void peerRemoved(String peerId) {
504     removePeer(peerId);
505   }
506 
507   @Override
508   public void peerListChanged(List<String> peerIds) {
509     for (String id : peerIds) {
510       try {
511         boolean added = this.replicationPeers.peerAdded(id);
512         if (added) {
513           addSource(id);
514         }
515       } catch (Exception e) {
516         LOG.error("Error while adding a new peer", e);
517       }
518     }
519   }
520 
521   /**
522    * Class responsible to setup new ReplicationSources to take care of the
523    * queues from dead region servers.
524    */
525   class NodeFailoverWorker extends Thread {
526 
527     private String rsZnode;
528     private final ReplicationQueues rq;
529     private final ReplicationPeers rp;
530     private final UUID clusterId;
531 
532     /**
533      *
534      * @param rsZnode
535      */
536     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
537         final ReplicationPeers replicationPeers, final UUID clusterId) {
538       super("Failover-for-"+rsZnode);
539       this.rsZnode = rsZnode;
540       this.rq = replicationQueues;
541       this.rp = replicationPeers;
542       this.clusterId = clusterId;
543     }
544 
545     @Override
546     public void run() {
547       if (this.rq.isThisOurZnode(rsZnode)) {
548         return;
549       }
550       // Wait a bit before transferring the queues, we may be shutting down.
551       // This sleep may not be enough in some cases.
552       try {
553         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
554       } catch (InterruptedException e) {
555         LOG.warn("Interrupted while waiting before transferring a queue.");
556         Thread.currentThread().interrupt();
557       }
558       // We try to lock that rs' queue directory
559       if (server.isStopped()) {
560         LOG.info("Not transferring queue since we are shutting down");
561         return;
562       }
563       SortedMap<String, SortedSet<String>> newQueues = null;
564 
565       newQueues = this.rq.claimQueues(rsZnode);
566 
567       // Copying over the failed queue is completed.
568       if (newQueues.isEmpty()) {
569         // We either didn't get the lock or the failed region server didn't have any outstanding
570         // HLogs to replicate, so we are done.
571         return;
572       }
573 
574       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
575         String peerId = entry.getKey();
576         try {
577           // there is not an actual peer defined corresponding to peerId for the failover.
578           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
579           String actualPeerId = replicationQueueInfo.getPeerId();
580           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
581           ReplicationPeerConfig peerConfig = null;
582           try {
583             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
584           } catch (ReplicationException ex) {
585             LOG.warn("Received exception while getting replication peer config, skipping replay"
586                 + ex);
587           }
588           if (peer == null || peerConfig == null) {
589             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
590             continue;
591           }
592 
593           ReplicationSourceInterface src =
594               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
595                 server, peerId, this.clusterId, peerConfig, peer);
596           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
597             src.terminate("Recovered queue doesn't belong to any current peer");
598             break;
599           }
600           oldsources.add(src);
601           SortedSet<String> hlogsSet = entry.getValue();
602           for (String hlog : hlogsSet) {
603             src.enqueueLog(new Path(oldLogDir, hlog));
604           }
605           src.startup();
606           hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
607         } catch (IOException e) {
608           // TODO manage it
609           LOG.error("Failed creating a source", e);
610         }
611       }
612     }
613   }
614 
615   /**
616    * Get the directory where hlogs are archived
617    * @return the directory where hlogs are archived
618    */
619   public Path getOldLogDir() {
620     return this.oldLogDir;
621   }
622 
623   /**
624    * Get the directory where hlogs are stored by their RSs
625    * @return the directory where hlogs are stored by their RSs
626    */
627   public Path getLogDir() {
628     return this.logDir;
629   }
630 
631   /**
632    * Get the handle on the local file system
633    * @return Handle on the local file system
634    */
635   public FileSystem getFs() {
636     return this.fs;
637   }
638 
639   /**
640    * Get a string representation of all the sources' metrics
641    */
642   public String getStats() {
643     StringBuffer stats = new StringBuffer();
644     for (ReplicationSourceInterface source : sources) {
645       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
646       stats.append(source.getStats() + "\n");
647     }
648     for (ReplicationSourceInterface oldSource : oldsources) {
649       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
650       stats.append(oldSource.getStats()+ "\n");
651     }
652     return stats.toString();
653   }
654 }