001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.wal;
020
021import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
022import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
023import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.RegionInfo;
035// imports for things that haven't moved from regionserver.wal yet.
036import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
037import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
038import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.hadoop.hbase.wal.WAL.Entry;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A WAL Provider that returns a single thread safe WAL that optionally can skip parts of our normal
047 * interactions with HDFS.
048 * <p>
049 * This implementation picks a directory in HDFS based on the same mechanisms as the
050 * {@link FSHLogProvider}. Users can configure how much interaction we have with HDFS with the
051 * configuration property "hbase.wal.iotestprovider.operations". The value should be a comma
052 * separated list of allowed operations:
053 * <ul>
054 * <li><em>append</em> : edits will be written to the underlying filesystem</li>
055 * <li><em>sync</em> : wal syncs will result in hflush calls</li>
056 * <li><em>fileroll</em> : roll requests will result in creating a new file on the underlying
057 * filesystem.</li>
058 * </ul>
059 * Additionally, the special cases "all" and "none" are recognized. If ommited, the value defaults
060 * to "all." Behavior is undefined if "all" or "none" are paired with additional values. Behavior is
061 * also undefined if values not listed above are included.
062 * <p>
063 * Only those operations listed will occur between the returned WAL and HDFS. All others will be
064 * no-ops.
065 * <p>
066 * Note that in the case of allowing "append" operations but not allowing "fileroll", the returned
067 * WAL will just keep writing to the same file. This won't avoid all costs associated with file
068 * management over time, becaue the data set size may result in additional HDFS block allocations.
069 */
070@InterfaceAudience.Private
071public class IOTestProvider implements WALProvider {
072  private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class);
073
074  private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations";
075  private enum AllowedOperations {
076    all,
077    append,
078    sync,
079    fileroll,
080    none;
081  }
082
083  private WALFactory factory;
084
085  private Configuration conf;
086
087  private volatile FSHLog log;
088
089  private String providerId;
090
091  private List<WALActionsListener> listeners = new ArrayList<>();
092  /**
093   * @param factory factory that made us, identity used for FS layout. may not be null
094   * @param conf may not be null
095   * @param providerId differentiate between providers from one facotry, used for FS layout. may be
096   *                   null
097   */
098  @Override
099  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
100    if (factory != null) {
101      throw new IllegalStateException("WALProvider.init should only be called once.");
102    }
103    this.factory = factory;
104    this.conf = conf;
105    this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
106
107
108  }
109
110  @Override
111  public List<WAL> getWALs() {
112    return Collections.singletonList(log);
113  }
114
115  private FSHLog createWAL() throws IOException {
116    String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
117    return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
118        AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
119        HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
120        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
121  }
122
123  @Override
124  public WAL getWAL(RegionInfo region) throws IOException {
125    FSHLog log = this.log;
126    if (log != null) {
127      return log;
128    }
129    synchronized (this) {
130      log = this.log;
131      if (log == null) {
132        log = createWAL();
133        this.log = log;
134      }
135    }
136    return log;
137  }
138
139  @Override
140  public void close() throws IOException {
141    FSHLog log = this.log;
142    if (log != null) {
143      log.close();
144    }
145  }
146
147  @Override
148  public void shutdown() throws IOException {
149    FSHLog log = this.log;
150    if (log != null) {
151      log.shutdown();
152    }
153  }
154
155  private static class IOTestWAL extends FSHLog {
156
157    private final boolean doFileRolls;
158
159    // Used to differntiate between roll calls before and after we finish construction.
160    private final boolean initialized;
161
162    /**
163     * Create an edit log at the given <code>dir</code> location.
164     *
165     * You should never have to load an existing log. If there is a log at
166     * startup, it should have already been processed and deleted by the time the
167     * WAL object is started up.
168     *
169     * @param fs filesystem handle
170     * @param rootDir path to where logs and oldlogs
171     * @param logDir dir where wals are stored
172     * @param archiveDir dir where wals are archived
173     * @param conf configuration to use
174     * @param listeners Listeners on WAL events. Listeners passed here will
175     * be registered before we do anything else; e.g. the
176     * Constructor {@link #rollWriter()}.
177     * @param failIfWALExists If true IOException will be thrown if files related to this wal
178     *        already exist.
179     * @param prefix should always be hostname and port in distributed env and
180     *        it will be URL encoded before being used.
181     *        If prefix is null, "wal" will be used
182     * @param suffix will be url encoded. null is treated as empty. non-empty must start with
183     *        {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
184     * @throws IOException
185     */
186    public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
187        final String archiveDir, final Configuration conf,
188        final List<WALActionsListener> listeners,
189        final boolean failIfWALExists, final String prefix, final String suffix)
190        throws IOException {
191      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
192      Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
193      doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) ||
194          operations.contains(AllowedOperations.fileroll.name());
195      initialized = true;
196      LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled"));
197    }
198
199    private Writer noRollsWriter;
200
201    // creatWriterInstance is where the new pipeline is set up for doing file rolls
202    // if we are skipping it, just keep returning the same writer.
203    @Override
204    protected Writer createWriterInstance(final Path path) throws IOException {
205      // we get called from the FSHLog constructor (!); always roll in this case since
206      // we don't know yet if we're supposed to generally roll and
207      // we need an initial file in the case of doing appends but no rolls.
208      if (!initialized || doFileRolls) {
209        LOG.info("creating new writer instance.");
210        final ProtobufLogWriter writer = new IOTestWriter();
211        try {
212          writer.init(fs, path, conf, false, this.blocksize);
213        } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
214          throw new IOException("Can't create writer instance because underlying FileSystem " +
215              "doesn't support needed stream capabilities.", exception);
216        }
217        if (!initialized) {
218          LOG.info("storing initial writer instance in case file rolling isn't allowed.");
219          noRollsWriter = writer;
220        }
221        return writer;
222      } else {
223        LOG.info("WAL rolling disabled, returning the first writer.");
224        // Initial assignment happens during the constructor call, so there ought not be
225        // a race for first assignment.
226        return noRollsWriter;
227      }
228    }
229  }
230
231  /**
232   * Presumes init will be called by a single thread prior to any access of other methods.
233   */
234  private static class IOTestWriter extends ProtobufLogWriter {
235    private boolean doAppends;
236    private boolean doSyncs;
237
238    @Override
239    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
240        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
241      Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
242      if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
243        doAppends = doSyncs = true;
244      } else if (operations.contains(AllowedOperations.none.name())) {
245        doAppends = doSyncs = false;
246      } else {
247        doAppends = operations.contains(AllowedOperations.append.name());
248        doSyncs = operations.contains(AllowedOperations.sync.name());
249      }
250      LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
251          " and syncs " + (doSyncs ? "enabled" : "disabled"));
252      super.init(fs, path, conf, overwritable, blocksize);
253    }
254
255    @Override
256    protected String getWriterClassName() {
257      return ProtobufLogWriter.class.getSimpleName();
258    }
259
260    @Override
261    public void append(Entry entry) throws IOException {
262      if (doAppends) {
263        super.append(entry);
264      }
265    }
266
267    @Override
268    public void sync() throws IOException {
269      if (doSyncs) {
270        super.sync();
271      }
272    }
273  }
274
275  @Override
276  public long getNumLogFiles() {
277    return this.log.getNumLogFiles();
278  }
279
280  @Override
281  public long getLogFileSize() {
282    return this.log.getLogFileSize();
283  }
284
285  @Override
286  public void addWALActionsListener(WALActionsListener listener) {
287    // TODO Implement WALProvider.addWALActionLister
288
289  }
290}