001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure.flush;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ThreadPoolExecutor;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.MetaTableAccessor;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.errorhandling.ForeignException;
034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
035import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.MetricsMaster;
038import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
039import org.apache.hadoop.hbase.procedure.Procedure;
040import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
041import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
042import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
043import org.apache.hadoop.hbase.security.User;
044import org.apache.hadoop.hbase.security.access.AccessChecker;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.zookeeper.KeeperException;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
055
056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
057public class MasterFlushTableProcedureManager extends MasterProcedureManager {
058
059  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
060
061  private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
062  private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
063  private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
064  private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
065
066  private static final String FLUSH_PROC_POOL_THREADS_KEY =
067      "hbase.flush.procedure.master.threads";
068  private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
069
070  private static final Logger LOG = LoggerFactory.getLogger(MasterFlushTableProcedureManager.class);
071
072  private MasterServices master;
073  private ProcedureCoordinator coordinator;
074  private Map<TableName, Procedure> procMap = new HashMap<>();
075  private boolean stopped;
076
077  public MasterFlushTableProcedureManager() {};
078
079  @Override
080  public void stop(String why) {
081    LOG.info("stop: " + why);
082    this.stopped = true;
083  }
084
085  @Override
086  public boolean isStopped() {
087    return this.stopped;
088  }
089
090  @Override
091  public void initialize(MasterServices master, MetricsMaster metricsMaster)
092      throws KeeperException, IOException, UnsupportedOperationException {
093    this.master = master;
094
095    // get the configuration for the coordinator
096    Configuration conf = master.getConfiguration();
097    long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
098    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
099    int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
100
101    // setup the procedure coordinator
102    String name = master.getServerName().toString();
103    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
104    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
105        master.getZooKeeper(), getProcedureSignature(), name);
106
107    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
108  }
109
110  @Override
111  public String getProcedureSignature() {
112    return FLUSH_TABLE_PROCEDURE_SIGNATURE;
113  }
114
115  @Override
116  public void execProcedure(ProcedureDescription desc) throws IOException {
117
118    TableName tableName = TableName.valueOf(desc.getInstance());
119
120    // call pre coproc hook
121    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
122    if (cpHost != null) {
123      cpHost.preTableFlush(tableName);
124    }
125
126    // Get the list of region servers that host the online regions for table.
127    // We use the procedure instance name to carry the table name from the client.
128    // It is possible that regions may move after we get the region server list.
129    // Each region server will get its own online regions for the table.
130    // We may still miss regions that need to be flushed.
131    List<Pair<RegionInfo, ServerName>> regionsAndLocations;
132
133    if (TableName.META_TABLE_NAME.equals(tableName)) {
134      regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations(
135        master.getZooKeeper());
136    } else {
137      regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
138        master.getConnection(), tableName, false);
139    }
140
141    Set<String> regionServers = new HashSet<>(regionsAndLocations.size());
142    for (Pair<RegionInfo, ServerName> region : regionsAndLocations) {
143      if (region != null && region.getFirst() != null && region.getSecond() != null) {
144        RegionInfo hri = region.getFirst();
145        if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
146        regionServers.add(region.getSecond().toString());
147      }
148    }
149
150    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
151
152    // Kick of the global procedure from the master coordinator to the region servers.
153    // We rely on the existing Distributed Procedure framework to prevent any concurrent
154    // procedure with the same name.
155    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
156      new byte[0], Lists.newArrayList(regionServers));
157    monitor.rethrowException();
158    if (proc == null) {
159      String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
160          + desc.getInstance() + "'. " + "Another flush procedure is running?";
161      LOG.error(msg);
162      throw new IOException(msg);
163    }
164
165    procMap.put(tableName, proc);
166
167    try {
168      // wait for the procedure to complete.  A timer thread is kicked off that should cancel this
169      // if it takes too long.
170      proc.waitForCompleted();
171      LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
172          + desc.getInstance() + "'");
173      LOG.info("Master flush table procedure is successful!");
174    } catch (InterruptedException e) {
175      ForeignException ee =
176          new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
177      monitor.receive(ee);
178      Thread.currentThread().interrupt();
179    } catch (ForeignException e) {
180      ForeignException ee =
181          new ForeignException("Exception while waiting for flush table procdure to finish", e);
182      monitor.receive(ee);
183    }
184    monitor.rethrowException();
185  }
186
187  @Override
188  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
189      throws IOException {
190    // Done by AccessController as part of preTableFlush coprocessor hook (legacy code path).
191    // In future, when we AC is removed for good, that check should be moved here.
192  }
193
194  @Override
195  public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
196    // Procedure instance name is the table name.
197    TableName tableName = TableName.valueOf(desc.getInstance());
198    Procedure proc = procMap.get(tableName);
199    if (proc == null) {
200      // The procedure has not even been started yet.
201      // The client would request the procedure and call isProcedureDone().
202      // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone().
203      return false;
204    }
205    // We reply on the existing Distributed Procedure framework to give us the status.
206    return proc.isCompleted();
207  }
208
209}