"""
 Copyright (c) 2023, salesforce.com, inc.
 All rights reserved.
 SPDX-License-Identifier: BSD-3-Clause
 For full license text, see the LICENSE_Lavis file in the repo root or https://opensource.org/licenses/BSD-3-Clause
"""
import contextlib
import os
import logging

import torch
import torch.nn as nn

from transformers import BertTokenizer

from .eva_vit import create_eva_vit_g
from .Qformer import BertConfig, BertLMHeadModel


class Blip2Base(nn.Module):
    def __init__(self):
        super().__init__()

    @classmethod
    def init_tokenizer(cls):
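        """Return a bert-base-uncased tokenizer with "[DEC]" added as the BOS token."""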
        tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
        tokenizer.add_special_tokens({"bos_token": "[DEC]"})
        return tokenizer
    
    @property
    def device(self):
        return next(self.parameters()).device

    def maybe_autocast(self, dtype=torch.float16):
        # if on cpu, don't use autocast
        # if on gpu, use autocast with dtype if provided, otherwise use torch.float16
        enable_autocast = self.device != torch.device("cpu")

        if enable_autocast:
            return torch.cuda.amp.autocast(dtype=dtype)
        else:
            return contextlib.nullcontext()

    @classmethod
    def init_Qformer(
        cls, 
        num_query_token, vision_width, 
        qformer_hidden_dropout_prob=0.,
        qformer_attention_probs_dropout_prob=0.,
        qformer_drop_path_rate=0.,
    ):
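        """Build a BERT-based Q-Former (cross-attention inserted every other layer)
        and the learnable query tokens that are fed to it."""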
        encoder_config = BertConfig.from_pretrained("bert-base-uncased")
        encoder_config.encoder_width = vision_width
        # insert cross-attention layer every other block
        encoder_config.add_cross_attention = True
        encoder_config.cross_attention_freq = 2
        encoder_config.query_length = num_query_token
        encoder_config.hidden_dropout_prob = qformer_hidden_dropout_prob
        encoder_config.attention_probs_dropout_prob = qformer_attention_probs_dropout_prob
        encoder_config.drop_path_list = [
            x.item()
            for x in torch.linspace(0, qformer_drop_path_rate, encoder_config.num_hidden_layers)
        ]
        logging.info(f"Q-Former drop path rates: {encoder_config.drop_path_list}")
        logging.info(encoder_config)
        Qformer = BertLMHeadModel(config=encoder_config)
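        # learnable query embeddings for the Q-Former, initialized from N(0, initializer_range) below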
        query_tokens = nn.Parameter(
            torch.zeros(1, num_query_token, encoder_config.hidden_size)
        )
        query_tokens.data.normal_(mean=0.0, std=encoder_config.initializer_range)
        return Qformer, query_tokens

    @classmethod
    def init_vision_encoder(
        cls, 
        model_name, img_size, drop_path_rate, 
        use_grad_checkpoint, precision, vit_model_path,
        temporal_downsample=True,
        no_lmhra=False, 
        double_lmhra=False,
        lmhra_reduction=2.0, 
        gmhra_layers=8, 
        gmhra_drop_path_rate=0.,
        gmhra_dropout=0.5, 
    ):
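        """Create the EVA ViT-g visual encoder and the LayerNorm applied to its output features."""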
        assert model_name == "eva_clip_g", "vit model must be eva_clip_g for current version of VideoChat"
        visual_encoder = create_eva_vit_g(
            img_size, drop_path_rate, 
            use_grad_checkpoint, precision, vit_model_path,
            temporal_downsample=temporal_downsample,
            no_lmhra=no_lmhra, 
            double_lmhra=double_lmhra,
            lmhra_reduction=lmhra_reduction, 
            gmhra_layers=gmhra_layers, 
            gmhra_drop_path_rate=gmhra_drop_path_rate,
            gmhra_dropout=gmhra_dropout, 
        )

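        # LayerNorm applied to the visual encoder's output features before they reach the Q-Former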
        ln_vision = LayerNorm(visual_encoder.num_features)
        return visual_encoder, ln_vision

    def load_from_pretrained(self, model_path):
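        """Load weights non-strictly from a local checkpoint file and return the
        missing/unexpected-key report."""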
        if model_path is not None and os.path.isfile(model_path):
            checkpoint = torch.load(model_path, map_location="cpu")
        else:
            raise RuntimeError(f"checkpoint path {model_path} is invalid")

        state_dict = checkpoint["model"]

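        # strict=False: the checkpoint may contain only a subset of this model's parameters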
        msg = self.load_state_dict(state_dict, strict=False)

        print(f"Load QFormer from {model_path}")
        print(msg)

        return msg


def disabled_train(self, mode=True):
    """Overwrite model.train with this function to make sure train/eval mode
    does not change anymore."""
    return self


class LayerNorm(nn.LayerNorm):
    """Subclass torch's LayerNorm to handle fp16."""

    def forward(self, x: torch.Tensor):
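        # run LayerNorm in fp32 for numerical stability, then cast back to the input dtype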
        orig_type = x.dtype
        ret = super().forward(x.type(torch.float32))
        return ret.type(orig_type)