001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.OptionalLong;
025import java.util.Set;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.PrivateCellUtil;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
036import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
037import org.apache.hadoop.hbase.util.CommonFSUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042// imports for things that haven't moved from regionserver.wal yet.
043
044/**
045 * No-op implementation of {@link WALProvider} used when the WAL is disabled.
046 *
047 * Should only be used when severe data loss is acceptable.
048 *
049 */
050@InterfaceAudience.Private
051class DisabledWALProvider implements WALProvider {
052
053  private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class);
054
055  WAL disabled;
056
057  @Override
058  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
059    if (null != disabled) {
060      throw new IllegalStateException("WALProvider.init should only be called once.");
061    }
062    if (null == providerId) {
063      providerId = "defaultDisabled";
064    }
065    disabled = new DisabledWAL(new Path(CommonFSUtils.getWALRootDir(conf), providerId), conf, null);
066  }
067
068  @Override
069  public List<WAL> getWALs() {
070    List<WAL> wals = new ArrayList<>(1);
071    wals.add(disabled);
072    return wals;
073  }
074
075  @Override
076  public WAL getWAL(RegionInfo region) throws IOException {
077    return disabled;
078  }
079
080  @Override
081  public void close() throws IOException {
082    disabled.close();
083  }
084
085  @Override
086  public void shutdown() throws IOException {
087    disabled.shutdown();
088  }
089
090  private static class DisabledWAL implements WAL {
091    protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
092    protected final Path path;
093    protected final WALCoprocessorHost coprocessorHost;
094    protected final AtomicBoolean closed = new AtomicBoolean(false);
095
096    public DisabledWAL(final Path path, final Configuration conf,
097        final List<WALActionsListener> listeners) {
098      this.coprocessorHost = new WALCoprocessorHost(this, conf);
099      this.path = path;
100      if (null != listeners) {
101        for(WALActionsListener listener : listeners) {
102          registerWALActionsListener(listener);
103        }
104      }
105    }
106
107    @Override
108    public void registerWALActionsListener(final WALActionsListener listener) {
109      listeners.add(listener);
110    }
111
112    @Override
113    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
114      return listeners.remove(listener);
115    }
116
117    @Override
118    public Map<byte[], List<byte[]>> rollWriter() {
119      if (!listeners.isEmpty()) {
120        for (WALActionsListener listener : listeners) {
121          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
122        }
123        for (WALActionsListener listener : listeners) {
124          try {
125            listener.preLogRoll(path, path);
126          } catch (IOException exception) {
127            LOG.debug("Ignoring exception from listener.", exception);
128          }
129        }
130        for (WALActionsListener listener : listeners) {
131          try {
132            listener.postLogRoll(path, path);
133          } catch (IOException exception) {
134            LOG.debug("Ignoring exception from listener.", exception);
135          }
136        }
137      }
138      return null;
139    }
140
141    @Override
142    public Map<byte[], List<byte[]>> rollWriter(boolean force) {
143      return rollWriter();
144    }
145
146    @Override
147    public void shutdown() {
148      if(closed.compareAndSet(false, true)) {
149        if (!this.listeners.isEmpty()) {
150          for (WALActionsListener listener : this.listeners) {
151            listener.logCloseRequested();
152          }
153        }
154      }
155    }
156
157    @Override
158    public void close() {
159      shutdown();
160    }
161
162    @Override
163    public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
164      return append(info, key, edits, true);
165    }
166
167    @Override
168    public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
169      throws IOException {
170      return append(info, key, edits, false);
171    }
172
173    private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
174        throws IOException {
175      WriteEntry writeEntry = key.getMvcc().begin();
176      if (!edits.isReplay()) {
177        for (Cell cell : edits.getCells()) {
178          PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
179        }
180      }
181      key.setWriteEntry(writeEntry);
182      if (!this.listeners.isEmpty()) {
183        final long start = System.nanoTime();
184        long len = 0;
185        for (Cell cell : edits.getCells()) {
186          len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
187        }
188        final long elapsed = (System.nanoTime() - start) / 1000000L;
189        for (WALActionsListener listener : this.listeners) {
190          listener.postAppend(len, elapsed, key, edits);
191        }
192      }
193      return -1;
194    }
195
196    @Override
197    public void updateStore(byte[] encodedRegionName, byte[] familyName,
198        Long sequenceid, boolean onlyIfGreater) { return; }
199
200    @Override
201    public void sync() {
202      if (!this.listeners.isEmpty()) {
203        for (WALActionsListener listener : this.listeners) {
204          listener.postSync(0L, 0);
205        }
206      }
207    }
208
209    @Override
210    public void sync(long txid) {
211      sync();
212    }
213
214    @Override
215    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
216        flushedFamilyNamesToSeq) {
217      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
218    }
219
220    @Override
221    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
222      if (closed.get()) return null;
223      return HConstants.NO_SEQNUM;
224    }
225
226    @Override
227    public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
228    }
229
230    @Override
231    public void abortCacheFlush(byte[] encodedRegionName) {
232    }
233
234    @Override
235    public WALCoprocessorHost getCoprocessorHost() {
236      return coprocessorHost;
237    }
238
239    @Override
240    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
241      return HConstants.NO_SEQNUM;
242    }
243
244    @Override
245    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
246      return HConstants.NO_SEQNUM;
247    }
248
249    @Override
250    public String toString() {
251      return "WAL disabled.";
252    }
253
254    @Override
255    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
256      return OptionalLong.empty();
257    }
258  }
259
260  @Override
261  public long getNumLogFiles() {
262    return 0;
263  }
264
265  @Override
266  public long getLogFileSize() {
267    return 0;
268  }
269
270  @Override
271  public void addWALActionsListener(WALActionsListener listener) {
272    disabled.registerWALActionsListener(listener);
273  }
274}