001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.OptionalLong; 025import java.util.Set; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.atomic.AtomicBoolean; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.PrivateCellUtil; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; 035import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 036import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 037import org.apache.hadoop.hbase.util.CommonFSUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042// imports for things that haven't moved from regionserver.wal yet. 043 044/** 045 * No-op implementation of {@link WALProvider} used when the WAL is disabled. 046 * 047 * Should only be used when severe data loss is acceptable. 048 * 049 */ 050@InterfaceAudience.Private 051class DisabledWALProvider implements WALProvider { 052 053 private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class); 054 055 WAL disabled; 056 057 @Override 058 public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { 059 if (null != disabled) { 060 throw new IllegalStateException("WALProvider.init should only be called once."); 061 } 062 if (null == providerId) { 063 providerId = "defaultDisabled"; 064 } 065 disabled = new DisabledWAL(new Path(CommonFSUtils.getWALRootDir(conf), providerId), conf, null); 066 } 067 068 @Override 069 public List<WAL> getWALs() { 070 List<WAL> wals = new ArrayList<>(1); 071 wals.add(disabled); 072 return wals; 073 } 074 075 @Override 076 public WAL getWAL(RegionInfo region) throws IOException { 077 return disabled; 078 } 079 080 @Override 081 public void close() throws IOException { 082 disabled.close(); 083 } 084 085 @Override 086 public void shutdown() throws IOException { 087 disabled.shutdown(); 088 } 089 090 private static class DisabledWAL implements WAL { 091 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 092 protected final Path path; 093 protected final WALCoprocessorHost coprocessorHost; 094 protected final AtomicBoolean closed = new AtomicBoolean(false); 095 096 public DisabledWAL(final Path path, final Configuration conf, 097 final List<WALActionsListener> listeners) { 098 this.coprocessorHost = new WALCoprocessorHost(this, conf); 099 this.path = path; 100 if (null != listeners) { 101 for(WALActionsListener listener : listeners) { 102 registerWALActionsListener(listener); 103 } 104 } 105 } 106 107 @Override 108 public void registerWALActionsListener(final WALActionsListener listener) { 109 listeners.add(listener); 110 } 111 112 @Override 113 public boolean unregisterWALActionsListener(final WALActionsListener listener) { 114 return listeners.remove(listener); 115 } 116 117 @Override 118 public byte[][] rollWriter() { 119 if (!listeners.isEmpty()) { 120 for (WALActionsListener listener : listeners) { 121 listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR); 122 } 123 for (WALActionsListener listener : listeners) { 124 try { 125 listener.preLogRoll(path, path); 126 } catch (IOException exception) { 127 LOG.debug("Ignoring exception from listener.", exception); 128 } 129 } 130 for (WALActionsListener listener : listeners) { 131 try { 132 listener.postLogRoll(path, path); 133 } catch (IOException exception) { 134 LOG.debug("Ignoring exception from listener.", exception); 135 } 136 } 137 } 138 return null; 139 } 140 141 @Override 142 public byte[][] rollWriter(boolean force) { 143 return rollWriter(); 144 } 145 146 @Override 147 public void shutdown() { 148 if(closed.compareAndSet(false, true)) { 149 if (!this.listeners.isEmpty()) { 150 for (WALActionsListener listener : this.listeners) { 151 listener.logCloseRequested(); 152 } 153 } 154 } 155 } 156 157 @Override 158 public void close() { 159 shutdown(); 160 } 161 162 @Override 163 public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 164 return append(info, key, edits, true); 165 } 166 167 @Override 168 public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) 169 throws IOException { 170 return append(info, key, edits, false); 171 } 172 173 private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 174 throws IOException { 175 WriteEntry writeEntry = key.getMvcc().begin(); 176 if (!edits.isReplay()) { 177 for (Cell cell : edits.getCells()) { 178 PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); 179 } 180 } 181 key.setWriteEntry(writeEntry); 182 if (!this.listeners.isEmpty()) { 183 final long start = System.nanoTime(); 184 long len = 0; 185 for (Cell cell : edits.getCells()) { 186 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 187 } 188 final long elapsed = (System.nanoTime() - start) / 1000000L; 189 for (WALActionsListener listener : this.listeners) { 190 listener.postAppend(len, elapsed, key, edits); 191 } 192 } 193 return -1; 194 } 195 196 @Override 197 public void updateStore(byte[] encodedRegionName, byte[] familyName, 198 Long sequenceid, boolean onlyIfGreater) { return; } 199 200 @Override 201 public void sync() { 202 if (!this.listeners.isEmpty()) { 203 for (WALActionsListener listener : this.listeners) { 204 listener.postSync(0L, 0); 205 } 206 } 207 } 208 209 @Override 210 public void sync(long txid) { 211 sync(); 212 } 213 214 @Override 215 public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> 216 flushedFamilyNamesToSeq) { 217 return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet()); 218 } 219 220 @Override 221 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) { 222 if (closed.get()) return null; 223 return HConstants.NO_SEQNUM; 224 } 225 226 @Override 227 public void completeCacheFlush(final byte[] encodedRegionName) { 228 } 229 230 @Override 231 public void abortCacheFlush(byte[] encodedRegionName) { 232 } 233 234 @Override 235 public WALCoprocessorHost getCoprocessorHost() { 236 return coprocessorHost; 237 } 238 239 @Override 240 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 241 return HConstants.NO_SEQNUM; 242 } 243 244 @Override 245 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 246 return HConstants.NO_SEQNUM; 247 } 248 249 @Override 250 public String toString() { 251 return "WAL disabled."; 252 } 253 254 @Override 255 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 256 return OptionalLong.empty(); 257 } 258 } 259 260 @Override 261 public long getNumLogFiles() { 262 return 0; 263 } 264 265 @Override 266 public long getLogFileSize() { 267 return 0; 268 } 269 270 @Override 271 public void addWALActionsListener(WALActionsListener listener) { 272 disabled.registerWALActionsListener(listener); 273 } 274}