import type { ChatMessage, MessageGenerated } from "./types";
import { createMessage } from "./utils/createMessages";
import { logger } from "@hapara/logger";
import { CentrifugeClient } from "./CentrifugeClient";

/** Connection lifecycle states forwarded to `onConnectionStatus` callbacks. */
type ConnectionStatus = "disconnected" | "connecting" | "connected";

/**
 * This class provides an agnostic messaging interface/facade to shield
 * consumers from changes to the underlying RTM provider. Centrifugo is the 4th
 * iteration (following Agora, Stream, and PubNub). It manages real-time
 * communications like subscriptions, presence, and message handling without
 * requiring downstream changes if we switch to another provider.
 *
 * A shared instance is available through `Messaging.getInstance()`. Incoming
 * messages and connection state changes are delivered to the callbacks
 * registered with `onReceive` and `onConnectionStatus`.
 */
export class Messaging {
  private static instance: Messaging | null = null;
  private client: CentrifugeClient;

  /** Fields shared by every "this user is online" presence message. */
  private commonOnlinePresence = {
    type: "presence",
    body: "online",
    channelNamespace: "chat",
  } as const;

  /** Consumer callbacks; each slot holds a single callback (last one wins). */
  private handlers: {
    onReceive: ((message: ChatMessage) => void) | null;
    onConnectionStatus: ((state: ConnectionStatus) => void) | null;
  } = {
    onReceive: null,
    onConnectionStatus: null,
  };

  /**
   * Bumped each time `connect` starts a new connection. Client listeners
   * registered by an earlier `connect` call compare their captured value with
   * this counter and become no-ops, so a reconnect never reports a state
   * change twice or re-subscribes the channel list of a previous session.
   */
  private connectGeneration = 0;

  constructor() {
    this.client = CentrifugeClient.getInstance();
  }

  /** Returns the shared `Messaging` instance, creating it on first use. */
  public static getInstance() {
    if (!Messaging.instance) {
      Messaging.instance = new Messaging();
    }

    return Messaging.instance;
  }

  /** Registers the callback that receives chat and presence messages. */
  public onReceive(callback: (message: ChatMessage) => void): void {
    this.handlers.onReceive = callback;
  }

  /** Registers the callback that receives connection state changes. */
  public onConnectionStatus(callback: (state: ConnectionStatus) => void): void {
    this.handlers.onConnectionStatus = callback;
  }

  /** Forwards the auth token to the underlying client. */
  public setToken(token: string): void {
    this.client.setToken(token);
  }

  /** Returns the channel name of every subscription known to the client. */
  public getSubscribedChannels() {
    return Object.values(this.client.getSubscriptions()).map(
      (subscription) => subscription.channel
    );
  }

  /** Disconnects the underlying client. */
  public disconnect() {
    this.client.disconnect();
  }

  /**
   * Connects (or re-announces presence on an existing connection).
   *
   * - When the client is disconnected: starts the connection and subscribes
   *   to every channel once the client reports `connected`.
   * - Otherwise: for every channel, replays the users currently present to
   *   `onReceive` as online presence messages and publishes this user's own
   *   online presence. The returned promise settles once all channels have
   *   been processed; failures are logged and never reject the promise.
   *
   * @param channels Channels to subscribe to / announce presence on.
   * @param userId Id of the current user; used as `publisherId` of presence
   *   messages and to filter out the user's own presence echoes.
   *
   * Note: `token` is part of the accepted options but is not read by this
   * method; the token is handed to the client through `setToken`.
   */
  public async connect({
    channels,
    userId,
  }: {
    channels: string[];
    token: string;
    userId: string;
  }): Promise<void> {
    logger("messaging", `current user ${userId.slice(-3)}`);

    if (this.client.isDisconnected()) {
      const generation = ++this.connectGeneration;
      const isCurrent = () => generation === this.connectGeneration;

      this.client.connect();
      this.client.on("state", (stateContext) => {
        // Listeners left over from a previous connect() call stay silent.
        if (isCurrent()) {
          this.handlers.onConnectionStatus?.(stateContext.newState);
        }
      });
      this.client.on("connected", () => {
        if (isCurrent()) {
          channels.forEach((channel) => this.subscribe(channel, userId));
        }
      });
      return;
    }

    // Awaited as a group so failures are handled here instead of surfacing
    // as unhandled promise rejections.
    await Promise.all(
      channels.map((channel) => this.announcePresence(channel, userId))
    );
  }

  /**
   * Replays the users currently present on `channel` to `onReceive` and then
   * publishes this user's online presence. A failed presence lookup is logged
   * and does not prevent the publish.
   */
  private async announcePresence(
    channel: string,
    userId: string
  ): Promise<void> {
    try {
      const presentUsers = await this.client.getPresence(channel);
      presentUsers.forEach((user) => {
        this.handlers.onReceive?.(
          createMessage({
            ...this.commonOnlinePresence,
            channelId: channel,
            publisherId: user.user,
          })
        );
      });
    } catch (error) {
      logger("messaging", `error getting presence for ${channel}`, error);
    }

    await this.publishOnlinePresence(channel, userId);
  }

  /**
   * Publishes an online presence message for `userId` on `channel`. Publish
   * failures are logged rather than thrown because callers are event
   * listeners and fan-out loops that have nowhere to propagate them.
   */
  private async publishOnlinePresence(
    channel: string,
    userId: string
  ): Promise<void> {
    try {
      await this.send(
        createMessage({
          ...this.commonOnlinePresence,
          channelId: channel,
          publisherId: userId,
        })
      );
    } catch (error) {
      logger("messaging", `error publishing presence for ${channel}`, error);
    }
  }

  /**
   * Subscribes to `channel` unless the client is already subscribed, wiring
   * publication/join/leave events to `onReceive`.
   */
  private subscribe(channel: string, userId: string): void {
    if (this.client.isSubscribed(channel)) {
      return;
    }

    try {
      const subscription = this.client
        .newSubscription(channel, {
          recoverable: true,
        })
        .on("publication", (publication) => {
          const message = publication.data as ChatMessage | undefined;
          if (message == null) {
            // Nothing to deliver; the callback contract requires a message.
            return;
          }

          const isOwnPresence =
            message.type === "presence" && message.publisherId === userId;
          if (!isOwnPresence) {
            this.handlers.onReceive?.(message);
          }
        })
        .on("leave", async (subscriptionLeave) => {
          const { user, client } = subscriptionLeave.info;

          if (user !== userId) {
            this.handlers.onReceive?.(
              createMessage({
                channelId: channel,
                type: "presence",
                body: "offline",
                channelNamespace: "chat",
                publisherId: user,
              })
            );
            return;
          }

          // A different connection of this same user left. Peers running this
          // handler turn that leave into an "offline" message, so re-announce
          // that the user is still online through this connection.
          if (client !== this.client.id) {
            await this.publishOnlinePresence(channel, userId);
          }
        })
        .on("join", async (subscriptionJoin) => {
          const { user } = subscriptionJoin.info;
          if (user === userId) {
            return;
          }

          this.handlers.onReceive?.(
            createMessage({
              ...this.commonOnlinePresence,
              channelId: channel,
              publisherId: user,
            })
          );
          // Publish our own presence in response to the join (presumably so
          // the newcomer learns this user is online).
          await this.publishOnlinePresence(channel, userId);
        });
      subscription.subscribe();
    } catch (error) {
      logger("messaging", "error subscribing to channel", error);
    }
  }

  /**
   * Publishes `message` on its `channelId`.
   *
   * @returns `{ ok }` mirroring the `ok` flag of the client's publish result.
   *   Rejects if the underlying publish rejects.
   */
  public async send(message: ChatMessage) {
    const result = await this.client.publish(message.channelId, message);
    return { ok: result.ok };
  }

  /** Asks the underlying client to resubscribe. */
  public resubscribe(): void {
    this.client.resubscribe();
  }
}
