001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CountDownLatch; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; 028import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 029import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 030import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 031 032import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 033import org.apache.hbase.thirdparty.io.netty.channel.Channel; 034import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 035 036class DualAsyncFSWALForTest extends DualAsyncFSWAL { 037 038 private boolean localBroken; 039 040 private boolean remoteBroken; 041 042 private CountDownLatch arrive; 043 044 private CountDownLatch resume; 045 046 private final class MyCombinedAsyncWriter implements AsyncWriter { 047 048 private final AsyncWriter localWriter; 049 050 private final AsyncWriter remoteWriter; 051 052 public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) { 053 this.localWriter = localWriter; 054 this.remoteWriter = remoteWriter; 055 } 056 057 @Override 058 public long getLength() { 059 return localWriter.getLength(); 060 } 061 062 @Override 063 public long getSyncedLength() { 064 return this.localWriter.getSyncedLength(); 065 } 066 067 @Override 068 public void close() throws IOException { 069 Closeables.close(localWriter, true); 070 Closeables.close(remoteWriter, true); 071 } 072 073 @Override 074 public CompletableFuture<Long> sync(boolean forceSync) { 075 CompletableFuture<Long> localFuture; 076 CompletableFuture<Long> remoteFuture; 077 if (!localBroken) { 078 localFuture = localWriter.sync(forceSync); 079 } else { 080 localFuture = new CompletableFuture<>(); 081 localFuture.completeExceptionally(new IOException("Inject error")); 082 } 083 if (!remoteBroken) { 084 remoteFuture = remoteWriter.sync(forceSync); 085 } else { 086 remoteFuture = new CompletableFuture<>(); 087 remoteFuture.completeExceptionally(new IOException("Inject error")); 088 } 089 return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> { 090 return localFuture.getNow(0L); 091 }); 092 } 093 094 @Override 095 public void append(Entry entry) { 096 if (!localBroken) { 097 localWriter.append(entry); 098 } 099 if (!remoteBroken) { 100 remoteWriter.append(entry); 101 } 102 } 103 } 104 105 public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir, 106 String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, 107 boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, 108 Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { 109 super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists, 110 prefix, suffix, eventLoopGroup, channelClass); 111 } 112 113 @Override 114 protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter, 115 AsyncWriter remoteWriter) { 116 return new MyCombinedAsyncWriter(localWriter, remoteWriter); 117 } 118 119 @Override 120 protected AsyncWriter createWriterInstance(Path path) throws IOException { 121 if (arrive != null) { 122 arrive.countDown(); 123 try { 124 resume.await(); 125 } catch (InterruptedException e) { 126 } 127 } 128 if (localBroken || remoteBroken) { 129 throw new IOException("WAL broken"); 130 } 131 return super.createWriterInstance(path); 132 } 133 134 public void setLocalBroken() { 135 this.localBroken = true; 136 } 137 138 public void setRemoteBroken() { 139 this.remoteBroken = true; 140 } 141 142 public void suspendLogRoll() { 143 arrive = new CountDownLatch(1); 144 resume = new CountDownLatch(1); 145 } 146 147 public void waitUntilArrive() throws InterruptedException { 148 arrive.await(); 149 } 150 151 public void resumeLogRoll() { 152 resume.countDown(); 153 } 154}