001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.stream.StreamSupport; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseInterfaceAudience; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.TableDescriptor; 035import org.apache.hadoop.hbase.errorhandling.ForeignException; 036import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 037import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 038import org.apache.hadoop.hbase.master.MasterServices; 039import org.apache.hadoop.hbase.master.MetricsMaster; 040import org.apache.hadoop.hbase.procedure.MasterProcedureManager; 041import org.apache.hadoop.hbase.procedure.Procedure; 042import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; 043import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; 044import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; 045import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 046import org.apache.hadoop.hbase.security.User; 047import org.apache.hadoop.hbase.security.access.AccessChecker; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.Pair; 050import org.apache.hadoop.hbase.util.Strings; 051import org.apache.yetus.audience.InterfaceAudience; 052import org.apache.zookeeper.KeeperException; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 057 058import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 060 061@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 062public class MasterFlushTableProcedureManager extends MasterProcedureManager { 063 064 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; 065 066 public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled"; 067 068 public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true; 069 070 private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; 071 private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; 072 private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; 073 private static final int FLUSH_WAKE_MILLIS_DEFAULT = 500; 074 075 private static final String FLUSH_PROC_POOL_THREADS_KEY = "hbase.flush.procedure.master.threads"; 076 private static final int FLUSH_PROC_POOL_THREADS_DEFAULT = 1; 077 078 private static final Logger LOG = LoggerFactory.getLogger(MasterFlushTableProcedureManager.class); 079 080 private MasterServices master; 081 private ProcedureCoordinator coordinator; 082 private Map<TableName, Procedure> procMap = new HashMap<>(); 083 private boolean stopped; 084 085 public MasterFlushTableProcedureManager() { 086 } 087 088 @Override 089 public void stop(String why) { 090 LOG.info("stop: " + why); 091 this.stopped = true; 092 } 093 094 @Override 095 public boolean isStopped() { 096 return this.stopped; 097 } 098 099 @Override 100 public void initialize(MasterServices master, MetricsMaster metricsMaster) 101 throws KeeperException, IOException, UnsupportedOperationException { 102 this.master = master; 103 104 // get the configuration for the coordinator 105 Configuration conf = master.getConfiguration(); 106 long wakeFrequency = conf.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT); 107 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); 108 int threads = conf.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT); 109 110 // setup the procedure coordinator 111 String name = master.getServerName().toString(); 112 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); 113 ProcedureCoordinatorRpcs comms = 114 new ZKProcedureCoordinator(master.getZooKeeper(), getProcedureSignature(), name); 115 116 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); 117 } 118 119 @Override 120 public String getProcedureSignature() { 121 return FLUSH_TABLE_PROCEDURE_SIGNATURE; 122 } 123 124 @Override 125 public void execProcedure(ProcedureDescription desc) throws IOException { 126 127 TableName tableName = TableName.valueOf(desc.getInstance()); 128 129 // call pre coproc hook 130 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 131 if (cpHost != null) { 132 cpHost.preTableFlush(tableName); 133 } 134 135 // Get the list of region servers that host the online regions for table. 136 // We use the procedure instance name to carry the table name from the client. 137 // It is possible that regions may move after we get the region server list. 138 // Each region server will get its own online regions for the table. 139 // We may still miss regions that need to be flushed. 140 List<Pair<RegionInfo, ServerName>> regionsAndLocations = 141 master.getAssignmentManager().getTableRegionsAndLocations(tableName, false); 142 143 Set<String> regionServers = new HashSet<>(regionsAndLocations.size()); 144 for (Pair<RegionInfo, ServerName> region : regionsAndLocations) { 145 if (region != null && region.getFirst() != null && region.getSecond() != null) { 146 RegionInfo hri = region.getFirst(); 147 if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue; 148 regionServers.add(region.getSecond().toString()); 149 } 150 } 151 152 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); 153 154 HBaseProtos.NameStringPair families = null; 155 for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) { 156 if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) { 157 families = nsp; 158 } 159 } 160 161 byte[] procArgs; 162 if (families != null) { 163 TableDescriptor tableDescriptor = master.getTableDescriptors().get(tableName); 164 List<String> noSuchFamilies = 165 StreamSupport.stream(Strings.SPLITTER.split(families.getValue()).spliterator(), false) 166 .filter(cf -> !tableDescriptor.hasColumnFamily(Bytes.toBytes(cf))).toList(); 167 if (!noSuchFamilies.isEmpty()) { 168 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 169 + " don't exist in table " + tableName.getNameAsString()); 170 } 171 procArgs = families.toByteArray(); 172 } else { 173 procArgs = new byte[0]; 174 } 175 176 // Kick of the global procedure from the master coordinator to the region servers. 177 // We rely on the existing Distributed Procedure framework to prevent any concurrent 178 // procedure with the same name. 179 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), procArgs, 180 Lists.newArrayList(regionServers)); 181 monitor.rethrowException(); 182 if (proc == null) { 183 String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" 184 + desc.getInstance() + "'. " + "Another flush procedure is running?"; 185 LOG.error(msg); 186 throw new IOException(msg); 187 } 188 189 procMap.put(tableName, proc); 190 191 try { 192 // wait for the procedure to complete. A timer thread is kicked off that should cancel this 193 // if it takes too long. 194 proc.waitForCompleted(); 195 LOG.info("Done waiting - exec procedure " + desc.getSignature() + " for '" 196 + desc.getInstance() + "'"); 197 LOG.info("Master flush table procedure is successful!"); 198 } catch (InterruptedException e) { 199 ForeignException ee = 200 new ForeignException("Interrupted while waiting for flush table procdure to finish", e); 201 monitor.receive(ee); 202 Thread.currentThread().interrupt(); 203 } catch (ForeignException e) { 204 ForeignException ee = 205 new ForeignException("Exception while waiting for flush table procdure to finish", e); 206 monitor.receive(ee); 207 } 208 monitor.rethrowException(); 209 } 210 211 @Override 212 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 213 throws IOException { 214 // Done by AccessController as part of preTableFlush coprocessor hook (legacy code path). 215 // In future, when we AC is removed for good, that check should be moved here. 216 } 217 218 @Override 219 public synchronized boolean isProcedureDone(ProcedureDescription desc) throws IOException { 220 // Procedure instance name is the table name. 221 TableName tableName = TableName.valueOf(desc.getInstance()); 222 Procedure proc = procMap.get(tableName); 223 if (proc == null) { 224 // The procedure has not even been started yet. 225 // The client would request the procedure and call isProcedureDone(). 226 // The HBaseAdmin.execProcedure() wraps both request and isProcedureDone(). 227 return false; 228 } 229 // We reply on the existing Distributed Procedure framework to give us the status. 230 return proc.isCompleted(); 231 } 232 233}