package com.atolio.connector.core.backfiller;

import com.atolio.connector.core.api.DataAccessor;
import com.atolio.connector.core.api.DataPager;
import com.atolio.connector.core.feeder.Feeder;
import com.atolio.connector.core.feeder.FeederStream;
import com.atolio.connector.core.model.dto.EntityDTOBase;
import com.atolio.pbingest.Feed;
import io.grpc.stub.StreamObserver;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/lib/core-dc-connector-0.0.0.2.jar:com/atolio/connector/core/backfiller/BackfillerProcess.class */
public class BackfillerProcess extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackfillerProcess.class);
    private final DataAccessor dataAccessor;
    private final Feeder feeder;
    private final BackfillerProcessStatus backfillerProcessStatus = new BackfillerProcessStatus();

    /* JADX INFO: Access modifiers changed from: package-private */
    public BackfillerProcess(DataAccessor dataAccessor, Feeder feeder) {
        this.dataAccessor = (DataAccessor) Objects.requireNonNull(dataAccessor, "DataAccessor is NULL. Backfiller Process can't be created without DataAccessor!");
        this.feeder = (Feeder) Objects.requireNonNull(feeder, "Feeder is NULL. Backfiller Process can't be created without Feeder!");
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            try {
                LOGGER.info("Backfiller process starting");
                this.backfillerProcessStatus.setStarted(true);
                this.backfillerProcessStatus.setStartedDate(new Date());
                List<String> dataCollectionsOrder = this.dataAccessor.getDataCollectionsOrder();
                Objects.requireNonNull(dataCollectionsOrder, "dataCollectionsOrder is NULL. Backfiller Process can't transform and publish NULL data collections!");
                FeederStream feederStream = setupFeederStream();
                for (String str : dataCollectionsOrder) {
                    LOGGER.info("Backfiller processing data collection starting: {}", str);
                    DataPager<? extends EntityDTOBase> dataCollection = this.dataAccessor.getDataCollection(str, this.feeder);
                    Objects.requireNonNull(dataCollection, "EntitiesToPublish is NULL. Backfiller Process can't transform and publish NULL data!");
                    while (dataCollection.hasNext() && !this.backfillerProcessStatus.isTerminated()) {
                        try {
                            List<? extends EntityDTOBase> nextPage = dataCollection.nextPage();
                            LOGGER.debug("Start iterating page of collection: {} (size={})", str, Integer.valueOf(nextPage.size()));
                            for (EntityDTOBase entityDTOBase : nextPage) {
                                if (this.backfillerProcessStatus.isTerminated()) {
                                    break;
                                } else {
                                    publishEntity(feederStream, entityDTOBase);
                                }
                            }
                            LOGGER.debug("Finished iterating page of collection: {}", str);
                        } catch (Exception e) {
                            LOGGER.error("Error while processing a page of records", e);
                            this.backfillerProcessStatus.addErrorMessage(e.getMessage());
                        }
                    }
                    LOGGER.info("Backfiller processing data collection finished: {}", str);
                }
                feederStream.onCompletedRequest();
                LOGGER.info("Backfiller process finished");
                this.backfillerProcessStatus.setFinishedDate(new Date());
                this.backfillerProcessStatus.setCompleted(true);
            } catch (Throwable th) {
                LOGGER.error("Backfiller process finished with error", th);
                this.backfillerProcessStatus.addErrorMessage(th.getMessage());
                this.backfillerProcessStatus.setFinishedDate(new Date());
                this.backfillerProcessStatus.setCompleted(true);
            }
        } catch (Throwable th2) {
            this.backfillerProcessStatus.setFinishedDate(new Date());
            this.backfillerProcessStatus.setCompleted(true);
            throw th2;
        }
    }

    private FeederStream setupFeederStream() {
        return new FeederStream(this.feeder, new StreamObserver<Feed.PublishResponse>() { // from class: com.atolio.connector.core.backfiller.BackfillerProcess.1
            @Override // io.grpc.stub.StreamObserver
            public void onNext(Feed.PublishResponse publishResponse) {
                if (publishResponse.hasError()) {
                    String message = publishResponse.getError().getMessage();
                    BackfillerProcess.LOGGER.error("error in publish response: {}", message);
                    BackfillerProcess.this.backfillerProcessStatus.addErrorMessage(message);
                }
            }

            @Override // io.grpc.stub.StreamObserver
            public void onError(Throwable th) {
                BackfillerProcess.LOGGER.error("Feeder server respond with error", th);
                BackfillerProcess.this.backfillerProcessStatus.addErrorMessage(th.getMessage());
            }

            @Override // io.grpc.stub.StreamObserver
            public void onCompleted() {
                BackfillerProcess.this.backfillerProcessStatus.setCompleted(true);
            }
        });
    }

    private void publishEntity(FeederStream feederStream, EntityDTOBase entityDTOBase) {
        LOGGER.trace("Feeder object: {}", entityDTOBase != null ? entityDTOBase.getId() : "null");
        feederStream.publishEntity(entityDTOBase);
    }

    public BackfillerProcessStatus getStatus() {
        return this.backfillerProcessStatus;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void terminate() {
        this.backfillerProcessStatus.setTerminated(true);
        interrupt();
    }
}
