/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;

/**
 * An asynchronous HDFS output stream implementation which fans out data directly to all datanodes
 * and supports writing a file with only one block.
 * <p>
 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create an instance.
 * The main usage of this class is implementing WAL, so we only expose a small set of HDFS
 * configurations in that method. And we place it here under the io package because we want to
 * make it independent of the WAL implementation, and thus easier to move it to the HDFS project
 * eventually.
 * <p>
 * Note that, although we support pipelined flush, i.e., writing new data and then flushing before
 * the previous flush succeeds, the implementation is not thread safe, so you should not call its
 * methods concurrently.
 * <p>
 * Advantages compared to DFSOutputStream:
 * <ol>
 * <li>The fan out mechanism. Data is sent to all datanodes in parallel rather than through a
 * pipeline, which reduces latency.</li>
 * <li>Fail-fast on a connection error to a datanode, so the WAL implementation can open a new
 * writer ASAP.</li>
 * <li>We can benefit from netty's ByteBuf management mechanism.</li>
 * </ol>
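 * <p>
 * A minimal usage sketch (the factory parameters are simplified for illustration and may differ
 * across versions; see {@link FanOutOneBlockAsyncDFSOutputHelper} for the actual signature):
 *
 * <pre>
 * FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, path,
 *   true, false, (short) 3, dfs.getDefaultBlockSize(), eventLoopGroup, channelClass);
 * out.write(data); // buffer the data locally
 * long acked = out.flush(false).get(); // send to all datanodes and wait for acks (hflush)
 * out.close(); // end the block and complete the file at the namenode
 * </pre>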
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we
  // set a smaller limit for the data size.
  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  private final Configuration conf;

  private final DistributedFileSystem dfs;

  private final DFSClient client;

  private final ClientProtocol namenode;

  private final String clientName;

  private final String src;

  private final long fileId;

  private final ExtendedBlock block;

  private final DatanodeInfo[] locations;

  private final Encryptor encryptor;

  private final List<Channel> datanodeList;

  private final DataChecksum summer;

  private final int maxDataLen;

  private final ByteBufAllocator alloc;

  private static final class Callback {

    private final CompletableFuture<Long> future;

    private final long ackedLength;

    // should be backed by a thread-safe collection
    private final Set<ChannelId> unfinishedReplicas;

    public Callback(CompletableFuture<Long> future, long ackedLength,
        Collection<Channel> replicas) {
      this.future = future;
      this.ackedLength = ackedLength;
      if (replicas.isEmpty()) {
        this.unfinishedReplicas = Collections.emptySet();
      } else {
        this.unfinishedReplicas =
          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
      }
    }
  }

  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();

  private volatile long ackedBlockLength = 0L;

  // this could be different from the acked block length because a packet cannot start in the
  // middle of a chunk.
  private long nextPacketOffsetInBlock = 0L;

  // the length of the trailing partial chunk. We keep it because the packet start offset must be
  // aligned with the checksum chunk size, so the partial chunk has to be resent with the next
  // packet.
  private int trailingPartialChunkLength = 0;
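  // For example (illustrative numbers only): with a 512 byte checksum chunk, flushing 1000 bytes
  // advances nextPacketOffsetInBlock by 512 and leaves trailingPartialChunkLength at 488; those
  // 488 bytes are copied into the new buffer and resent at the start of the next packet.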

  private long nextPacketSeqno = 0L;

  private ByteBuf buf;

  private final SendBufSizePredictor sendBufSizePredictor = new SendBufSizePredictor();

  // State for connections to DN
  private enum State {
    STREAMING, CLOSING, BROKEN, CLOSED
  }

  private volatile State state;

  // all lock-free to make it run faster
  private void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // if the current callback's unfinished replicas do not contain us, then it means that we
      // have already acked this one; keep iterating to find the one we have not acked yet.
      if (c.unfinishedReplicas.remove(channel.id())) {
        if (c.unfinishedReplicas.isEmpty()) {
          // we need to remove the entry first, before completing the future. It is possible that
          // after we complete the future the upper layer will call close immediately, before we
          // remove the entry from waitingAckQueue, which leads to an IllegalStateException. We
          // also need to set the ackedBlockLength first, otherwise we may use a wrong length to
          // commit the block. This may lead to multiple removes and assigns, but that is OK. The
          // semantic of iter.remove is removing the entry returned by the previous next call, so
          // if the entry has already been removed then it is a no-op, and for the assign, the
          // values are the same so there is no problem.
          iter.remove();
          ackedBlockLength = c.ackedLength;
          // the future.complete check is to confirm that we are the only one who grabbed the work,
          // otherwise just give up and return.
          if (c.future.complete(c.ackedLength)) {
            // also wake up flush requests which have the same length.
            while (iter.hasNext()) {
              Callback maybeDummyCb = iter.next();
              if (maybeDummyCb.ackedLength == c.ackedLength) {
                iter.remove();
                maybeDummyCb.future.complete(c.ackedLength);
              } else {
                break;
              }
            }
          }
        }
        return;
      }
    }
  }

  // this usually does not happen, which means it is not on the critical path, so we make it
  // synchronized so that the implementation stays easy to reason about despite the multiple state
  // changes and checks.
  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
    if (state == State.BROKEN || state == State.CLOSED) {
      return;
    }
    if (state == State.CLOSING) {
      Callback c = waitingAckQueue.peekFirst();
      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
        // nothing to fail, the endBlock request has already finished.
        return;
      }
    }
    // disable further writes, and fail all pending acks.
    state = State.BROKEN;
    Throwable error = errorSupplier.get();
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      // find the first sync request which we have not acked yet and fail all the requests after
      // it.
      if (!c.unfinishedReplicas.contains(channel.id())) {
        continue;
      }
      for (;;) {
        c.future.completeExceptionally(error);
        if (!iter.hasNext()) {
          break;
        }
        c = iter.next();
      }
      break;
    }
    datanodeList.forEach(ch -> ch.close());
  }

  @Sharable
  private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {

    private final int timeoutMs;

    public AckHandler(int timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
      Status reply = getStatus(ack);
      if (reply != Status.SUCCESS) {
        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (PipelineAck.isRestartOOBStatus(reply)) {
        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
          block + " from datanode " + ctx.channel().remoteAddress()));
        return;
      }
      if (ack.getSeqno() == HEART_BEAT_SEQNO) {
        return;
      }
      completed(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      if (state == State.CLOSED) {
        return;
      }
      failed(ctx.channel(),
        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      failed(ctx.channel(), () -> cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        if (e.state() == READER_IDLE) {
          failed(ctx.channel(),
            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
        } else if (e.state() == WRITER_IDLE) {
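          // a heartbeat packet carries no checksums and no data: the packet length of 4 accounts
          // only for the length field itself, and the reserved HEART_BEAT_SEQNO tells the
          // datanode (and channelRead0 above) to ignore it for ack accounting.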
          PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
          int len = heartbeat.getSerializedSize();
          ByteBuf buf = alloc.buffer(len);
          heartbeat.putInBuffer(buf.nioBuffer(0, len));
          buf.writerIndex(len);
          ctx.channel().writeAndFlush(buf);
        }
        return;
      }
      super.userEventTriggered(ctx, evt);
    }
  }

  private void setupReceiver(int timeoutMs) {
    AckHandler ackHandler = new AckHandler(timeoutMs);
    for (Channel ch : datanodeList) {
      ch.pipeline().addLast(
        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
        new ProtobufVarint32FrameDecoder(),
        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
      ch.config().setAutoRead(true);
    }
  }

  FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
      DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
      DataChecksum summer, ByteBufAllocator alloc) {
    this.conf = conf;
    this.dfs = dfs;
    this.client = client;
    this.namenode = namenode;
    this.fileId = fileId;
    this.clientName = clientName;
    this.src = src;
    this.block = locatedBlock.getBlock();
    this.locations = locatedBlock.getLocations();
    this.encryptor = encryptor;
    this.datanodeList = datanodeList;
    this.summer = summer;
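    // round the packet data limit down to a multiple of the checksum chunk size so that, except
    // for a possible trailing partial chunk, every chunk we checksum is complete. For example
    // (illustrative numbers only): with the common bytesPerChecksum of 512, the 12MB MAX_DATA_LEN
    // is already a multiple of 512, so maxDataLen stays at 12MB.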
    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
    this.alloc = alloc;
    this.buf = alloc.directBuffer(sendBufSizePredictor.initialSize());
    this.state = State.STREAMING;
    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
  }

  @Override
  public void writeInt(int i) {
    buf.ensureWritable(4);
    buf.writeInt(i);
  }

  @Override
  public void write(ByteBuffer bb) {
    buf.ensureWritable(bb.remaining());
    buf.writeBytes(bb);
  }

  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) {
    buf.ensureWritable(len);
    buf.writeBytes(b, off, len);
  }

  @Override
  public int buffered() {
    return buf.readableBytes();
  }

  @Override
  public DatanodeInfo[] getPipeline() {
    return locations;
  }

  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
      long nextPacketOffsetInBlock, boolean syncBlock) {
    int dataLen = dataBuf.readableBytes();
    int chunkLen = summer.getBytesPerChecksum();
    int trailingPartialChunkLen = dataLen % chunkLen;
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
    int checksumLen = numChecks * summer.getChecksumSize();
    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
    checksumBuf.writerIndex(checksumLen);
    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
        nextPacketSeqno, false, dataLen, syncBlock);
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.buffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
    waitingAckQueue.addLast(c);
    // recheck again after we pushed the callback to the queue
    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
      future.completeExceptionally(new IOException("stream already broken"));
      // it's the one we have just pushed or just a no-op
      waitingAckQueue.removeFirst();
      return;
    }
    // TODO: we should perhaps measure time taken per DN here;
    //       we could collect statistics per DN, and/or exclude bad nodes in createOutput.
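    // each packet sent to every datanode has the same layout: the serialized PacketHeader, then
    // the checksums for each chunk, then the data itself. The two plain write calls only queue
    // their buffers; the final writeAndFlush pushes all three out in one flush.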
    datanodeList.forEach(ch -> {
      ch.write(headerBuf.retainedDuplicate());
      ch.write(checksumBuf.retainedDuplicate());
      ch.writeAndFlush(dataBuf.retainedDuplicate());
    });
    checksumBuf.release();
    headerBuf.release();
    dataBuf.release();
    nextPacketSeqno++;
  }

  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
    if (state != State.STREAMING) {
      future.completeExceptionally(new IOException("stream already broken"));
      return;
    }
    int dataLen = buf.readableBytes();
    if (dataLen == trailingPartialChunkLength) {
      // no new data
      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
      Callback lastFlush = waitingAckQueue.peekLast();
      if (lastFlush != null) {
        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
        waitingAckQueue.addLast(c);
        // recheck here if we have already removed the previous callback from the queue
        if (waitingAckQueue.peekFirst() == c) {
          // all previous callbacks have been removed
          // notice that this does not mean we will always win here, because the background
          // thread may have already started to mark the future as completed in the completed or
          // failed methods but not yet removed it from the queue. That's also why the removeFirst
          // call below may be a no-op.
          if (state != State.STREAMING) {
            future.completeExceptionally(new IOException("stream already broken"));
          } else {
            future.complete(lengthAfterFlush);
          }
          // it's the one we have just pushed or just a no-op
          waitingAckQueue.removeFirst();
        }
      } else {
        // we must have acked all the data, so the ackedBlockLength must be the same as
        // lengthAfterFlush
        future.complete(lengthAfterFlush);
      }
      return;
    }

    if (encryptor != null) {
      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
      buf.readBytes(encryptBuf, trailingPartialChunkLength);
      int toEncryptLength = dataLen - trailingPartialChunkLength;
      try {
        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
      } catch (IOException e) {
        encryptBuf.release();
        future.completeExceptionally(e);
        return;
      }
      encryptBuf.writerIndex(dataLen);
      buf.release();
      buf = encryptBuf;
    }

    if (dataLen > maxDataLen) {
      // We need to write out the data in multiple packets, as the max packet allowed is 16M.
      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
      for (int remaining = dataLen;;) {
        if (remaining < maxDataLen) {
          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
            syncBlock);
          break;
        } else {
          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
            nextSubPacketOffsetInBlock, syncBlock);
          remaining -= maxDataLen;
          nextSubPacketOffsetInBlock += maxDataLen;
        }
      }
    } else {
      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
    }
    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
    ByteBuf newBuf = alloc.directBuffer(sendBufSizePredictor.guess(dataLen))
        .ensureWritable(trailingPartialChunkLength);
    if (trailingPartialChunkLength != 0) {
      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
        trailingPartialChunkLength);
    }
    buf.release();
    this.buf = newBuf;
    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
  }

  /**
   * Flush the buffer out to datanodes.
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that holds the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);
    return future;
  }

  private void endBlock() throws IOException {
    Preconditions.checkState(waitingAckQueue.isEmpty(),
      "should call flush first before calling close");
    if (state != State.STREAMING) {
      throw new IOException("stream already broken");
    }
    state = State.CLOSING;
    long finalizedLength = ackedBlockLength;
    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
    buf.release();
    buf = null;
    int headerLen = header.getSerializedSize();
    ByteBuf headerBuf = alloc.directBuffer(headerLen);
    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
    headerBuf.writerIndex(headerLen);
    CompletableFuture<Long> future = new CompletableFuture<>();
    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
    headerBuf.release();
    try {
      future.get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }

  /**
   * The close method to use when an error has occurred. Currently we just call recoverFileLease.
   */
  @Override
  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
    if (buf != null) {
      buf.release();
      buf = null;
    }
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    endFileLease(client, fileId);
    RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
      reporter == null ? new CancelOnClose(client) : reporter);
  }

  /**
   * End the current block and complete the file at the namenode. You should call
   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
   */
  @Override
  public void close() throws IOException {
    endBlock();
    state = State.CLOSED;
    datanodeList.forEach(ch -> ch.close());
    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
    block.setNumBytes(ackedBlockLength);
    completeFile(client, namenode, src, clientName, block, fileId);
  }

  @Override
  public boolean isBroken() {
    return state == State.BROKEN;
  }

  @Override
  public long getSyncedLength() {
    return this.ackedBlockLength;
  }
}