001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
031
032/**
033 * Helper class that loads the procedures stored in a WAL.
034 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
035 *             use the new region based procedure store.
036 */
037@Deprecated
038@InterfaceAudience.Private
039class ProcedureWALFormatReader {
040  private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
041
042  /**
043   * We will use the localProcedureMap to track the active procedures for the current proc wal file,
044   * and when we finished reading one proc wal file, we will merge he localProcedureMap to the
045   * procedureMap, which tracks the global active procedures.
046   * <p/>
047   * See the comments of {@link WALProcedureMap} for more details.
048   * <p/>
049   * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
050   * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of
051   * {@link ProcedureTree} and the code in {@link #finish()} for more details.
052   */
053  private final WALProcedureMap localProcedureMap = new WALProcedureMap();
054  private final WALProcedureMap procedureMap = new WALProcedureMap();
055
056  private final ProcedureWALFormat.Loader loader;
057
058  /**
059   * Global tracker that will be used by the WALProcedureStore after load. If the last WAL was
060   * closed cleanly we already have a full tracker ready to be used. If the last WAL was truncated
061   * (e.g. master killed) the tracker will be empty and the 'partial' flag will be set. In this
062   * case, on WAL replay we are going to rebuild the tracker.
063   */
064  private final ProcedureStoreTracker tracker;
065
066  /**
067   * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
068   * the list of procedures modified in that WAL because we need it for log cleaning purposes. If
069   * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
070   * {@link WALProcedureStore#removeInactiveLogs()}).
071   * <p/>
072   * Notice that, the deleted part for this tracker will not be global valid as we can only count
073   * the deletes in the current file, but it is not big problem as finally, the above tracker will
074   * have the global state of deleted, and it will also be used to build the cleanup tracker.
075   */
076  private ProcedureStoreTracker localTracker;
077
078  private long maxProcId = 0;
079
080  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
081    ProcedureWALFormat.Loader loader) {
082    this.tracker = tracker;
083    this.loader = loader;
084  }
085
086  public void read(ProcedureWALFile log) throws IOException {
087    localTracker = log.getTracker();
088    if (localTracker.isPartial()) {
089      LOG.info("Rebuilding tracker for {}", log);
090    }
091
092    long count = 0;
093    FSDataInputStream stream = log.getStream();
094    try {
095      boolean hasMore = true;
096      while (hasMore) {
097        ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
098        if (entry == null) {
099          LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
100          break;
101        }
102        count++;
103        switch (entry.getType()) {
104          case PROCEDURE_WAL_INIT:
105            readInitEntry(entry);
106            break;
107          case PROCEDURE_WAL_INSERT:
108            readInsertEntry(entry);
109            break;
110          case PROCEDURE_WAL_UPDATE:
111          case PROCEDURE_WAL_COMPACT:
112            readUpdateEntry(entry);
113            break;
114          case PROCEDURE_WAL_DELETE:
115            readDeleteEntry(entry);
116            break;
117          case PROCEDURE_WAL_EOF:
118            hasMore = false;
119            break;
120          default:
121            throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
122        }
123      }
124      LOG.info("Read {} entries in {}", count, log);
125    } catch (InvalidProtocolBufferException e) {
126      LOG.error("While reading entry #{} in {}", count, log, e);
127      loader.markCorruptedWAL(log, e);
128    }
129
130    if (!localProcedureMap.isEmpty()) {
131      log.setProcIds(localProcedureMap.getMinModifiedProcId(),
132        localProcedureMap.getMaxModifiedProcId());
133      if (localTracker.isPartial()) {
134        localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
135          localProcedureMap.getMaxModifiedProcId());
136      }
137      procedureMap.merge(localProcedureMap);
138    }
139    // Do not reset the partial flag for local tracker, as here the local tracker only know the
140    // procedures which are modified in this file.
141  }
142
143  public void finish() throws IOException {
144    // notify the loader about the max proc ID
145    loader.setMaxProcId(maxProcId);
146
147    // build the procedure execution tree. When building we will verify that whether a procedure is
148    // valid.
149    ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures());
150    loader.load(tree.getValidProcs());
151    loader.handleCorrupted(tree.getCorruptedProcs());
152  }
153
154  private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
155    if (tracker.isPartial()) {
156      tracker.setDeleted(procId, true);
157    }
158  }
159
160  private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
161    if (tracker.isPartial()) {
162      tracker.insert(proc.getProcId());
163    }
164  }
165
166  private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
167    maxProcId = Math.max(maxProcId, proc.getProcId());
168    if (isRequired(proc.getProcId())) {
169      LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
170      localProcedureMap.add(proc);
171      insertIfPartial(tracker, proc);
172    }
173    insertIfPartial(localTracker, proc);
174  }
175
176  private void readInitEntry(ProcedureWALEntry entry) {
177    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
178    loadProcedure(entry, entry.getProcedure(0));
179  }
180
181  private void readInsertEntry(ProcedureWALEntry entry) {
182    assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
183    loadProcedure(entry, entry.getProcedure(0));
184    for (int i = 1; i < entry.getProcedureCount(); ++i) {
185      loadProcedure(entry, entry.getProcedure(i));
186    }
187  }
188
189  private void readUpdateEntry(ProcedureWALEntry entry) {
190    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
191    loadProcedure(entry, entry.getProcedure(0));
192  }
193
194  private void readDeleteEntry(ProcedureWALEntry entry) {
195    assert entry.hasProcId() : "expected ProcID";
196
197    if (entry.getChildIdCount() > 0) {
198      assert entry.getProcedureCount() == 1 : "Expected only one procedure";
199
200      // update the parent procedure
201      loadProcedure(entry, entry.getProcedure(0));
202
203      // remove the child procedures of entry.getProcId()
204      for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) {
205        deleteEntry(entry.getChildId(i));
206      }
207    } else {
208      assert entry.getProcedureCount() == 0 : "Expected no procedures";
209
210      // delete the procedure
211      deleteEntry(entry.getProcId());
212    }
213  }
214
215  private void deleteEntry(final long procId) {
216    LOG.trace("delete entry {}", procId);
217    maxProcId = Math.max(maxProcId, procId);
218    localProcedureMap.remove(procId);
219    assert !procedureMap.contains(procId);
220    setDeletedIfPartial(tracker, procId);
221    setDeletedIfPartial(localTracker, procId);
222  }
223
224  private boolean isDeleted(long procId) {
225    return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
226  }
227
228  private boolean isRequired(long procId) {
229    return !isDeleted(procId) && !procedureMap.contains(procId);
230  }
231}