001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.Future;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.DaemonThreadFactory;
035import org.apache.hadoop.hbase.regionserver.RegionServerServices;
036import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
037import org.apache.hadoop.hbase.errorhandling.ForeignException;
038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
039import org.apache.zookeeper.KeeperException;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043public class SimpleRSProcedureManager extends RegionServerProcedureManager {
044
045  private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
046
047  private RegionServerServices rss;
048  private ProcedureMemberRpcs memberRpcs;
049  private ProcedureMember member;
050
051  @Override
052  public void initialize(RegionServerServices rss) throws KeeperException {
053    this.rss = rss;
054    ZKWatcher zkw = rss.getZooKeeper();
055    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
056
057    ThreadPoolExecutor pool =
058        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
059    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
060    LOG.info("Initialized: " + rss.getServerName().toString());
061  }
062
063  @Override
064  public void start() {
065    this.memberRpcs.start(rss.getServerName().toString(), member);
066    LOG.info("Started.");
067  }
068
069  @Override
070  public void stop(boolean force) throws IOException {
071    LOG.info("stop: " + force);
072    try {
073      this.member.close();
074    } finally {
075      this.memberRpcs.close();
076    }
077  }
078
079  @Override
080  public String getProcedureSignature() {
081    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
082  }
083
084  /**
085   * If in a running state, creates the specified subprocedure for handling a procedure.
086   * @return Subprocedure to submit to the ProcedureMemeber.
087   */
088  public Subprocedure buildSubprocedure(String name) {
089
090    // don't run a procedure if the parent is stop(ping)
091    if (rss.isStopping() || rss.isStopped()) {
092      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
093          + ", because stopping/stopped!");
094    }
095
096    LOG.info("Attempting to run a procedure.");
097    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
098    Configuration conf = rss.getConfiguration();
099
100    SimpleSubprocedurePool taskManager =
101        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
102    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
103  }
104
105  /**
106   * Build the actual procedure runner that will do all the 'hard' work
107   */
108  public class SimleSubprocedureBuilder implements SubprocedureFactory {
109
110    @Override
111    public Subprocedure buildSubprocedure(String name, byte[] data) {
112      LOG.info("Building procedure: " + name);
113      return SimpleRSProcedureManager.this.buildSubprocedure(name);
114    }
115  }
116
117  public class SimpleSubprocedurePool implements Closeable, Abortable {
118
119    private final ExecutorCompletionService<Void> taskPool;
120    private final ThreadPoolExecutor executor;
121    private volatile boolean aborted;
122    private final List<Future<Void>> futures = new ArrayList<>();
123    private final String name;
124
125    public SimpleSubprocedurePool(String name, Configuration conf) {
126      this.name = name;
127      executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
128          new LinkedBlockingQueue<>(),
129          new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
130      taskPool = new ExecutorCompletionService<>(executor);
131    }
132
133    /**
134     * Submit a task to the pool.
135     */
136    public void submitTask(final Callable<Void> task) {
137      Future<Void> f = this.taskPool.submit(task);
138      futures.add(f);
139    }
140
141    /**
142     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
143     *
144     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
145     * @throws ForeignException
146     */
147    public boolean waitForOutstandingTasks() throws ForeignException {
148      LOG.debug("Waiting for procedure to finish.");
149
150      try {
151        for (Future<Void> f: futures) {
152          f.get();
153        }
154        return true;
155      } catch (InterruptedException e) {
156        if (aborted) throw new ForeignException(
157            "Interrupted and found to be aborted while waiting for tasks!", e);
158        Thread.currentThread().interrupt();
159      } catch (ExecutionException e) {
160        if (e.getCause() instanceof ForeignException) {
161          throw (ForeignException) e.getCause();
162        }
163        throw new ForeignException(name, e.getCause());
164      } finally {
165        // close off remaining tasks
166        for (Future<Void> f: futures) {
167          if (!f.isDone()) {
168            f.cancel(true);
169          }
170        }
171      }
172      return false;
173    }
174
175    /**
176     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
177     * finish
178     */
179    @Override
180    public void close() {
181      executor.shutdown();
182    }
183
184    @Override
185    public void abort(String why, Throwable e) {
186      if (this.aborted) return;
187
188      this.aborted = true;
189      LOG.warn("Aborting because: " + why, e);
190      this.executor.shutdownNow();
191    }
192
193    @Override
194    public boolean isAborted() {
195      return this.aborted;
196    }
197  }
198
199  public class SimpleSubprocedure extends Subprocedure {
200    private final RegionServerServices rss;
201    private final SimpleSubprocedurePool taskManager;
202
203    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
204        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
205      super(member, name, errorListener, 500, 60000);
206      LOG.info("Constructing a SimpleSubprocedure.");
207      this.rss = rss;
208      this.taskManager = taskManager;
209    }
210
211    /**
212     * Callable task.
213     * TODO. We don't need a thread pool to execute roll log. This can be simplified
214     * with no use of subprocedurepool.
215     */
216    class RSSimpleTask implements Callable<Void> {
217      RSSimpleTask() {}
218
219      @Override
220      public Void call() throws Exception {
221        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
222        return null;
223      }
224
225    }
226
227    private void execute() throws ForeignException {
228
229      monitor.rethrowException();
230
231      // running a task (e.g., roll log, flush table) on region server
232      taskManager.submitTask(new RSSimpleTask());
233      monitor.rethrowException();
234
235      // wait for everything to complete.
236      taskManager.waitForOutstandingTasks();
237      monitor.rethrowException();
238
239    }
240
241    @Override
242    public void acquireBarrier() throws ForeignException {
243      // do nothing, executing in inside barrier step.
244    }
245
246    /**
247     * do a log roll.
248     */
249    @Override
250    public byte[] insideBarrier() throws ForeignException {
251      execute();
252      return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes();
253    }
254
255    /**
256     * Cancel threads if they haven't finished.
257     */
258    @Override
259    public void cleanup(Exception e) {
260      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
261    }
262  }
263
264}