/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.migrate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.InitializeStoreFileTrackerProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single chore that drives all rolling-upgrade tasks, so that master initialization does not
 * submit a large number of migration/upgrade tasks at the same time.
 * <p>
 * On every run it submits at most {@link #CONCURRENT_PROCEDURES_COUNT}
 * {@link InitializeStoreFileTrackerProcedure}s for tables whose descriptor carries no
 * {@link StoreFileTrackerFactory#TRACKER_IMPL} value, and it shuts itself down once no such table
 * is left.
 */
@InterfaceAudience.Private
public class RollingUpgradeChore extends ScheduledChore {

  /** Configuration key for the chore period, in seconds. */
  static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
    "hbase.master.rolling.upgrade.chore.period.secs";
  /** Period used when {@link #ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY} is not set. */
  static final int DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10;

  /** Configuration key for the delay before the first run, in seconds. */
  static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
    "hbase.master.rolling.upgrade.chore.delay.secs";
  /** Initial delay used when {@link #ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY} is not set. */
  static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30;

  /** Upper bound on the number of migration procedures submitted by one chore run. */
  static final int CONCURRENT_PROCEDURES_COUNT = 5;

  private static final Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);

  ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private TableDescriptors tableDescriptors;
  /** Procedures submitted by an earlier run that have not yet been observed as finished. */
  private List<InitializeStoreFileTrackerProcedure> processingProcs = new ArrayList<>();

  /**
   * Builds the chore from the configuration, procedure executor and table descriptors exposed by
   * the given master; the master also acts as the chore's stopper.
   * @param masterServices the master this chore runs in
   */
  public RollingUpgradeChore(MasterServices masterServices) {
    this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
      masterServices.getTableDescriptors(), masterServices);
  }

  private RollingUpgradeChore(Configuration configuration,
    ProcedureExecutor<MasterProcedureEnv> executor, TableDescriptors descriptors,
    Stoppable stoppable) {
    super(RollingUpgradeChore.class.getSimpleName(), stoppable, periodSeconds(configuration),
      initialDelaySeconds(configuration), TimeUnit.SECONDS);
    this.procedureExecutor = executor;
    this.tableDescriptors = descriptors;
  }

  /** Reads the chore period (seconds) from the configuration, falling back to the default. */
  private static int periodSeconds(Configuration configuration) {
    return configuration.getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
      DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS);
  }

  /** Reads the initial delay (seconds) from the configuration, falling back to the default. */
  private static long initialDelaySeconds(Configuration configuration) {
    return configuration.getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
      DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS);
  }

  /** Runs one migration round and shuts the chore down once nothing is left to migrate. */
  @Override
  protected void chore() {
    if (!isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
      return;
    }
    LOG.info("All Rolling-Upgrade tasks are complete, shutdown RollingUpgradeChore!");
    shutdown();
  }

  /**
   * Advances the StoreFileTracker migration by one step.
   * @param concurrentCount maximum number of procedures to submit in this step
   * @return {@code true} only when no procedure is outstanding and no table lacks a tracker
   *         implementation; {@code false} while work is pending, was just submitted, or the
   *         table descriptors could not be read
   */
  private boolean isCompletelyMigrateSFT(int concurrentCount) {
    dropFinishedProcedures();
    // A new batch is submitted only after the whole previous batch has finished.
    if (!processingProcs.isEmpty()) {
      return false;
    }

    Map<String, TableDescriptor> migrateSFTTables;
    try {
      migrateSFTTables = selectTablesWithoutTracker(concurrentCount);
    } catch (IOException e) {
      LOG.warn("Failed to migrate StoreFileTracker", e);
      return false;
    }

    if (migrateSFTTables.isEmpty()) {
      LOG.info("There is no table to migrate StoreFileTracker!");
      return true;
    }

    for (TableDescriptor descriptor : migrateSFTTables.values()) {
      MasterProcedureEnv env = procedureExecutor.getEnvironment();
      InitializeStoreFileTrackerProcedure submitted =
        new InitializeStoreFileTrackerProcedure(env, descriptor.getTableName());
      procedureExecutor.submitProcedure(submitted);
      processingProcs.add(submitted);
    }
    return false;
  }

  /** Forgets every tracked procedure that the executor reports as finished. */
  private void dropFinishedProcedures() {
    for (Iterator<InitializeStoreFileTrackerProcedure> it = processingProcs.iterator();
      it.hasNext();) {
      if (procedureExecutor.isFinished(it.next().getProcId())) {
        it.remove();
      }
    }
  }

  /**
   * Collects up to {@code limit} tables whose descriptor has no tracker implementation set.
   * @param limit maximum number of tables to return
   * @return the selected descriptors keyed as they are in {@link TableDescriptors#getAll()}
   * @throws IOException if the table descriptors cannot be loaded
   */
  private Map<String, TableDescriptor> selectTablesWithoutTracker(int limit) throws IOException {
    return tableDescriptors.getAll().entrySet().stream()
      .filter(candidate -> lacksTrackerImpl(candidate.getValue())).limit(limit)
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /** Tells whether the descriptor has a null or empty tracker implementation value. */
  private static boolean lacksTrackerImpl(TableDescriptor descriptor) {
    return StringUtils.isEmpty(descriptor.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
  }
}