/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.client.example.examples;

import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.LimitExceededException;
import com.aliyun.datahub.client.exception.SeekOutOfRangeException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.CursorType;
import com.aliyun.datahub.client.model.GetRecordsResult;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.SubscriptionOffset;
import com.aliyun.datahub.client.model.TupleRecordData;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * Worker thread that repeatedly fetches records for a single shard through the supplied
 * {@link DatahubClient}, prints one field of every record, and commits the subscription
 * offset after every {@link #COMMIT_NUM} records.
 *
 * <p>The read cursor is resolved once in the constructor (from the given
 * {@link SubscriptionOffset}) and afterwards advanced with the "next cursor" returned by each
 * fetch. The loop in {@link #run()} ends when {@code maxRetry} consecutive retryable errors
 * have occurred, or when the thread is interrupted while idling.
 *
 * <p>NOTE(review): every client call passes {@code ""} for its first two arguments. These are
 * presumably the project and topic names that were inlined as empty constants by the
 * decompiler -- confirm against the original source before using this class.
 */
class ConsumerThread extends Thread {
    /** Maximum number of records requested per {@code getRecords} call. */
    private static final int FETCH_NUM = 1000;
    /** The offset is committed each time the running record count reaches a multiple of this. */
    private static final int COMMIT_NUM = 2000;
    /** How long to idle, in milliseconds, when a fetch returns no records. */
    private static final int SLEEP_TIME_MS = 5000;

    private final int maxRetry;
    private final String shardId;
    private final String subId;
    private final RecordSchema schema;
    private final SubscriptionOffset subscriptionOffset;
    private final DatahubClient datahubClient;
    /** Cursor used for the next fetch; replaced after every successful non-empty fetch. */
    private String cursor;

    /**
     * Stores the collaborators and immediately resolves the initial read cursor, which
     * performs at least one {@code getCursor} call on {@code datahubClient}.
     *
     * @param maxRetry           number of consecutive retryable failures tolerated by {@link #run()}
     * @param shardId            shard to read from
     * @param subId              subscription id used when committing or re-reading offsets
     * @param schema             record schema handed to {@code getRecords}
     * @param subscriptionOffset offset to resume from; it is mutated and re-committed while consuming
     * @param datahubClient      client used for all service calls
     * @throws DatahubClientException if the initial cursor cannot be obtained
     */
    public ConsumerThread(int maxRetry, String shardId, String subId, RecordSchema schema, SubscriptionOffset subscriptionOffset, DatahubClient datahubClient) {
        this.maxRetry = maxRetry;
        this.shardId = shardId;
        this.subId = subId;
        this.schema = schema;
        this.subscriptionOffset = subscriptionOffset;
        this.datahubClient = datahubClient;
        this.init();
    }

    /**
     * Resolves the initial cursor: the oldest cursor when the stored sequence is negative,
     * otherwise the cursor for the sequence right after the stored one, falling back to the
     * oldest cursor when that sequence is out of range.
     */
    private void init() {
        this.cursor = "";
        if (this.subscriptionOffset.getSequence() < 0L) {
            this.cursor = this.oldestCursor();
            return;
        }
        long nextSequence = this.subscriptionOffset.getSequence() + 1L;
        try {
            this.cursor = this.datahubClient.getCursor("", "", this.shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        }
        catch (SeekOutOfRangeException e) {
            // The requested sequence cannot be seeked to; start from the oldest cursor instead.
            this.cursor = this.oldestCursor();
        }
        catch (DatahubClientException e) {
            e.printStackTrace();
            throw e;
        }
    }

    /** Asks the client for the {@link CursorType#OLDEST} cursor of this shard. */
    private String oldestCursor() {
        return this.datahubClient.getCursor("", "", this.shardId, CursorType.OLDEST).getCursor();
    }

    /**
     * Consume loop. Each iteration fetches up to {@link #FETCH_NUM} records, prints them,
     * commits the offset on every {@link #COMMIT_NUM}-th record and then advances the cursor.
     *
     * <ul>
     *   <li>Subscription offline / invalid session: the exception is printed and rethrown.</li>
     *   <li>Offset reset: the offset is re-read and the cursor re-resolved, then the loop goes on.</li>
     *   <li>Other {@link DatahubClientException}s: counted as a retry; the counter is cleared
     *       after the next successful non-empty fetch.</li>
     *   <li>Interruption while idling: the interrupt flag is restored and the method returns.</li>
     *   <li>Any other exception: printed, then the JVM exits with status -1.</li>
     * </ul>
     */
    @Override
    public void run() {
        long recordCount = 0L;
        int retryNum = 0;
        while (retryNum < this.maxRetry) {
            try {
                GetRecordsResult result = this.datahubClient.getRecords("", "", this.shardId, this.schema, this.cursor, FETCH_NUM);
                if (result.getRecordCount() <= 0) {
                    System.out.printf("no data, sleep %d second\n", SLEEP_TIME_MS / 1000);
                    Thread.sleep(SLEEP_TIME_MS);
                    continue;
                }
                for (RecordEntry recordEntry : result.getRecords()) {
                    TupleRecordData data = (TupleRecordData)recordEntry.getRecordData();
                    // NOTE(review): both halves print "string_field"; a second, different field
                    // may have been intended -- confirm. Output is kept as-is.
                    String res = "string_field:" + data.getField("string_field") + "\tstring_field:" + data.getField("string_field");
                    System.out.println(Thread.currentThread() + "\t" + res);
                    ++recordCount;
                    if (recordCount % COMMIT_NUM == 0L) {
                        this.commitOffset(recordEntry);
                    }
                }
                this.cursor = result.getNextCursor();
                retryNum = 0;
            }
            catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
                // Not retried: report and let the exception terminate this thread.
                e.printStackTrace();
                throw e;
            }
            catch (SubscriptionOffsetResetException e) {
                this.recoverFromOffsetReset();
            }
            catch (LimitExceededException e) {
                e.printStackTrace();
                ++retryNum;
            }
            catch (DatahubClientException e) {
                e.printStackTrace();
                ++retryNum;
            }
            catch (InterruptedException e) {
                // Interrupting the thread is a request to stop it. Previously this fell into
                // the generic handler below and shut down the whole JVM; instead restore the
                // interrupt flag for any observer and end the loop quietly.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
    }

    /**
     * Records the position of {@code recordEntry} in the shared subscription offset and
     * commits it for this shard.
     */
    private void commitOffset(RecordEntry recordEntry) {
        this.subscriptionOffset.setSequence(recordEntry.getSequence());
        this.subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
        HashMap<String, SubscriptionOffset> offsetMap = new HashMap<>();
        offsetMap.put(this.shardId, this.subscriptionOffset);
        this.datahubClient.commitSubscriptionOffset("", "", this.subId, offsetMap);
        System.out.println(Thread.currentThread() + " commit offset successful");
    }

    /**
     * Handles an offset reset: re-reads this shard's subscription offset, adopts its version
     * id, and re-resolves the cursor. Cursor lookup is attempted by sequence first, then by
     * system time, then with the oldest cursor; each {@link DatahubClientException} moves on
     * to the next strategy, and a failure of the last one exits the JVM with status 1.
     * An attempt that yields a {@code null} cursor without throwing is repeated.
     */
    private void recoverFromOffsetReset() {
        List<String> shardIds = Arrays.asList(this.shardId);
        SubscriptionOffset offset = this.datahubClient.getSubscriptionOffset("", "", this.subId, shardIds).getOffsets().get(this.shardId);
        this.subscriptionOffset.setVersionId(offset.getVersionId());
        this.cursor = null;
        CursorType type = CursorType.SEQUENCE;
        while (this.cursor == null) {
            try {
                if (type == CursorType.SEQUENCE) {
                    long nextSequence = offset.getSequence() + 1L;
                    this.cursor = this.datahubClient.getCursor("", "", this.shardId, CursorType.SEQUENCE, nextSequence).getCursor();
                } else if (type == CursorType.SYSTEM_TIME) {
                    this.cursor = this.datahubClient.getCursor("", "", this.shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
                } else {
                    this.cursor = this.oldestCursor();
                }
            }
            catch (DatahubClientException exception) {
                // Step down to the next, less precise lookup strategy.
                if (type == CursorType.SEQUENCE) {
                    type = CursorType.SYSTEM_TIME;
                } else if (type == CursorType.SYSTEM_TIME) {
                    type = CursorType.OLDEST;
                } else {
                    System.exit(1);
                }
            }
        }
    }
}

