001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.OptionalLong; 025import java.util.Set; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.PrivateCellUtil; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; 036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 037import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; 038import org.apache.hadoop.hbase.util.FSUtils; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043// imports for things that haven't moved from regionserver.wal yet. 044 045/** 046 * No-op implementation of {@link WALProvider} used when the WAL is disabled. 047 * 048 * Should only be used when severe data loss is acceptable. 049 * 050 */ 051@InterfaceAudience.Private 052class DisabledWALProvider implements WALProvider { 053 054 private static final Logger LOG = LoggerFactory.getLogger(DisabledWALProvider.class); 055 056 WAL disabled; 057 058 @Override 059 public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { 060 if (null != disabled) { 061 throw new IllegalStateException("WALProvider.init should only be called once."); 062 } 063 if (null == providerId) { 064 providerId = "defaultDisabled"; 065 } 066 disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null); 067 } 068 069 @Override 070 public List<WAL> getWALs() { 071 List<WAL> wals = new ArrayList<>(1); 072 wals.add(disabled); 073 return wals; 074 } 075 076 @Override 077 public WAL getWAL(RegionInfo region) throws IOException { 078 return disabled; 079 } 080 081 @Override 082 public void close() throws IOException { 083 disabled.close(); 084 } 085 086 @Override 087 public void shutdown() throws IOException { 088 disabled.shutdown(); 089 } 090 091 private static class DisabledWAL implements WAL { 092 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 093 protected final Path path; 094 protected final WALCoprocessorHost coprocessorHost; 095 protected final AtomicBoolean closed = new AtomicBoolean(false); 096 097 public DisabledWAL(final Path path, final Configuration conf, 098 final List<WALActionsListener> listeners) { 099 this.coprocessorHost = new WALCoprocessorHost(this, conf); 100 this.path = path; 101 if (null != listeners) { 102 for(WALActionsListener listener : listeners) { 103 registerWALActionsListener(listener); 104 } 105 } 106 } 107 108 @Override 109 public void registerWALActionsListener(final WALActionsListener listener) { 110 listeners.add(listener); 111 } 112 113 @Override 114 public boolean unregisterWALActionsListener(final WALActionsListener listener) { 115 return listeners.remove(listener); 116 } 117 118 @Override 119 public byte[][] rollWriter() { 120 if (!listeners.isEmpty()) { 121 for (WALActionsListener listener : listeners) { 122 listener.logRollRequested(false); 123 } 124 for (WALActionsListener listener : listeners) { 125 try { 126 listener.preLogRoll(path, path); 127 } catch (IOException exception) { 128 LOG.debug("Ignoring exception from listener.", exception); 129 } 130 } 131 for (WALActionsListener listener : listeners) { 132 try { 133 listener.postLogRoll(path, path); 134 } catch (IOException exception) { 135 LOG.debug("Ignoring exception from listener.", exception); 136 } 137 } 138 } 139 return null; 140 } 141 142 @Override 143 public byte[][] rollWriter(boolean force) { 144 return rollWriter(); 145 } 146 147 @Override 148 public void shutdown() { 149 if(closed.compareAndSet(false, true)) { 150 if (!this.listeners.isEmpty()) { 151 for (WALActionsListener listener : this.listeners) { 152 listener.logCloseRequested(); 153 } 154 } 155 } 156 } 157 158 @Override 159 public void close() { 160 shutdown(); 161 } 162 163 @Override 164 public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 165 throws IOException { 166 WriteEntry writeEntry = key.getMvcc().begin(); 167 if (!edits.isReplay()) { 168 for (Cell cell : edits.getCells()) { 169 PrivateCellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); 170 } 171 } 172 key.setWriteEntry(writeEntry); 173 if (!this.listeners.isEmpty()) { 174 final long start = System.nanoTime(); 175 long len = 0; 176 for (Cell cell : edits.getCells()) { 177 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 178 } 179 final long elapsed = (System.nanoTime() - start) / 1000000L; 180 for (WALActionsListener listener : this.listeners) { 181 listener.postAppend(len, elapsed, key, edits); 182 } 183 } 184 return -1; 185 } 186 187 @Override 188 public void updateStore(byte[] encodedRegionName, byte[] familyName, 189 Long sequenceid, boolean onlyIfGreater) { return; } 190 191 @Override 192 public void sync() { 193 if (!this.listeners.isEmpty()) { 194 for (WALActionsListener listener : this.listeners) { 195 listener.postSync(0L, 0); 196 } 197 } 198 } 199 200 @Override 201 public void sync(long txid) { 202 sync(); 203 } 204 205 @Override 206 public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> 207 flushedFamilyNamesToSeq) { 208 return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet()); 209 } 210 211 @Override 212 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) { 213 if (closed.get()) return null; 214 return HConstants.NO_SEQNUM; 215 } 216 217 @Override 218 public void completeCacheFlush(final byte[] encodedRegionName) { 219 } 220 221 @Override 222 public void abortCacheFlush(byte[] encodedRegionName) { 223 } 224 225 @Override 226 public WALCoprocessorHost getCoprocessorHost() { 227 return coprocessorHost; 228 } 229 230 @Override 231 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 232 return HConstants.NO_SEQNUM; 233 } 234 235 @Override 236 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 237 return HConstants.NO_SEQNUM; 238 } 239 240 @Override 241 public String toString() { 242 return "WAL disabled."; 243 } 244 245 @Override 246 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 247 return OptionalLong.empty(); 248 } 249 } 250 251 @Override 252 public long getNumLogFiles() { 253 return 0; 254 } 255 256 @Override 257 public long getLogFileSize() { 258 return 0; 259 } 260 261 @Override 262 public void addWALActionsListener(WALActionsListener listener) { 263 disabled.registerWALActionsListener(listener); 264 } 265}