001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
022import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
023import java.io.IOException;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
040import org.apache.hadoop.hbase.procedure2.Procedure;
041import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
045import org.apache.hadoop.hbase.wal.WALSplitUtil;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each
052 * {@link SplitWALProcedure}.
053 * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER).
054 * Helps assign and release workers for split tasks.
055 * Provide helper method to delete split WAL file and directory.
056 *
057 * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta)
058 * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta)
059 * can delete the splitting WAL and directory via deleteSplitWAL(wal)
060 * and deleteSplitWAL(crashedServer)
061 * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath)
062 * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure)
063 * and releaseSplitWALWorker(worker, scheduler)
064 *
065 * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager},
066 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
067 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
068 * after we switch to procedure-based WAL splitting.
069 * @see SplitLogManager for the original distributed split WAL manager.
070 */
071@InterfaceAudience.Private
072public class SplitWALManager {
073  private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
074
075  private final MasterServices master;
076  private final SplitWorkerAssigner splitWorkerAssigner;
077  private final Path rootDir;
078  private final FileSystem fs;
079  private final Configuration conf;
080  private final Path walArchiveDir;
081
082  public SplitWALManager(MasterServices master) throws IOException {
083    this.master = master;
084    this.conf = master.getConfiguration();
085    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
086        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
087    this.rootDir = master.getMasterFileSystem().getWALRootDir();
088    // TODO: This should be the WAL FS, not the Master FS?
089    this.fs = master.getMasterFileSystem().getFileSystem();
090    this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
091  }
092
093  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
094      throws IOException {
095    try {
096      // 1. list all splitting files
097      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
098      // 2. create corresponding procedures
099      return createSplitWALProcedures(splittingFiles, crashedServer);
100    } catch (IOException e) {
101      LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
102      throw e;
103    }
104  }
105
106  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
107      throws IOException {
108    List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
109    List<FileStatus> fileStatuses =
110      SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
111    LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta);
112    return fileStatuses;
113  }
114
115  private Path getWALSplitDir(ServerName serverName) {
116    Path logDir =
117        new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
118    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
119  }
120
121  /**
122   * Archive processed WAL
123   */
124  public void archive(String wal) throws IOException {
125    WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
126  }
127
128  public void deleteWALDir(ServerName serverName) throws IOException {
129    Path splitDir = getWALSplitDir(serverName);
130    try {
131      if (!fs.delete(splitDir, false)) {
132        LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
133      }
134    } catch (PathIsNotEmptyDirectoryException e) {
135      FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir);
136      LOG.warn("PathIsNotEmptyDirectoryException {}",
137        Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
138      throw e;
139    }
140  }
141
142  public boolean isSplitWALFinished(String walPath) throws IOException {
143    return !fs.exists(new Path(rootDir, walPath));
144  }
145
146  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
147      ServerName crashedServer) {
148    return splittingWALs.stream()
149        .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
150        .collect(Collectors.toList());
151  }
152
153  /**
154   * Acquire a split WAL worker
155   * @param procedure split WAL task
156   * @return an available region server which could execute this task
157   * @throws ProcedureSuspendedException if there is no available worker,
158   *         it will throw this exception to WAIT the procedure.
159   */
160  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
161      throws ProcedureSuspendedException {
162    Optional<ServerName> worker = splitWorkerAssigner.acquire();
163    if (worker.isPresent()) {
164      LOG.debug("Acquired split WAL worker={}", worker.get());
165      return worker.get();
166    }
167    splitWorkerAssigner.suspend(procedure);
168    throw new ProcedureSuspendedException();
169  }
170
171  /**
172   * After the worker finished the split WAL task, it will release the worker, and wake up all the
173   * suspend procedures in the ProcedureEvent
174   * @param worker worker which is about to release
175   * @param scheduler scheduler which is to wake up the procedure event
176   */
177  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
178    LOG.debug("Release split WAL worker={}", worker);
179    splitWorkerAssigner.release(worker);
180    splitWorkerAssigner.wake(scheduler);
181  }
182
183  /**
184   * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL
185   * tasks running on the region server side, they will not be count by the new splitWorkerAssigner.
186   * Thus we should add the workers of running tasks to the assigner when we load the procedures
187   * from MasterProcWALs.
188   * @param worker region server which is executing a split WAL task
189   */
190  public void addUsedSplitWALWorker(ServerName worker){
191    splitWorkerAssigner.addUsedWorker(worker);
192  }
193
194  /**
195   * help assign and release a worker for each WAL splitting task
196   * For each worker, concurrent running splitting task should be no more than maxSplitTasks
197   * If a task failed to acquire a worker, it will suspend and wait for workers available
198   *
199   */
200  private static final class SplitWorkerAssigner implements ServerListener {
201    private int maxSplitTasks;
202    private final ProcedureEvent<?> event;
203    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
204    private MasterServices master;
205
206    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
207      this.maxSplitTasks = maxSplitTasks;
208      this.master = master;
209      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
210      // ServerManager might be null in a test context where we are mocking; allow for this
211      ServerManager sm = this.master.getServerManager();
212      if (sm != null) {
213        sm.registerListener(this);
214      }
215    }
216
217    public synchronized Optional<ServerName> acquire() {
218      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
219      Collections.shuffle(serverList);
220      Optional<ServerName> worker = serverList.stream().filter(
221        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
222          .findAny();
223      if (worker.isPresent()) {
224        currentWorkers.compute(worker.get(), (serverName,
225            availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
226      }
227      return worker;
228    }
229
230    public synchronized void release(ServerName serverName) {
231      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
232    }
233
234    public void suspend(Procedure<?> proc) {
235      event.suspend();
236      event.suspendIfNotReady(proc);
237    }
238
239    public void wake(MasterProcedureScheduler scheduler) {
240      if (!event.isReady()) {
241        event.wake(scheduler);
242      }
243    }
244
245    @Override
246    public void serverAdded(ServerName worker) {
247      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
248    }
249
250    public synchronized void addUsedWorker(ServerName worker) {
251      // load used worker when master restart
252      currentWorkers.compute(worker, (serverName,
253          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
254    }
255  }
256}