001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.OptionalLong;
025import java.util.Set;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
038import org.apache.hadoop.hbase.util.FSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043// imports for things that haven't moved from regionserver.wal yet.
044
045/**
046 * No-op implementation of {@link WALProvider} used when the WAL is disabled.
047 *
048 * Should only be used when severe data loss is acceptable.
049 *
050 */
051@InterfaceAudience.Private
052class DisabledWALProvider implements WALProvider {
053
054  private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class);
055
056  WAL disabled;
057
058  @Override
059  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
060    if (null != disabled) {
061      throw new IllegalStateException("WALProvider.init should only be called once.");
062    }
063    if (null == providerId) {
064      providerId = "defaultDisabled";
065    }
066    disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
067  }
068
069  @Override
070  public List<WAL> getWALs() {
071    List<WAL> wals = new ArrayList<>(1);
072    wals.add(disabled);
073    return wals;
074  }
075
076  @Override
077  public WAL getWAL(RegionInfo region) throws IOException {
078    return disabled;
079  }
080
081  @Override
082  public void close() throws IOException {
083    disabled.close();
084  }
085
086  @Override
087  public void shutdown() throws IOException {
088    disabled.shutdown();
089  }
090
091  private static class DisabledWAL implements WAL {
092    protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
093    protected final Path path;
094    protected final WALCoprocessorHost coprocessorHost;
095    protected final AtomicBoolean closed = new AtomicBoolean(false);
096
097    public DisabledWAL(final Path path, final Configuration conf,
098        final List<WALActionsListener> listeners) {
099      this.coprocessorHost = new WALCoprocessorHost(this, conf);
100      this.path = path;
101      if (null != listeners) {
102        for(WALActionsListener listener : listeners) {
103          registerWALActionsListener(listener);
104        }
105      }
106    }
107
108    @Override
109    public void registerWALActionsListener(final WALActionsListener listener) {
110      listeners.add(listener);
111    }
112
113    @Override
114    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
115      return listeners.remove(listener);
116    }
117
118    @Override
119    public byte[][] rollWriter() {
120      if (!listeners.isEmpty()) {
121        for (WALActionsListener listener : listeners) {
122          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
123        }
124        for (WALActionsListener listener : listeners) {
125          try {
126            listener.preLogRoll(path, path);
127          } catch (IOException exception) {
128            LOG.debug("Ignoring exception from listener.", exception);
129          }
130        }
131        for (WALActionsListener listener : listeners) {
132          try {
133            listener.postLogRoll(path, path);
134          } catch (IOException exception) {
135            LOG.debug("Ignoring exception from listener.", exception);
136          }
137        }
138      }
139      return null;
140    }
141
142    @Override
143    public byte[][] rollWriter(boolean force) {
144      return rollWriter();
145    }
146
147    @Override
148    public void shutdown() {
149      if(closed.compareAndSet(false, true)) {
150        if (!this.listeners.isEmpty()) {
151          for (WALActionsListener listener : this.listeners) {
152            listener.logCloseRequested();
153          }
154        }
155      }
156    }
157
158    @Override
159    public void close() {
160      shutdown();
161    }
162
163    @Override
164    public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
165        throws IOException {
166      WriteEntry writeEntry = key.getMvcc().begin();
167      if (!edits.isReplay()) {
168        for (Cell cell : edits.getCells()) {
169          PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
170        }
171      }
172      key.setWriteEntry(writeEntry);
173      if (!this.listeners.isEmpty()) {
174        final long start = System.nanoTime();
175        long len = 0;
176        for (Cell cell : edits.getCells()) {
177          len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
178        }
179        final long elapsed = (System.nanoTime() - start) / 1000000L;
180        for (WALActionsListener listener : this.listeners) {
181          listener.postAppend(len, elapsed, key, edits);
182        }
183      }
184      return -1;
185    }
186
187    @Override
188    public void updateStore(byte[] encodedRegionName, byte[] familyName,
189        Long sequenceid, boolean onlyIfGreater) { return; }
190
191    @Override
192    public void sync() {
193      if (!this.listeners.isEmpty()) {
194        for (WALActionsListener listener : this.listeners) {
195          listener.postSync(0L, 0);
196        }
197      }
198    }
199
200    @Override
201    public void sync(long txid) {
202      sync();
203    }
204
205    @Override
206    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
207        flushedFamilyNamesToSeq) {
208      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
209    }
210
211    @Override
212    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
213      if (closed.get()) return null;
214      return HConstants.NO_SEQNUM;
215    }
216
217    @Override
218    public void completeCacheFlush(final byte[] encodedRegionName) {
219    }
220
221    @Override
222    public void abortCacheFlush(byte[] encodedRegionName) {
223    }
224
225    @Override
226    public WALCoprocessorHost getCoprocessorHost() {
227      return coprocessorHost;
228    }
229
230    @Override
231    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
232      return HConstants.NO_SEQNUM;
233    }
234
235    @Override
236    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
237      return HConstants.NO_SEQNUM;
238    }
239
240    @Override
241    public String toString() {
242      return "WAL disabled.";
243    }
244
245    @Override
246    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
247      return OptionalLong.empty();
248    }
249  }
250
251  @Override
252  public long getNumLogFiles() {
253    return 0;
254  }
255
256  @Override
257  public long getLogFileSize() {
258    return 0;
259  }
260
261  @Override
262  public void addWALActionsListener(WALActionsListener listener) {
263    disabled.registerWALActionsListener(listener);
264  }
265}