/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * This worker is spawned in every regionserver, including the master. The worker waits for log
 * splitting tasks to be put up by the {@link org.apache.hadoop.hbase.master.SplitLogManager}
 * running in the master and races with workers on other servers to acquire those tasks.
 * The coordination is done via the coordination engine.
 * <p>
 * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
 * It keeps heartbeating the manager by periodically moving the task from UNASSIGNED to OWNED
 * state. On success it moves the task to TASK_DONE. On an unrecoverable error it moves the task
 * state to ERR. If it cannot continue but wants the master to retry the task then it moves the
 * task state to RESIGNED.
 * <p>
 * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In
 * the absence of a global lock there is an unavoidable race here - a worker might have just
 * finished its task when it is stripped of its ownership. Here we rely on the idempotency of the
 * log splitting task for correctness.
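 * <p>
 * A rough lifecycle sketch (the collaborators passed to the constructor are illustrative
 * placeholders, not a prescribed wiring):
 * <pre>{@code
 * SplitLogWorker slw = new SplitLogWorker(server, conf, regionServerServices,
 *     sequenceIdChecker, walFactory);
 * slw.start();   // spawns the worker thread that races for split tasks
 * // ... later, on regionserver shutdown ...
 * slw.stop();    // stops taking new tasks and interrupts the worker thread
 * }</pre>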
 */
@InterfaceAudience.Private
public class SplitLogWorker implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class);

  // thread that runs the task loop
  Thread worker;
  // coordination engine (e.g. ZooKeeper based) used to acquire, heartbeat and complete split tasks
  private SplitLogWorkerCoordination coordination;
  private Configuration conf;
  private RegionServerServices server;

  public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
      TaskExecutor splitTaskExecutor) {
    this.server = server;
    this.conf = conf;
    this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
    coordination.init(server, conf, splitTaskExecutor, this);
  }

  public SplitLogWorker(final Server hserver, final Configuration conf,
      final RegionServerServices server, final LastSequenceId sequenceIdChecker,
      final WALFactory factory) {
    this(hserver, conf, server, new TaskExecutor() {
      @Override
      public Status exec(String filename, CancelableProgressable p) {
        Path walDir;
        FileSystem fs;
        try {
          walDir = FSUtils.getWALRootDir(conf);
          fs = walDir.getFileSystem(conf);
        } catch (IOException e) {
          LOG.warn("could not find root dir or fs", e);
          return Status.RESIGNED;
        }
        // TODO have to correctly figure out when log splitting has been
        // interrupted or has encountered a transient error and when it has
        // encountered a bad non-retryable persistent error.
        // Note: this can actually get the master stuck (HBASE-22289) so treat preempted as error.
        //       splitLogFile does return false for legitimate retriable errors.
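        // Outcome mapping below (derived from the branches that follow): a false return from
        // splitLogFile -> PREEMPTED; interruption or connection trouble with the target
        // regionserver -> RESIGNED; a WAL that no longer exists -> DONE; any other IOException
        // -> ERR; successful split -> DONE.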
        try {
          if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
            fs, conf, p, sequenceIdChecker,
              server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
            return Status.PREEMPTED;
          }
        } catch (InterruptedIOException iioe) {
          LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
          return Status.RESIGNED;
        } catch (IOException e) {
          if (e instanceof FileNotFoundException) {
            // The WAL file may not exist anymore. Nothing can be recovered, so move on.
            LOG.warn("WAL {} does not exist anymore", filename, e);
            return Status.DONE;
          }
          Throwable cause = e.getCause();
          if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
                  || cause instanceof ConnectException
                  || cause instanceof SocketTimeoutException)) {
            LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
                + "resigning", e);
            return Status.RESIGNED;
          } else if (cause instanceof InterruptedException) {
            LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
            return Status.RESIGNED;
          }
          LOG.warn("log splitting of " + filename + " failed, returning error", e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    });
  }

  @Override
  public void run() {
    try {
      LOG.info("SplitLogWorker " + server.getServerName() + " starting");
      coordination.registerListener();
      // wait until the coordination engine is ready (or we are asked to stop)
      boolean res = false;
      while (!res && !coordination.isStop()) {
        res = coordination.isReady();
      }
      if (!coordination.isStop()) {
        coordination.taskLoop();
      }
    } catch (Throwable t) {
      if (ExceptionUtil.isInterrupt(t)) {
        LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
            " (ERROR: exitWorker is not set, exiting anyway)"));
      } else {
        // only a logical error can get us here; print it out to make debugging easier
        LOG.error("unexpected error ", t);
      }
    } finally {
      coordination.removeListener();
      LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
    }
  }

  /**
   * If the worker is doing a task, i.e. splitting a WAL file, then stop the task.
   * It does not exit the worker thread.
   */
  public void stopTask() {
    LOG.info("Sending interrupt to stop the worker thread");
    worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
  }

  /**
   * Start the SplitLogWorker thread.
   */
  public void start() {
    worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString());
    worker.start();
  }

  /**
   * Stop the SplitLogWorker thread.
   */
  public void stop() {
    coordination.stopProcessingTasks();
    stopTask();
  }

  /**
   * Objects implementing this interface actually do the task that has been
   * acquired by a {@link SplitLogWorker}. Since there is no water-tight
   * guarantee that two workers will not execute the same task, it is better to
   * have workers prepare the task and then have the
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
   * SplitLogManager.TaskFinisher.
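   * <p>
   * An illustrative executor, just to show the contract (not how HBase wires it up; the real
   * executor is the WAL-splitting one built in the {@link SplitLogWorker} constructor):
   * <pre>{@code
   * TaskExecutor executor = (name, progress) -> {
   *   // do the work for the task identified by `name`, reporting liveness via
   *   // progress.progress() so the manager knows the task is still being worked on
   *   return TaskExecutor.Status.DONE;
   * };
   * }</pre>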
   */
  public interface TaskExecutor {
    enum Status {
      DONE,       // the task completed successfully
      ERR,        // the task failed with an unrecoverable error
      RESIGNED,   // the worker gave the task up so the master can retry it
      PREEMPTED   // splitting was preempted (e.g. ownership lost) before completion
    }
    Status exec(String name, CancelableProgressable p);
  }

  /**
   * Returns the number of tasks processed by the coordination layer.
   * This method is used by tests only.
   */
  @VisibleForTesting
  public int getTaskReadySeq() {
    return coordination.getTaskReadySeq();
  }
}