1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coordination;
20  
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
26  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.util.ArrayList;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentMap;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.CoordinatedStateManager;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.Server;
44  import org.apache.hadoop.hbase.ServerName;
45  import org.apache.hadoop.hbase.SplitLogCounters;
46  import org.apache.hadoop.hbase.SplitLogTask;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
49  import org.apache.hadoop.hbase.exceptions.DeserializationException;
50  import org.apache.hadoop.hbase.master.MasterServices;
51  import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
52  import org.apache.hadoop.hbase.master.SplitLogManager.Task;
53  import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
54  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
57  import org.apache.hadoop.hbase.wal.WALSplitter;
58  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
59  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
60  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
61  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
62  import org.apache.hadoop.util.StringUtils;
63  import org.apache.zookeeper.AsyncCallback;
64  import org.apache.zookeeper.CreateMode;
65  import org.apache.zookeeper.KeeperException;
66  import org.apache.zookeeper.KeeperException.NoNodeException;
67  import org.apache.zookeeper.ZooDefs.Ids;
68  import org.apache.zookeeper.data.Stat;
69  
70  /**
71   * ZooKeeper-based implementation of
72   * {@link org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination}
73   */
74  @InterfaceAudience.Private
75  public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
76      SplitLogManagerCoordination {
77  
78    public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
79  
80      ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
81          Set<String> failedDeletions, ServerName serverName) {
82        super(tasks, master, failedDeletions, serverName);
83      }
84    }
85  
86    public static final int DEFAULT_TIMEOUT = 120000;
87    public static final int DEFAULT_ZK_RETRIES = 3;
88    public static final int DEFAULT_MAX_RESUBMIT = 3;
89  
90    private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class);
91  
92    private Server server;
93    private long zkretries;
94    private long resubmitThreshold;
95    private long timeout;
96    private TaskFinisher taskFinisher;
97  
98    SplitLogManagerDetails details;
99  
100   // When lastRecoveringNodeCreationTime is older than a configured threshold, we'll check
101   // whether to GC stale recovering znodes
102   private volatile long lastRecoveringNodeCreationTime = 0;
103   private Configuration conf;
104   public boolean ignoreZKDeleteForTesting = false;
105 
106   private RecoveryMode recoveryMode;
107 
108   private boolean isDrainingDone = false;
109 
110   public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
111       ZooKeeperWatcher watcher) {
112     super(watcher);
113     taskFinisher = new TaskFinisher() {
114       @Override
115       public Status finish(ServerName workerName, String logfile) {
116         try {
117           WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
118         } catch (IOException e) {
119           LOG.warn("Could not finish splitting of log file " + logfile, e);
120           return Status.ERR;
121         }
122         return Status.DONE;
123       }
124     };
125     this.server = manager.getServer();
126     this.conf = server.getConfiguration();
127   }
128 
129   @Override
130   public void init() throws IOException {
131     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
132     this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
133     this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
134     setRecoveryMode(true);
135     if (this.watcher != null) {
136       this.watcher.registerListener(this);
137       lookForOrphans();
138     }
139   }
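  // Editor's illustration (not part of the original class): the three tunables read in init() come
  // from the cluster Configuration, so they can be overridden before the coordination is set up.
  // The property names are the ones used above; the values below are made-up examples.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("hbase.splitlog.zk.retries", 5);       // DEFAULT_ZK_RETRIES = 3
  //   conf.setLong("hbase.splitlog.max.resubmit", 5);     // DEFAULT_MAX_RESUBMIT = 3
  //   conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 300000);  // DEFAULT_TIMEOUT = 120000 ms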
140 
141   @Override
142   public String prepareTask(String taskname) {
143     return ZKSplitLog.getEncodedNodeName(watcher, taskname);
144   }
145 
146   @Override
147   public int remainingTasksInCoordination() {
148     int count = 0;
149     try {
150       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
151       if (tasks != null) {
152         int listSize = tasks.size();
153         for (int i = 0; i < listSize; i++) {
154           if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
155             count++;
156           }
157         }
158       }
159     } catch (KeeperException ke) {
160       LOG.warn("Failed to check remaining tasks", ke);
161       count = -1;
162     }
163     return count;
164   }
165 
166   /**
167    * It is possible for a task to stay in the UNASSIGNED state indefinitely: say the SplitLogManager
168    * wants to resubmit a task; it forces the task to UNASSIGNED but dies before it can create the
169    * RESCAN task node that signals the SplitLogWorkers to pick up the task. To prevent this
170    * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
171    * @param path the path of the task znode
172    */
173   private void handleUnassignedTask(String path) {
174     if (ZKSplitLog.isRescanNode(watcher, path)) {
175       return;
176     }
177     Task task = findOrCreateOrphanTask(path);
178     if (task.isOrphan() && (task.incarnation == 0)) {
179       LOG.info("resubmitting unassigned orphan task " + path);
180       // ignore failure to resubmit. The timeout-monitor will handle it later
181       // albeit in a more crude fashion
182       resubmitTask(path, task, FORCE);
183     }
184   }
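  // Example timeline for the scenario described above (editor's illustration): a master forces task
  // T back to UNASSIGNED and then dies before creating the RESCAN node, so no worker re-examines T.
  // On the next master startup, lookForOrphans() sets a data watch on T, getDataSetWatchSuccess()
  // sees the UNASSIGNED state and calls this method, and the FORCE resubmit creates a fresh RESCAN
  // node so the SplitLogWorkers pick T up.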
185 
186   @Override
187   public void deleteTask(String path) {
188     deleteNode(path, zkretries);
189   }
190 
191   @Override
192   public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
193     // it's OK if this thread misses the update to task.deleted. It will fail later
194     if (task.status != IN_PROGRESS) {
195       return false;
196     }
197     int version;
198     if (directive != FORCE) {
199       // We're going to resubmit:
200       // 1) immediately if the worker server is now marked as dead
201       // 2) after a configurable timeout if the server is not marked as dead but has still not
202       // finished the task. This allows us to continue if the worker cannot actually handle it,
203       // for any reason.
204       final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
205       final boolean alive =
206           details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
207               .isServerOnline(task.cur_worker_name) : true;
208       if (alive && time < timeout) {
209         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server "
210             + task.cur_worker_name + " is not marked as dead, we waited for " + time
211             + " while the timeout is " + timeout);
212         return false;
213       }
214 
215       if (task.unforcedResubmits.get() >= resubmitThreshold) {
216         if (!task.resubmitThresholdReached) {
217           task.resubmitThresholdReached = true;
218           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
219           LOG.info("Skipping resubmissions of task " + path + " because threshold "
220               + resubmitThreshold + " reached");
221         }
222         return false;
223       }
224       // race with heartbeat() that might be changing last_version
225       version = task.last_version;
226     } else {
227       SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
228       version = -1;
229     }
230     LOG.info("resubmitting task " + path);
231     task.incarnation++;
232     boolean result = resubmit(this.details.getServerName(), path, version);
233     if (!result) {
234       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
235       return false;
236     }
237     // don't count forced resubmits
238     if (directive != FORCE) {
239       task.unforcedResubmits.incrementAndGet();
240     }
241     task.setUnassigned();
242     rescan(Long.MAX_VALUE);
243     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
244     return true;
245   }
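  // Worked example for the decision above (editor's illustration, using the defaults in this file):
  // with timeout = 120000 ms and resubmitThreshold = 3, a non-FORCE resubmit happens only when the
  // worker is no longer online or more than 120000 ms have passed since the task's last heartbeat,
  // and only while task.unforcedResubmits < 3. A FORCE directive skips both checks and writes the
  // UNASSIGNED state with version -1, overwriting the znode regardless of its current version.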
246 
247 
248   @Override
249   public void checkTasks() {
250     rescan(Long.MAX_VALUE);
251   }
252 
253   /**
254    * signal the workers that a task was resubmitted by creating the RESCAN node.
255    */
256   private void rescan(long retries) {
257     // The RESCAN node will be deleted almost immediately by the
258     // SplitLogManager as soon as it is created because it is being
259     // created in the DONE state. This behavior prevents a buildup
260     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
261     // might miss the watch-trigger that creation of RESCAN node provides.
262     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks,
263     // this behavior is safe.
264     SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
265     this.watcher
266         .getRecoverableZooKeeper()
267         .getZooKeeper()
268         .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
269           CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
270   }
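  // Illustrative sketch (assumed path, derived from ZKSplitLog rather than shown in this file): the
  // create() above produces a sequential znode under the split log parent, roughly
  //
  //   <splitLogZNode>/RESCAN0000000012    data: SplitLogTask.Done owned by this master
  //
  // Workers watching the parent get a children-changed notification and rescan for UNASSIGNED
  // tasks, while this manager deletes the DONE-state node almost immediately, as described above.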
271 
272   @Override
273   public void submitTask(String path) {
274     createNode(path, zkretries);
275   }
276 
277   @Override
278   public void checkTaskStillAvailable(String path) {
279     // A negative retry count will lead to ignoring all error processing.
280     this.watcher
281         .getRecoverableZooKeeper()
282         .getZooKeeper()
283         .getData(path, this.watcher, new GetDataAsyncCallback(),
284           Long.valueOf(-1) /* retry count */);
285     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
286   }
287 
288   /**
289    * Removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
290    * region server hosting the region can allow reads to the recovered region
291    * @param recoveredServerNameSet servers which have just been recovered
292    * @param isMetaRecovery whether the current recovery is for the meta region on
293    *          <code>serverNames</code>
294    */
295   @Override
296   public void removeRecoveringRegions(final Set<String> recoveredServerNameSet,
297       Boolean isMetaRecovery)
298   throws IOException {
299     final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
300     int count = 0;
301     try {
302       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
303       if (tasks != null) {
304         int listSize = tasks.size();
305         for (int i = 0; i < listSize; i++) {
306           if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
307             count++;
308           }
309         }
310       }
311       if (count == 0 && this.details.getMaster().isInitialized()
312           && !this.details.getMaster().getServerManager().areDeadServersInProgress()) {
313         // No splitting work items left
314         ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null);
315         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
316         // this point.
317         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
318       } else if (!recoveredServerNameSet.isEmpty()) {
319       // Remove recovering regions which don't have any RS associated with them
320         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
321         if (regions != null) {
322           int listSize = regions.size();
323           if (LOG.isDebugEnabled()) {
324             LOG.debug("Processing recovering " + regions + " and servers "  +
325                 recoveredServerNameSet + ", isMetaRecovery=" + isMetaRecovery);
326           }
327           for (int i = 0; i < listSize; i++) {
328             String region = regions.get(i);
329             if (isMetaRecovery != null) {
330               if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
331                   || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
332                 // skip non-meta regions when recovering the meta region or
333                 // skip the meta region when recovering user regions
334                 continue;
335               }
336             }
337             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
338             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
339             if (failedServers == null || failedServers.isEmpty()) {
340               ZKUtil.deleteNode(watcher, nodePath);
341               continue;
342             }
343             if (recoveredServerNameSet.containsAll(failedServers)) {
344               ZKUtil.deleteNodeRecursively(watcher, nodePath);
345             } else {
346               int tmpFailedServerSize = failedServers.size();
347               for (int j = 0; j < tmpFailedServerSize; j++) {
348                 String failedServer = failedServers.get(j);
349                 if (recoveredServerNameSet.contains(failedServer)) {
350                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
351                   ZKUtil.deleteNode(watcher, tmpPath);
352                 }
353               }
354             }
355           }
356         }
357       }
358     } catch (KeeperException ke) {
359       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
360       throw new IOException(ke);
361     }
362   }
363 
364   private void deleteNode(String path, Long retries) {
365     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
366     // Once a task znode is ready for deletion, that is, it is in the TASK_DONE
367     // state, no one should be writing to it anymore, so no one
368     // will be updating the znode version any more.
369     this.watcher.getRecoverableZooKeeper().getZooKeeper()
370         .delete(path, -1, new DeleteAsyncCallback(), retries);
371   }
372 
373   private void deleteNodeSuccess(String path) {
374     if (ignoreZKDeleteForTesting) {
375       return;
376     }
377     Task task;
378     task = details.getTasks().remove(path);
379     if (task == null) {
380       if (ZKSplitLog.isRescanNode(watcher, path)) {
381         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
382       }
383       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
384       LOG.debug("deleted task without in memory state " + path);
385       return;
386     }
387     synchronized (task) {
388       task.status = DELETED;
389       task.notify();
390     }
391     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
392   }
393 
394   private void deleteNodeFailure(String path) {
395     LOG.info("Failed to delete node " + path + " and will retry soon.");
396     return;
397   }
398 
399   private void createRescanSuccess(String path) {
400     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
401     getDataSetWatch(path, zkretries);
402   }
403 
404   private void createRescanFailure() {
405     LOG.fatal("logic failure, rescan failure must not happen");
406   }
407 
408   /**
409    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
410    * @param statusCode integer value of a ZooKeeper exception code
411    * @param action description message about the retried action
412    * @return true when retries should be abandoned, otherwise false
413    */
414   private boolean needAbandonRetries(int statusCode, String action) {
415     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
416       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
417           + "action=" + action);
418       return true;
419     }
420     return false;
421   }
422 
423   private void createNode(String path, Long retry_count) {
424     SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
425     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
426       retry_count);
427     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
428     return;
429   }
430 
431   private void createNodeSuccess(String path) {
432     LOG.debug("put up splitlog task at znode " + path);
433     getDataSetWatch(path, zkretries);
434   }
435 
436   private void createNodeFailure(String path) {
437     // TODO the Manager should split the log locally instead of giving up
438     LOG.warn("failed to create task node " + path);
439     setDone(path, FAILURE);
440   }
441 
442   private void getDataSetWatch(String path, Long retry_count) {
443     this.watcher.getRecoverableZooKeeper().getZooKeeper()
444         .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
445     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
446   }
447 
448 
449   private void getDataSetWatchSuccess(String path, byte[] data, int version)
450       throws DeserializationException {
451     if (data == null) {
452       if (version == Integer.MIN_VALUE) {
453         // assume all done. The task znode suddenly disappeared.
454         setDone(path, SUCCESS);
455         return;
456       }
457       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
458       LOG.fatal("logic error - got null data " + path);
459       setDone(path, FAILURE);
460       return;
461     }
462     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
463     SplitLogTask slt = SplitLogTask.parseFrom(data);
464     if (slt.isUnassigned()) {
465       LOG.debug("task not yet acquired " + path + " ver = " + version);
466       handleUnassignedTask(path);
467     } else if (slt.isOwned()) {
468       heartbeat(path, version, slt.getServerName());
469     } else if (slt.isResigned()) {
470       LOG.info("task " + path + " entered state: " + slt.toString());
471       resubmitOrFail(path, FORCE);
472     } else if (slt.isDone()) {
473       LOG.info("task " + path + " entered state: " + slt.toString());
474       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
475         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
476           setDone(path, SUCCESS);
477         } else {
478           resubmitOrFail(path, CHECK);
479         }
480       } else {
481         setDone(path, SUCCESS);
482       }
483     } else if (slt.isErr()) {
484       LOG.info("task " + path + " entered state: " + slt.toString());
485       resubmitOrFail(path, CHECK);
486     } else {
487       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = "
488           + slt.toString());
489       setDone(path, FAILURE);
490     }
491   }
492 
493   private void resubmitOrFail(String path, ResubmitDirective directive) {
494     if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
495       setDone(path, FAILURE);
496     }
497   }
498 
499   private void getDataSetWatchFailure(String path) {
500     LOG.warn("failed to set data watch " + path);
501     setDone(path, FAILURE);
502   }
503 
504   private void setDone(String path, TerminationStatus status) {
505     Task task = details.getTasks().get(path);
506     if (task == null) {
507       if (!ZKSplitLog.isRescanNode(watcher, path)) {
508         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
509         LOG.debug("unacquired orphan task is done " + path);
510       }
511     } else {
512       synchronized (task) {
513         if (task.status == IN_PROGRESS) {
514           if (status == SUCCESS) {
515             SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
516             LOG.info("Done splitting " + path);
517           } else {
518             SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
519             LOG.warn("Error splitting " + path);
520           }
521           task.status = status;
522           if (task.batch != null) {
523             synchronized (task.batch) {
524               if (status == SUCCESS) {
525                 task.batch.done++;
526               } else {
527                 task.batch.error++;
528               }
529               task.batch.notify();
530             }
531           }
532         }
533       }
534     }
535     // Delete the task node in zk. It's an async
536     // call and no one is blocked waiting for this node to be deleted. All
537     // task names are unique (log.<timestamp>) so there is no risk of deleting
538     // a future task.
539     // If a deletion fails, the TimeoutMonitor will retry the same deletion later
540     deleteNode(path, zkretries);
541     return;
542   }
543 
544   Task findOrCreateOrphanTask(String path) {
545     Task orphanTask = new Task();
546     Task task;
547     task = details.getTasks().putIfAbsent(path, orphanTask);
548     if (task == null) {
549       LOG.info("creating orphan task " + path);
550       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
551       task = orphanTask;
552     }
553     return task;
554   }
555 
556   private void heartbeat(String path, int new_version, ServerName workerName) {
557     Task task = findOrCreateOrphanTask(path);
558     if (new_version != task.last_version) {
559       if (task.isUnassigned()) {
560         LOG.info("task " + path + " acquired by " + workerName);
561       }
562       task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
563       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
564     } else {
565       // duplicate heartbeats - heartbeats w/o zk node version
566       // changing - are possible. The timeout thread does
567       // getDataSetWatch() just to check whether a node still
568       // exists or not
569     }
570     return;
571   }
572 
573   private void lookForOrphans() {
574     List<String> orphans;
575     try {
576       orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.splitLogZNode);
577       if (orphans == null) {
578         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
579         return;
580       }
581     } catch (KeeperException e) {
582       LOG.warn("could not get children of " + this.watcher.splitLogZNode + " "
583           + StringUtils.stringifyException(e));
584       return;
585     }
586     int rescan_nodes = 0;
587     int listSize = orphans.size();
588     for (int i = 0; i < listSize; i++) {
589       String path = orphans.get(i);
590       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
591       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
592         rescan_nodes++;
593         LOG.debug("found orphan rescan node " + path);
594       } else {
595         LOG.info("found orphan task " + path);
596       }
597       getDataSetWatch(nodepath, zkretries);
598     }
599     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
600         + " rescan nodes");
601   }
602 
603   /**
604    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
605    * all regions of the passed-in region server
606    * @param serverName the name of a region server
607    * @param userRegions user regions assigned on the region server
608    */
609   @Override
610   public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
611       throws IOException, InterruptedIOException {
612     this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTime();
613     for (HRegionInfo region : userRegions) {
614       String regionEncodeName = region.getEncodedName();
615       long retries = this.zkretries;
616 
617       do {
618         String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
619         long lastRecordedFlushedSequenceId = -1;
620         try {
621           long lastSequenceId =
622               this.details.getMaster().getServerManager()
623                   .getLastFlushedSequenceId(regionEncodeName.getBytes()).getLastFlushedSequenceId();
624 
625           /*
626            * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
627            * flushed sequence id for the server]
628            */
629           byte[] data = ZKUtil.getData(this.watcher, nodePath);
630           if (data == null) {
631             ZKUtil
632                 .createSetData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
633           } else {
634             lastRecordedFlushedSequenceId =
635                 ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
636             if (lastRecordedFlushedSequenceId < lastSequenceId) {
637               // update last flushed sequence id in the region level
638               ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
639             }
640           }
641           // go one level deeper with server name
642           nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
643           if (lastSequenceId <= lastRecordedFlushedSequenceId) {
644             // the newly assigned RS failed even before any flush to the region
645             lastSequenceId = lastRecordedFlushedSequenceId;
646           }
647           ZKUtil.createSetData(this.watcher, nodePath,
648             ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
649           if (LOG.isDebugEnabled()) {
650             LOG.debug("Marked " + regionEncodeName + " as recovering from " + serverName +
651               ": " + nodePath);
652           }
653           // break retry loop
654           break;
655         } catch (KeeperException e) {
656           // ignore ZooKeeper exceptions inside retry loop
657           if (retries <= 1) {
658             throw new IOException(e);
659           }
660           // wait a little bit for retry
661           try {
662             Thread.sleep(20);
663           } catch (InterruptedException e1) {
664             throw new InterruptedIOException();
665           }
666         } catch (InterruptedException e) {
667           throw new InterruptedIOException();
668         }
669       } while ((--retries) > 0);
670     }
671   }
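  // Layout sketch of the znodes written above (illustrative, matching the inline comment in this
  // method):
  //
  //   /hbase/recovering-regions/
  //       <encoded region name>        data: last known flushed sequence id for the region
  //           <failed server name>     data: last flushed sequence id recorded for that server
  //
  // removeRecoveringRegions() later deletes a region znode once all of its failed-server children
  // are in the recovered-server set, which re-enables reads on the region.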
672 
673   @Override
674   public void nodeDataChanged(String path) {
675     Task task;
676     task = details.getTasks().get(path);
677     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
678       if (task != null) {
679         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
680       }
681       getDataSetWatch(path, zkretries);
682     }
683   }
684 
685   /**
686    * ZooKeeper implementation of
687    * {@link org.apache.hadoop.hbase.coordination.
688    * SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)}
689    */
690   @Override
691   public void removeStaleRecoveringRegions(final Set<String> knownFailedServers)
692       throws IOException, InterruptedIOException {
693 
694     try {
695       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
696       if (tasks != null) {
697         int listSize = tasks.size();
698         for (int i = 0; i < listSize; i++) {
699           String t = tasks.get(i);
700           byte[] data;
701           try {
702             data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
703           } catch (InterruptedException e) {
704             throw new InterruptedIOException();
705           }
706           if (data != null) {
707             SplitLogTask slt = null;
708             try {
709               slt = SplitLogTask.parseFrom(data);
710             } catch (DeserializationException e) {
711               LOG.warn("Failed parse data for znode " + t, e);
712             }
713             if (slt != null && slt.isDone()) {
714               continue;
715             }
716           }
717           // decode the file name
718           t = ZKSplitLog.getFileName(t);
719           ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(new Path(t));
720           if (serverName != null) {
721             knownFailedServers.add(serverName.getServerName());
722           } else {
723             LOG.warn("Found invalid WAL log file name: " + t);
724           }
725         }
726       }
727 
728       // Remove recovering regions which don't have any RS associated with them
729       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
730       if (regions != null) {
731         int listSize = regions.size();
732         for (int i = 0; i < listSize; i++) {
733           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regions.get(i));
734           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
735           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
736             ZKUtil.deleteNode(watcher, nodePath);
737             continue;
738           }
739           boolean needMoreRecovery = false;
740           int tmpFailedServerSize = regionFailedServers.size();
741           for (int j = 0; j < tmpFailedServerSize; j++) {
742             if (knownFailedServers.contains(regionFailedServers.get(j))) {
743               needMoreRecovery = true;
744               break;
745             }
746           }
747           if (!needMoreRecovery) {
748             ZKUtil.deleteNodeRecursively(watcher, nodePath);
749           }
750         }
751       }
752     } catch (KeeperException e) {
753       throw new IOException(e);
754     }
755   }
756 
757   @Override
758   public synchronized boolean isReplaying() {
759     return this.recoveryMode == RecoveryMode.LOG_REPLAY;
760   }
761 
762   @Override
763   public synchronized boolean isSplitting() {
764     return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
765   }
766 
767   private List<String> listSplitLogTasks() throws KeeperException {
768     List<String> taskOrRescanList = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
769     if (taskOrRescanList == null || taskOrRescanList.isEmpty()) {
770       return Collections.<String> emptyList();
771     }
772     List<String> taskList = new ArrayList<String>();
773     for (String taskOrRescan : taskOrRescanList) {
774       // Remove rescan nodes
775       if (!ZKSplitLog.isRescanNode(taskOrRescan)) {
776         taskList.add(taskOrRescan);
777       }
778     }
779     return taskList;
780   }
781 
782   /**
783    * Sets the recovery mode based on outstanding split log tasks left over from a previous run,
784    * or on the current configuration setting when there are none
785    * @param isForInitialization true when called during master initialization
786    * @throws IOException
787    */
788   @Override
789   public void setRecoveryMode(boolean isForInitialization) throws IOException {
790     synchronized(this) {
791       if (this.isDrainingDone) {
792         // when there is no outstanding split log task after master startup, we already have an
793         // up-to-date recovery mode
794         return;
795       }
796     }
797     if (this.watcher == null) {
798       // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
799       synchronized(this) {
800         this.isDrainingDone = true;
801         this.recoveryMode = RecoveryMode.LOG_SPLITTING;
802       }
803       return;
804     }
805     boolean hasSplitLogTask = false;
806     boolean hasRecoveringRegions = false;
807     RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
808     RecoveryMode recoveryModeInConfig =
809         (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
810 
811     // First, check whether there are outstanding recovering regions
812     try {
813       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
814       if (regions != null && !regions.isEmpty()) {
815         hasRecoveringRegions = true;
816         previousRecoveryMode = RecoveryMode.LOG_REPLAY;
817       }
818       if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
819         // Second, check whether there are outstanding split log tasks
820         List<String> tasks = listSplitLogTasks();
821         if (!tasks.isEmpty()) {
822           hasSplitLogTask = true;
823           if (isForInitialization) {
824             // during initialization, try to get recovery mode from splitlogtask
825             int listSize = tasks.size();
826             for (int i = 0; i < listSize; i++) {
827               String task = tasks.get(i);
828               try {
829                 byte[] data =
830                     ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, task));
831                 if (data == null) continue;
832                 SplitLogTask slt = SplitLogTask.parseFrom(data);
833                 previousRecoveryMode = slt.getMode();
834                 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
835                   // created by old code base where we don't set recovery mode in splitlogtask
836                   // we can safely set to LOG_SPLITTING because we're in master initialization code
837                   // before SSH is enabled & there are no outstanding recovering regions
838                   previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
839                 }
840                 break;
841               } catch (DeserializationException e) {
842                 LOG.warn("Failed parse data for znode " + task, e);
843               } catch (InterruptedException e) {
844                 throw new InterruptedIOException();
845               }
846             }
847           }
848         }
849       }
850     } catch (KeeperException e) {
851       throw new IOException(e);
852     }
853 
854     synchronized (this) {
855       if (this.isDrainingDone) {
856         return;
857       }
858       if (!hasSplitLogTask && !hasRecoveringRegions) {
859         this.isDrainingDone = true;
860         this.recoveryMode = recoveryModeInConfig;
861         return;
862       } else if (!isForInitialization) {
863         // split log tasks haven't drained yet; keep the existing recovery mode
864         return;
865       }
866 
867       if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
868         this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
869         this.recoveryMode = previousRecoveryMode;
870       } else {
871         this.recoveryMode = recoveryModeInConfig;
872       }
873     }
874   }
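  // Summary of the mode resolution above (editor's illustration): with "config mode" meaning
  // LOG_REPLAY when isDistributedLogReplay(conf) is true and LOG_SPLITTING otherwise,
  //
  //   outstanding recovering regions            -> keep LOG_REPLAY as the previous mode
  //   outstanding split log tasks (at init)     -> use the mode recorded in a task, or
  //                                                LOG_SPLITTING for tasks written by old code
  //   nothing outstanding                       -> switch to the config mode and mark draining done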
875 
876   /**
877    * Returns whether distributed log replay is turned on
878    * @param conf
879    * @return true when distributed log replay is turned on
880    */
881   private boolean isDistributedLogReplay(Configuration conf) {
882     boolean dlr =
883         conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
884           HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
885     if (LOG.isDebugEnabled()) {
886       LOG.debug("Distributed log replay=" + dlr);
887     }
888     return dlr;
889   }
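  // Minimal usage sketch (assumed, not part of the original class): the flag read here is the
  // boolean key kept in HConstants, so turning distributed log replay on for a test might look like
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
  //
  // which makes setRecoveryMode() resolve the configured mode to RecoveryMode.LOG_REPLAY instead
  // of RecoveryMode.LOG_SPLITTING.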
890 
891   private boolean resubmit(ServerName serverName, String path, int version) {
892     try {
893       // blocking zk call but this is done from the timeout thread
894       SplitLogTask slt =
895           new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
896       if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
897         LOG.debug("failed to resubmit task " + path + " version changed");
898         return false;
899       }
900     } catch (NoNodeException e) {
901       LOG.warn("failed to resubmit because znode doesn't exist " + path
902           + " task done (or forced done by removing the znode)");
903       try {
904         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
905       } catch (DeserializationException e1) {
906         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
907         return false;
908       }
909       return false;
910     } catch (KeeperException.BadVersionException e) {
911       LOG.debug("failed to resubmit task " + path + " version changed");
912       return false;
913     } catch (KeeperException e) {
914       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
915       LOG.warn("failed to resubmit " + path, e);
916       return false;
917     }
918     return true;
919   }
920 
921 
922   /**
923    * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use
924    * objects implementing this interface to finish off a task left partially
925    * done by a {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}.
926    * This provides a serialization point at the end of task
927    * processing. Must be restartable and idempotent.
928    */
929   public interface TaskFinisher {
930     /**
931      * Status values that can be returned by finish()
932      */
933     enum Status {
934       /**
935        * task completed successfully
936        */
937       DONE(),
938       /**
939        * task completed with error
940        */
941       ERR();
942     }
943 
944     /**
945      * Finish the partially done task. The worker name provides a clue to where the partial
946      * results of the partially done task are; the task name is the name of the task that was
947      * put up in ZooKeeper.
948      * <p>
949      * @param workerName the worker that last owned the task
950      * @param taskname the name of the task as put up in ZooKeeper
951      * @return DONE if task completed successfully, ERR otherwise
952      */
953     Status finish(ServerName workerName, String taskname);
954   }
955 
956   /**
957    * Asynchronous handler for zk create node results. Retries on failures.
958    */
959   public class CreateAsyncCallback implements AsyncCallback.StringCallback {
960     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
961 
962     @Override
963     public void processResult(int rc, String path, Object ctx, String name) {
964       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
965       if (rc != 0) {
966         if (needAbandonRetries(rc, "Create znode " + path)) {
967           createNodeFailure(path);
968           return;
969         }
970         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
971           // What if there is a delete pending against this pre-existing
972           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
973           // state. Only operations that will be carried out on this node by
974           // this manager are get-znode-data, task-finisher and delete-znode.
975           // And all code pieces correctly handle the case of suddenly
976           // disappearing task-znode.
977           LOG.debug("found pre-existing znode " + path);
978           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
979         } else {
980           Long retry_count = (Long) ctx;
981           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
982               + " remaining retries=" + retry_count);
983           if (retry_count == 0) {
984             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
985             createNodeFailure(path);
986           } else {
987             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
988             createNode(path, retry_count - 1);
989           }
990           return;
991         }
992       }
993       createNodeSuccess(path);
994     }
995   }
996 
997   /**
998    * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
999    */
1000   public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1001     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1002 
1003     @Override
1004     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
1005       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1006       if (rc != 0) {
1007         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1008           return;
1009         }
1010         if (rc == KeeperException.Code.NONODE.intValue()) {
1011           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1012           LOG.warn("task znode " + path + " vanished or not created yet.");
1013           // ignore since we should not end up in a case where there is in-memory task,
1014           // but no znode. The only case is between the time task is created in-memory
1015           // and the znode is created. See HBASE-11217.
1016           return;
1017         }
1018         Long retry_count = (Long) ctx;
1019 
1020         if (retry_count < 0) {
1021           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
1022               + ". Ignoring error. No error handling. No retrying.");
1023           return;
1024         }
1025         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
1026             + " remaining retries=" + retry_count);
1027         if (retry_count == 0) {
1028           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1029           getDataSetWatchFailure(path);
1030         } else {
1031           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1032           getDataSetWatch(path, retry_count - 1);
1033         }
1034         return;
1035       }
1036       try {
1037         getDataSetWatchSuccess(path, data, stat.getVersion());
1038       } catch (DeserializationException e) {
1039         LOG.warn("Deserialization problem", e);
1040       }
1041       return;
1042     }
1043   }
1044 
1045   /**
1046    * Asynchronous handler for zk delete node results. Retries on failures.
1047    */
1048   public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1049     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1050 
1051     @Override
1052     public void processResult(int rc, String path, Object ctx) {
1053       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1054       if (rc != 0) {
1055         if (needAbandonRetries(rc, "Delete znode " + path)) {
1056           details.getFailedDeletions().add(path);
1057           return;
1058         }
1059         if (rc != KeeperException.Code.NONODE.intValue()) {
1060           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1061           Long retry_count = (Long) ctx;
1062           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
1063               + " remaining retries=" + retry_count);
1064           if (retry_count == 0) {
1065             LOG.warn("delete failed " + path);
1066             details.getFailedDeletions().add(path);
1067             deleteNodeFailure(path);
1068           } else {
1069             deleteNode(path, retry_count - 1);
1070           }
1071           return;
1072         } else {
1073           LOG.info(path + " does not exist. Either was created but deleted behind our"
1074               + " back by another pending delete OR was deleted"
1075               + " in earlier retry rounds. zkretries = " + ctx);
1076         }
1077       } else {
1078         LOG.debug("deleted " + path);
1079       }
1080       deleteNodeSuccess(path);
1081     }
1082   }
1083 
1084   /**
1085    * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
1086    * <p>
1087   * A RESCAN node is created with the EPHEMERAL_SEQUENTIAL flag (see rescan()). It is a signal for all the
1088    * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
1089    */
1090   public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1091     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1092 
1093     @Override
1094     public void processResult(int rc, String path, Object ctx, String name) {
1095       if (rc != 0) {
1096         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1097           return;
1098         }
1099         Long retry_count = (Long) ctx;
1100         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
1101             + retry_count);
1102         if (retry_count == 0) {
1103           createRescanFailure();
1104         } else {
1105           rescan(retry_count - 1);
1106         }
1107         return;
1108       }
1109       // path is the original arg, name is the actual name that was created
1110       createRescanSuccess(name);
1111     }
1112   }
1113 
1114   @Override
1115   public void setDetails(SplitLogManagerDetails details) {
1116     this.details = details;
1117   }
1118 
1119   @Override
1120   public SplitLogManagerDetails getDetails() {
1121     return details;
1122   }
1123 
1124   @Override
1125   public synchronized RecoveryMode getRecoveryMode() {
1126     return recoveryMode;
1127   }
1128 
1129   @Override
1130   public long getLastRecoveryTime() {
1131     return lastRecoveringNodeCreationTime;
1132   }
1133 
1134   /**
1135    * Temporary function that is used by unit tests only
1136    */
1137   public void setIgnoreDeleteForTesting(boolean b) {
1138     ignoreZKDeleteForTesting = b;
1139   }
1140 }