001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
022import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
023import java.io.IOException;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
039import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
040import org.apache.hadoop.hbase.procedure2.Procedure;
041import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
042import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
043import org.apache.hadoop.hbase.util.CommonFSUtils;
044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
045import org.apache.hadoop.hbase.wal.WALSplitUtil;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052/**
053 * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each
054 * {@link SplitWALProcedure}.
055 * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER).
056 * Helps assign and release workers for split tasks.
057 * Provide helper method to delete split WAL file and directory.
058 *
059 * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta)
060 * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta)
061 * can delete the splitting WAL and directory via deleteSplitWAL(wal)
062 * and deleteSplitWAL(crashedServer)
063 * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath)
064 * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure)
065 * and releaseSplitWALWorker(worker, scheduler)
066 *
067 * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager},
068 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
069 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
070 * after we switch to procedure-based WAL splitting.
071 * @see SplitLogManager for the original distributed split WAL manager.
072 */
073@InterfaceAudience.Private
074public class SplitWALManager {
075  private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
076
077  private final MasterServices master;
078  private final SplitWorkerAssigner splitWorkerAssigner;
079  private final Path rootDir;
080  private final FileSystem fs;
081  private final Configuration conf;
082  private final Path walArchiveDir;
083
084  public SplitWALManager(MasterServices master) throws IOException {
085    this.master = master;
086    this.conf = master.getConfiguration();
087    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
088        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
089    this.rootDir = master.getMasterFileSystem().getWALRootDir();
090    // TODO: This should be the WAL FS, not the Master FS?
091    this.fs = master.getMasterFileSystem().getFileSystem();
092    this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
093  }
094
095  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
096      throws IOException {
097    try {
098      // 1. list all splitting files
099      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
100      // 2. create corresponding procedures
101      return createSplitWALProcedures(splittingFiles, crashedServer);
102    } catch (IOException e) {
103      LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
104      throw e;
105    }
106  }
107
108  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
109      throws IOException {
110    List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
111    FileStatus[] fileStatuses =
112        SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
113    LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.length, splitMeta);
114    return Lists.newArrayList(fileStatuses);
115  }
116
117  private Path getWALSplitDir(ServerName serverName) {
118    Path logDir =
119        new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
120    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
121  }
122
123  /**
124   * Archive processed WAL
125   */
126  public void archive(String wal) throws IOException {
127    WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
128  }
129
130  public void deleteWALDir(ServerName serverName) throws IOException {
131    Path splitDir = getWALSplitDir(serverName);
132    try {
133      if (!fs.delete(splitDir, false)) {
134        LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
135      }
136    } catch (PathIsNotEmptyDirectoryException e) {
137      FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir);
138      LOG.warn("PathIsNotEmptyDirectoryException {}",
139        Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
140      throw e;
141    }
142  }
143
144  public boolean isSplitWALFinished(String walPath) throws IOException {
145    return !fs.exists(new Path(rootDir, walPath));
146  }
147
148  @VisibleForTesting
149  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
150      ServerName crashedServer) {
151    return splittingWALs.stream()
152        .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
153        .collect(Collectors.toList());
154  }
155
156  /**
157   * Acquire a split WAL worker
158   * @param procedure split WAL task
159   * @return an available region server which could execute this task
160   * @throws ProcedureSuspendedException if there is no available worker,
161   *         it will throw this exception to WAIT the procedure.
162   */
163  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
164      throws ProcedureSuspendedException {
165    Optional<ServerName> worker = splitWorkerAssigner.acquire();
166    if (worker.isPresent()) {
167      LOG.debug("Acquired split WAL worker={}", worker.get());
168      return worker.get();
169    }
170    splitWorkerAssigner.suspend(procedure);
171    throw new ProcedureSuspendedException();
172  }
173
174  /**
175   * After the worker finished the split WAL task, it will release the worker, and wake up all the
176   * suspend procedures in the ProcedureEvent
177   * @param worker worker which is about to release
178   * @param scheduler scheduler which is to wake up the procedure event
179   */
180  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
181    LOG.debug("Release split WAL worker={}", worker);
182    splitWorkerAssigner.release(worker);
183    splitWorkerAssigner.wake(scheduler);
184  }
185
186  /**
187   * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL
188   * tasks running on the region server side, they will not be count by the new splitWorkerAssigner.
189   * Thus we should add the workers of running tasks to the assigner when we load the procedures
190   * from MasterProcWALs.
191   * @param worker region server which is executing a split WAL task
192   */
193  public void addUsedSplitWALWorker(ServerName worker){
194    splitWorkerAssigner.addUsedWorker(worker);
195  }
196
197  /**
198   * help assign and release a worker for each WAL splitting task
199   * For each worker, concurrent running splitting task should be no more than maxSplitTasks
200   * If a task failed to acquire a worker, it will suspend and wait for workers available
201   *
202   */
203  private static final class SplitWorkerAssigner implements ServerListener {
204    private int maxSplitTasks;
205    private final ProcedureEvent<?> event;
206    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
207    private MasterServices master;
208
209    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
210      this.maxSplitTasks = maxSplitTasks;
211      this.master = master;
212      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
213      // ServerManager might be null in a test context where we are mocking; allow for this
214      ServerManager sm = this.master.getServerManager();
215      if (sm != null) {
216        sm.registerListener(this);
217      }
218    }
219
220    public synchronized Optional<ServerName> acquire() {
221      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
222      Collections.shuffle(serverList);
223      Optional<ServerName> worker = serverList.stream().filter(
224        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
225          .findAny();
226      if (worker.isPresent()) {
227        currentWorkers.compute(worker.get(), (serverName,
228            availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
229      }
230      return worker;
231    }
232
233    public synchronized void release(ServerName serverName) {
234      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
235    }
236
237    public void suspend(Procedure<?> proc) {
238      event.suspend();
239      event.suspendIfNotReady(proc);
240    }
241
242    public void wake(MasterProcedureScheduler scheduler) {
243      if (!event.isReady()) {
244        event.wake(scheduler);
245      }
246    }
247
248    @Override
249    public void serverAdded(ServerName worker) {
250      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
251    }
252
253    public synchronized void addUsedWorker(ServerName worker) {
254      // load used worker when master restart
255      currentWorkers.compute(worker, (serverName,
256          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
257    }
258  }
259}