View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure.flush;
19  
20  import java.io.IOException;
21  import java.util.HashMap;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.concurrent.ThreadPoolExecutor;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.MetaTableAccessor;
37  import org.apache.hadoop.hbase.errorhandling.ForeignException;
38  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
39  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
40  import org.apache.hadoop.hbase.master.MasterServices;
41  import org.apache.hadoop.hbase.master.MetricsMaster;
42  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
43  import org.apache.hadoop.hbase.procedure.Procedure;
44  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
45  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
46  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
47  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
50  import org.apache.zookeeper.KeeperException;
51  
52  import com.google.common.collect.Lists;
53  
54  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
55  public class MasterFlushTableProcedureManager extends MasterProcedureManager {
56  
57    public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
58  
59    private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
60    private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
61    private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
62    private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500;
63  
64    private static final String FLUSH_PROC_POOL_THREADS_KEY =
65        "hbase.flush.procedure.master.threads";
66    private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1;
67  
68    private static final Log LOG = LogFactory.getLog(MasterFlushTableProcedureManager.class);
69  
70    private MasterServices master;
71    private ProcedureCoordinator coordinator;
72    private Map<TableName, Procedure> procMap = new HashMap<TableName, Procedure>();
73    private boolean stopped;
74  
75    public MasterFlushTableProcedureManager() {};
76  
77    @Override
78    public void stop(String why) {
79      LOG.info("stop: " + why);
80      this.stopped = true;
81    }
82  
83    @Override
84    public boolean isStopped() {
85      return this.stopped;
86    }
87  
88    @Override
89    public void initialize(MasterServices master, MetricsMaster metricsMaster)
90        throws KeeperException, IOException, UnsupportedOperationException {
91      this.master = master;
92  
93      // get the configuration for the coordinator
94      Configuration conf = master.getConfiguration();
95      long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
96      long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
97      int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);
98  
99      // setup the procedure coordinator
100     String name = master.getServerName().toString();
101     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads);
102     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
103         master.getZooKeeper(), getProcedureSignature(), name);
104 
105     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
106   }
107 
108   @Override
109   public String getProcedureSignature() {
110     return FLUSH_TABLE_PROCEDURE_SIGNATURE;
111   }
112 
113   @Override
114   public void execProcedure(ProcedureDescription desc) throws IOException {
115 
116     TableName tableName = TableName.valueOf(desc.getInstance());
117 
118     // call pre coproc hook
119     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
120     if (cpHost != null) {
121       cpHost.preTableFlush(tableName);
122     }
123 
124     // Get the list of region servers that host the online regions for table.
125     // We use the procedure instance name to carry the table name from the client.
126     // It is possible that regions may move after we get the region server list.
127     // Each region server will get its own online regions for the table.
128     // We may still miss regions that need to be flushed.
129     List<Pair<HRegionInfo, ServerName>> regionsAndLocations;
130 
131     if (TableName.META_TABLE_NAME.equals(tableName)) {
132       regionsAndLocations = new MetaTableLocator().getMetaRegionsAndLocations(
133         master.getZooKeeper());
134     } else {
135       regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
136         master.getZooKeeper(), master.getConnection(), tableName, false);
137     }
138 
139     Set<String> regionServers = new HashSet<String>(regionsAndLocations.size());
140     for (Pair<HRegionInfo, ServerName> region : regionsAndLocations) {
141       if (region != null && region.getFirst() != null && region.getSecond() != null) {
142         HRegionInfo hri = region.getFirst();
143         if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue;
144         regionServers.add(region.getSecond().toString());
145       }
146     }
147 
148     ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
149 
150     // Kick of the global procedure from the master coordinator to the region servers.
151     // We rely on the existing Distributed Procedure framework to prevent any concurrent
152     // procedure with the same name.
153     Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
154       new byte[0], Lists.newArrayList(regionServers));
155     monitor.rethrowException();
156     if (proc == null) {
157       String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
158           + desc.getInstance() + "'. " + "Another flush procedure is running?";
159       LOG.error(msg);
160       throw new IOException(msg);
161     }
162 
163     procMap.put(tableName, proc);
164 
165     try {
166       // wait for the procedure to complete.  A timer thread is kicked off that should cancel this
167       // if it takes too long.
168       proc.waitForCompleted();
169       LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '"
170           + desc.getInstance() + "'");
171       LOG.info("Master flush table procedure is successful!");
172     } catch (InterruptedException e) {
173       ForeignException ee =
174           new ForeignException("Interrupted while waiting for flush table procdure to finish", e);
175       monitor.receive(ee);
176       Thread.currentThread().interrupt();
177     } catch (ForeignException e) {
178       ForeignException ee =
179           new ForeignException("Exception while waiting for flush table procdure to finish", e);
180       monitor.receive(ee);
181     }
182     monitor.rethrowException();
183   }
184 
185   @Override
186   public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException {
187     // Procedure instance name is the table name.
188     TableName tableName = TableName.valueOf(desc.getInstance());
189     Procedure proc = procMap.get(tableName);
190     if (proc == null) {
191       // The procedure has not even been started yet.
192       // The client would request the procedure and call isProcedureDone().
193       // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone().
194       return false;
195     }
196     // We reply on the existing Distributed Procedure framework to give us the status.
197     return proc.isCompleted();
198   }
199 
200 }