/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Creates a {@link SplitWALProcedure} for each WAL that needs to be split, and manages the
 * workers running those procedures.
 * The total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER).
 * Helps assign and release workers for split tasks, and provides helper methods to delete split
 * WAL files and directories.
 * <p>
 * Callers can:
 * <ul>
 *   <li>get the SplitWALProcedures via splitWALs(crashedServer, splitMeta)</li>
 *   <li>get the files that need to be split via getWALsToSplit(crashedServer, splitMeta)</li>
 *   <li>delete a split WAL file and the splitting WAL directory via deleteSplitWAL(wal) and
 *       deleteWALDir(crashedServer)</li>
 *   <li>check whether splitting the WALs of a crashed server has finished via
 *       isSplitWALFinished(walPath)</li>
 *   <li>acquire and release a worker for splitting a WAL via acquireSplitWALWorker(procedure)
 *       and releaseSplitWALWorker(worker, scheduler)</li>
 * </ul>
 * <p>
 * This class replaces the ZK-based WAL splitting code. {@link MasterWalManager},
 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
 * after we switch to procedure-based WAL splitting.
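 * <p>
 * A rough usage sketch (the getter on the master and the surrounding crash-handling code are
 * assumptions for illustration, not part of this class):
 * <pre>{@code
 * SplitWALManager splitWALManager = master.getSplitWALManager();
 * // one SplitWALProcedure per non-meta WAL of the crashed server
 * List<Procedure> procedures = splitWALManager.splitWALs(crashedServer, false);
 * // submit the procedures to the ProcedureExecutor; once all of them have finished:
 * splitWALManager.deleteWALDir(crashedServer);
 * }</pre>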
 */
@InterfaceAudience.Private
public class SplitWALManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);

  private final MasterServices master;
  private final SplitWorkerAssigner splitWorkerAssigner;
  private final Path rootDir;
  private final FileSystem fs;
  private final Configuration conf;

  public SplitWALManager(MasterServices master) {
    this.master = master;
    this.conf = master.getConfiguration();
    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
    this.rootDir = master.getMasterFileSystem().getWALRootDir();
    this.fs = master.getMasterFileSystem().getFileSystem();
  }

  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
      throws IOException {
    try {
      // 1. list all splitting files
      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
      // 2. create corresponding procedures
      return createSplitWALProcedures(splittingFiles, crashedServer);
    } catch (IOException e) {
      LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
      throw e;
    }
  }

  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
      throws IOException {
    List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
    FileStatus[] fileStatuses =
        SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
    LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.length, splitMeta);
    return Lists.newArrayList(fileStatuses);
  }

  private Path getWALSplitDir(ServerName serverName) {
    Path logDir =
        new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
  }

  public void deleteSplitWAL(String wal) throws IOException {
    fs.delete(new Path(wal), false);
  }

  public void deleteWALDir(ServerName serverName) throws IOException {
    Path splitDir = getWALSplitDir(serverName);
    fs.delete(splitDir, false);
  }

  public boolean isSplitWALFinished(String walPath) throws IOException {
    return !fs.exists(new Path(rootDir, walPath));
  }

  @VisibleForTesting
  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
      ServerName crashedServer) {
    return splittingWALs.stream()
        .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
        .collect(Collectors.toList());
  }

  /**
   * Acquire a split WAL worker from the online servers.
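   * <p>
   * A rough sketch of the expected call pattern from inside a procedure (the surrounding
   * procedure code is an assumption for illustration):
   * <pre>{@code
   * // Throwing ProcedureSuspendedException parks the calling procedure on the assigner's
   * // ProcedureEvent until releaseSplitWALWorker() (or a newly joined server) wakes it up.
   * ServerName worker = splitWALManager.acquireSplitWALWorker(this);
   * }</pre>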
   * @param procedure split WAL task
   * @return an available region server which could execute this task
   * @throws ProcedureSuspendedException if there is no available worker; the procedure is
   *         suspended until a worker becomes available
   */
  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
      throws ProcedureSuspendedException {
    Optional<ServerName> worker = splitWorkerAssigner.acquire();
    if (worker.isPresent()) {
      LOG.debug("Acquired split WAL worker={}", worker.get());
      return worker.get();
    }
    splitWorkerAssigner.suspend(procedure);
    throw new ProcedureSuspendedException();
  }

  /**
   * After a worker has finished its split WAL task, release the worker and wake up all the
   * procedures suspended on the ProcedureEvent.
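   * <p>
   * A rough sketch of the expected caller (the {@code env} and the surrounding procedure code
   * are assumptions for illustration):
   * <pre>{@code
   * // typically called by the split procedure once the worker has finished its task
   * splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
   * }</pre>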
   * @param worker the worker to release
   * @param scheduler scheduler used to wake up the procedure event
   */
  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
    LOG.debug("Release split WAL worker={}", worker);
    splitWorkerAssigner.release(worker);
    splitWorkerAssigner.wake(scheduler);
  }

  /**
   * When the master restarts, a new SplitWorkerAssigner is created. Split WAL tasks that are
   * still running on region servers would not be counted by the new assigner, so the workers of
   * those running tasks are added back to the assigner when the procedures are loaded from
   * MasterProcWALs.
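   * <p>
   * A rough sketch of the expected caller (a replay hook of a split procedure, shown only as an
   * assumption for illustration):
   * <pre>{@code
   * // e.g. when a SplitWALProcedure that already has a worker assigned is loaded after restart
   * if (worker != null) {
   *   splitWALManager.addUsedSplitWALWorker(worker);
   * }
   * }</pre>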
   * @param worker region server which is executing a split WAL task
   */
  public void addUsedSplitWALWorker(ServerName worker) {
    splitWorkerAssigner.addUsedWorker(worker);
  }

  /**
   * Helps assign and release a worker for each WAL splitting task.
   * For each worker, the number of concurrently running split tasks is bounded by maxSplitTasks.
   * If a task fails to acquire a worker, it is suspended and waits until a worker becomes
   * available.
   *
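   * A worked example of the bookkeeping (the numbers are illustrative): with maxSplitTasks = 2
   * and a server not yet tracked in currentWorkers, the first acquire() leaves 1 remaining slot,
   * the second leaves 0, a third acquire() skips that server, and a release() brings it back
   * to 1.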
   */
  private static final class SplitWorkerAssigner implements ServerListener {
    private int maxSplitTasks;
    private final ProcedureEvent<?> event;
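    // Remaining split-task slots per server: no entry means the server still has full capacity
    // (maxSplitTasks); 0 means the server is saturated and will be skipped by acquire().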
    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
    private MasterServices master;

    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
      this.maxSplitTasks = maxSplitTasks;
      this.master = master;
      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
      this.master.getServerManager().registerListener(this);
    }

    public synchronized Optional<ServerName> acquire() {
      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
      Collections.shuffle(serverList);
      Optional<ServerName> worker = serverList.stream().filter(
        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
          .findAny();
      if (worker.isPresent()) {
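        // First use of a server initializes its remaining slots to maxSplitTasks - 1; otherwise
        // consume one slot.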
        currentWorkers.compute(worker.get(), (serverName,
            availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
      }
      return worker;
    }

    public synchronized void release(ServerName serverName) {
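      // Give one slot back; a server that was never handed out (null) stays untracked.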
      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
    }

    public void suspend(Procedure<?> proc) {
      event.suspend();
      event.suspendIfNotReady(proc);
    }

    public void wake(MasterProcedureScheduler scheduler) {
      if (!event.isReady()) {
        event.wake(scheduler);
      }
    }

    @Override
    public void serverAdded(ServerName worker) {
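      // A newly joined region server adds split capacity, so wake up any procedures waiting for
      // a worker.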
      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
    }

    public synchronized void addUsedWorker(ServerName worker) {
      // Account for a worker that was already running a split task before the master restarted.
      currentWorkers.compute(worker, (serverName,
          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
    }
  }
}