package com.atlassian.pipelines.runner.core.log.processor;

import brave.Span;
import com.atlassian.pipelines.common.trace.rxjava.util.BraveUtil;
import com.atlassian.pipelines.runner.api.log.processor.LogProcessor;
import com.atlassian.pipelines.runner.api.log.processor.task.LogTask;
import com.atlassian.pipelines.runner.api.model.log.LogId;
import com.atlassian.pipelines.runner.api.model.variable.EnvironmentVariable;
import com.atlassian.pipelines.variable.model.SystemVariableKey;
import com.google.common.annotations.VisibleForTesting;
import io.vavr.Value;
import io.vavr.collection.List;
import io.vavr.control.Option;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/atlassian/pipelines/runner/core/log/processor/LogProcessorImpl.class */
/**
 * {@link LogProcessor} that runs each scheduled {@link LogTask} periodically on a shared
 * scheduled thread pool.
 *
 * <p>At most one task is tracked per {@link LogId}; scheduling a task whose id is already known
 * is a no-op. Entries are never removed from the internal map by this class.
 */
public final class LogProcessorImpl implements LogProcessor {
    /** Initial delay, in milliseconds, before the first run of a newly scheduled task. */
    private static final long PROCESS_DELAY_MILLIS = 0;
    /** Core size of the scheduled thread pool shared by all tasks of one processor. */
    private static final int THREAD_POOL_CORE_SIZE = 2;
    /** Parallel step count up to which {@link #DEFAULT_PROCESS_PERIOD} is used unscaled. */
    public static final int MAX_UNTHROTTLED_PARALLEL_STEPS = 10;

    private static final Logger logger = LoggerFactory.getLogger(LogProcessorImpl.class);
    /** Period between task runs when no parallel step count is available. */
    static final Duration DEFAULT_PROCESS_PERIOD = Duration.ofSeconds(1);

    private final Duration processPeriod;
    private final ConcurrentHashMap<LogId, ScheduledTask> scheduledTasks;
    private final ScheduledExecutorService executor;

    /**
     * Wraps a {@link LogTask} so it can be run repeatedly by the executor and completed on demand.
     *
     * <p>The MDC context map and the current span are captured on the constructing thread and
     * re-applied on whichever thread executes {@link #run()}.
     */
    public static final class ScheduledTask implements Runnable {
        private final LogTask logTask;
        // Written in started() before startedLatch is released; read only after awaiting it.
        private ScheduledFuture<?> scheduledFuture;
        private final Map<String, String> context =
                Optional.ofNullable(MDC.getCopyOfContextMap()).orElse(Collections.<String, String>emptyMap());
        private final Optional<Span> span = BraveUtil.currentSpan();
        /** Released once the future returned by the executor has been recorded. */
        private final CountDownLatch startedLatch = new CountDownLatch(1);
        /** Released once the task has been cancelled and will do no further work. */
        private final CountDownLatch completedLatch = new CountDownLatch(1);

        ScheduledTask(LogTask logTask) {
            this.logTask = logTask;
        }

        /** Delegates to {@link LogTask#startSendingLogsInRealtime()}. */
        public void startSendingLogsInRealtime() {
            this.logTask.startSendingLogsInRealtime();
        }

        /**
         * Records the future controlling this task's periodic execution and unblocks any run
         * that is waiting for it.
         *
         * @param scheduledFuture future returned by the executor for this task
         */
        public void started(ScheduledFuture<?> scheduledFuture) {
            this.scheduledFuture = scheduledFuture;
            this.startedLatch.countDown();
        }

        /**
         * Stops tailing, runs the task once on the calling thread and then blocks until the task
         * has been marked complete.
         *
         * @return {@code true} if completion finished normally; {@code false} if the calling
         *     thread was interrupted or a runtime error occurred, in which case the task is
         *     cancelled
         */
        public boolean complete() {
            return completeWithoutLogging();
        }

        private boolean completeWithoutLogging() {
            try {
                tryComplete();
                return true;
            } catch (InterruptedException | RuntimeException e) {
                logger.error("Error occurred while trying to complete a scheduled log task: {}. Task cancelled.", this.logTask.getLogSafeName(), e);
                cancel();
                // Restore the flag last so logging and cancellation are not affected by it.
                restoreInterruptStatus(e);
                return false;
            }
        }

        private void tryComplete() throws InterruptedException {
            stopTailing();
            run();
            // Blocks until a run (this one or a later scheduled one) cancels the task.
            this.completedLatch.await();
        }

        // Synchronized so it cannot interleave with an in-flight run().
        private synchronized void stopTailing() {
            this.logTask.stopTailing();
        }

        /**
         * Executes one iteration of the wrapped task with the captured span and MDC context
         * applied. Any interruption or runtime failure is logged and cancels the task.
         */
        @Override // java.lang.Runnable
        public synchronized void run() {
            try {
                BraveUtil.SafeCloseable spanScope = BraveUtil.withOptionalSpanInScope(this.span);
                try {
                    MDC.setContextMap(this.context);
                    tryRun();
                } finally {
                    // Always leave the span scope, including when tryRun() throws.
                    if (spanScope != null) {
                        spanScope.close();
                    }
                }
            } catch (InterruptedException | RuntimeException e) {
                logger.error("Unexpected error running task: {}. Cancelling.", this.logTask.getLogSafeName(), e);
                cancel();
                // Restore the flag last so logging and cancellation are not affected by it.
                restoreInterruptStatus(e);
            }
        }

        private synchronized void tryRun() throws InterruptedException {
            // Wait until started() has recorded the future that cancel() relies on.
            this.startedLatch.await();
            if (isComplete() || this.logTask.run()) {
                return;
            }
            // logTask.run() returned false: stop scheduling this task.
            cancel();
        }

        private void cancel() {
            this.completedLatch.countDown();
            // false: do not interrupt a run that is currently in progress.
            this.scheduledFuture.cancel(false);
        }

        private boolean isComplete() throws InterruptedException {
            // Zero timeout: a non-blocking check of whether the latch has been released.
            return this.completedLatch.await(0L, TimeUnit.MILLISECONDS);
        }

        /** Re-sets the current thread's interrupt flag if {@code e} is an interruption. */
        private static void restoreInterruptStatus(Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a processor whose tasks run at a fixed rate of {@code processPeriod}.
     *
     * @param processPeriod period between consecutive runs of each scheduled task
     */
    public LogProcessorImpl(Duration processPeriod) {
        this.processPeriod = processPeriod;
        this.scheduledTasks = new ConcurrentHashMap<>();
        this.executor = Executors.newScheduledThreadPool(THREAD_POOL_CORE_SIZE);
    }

    /**
     * Creates a processor whose period is derived from the
     * {@code BITBUCKET_PARALLEL_STEP_COUNT} variable, falling back to
     * {@link #DEFAULT_PROCESS_PERIOD} when it is absent or cannot be read.
     *
     * @param environmentVariables variables to search for the parallel step count
     */
    public LogProcessorImpl(List<EnvironmentVariable> environmentVariables) {
        this(getProcessPeriod(environmentVariables));
    }

    private static Duration getProcessPeriod(List<EnvironmentVariable> environmentVariables) {
        return getParallelGroupSize(environmentVariables)
                .map(groupSize -> getProcessPeriodForParallelStep(groupSize))
                .getOrElse(DEFAULT_PROCESS_PERIOD);
    }

    /**
     * Looks up and parses the parallel step count.
     *
     * @return the parsed count, or none if the variable is missing or any error (for example a
     *     non-numeric value) occurs while reading it
     */
    private static Option<Integer> getParallelGroupSize(List<EnvironmentVariable> environmentVariables) {
        try {
            return environmentVariables
                    .find(variable -> variable.getKey().equals(SystemVariableKey.BITBUCKET_PARALLEL_STEP_COUNT.name()))
                    .map(variable -> variable.getValue())
                    .map(Integer::parseInt);
        } catch (Exception e) {
            logger.error("Failed to determine parallel group size. Using standard log upload period.", e);
            return Option.none();
        }
    }

    /**
     * Computes the processing period for a step that is part of a parallel group.
     *
     * <p>Up to {@link #MAX_UNTHROTTLED_PARALLEL_STEPS} steps the default period is returned;
     * above that the period grows proportionally to the step count.
     *
     * @param parallelStepCount number of steps in the parallel group
     * @return the period to use between task runs
     */
    public static Duration getProcessPeriodForParallelStep(int parallelStepCount) {
        long scaledMillis = DEFAULT_PROCESS_PERIOD.toMillis()
                * Math.max(MAX_UNTHROTTLED_PARALLEL_STEPS, parallelStepCount);
        return Duration.ofMillis(scaledMillis / MAX_UNTHROTTLED_PARALLEL_STEPS);
    }

    @VisibleForTesting
    Duration getProcessPeriod() {
        return this.processPeriod;
    }

    /**
     * Schedules {@code logTask} for periodic execution unless a task with the same
     * {@link LogId} is already tracked.
     */
    @Override // com.atlassian.pipelines.runner.api.log.processor.LogProcessor
    public void schedule(LogTask logTask) {
        this.scheduledTasks.computeIfAbsent(logTask.getLogId(), logId -> createAndStartScheduledTask(logTask));
    }

    private ScheduledTask createAndStartScheduledTask(LogTask logTask) {
        ScheduledTask scheduledTask = new ScheduledTask(logTask);
        ScheduledFuture<?> future = this.executor.scheduleAtFixedRate(
                scheduledTask, PROCESS_DELAY_MILLIS, this.processPeriod.toMillis(), TimeUnit.MILLISECONDS);
        // The first run may already be executing; it blocks until the future is handed over here.
        scheduledTask.started(future);
        return scheduledTask;
    }

    /**
     * Completes the task tracked for {@code logId}, blocking until it has finished.
     *
     * @return the result of {@link ScheduledTask#complete()}, or {@code false} if no task is
     *     tracked for {@code logId}
     */
    @Override // com.atlassian.pipelines.runner.api.log.processor.LogProcessor
    public boolean completeTask(LogId logId) {
        ScheduledTask scheduledTask = this.scheduledTasks.get(logId);
        return scheduledTask != null && scheduledTask.complete();
    }

    /** Forwards the request to the task tracked for {@code logId}, if there is one. */
    @Override // com.atlassian.pipelines.runner.api.log.processor.LogProcessor
    public void startSendingLogsInRealtime(LogId logId) {
        ScheduledTask scheduledTask = this.scheduledTasks.get(logId);
        if (scheduledTask != null) {
            scheduledTask.startSendingLogsInRealtime();
        }
    }

    /** Initiates an orderly shutdown of the underlying executor. */
    @Override // com.atlassian.pipelines.runner.api.log.processor.LogProcessor
    public void shutdown() {
        this.executor.shutdown();
    }
}
