/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.timeline.core;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.model.ConsumedCapacity;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.DeleteTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;
import com.alicloud.openservices.tablestore.model.search.CreateSearchIndexRequest;
import com.alicloud.openservices.tablestore.model.search.DeleteSearchIndexRequest;
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest;
import com.alicloud.openservices.tablestore.model.search.SearchResponse;
import com.alicloud.openservices.tablestore.timeline.TimelineQueue;
import com.alicloud.openservices.tablestore.timeline.TimelineStore;
import com.alicloud.openservices.tablestore.timeline.core.TimelineQueueImpl;
import com.alicloud.openservices.tablestore.timeline.model.RowPutChangeWithCallback;
import com.alicloud.openservices.tablestore.timeline.model.TimelineEntry;
import com.alicloud.openservices.tablestore.timeline.model.TimelineIdentifier;
import com.alicloud.openservices.tablestore.timeline.model.TimelineSchema;
import com.alicloud.openservices.tablestore.timeline.query.SearchParameter;
import com.alicloud.openservices.tablestore.timeline.query.SearchResult;
import com.alicloud.openservices.tablestore.timeline.utils.Preconditions;
import com.alicloud.openservices.tablestore.timeline.utils.Utils;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimelineStoreImpl
implements TimelineStore {
    private final SyncClientInterface client;
    private final AsyncClientInterface asyncClient;
    private final TimelineSchema schema;
    private ExecutorService threadPool;
    private TableStoreWriter writer;
    private TableStoreCallback<RowChange, ConsumedCapacity> callback = null;
    private TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>(){

        @Override
        public void onCompleted(RowChange req, RowWriteResult res) {
            if (req instanceof RowPutChangeWithCallback) {
                RowPutChangeWithCallback rowPutChange = (RowPutChangeWithCallback)req;
                TimelineEntry timelineEntry = Utils.rowToTimelineEntryWithColumnList(TimelineStoreImpl.this.schema, res.getRow(), rowPutChange.getColumnsToPut());
                rowPutChange.setComplete(timelineEntry);
            }
        }

        @Override
        public void onFailed(RowChange req, Exception ex) {
            if (req instanceof RowPutChangeWithCallback) {
                RowPutChangeWithCallback rowPutChange = (RowPutChangeWithCallback)req;
                rowPutChange.setFailed(ex);
            }
        }
    };

    public TimelineStoreImpl(SyncClient client, TimelineSchema schema) {
        this.client = client;
        this.asyncClient = client.asAsyncClient();
        this.schema = schema;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public TimelineQueue createTimelineQueue(TimelineIdentifier identifier) {
        Preconditions.checkNotNull(identifier, "Identifier should not be null.");
        if (null == this.writer) {
            TimelineStoreImpl timelineStoreImpl = this;
            synchronized (timelineStoreImpl) {
                if (null == this.writer) {
                    ThreadFactory threadFactory = new ThreadFactory(){
                        private final AtomicInteger counter = new AtomicInteger(1);

                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "timeline-callback-" + this.counter.getAndIncrement());
                        }
                    };
                    this.threadPool = new ThreadPoolExecutor(this.schema.getCallbackExecuteThreads(), this.schema.getMaxCallbackExecuteThreads(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
                    this.writer = new DefaultTableStoreWriter(this.asyncClient, this.schema.getTableName(), this.schema.getWriterConfig(), this.callback, this.threadPool);
                    this.writer.setResultCallback(this.resultCallback);
                }
            }
        }
        return new TimelineQueueImpl(this.client, this.writer, this.schema, identifier);
    }

    @Override
    public SearchResult<TimelineEntry> search(SearchParameter searchParameter) {
        return this.search(Utils.toSearchQuery(searchParameter));
    }

    @Override
    public SearchResult<TimelineEntry> search(SearchQuery searchQuery) {
        SearchResponse response;
        Preconditions.checkArgument(this.schema.hasDataIndex(), "The store not support search cause not has data index");
        SearchRequest request = new SearchRequest(this.schema.getTableName(), this.schema.getIndexName(), searchQuery);
        SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
        columnsToGet.setReturnAll(true);
        request.setColumnsToGet(columnsToGet);
        try {
            response = this.client.search(request);
        }
        catch (Exception e) {
            throw Utils.convertException(e);
        }
        ArrayList entries = new ArrayList(response.getRows().size());
        for (Row row : response.getRows()) {
            TimelineEntry entry = Utils.rowToTimelineEntry(this.schema, row);
            TimelineIdentifier identifier = Utils.primaryKeyToIdentifier(this.schema.getIdentifierSchema(), row.getPrimaryKey());
            SearchResult.Entry<TimelineEntry> se = new SearchResult.Entry<TimelineEntry>(identifier, entry);
            entries.add(se);
        }
        SearchResult<TimelineEntry> result = new SearchResult<TimelineEntry>(entries, response.isAllSuccess(), response.getTotalCount(), response.getNextToken());
        return result;
    }

    @Override
    public void prepareTables() {
        TableMeta tableMeta = new TableMeta(this.schema.getTableName());
        for (PrimaryKeySchema key : this.schema.getIdentifierSchema().getKeys()) {
            tableMeta.addPrimaryKeyColumn(key);
        }
        if (this.schema.isAutoGenerateSeqId()) {
            tableMeta.addAutoIncrementPrimaryKeyColumn(this.schema.getSequenceIdColumnName());
        } else {
            tableMeta.addPrimaryKeyColumn(this.schema.getSequenceIdColumnName(), PrimaryKeyType.INTEGER);
        }
        TableOptions tableOptions = new TableOptions();
        tableOptions.setTimeToLive(this.schema.getTimeToLive());
        tableOptions.setMaxVersions(1);
        CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
        try {
            this.client.createTable(request);
        }
        catch (Exception e) {
            throw Utils.convertException(e);
        }
        if (this.schema.hasDataIndex()) {
            CreateSearchIndexRequest csRequest = new CreateSearchIndexRequest();
            csRequest.setTableName(this.schema.getTableName());
            csRequest.setIndexName(this.schema.getIndexName());
            csRequest.setIndexSchema(this.schema.getIndexSchema());
            try {
                this.client.createSearchIndex(csRequest);
            }
            catch (Exception e) {
                throw Utils.convertException(e);
            }
        }
    }

    @Override
    public void dropAllTables() {
        if (this.schema.hasDataIndex()) {
            DeleteSearchIndexRequest dsRequest = new DeleteSearchIndexRequest();
            dsRequest.setTableName(this.schema.getTableName());
            dsRequest.setIndexName(this.schema.getIndexName());
            try {
                this.client.deleteSearchIndex(dsRequest);
            }
            catch (Exception e) {
                throw Utils.convertException(e);
            }
        }
        DeleteTableRequest request = new DeleteTableRequest(this.schema.getTableName());
        try {
            this.client.deleteTable(request);
        }
        catch (Exception e) {
            throw Utils.convertException(e);
        }
    }

    @Override
    public void flush() {
        if (this.writer != null) {
            this.writer.flush();
        }
    }

    @Override
    public void close() {
        if (this.writer != null) {
            this.writer.close();
            this.threadPool.shutdown();
        }
    }
}

