001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
023import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
024
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Optional;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
040import org.apache.hadoop.hbase.procedure2.Procedure;
041import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
045import org.apache.hadoop.hbase.wal.WALSplitUtil;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
/**
 * Creates a {@link SplitWALProcedure} for each WAL which needs to be split, and manages the
 * workers for each {@link SplitWALProcedure}. The total number of workers is (number of online
 * servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). Helps assign and release workers for split tasks, and
 * provides helper methods to archive a split WAL file and to delete the splitting directory. The
 * user can:
 * <ul>
 * <li>get the SplitWALProcedures via splitWALs(crashedServer, splitMeta);</li>
 * <li>get the files that need to be split via getWALsToSplit(crashedServer, splitMeta);</li>
 * <li>archive a processed WAL via archive(wal) and delete the splitting directory of a crashed
 * server via deleteWALDir(crashedServer);</li>
 * <li>check whether the splitting of a WAL has finished via isSplitWALFinished(walPath);</li>
 * <li>acquire and release a worker for splitting a WAL via acquireSplitWALWorker(procedure) and
 * releaseSplitWALWorker(worker, scheduler).</li>
 * </ul>
 * This class is to replace the zk-based WAL splitting related code: {@link MasterWalManager},
 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed after
 * we switch to procedure-based WAL splitting.
 * @see SplitLogManager for the original distributed split WAL manager.
 */
066@InterfaceAudience.Private
067public class SplitWALManager {
068  private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
069
070  private final MasterServices master;
071  private final WorkerAssigner splitWorkerAssigner;
072  private final Path rootDir;
073  private final FileSystem fs;
074  private final Configuration conf;
075  private final Path walArchiveDir;
076
077  public SplitWALManager(MasterServices master) throws IOException {
078    this.master = master;
079    this.conf = master.getConfiguration();
080    this.splitWorkerAssigner = new WorkerAssigner(this.master,
081      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER),
082      new ProcedureEvent<>("split-WAL-worker-assigning"));
083    this.rootDir = master.getMasterFileSystem().getWALRootDir();
084    this.fs = master.getMasterFileSystem().getWALFileSystem();
085    this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
086  }
087
088  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) throws IOException {
089    try {
090      // 1. list all splitting files
091      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
092      // 2. create corresponding procedures
093      return createSplitWALProcedures(splittingFiles, crashedServer);
094    } catch (IOException e) {
095      LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
096      throw e;
097    }
098  }
099
100  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
101    throws IOException {
102    List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
103    List<FileStatus> fileStatuses =
104      SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
105    LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta);
106    return fileStatuses;
107  }
108
109  private Path getWALSplitDir(ServerName serverName) {
110    Path logDir =
111      new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
112    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
113  }
114
115  /**
116   * Archive processed WAL
117   */
118  public void archive(String wal) throws IOException {
119    WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
120  }
121
122  public void deleteWALDir(ServerName serverName) throws IOException {
123    Path splitDir = getWALSplitDir(serverName);
124    try {
125      if (!fs.delete(splitDir, false)) {
126        LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
127      }
128    } catch (PathIsNotEmptyDirectoryException e) {
129      FileStatus[] files = CommonFSUtils.listStatus(fs, splitDir);
130      LOG.warn("PathIsNotEmptyDirectoryException {}",
131        Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
132      throw e;
133    }
134  }
135
136  public boolean isSplitWALFinished(String walPath) throws IOException {
137    return !fs.exists(new Path(rootDir, walPath));
138  }
139
140  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
141    ServerName crashedServer) {
142    return splittingWALs.stream()
143      .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
144      .collect(Collectors.toList());
145  }
146
147  /**
148   * Acquire a split WAL worker
149   * @param procedure split WAL task
150   * @return an available region server which could execute this task
151   * @throws ProcedureSuspendedException if there is no available worker, it will throw this
152   *                                     exception to WAIT the procedure.
153   */
154  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
155    throws ProcedureSuspendedException {
156    Optional<ServerName> worker = splitWorkerAssigner.acquire();
157    if (worker.isPresent()) {
158      LOG.debug("Acquired split WAL worker={}", worker.get());
159      return worker.get();
160    }
161    splitWorkerAssigner.suspend(procedure);
162    throw new ProcedureSuspendedException();
163  }
164
165  /**
166   * After the worker finished the split WAL task, it will release the worker, and wake up all the
167   * suspend procedures in the ProcedureEvent
168   * @param worker    worker which is about to release
169   * @param scheduler scheduler which is to wake up the procedure event
170   */
171  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
172    LOG.debug("Release split WAL worker={}", worker);
173    splitWorkerAssigner.release(worker);
174    splitWorkerAssigner.wake(scheduler);
175  }
176
177  /**
178   * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL
179   * tasks running on the region server side, they will not be count by the new splitWorkerAssigner.
180   * Thus we should add the workers of running tasks to the assigner when we load the procedures
181   * from MasterProcWALs.
182   * @param worker region server which is executing a split WAL task
183   */
184  public void addUsedSplitWALWorker(ServerName worker) {
185    splitWorkerAssigner.addUsedWorker(worker);
186  }
187}