001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.OptionalLong;
025import java.util.Set;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
038import org.apache.hadoop.hbase.util.CommonFSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043// imports for things that haven't moved from regionserver.wal yet.
044
045/**
046 * No-op implementation of {@link WALProvider} used when the WAL is disabled. Should only be used
047 * when severe data loss is acceptable.
048 */
049@InterfaceAudience.Private
050class DisabledWALProvider implements WALProvider {
051
052  private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class);
053
054  WAL disabled;
055
056  @Override
057  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
058    throws IOException {
059    if (null != disabled) {
060      throw new IllegalStateException("WALProvider.init should only be called once.");
061    }
062    if (null == providerId) {
063      providerId = "defaultDisabled";
064    }
065    disabled = new DisabledWAL(new Path(CommonFSUtils.getWALRootDir(conf), providerId), conf, null);
066  }
067
068  @Override
069  public List<WAL> getWALs() {
070    List<WAL> wals = new ArrayList<>(1);
071    wals.add(disabled);
072    return wals;
073  }
074
075  @Override
076  public WAL getWAL(RegionInfo region) throws IOException {
077    return disabled;
078  }
079
080  @Override
081  public void close() throws IOException {
082    disabled.close();
083  }
084
085  @Override
086  public void shutdown() throws IOException {
087    disabled.shutdown();
088  }
089
090  private static class DisabledWAL implements WAL {
091    protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
092    protected final Path path;
093    protected final WALCoprocessorHost coprocessorHost;
094    protected final AtomicBoolean closed = new AtomicBoolean(false);
095
096    public DisabledWAL(final Path path, final Configuration conf,
097      final List<WALActionsListener> listeners) {
098      this.coprocessorHost = new WALCoprocessorHost(this, conf);
099      this.path = path;
100      if (null != listeners) {
101        for (WALActionsListener listener : listeners) {
102          registerWALActionsListener(listener);
103        }
104      }
105    }
106
107    @Override
108    public void registerWALActionsListener(final WALActionsListener listener) {
109      listeners.add(listener);
110    }
111
112    @Override
113    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
114      return listeners.remove(listener);
115    }
116
117    @Override
118    public Map<byte[], List<byte[]>> rollWriter() {
119      if (!listeners.isEmpty()) {
120        for (WALActionsListener listener : listeners) {
121          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
122        }
123        for (WALActionsListener listener : listeners) {
124          try {
125            listener.preLogRoll(path, path);
126          } catch (IOException exception) {
127            LOG.debug("Ignoring exception from listener.", exception);
128          }
129        }
130        for (WALActionsListener listener : listeners) {
131          try {
132            listener.postLogRoll(path, path);
133          } catch (IOException exception) {
134            LOG.debug("Ignoring exception from listener.", exception);
135          }
136        }
137      }
138      return null;
139    }
140
141    @Override
142    public Map<byte[], List<byte[]>> rollWriter(boolean force) {
143      return rollWriter();
144    }
145
146    @Override
147    public void shutdown() {
148      if (closed.compareAndSet(false, true)) {
149        if (!this.listeners.isEmpty()) {
150          for (WALActionsListener listener : this.listeners) {
151            listener.logCloseRequested();
152          }
153        }
154      }
155    }
156
157    @Override
158    public void close() {
159      shutdown();
160    }
161
162    @Override
163    public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
164      return append(info, key, edits, true);
165    }
166
167    @Override
168    public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
169      return append(info, key, edits, false);
170    }
171
172    private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
173      throws IOException {
174      WriteEntry writeEntry = key.getMvcc().begin();
175      if (!edits.isReplay()) {
176        for (Cell cell : edits.getCells()) {
177          PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
178        }
179      }
180      key.setWriteEntry(writeEntry);
181      if (!this.listeners.isEmpty()) {
182        final long start = System.nanoTime();
183        long len = 0;
184        for (Cell cell : edits.getCells()) {
185          len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
186        }
187        final long elapsed = (System.nanoTime() - start) / 1000000L;
188        for (WALActionsListener listener : this.listeners) {
189          listener.postAppend(len, elapsed, key, edits);
190        }
191      }
192      return -1;
193    }
194
195    @Override
196    public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
197      boolean onlyIfGreater) {
198      return;
199    }
200
201    @Override
202    public void sync() {
203      if (!this.listeners.isEmpty()) {
204        for (WALActionsListener listener : this.listeners) {
205          listener.postSync(0L, 0);
206        }
207      }
208    }
209
210    @Override
211    public void sync(long txid) {
212      sync();
213    }
214
215    @Override
216    public Long startCacheFlush(final byte[] encodedRegionName,
217      Map<byte[], Long> flushedFamilyNamesToSeq) {
218      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
219    }
220
221    @Override
222    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
223      if (closed.get()) return null;
224      return HConstants.NO_SEQNUM;
225    }
226
227    @Override
228    public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
229    }
230
231    @Override
232    public void abortCacheFlush(byte[] encodedRegionName) {
233    }
234
235    @Override
236    public WALCoprocessorHost getCoprocessorHost() {
237      return coprocessorHost;
238    }
239
240    @Override
241    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
242      return HConstants.NO_SEQNUM;
243    }
244
245    @Override
246    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
247      return HConstants.NO_SEQNUM;
248    }
249
250    @Override
251    public String toString() {
252      return "WAL disabled.";
253    }
254
255    @Override
256    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
257      return OptionalLong.empty();
258    }
259  }
260
261  @Override
262  public long getNumLogFiles() {
263    return 0;
264  }
265
266  @Override
267  public long getLogFileSize() {
268    return 0;
269  }
270
271  @Override
272  public void addWALActionsListener(WALActionsListener listener) {
273    disabled.registerWALActionsListener(listener);
274  }
275}