001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadPoolExecutor;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.errorhandling.ForeignException;
034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerServices;
036import org.apache.hadoop.hbase.util.Threads;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.apache.zookeeper.KeeperException;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
043
044public class SimpleRSProcedureManager extends RegionServerProcedureManager {
045
046  private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
047
048  private RegionServerServices rss;
049  private ProcedureMemberRpcs memberRpcs;
050  private ProcedureMember member;
051
052  @Override
053  public void initialize(RegionServerServices rss) throws KeeperException {
054    this.rss = rss;
055    ZKWatcher zkw = rss.getZooKeeper();
056    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
057
058    ThreadPoolExecutor pool =
059        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
060    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
061    LOG.info("Initialized: " + rss.getServerName().toString());
062  }
063
064  @Override
065  public void start() {
066    this.memberRpcs.start(rss.getServerName().toString(), member);
067    LOG.info("Started.");
068  }
069
070  @Override
071  public void stop(boolean force) throws IOException {
072    LOG.info("stop: " + force);
073    try {
074      this.member.close();
075    } finally {
076      this.memberRpcs.close();
077    }
078  }
079
080  @Override
081  public String getProcedureSignature() {
082    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
083  }
084
085  /**
086   * If in a running state, creates the specified subprocedure for handling a procedure.
087   * @return Subprocedure to submit to the ProcedureMember.
088   */
089  public Subprocedure buildSubprocedure(String name) {
090
091    // don't run a procedure if the parent is stop(ping)
092    if (rss.isStopping() || rss.isStopped()) {
093      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
094          + ", because stopping/stopped!");
095    }
096
097    LOG.info("Attempting to run a procedure.");
098    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
099    Configuration conf = rss.getConfiguration();
100
101    SimpleSubprocedurePool taskManager =
102        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
103    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
104  }
105
106  /**
107   * Build the actual procedure runner that will do all the 'hard' work
108   */
109  public class SimleSubprocedureBuilder implements SubprocedureFactory {
110
111    @Override
112    public Subprocedure buildSubprocedure(String name, byte[] data) {
113      LOG.info("Building procedure: " + name);
114      return SimpleRSProcedureManager.this.buildSubprocedure(name);
115    }
116  }
117
118  public static class SimpleSubprocedurePool implements Closeable, Abortable {
119
120    private final ExecutorCompletionService<Void> taskPool;
121    private final ExecutorService executor;
122    private volatile boolean aborted;
123    private final List<Future<Void>> futures = new ArrayList<>();
124    private final String name;
125
126    public SimpleSubprocedurePool(String name, Configuration conf) {
127      this.name = name;
128      executor = Executors.newSingleThreadExecutor(
129        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
130          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
131      taskPool = new ExecutorCompletionService<>(executor);
132    }
133
134    /**
135     * Submit a task to the pool.
136     */
137    public void submitTask(final Callable<Void> task) {
138      Future<Void> f = this.taskPool.submit(task);
139      futures.add(f);
140    }
141
142    /**
143     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
144     *
145     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
146     * @throws ForeignException
147     */
148    public boolean waitForOutstandingTasks() throws ForeignException {
149      LOG.debug("Waiting for procedure to finish.");
150
151      try {
152        for (Future<Void> f: futures) {
153          f.get();
154        }
155        return true;
156      } catch (InterruptedException e) {
157        if (aborted) throw new ForeignException(
158            "Interrupted and found to be aborted while waiting for tasks!", e);
159        Thread.currentThread().interrupt();
160      } catch (ExecutionException e) {
161        if (e.getCause() instanceof ForeignException) {
162          throw (ForeignException) e.getCause();
163        }
164        throw new ForeignException(name, e.getCause());
165      } finally {
166        // close off remaining tasks
167        for (Future<Void> f: futures) {
168          if (!f.isDone()) {
169            f.cancel(true);
170          }
171        }
172      }
173      return false;
174    }
175
176    /**
177     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
178     * finish
179     */
180    @Override
181    public void close() {
182      executor.shutdown();
183    }
184
185    @Override
186    public void abort(String why, Throwable e) {
187      if (this.aborted) return;
188
189      this.aborted = true;
190      LOG.warn("Aborting because: " + why, e);
191      this.executor.shutdownNow();
192    }
193
194    @Override
195    public boolean isAborted() {
196      return this.aborted;
197    }
198  }
199
200  public class SimpleSubprocedure extends Subprocedure {
201    private final RegionServerServices rss;
202    private final SimpleSubprocedurePool taskManager;
203
204    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
205        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
206      super(member, name, errorListener, 500, 60000);
207      LOG.info("Constructing a SimpleSubprocedure.");
208      this.rss = rss;
209      this.taskManager = taskManager;
210    }
211
212    /**
213     * Callable task.
214     * TODO. We don't need a thread pool to execute roll log. This can be simplified
215     * with no use of subprocedurepool.
216     */
217    class RSSimpleTask implements Callable<Void> {
218      RSSimpleTask() {}
219
220      @Override
221      public Void call() throws Exception {
222        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
223        return null;
224      }
225
226    }
227
228    private void execute() throws ForeignException {
229
230      monitor.rethrowException();
231
232      // running a task (e.g., roll log, flush table) on region server
233      taskManager.submitTask(new RSSimpleTask());
234      monitor.rethrowException();
235
236      // wait for everything to complete.
237      taskManager.waitForOutstandingTasks();
238      monitor.rethrowException();
239
240    }
241
242    @Override
243    public void acquireBarrier() throws ForeignException {
244      // do nothing, executing in inside barrier step.
245    }
246
247    /**
248     * do a log roll.
249     */
250    @Override
251    public byte[] insideBarrier() throws ForeignException {
252      execute();
253      return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes();
254    }
255
256    /**
257     * Cancel threads if they haven't finished.
258     */
259    @Override
260    public void cleanup(Exception e) {
261      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
262    }
263  }
264
265}