import type { MarlinOptions } from './request-types';
import type { BatchWriter } from '@zg-rentals/util';
import { ThrottledBatchWriter } from '@zg-rentals/util';
import { postMarlin } from './marlin-request';

import type {
  MarlinJsCountBatchRequestDto,
  MarlinJsGaugeBatchRequestDto,
  MarlinJsMeasurementBatchRequestDto,
} from '@zg-rentals/ts-marlin-client';
import { getGlobalLogger } from '@zg-rentals/logger-base';

export class Marlin {
  private readonly counts: ThrottledBatchWriter<MarlinJsCountBatchRequestDto, void>;
  private readonly gauges: BatchWriter<MarlinJsGaugeBatchRequestDto, void>;
  private readonly measures: BatchWriter<MarlinJsMeasurementBatchRequestDto, void>;

  constructor(
    public readonly options: MarlinOptions,
    intervalMs: number,
    private readonly bucketMs = 5_000,
  ) {
    this.options.api ||= 'https://marlin.hotpads.com';

    const onError = (variant: string) => (reason: unknown) => {
      getGlobalLogger()?.error(reason, `marlin ${variant} flush error`);
    };

    this.counts = new ThrottledBatchWriter(
      async (batch: Array<MarlinJsCountBatchRequestDto>) => {
        const merged = batch.reduce<Record<string, Record<string, MarlinJsCountBatchRequestDto>>>((counts, count) => {
          const { serverName = '', serviceName = '' } = count;

          counts[serverName] ??= {};

          counts[serverName][serviceName] ??= {
            ...count,
            serverName,
            serviceName,
            countStats: {},
          };

          const bucket = counts[serverName][serviceName];

          Object.entries(count.countStats).forEach(([ms, countsForMs]) => {
            const numberMs = Number(ms);
            const clampedMs = numberMs - (numberMs % this.bucketMs);

            bucket.countStats[clampedMs] ??= {};

            Object.entries(countsForMs).forEach(([name, { sum }]) => {
              bucket.countStats[clampedMs][name] ??= { sum: 0 };

              bucket.countStats[clampedMs][name].sum += Math.round(sum);
            });
          });

          return counts;
        }, {});

        for (const [serverName, serverCounts] of Object.entries(merged)) {
          for (const [serviceName, serviceCounts] of Object.entries(serverCounts)) {
            await postMarlin(
              'count',
              {
                ...serviceCounts,
                serverName,
                serviceName,
              },
              this.options,
            );
          }
        }
      },
      intervalMs,
      onError('counts'),
    );

    this.gauges = new ThrottledBatchWriter(
      async (batch: Array<MarlinJsGaugeBatchRequestDto>) => {
        const merged = batch.reduce<Record<string, Record<string, MarlinJsGaugeBatchRequestDto>>>((gauges, gauge) => {
          const { serverName = '', serviceName = '' } = gauge;

          gauges[serverName] ??= {};

          gauges[serverName][serviceName] ??= {
            ...gauge,
            serverName,
            serviceName,
            gauges: {},
          };

          const bucket = gauges[serverName][serviceName];

          Object.entries(gauge.gauges).forEach(([ms, gaugesForMs]) => {
            const numberMs = Number(ms);
            const clampedMs = numberMs - (numberMs % this.bucketMs);

            bucket.gauges[clampedMs] ??= {};

            Object.entries(gaugesForMs).forEach(([name, { sum, min, max, count }]) => {
              if (bucket.gauges[clampedMs][name]) {
                bucket.gauges[clampedMs][name].count += count;
                bucket.gauges[clampedMs][name].sum += Math.round(sum);
                bucket.gauges[clampedMs][name].min = Math.min(bucket.gauges[clampedMs][name].min, Math.round(min));
                bucket.gauges[clampedMs][name].max = Math.max(bucket.gauges[clampedMs][name].max, Math.round(max));
              } else {
                bucket.gauges[clampedMs][name] = {
                  count,
                  sum: Math.round(sum),
                  min: Math.round(min),
                  max: Math.round(max),
                };
              }
            });
          });

          return gauges;
        }, {});

        for (const [serverName, serverGauges] of Object.entries(merged)) {
          for (const [serviceName, serviceGauges] of Object.entries(serverGauges)) {
            await postMarlin(
              'gauge',
              {
                ...serviceGauges,
                serverName,
                serviceName,
              },
              this.options,
            );
          }
        }
      },
      intervalMs,
      onError('gauges'),
    );

    this.measures = new ThrottledBatchWriter(
      async (batch: Array<MarlinJsMeasurementBatchRequestDto>) => {
        const merged = batch.reduce<Record<string, Record<string, MarlinJsMeasurementBatchRequestDto>>>(
          (measurements, measurement) => {
            const { serverName = '', serviceName = '' } = measurement;

            measurements[serverName] ??= {};

            measurements[serverName][serviceName] ??= {
              ...measurement,
              serverName,
              serviceName,
              measurements: {},
            };

            const bucket = measurements[serverName][serviceName];

            Object.entries(measurement.measurements).forEach(([name, measurementsForName]) => {
              bucket.measurements[name] ??= [];

              measurementsForName.forEach((measure) => {
                const existing = bucket.measurements[name].find((point) => point.timeMs === measure.timeMs);

                if (existing) {
                  existing.value += measure.value;
                } else {
                  bucket.measurements[name].push({ ...measure });
                }
              });
            });

            return measurements;
          },
          {},
        );

        for (const [serverName, serverMeasurements] of Object.entries(merged)) {
          for (const [serviceName, serviceMeasurements] of Object.entries(serverMeasurements)) {
            await postMarlin(
              'measure',
              {
                ...serviceMeasurements,
                serverName,
                serviceName,
              },
              this.options,
            );
          }
        }
      },
      intervalMs,
      onError('measures'),
    );
  }

  public async sendCounts(counts: MarlinJsCountBatchRequestDto) {
    const isInvalid = Object.values(counts.countStats).some((countsForMs) =>
      Object.values(countsForMs).some((count) => !Number.isFinite(count.sum)),
    );

    if (isInvalid) {
      return this.options.logger?.warn(`Invalid counts: ${JSON.stringify(counts)}`);
    }

    return this.counts.write(counts);
  }

  public async sendGauges(gauges: MarlinJsGaugeBatchRequestDto) {
    const isInvalid = Object.values(gauges.gauges).some((gaugesForMs) =>
      Object.values(gaugesForMs).some((gauge) => Object.values(gauge).some((num) => !Number.isFinite(num))),
    );

    if (isInvalid) {
      return this.options.logger?.warn(`Invalid gauges: ${JSON.stringify(gauges)}`);
    }

    return this.gauges.write(gauges);
  }

  public async sendMeasure(measures: MarlinJsMeasurementBatchRequestDto) {
    const isInvalid = Object.values(measures.measurements).some((measurementsForMs) =>
      Object.values(measurementsForMs).some((measurement) =>
        Object.values(measurement).some((num) => !Number.isFinite(num)),
      ),
    );

    if (isInvalid) {
      return this.options.logger?.warn(`Invalid measures: ${JSON.stringify(measures)}`);
    }

    return this.measures.write(measures);
  }

  public async sendMeasurementWithGauges(measures: MarlinJsMeasurementBatchRequestDto) {
    const gaugePromise = this.sendGauges({
      ...measures,
      gauges: Object.entries(measures.measurements).reduce(
        (acc, [metricName, currMeasures]) => {
          currMeasures.forEach(({ timeMs, value: amount }) => {
            acc[timeMs] ||= {};
            if (acc[timeMs][metricName]) {
              acc[timeMs][metricName].sum += amount;
              acc[timeMs][metricName].count += 1;
              acc[timeMs][metricName].min = Math.min(acc[timeMs][metricName].min, amount);
              acc[timeMs][metricName].max = Math.max(acc[timeMs][metricName].max, amount);
            } else {
              acc[timeMs][metricName] = {
                sum: amount,
                count: 1,
                min: amount,
                max: amount,
              };
            }
          });
          return acc;
        },
        {} as MarlinJsGaugeBatchRequestDto['gauges'],
      ),
    });

    return Promise.all([gaugePromise, this.sendMeasure(measures)]);
  }
}
