import AWS from 'aws-sdk';
import { FilteredLogEvents } from 'aws-sdk/clients/cloudwatchlogs';

import { getAwsCloudWatchConfig } from '../aws/config';

/**
 * Bridges an AWS SDK node-style callback to a pair of promise settlers.
 *
 * @param res - Invoked with the SDK result when the call reported no error.
 * @param rej - Invoked with the SDK error when the call failed.
 * @returns A `(err, result)` callback that settles through exactly one of
 *   `res` / `rej` per invocation.
 */
const cwCallBack =
  <T>(res: (r: T) => void, rej: (r: AWS.AWSError) => void) =>
  (err: AWS.AWSError, result: T): void => {
    // Success path first: any falsy `err` means the call succeeded.
    if (!err) {
      res(result);
      return;
    }
    rej(err);
  };

/**
 * Downloads the log events of a CloudWatch log group for a time range.
 *
 * The range `[start, end]` is cut into equally sized time windows which are
 * consumed by a fixed pool of concurrent "lanes". Every lane pages through its
 * current window with `filterLogEvents` and throttles itself so that all lanes
 * together aim at `TARGET_RPS` requests per second.
 *
 * The first argument is an options object:
 * - `logGroupName`: log group to read from.
 * - `logStreamNamePrefix`: only streams whose name starts with this prefix.
 * - `start`: range start in epoch milliseconds (defaults to 2020-07-01 UTC).
 * - `end`: range end in epoch milliseconds (defaults to "now").
 *
 * @param tickCallback - Optional progress listener, called with the fraction
 *   of finished windows (0..1) each time a window completes.
 * @param defaultConfig - Optional pre-resolved client configuration; when
 *   omitted it is obtained from `getAwsCloudWatchConfig()`.
 * @returns All fetched events, de-duplicated by `eventId` and sorted by
 *   ascending `timestamp`.
 * @throws Rejects with the first error reported by `filterLogEvents`; the
 *   remaining lanes stop issuing new requests once that happens.
 */
// eslint-disable-next-line import/prefer-default-export
export async function getCloudWatchLogs(
  {
    logGroupName,
    logStreamNamePrefix,
    start = new Date('2020-07-01T00:00:00Z').getTime(),
    end = Date.now(),
  }: {
    logGroupName: string;
    logStreamNamePrefix: string;
    start?: number;
    end?: number;
  },
  tickCallback?: (p: number) => void,
  defaultConfig:
    | Awaited<ReturnType<typeof getAwsCloudWatchConfig>>
    | undefined = undefined,
): Promise<FilteredLogEvents> {
  const config = defaultConfig ?? (await getAwsCloudWatchConfig());
  const cw = new AWS.CloudWatchLogs(config);
  // Monkey-patching. TODO: Improve ?
  // Context: we use API Gateway to proxy cw-logs to enable multiple endpoints (and break 6 requests limit)
  if (config.endpoint) (cw as any).getSigningName = () => 'execute-api'; // Credential should be scoped to correct service: 'execute-api'

  const WORKERS = 16; // using http2 through apigw proxy, >6 supported
  const CHUNKS = 1.5 * WORKERS;
  const TARGET_RPS = 5 * 1.0; // (not) pushing the limit (x2 still works sometimes)

  // Shared bookkeeping, mutated by every lane.
  let rqCount = 0; // requests completed so far
  const rqStart = Date.now();
  let rqAvgDur = 0; // moving average of request duration, millisec
  let progress = 0; // finished time windows
  let activeLanes = 0; // lanes currently pulling windows
  let total = 0; // events received so far (before de-duplication)
  let aborted = false; // set by the first failing lane so the others stop

  const dur = end - start;
  // Equally sized time windows. Each window ends 1 ms after the next one
  // starts, so neighbours overlap slightly; duplicates caused by that overlap
  // are removed after merging.
  const input = Array.from({ length: CHUNKS }, (_, i) => ({
    startTime: start + (i * dur) / CHUNKS,
    endTime: start + ((i + 1) * dur) / CHUNKS + 1,
  })).values();

  // Appends element by element: spreading a very large array into `push()`
  // can exceed the engine's argument limit and throw a RangeError.
  const appendAll = (target: FilteredLogEvents, items: FilteredLogEvents) => {
    items.forEach(item => {
      target.push(item);
    });
  };

  // Logs the current state and forwards the progress fraction to the caller.
  const reportProgress = (rps: number) => {
    const fraction = progress / CHUNKS;
    console.info(
      `${total} (${Math.round(fraction * 100)}%), ${
        Math.round(10 * rps) / 10
      } rps`,
    );
    if (tickCallback) {
      tickCallback(fraction);
    }
  };

  // Fetches every page of one time window, pausing after each request.
  const fetchChunk = async ({
    startTime,
    endTime,
  }: {
    startTime: number;
    endTime: number;
  }): Promise<FilteredLogEvents> => {
    let nextToken: string | undefined;
    const result: FilteredLogEvents = [];
    do {
      const params: AWS.CloudWatchLogs.FilterLogEventsRequest = {
        logGroupName,
        startTime,
        endTime,
        logStreamNamePrefix,
        nextToken,
      };
      const timeStart = Date.now();
      // eslint-disable-next-line no-await-in-loop
      const r = await new Promise<AWS.CloudWatchLogs.FilterLogEventsResponse>(
        (res, rej) => {
          cw.filterLogEvents(params, cwCallBack(res, rej));
        },
      );
      const elapsed = Date.now() - timeStart;
      rqCount += 1;
      total += r.events?.length ?? 0;
      // Moving average over (at most) the last 2 * WORKERS requests.
      rqAvgDur += (elapsed - rqAvgDur) / Math.min(rqCount, 2 * WORKERS);
      // Each lane may issue one request per (activeLanes / TARGET_RPS)
      // seconds; the average request duration already counts towards that
      // budget. Never negative, even when requests are slower than the budget.
      const waitTime = Math.max(
        0,
        activeLanes * (1e3 / TARGET_RPS - rqAvgDur / activeLanes),
      );
      nextToken = r.nextToken;
      appendAll(result, r.events ?? []);
      // eslint-disable-next-line no-await-in-loop
      await new Promise(r2 => {
        setTimeout(r2, waitTime);
      });
    } while (nextToken && !aborted);

    // Another lane failed: the overall call has already rejected, so neither
    // count this window as done nor notify the caller.
    if (aborted) return result;

    const rps = (rqCount / (Date.now() - rqStart)) * 1e3;
    progress += 1;
    reportProgress(rps);

    return result;
  };

  // One lane: keeps taking windows from the shared iterator until it is
  // drained (or another lane failed).
  const runLane = async (): Promise<FilteredLogEvents> => {
    const laneEvents: FilteredLogEvents = [];
    activeLanes += 1;
    try {
      let chunk = input.next();
      while (!chunk.done && !aborted) {
        // eslint-disable-next-line no-await-in-loop
        appendAll(laneEvents, await fetchChunk(chunk.value));
        chunk = input.next();
      }
    } catch (err) {
      // Tell the other lanes to stop; Promise.all rejects with this error.
      aborted = true;
      throw err;
    } finally {
      activeLanes -= 1;
    }
    return laneEvents;
  };

  const chunkedData = await Promise.all(
    Array.from({ length: WORKERS }, () => runLane()),
  );

  // Merge the lanes, drop events fetched twice through overlapping windows
  // (events without an id are always kept), then order by timestamp.
  const seenIds = new Set<string>();
  const data = ([] as FilteredLogEvents)
    .concat(...chunkedData)
    .filter(({ eventId }) => {
      if (eventId === undefined) return true;
      if (seenIds.has(eventId)) return false;
      seenIds.add(eventId);
      return true;
    })
    .sort((a, b) =>
      !a.timestamp || !b.timestamp ? 0 : a.timestamp - b.timestamp,
    );
  return data;
}
