/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.storage.internals.log;

import java.io.File;
import java.io.IOException;
import java.security.DigestException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.config.BrokerReconfigurable;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.storage.internals.log.Cleaner;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.CleanerStats;
import org.apache.kafka.storage.internals.log.LogCleanerManager;
import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;
import org.apache.kafka.storage.internals.log.LogCleaningException;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogToClean;
import org.apache.kafka.storage.internals.log.PreCleanStats;
import org.apache.kafka.storage.internals.log.SkimpyOffsetMap;
import org.apache.kafka.storage.internals.log.ThreadShutdownException;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.internals.utils.Throttler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the pool of {@link CleanerThread}s that clean logs in the background.
 *
 * <p>Each cleaner thread repeatedly asks the shared {@link LogCleanerManager} for the filthiest
 * compacted log, cleans it with its own {@link Cleaner}, and then deletes old segments of every
 * log the manager reports as deletable. I/O performed by all cleaners goes through one shared
 * {@link Throttler}.
 *
 * <p>The class is a {@link BrokerReconfigurable}: {@link #reconfigure} swaps in a new
 * {@link CleanerConfig}, adjusts the throttler rate and restarts all cleaner threads.
 *
 * <p>NOTE(review): {@code cleaners} is a plain {@link ArrayList} that is read by the metric gauges
 * and mutated by {@link #startup()} / {@link #shutdown()} / {@link #reconfigure}; no
 * synchronization is visible in this class — confirm callers serialize those operations.
 */
public class LogCleaner implements BrokerReconfigurable {
    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

    /** Names of the broker configs whose change is handled by {@link #reconfigure}. */
    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of("log.cleaner.threads", "log.cleaner.dedupe.buffer.size", "log.cleaner.io.buffer.load.factor", "log.cleaner.io.buffer.size", "message.max.bytes", "log.cleaner.io.max.bytes.per.second", "log.cleaner.backoff.ms");

    public static final String MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME = "max-buffer-utilization-percent";
    public static final String MAX_CLEAN_TIME_METRIC_NAME = "max-clean-time-secs";
    public static final String MAX_COMPACTION_DELAY_METRICS_NAME = "max-compaction-delay-secs";
    private static final String CLEANER_RECOPY_PERCENT_METRIC_NAME = "cleaner-recopy-percent";
    private static final String DEAD_THREAD_COUNT_METRIC_NAME = "DeadThreadCount";

    /** Every metric registered by {@link #registerMetrics()}; used by {@link #removeMetrics()}. */
    public static final Set<String> METRIC_NAMES = Set.of(
        MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME,
        CLEANER_RECOPY_PERCENT_METRIC_NAME,
        MAX_CLEAN_TIME_METRIC_NAME,
        MAX_COMPACTION_DELAY_METRICS_NAME,
        DEAD_THREAD_COUNT_METRIC_NAME);

    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleaner");
    private final LogCleanerManager cleanerManager;
    private final Throttler throttler;
    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
    private final LogDirFailureChannel logDirFailureChannel;
    private final Time time;
    private final List<CleanerThread> cleaners = new ArrayList<>();
    // Replaced wholesale by reconfigure(); volatile so cleaner threads see the new instance.
    private volatile CleanerConfig config;

    /**
     * Creates the cleaner, its manager and its I/O throttler, and registers the cleaner metrics.
     * No threads are started until {@link #startup()} is called.
     *
     * @param initialConfig        configuration used until the first {@link #reconfigure}
     * @param logDirs              log directories handed to the {@link LogCleanerManager}
     * @param logs                 the partition-to-log map shared with the manager
     * @param logDirFailureChannel channel notified when cleaning hits an {@link IOException}
     * @param time                 clock passed to the throttler and to each {@link Cleaner}
     */
    public LogCleaner(CleanerConfig initialConfig, List<File> logDirs, ConcurrentMap<TopicPartition, UnifiedLog> logs, LogDirFailureChannel logDirFailureChannel, Time time) {
        this.config = initialConfig;
        this.logs = logs;
        this.logDirFailureChannel = logDirFailureChannel;
        this.cleanerManager = new LogCleanerManager(logDirs, logs, this.logDirFailureChannel);
        this.time = time;
        // NOTE(review): 300L is presumably the throttler's check interval in ms — confirm against Throttler.
        this.throttler = new Throttler(this.config.maxIoBytesPerSecond, 300L, "cleaner-io", "bytes", this.time);
        this.registerMetrics();
    }

    /** Registers one gauge per entry of {@link #METRIC_NAMES}, each derived from the current cleaner threads. */
    private void registerMetrics() {
        // Highest buffer utilization of the last run over all threads, as a percentage.
        this.metricsGroup.newGauge(MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME,
            () -> (int) (this.maxOverCleanerThreads(t -> t.lastStats.bufferUtilization) * 100.0));

        // Bytes written / bytes read summed over the last run of every thread, as a percentage.
        this.metricsGroup.newGauge(CLEANER_RECOPY_PERCENT_METRIC_NAME, () -> {
            List<CleanerStats> stats = this.cleaners.stream().map(t -> t.lastStats).toList();
            long bytesWritten = stats.stream().mapToLong(stat -> stat.bytesWritten).sum();
            long bytesRead = stats.stream().mapToLong(stat -> stat.bytesRead).sum();
            // Clamp the divisor to 1 so an idle cleaner reports 0 instead of dividing by zero.
            double recopyRate = (double) bytesWritten / (double) Math.max(bytesRead, 1L);
            return (int) (100.0 * recopyRate);
        });

        // Longest last-run duration over all threads, in seconds.
        this.metricsGroup.newGauge(MAX_CLEAN_TIME_METRIC_NAME,
            () -> (int) this.maxOverCleanerThreads(t -> t.lastStats.elapsedSecs()));

        // Largest compaction delay seen by any thread, converted from ms to seconds.
        // The explicit (double) keeps the lambda a valid Function<CleanerThread, Double>
        // whatever numeric primitive the accessor returns.
        this.metricsGroup.newGauge(MAX_COMPACTION_DELAY_METRICS_NAME,
            () -> (int) (this.maxOverCleanerThreads(t -> (double) t.lastPreCleanStats.maxCompactionDelayMs()) / 1000.0));

        this.metricsGroup.newGauge(DEAD_THREAD_COUNT_METRIC_NAME, this::deadThreadCount);
    }

    /**
     * Creates and starts {@code config.numThreads} cleaner threads.
     *
     * <p>A thread count below 1 only produces a warning here; no thread is started in that case.
     *
     * @throws RuntimeException wrapping the {@link NoSuchAlgorithmException} raised while building a thread
     */
    public void startup() {
        // Read the volatile config once so the warning and the loop agree on the thread count.
        int numThreads = this.config.numThreads;
        if (numThreads < 1) {
            LOG.warn("Invalid value for `log.cleaner.threads`: must be >= 1 starting from Kafka 5.0 since log cleaner is always enabled.");
        }
        LOG.info("Starting the log cleaner");
        for (int threadId = 0; threadId < numThreads; threadId++) {
            try {
                CleanerThread cleaner = new CleanerThread(threadId);
                this.cleaners.add(cleaner);
                cleaner.start();
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Shuts down every cleaner thread in order and then empties the thread list.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored and
     * the interruption is rethrown wrapped in a {@link RuntimeException}; threads not yet shut
     * down stay in {@code cleaners} so a later call can retry.
     */
    private void shutdownCleaners() {
        LOG.info("Shutting down the log cleaner.");
        for (CleanerThread thread : this.cleaners) {
            try {
                thread.shutdown();
            } catch (InterruptedException e) {
                // Preserve the interruption for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        this.cleaners.clear();
    }

    /** Stops all cleaner threads and removes the metrics, even if stopping the threads fails. */
    public void shutdown() {
        try {
            this.shutdownCleaners();
        } finally {
            this.removeMetrics();
        }
    }

    /** Removes every metric in {@link #METRIC_NAMES} and the metrics owned by the cleaner manager. */
    public void removeMetrics() {
        METRIC_NAMES.forEach(this.metricsGroup::removeMetric);
        this.cleanerManager.removeMetrics();
    }

    /** @return {@link #RECONFIGURABLE_CONFIGS} */
    public Set<String> reconfigurableConfigs() {
        return RECONFIGURABLE_CONFIGS;
    }

    /**
     * Rejects a new configuration whose cleaner thread count is below 1, less than half the
     * current count, or more than double the current count.
     *
     * @param newConfig the candidate configuration
     * @throws ConfigException if the new thread count violates one of the bounds above
     */
    public void validateReconfiguration(AbstractConfig newConfig) {
        int numThreads = new CleanerConfig(newConfig).numThreads;
        int currentThreads = this.config.numThreads;
        if (numThreads < 1) {
            throw new ConfigException("Log cleaner threads should be at least 1");
        }
        if (numThreads < currentThreads / 2) {
            throw new ConfigException("Log cleaner threads cannot be reduced to less than half the current value " + currentThreads);
        }
        if (numThreads > currentThreads * 2) {
            throw new ConfigException("Log cleaner threads cannot be increased to more than double the current value " + currentThreads);
        }
    }

    /**
     * Applies a new configuration: updates the throttler rate when the I/O limit changed, then
     * stops all cleaner threads and starts a fresh set built from the new configuration.
     *
     * @param oldConfig the configuration being replaced; only its I/O rate is consulted
     * @param newConfig the configuration to switch to
     */
    public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
        this.config = new CleanerConfig(newConfig);
        double maxIoBytesPerSecond = this.config.maxIoBytesPerSecond;
        if (maxIoBytesPerSecond != oldConfig.getDouble("log.cleaner.io.max.bytes.per.second")) {
            LOG.info("Updating logCleanerIoMaxBytesPerSecond: {}", maxIoBytesPerSecond);
            this.throttler.updateDesiredRatePerSec(maxIoBytesPerSecond);
        }
        // Stop before start: if stopping throws, no new threads are created alongside the old ones.
        this.shutdownCleaners();
        this.startup();
    }

    /** Delegates to {@link LogCleanerManager#abortCleaning}. */
    public void abortCleaning(TopicPartition topicPartition) {
        this.cleanerManager.abortCleaning(topicPartition);
    }

    /**
     * Delegates to {@link LogCleanerManager#updateCheckpoints}, passing an empty optional as the
     * manager's second argument and {@code partitionToRemove} as its third.
     */
    public void updateCheckpoints(File dataDir, Optional<TopicPartition> partitionToRemove) {
        this.cleanerManager.updateCheckpoints(dataDir, Optional.empty(), partitionToRemove);
    }

    /** Delegates to {@link LogCleanerManager#alterCheckpointDir}. */
    public void alterCheckpointDir(TopicPartition topicPartition, File sourceLogDir, File destLogDir) {
        this.cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir);
    }

    /** Delegates to {@link LogCleanerManager#handleLogDirFailure}. */
    public void handleLogDirFailure(String dir) {
        this.cleanerManager.handleLogDirFailure(dir);
    }

    /** Delegates to {@link LogCleanerManager#maybeTruncateCheckpoint}. */
    public void maybeTruncateCheckpoint(File dataDir, TopicPartition topicPartition, long offset) {
        this.cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset);
    }

    /** Delegates to {@link LogCleanerManager#abortAndPauseCleaning}. */
    public void abortAndPauseCleaning(TopicPartition topicPartition) {
        this.cleanerManager.abortAndPauseCleaning(topicPartition);
    }

    /** Delegates to {@link LogCleanerManager#resumeCleaning}. */
    public void resumeCleaning(Set<TopicPartition> topicPartitions) {
        this.cleanerManager.resumeCleaning(topicPartitions);
    }

    /**
     * Blocks until the partition's cleaner checkpoint reaches {@code offset} or the wait budget
     * is used up, polling in sleeps of at most 100 ms.
     *
     * @param topicPartition partition to watch
     * @param offset         checkpoint offset to wait for
     * @param maxWaitMs      maximum total time to sleep, in milliseconds
     * @return whether the checkpoint is at or beyond {@code offset} when the wait ends
     * @throws InterruptedException if interrupted while sleeping
     */
    public boolean awaitCleaned(TopicPartition topicPartition, long offset, long maxWaitMs) throws InterruptedException {
        long remainingWaitMs = maxWaitMs;
        while (!this.isCleaned(topicPartition, offset) && remainingWaitMs > 0L) {
            long sleepTime = Math.min(100L, remainingWaitMs);
            Thread.sleep(sleepTime);
            remainingWaitMs -= sleepTime;
        }
        return this.isCleaned(topicPartition, offset);
    }

    /** @return true when a checkpoint exists for the partition and is at or beyond {@code offset} */
    private boolean isCleaned(TopicPartition topicPartition, long offset) {
        Long checkpoint = this.cleanerManager.allCleanerCheckpoints().get(topicPartition);
        return checkpoint != null && checkpoint >= offset;
    }

    /**
     * Asks the manager to pause cleaning for non-compacted partitions and returns the affected
     * partitions with their logs as a map.
     */
    public Map<TopicPartition, UnifiedLog> pauseCleaningForNonCompactedPartitions() {
        return this.cleanerManager.pauseCleaningForNonCompactedPartitions().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Applies {@code f} to every cleaner thread and returns the largest result.
     *
     * @param f extracts a numeric value from a cleaner thread
     * @return the maximum extracted value, or {@code 0.0} when there are no cleaner threads
     */
    public double maxOverCleanerThreads(Function<CleanerThread, Double> f) {
        return this.cleaners.stream().mapToDouble(f::apply).max().orElse(0.0);
    }

    /** @return how many cleaner threads report {@code isThreadFailed()} */
    public int deadThreadCount() {
        return (int) this.cleaners.stream().filter(ShutdownableThread::isThreadFailed).count();
    }

    public LogCleanerManager cleanerManager() {
        return this.cleanerManager;
    }

    public Throttler throttler() {
        return this.throttler;
    }

    public ConcurrentMap<TopicPartition, UnifiedLog> logs() {
        return this.logs;
    }

    /** @return the live (mutable) list of cleaner threads, not a copy */
    public List<CleanerThread> cleaners() {
        return this.cleaners;
    }

    public KafkaMetricsGroup metricsGroup() {
        return this.metricsGroup;
    }

    public CleanerConfig currentConfig() {
        return this.config;
    }

    public int cleanerCount() {
        return this.cleaners.size();
    }

    /**
     * A background thread that cleans one log per iteration using its own {@link Cleaner}, and
     * backs off for {@code config.backoffMs} when nothing was cleaned.
     */
    public final class CleanerThread extends ShutdownableThread {
        private final Logger logger;
        private final Cleaner cleaner;
        // Written by this thread, read by the metric gauges of the enclosing LogCleaner.
        private volatile CleanerStats lastStats;
        private volatile PreCleanStats lastPreCleanStats;

        /**
         * Builds the thread and its {@link Cleaner}. The dedupe buffer and the I/O buffer from the
         * configuration are divided evenly between the configured number of threads.
         *
         * @param threadId index used in the thread name and as the cleaner id
         * @throws NoSuchAlgorithmException propagated from constructing the offset map
         */
        public CleanerThread(int threadId) throws NoSuchAlgorithmException {
            super("kafka-log-cleaner-thread-" + threadId, false);
            this.logger = new LogContext(this.logPrefix).logger(CleanerThread.class);
            this.lastStats = new CleanerStats(Time.SYSTEM);
            this.lastPreCleanStats = new PreCleanStats();

            // One snapshot of the volatile config so every value below comes from the same instance.
            CleanerConfig cfg = LogCleaner.this.config;
            long dedupeBytesPerThread = cfg.dedupeBufferSize / (long) cfg.numThreads;
            // The offset map takes an int size, so the per-thread share is capped at Integer.MAX_VALUE.
            int offsetMapBytes = (int) Math.min(dedupeBytesPerThread, Integer.MAX_VALUE);
            this.cleaner = new Cleaner(
                threadId,
                new SkimpyOffsetMap(offsetMapBytes, cfg.hashAlgorithm()),
                cfg.ioBufferSize / cfg.numThreads / 2,
                cfg.maxMessageSize,
                cfg.dedupeBufferLoadFactor,
                LogCleaner.this.throttler,
                LogCleaner.this.time,
                this::checkDone);
            if (dedupeBytesPerThread > Integer.MAX_VALUE) {
                this.logger.warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...");
            }
        }

        /**
         * Callback handed to the {@link Cleaner}.
         *
         * @throws ThreadShutdownException if this thread is no longer running
         */
        private void checkDone(TopicPartition topicPartition) {
            if (!this.isRunning()) {
                throw new ThreadShutdownException();
            }
            LogCleaner.this.cleanerManager.checkCleaningAborted(topicPartition);
        }

        /**
         * One iteration of the thread loop: try to clean a log, pause for the configured backoff
         * if nothing was cleaned, then let the manager maintain its uncleanable partitions.
         */
        public void doWork() {
            boolean cleaned = this.tryCleanFilthiestLog();
            if (!cleaned) {
                try {
                    this.pause(LogCleaner.this.config.backoffMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep the interrupt status visible to whoever handles the wrapped exception.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            LogCleaner.this.cleanerManager.maintainUncleanablePartitions();
        }

        public CleanerStats lastStats() {
            return this.lastStats;
        }

        public void setLastStats(CleanerStats lastStats) {
            this.lastStats = lastStats;
        }

        public PreCleanStats lastPreCleanStats() {
            return this.lastPreCleanStats;
        }

        /**
         * Runs {@link #cleanFilthiestLog()} and, on a {@link LogCleaningException}, logs it and
         * marks the failing log's partition as uncleanable.
         *
         * @return whether a log was cleaned; {@code false} after a {@link LogCleaningException}
         */
        private boolean tryCleanFilthiestLog() {
            try {
                return this.cleanFilthiestLog();
            } catch (LogCleaningException e) {
                // The trailing exception argument is logged as the throwable, not as a placeholder value.
                this.logger.warn("Unexpected exception thrown when cleaning log {}. Marking its partition ({}) as uncleanable", e.log, e.log.topicPartition(), e);
                LogCleaner.this.cleanerManager.markPartitionUncleanable(e.log.parentDir(), e.log.topicPartition());
                return false;
            }
        }

        /**
         * Cleans the filthiest compacted log if the manager offers one, then deletes old segments
         * of every log the manager reports as deletable.
         *
         * @return whether a log was cleaned in this call
         * @throws LogCleaningException    wrapping any failure other than a thread shutdown
         * @throws ThreadShutdownException rethrown unchanged so shutdown is not reported as a cleaning failure
         */
        private boolean cleanFilthiestLog() throws LogCleaningException {
            PreCleanStats preCleanStats = new PreCleanStats();
            Optional<LogToClean> ltc = LogCleaner.this.cleanerManager.grabFilthiestCompactedLog(LogCleaner.this.time, preCleanStats);
            boolean cleaned;
            if (ltc.isEmpty()) {
                cleaned = false;
            } else {
                // Pre-clean stats are only published when there is actually a log to clean.
                this.lastPreCleanStats = preCleanStats;
                LogToClean cleanable = ltc.get();
                try {
                    this.cleanLog(cleanable);
                    cleaned = true;
                } catch (ThreadShutdownException e) {
                    throw e;
                } catch (Exception e) {
                    throw new LogCleaningException(cleanable.log(), e.getMessage(), e);
                }
            }

            Map<TopicPartition, UnifiedLog> deletable = LogCleaner.this.cleanerManager.deletableLogs();
            try {
                deletable.forEach((topicPartition, log) -> {
                    try {
                        log.deleteOldSegments();
                    } catch (ThreadShutdownException e) {
                        throw e;
                    } catch (Exception e) {
                        throw new LogCleaningException(log, e.getMessage(), e);
                    }
                });
            } finally {
                // Always report the partitions back to the manager, even when a deletion failed.
                LogCleaner.this.cleanerManager.doneDeleting(deletable.keySet().stream().toList());
            }
            return cleaned;
        }

        /**
         * Cleans a single log and reports the outcome to the manager.
         *
         * <p>{@code doneCleaning} is always called; the end offset passed to it stays at the first
         * dirty offset unless the clean completed.
         *
         * @param cleanable the log to clean
         * @throws DigestException propagated from the underlying {@link Cleaner}
         */
        private void cleanLog(LogToClean cleanable) throws DigestException {
            long startOffset = cleanable.firstDirtyOffset();
            long endOffset = startOffset;
            try {
                Map.Entry<Long, CleanerStats> result = this.cleaner.clean(cleanable);
                endOffset = result.getKey();
                this.recordStats(this.cleaner.id(), cleanable.log().name(), startOffset, endOffset, result.getValue());
            } catch (LogCleaningAbortedException ignored) {
                // Deliberately ignored: an aborted clean is not treated as an error here.
            } catch (KafkaStorageException ignored) {
                // Deliberately ignored.
                // NOTE(review): presumably the log dir is already offline and handled elsewhere — confirm.
            } catch (IOException e) {
                String logDirectory = cleanable.log().parentDir();
                String msg = String.format("Failed to clean up log for %s in dir %s due to IOException", cleanable.topicPartition(), logDirectory);
                LogCleaner.this.logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e);
            } finally {
                LogCleaner.this.cleanerManager.doneCleaning(cleanable.topicPartition(), cleanable.log().parentDirFile(), endOffset);
            }
        }

        /**
         * Publishes {@code stats} as this thread's last stats and logs a multi-line summary of
         * the clean.
         *
         * @param id    cleaner id
         * @param name  log name
         * @param from  first offset of the dirty section
         * @param to    end offset returned by the cleaner
         * @param stats statistics of the completed clean
         */
        private void recordStats(int id, String name, long from, long to, CleanerStats stats) {
            this.lastStats = stats;
            String message =
                String.format("%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n", id, name, from, to)
                + String.format("\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n",
                    this.mb(stats.bytesRead),
                    stats.elapsedSecs(),
                    this.mb((double) stats.bytesRead / stats.elapsedSecs()))
                + String.format("\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n",
                    this.mb(stats.mapBytesRead),
                    stats.elapsedIndexSecs(),
                    this.mb(stats.mapBytesRead) / stats.elapsedIndexSecs(),
                    100.0 * stats.elapsedIndexSecs() / stats.elapsedSecs())
                + String.format("\tBuffer utilization: %.1f%%%n", 100.0 * stats.bufferUtilization)
                + String.format("\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n",
                    this.mb(stats.bytesRead),
                    stats.elapsedSecs() - stats.elapsedIndexSecs(),
                    this.mb(stats.bytesRead) / (stats.elapsedSecs() - stats.elapsedIndexSecs()),
                    100.0 * (stats.elapsedSecs() - stats.elapsedIndexSecs()) / stats.elapsedSecs())
                + String.format("\tStart size: %,.1f MB (%,d messages)%n", this.mb(stats.bytesRead), stats.messagesRead)
                + String.format("\tEnd size: %,.1f MB (%,d messages)%n", this.mb(stats.bytesWritten), stats.messagesWritten)
                + String.format("\t%.1f%% size reduction (%.1f%% fewer messages)%n",
                    100.0 * (1.0 - (double) stats.bytesWritten / (double) stats.bytesRead),
                    100.0 * (1.0 - (double) stats.messagesWritten / (double) stats.messagesRead));
            this.logger.info(message);
            if (this.lastPreCleanStats.delayedPartitions() > 0) {
                this.logger.info("\tCleanable partitions: {}, Delayed partitions: {}, max delay: {}", this.lastPreCleanStats.cleanablePartitions(), this.lastPreCleanStats.delayedPartitions(), this.lastPreCleanStats.maxCompactionDelayMs());
            }
            if (stats.invalidMessagesRead > 0L) {
                this.logger.warn("\tFound {} invalid messages during compaction.", stats.invalidMessagesRead);
            }
        }

        /** Converts a byte count to mebibytes (divides by 1024 * 1024). */
        private double mb(double bytes) {
            return bytes / 1048576.0;
        }
    }
}

