001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.List;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
037import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
038import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
039import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.wal.WAL.Entry;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal
048 * interactions with HDFS.
049 * <p>
050 * This implementation picks a directory in HDFS based on the same mechanisms as the
051 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the
052 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma
053 * separated list of allowed operations:
054 * <ul>
055 * <li><em>append</em> : edits will be written to the underlying filesystem</li>
056 * <li><em>sync</em> : wal syncs will result in hflush calls</li>
057 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying
058 * filesystem.</li>
059 * </ul>
 * Additionally, the special cases "all" and "none" are recognized. If omitted, the value defaults
061 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is
062 * also undefined if values not listed above are included.
063 * <p>
064 * Only those operations listed will occur between the returned WAL and HDFS. All others will be
065 * no-ops.
066 * <p>
067 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned
068 * WAL will just keep writing to the same file. This won't avoid all costs associated with file
 * management over time, because the data set size may result in additional HDFS block allocations.
070 */
071@InterfaceAudience.Private
072public class IOTestProvider implements WALProvider {
073  private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class);
074
075  private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations";
076
077  private enum AllowedOperations {
078    all,
079    append,
080    sync,
081    fileroll,
082    none;
083  }
084
085  private WALFactory factory;
086
087  private Configuration conf;
088
089  private volatile FSHLog log;
090
091  private String providerId;
092  protected AtomicBoolean initialized = new AtomicBoolean(false);
093
094  private List<WALActionsListener> listeners = new ArrayList<>();
095
096  /**
097   * @param factory    factory that made us, identity used for FS layout. may not be null
098   * @param conf       may not be null
099   * @param providerId differentiate between providers from one facotry, used for FS layout. may be
100   *                   null
101   */
102  @Override
103  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
104    throws IOException {
105    if (!initialized.compareAndSet(false, true)) {
106      throw new IllegalStateException("WALProvider.init should only be called once.");
107    }
108    this.factory = factory;
109    this.conf = conf;
110    this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
111
112  }
113
114  @Override
115  public List<WAL> getWALs() {
116    return Collections.singletonList(log);
117  }
118
119  private FSHLog createWAL() throws IOException {
120    String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
121    return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
122      AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
123      HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
124      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
125  }
126
127  @Override
128  public WAL getWAL(RegionInfo region) throws IOException {
129    FSHLog log = this.log;
130    if (log != null) {
131      return log;
132    }
133    synchronized (this) {
134      log = this.log;
135      if (log == null) {
136        log = createWAL();
137        this.log = log;
138      }
139    }
140    return log;
141  }
142
143  @Override
144  public void close() throws IOException {
145    FSHLog log = this.log;
146    if (log != null) {
147      log.close();
148    }
149  }
150
151  @Override
152  public void shutdown() throws IOException {
153    FSHLog log = this.log;
154    if (log != null) {
155      log.shutdown();
156    }
157  }
158
159  private static class IOTestWAL extends FSHLog {
160
161    private final boolean doFileRolls;
162
163    // Used to differntiate between roll calls before and after we finish construction.
164    private final boolean initialized;
165
166    /**
167     * Create an edit log at the given <code>dir</code> location. You should never have to load an
168     * existing log. If there is a log at startup, it should have already been processed and deleted
169     * by the time the WAL object is started up.
170     * @param fs              filesystem handle
171     * @param rootDir         path to where logs and oldlogs
172     * @param logDir          dir where wals are stored
173     * @param archiveDir      dir where wals are archived
174     * @param conf            configuration to use
175     * @param listeners       Listeners on WAL events. Listeners passed here will be registered
176     *                        before we do anything else; e.g. the Constructor
177     *                        {@link #rollWriter()}.
178     * @param failIfWALExists If true IOException will be thrown if files related to this wal
179     *                        already exist.
180     * @param prefix          should always be hostname and port in distributed env and it will be
181     *                        URL encoded before being used. If prefix is null, "wal" will be used
182     * @param suffix          will be url encoded. null is treated as empty. non-empty must start
183     *                        with {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
184     */
185    public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
186      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
187      final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
188      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
189      Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
190      doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name())
191        || operations.contains(AllowedOperations.fileroll.name());
192      initialized = true;
193      LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled"));
194    }
195
196    private Writer noRollsWriter;
197
198    // creatWriterInstance is where the new pipeline is set up for doing file rolls
199    // if we are skipping it, just keep returning the same writer.
200    @Override
201    protected Writer createWriterInstance(final Path path) throws IOException {
202      // we get called from the FSHLog constructor (!); always roll in this case since
203      // we don't know yet if we're supposed to generally roll and
204      // we need an initial file in the case of doing appends but no rolls.
205      if (!initialized || doFileRolls) {
206        LOG.info("creating new writer instance.");
207        final ProtobufLogWriter writer = new IOTestWriter();
208        try {
209          writer.init(fs, path, conf, false, this.blocksize,
210            StreamSlowMonitor.create(conf, path.getName()));
211        } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
212          throw new IOException("Can't create writer instance because underlying FileSystem "
213            + "doesn't support needed stream capabilities.", exception);
214        }
215        if (!initialized) {
216          LOG.info("storing initial writer instance in case file rolling isn't allowed.");
217          noRollsWriter = writer;
218        }
219        return writer;
220      } else {
221        LOG.info("WAL rolling disabled, returning the first writer.");
222        // Initial assignment happens during the constructor call, so there ought not be
223        // a race for first assignment.
224        return noRollsWriter;
225      }
226    }
227  }
228
229  /**
230   * Presumes init will be called by a single thread prior to any access of other methods.
231   */
232  private static class IOTestWriter extends ProtobufLogWriter {
233    private boolean doAppends;
234    private boolean doSyncs;
235
236    @Override
237    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
238      long blocksize, StreamSlowMonitor monitor)
239      throws IOException, CommonFSUtils.StreamLacksCapabilityException {
240      Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
241      if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
242        doAppends = doSyncs = true;
243      } else if (operations.contains(AllowedOperations.none.name())) {
244        doAppends = doSyncs = false;
245      } else {
246        doAppends = operations.contains(AllowedOperations.append.name());
247        doSyncs = operations.contains(AllowedOperations.sync.name());
248      }
249      LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled")
250        + " and syncs " + (doSyncs ? "enabled" : "disabled"));
251      super.init(fs, path, conf, overwritable, blocksize, monitor);
252    }
253
254    @Override
255    public void append(Entry entry) throws IOException {
256      if (doAppends) {
257        super.append(entry);
258      }
259    }
260
261    @Override
262    public void sync(boolean forceSync) throws IOException {
263      if (doSyncs) {
264        super.sync(forceSync);
265      }
266    }
267  }
268
269  @Override
270  public long getNumLogFiles() {
271    return this.log.getNumLogFiles();
272  }
273
274  @Override
275  public long getLogFileSize() {
276    return this.log.getLogFileSize();
277  }
278
279  @Override
280  public void addWALActionsListener(WALActionsListener listener) {
281    // TODO Implement WALProvider.addWALActionLister
282
283  }
284}