001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.migrate;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.TimeUnit;
026import java.util.stream.Collectors;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ScheduledChore;
030import org.apache.hadoop.hbase.Stoppable;
031import org.apache.hadoop.hbase.TableDescriptors;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.master.MasterServices;
034import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
035import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
036import org.apache.hadoop.hbase.regionserver.storefiletracker.InitializeStoreFileTrackerProcedure;
037import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * To avoid too many migrating/upgrade threads to be submitted at the time during master
044 * initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
045 */
046@InterfaceAudience.Private
047public class RollingUpgradeChore extends ScheduledChore {
048
049  static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
050    "hbase.master.rolling.upgrade.chore.period.secs";
051  static final int DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10; // 10 seconds by default
052
053  static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
054    "hbase.master.rolling.upgrade.chore.delay.secs";
055  static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30; // 30 seconds
056
057  static final int CONCURRENT_PROCEDURES_COUNT = 5;
058
059  private final static Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
060  ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
061  private TableDescriptors tableDescriptors;
062  private List<InitializeStoreFileTrackerProcedure> processingProcs = new ArrayList<>();
063
064  public RollingUpgradeChore(MasterServices masterServices) {
065    this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
066      masterServices.getTableDescriptors(), masterServices);
067  }
068
069  private RollingUpgradeChore(Configuration conf,
070    ProcedureExecutor<MasterProcedureEnv> procedureExecutor, TableDescriptors tableDescriptors,
071    Stoppable stopper) {
072    super(RollingUpgradeChore.class.getSimpleName(), stopper,
073      conf.getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
074        DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS),
075      conf.getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
076        DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS),
077      TimeUnit.SECONDS);
078    this.procedureExecutor = procedureExecutor;
079    this.tableDescriptors = tableDescriptors;
080  }
081
082  @Override
083  protected void chore() {
084    if (isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
085      LOG.info("All Rolling-Upgrade tasks are complete, shutdown RollingUpgradeChore!");
086      shutdown();
087    }
088  }
089
090  private boolean isCompletelyMigrateSFT(int concurrentCount) {
091    Iterator<InitializeStoreFileTrackerProcedure> iter = processingProcs.iterator();
092    while (iter.hasNext()) {
093      InitializeStoreFileTrackerProcedure proc = iter.next();
094      if (procedureExecutor.isFinished(proc.getProcId())) {
095        iter.remove();
096      }
097    }
098    // No new migration procedures will be submitted until
099    // all procedures executed last time are completed.
100    if (!processingProcs.isEmpty()) {
101      return false;
102    }
103
104    Map<String, TableDescriptor> migrateSFTTables;
105    try {
106      migrateSFTTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
107        TableDescriptor td = entry.getValue();
108        return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
109      }).limit(concurrentCount).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
110    } catch (IOException e) {
111      LOG.warn("Failed to migrate StoreFileTracker", e);
112      return false;
113    }
114
115    if (migrateSFTTables.isEmpty()) {
116      LOG.info("There is no table to migrate StoreFileTracker!");
117      return true;
118    }
119
120    for (Map.Entry<String, TableDescriptor> entry : migrateSFTTables.entrySet()) {
121      TableDescriptor tableDescriptor = entry.getValue();
122      InitializeStoreFileTrackerProcedure proc = new InitializeStoreFileTrackerProcedure(
123        procedureExecutor.getEnvironment(), tableDescriptor.getTableName());
124      procedureExecutor.submitProcedure(proc);
125      processingProcs.add(proc);
126    }
127    return false;
128  }
129}