001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ThreadPoolExecutor; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.MetaTableAccessor; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.RegionInfo; 033import org.apache.hadoop.hbase.errorhandling.ForeignException; 034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 035import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 036import org.apache.hadoop.hbase.master.MasterServices; 037import org.apache.hadoop.hbase.master.MetricsMaster; 038import org.apache.hadoop.hbase.procedure.MasterProcedureManager; 039import org.apache.hadoop.hbase.procedure.Procedure; 040import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; 041import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; 042import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; 043import org.apache.hadoop.hbase.security.User; 044import org.apache.hadoop.hbase.security.access.AccessChecker; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.apache.zookeeper.KeeperException; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 055 056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 057public class MasterFlushTableProcedureManager extends MasterProcedureManager { 058 059 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 060 061 private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; 062 private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; 063 private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; 064 private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500; 065 066 private static final String FLUSH_PROC_POOL_THREADS_KEY = 067 "hbase.flush.procedure.master.threads"; 068 private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1; 069 070 private static final Logger LOG = LoggerFactory.getLogger(MasterFlushTableProcedureManager.class); 071 072 private MasterServices master; 073 private ProcedureCoordinator coordinator; 074 private Map<TableName, Procedure> procMap = new HashMap<>(); 075 private boolean stopped; 076 077 public MasterFlushTableProcedureManager() {}; 078 079 @Override 080 public void stop(String why) { 081 LOG.info("stop: " + why); 082 this.stopped = true; 083 } 084 085 @Override 086 public boolean isStopped() { 087 return this.stopped; 088 } 089 090 @Override 091 public void initialize(MasterServices master, MetricsMaster metricsMaster) 092 throws KeeperException, IOException, UnsupportedOperationException { 093 this.master = master; 094 095 // get the configuration for the coordinator 096 Configuration conf = master.getConfiguration(); 097 long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT); 098 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); 099 int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT); 100 101 // setup the procedure coordinator 102 String name = master.getServerName().toString(); 103 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); 104 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( 105 master.getZooKeeper(), getProcedureSignature(), name); 106 107 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); 108 } 109 110 @Override 111 public String getProcedureSignature() { 112 return FLUSH_TABLE_PROCEDURE_SIGNATURE; 113 } 114 115 @Override 116 public void execProcedure(ProcedureDescription desc) throws IOException { 117 118 TableName tableName = TableName.valueOf(desc.getInstance()); 119 120 // call pre coproc hook 121 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 122 if (cpHost != null) { 123 cpHost.preTableFlush(tableName); 124 } 125 126 // Get the list of region servers that host the online regions for table. 127 // We use the procedure instance name to carry the table name from the client. 128 // It is possible that regions may move after we get the region server list. 129 // Each region server will get its own online regions for the table. 130 // We may still miss regions that need to be flushed. 131 List<Pair<RegionInfo, ServerName>> regionsAndLocations; 132 133 if (TableName.META_TABLE_NAME.equals(tableName)) { 134 regionsAndLocations = MetaTableLocator.getMetaRegionsAndLocations( 135 master.getZooKeeper()); 136 } else { 137 regionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations( 138 master.getConnection(), tableName, false); 139 } 140 141 Set<String> regionServers = new HashSet<>(regionsAndLocations.size()); 142 for (Pair<RegionInfo, ServerName> region : regionsAndLocations) { 143 if (region != null && region.getFirst() != null && region.getSecond() != null) { 144 RegionInfo hri = region.getFirst(); 145 if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue; 146 regionServers.add(region.getSecond().toString()); 147 } 148 } 149 150 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); 151 152 // Kick of the global procedure from the master coordinator to the region servers. 153 // We rely on the existing Distributed Procedure framework to prevent any concurrent 154 // procedure with the same name. 155 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), 156 new byte[0], Lists.newArrayList(regionServers)); 157 monitor.rethrowException(); 158 if (proc == null) { 159 String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" 160 + desc.getInstance() + "'. " + "Another flush procedure is running?"; 161 LOG.error(msg); 162 throw new IOException(msg); 163 } 164 165 procMap.put(tableName, proc); 166 167 try { 168 // wait for the procedure to complete. A timer thread is kicked off that should cancel this 169 // if it takes too long. 170 proc.waitForCompleted(); 171 LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '" 172 + desc.getInstance() + "'"); 173 LOG.info("Master flush table procedure is successful!"); 174 } catch (InterruptedException e) { 175 ForeignException ee = 176 new ForeignException("Interrupted while waiting for flush table procdure to finish", e); 177 monitor.receive(ee); 178 Thread.currentThread().interrupt(); 179 } catch (ForeignException e) { 180 ForeignException ee = 181 new ForeignException("Exception while waiting for flush table procdure to finish", e); 182 monitor.receive(ee); 183 } 184 monitor.rethrowException(); 185 } 186 187 @Override 188 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 189 throws IOException { 190 // Done by AccessController as part of preTableFlush coprocessor hook (legacy code path). 191 // In future, when we AC is removed for good, that check should be moved here. 192 } 193 194 @Override 195 public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException { 196 // Procedure instance name is the table name. 197 TableName tableName = TableName.valueOf(desc.getInstance()); 198 Procedure proc = procMap.get(tableName); 199 if (proc == null) { 200 // The procedure has not even been started yet. 201 // The client would request the procedure and call isProcedureDone(). 202 // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone(). 203 return false; 204 } 205 // We reply on the existing Distributed Procedure framework to give us the status. 206 return proc.isCompleted(); 207 } 208 209}