package com.atlassian.pipelines.runner.core.log;

import com.atlassian.pipelines.runner.api.factory.UploadLogTaskFactory;
import com.atlassian.pipelines.runner.api.log.LogUploadManager;
import com.atlassian.pipelines.runner.api.log.processor.LogProcessor;
import com.atlassian.pipelines.runner.api.model.log.LogId;
import com.atlassian.pipelines.runner.api.model.step.FeatureFlag;
import com.atlassian.pipelines.runner.api.model.step.Step;
import com.atlassian.pipelines.runner.api.model.step.service.Service;
import com.atlassian.pipelines.runner.api.service.EventService;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.vavr.Value;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/atlassian/pipelines/runner/core/log/LogUploadManagerImpl.class */
public final class LogUploadManagerImpl implements LogUploadManager {
    protected static final Logger logger = LoggerFactory.getLogger((Class<?>) LogUploadManagerImpl.class);
    public static final int MAX_LOG_UPLOAD_WAIT_TIME_SECONDS_DEFAULT = 5;
    private final UploadLogTaskFactory uploadLogTaskFactory;
    private final LogProcessor logProcessor;
    private final EventService eventService;
    private Optional<Disposable> watchForLogRequestedEventsSubscription = Optional.empty();
    private boolean isRealTimeUploadEnabled = false;

    public LogUploadManagerImpl(UploadLogTaskFactory uploadLogTaskFactory, LogProcessor logProcessor, EventService eventService) {
        this.uploadLogTaskFactory = uploadLogTaskFactory;
        this.logProcessor = logProcessor;
        this.eventService = eventService;
    }

    @Override // com.atlassian.pipelines.runner.api.log.LogUploadManager
    public Completable startUploads(Step step) {
        boolean isDynamicLogUploadSpeedEnabled = isDynamicLogUploadSpeedEnabled(step.getFeatureFlags());
        return Completable.merge(scheduleLogUploads(step.getServices(), isDynamicLogUploadSpeedEnabled, getDynamicLogUploadMaxWaitTime(step.getFeatureFlags()))).doOnSubscribe(disposable -> {
            logger.info("Starting log uploader.");
        }).doOnComplete(() -> {
            watchForLogRequestedEvents(step, isDynamicLogUploadSpeedEnabled);
        }).doOnError(th -> {
            logger.error("An error occurred whilst starting log uploader.", th);
        });
    }

    private void watchForLogRequestedEvents(Step step, boolean z) {
        if (z) {
            this.watchForLogRequestedEventsSubscription = Optional.of(this.eventService.watchForLogRequestedEvents(step.getId()).doOnComplete(() -> {
                logger.info("Log requested event received. Sending logs in realtime.");
            }).subscribe(stepId -> {
                startSendingLogsInRealtime(step.getServices());
            }, th -> {
                logger.error("Error waiting for log requested events", th);
            }));
        }
    }

    private List<Completable> scheduleLogUploads(List<Service> list, boolean z, int i) {
        return List.of(Completable.fromAction(() -> {
            this.logProcessor.schedule(this.uploadLogTaskFactory.createMainLogUploadTask(z, i));
        })).appendAll((Iterable) list.filter(LogUploadManagerImpl::isNotLocal).map(service -> {
            return Completable.fromAction(() -> {
                this.logProcessor.schedule(this.uploadLogTaskFactory.createServiceLogUploadTask(service, z, i));
            });
        }));
    }

    private static boolean isNotLocal(Service service) {
        return service.getOrigin() != Service.Origin.LOCAL;
    }

    @Override // com.atlassian.pipelines.runner.api.log.LogUploadManager
    public Completable completeUploads(List<Service> list) {
        this.watchForLogRequestedEventsSubscription.ifPresent((v0) -> {
            v0.dispose();
        });
        Completable doOnError = Completable.merge(completeLogUploads(list)).doOnSubscribe(disposable -> {
            logger.info("Shutting down log uploader.");
        }).doOnError(th -> {
            logger.error("An error occurred whilst shutting down log uploader.", th);
        });
        LogProcessor logProcessor = this.logProcessor;
        Objects.requireNonNull(logProcessor);
        return doOnError.doFinally(logProcessor::shutdown);
    }

    private List<Completable> completeLogUploads(List<Service> list) {
        return List.of(Completable.fromAction(() -> {
            this.logProcessor.completeTask(LogId.main());
        })).appendAll((Iterable) getServiceLogIds(list).map(logId -> {
            return Completable.fromAction(() -> {
                this.logProcessor.completeTask(logId);
            });
        }));
    }

    @Override // com.atlassian.pipelines.runner.api.log.LogUploadManager
    public void startSendingLogsInRealtime(List<Service> list) {
        if (this.isRealTimeUploadEnabled) {
            return;
        }
        logger.info("Start sending logs in real time");
        this.isRealTimeUploadEnabled = true;
        this.logProcessor.startSendingLogsInRealtime(LogId.main());
        List<LogId> serviceLogIds = getServiceLogIds(list);
        LogProcessor logProcessor = this.logProcessor;
        Objects.requireNonNull(logProcessor);
        serviceLogIds.forEach(logProcessor::startSendingLogsInRealtime);
    }

    private static List<LogId> getServiceLogIds(List<Service> list) {
        return list.filter(LogUploadManagerImpl::isNotLocal).map((v0) -> {
            return v0.getUuid();
        }).map((Function<? super U, ? extends U>) LogId::service);
    }

    private boolean isDynamicLogUploadSpeedEnabled(Map<FeatureFlag, Object> map) {
        Option<Object> option = map.get(FeatureFlag.DYNAMIC_LOG_UPLOAD_SPEED_ENABLED);
        Class<Boolean> cls = Boolean.class;
        Objects.requireNonNull(Boolean.class);
        return ((Boolean) option.map(cls::cast).getOrElse((Value) false)).booleanValue();
    }

    private int getDynamicLogUploadMaxWaitTime(Map<FeatureFlag, Object> map) {
        Option<Object> option = map.get(FeatureFlag.DYNAMIC_LOG_UPLOAD_MAX_WAIT_TIME_SECONDS);
        Class<Integer> cls = Integer.class;
        Objects.requireNonNull(Integer.class);
        return ((Integer) option.map(cls::cast).getOrElse((Value) 5)).intValue();
    }
}
