001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.OptionalLong;
025import java.util.Set;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
038import org.apache.hadoop.hbase.util.CommonFSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043// imports for things that haven't moved from regionserver.wal yet.
044
045/**
046 * No-op implementation of {@link WALProvider} used when the WAL is disabled.
047 *
048 * Should only be used when severe data loss is acceptable.
049 *
050 */
051@InterfaceAudience.Private
052class DisabledWALProvider implements WALProvider {
053
054  private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class);
055
056  WAL disabled;
057
058  @Override
059  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
060      throws IOException {
061    if (null != disabled) {
062      throw new IllegalStateException("WALProvider.init should only be called once.");
063    }
064    if (null == providerId) {
065      providerId = "defaultDisabled";
066    }
067    disabled = new DisabledWAL(new Path(CommonFSUtils.getWALRootDir(conf), providerId), conf, null);
068  }
069
070  @Override
071  public List<WAL> getWALs() {
072    List<WAL> wals = new ArrayList<>(1);
073    wals.add(disabled);
074    return wals;
075  }
076
077  @Override
078  public WAL getWAL(RegionInfo region) throws IOException {
079    return disabled;
080  }
081
082  @Override
083  public void close() throws IOException {
084    disabled.close();
085  }
086
087  @Override
088  public void shutdown() throws IOException {
089    disabled.shutdown();
090  }
091
092  private static class DisabledWAL implements WAL {
093    protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
094    protected final Path path;
095    protected final WALCoprocessorHost coprocessorHost;
096    protected final AtomicBoolean closed = new AtomicBoolean(false);
097
098    public DisabledWAL(final Path path, final Configuration conf,
099        final List<WALActionsListener> listeners) {
100      this.coprocessorHost = new WALCoprocessorHost(this, conf);
101      this.path = path;
102      if (null != listeners) {
103        for(WALActionsListener listener : listeners) {
104          registerWALActionsListener(listener);
105        }
106      }
107    }
108
109    @Override
110    public void registerWALActionsListener(final WALActionsListener listener) {
111      listeners.add(listener);
112    }
113
114    @Override
115    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
116      return listeners.remove(listener);
117    }
118
119    @Override
120    public Map<byte[], List<byte[]>> rollWriter() {
121      if (!listeners.isEmpty()) {
122        for (WALActionsListener listener : listeners) {
123          listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
124        }
125        for (WALActionsListener listener : listeners) {
126          try {
127            listener.preLogRoll(path, path);
128          } catch (IOException exception) {
129            LOG.debug("Ignoring exception from listener.", exception);
130          }
131        }
132        for (WALActionsListener listener : listeners) {
133          try {
134            listener.postLogRoll(path, path);
135          } catch (IOException exception) {
136            LOG.debug("Ignoring exception from listener.", exception);
137          }
138        }
139      }
140      return null;
141    }
142
143    @Override
144    public Map<byte[], List<byte[]>> rollWriter(boolean force) {
145      return rollWriter();
146    }
147
148    @Override
149    public void shutdown() {
150      if(closed.compareAndSet(false, true)) {
151        if (!this.listeners.isEmpty()) {
152          for (WALActionsListener listener : this.listeners) {
153            listener.logCloseRequested();
154          }
155        }
156      }
157    }
158
159    @Override
160    public void close() {
161      shutdown();
162    }
163
164    @Override
165    public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
166      return append(info, key, edits, true);
167    }
168
169    @Override
170    public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
171      throws IOException {
172      return append(info, key, edits, false);
173    }
174
175    private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
176        throws IOException {
177      WriteEntry writeEntry = key.getMvcc().begin();
178      if (!edits.isReplay()) {
179        for (Cell cell : edits.getCells()) {
180          PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
181        }
182      }
183      key.setWriteEntry(writeEntry);
184      if (!this.listeners.isEmpty()) {
185        final long start = System.nanoTime();
186        long len = 0;
187        for (Cell cell : edits.getCells()) {
188          len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
189        }
190        final long elapsed = (System.nanoTime() - start) / 1000000L;
191        for (WALActionsListener listener : this.listeners) {
192          listener.postAppend(len, elapsed, key, edits);
193        }
194      }
195      return -1;
196    }
197
198    @Override
199    public void updateStore(byte[] encodedRegionName, byte[] familyName,
200        Long sequenceid, boolean onlyIfGreater) { return; }
201
202    @Override
203    public void sync() {
204      if (!this.listeners.isEmpty()) {
205        for (WALActionsListener listener : this.listeners) {
206          listener.postSync(0L, 0);
207        }
208      }
209    }
210
211    @Override
212    public void sync(long txid) {
213      sync();
214    }
215
216    @Override
217    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
218        flushedFamilyNamesToSeq) {
219      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
220    }
221
222    @Override
223    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
224      if (closed.get()) return null;
225      return HConstants.NO_SEQNUM;
226    }
227
228    @Override
229    public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
230    }
231
232    @Override
233    public void abortCacheFlush(byte[] encodedRegionName) {
234    }
235
236    @Override
237    public WALCoprocessorHost getCoprocessorHost() {
238      return coprocessorHost;
239    }
240
241    @Override
242    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
243      return HConstants.NO_SEQNUM;
244    }
245
246    @Override
247    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
248      return HConstants.NO_SEQNUM;
249    }
250
251    @Override
252    public String toString() {
253      return "WAL disabled.";
254    }
255
256    @Override
257    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
258      return OptionalLong.empty();
259    }
260  }
261
262  @Override
263  public long getNumLogFiles() {
264    return 0;
265  }
266
267  @Override
268  public long getLogFileSize() {
269    return 0;
270  }
271
272  @Override
273  public void addWALActionsListener(WALActionsListener listener) {
274    disabled.registerWALActionsListener(listener);
275  }
276}