001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadPoolExecutor;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.errorhandling.ForeignException;
034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerServices;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.apache.zookeeper.KeeperException;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
045public class SimpleRSProcedureManager extends RegionServerProcedureManager {
046
047  private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
048
049  private RegionServerServices rss;
050  private ProcedureMemberRpcs memberRpcs;
051  private ProcedureMember member;
052
053  @Override
054  public void initialize(RegionServerServices rss) throws KeeperException {
055    this.rss = rss;
056    ZKWatcher zkw = rss.getZooKeeper();
057    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
058
059    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
060    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
061    LOG.info("Initialized: " + rss.getServerName().toString());
062  }
063
064  @Override
065  public void start() {
066    this.memberRpcs.start(rss.getServerName().toString(), member);
067    LOG.info("Started.");
068  }
069
070  @Override
071  public void stop(boolean force) throws IOException {
072    LOG.info("stop: " + force);
073    try {
074      this.member.close();
075    } finally {
076      this.memberRpcs.close();
077    }
078  }
079
080  @Override
081  public String getProcedureSignature() {
082    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
083  }
084
085  /**
086   * If in a running state, creates the specified subprocedure for handling a procedure.
087   * @return Subprocedure to submit to the ProcedureMember.
088   */
089  public Subprocedure buildSubprocedure(String name) {
090
091    // don't run a procedure if the parent is stop(ping)
092    if (rss.isStopping() || rss.isStopped()) {
093      throw new IllegalStateException(
094        "Can't start procedure on RS: " + rss.getServerName() + ", because stopping/stopped!");
095    }
096
097    LOG.info("Attempting to run a procedure.");
098    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
099    Configuration conf = rss.getConfiguration();
100
101    SimpleSubprocedurePool taskManager =
102      new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
103    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
104  }
105
106  /**
107   * Build the actual procedure runner that will do all the 'hard' work
108   */
109  public class SimleSubprocedureBuilder implements SubprocedureFactory {
110
111    @Override
112    public Subprocedure buildSubprocedure(String name, byte[] data) {
113      LOG.info("Building procedure: " + name);
114      return SimpleRSProcedureManager.this.buildSubprocedure(name);
115    }
116  }
117
118  public static class SimpleSubprocedurePool implements Closeable, Abortable {
119
120    private final ExecutorCompletionService<Void> taskPool;
121    private final ExecutorService executor;
122    private volatile boolean aborted;
123    private final List<Future<Void>> futures = new ArrayList<>();
124    private final String name;
125
126    public SimpleSubprocedurePool(String name, Configuration conf) {
127      this.name = name;
128      executor = Executors.newSingleThreadExecutor(
129        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
130          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
131      taskPool = new ExecutorCompletionService<>(executor);
132    }
133
134    /**
135     * Submit a task to the pool.
136     */
137    public void submitTask(final Callable<Void> task) {
138      Future<Void> f = this.taskPool.submit(task);
139      futures.add(f);
140    }
141
142    /**
143     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
144     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
145     */
146    public boolean waitForOutstandingTasks() throws ForeignException {
147      LOG.debug("Waiting for procedure to finish.");
148
149      try {
150        for (Future<Void> f : futures) {
151          f.get();
152        }
153        return true;
154      } catch (InterruptedException e) {
155        if (aborted)
156          throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
157            e);
158        Thread.currentThread().interrupt();
159      } catch (ExecutionException e) {
160        if (e.getCause() instanceof ForeignException) {
161          throw (ForeignException) e.getCause();
162        }
163        throw new ForeignException(name, e.getCause());
164      } finally {
165        // close off remaining tasks
166        for (Future<Void> f : futures) {
167          if (!f.isDone()) {
168            f.cancel(true);
169          }
170        }
171      }
172      return false;
173    }
174
175    /**
176     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
177     * finish
178     */
179    @Override
180    public void close() {
181      executor.shutdown();
182    }
183
184    @Override
185    public void abort(String why, Throwable e) {
186      if (this.aborted) return;
187
188      this.aborted = true;
189      LOG.warn("Aborting because: " + why, e);
190      this.executor.shutdownNow();
191    }
192
193    @Override
194    public boolean isAborted() {
195      return this.aborted;
196    }
197  }
198
199  public class SimpleSubprocedure extends Subprocedure {
200    private final RegionServerServices rss;
201    private final SimpleSubprocedurePool taskManager;
202
203    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
204      ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
205      super(member, name, errorListener, 500, 60000);
206      LOG.info("Constructing a SimpleSubprocedure.");
207      this.rss = rss;
208      this.taskManager = taskManager;
209    }
210
211    /**
212     * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
213     * with no use of subprocedurepool.
214     */
215    class RSSimpleTask implements Callable<Void> {
216      RSSimpleTask() {
217      }
218
219      @Override
220      public Void call() throws Exception {
221        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
222        return null;
223      }
224
225    }
226
227    private void execute() throws ForeignException {
228
229      monitor.rethrowException();
230
231      // running a task (e.g., roll log, flush table) on region server
232      taskManager.submitTask(new RSSimpleTask());
233      monitor.rethrowException();
234
235      // wait for everything to complete.
236      taskManager.waitForOutstandingTasks();
237      monitor.rethrowException();
238
239    }
240
241    @Override
242    public void acquireBarrier() throws ForeignException {
243      // do nothing, executing in inside barrier step.
244    }
245
246    /**
247     * do a log roll.
248     */
249    @Override
250    public byte[] insideBarrier() throws ForeignException {
251      execute();
252      return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA);
253    }
254
255    /**
256     * Cancel threads if they haven't finished.
257     */
258    @Override
259    public void cleanup(Exception e) {
260      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
261    }
262  }
263
264}