001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadPoolExecutor;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.errorhandling.ForeignException;
034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerServices;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.apache.zookeeper.KeeperException;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044public class SimpleRSProcedureManager extends RegionServerProcedureManager {
045
046  private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
047
048  private RegionServerServices rss;
049  private ProcedureMemberRpcs memberRpcs;
050  private ProcedureMember member;
051
052  @Override
053  public void initialize(RegionServerServices rss) throws KeeperException {
054    this.rss = rss;
055    ZKWatcher zkw = rss.getZooKeeper();
056    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
057
058    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
059    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
060    LOG.info("Initialized: " + rss.getServerName().toString());
061  }
062
063  @Override
064  public void start() {
065    this.memberRpcs.start(rss.getServerName().toString(), member);
066    LOG.info("Started.");
067  }
068
069  @Override
070  public void stop(boolean force) throws IOException {
071    LOG.info("stop: " + force);
072    try {
073      this.member.close();
074    } finally {
075      this.memberRpcs.close();
076    }
077  }
078
079  @Override
080  public String getProcedureSignature() {
081    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
082  }
083
084  /**
085   * If in a running state, creates the specified subprocedure for handling a procedure.
086   * @return Subprocedure to submit to the ProcedureMember.
087   */
088  public Subprocedure buildSubprocedure(String name) {
089
090    // don't run a procedure if the parent is stop(ping)
091    if (rss.isStopping() || rss.isStopped()) {
092      throw new IllegalStateException(
093        "Can't start procedure on RS: " + rss.getServerName() + ", because stopping/stopped!");
094    }
095
096    LOG.info("Attempting to run a procedure.");
097    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
098    Configuration conf = rss.getConfiguration();
099
100    SimpleSubprocedurePool taskManager =
101      new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
102    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
103  }
104
105  /**
106   * Build the actual procedure runner that will do all the 'hard' work
107   */
108  public class SimleSubprocedureBuilder implements SubprocedureFactory {
109
110    @Override
111    public Subprocedure buildSubprocedure(String name, byte[] data) {
112      LOG.info("Building procedure: " + name);
113      return SimpleRSProcedureManager.this.buildSubprocedure(name);
114    }
115  }
116
117  public static class SimpleSubprocedurePool implements Closeable, Abortable {
118
119    private final ExecutorCompletionService<Void> taskPool;
120    private final ExecutorService executor;
121    private volatile boolean aborted;
122    private final List<Future<Void>> futures = new ArrayList<>();
123    private final String name;
124
125    public SimpleSubprocedurePool(String name, Configuration conf) {
126      this.name = name;
127      executor = Executors.newSingleThreadExecutor(
128        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
129          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
130      taskPool = new ExecutorCompletionService<>(executor);
131    }
132
133    /**
134     * Submit a task to the pool.
135     */
136    public void submitTask(final Callable<Void> task) {
137      Future<Void> f = this.taskPool.submit(task);
138      futures.add(f);
139    }
140
141    /**
142     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
143     * @return <tt>true</tt> on success, <tt>false</tt> otherwise n
144     */
145    public boolean waitForOutstandingTasks() throws ForeignException {
146      LOG.debug("Waiting for procedure to finish.");
147
148      try {
149        for (Future<Void> f : futures) {
150          f.get();
151        }
152        return true;
153      } catch (InterruptedException e) {
154        if (aborted)
155          throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
156            e);
157        Thread.currentThread().interrupt();
158      } catch (ExecutionException e) {
159        if (e.getCause() instanceof ForeignException) {
160          throw (ForeignException) e.getCause();
161        }
162        throw new ForeignException(name, e.getCause());
163      } finally {
164        // close off remaining tasks
165        for (Future<Void> f : futures) {
166          if (!f.isDone()) {
167            f.cancel(true);
168          }
169        }
170      }
171      return false;
172    }
173
174    /**
175     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
176     * finish
177     */
178    @Override
179    public void close() {
180      executor.shutdown();
181    }
182
183    @Override
184    public void abort(String why, Throwable e) {
185      if (this.aborted) return;
186
187      this.aborted = true;
188      LOG.warn("Aborting because: " + why, e);
189      this.executor.shutdownNow();
190    }
191
192    @Override
193    public boolean isAborted() {
194      return this.aborted;
195    }
196  }
197
198  public class SimpleSubprocedure extends Subprocedure {
199    private final RegionServerServices rss;
200    private final SimpleSubprocedurePool taskManager;
201
202    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
203      ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
204      super(member, name, errorListener, 500, 60000);
205      LOG.info("Constructing a SimpleSubprocedure.");
206      this.rss = rss;
207      this.taskManager = taskManager;
208    }
209
210    /**
211     * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
212     * with no use of subprocedurepool.
213     */
214    class RSSimpleTask implements Callable<Void> {
215      RSSimpleTask() {
216      }
217
218      @Override
219      public Void call() throws Exception {
220        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
221        return null;
222      }
223
224    }
225
226    private void execute() throws ForeignException {
227
228      monitor.rethrowException();
229
230      // running a task (e.g., roll log, flush table) on region server
231      taskManager.submitTask(new RSSimpleTask());
232      monitor.rethrowException();
233
234      // wait for everything to complete.
235      taskManager.waitForOutstandingTasks();
236      monitor.rethrowException();
237
238    }
239
240    @Override
241    public void acquireBarrier() throws ForeignException {
242      // do nothing, executing in inside barrier step.
243    }
244
245    /**
246     * do a log roll.
247     */
248    @Override
249    public byte[] insideBarrier() throws ForeignException {
250      execute();
251      return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes();
252    }
253
254    /**
255     * Cancel threads if they haven't finished.
256     */
257    @Override
258    public void cleanup(Exception e) {
259      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
260    }
261  }
262
263}