package com.sohu.jafka.log;

import com.sohu.jafka.api.OffsetRequest;
import com.sohu.jafka.api.PartitionChooser;
import com.sohu.jafka.common.InvalidPartitionException;
import com.sohu.jafka.server.ServerConfig;
import com.sohu.jafka.server.ServerRegister;
import com.sohu.jafka.server.TopicTask;
import com.sohu.jafka.utils.Closer;
import com.sohu.jafka.utils.IteratorTemplate;
import com.sohu.jafka.utils.KV;
import com.sohu.jafka.utils.Pool;
import com.sohu.jafka.utils.Scheduler;
import com.sohu.jafka.utils.TopicNameValidator;
import com.sohu.jafka.utils.Utils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/log/LogManager.class */
public class LogManager implements PartitionChooser, Closeable {
    /** Server configuration this manager was constructed with. */
    final ServerConfig config;
    /** Scheduler used by load() for the periodic cleanup job; no cleanup job is scheduled when it is null. */
    private final Scheduler scheduler;
    /** Interval, in milliseconds, between two runs of the cleanup job. */
    final long logCleanupIntervalMs;
    /** Retention age, in milliseconds, applied to topics that have no entry in logRetentionMSMap. */
    final long logCleanupDefaultAgeMs;
    /** Recovery flag handed to every Log opened by load(). */
    final boolean needRecovery;
    /** Partition count used for topics that have no entry in topicPartitionsMap. */
    final int numPartitions;
    /** Canonical form of the configured log directory; each child directory holds one topic partition. */
    final File logDir;
    /** Flush interval handed to every Log this manager creates. */
    final int flushInterval;
    /** Counted down at the end of startup(); null when ZooKeeper is disabled. */
    final CountDownLatch startupLatch;
    /** Per-topic flush intervals consulted by flushAllLogs(); compared against elapsed milliseconds. */
    final Map<String, Integer> logFlushIntervalMap;
    /** Per-topic retention age in milliseconds, derived from the configured retention hours. */
    final Map<String, Long> logRetentionMSMap;
    /** Size threshold for size-based cleanup; a negative value disables that cleanup. */
    final int logRetentionSize;
    /** Created by load() when ZooKeeper is enabled; stays null otherwise. */
    private ServerRegister serverRegister;
    /** Topic name to partition count; updated by load() and createLogs(). */
    private final Map<String, Integer> topicPartitionsMap;
    /** Rolling strategy handed to every Log; load() installs a FixedSizeRollingStrategy when none was set. */
    private RollingStrategy rollingStategy;
    /** Maximum message size handed to every Log this manager creates. */
    private final int maxMessageSize;
    private final Logger logger = LoggerFactory.getLogger(LogManager.class);
    /** Monitor held by createLogs(), deleteLogs() and createLog(). */
    private final Object logCreationLock = new Object();
    /** Random source used by choosePartition(). */
    final Random random = new Random();
    /** Topic name to (partition number to Log). */
    private final Pool<String, Pool<Integer, Log>> logs = new Pool<>();
    /** Scheduler on which startup() registers the periodic flushAllLogs(false) job. */
    private final Scheduler logFlusherScheduler = new Scheduler(1, "jafka-logflusher-", false);
    /** Queue of topic tasks consumed by registeredTaskLooply(). */
    private final LinkedBlockingQueue<TopicTask> topicRegisterTasks = new LinkedBlockingQueue<>();
    /** Set to true by close() to stop the registeredTaskLooply() loop. */
    private volatile boolean stopTopicRegisterTasks = false;

    /* loaded from: input_file:com/sohu/jafka/log/LogManager$TopicRegisterTask.class */
    /**
     * Thread whose only job is to drain the topic-register task queue of the
     * enclosing manager until that loop terminates.
     */
    class TopicRegisterTask extends Thread {

        /** Delegates to the enclosing manager's task loop. */
        @Override
        public void run() {
            final LogManager owner = LogManager.this;
            owner.registeredTaskLooply();
        }
    }

    public LogManager(ServerConfig serverConfig, Scheduler scheduler, long j, long j2, boolean z) {
        this.config = serverConfig;
        this.maxMessageSize = serverConfig.getMaxMessageSize();
        this.scheduler = scheduler;
        this.logCleanupIntervalMs = j;
        this.logCleanupDefaultAgeMs = j2;
        this.needRecovery = z;
        this.logDir = Utils.getCanonicalFile(new File(serverConfig.getLogDir()));
        this.numPartitions = serverConfig.getNumPartitions();
        this.flushInterval = serverConfig.getFlushInterval();
        this.topicPartitionsMap = serverConfig.getTopicPartitionsMap();
        this.startupLatch = serverConfig.getEnableZookeeper() ? new CountDownLatch(1) : null;
        this.logFlushIntervalMap = serverConfig.getFlushIntervalMap();
        this.logRetentionSize = serverConfig.getLogRetentionSize();
        this.logRetentionMSMap = getLogRetentionMSMap(serverConfig.getLogRetentionHoursMap());
    }

    /**
     * Sets the rolling strategy handed to logs created after this call.
     * When none is set before load(), load() installs a FixedSizeRollingStrategy.
     *
     * @param rollingStrategy strategy to use for newly opened logs
     */
    public void setRollingStategy(RollingStrategy rollingStrategy) {
        this.rollingStategy = rollingStrategy;
    }

    /**
     * Opens every log found under the log directory and starts the background
     * services.
     *
     * <p>Steps, in order: install the default rolling strategy if none was set,
     * create the log directory if missing, open one {@code Log} per child
     * directory, schedule the periodic cleanup job (when a scheduler was
     * supplied), and - when ZooKeeper is enabled - start the server register
     * and the topic-register thread.
     *
     * @throws IOException              if opening a log fails
     * @throws IllegalArgumentException if the log directory is not a readable
     *                                  directory, or a child directory name
     *                                  contains no '-' separator
     */
    public void load() throws IOException {
        if (this.rollingStategy == null) {
            this.rollingStategy = new FixedSizeRollingStrategy(this.config.getLogFileSize());
        }
        if (!this.logDir.exists()) {
            this.logger.info("No log directory found, creating '" + this.logDir.getAbsolutePath() + "'");
            // The result is not checked here; the readability check below rejects a failed creation.
            this.logDir.mkdirs();
        }
        if (!this.logDir.isDirectory() || !this.logDir.canRead()) {
            throw new IllegalArgumentException(this.logDir.getAbsolutePath() + " is not a readable log directory.");
        }
        File[] children = this.logDir.listFiles();
        if (children != null) {
            for (File child : children) {
                if (child.isDirectory()) {
                    loadLogFromDirectory(child);
                } else {
                    this.logger.warn("Skipping unexplainable file '" + child.getAbsolutePath() + "'--should it be there?");
                }
            }
        }
        if (this.scheduler != null) {
            this.logger.debug("starting log cleaner every " + this.logCleanupIntervalMs + " ms");
            this.scheduler.scheduleWithRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        LogManager.this.cleanupLogs();
                    } catch (IOException e) {
                        LogManager.this.logger.error("cleanup log failed.", e);
                    }
                }
            }, 60000L, this.logCleanupIntervalMs);
        }
        if (this.config.getEnableZookeeper()) {
            this.serverRegister = new ServerRegister(this.config, this);
            this.serverRegister.startup();
            TopicRegisterTask topicRegisterTask = new TopicRegisterTask();
            topicRegisterTask.setName("jafka.topicregister");
            topicRegisterTask.setDaemon(true);
            topicRegisterTask.start();
        }
    }

    /** Opens the log stored in one "topic-partition" directory and registers it under its topic. */
    private void loadLogFromDirectory(File dir) throws IOException {
        this.logger.info("Loading log from " + dir.getAbsolutePath());
        String dirName = dir.getName();
        if (dirName.indexOf('-') == -1) {
            throw new IllegalArgumentException("error topic directory: " + dir.getAbsolutePath());
        }
        KV<String, Integer> topicPartition = Utils.getTopicPartition(dirName);
        String topic = topicPartition.k;
        int partition = topicPartition.v.intValue();
        Log log = new Log(dir, partition, this.rollingStategy, this.flushInterval, this.needRecovery, this.maxMessageSize);
        this.logs.putIfNotExists(topic, new Pool<Integer, Log>());
        // Key is the partition number, value is the log (the decompiled casts here were invalid).
        this.logs.get(topic).put(Integer.valueOf(partition), log);
        // A partition found on disk raises the topic's partition count so that it stays addressable.
        if (getPartition(topic) <= partition) {
            this.topicPartitionsMap.put(topic, Integer.valueOf(partition + 1));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Consumes topic tasks from the queue and hands them to the server
     * register until the stop flag is set or a SHUTDOWN task is taken.
     *
     * <p>A failure while taking or processing one task is logged and the loop
     * continues with the next task.
     */
    public void registeredTaskLooply() {
        while (!this.stopTopicRegisterTasks) {
            try {
                // Take and process inside one try: a failed take() must never
                // fall through to processing an unassigned task.
                TopicTask task = this.topicRegisterTasks.take();
                if (task.type == TopicTask.TaskType.SHUTDOWN) {
                    break;
                }
                this.serverRegister.processTask(task);
            } catch (Exception e) {
                this.logger.error(e.getMessage(), e);
            }
        }
        this.logger.debug("stop topic register task");
    }

    /**
     * Converts a per-topic retention map expressed in hours into one expressed
     * in milliseconds.
     *
     * @param map topic name to retention in hours
     * @return a new map from topic name to retention in milliseconds
     */
    private Map<String, Long> getLogRetentionMSMap(Map<String, Integer> map) {
        // Multiply in long: in int arithmetic anything above 596 hours overflows.
        final long millisPerHour = 60L * 60L * 1000L;
        Map<String, Long> retentionMs = new HashMap<String, Long>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            long hours = entry.getValue().longValue();
            retentionMs.put(entry.getKey(), Long.valueOf(hours * millisPerHour));
        }
        return retentionMs;
    }

    /**
     * Stops the flusher scheduler, closes every open log and, when ZooKeeper
     * is enabled, stops the topic-register loop and closes the server register.
     */
    @Override
    public void close() {
        this.logFlusherScheduler.shutdown();

        for (Iterator<Log> openLogs = getLogIterator(); openLogs.hasNext();) {
            Closer.closeQuietly(openLogs.next(), this.logger);
        }

        if (!this.config.getEnableZookeeper()) {
            return;
        }
        this.stopTopicRegisterTasks = true;
        // Two SHUTDOWN markers are enqueued, exactly as before.
        // NOTE(review): presumably the second one guards against a missed wake-up - confirm.
        for (int marker = 0; marker < 2; marker++) {
            this.topicRegisterTasks.add(new TopicTask(TopicTask.TaskType.SHUTDOWN, null));
        }
        Closer.closeQuietly(this.serverRegister);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs both cleanup passes (age-based, then size-based) over every log and
     * reports how many segment files were deleted.
     *
     * @throws IOException if one of the cleanup passes fails
     */
    public void cleanupLogs() throws IOException {
        this.logger.trace("Beginning log cleanup...");
        final Iterator<Log> allLogs = getLogIterator();
        final long startedAt = System.currentTimeMillis();

        int deletedFiles = 0;
        while (allLogs.hasNext()) {
            final Log current = allLogs.next();
            final int expired = cleanupExpiredSegments(current);
            final int oversized = cleanupSegmentsToMaintainSize(current);
            deletedFiles += expired + oversized;
        }

        final long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
        final String summary = "Log cleanup completed. " + deletedFiles + " files deleted in " + elapsedSeconds + " seconds";
        // Actual deletions are reported at warn level, an idle run only at trace level.
        if (deletedFiles > 0) {
            this.logger.warn(summary);
        } else {
            this.logger.trace(summary);
        }
    }

    /**
     * Size-based cleanup: marks segments for deletion, in the order the log
     * offers them to the filter, for as long as the log's excess over the
     * retention size covers them, then deletes the marked segments.
     *
     * @param log log to trim
     * @return number of segment files deleted; 0 when size-based retention is
     *         disabled (negative limit) or the log is below the limit
     * @throws IOException declared for parity with the age-based cleanup pass
     */
    private int cleanupSegmentsToMaintainSize(final Log log) throws IOException {
        final boolean overLimit = this.logRetentionSize >= 0 && log.size() >= this.logRetentionSize;
        if (overLimit) {
            final LogSegmentFilter untilWithinLimit = new LogSegmentFilter() {
                // Size still to be reclaimed; shrinks with every segment offered.
                long diff = log.size() - LogManager.this.logRetentionSize;

                @Override
                public boolean filter(LogSegment logSegment) {
                    this.diff = this.diff - logSegment.size();
                    return this.diff >= 0;
                }
            };
            final List<LogSegment> marked = log.markDeletedWhile(untilWithinLimit);
            return deleteSegments(log, marked);
        }
        return 0;
    }

    /**
     * Age-based cleanup: marks and deletes segments whose file was last
     * modified longer ago than the topic's retention age. Topics without an
     * entry in the retention map use the default cleanup age.
     *
     * @param log log to trim
     * @return number of segment files deleted
     * @throws IOException declared for parity with the size-based cleanup pass
     */
    private int cleanupExpiredSegments(Log log) throws IOException {
        final long now = System.currentTimeMillis();
        final String topic = Utils.getTopicPartition(log.dir.getName()).k;
        final Long configuredAge = this.logRetentionMSMap.get(topic);
        final long maxAgeMs = configuredAge != null ? configuredAge.longValue() : this.logCleanupDefaultAgeMs;

        final LogSegmentFilter olderThanMaxAge = new LogSegmentFilter() {
            @Override
            public boolean filter(LogSegment logSegment) {
                final long ageMs = now - logSegment.getFile().lastModified();
                return ageMs > maxAgeMs;
            }
        };
        final List<LogSegment> marked = log.markDeletedWhile(olderThanMaxAge);
        return deleteSegments(log, marked);
    }

    /**
     * Closes and deletes the files of the given segments, logging the outcome
     * of each deletion.
     *
     * <p>A failure to close a segment's message set is logged and the file is
     * still deleted. The {@code DELETE_LOG} line reports {@code true} only when
     * the file was actually deleted; previously the flag was inverted on the
     * normal path.
     *
     * @param log  log owning the segments (used for the log line only)
     * @param list segments to delete
     * @return number of segment files actually deleted
     */
    private int deleteSegments(Log log, List<LogSegment> list) {
        int deletedCount = 0;
        for (LogSegment segment : list) {
            boolean deleted = false;
            try {
                try {
                    segment.getMessageSet().close();
                } catch (IOException e) {
                    this.logger.warn(e.getMessage(), e);
                }
                deleted = segment.getFile().delete();
                if (deleted) {
                    deletedCount++;
                }
            } finally {
                // Logged even when an unexpected exception propagates (deleted is then false).
                this.logger.warn(String.format("DELETE_LOG[%s] %s => %s", log.name, segment.getFile().getAbsolutePath(), Boolean.valueOf(deleted)));
            }
        }
        return deletedCount;
    }

    /**
     * Completes startup: when ZooKeeper is enabled, registers the broker and
     * every known topic and then releases the startup latch; in all cases
     * schedules the periodic (non-forced) flush of all logs.
     */
    public void startup() {
        if (this.config.getEnableZookeeper()) {
            this.serverRegister.registerBrokerInZk();
            for (String topic : getAllTopics()) {
                final TopicTask createTask = new TopicTask(TopicTask.TaskType.CREATE, topic);
                this.serverRegister.processTask(createTask);
            }
            // Releases every thread blocked waiting for startup to finish.
            this.startupLatch.countDown();
        }

        final int flushRate = this.config.getFlushSchedulerThreadRate();
        this.logger.debug("Starting log flusher every {} ms with the following overrides {}", Integer.valueOf(flushRate), this.logFlushIntervalMap);
        final Runnable flushJob = new Runnable() {
            @Override
            public void run() {
                LogManager.this.flushAllLogs(false);
            }
        };
        this.logFlusherScheduler.scheduleWithRate(flushJob, flushRate, flushRate);
    }

    /**
     * Flushes logs to disk.
     *
     * <p>With {@code z == true} every log is flushed. Otherwise a log is
     * flushed only when the time since its last flush has reached the topic's
     * flush interval (the per-topic override, or the configured default).
     *
     * <p>The flush itself runs inside the same try block as the interval
     * check, so the handlers below cover it: an {@link IOException} halts the
     * JVM, and any other exception is logged without stopping the remaining
     * logs from being processed.
     *
     * @param z true to flush every log regardless of its flush interval
     */
    public void flushAllLogs(boolean z) {
        Iterator<Log> logIterator = getLogIterator();
        while (logIterator.hasNext()) {
            Log next = logIterator.next();
            try {
                boolean needFlush = z;
                if (!needFlush) {
                    long sinceLastFlush = System.currentTimeMillis() - next.getLastFlushedTime();
                    Integer interval = this.logFlushIntervalMap.get(next.getTopicName());
                    if (interval == null) {
                        interval = Integer.valueOf(this.config.getDefaultFlushIntervalMs());
                    }
                    needFlush = sinceLastFlush >= ((long) interval.intValue());
                    this.logger.trace(String.format("[%s] flush interval %d, last flushed %d, need flush? %s", next.getTopicName(), interval, Long.valueOf(next.getLastFlushedTime()), Boolean.valueOf(needFlush)));
                }
                if (needFlush) {
                    next.flush();
                }
            } catch (IOException e) {
                this.logger.error("Error flushing topic " + next.getTopicName(), e);
                this.logger.error("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage(), e);
                Runtime.getRuntime().halt(1);
            } catch (Exception e2) {
                this.logger.error("Error flushing topic " + next.getTopicName(), e2);
            }
        }
    }

    /**
     * Returns the names of all topics that currently have an entry in the log pool.
     *
     * @return the key set of the topic-to-partitions pool
     */
    private Collection<String> getAllTopics() {
        return this.logs.keySet();
    }

    /**
     * Returns an iterator over every log of every topic, walking the topics'
     * partition pools one after another.
     *
     * @return iterator over all logs currently held by this manager
     */
    private Iterator<Log> getLogIterator() {
        return new IteratorTemplate<Log>() {
            /** Outer cursor: one partition pool per topic. */
            final Iterator<Pool<Integer, Log>> topicPools = LogManager.this.logs.values().iterator();
            /** Inner cursor: logs of the pool currently being walked; null before the first pool. */
            Iterator<Log> partitionLogs;

            @Override
            public Log makeNext() {
                // Advance to the next pool until one still has a log to offer.
                while (this.partitionLogs == null || !this.partitionLogs.hasNext()) {
                    if (!this.topicPools.hasNext()) {
                        return allDone();
                    }
                    this.partitionLogs = this.topicPools.next().values().iterator();
                }
                return this.partitionLogs.next();
            }
        };
    }

    /**
     * Blocks until {@link #startup()} has released the startup latch. Returns
     * immediately when ZooKeeper is disabled (no latch exists in that case).
     *
     * <p>If the wait is interrupted, the interruption is logged and the
     * thread's interrupt status is restored so callers further up the stack
     * can still observe it; the method then returns without waiting further.
     */
    private void awaitStartup() {
        if (this.config.getEnableZookeeper()) {
            try {
                this.startupLatch.await();
            } catch (InterruptedException e) {
                this.logger.warn(e.getMessage(), e);
                // await() cleared the flag when it threw; put it back instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Looks up the partition pool of a topic after validating the request.
     *
     * <p>Waits for startup first, then rejects an empty topic name and a
     * partition outside {@code [0, partitionCount)}, where the count is the
     * topic's configured value or the default partition count.
     *
     * @param str topic name
     * @param i   partition number
     * @return the topic's pool as stored in the log pool (callers treat a
     *         {@code null} result as "no log exists yet for this topic")
     * @throws IllegalArgumentException  if the topic name is empty
     * @throws InvalidPartitionException if the partition is out of range
     */
    private Pool<Integer, Log> getLogPool(String str, int i) {
        awaitStartup();
        if (str.length() <= 0) {
            throw new IllegalArgumentException("topic name can't be empty");
        }
        Integer partitionCount = this.topicPartitionsMap.get(str);
        if (partitionCount == null) {
            partitionCount = Integer.valueOf(this.numPartitions);
        }
        if (i >= 0 && i < partitionCount.intValue()) {
            return this.logs.get(str);
        }
        // The message prints a half-open range, so its upper bound is the count itself, not count - 1.
        String format = String.format("Wrong partition [%d] for topic [%s], valid partitions [0,%d)", Integer.valueOf(i), str, partitionCount);
        this.logger.warn(format);
        throw new InvalidPartitionException(format);
    }

    /**
     * Returns the existing log of a topic partition without creating it.
     *
     * @param str topic name; checked by the topic name validator
     * @param i   partition number
     * @return the log, or {@code null} when the topic has no pool or the pool
     *         has no log for that partition
     */
    public ILog getLog(String str, int i) {
        TopicNameValidator.validate(str);
        final Pool<Integer, Log> partitions = getLogPool(str, i);
        return partitions != null ? partitions.get(Integer.valueOf(i)) : null;
    }

    /**
     * Returns the log of a topic partition, creating it - and the logs of the
     * topic's other configured partitions - when it does not exist yet.
     *
     * <p>When this call is the one that registers the topic in the log pool
     * and ZooKeeper is enabled, a CREATE task is queued for the topic.
     *
     * <p>NOTE(review): unlike {@link #getLog(String, int)}, this method does
     * not run the topic name validator although the name becomes part of a
     * directory name - confirm callers validate it.
     *
     * @param str topic name
     * @param i   partition number
     * @return the existing or newly created log
     * @throws IOException if the partition is not below the topic's partition
     *                     count, or creating a log fails
     */
    public ILog getOrCreateLog(String str, int i) throws IOException {
        int partition = getPartition(str);
        if (i >= partition) {
            throw new IOException("partition is bigger than the number of configuration: " + partition);
        }
        // True only when this call inserted the topic's pool; must default to
        // false so that an already known topic is not re-registered.
        boolean topicCreated = false;
        Pool<Integer, Log> logPool = getLogPool(str, i);
        if (logPool == null) {
            topicCreated = this.logs.putIfNotExists(str, new Pool<Integer, Log>()) == null;
            logPool = this.logs.get(str);
        }
        Log log = logPool.get(Integer.valueOf(i));
        if (log == null) {
            log = createLog(str, i);
            Log existing = logPool.putIfNotExists(Integer.valueOf(i), log);
            if (existing != null) {
                // Another caller registered the partition first: discard ours and use theirs.
                Closer.closeQuietly(log, this.logger);
                log = existing;
            } else {
                this.logger.info(String.format("Created log for [%s-%d], now create other logs if necessary", str, Integer.valueOf(i)));
                // The recursion ends because every partition exists once it has been visited.
                int partitionCount = getPartition(str);
                for (int p = 0; p < partitionCount; p++) {
                    getOrCreateLog(str, p);
                }
            }
        }
        if (topicCreated && this.config.getEnableZookeeper()) {
            this.topicRegisterTasks.add(new TopicTask(TopicTask.TaskType.CREATE, str));
        }
        return log;
    }

    /**
     * Raises the partition count of a topic.
     *
     * <p>The count is only changed when {@code z} is true and {@code i} is
     * larger than the current count. When ZooKeeper is enabled, an ENLARGE
     * task is queued if the topic already has a pool, otherwise a CREATE task.
     *
     * @param str topic name; checked by the topic name validator
     * @param i   requested partition count
     * @param z   whether the count may be raised
     * @return the topic's partition count after the call
     */
    public int createLogs(String str, int i, boolean z) {
        TopicNameValidator.validate(str);
        synchronized (this.logCreationLock) {
            final int current = getPartition(str);
            final boolean enlarge = z && current < i;
            if (!enlarge) {
                return current;
            }
            this.topicPartitionsMap.put(str, Integer.valueOf(i));
            if (this.config.getEnableZookeeper()) {
                final TopicTask.TaskType taskType;
                if (getLogPool(str, 0) != null) {
                    taskType = TopicTask.TaskType.ENLARGE;
                } else {
                    taskType = TopicTask.TaskType.CREATE;
                }
                this.topicRegisterTasks.add(new TopicTask(taskType, str));
            }
            return i;
        }
    }

    /**
     * Deletes every log of a topic after checking the supplied credential.
     *
     * <p>When ZooKeeper is enabled, a DELETE task is queued for the topic
     * whether or not any log existed.
     *
     * @param str  topic name
     * @param str2 credential checked against the configured authentication
     * @return number of logs deleted, or -1 when authentication fails
     */
    public int deleteLogs(String str, String str2) {
        final boolean authorized = this.config.getAuthentication().auth(str2);
        if (!authorized) {
            return -1;
        }
        int deleted = 0;
        synchronized (this.logCreationLock) {
            final Pool<Integer, Log> removed = this.logs.remove(str);
            if (removed != null) {
                // Iterate over a copy of the values, as the original did.
                final List<Log> snapshot = new ArrayList<Log>(removed.values());
                for (Log partitionLog : snapshot) {
                    partitionLog.delete();
                    deleted++;
                }
            }
            if (this.config.getEnableZookeeper()) {
                this.topicRegisterTasks.add(new TopicTask(TopicTask.TaskType.DELETE, str));
            }
        }
        return deleted;
    }

    /**
     * Creates the directory {@code <topic>-<partition>} under the log
     * directory and opens a new log in it, without recovery.
     *
     * @param str topic name
     * @param i   partition number
     * @return the newly opened log
     * @throws IOException if opening the log fails
     */
    private Log createLog(String str, int i) throws IOException {
        synchronized (this.logCreationLock) {
            final String dirName = str + "-" + i;
            final File partitionDir = new File(this.logDir, dirName);
            partitionDir.mkdirs();
            return new Log(partitionDir, i, this.rollingStategy, this.flushInterval, false, this.maxMessageSize);
        }
    }

    /**
     * Returns the partition count of a topic.
     *
     * @param str topic name
     * @return the topic's entry in the partition map, or the default partition
     *         count when the topic has none
     */
    private int getPartition(String str) {
        final Integer configured = this.topicPartitionsMap.get(str);
        if (configured == null) {
            return this.numPartitions;
        }
        return configured.intValue();
    }

    /**
     * Picks a random partition of the given topic.
     *
     * @param str topic name
     * @return a partition number in {@code [0, partitionCount)}
     */
    @Override
    public int choosePartition(String str) {
        final int partitionCount = getPartition(str);
        return this.random.nextInt(partitionCount);
    }

    /**
     * Answers an offset request for the request's topic partition.
     *
     * @param offsetRequest request naming the topic and partition
     * @return the offsets reported by the partition's log, or the shared empty
     *         list when no such log exists
     */
    public List<Long> getOffsets(OffsetRequest offsetRequest) {
        final ILog target = getLog(offsetRequest.topic, offsetRequest.partition);
        if (target == null) {
            return ILog.EMPTY_OFFSETS;
        }
        return target.getOffsetsBefore(offsetRequest);
    }

    /**
     * Returns the topic-to-partition-count map used by this manager.
     * The internal map itself is returned, not a copy.
     *
     * @return topic name to partition count
     */
    public Map<String, Integer> getTopicPartitionsMap() {
        return this.topicPartitionsMap;
    }
}
