001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Set; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.CompletionException; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.regex.Matcher; 033import java.util.regex.Pattern; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}. 043 */ 044@InterfaceAudience.Private 045class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator { 046 047 private static final Logger LOG = 048 LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class); 049 050 private final AsyncBufferedMutator mutator; 051 052 private final ExceptionListener listener; 053 054 private final Set<CompletableFuture<Void>> futures = ConcurrentHashMap.newKeySet(); 055 056 private final AtomicLong bufferedSize = new AtomicLong(0); 057 058 private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors = 059 new ConcurrentLinkedQueue<>(); 060 061 BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator, 062 ExceptionListener listener) { 063 this.mutator = mutator; 064 this.listener = listener; 065 } 066 067 @Override 068 public TableName getName() { 069 return mutator.getName(); 070 } 071 072 @Override 073 public Configuration getConfiguration() { 074 return mutator.getConfiguration(); 075 } 076 077 @Override 078 public void mutate(Mutation mutation) throws IOException { 079 mutate(Collections.singletonList(mutation)); 080 } 081 082 private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed"); 083 084 // not always work, so may return an empty string 085 private String getHostnameAndPort(Throwable error) { 086 Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage()); 087 if (matcher.matches()) { 088 return matcher.group(1); 089 } else { 090 return ""; 091 } 092 } 093 094 private RetriesExhaustedWithDetailsException makeError() { 095 List<Row> rows = new ArrayList<>(); 096 List<Throwable> throwables = new ArrayList<>(); 097 List<String> hostnameAndPorts = new ArrayList<>(); 098 for (;;) { 099 Pair<Mutation, Throwable> pair = errors.poll(); 100 if (pair == null) { 101 break; 102 } 103 rows.add(pair.getFirst()); 104 throwables.add(pair.getSecond()); 105 hostnameAndPorts.add(getHostnameAndPort(pair.getSecond())); 106 } 107 return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts); 108 } 109 110 private void internalFlush() throws RetriesExhaustedWithDetailsException { 111 // should get the future array before calling mutator.flush, otherwise we may hit an infinite 112 // wait, since someone may add new future to the map after we calling the flush. 113 CompletableFuture<?>[] toWait = futures.toArray(new CompletableFuture<?>[0]); 114 mutator.flush(); 115 try { 116 CompletableFuture.allOf(toWait).join(); 117 } catch (CompletionException e) { 118 // just ignore, we will record the actual error in the errors field 119 LOG.debug("Flush failed, you should get an exception thrown to your code", e); 120 } 121 if (!errors.isEmpty()) { 122 RetriesExhaustedWithDetailsException error = makeError(); 123 listener.onException(error, this); 124 } 125 } 126 127 @Override 128 public void mutate(List<? extends Mutation> mutations) throws IOException { 129 List<CompletableFuture<Void>> fs = mutator.mutate(mutations); 130 for (int i = 0, n = fs.size(); i < n; i++) { 131 CompletableFuture<Void> toComplete = new CompletableFuture<>(); 132 futures.add(toComplete); 133 Mutation mutation = mutations.get(i); 134 long heapSize = mutation.heapSize(); 135 bufferedSize.addAndGet(heapSize); 136 addListener(fs.get(i), (r, e) -> { 137 futures.remove(toComplete); 138 bufferedSize.addAndGet(-heapSize); 139 if (e != null) { 140 errors.add(Pair.newPair(mutation, e)); 141 toComplete.completeExceptionally(e); 142 } else { 143 toComplete.complete(r); 144 } 145 }); 146 } 147 synchronized (this) { 148 if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) { 149 // We have too many mutations which are not completed yet, let's call a flush to release the 150 // memory to prevent OOM 151 // We use buffer size * 2 is because that, the async buffered mutator will flush 152 // automatically when the write buffer size limit is reached, so usually we do not need to 153 // call flush explicitly if the buffered size is only a little larger than the buffer size 154 // limit. But if the buffered size is too large(2 times of the buffer size), we still need 155 // to block here to prevent OOM. 156 internalFlush(); 157 } else if (!errors.isEmpty()) { 158 RetriesExhaustedWithDetailsException error = makeError(); 159 listener.onException(error, this); 160 } 161 } 162 } 163 164 @Override 165 public synchronized void close() throws IOException { 166 internalFlush(); 167 mutator.close(); 168 } 169 170 @Override 171 public synchronized void flush() throws IOException { 172 internalFlush(); 173 } 174 175 @Override 176 public long getWriteBufferSize() { 177 return mutator.getWriteBufferSize(); 178 } 179 180 @Override 181 public void setRpcTimeout(int timeout) { 182 // no effect 183 } 184 185 @Override 186 public void setOperationTimeout(int timeout) { 187 // no effect 188 } 189}