001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.OptionalLong;
025import java.util.Set;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
038import org.apache.hadoop.hbase.util.FSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043// imports for things that haven't moved from regionserver.wal yet.
044
045/**
046 * No-op implementation of {@link WALProvider} used when the WAL is disabled.
047 *
048 * Should only be used when severe data loss is acceptable.
049 *
050 */
051@InterfaceAudience.Private
052class DisabledWALProvider implements WALProvider {
053
054  private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class);
055
056  WAL disabled;
057
058  @Override
059  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
060    if (null != disabled) {
061      throw new IllegalStateException("WALProvider.init should only be called once.");
062    }
063    if (null == providerId) {
064      providerId = "defaultDisabled";
065    }
066    disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
067  }
068
069  @Override
070  public List<WAL> getWALs() {
071    List<WAL> wals = new ArrayList<>(1);
072    wals.add(disabled);
073    return wals;
074  }
075
076  @Override
077  public WAL getWAL(RegionInfo region) throws IOException {
078    return disabled;
079  }
080
081  @Override
082  public void close() throws IOException {
083    disabled.close();
084  }
085
086  @Override
087  public void shutdown() throws IOException {
088    disabled.shutdown();
089  }
090
091  private static class DisabledWAL implements WAL {
092    protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
093    protected final Path path;
094    protected final WALCoprocessorHost coprocessorHost;
095    protected final AtomicBoolean closed = new AtomicBoolean(false);
096
097    public DisabledWAL(final Path path, final Configuration conf,
098        final List<WALActionsListener> listeners) {
099      this.coprocessorHost = new WALCoprocessorHost(this, conf);
100      this.path = path;
101      if (null != listeners) {
102        for(WALActionsListener listener : listeners) {
103          registerWALActionsListener(listener);
104        }
105      }
106    }
107
108    @Override
109    public void registerWALActionsListener(final WALActionsListener listener) {
110      listeners.add(listener);
111    }
112
113    @Override
114    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
115      return listeners.remove(listener);
116    }
117
118    @Override
119    public byte[][] rollWriter() {
120      if (!listeners.isEmpty()) {
121        for (WALActionsListener listener : listeners) {
122          listener.logRollRequested(false);
123        }
124        for (WALActionsListener listener : listeners) {
125          try {
126            listener.preLogRoll(path, path);
127          } catch (IOException exception) {
128            LOG.debug("Ignoring exception from listener.", exception);
129          }
130        }
131        for (WALActionsListener listener : listeners) {
132          try {
133            listener.postLogRoll(path, path);
134          } catch (IOException exception) {
135            LOG.debug("Ignoring exception from listener.", exception);
136          }
137        }
138      }
139      return null;
140    }
141
142    @Override
143    public byte[][] rollWriter(boolean force) {
144      return rollWriter();
145    }
146
147    @Override
148    public void shutdown() {
149      if(closed.compareAndSet(false, true)) {
150        if (!this.listeners.isEmpty()) {
151          for (WALActionsListener listener : this.listeners) {
152            listener.logCloseRequested();
153          }
154        }
155      }
156    }
157
158    @Override
159    public void close() {
160      shutdown();
161    }
162
163    @Override
164    public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
165      return append(info, key, edits, true);
166    }
167
168    @Override
169    public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
170      throws IOException {
171      return append(info, key, edits, false);
172    }
173
174    private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
175        throws IOException {
176      WriteEntry writeEntry = key.getMvcc().begin();
177      if (!edits.isReplay()) {
178        for (Cell cell : edits.getCells()) {
179          PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
180        }
181      }
182      key.setWriteEntry(writeEntry);
183      if (!this.listeners.isEmpty()) {
184        final long start = System.nanoTime();
185        long len = 0;
186        for (Cell cell : edits.getCells()) {
187          len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
188        }
189        final long elapsed = (System.nanoTime() - start) / 1000000L;
190        for (WALActionsListener listener : this.listeners) {
191          listener.postAppend(len, elapsed, key, edits);
192        }
193      }
194      return -1;
195    }
196
197    @Override
198    public void updateStore(byte[] encodedRegionName, byte[] familyName,
199        Long sequenceid, boolean onlyIfGreater) { return; }
200
201    @Override
202    public void sync() {
203      if (!this.listeners.isEmpty()) {
204        for (WALActionsListener listener : this.listeners) {
205          listener.postSync(0L, 0);
206        }
207      }
208    }
209
210    @Override
211    public void sync(long txid) {
212      sync();
213    }
214
215    @Override
216    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
217        flushedFamilyNamesToSeq) {
218      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
219    }
220
221    @Override
222    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
223      if (closed.get()) return null;
224      return HConstants.NO_SEQNUM;
225    }
226
227    @Override
228    public void completeCacheFlush(final byte[] encodedRegionName) {
229    }
230
231    @Override
232    public void abortCacheFlush(byte[] encodedRegionName) {
233    }
234
235    @Override
236    public WALCoprocessorHost getCoprocessorHost() {
237      return coprocessorHost;
238    }
239
240    @Override
241    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
242      return HConstants.NO_SEQNUM;
243    }
244
245    @Override
246    public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
247      return HConstants.NO_SEQNUM;
248    }
249
250    @Override
251    public String toString() {
252      return "WAL disabled.";
253    }
254
255    @Override
256    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
257      return OptionalLong.empty();
258    }
259  }
260
261  @Override
262  public long getNumLogFiles() {
263    return 0;
264  }
265
266  @Override
267  public long getLogFileSize() {
268    return 0;
269  }
270
271  @Override
272  public void addWALActionsListener(WALActionsListener listener) {
273    disabled.registerWALActionsListener(listener);
274  }
275}