import { Observable, Subject } from 'rxjs';
import { groupBy, mergeMap, share, tap, throttleTime } from 'rxjs/operators';

import { EVENT_THROTTLE_TIME_IN_MS } from './constants';
import { logger } from './logger';
import { Envelope } from './types';

const globalThisWithMessageBus: typeof globalThis & {
  eventInput$?: Subject<Envelope>;
} = globalThis;

/** Ensuring the bus is the same object for all consumers, even if imported with
 * different context or lib version */
if (!globalThisWithMessageBus.eventInput$) {
  globalThisWithMessageBus.eventInput$ = new Subject<Envelope>();
}

/** Subject in which every published message envelope will be nexted */
export const { eventInput$ } = globalThisWithMessageBus;

// todo? using this will force the events to be async. The issue is that all
// tests have to be as well. We should find a way to have a sync scheduler in
// the tests and keep the scheduler async in production export const
// debouncedEventInput$ = scheduled(eventInput$, asyncScheduler)

/** Observable yielding published messages envelopes but throttled when of the
 * same content (channel topic and payload) */
export const debouncedEventInput$: Observable<Envelope> = eventInput$.pipe(
  // From https://stackoverflow.com/questions/51218124/how-to-do-distinct-throttle-in-rxjs
  // Group together envelopes with the same content
  groupBy((envelope) => {
    try {
      return `${envelope.channel}${envelope.topic}${JSON.stringify(
        envelope.payload,
      )}`;
    } catch {
      return `${envelope.channel}${envelope.topic}`;
    }
  }),
  // Throttle those duplicate messages
  mergeMap((grouped) => grouped.pipe(throttleTime(EVENT_THROTTLE_TIME_IN_MS))),
  // Log incoming messages
  tap((envelope) => {
    logger.debug(
      `[Message][${envelope.channel}] Topic: ${envelope.topic} Payload:`,
      envelope.payload,
      'Envelope:',
      envelope,
      envelope.payload,
    );
  }),
  // Ensure the same subscription is shared between all subscribers
  share(),
);
