/**
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*;
import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseFileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import com.google.common.base.Strings;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via zookeeper. For every log file that has to be split
 * a znode is created under /hbase/splitlog. SplitLogWorkers race to grab a
 * task.
 *
 * SplitLogManager monitors the task znodes that it creates using the
 * timeoutMonitor thread. If a task's progress is slow then
 * resubmit(String, Task, ResubmitDirective) will take the task away from the
 * owner {@link SplitLogWorker} and the task will be up for grabs again. When
 * the task is done then its znode is deleted by SplitLogManager.
 *
 * Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * All the zookeeper calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that an old "delete
 * task" can delete the re-submission.
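 * <p>
 * A minimal usage sketch, assuming a ZooKeeperWatcher, Configuration,
 * Stoppable and MasterServices are already set up (the path below is
 * illustrative):
 * <pre>
 *   SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master,
 *       serverName);
 *   slm.finishInitialization();
 *   // blocks until every log file under the directory has been split
 *   long bytesSplit = slm.splitLogDistributed(new Path("/hbase/.logs/rs1"));
 * </pre>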
 */
public class SplitLogManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  private final Stoppable stopper;
  private final MasterServices master;
  private final String serverName;
  private final TaskFinisher taskFinisher;
  private FileSystem fs;
  private Configuration conf;

  private long zkretries;
  private long resubmit_threshold;
  private long timeout;
  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;
  public boolean ignoreZKDeleteForTesting = false;

  private final ConcurrentMap<String, Task> tasks =
    new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

  private Set<String> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  private Set<String> failedDeletions = null;

  /**
   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
   * Stoppable, MasterServices, String, TaskFinisher)} that provides a task
   * finisher for copying recovered edits to their final destination. The task
   * finisher has to be robust because it can be arbitrarily restarted or
   * called multiple times.
   *
   * @param zkw the ZooKeeper watcher to coordinate through
   * @param conf the HBase configuration
   * @param stopper the stoppable used to check for shutdown
   * @param master the master services
   * @param serverName name of this master server
   */
  public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
      Stoppable stopper, MasterServices master, String serverName) {
    this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
      @Override
      public Status finish(String workerName, String logfile) {
        try {
          HLogSplitter.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    });
  }

  /**
   * It is OK to construct this object even when region servers are not
   * online. It does look up the orphan tasks in zk but it doesn't block
   * waiting for them to be done.
   *
   * @param zkw the ZooKeeper watcher to coordinate through
   * @param conf the HBase configuration
   * @param stopper the stoppable used to check for shutdown
   * @param master the master services
   * @param serverName name of this master server
   * @param tf task finisher
   */
  public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
      Stoppable stopper, MasterServices master, String serverName, TaskFinisher tf) {
    super(zkw);
    this.taskFinisher = tf;
    this.conf = conf;
    this.stopper = stopper;
    this.master = master;
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries",
        ZKSplitLog.DEFAULT_ZK_RETRIES);
    this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit",
        ZKSplitLog.DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt("hbase.splitlog.manager.timeout",
        ZKSplitLog.DEFAULT_TIMEOUT);
    this.unassignedTimeout =
      conf.getInt("hbase.splitlog.manager.unassigned.timeout",
        ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT);
    LOG.info("timeout = " + timeout);
    LOG.info("unassigned timeout = " + unassignedTimeout);
    LOG.info("resubmit threshold = " + this.resubmit_threshold);

    this.serverName = serverName;
    this.timeoutMonitor = new TimeoutMonitor(
        conf.getInt("hbase.splitlog.manager.timeoutmonitor.period",
            1000),
        stopper);

    this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
  }

  public void finishInitialization(boolean masterRecovery) {
    if (!masterRecovery) {
      Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
          + ".splitLogManagerTimeoutMonitor");
    }
    // Watcher can be null during tests with Mock'd servers.
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
    for (Path hLogDir : logDirs) {
      this.fs = hLogDir.getFileSystem(conf);
      if (!fs.exists(hLogDir)) {
        LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info(hLogDir + " is empty dir, no logs to split");
      } else {
        for (FileStatus status : logfiles) {
          fileStatus.add(status);
        }
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

  /**
   * @param logDir one region server's hlog dir path in .logs
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server
   * have been processed - successfully split or an error is encountered - by
   * an available worker region server. This method must only be called after
   * the region servers have been brought online.
   *
   * @param logDirs list of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    return splitLogDistributed(logDirs, null);
  }

  /**
   * The caller will block until all the META log files of the given region
   * server have been processed - successfully split or an error is
   * encountered - by an available worker region server. This method must only
   * be called after the region servers have been brought online.
   *
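   * <p>
   * For illustration only, a caller might select files with a custom filter;
   * the ".meta" suffix below is a hypothetical example, not a fixed HBase
   * convention:
   * <pre>
   *   PathFilter metaOnly = new PathFilter() {
   *     public boolean accept(Path p) {
   *       return p.getName().endsWith(".meta");
   *     }
   *   };
   *   long size = splitLogDistributed(logDirs, metaOnly);
   * </pre>
   *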
   * @param logDirs list of log dirs to split
   * @param filter the PathFilter to select specific log files for consideration
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final List<Path> logDirs, PathFilter filter)
      throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus(
          "Doing distributed log split in " + logDirs);
    FileStatus[] logfiles = getFileList(logDirs, filter);
    status.setStatus("Checking directory contents...");
    LOG.debug("Scheduling batch of logs to split");
    tot_mgr_log_split_batch_start.incrementAndGet();
    LOG.info("started splitting logs in " + logDirs);
    long t = EnvironmentEdgeManager.currentTimeMillis();
    long totalSize = 0;
    TaskBatch batch = new TaskBatch();
    for (FileStatus lf : logfiles) {
      // TODO If the log file is still being written to - which is most likely
      // the case for the last log file - then its length will show up here
      // as zero. The size of such a file can only be retrieved after
      // recover-lease is done. totalSize will be under in most cases and the
      // metrics that it drives will also be under-reported.
      totalSize += lf.getLen();
      if (!enqueueSplitTask(lf.getPath().toString(), batch)) {
        throw new IOException("duplicate log split scheduled for "
            + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);
    if (batch.done != batch.installed) {
      batch.isDead = true;
      tot_mgr_log_split_batch_err.incrementAndGet();
      LOG.warn("error while splitting logs in " + logDirs +
          " installed = " + batch.installed + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in "
        + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      try {
        if (fs.exists(logDir) && !HBaseFileSystem.deleteFileFromFileSystem(fs, logDir)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("returning success without actually splitting and " +
              "deleting all the log files in path " + logDir);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      tot_mgr_log_split_batch_success.incrementAndGet();
    }
    String msg = "finished splitting (more than or equal to) " + totalSize +
        " bytes in " + batch.installed + " log files in " + logDirs + " in " +
        (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to splitlog znode if it is not already there.
   *
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    tot_mgr_log_split_start.incrementAndGet();
    String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
    lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
    Task oldtask = createTaskIfAbsent(path, batch);
    if (oldtask == null) {
      // publish the task in zk
      createNode(path, zkretries);
      return true;
    }
    return false;
  }

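  // Polls the batch object (notify()'d from setDone()) every 100ms and
  // cross-checks the in-memory count of active tasks against the task
  // znodes remaining in ZK, returning early if both drop to zero.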
  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. "
              + " scheduled=" + batch.installed
              + " done=" + batch.done
              + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining
              + " active tasks, but actually there are " + actual);
          }
          int remainingInZK = remainingTasksInZK();
          if (remainingInZK >= 0 && actual > remainingInZK) {
            LOG.warn("Expected at least " + actual
              + " tasks in ZK, but actually there are " + remainingInZK);
          }
          if (remainingInZK == 0 || actual == 0) {
            LOG.warn("No more task remaining (ZK or task map), splitting "
              + "should have completed. Remaining tasks in ZK " + remainingInZK
              + ", active tasks in map " + actual);
            if (remainingInZK == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(100);
          if (stopper.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  private int remainingTasksInZK() {
    int count = 0;
    try {
      List<String> tasks =
        ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
      if (tasks != null) {
        for (String t : tasks) {
          if (!ZKSplitLog.isRescanNode(watcher, t)) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  private void setDone(String path, TerminationStatus status) {
    Task task = tasks.get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        tot_mgr_unacquired_orphan_done.incrementAndGet();
        LOG.debug("unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            tot_mgr_log_split_success.incrementAndGet();
            LOG.info("Done splitting " + path);
          } else {
            tot_mgr_log_split_err.incrementAndGet();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // Delete the task node in zk. It's an async call and no one is blocked
    // waiting for this node to be deleted. All task names are unique
    // (log.<timestamp>) so there is no risk of deleting a future task.
    // If a deletion fails, the TimeoutMonitor will retry it later.
    deleteNode(path, zkretries);
    return;
  }

  private void createNode(String path, Long retry_count) {
    ZKUtil.asyncCreate(this.watcher, path,
        TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(),
        retry_count);
    tot_mgr_node_create_queued.incrementAndGet();
    return;
  }

  private void createNodeSuccess(String path) {
    LOG.debug("put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("failed to create task node " + path);
    setDone(path, FAILURE);
  }

  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
        getData(path, this.watcher,
        new GetDataAsyncCallback(true), retry_count);
    tot_mgr_get_data_queued.incrementAndGet();
  }

  private void tryGetDataSetWatch(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
        getData(path, this.watcher,
        new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */);
    tot_mgr_get_data_queued.incrementAndGet();
  }

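  // Reacts to the data of a task znode: UNASSIGNED tasks may be resubmitted,
  // OWNED data is treated as a worker heartbeat, RESIGNED forces a resubmit,
  // DONE is routed through the TaskFinisher before being marked SUCCESS, and
  // ERR is resubmitted until the resubmit threshold is reached.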
  private void getDataSetWatchSuccess(String path, byte[] data, int version) {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      tot_mgr_null_data.incrementAndGet();
      LOG.fatal("logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
    // LOG.debug("set watch on " + path + " got data " + new String(data));
    if (TaskState.TASK_UNASSIGNED.equals(data)) {
      LOG.debug("task not yet acquired " + path + " ver = " + version);
      handleUnassignedTask(path);
    } else if (TaskState.TASK_OWNED.equals(data)) {
      heartbeat(path, version,
          TaskState.TASK_OWNED.getWriterName(data));
    } else if (TaskState.TASK_RESIGNED.equals(data)) {
      LOG.info("task " + path + " entered state " + new String(data));
      resubmitOrFail(path, FORCE);
    } else if (TaskState.TASK_DONE.equals(data)) {
      LOG.info("task " + path + " entered state " + new String(data));
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data),
            ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (TaskState.TASK_ERR.equals(data)) {
      LOG.info("task " + path + " entered state " + new String(data));
      resubmitOrFail(path, CHECK);
    } else {
      LOG.fatal("logic error - unexpected zk state for path = " + path
          + " data = " + new String(data));
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("failed to set data watch " + path);
    setDone(path, FAILURE);
  }

  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely -
   * say the SplitLogManager wants to resubmit a task: it forces the task to
   * UNASSIGNED state but dies before it can create the RESCAN task node to
   * signal the SplitLogWorkers to pick up the task. To prevent this scenario
   * the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
   *
   * @param path the task znode path
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation == 0)) {
      LOG.info("resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmit(path, task, FORCE);
    }
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper
   * AsyncCallback functions.
   *
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries should be abandoned, otherwise false
   */
  private boolean shouldAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

  private void heartbeat(String path, int new_version,
      String workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(),
          new_version, workerName);
      tot_mgr_heartbeat.incrementAndGet();
    } else {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
    }
    return;
  }

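  // A CHECK directive resubmits only if the assigned worker is marked dead
  // or has not heartbeated within the timeout, and only while the resubmit
  // threshold has not been reached; FORCE resubmits unconditionally, passing
  // version -1 so the znode version check is skipped.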
  private boolean resubmit(String path, Task task,
      ResubmitDirective directive) {
    // it's OK if this thread misses the update to task.deleted. It will
    // fail later
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead
      // but has still not finished the task. This allows progress to continue
      // if the worker cannot actually handle it, for any reason.
      final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
      ServerName curWorker = null;
      if (!Strings.isNullOrEmpty(task.cur_worker_name)) {
        try {
          curWorker = ServerName.parseServerName(task.cur_worker_name);
        } catch (IllegalArgumentException ie) {
          LOG.error("Got invalid server name:" + task.cur_worker_name + " - task for path:" + path
              + " won't be resubmitted before timeout");
        }
      } else {
        LOG.error("Got empty/null server name:" + task.cur_worker_name + " - task for path:" + path
            + " won't be resubmitted before timeout");
      }
      final boolean alive =
          (master.getServerManager() != null && curWorker != null) ? master.getServerManager()
              .isServerOnline(curWorker) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }
      if (task.unforcedResubmits >= resubmit_threshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          tot_mgr_resubmit_threshold_reached.incrementAndGet();
          LOG.info("Skipping resubmissions of task " + path +
              " because threshold " + resubmit_threshold + " reached");
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      version = -1;
    }
    LOG.info("resubmitting task " + path);
    task.incarnation++;
    try {
      // blocking zk call but this is done from the timeout thread
      if (!ZKUtil.setData(this.watcher, path,
          TaskState.TASK_UNASSIGNED.get(serverName),
          version)) {
        LOG.debug("failed to resubmit task " + path +
            " version changed");
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("failed to resubmit because znode doesn't exist " + path +
          " task done (or forced done by removing the znode)");
      getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("failed to resubmit task " + path +
          " version changed");
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
      return false;
    } catch (KeeperException e) {
      tot_mgr_resubmit_failed.incrementAndGet();
      LOG.warn("failed to resubmit " + path, e);
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits++;
    }
    task.setUnassigned();
    createRescanNode(Long.MAX_VALUE);
    tot_mgr_resubmit.incrementAndGet();
    return true;
  }

  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmit(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void deleteNode(String path, Long retries) {
    tot_mgr_node_delete_queued.incrementAndGet();
    // Once a task znode is ready for delete, i.e. it is in the TASK_DONE
    // state, then no one should be writing to it anymore, so no one will be
    // updating the znode version any more.
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
      delete(path, -1, new DeleteAsyncCallback(),
        retries);
  }

  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task;
    task = tasks.remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        tot_mgr_rescan_deleted.incrementAndGet();
      }
      tot_mgr_missing_state_in_delete.incrementAndGet();
      LOG.debug("deleted task without in memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    tot_mgr_task_deleted.incrementAndGet();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
    return;
  }

  /**
   * Signal the workers that a task was resubmitted by creating the
   * RESCAN node.
   */
  private void createRescanNode(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of the RESCAN node
    // provides. Since the TimeoutMonitor will keep resubmitting
    // UNASSIGNED tasks, this behavior is safe.
    lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
    this.watcher.getRecoverableZooKeeper().getZooKeeper().
      create(ZKSplitLog.getRescanNode(watcher),
        TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL,
        new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  private void createRescanSuccess(String path) {
    tot_mgr_rescan.incrementAndGet();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.fatal("logic failure, rescan failure must not happen");
  }

  /**
   * @param path the task znode path
   * @param batch the batch to install the task in
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return null;
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path +
              " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path +
              " to change to DELETED");
          tot_mgr_wait_for_zk_delete.incrementAndGet();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task" +
              " state still present. Waiting for znode delete callback" +
              " path=" + path);
          return oldtask;
        }
        // reinsert the newTask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.fatal("Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task." +
          " path=" + path);
      return oldtask;
    }
  }

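  // An "orphan" task is one whose znode was found in ZK (e.g. left behind by
  // a previous master) without a live batch installed in this manager; it is
  // tracked in the tasks map so heartbeats and completion still work.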
  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task;
    task = tasks.putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task;
    task = tasks.get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
      }
      getDataSetWatch(path, zkretries);
    }
  }

  public void stop() {
    if (timeoutMonitor != null) {
      timeoutMonitor.interrupt();
    }
  }

  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher,
          this.watcher.splitLogZNode);
      if (orphans == null) {
        LOG.warn("could not get children of " + this.watcher.splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("could not get children of " + this.watcher.splitLogZNode +
          " " + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    for (String path : orphans) {
      String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("found orphan rescan node " + path);
      } else {
        LOG.info("found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
        rescan_nodes + " rescan nodes");
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in
   * splitLogDistributed(). Client threads use this object to wait for all
   * their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  static class TaskBatch {
    int installed = 0;
    int done = 0;
    int error = 0;
    volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = "
          + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  static class Task {
    volatile long last_update;
    volatile int last_version;
    volatile String cur_worker_name;
    volatile TaskBatch batch;
    volatile TerminationStatus status;
    volatile int incarnation;
    volatile int unforcedResubmits;
    volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update +
          " last_version = " + last_version +
          " cur_worker_name = " + cur_worker_name +
          " status = " + status +
          " incarnation = " + incarnation +
          " resubmits = " + unforcedResubmits +
          " batch = " + batch);
    }

    Task() {
      incarnation = 0;
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, String worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  void handleDeadWorker(String workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<String>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("dead splitlog worker " + workerName);
  }

  void handleDeadWorkers(List<ServerName> serverNames) {
    List<String> workerNames = new ArrayList<String>(serverNames.size());
    for (ServerName serverName : serverNames) {
      workerNames.add(serverName.toString());
    }
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<String>(100);
      }
      deadWorkers.addAll(workerNames);
    }
    LOG.info("dead splitlog workers " + workerNames);
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have
   * timed out.
   */
  private class TimeoutMonitor extends Chore {
    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", period, stopper);
    }

    @Override
    protected void chore() {
      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<String> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        String cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          tot_mgr_resubmit_dead_server_task.incrementAndGet();
          if (resubmit(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " +
                cur_worker + ", will retry.");
          }
        } else if (resubmit(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        LOG.debug("total tasks = " + tot + " unassigned = " + unassigned);
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO maybe the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (tot > 0 && !found_assigned_task &&
          ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
          unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String path = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a task
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            tryGetDataSetWatch(path);
          }
        }
        createRescanNode(Long.MAX_VALUE);
        tot_mgr_resubmit_unassigned.incrementAndGet();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }

      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
        failedDeletions.removeAll(tmpPaths);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          deleteNode(tmpPath, zkretries);
        }
      }
    }
  }

  /**
   * Asynchronous handler for zk create node results.
   * Retries on failures.
   */
  class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      tot_mgr_node_create_result.incrementAndGet();
      if (rc != 0) {
        if (shouldAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in
          // TASK_DONE state. The only operations that will be carried out on
          // this node by this manager are get-znode-data, task-finisher and
          // delete-znode. And all code pieces correctly handle the case of a
          // suddenly disappearing task-znode.
          LOG.debug("found pre-existing znode " + path);
          tot_mgr_node_already_exists.incrementAndGet();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("create rc=" + KeeperException.Code.get(rc) + " for " +
              path + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            tot_mgr_node_create_err.incrementAndGet();
            createNodeFailure(path);
          } else {
            tot_mgr_node_create_retry.incrementAndGet();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results.
   * Retries on failures.
   */
  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
    private boolean completeTaskOnNoNode;

    /**
     * @param completeTaskOnNoNode Complete the task if the znode cannot be found.
     * Since in-memory task creation and znode creation are not atomic, there might be
     * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor).
     * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
     */
    public GetDataAsyncCallback(boolean completeTaskOnNoNode) {
      this.completeTaskOnNoNode = completeTaskOnNoNode;
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data,
        Stat stat) {
      tot_mgr_get_data_result.incrementAndGet();
      if (rc != 0) {
        if (shouldAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          tot_mgr_get_data_nonode.incrementAndGet();
          LOG.warn("task znode " + path + " vanished.");
          if (completeTaskOnNoNode) {
            // The task znode has been deleted. Must be some pending delete
            // that deleted the task. Assume success because a task-znode is
            // only deleted after TaskFinisher is successful.
            getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
          }
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
              path + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
            path + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          tot_mgr_get_data_err.incrementAndGet();
          getDataSetWatchFailure(path);
        } else {
          tot_mgr_get_data_retry.incrementAndGet();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      getDataSetWatchSuccess(path, data, stat.getVersion());
      return;
    }
  }

  /**
   * Asynchronous handler for zk delete node results.
   * Retries on failures.
   */
  class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      tot_mgr_node_delete_result.incrementAndGet();
      if (rc != 0) {
        if (shouldAbandonRetries(rc, "Delete znode " + path)) {
          failedDeletions.add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          tot_mgr_node_delete_err.incrementAndGet();
          Long retry_count = (Long) ctx;
          LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
              path + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("delete failed " + path);
            failedDeletions.add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path +
              " does not exist. Either was created but deleted behind our" +
              " back by another pending delete OR was deleted" +
              " in earlier retry rounds. zkretries = " + (Long) ctx);
        }
      } else {
        LOG.debug("deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results.
   * Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag. It is a
   * signal for all the {@link SplitLogWorker}s to rescan for new tasks.
   */
  class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (shouldAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path +
            " remaining retries=" + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          createRescanNode(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  /**
   * {@link SplitLogManager} can use objects implementing this interface to
   * finish off a partially done task by a {@link SplitLogWorker}. This
   * provides a serialization point at the end of the task processing. Must be
   * restartable and idempotent.
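   * <p>
   * A sketch of an implementation, mirroring the default finisher installed
   * by the wrapper constructor above (a Configuration is assumed to be in
   * scope as {@code conf}):
   * <pre>
   *   TaskFinisher tf = new TaskFinisher() {
   *     public Status finish(String workerName, String logfile) {
   *       try {
   *         HLogSplitter.finishSplitLogFile(logfile, conf);
   *       } catch (IOException e) {
   *         return Status.ERR;
   *       }
   *       return Status.DONE;
   *     }
   *   };
   * </pre>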
   */
  public static interface TaskFinisher {
    /**
     * Status that can be returned by finish().
     */
    public static enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }

    /**
     * Finish the partially done task. workername provides a clue as to where
     * the partial results of the partially done task are present. taskname is
     * the name of the task that was put up in zookeeper.
     * <p>
     * @param workerName name of the worker that did the partial work
     * @param taskname name of the task znode
     * @return DONE if task completed successfully, ERR otherwise
     */
    public Status finish(String workerName, String taskname);
  }

  enum ResubmitDirective {
    CHECK(),
    FORCE();
  }

  enum TerminationStatus {
    IN_PROGRESS("in_progress"),
    SUCCESS("success"),
    FAILURE("failure"),
    DELETED("deleted");

    String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }

  /**
   * Completes the initialization.
   */
  public void finishInitialization() {
    finishInitialization(false);
  }
}