/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates a {@link SplitWALProcedure} for each WAL that needs to be split, and manages the workers
 * for those procedures. The total number of workers is (number of online servers) *
 * (HBASE_SPLIT_WAL_MAX_SPLITTER). Helps assign and release workers for split tasks and provides
 * helper methods to delete split WAL files and directories. Callers can:
 * <ul>
 * <li>get the SplitWALProcedures via splitWALs(crashedServer, splitMeta)</li>
 * <li>get the files that need to be split via getWALsToSplit(crashedServer, splitMeta)</li>
 * <li>archive a split WAL and delete the splitting directory via archive(wal) and
 * deleteWALDir(crashedServer)</li>
 * <li>check whether splitting the WALs of a crashed server has finished via
 * isSplitWALFinished(walPath)</li>
 * <li>acquire and release a worker for splitting a WAL via acquireSplitWALWorker(procedure) and
 * releaseSplitWALWorker(worker, scheduler)</li>
 * </ul>
 * This class replaces the zk-based WAL splitting code. {@link MasterWalManager},
 * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
 * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed after
 * we switch to procedure-based WAL splitting.
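 * <p>
 * Illustrative sketch only, not a normative API contract (the surrounding procedure code and
 * variable names below are assumed): a server-crash procedure first creates one
 * {@link SplitWALProcedure} per WAL, and each of those procedures then borrows a worker region
 * server for the actual split work.
 *
 * <pre>
 * {@code
 * // one SplitWALProcedure per non-meta WAL of the crashed server
 * List<Procedure> procedures = splitWALManager.splitWALs(crashedServer, false);
 * // inside a SplitWALProcedure: borrow a region server to do the split work;
 * // throws ProcedureSuspendedException (and parks the procedure) if none is free
 * ServerName worker = splitWALManager.acquireSplitWALWorker(procedure);
 * // ... dispatch the split task to the worker ...
 * // when the task finishes, return the worker and wake up any waiting procedures
 * splitWALManager.releaseSplitWALWorker(worker, scheduler);
 * }
 * </pre>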
 * @see SplitLogManager for the original distributed split WAL manager.
 */
@InterfaceAudience.Private
public class SplitWALManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);

  private final MasterServices master;
  private final SplitWorkerAssigner splitWorkerAssigner;
  private final Path rootDir;
  private final FileSystem fs;
  private final Configuration conf;
  private final Path walArchiveDir;

  public SplitWALManager(MasterServices master) throws IOException {
    this.master = master;
    this.conf = master.getConfiguration();
    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
    this.rootDir = master.getMasterFileSystem().getWALRootDir();
    this.fs = master.getMasterFileSystem().getWALFileSystem();
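    // successfully split WALs are moved under the oldWALs directory inside the WAL root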
    this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
  }

  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) throws IOException {
    try {
      // 1. list all splitting files
      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
      // 2. create corresponding procedures
      return createSplitWALProcedures(splittingFiles, crashedServer);
    } catch (IOException e) {
      LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e);
      throw e;
    }
  }

  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
    throws IOException {
    List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
    List<FileStatus> fileStatuses =
      SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
    LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.size(), splitMeta);
    return fileStatuses;
  }

  private Path getWALSplitDir(ServerName serverName) {
    Path logDir =
      new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
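    // the WAL directory of a dead server carries the AbstractFSWALProvider.SPLITTING_EXT
    // ("-splitting") suffix while its WALs are being split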
    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
  }

  /**
   * Archive a fully processed WAL by moving it to the WAL archive directory.
   */
  public void archive(String wal) throws IOException {
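    // move the fully-split WAL into the archive (oldWALs) directory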
    WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
  }

  public void deleteWALDir(ServerName serverName) throws IOException {
    Path splitDir = getWALSplitDir(serverName);
    try {
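      // non-recursive delete: the splitting directory is expected to be empty once all of its
      // WALs have been split and archived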
      if (!fs.delete(splitDir, false)) {
        LOG.warn("Failed to delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
      }
    } catch (PathIsNotEmptyDirectoryException e) {
      FileStatus[] files = CommonFSUtils.listStatus(fs, splitDir);
      LOG.warn("Split WAL dir {} is not empty, contains {}", splitDir,
        Arrays.stream(files).map(FileStatus::getPath).collect(Collectors.toList()));
      throw e;
    }
  }

  public boolean isSplitWALFinished(String walPath) throws IOException {
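    // a WAL is moved out of its splitting location (archived) once it has been split successfully,
    // so the file no longer existing means the split is finished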
    return !fs.exists(new Path(rootDir, walPath));
  }

  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
    ServerName crashedServer) {
    return splittingWALs.stream()
      .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
      .collect(Collectors.toList());
  }

  /**
   * Acquire a split WAL worker.
   * @param procedure split WAL task
   * @return an available region server which could execute this task
   * @throws ProcedureSuspendedException if no worker is available; the procedure is suspended
   *                                     (put in the WAITING state) until a worker is released.
   */
  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
    throws ProcedureSuspendedException {
    Optional<ServerName> worker = splitWorkerAssigner.acquire();
    if (worker.isPresent()) {
      LOG.debug("Acquired split WAL worker={}", worker.get());
      return worker.get();
    }
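    // no worker is free: park this procedure on the assigner's event; it will be woken up when a
    // worker is released or a new server comes online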
    splitWorkerAssigner.suspend(procedure);
    throw new ProcedureSuspendedException();
  }

  /**
   * After a worker has finished its split WAL task, release the worker and wake up all the
   * procedures suspended on the ProcedureEvent.
   * @param worker    worker to be released
   * @param scheduler scheduler used to wake up the procedure event
   */
  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
    LOG.debug("Release split WAL worker={}", worker);
    splitWorkerAssigner.release(worker);
    splitWorkerAssigner.wake(scheduler);
  }

  /**
   * When the master restarts, a new splitWorkerAssigner is created. Split WAL tasks that are still
   * running on the region server side are not counted by the new splitWorkerAssigner, so we add
   * the workers of running tasks to the assigner when loading the procedures from MasterProcWALs.
   * @param worker region server which is executing a split WAL task
   */
  public void addUsedSplitWALWorker(ServerName worker) {
    splitWorkerAssigner.addUsedWorker(worker);
  }

  /**
   * Helps assign and release a worker for each WAL splitting task. For each worker, the number of
   * concurrently running splitting tasks should be no more than maxSplitTasks. If a task fails to
   * acquire a worker, it is suspended and waits until a worker becomes available.
   */
  private static final class SplitWorkerAssigner implements ServerListener {
    private int maxSplitTasks;
    private final ProcedureEvent<?> event;
    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
    private MasterServices master;

    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
      this.maxSplitTasks = maxSplitTasks;
      this.master = master;
      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
      // ServerManager might be null in a test context where we are mocking; allow for this
      ServerManager sm = this.master.getServerManager();
      if (sm != null) {
        sm.registerListener(this);
      }
    }

    public synchronized Optional<ServerName> acquire() {
      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
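      // shuffle so split work is spread across the online region servers instead of always
      // hitting the same one first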
      Collections.shuffle(serverList);
      Optional<ServerName> worker = serverList.stream().filter(
        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
        .findAny();
      if (worker.isPresent()) {
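        // no entry yet means the server still has all maxSplitTasks slots free; either way,
        // consume one slot for this task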
        currentWorkers.compute(worker.get(), (serverName,
          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
      }
      return worker;
    }

    public synchronized void release(ServerName serverName) {
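      // if there is no entry for this server, this assigner never handed out a slot for it;
      // returning null keeps the map entry absent instead of inventing a count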
      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
    }

    public void suspend(Procedure<?> proc) {
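      // mark the event as not ready and park the procedure on it; wake() will reschedule the
      // suspended procedures once a worker slot frees up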
      event.suspend();
      event.suspendIfNotReady(proc);
    }

    public void wake(MasterProcedureScheduler scheduler) {
      if (!event.isReady()) {
        event.wake(scheduler);
      }
    }

    @Override
    public void serverAdded(ServerName worker) {
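      // a newly online region server adds split capacity, so wake up any procedures that are
      // waiting for a worker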
      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
    }

    public synchronized void addUsedWorker(ServerName worker) {
      // account for a worker that was already running a split task when the master restarted
      currentWorkers.compute(worker, (serverName,
        availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
    }
  }
}