from collections.abc import Sequence
from typing import Optional, Tuple, Union
from transformers import PretrainedConfig
import torch
import torch.utils.checkpoint
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.activations import ACT2FN
from transformers.modeling_outputs import (
    BaseModelOutput,
    MaskedLMOutput,
    SequenceClassifierOutput,
    TokenClassifierOutput,
)
from transformers.modeling_utils import PreTrainedModel
from .configuration_deprot import DeprotConfig
from transformers.models.roformer.modeling_roformer import RoFormerModel
from transformers.models.esm.modeling_esm import EsmModel
import torch.nn.functional as F


def build_relative_position(query_size, key_size, device):
    """
    Build relative position according to the query and key

    We assume the absolute position of query \\(P_q\\) is range from (0, query_size) and the absolute position of key
    \\(P_k\\) is range from (0, key_size), The relative positions from query to key is \\(R_{q \\rightarrow k} = P_q -
    P_k\\)

    Args:
        query_size (int): the length of query
        key_size (int): the length of key

    Return:
        `torch.LongTensor`: A tensor with shape [1, query_size, key_size]

    """

    q_ids = torch.arange(query_size, dtype=torch.long, device=device)
    k_ids = torch.arange(key_size, dtype=torch.long, device=device)
    rel_pos_ids = q_ids[:, None] - k_ids.view(1, -1).repeat(query_size, 1)
    rel_pos_ids = rel_pos_ids[:query_size, :]
    rel_pos_ids = rel_pos_ids.unsqueeze(0)
    return rel_pos_ids


@torch.jit.script
def c2p_dynamic_expand(c2p_pos, query_layer, relative_pos):
    return c2p_pos.expand(
        [
            query_layer.size(0),
            query_layer.size(1),
            query_layer.size(2),
            relative_pos.size(-1),
        ]
    )


@torch.jit.script
def p2c_dynamic_expand(c2p_pos, query_layer, key_layer):
    return c2p_pos.expand(
        [
            query_layer.size(0),
            query_layer.size(1),
            key_layer.size(-2),
            key_layer.size(-2),
        ]
    )


@torch.jit.script
def pos_dynamic_expand(pos_index, p2c_att, key_layer):
    return pos_index.expand(
        p2c_att.size()[:2] + (pos_index.size(-2), key_layer.size(-2))
    )


def rotate_half(x):
    x1, x2 = x.chunk(2, dim=-1)
    return torch.cat((-x2, x1), dim=-1)


def apply_rotary_pos_emb(x, cos, sin):
    cos = cos[:, :, : x.shape[-2], :]
    sin = sin[:, :, : x.shape[-2], :]

    return (x * cos) + (rotate_half(x) * sin)


class RotaryEmbedding(torch.nn.Module):
    """
    Rotary position embeddings based on those in
    [RoFormer](https://huggingface.co/docs/transformers/model_doc/roformer). Query and keys are transformed by rotation
    matrices which depend on their relative positions.
    """

    def __init__(self, dim: int):
        super().__init__()
        # Generate and save the inverse frequency buffer (non trainable)
        inv_freq = 1.0 / (
            10000 ** (torch.arange(0, dim, 2, dtype=torch.int64).float() / dim)
        )
        inv_freq = inv_freq
        self.register_buffer("inv_freq", inv_freq)

        self._seq_len_cached = None
        self._cos_cached = None
        self._sin_cached = None

    def _update_cos_sin_tables(self, x, seq_dimension=2):
        seq_len = x.shape[seq_dimension]

        # Reset the tables if the sequence length has changed,
        # or if we're on a new device (possibly due to tracing for instance)
        if seq_len != self._seq_len_cached or self._cos_cached.device != x.device:
            self._seq_len_cached = seq_len
            t = torch.arange(x.shape[seq_dimension], device=x.device).type_as(
                self.inv_freq
            )
            freqs = torch.outer(t, self.inv_freq)
            emb = torch.cat((freqs, freqs), dim=-1).to(x.device)

            self._cos_cached = emb.cos()[None, None, :, :]
            self._sin_cached = emb.sin()[None, None, :, :]

        return self._cos_cached, self._sin_cached

    def forward(
        self, q: torch.Tensor, k: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        self._cos_cached, self._sin_cached = self._update_cos_sin_tables(
            k, seq_dimension=-2
        )

        return (
            apply_rotary_pos_emb(q, self._cos_cached, self._sin_cached),
            apply_rotary_pos_emb(k, self._cos_cached, self._sin_cached),
        )


class MaskedConv1d(nn.Conv1d):
    """A masked 1-dimensional convolution layer.

    Takes the same arguments as torch.nn.Conv1D, except that the padding is set automatically.

         Shape:
            Input: (N, L, in_channels)
            input_mask: (N, L, 1), optional
            Output: (N, L, out_channels)
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int = 1,
        dilation: int = 1,
        groups: int = 1,
        bias: bool = True,
    ):
        """
        :param in_channels: input channels
        :param out_channels: output channels
        :param kernel_size: the kernel width
        :param stride: filter shift
        :param dilation: dilation factor
        :param groups: perform depth-wise convolutions
        :param bias: adds learnable bias to output
        """
        padding = dilation * (kernel_size - 1) // 2
        super().__init__(
            in_channels,
            out_channels,
            kernel_size,
            stride=stride,
            dilation=dilation,
            groups=groups,
            bias=bias,
            padding=padding,
        )

    def forward(self, x, input_mask=None):
        if input_mask is not None:
            x = x * input_mask
        return super().forward(x.transpose(1, 2)).transpose(1, 2)


class Attention1dPooling(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer = MaskedConv1d(config.hidden_size, 1, 1)

    def forward(self, x, input_mask=None):
        batch_szie = x.shape[0]
        attn = self.layer(x)
        attn = attn.view(batch_szie, -1)
        if input_mask is not None:
            attn = attn.masked_fill_(
                ~input_mask.view(batch_szie, -1).bool(), float("-inf")
            )
        attn = F.softmax(attn, dim=-1).view(batch_szie, -1, 1)
        out = (attn * x).sum(dim=1)
        return out


class MeanPooling(nn.Module):
    """Mean Pooling for sentence-level classification tasks."""

    def __init__(self):
        super().__init__()

    def forward(self, features, input_mask=None):
        if input_mask is not None:
            # Applying input_mask to zero out masked values
            masked_features = features * input_mask.unsqueeze(2)
            sum_features = torch.sum(masked_features, dim=1)
            mean_pooled_features = sum_features / input_mask.sum(dim=1, keepdim=True)
        else:
            mean_pooled_features = torch.mean(features, dim=1)
        return mean_pooled_features


class ContextPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        scale_hidden = getattr(config, "scale_hidden", 1)
        if config.pooling_head == "mean":
            self.mean_pooling = MeanPooling()
        elif config.pooling_head == "attention":
            self.mean_pooling = Attention1dPooling(config)
        self.dense = nn.Linear(
            config.pooler_hidden_size, scale_hidden * config.pooler_hidden_size
        )
        self.dropout = nn.Dropout(config.pooler_dropout)
        self.config = config

    def forward(self, hidden_states, input_mask=None):
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.

        context_token = self.mean_pooling(hidden_states, input_mask)
        context_token = self.dropout(context_token)
        pooled_output = self.dense(context_token)
        pooled_output = torch.tanh(pooled_output)
        return pooled_output

    @property
    def output_dim(self):
        return self.config.hidden_size


class DeprotLayerNorm(nn.Module):
    """LayerNorm module in the TF style (epsilon inside the square root)."""

    def __init__(self, size, eps=1e-12):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(size))
        self.bias = nn.Parameter(torch.zeros(size))
        self.variance_epsilon = eps

    def forward(self, hidden_states):
        input_type = hidden_states.dtype
        hidden_states = hidden_states.float()
        mean = hidden_states.mean(-1, keepdim=True)
        variance = (hidden_states - mean).pow(2).mean(-1, keepdim=True)
        hidden_states = (hidden_states - mean) / torch.sqrt(
            variance + self.variance_epsilon
        )
        hidden_states = hidden_states.to(input_type)
        y = self.weight * hidden_states + self.bias
        return y


class DisentangledSelfAttention(nn.Module):

    def __init__(self, config: DeprotConfig):
        super().__init__()
        if config.hidden_size % config.num_attention_heads != 0:
            raise ValueError(
                f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention "
                f"heads ({config.num_attention_heads})"
            )
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        # Q, K, V projection layers
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        # AA->SS, AA->POS, SS->AA, POS->AA and AA->AA attention layers
        self.pos_att_type = (
            config.pos_att_type if config.pos_att_type is not None else []
        )

        self.relative_attention = getattr(config, "relative_attention", False)
        self.position_embedding_type = getattr(
            config, "position_embedding_type", "relative"
        )
        if self.position_embedding_type == "rotary":
            self.rotary_embeddings = RotaryEmbedding(dim=self.attention_head_size)
            if self.relative_attention:

                if "aa2ss" in self.pos_att_type:
                    self.ss_proj = nn.Linear(
                        config.hidden_size, self.all_head_size, bias=False
                    )

                if "ss2aa" in self.pos_att_type:
                    self.ss_q_proj = nn.Linear(config.hidden_size, self.all_head_size)

        elif self.position_embedding_type == "relative":
            if self.relative_attention:
                self.max_relative_positions = getattr(
                    config, "max_relative_positions", -1
                )
                if self.max_relative_positions < 1:
                    self.max_relative_positions = config.max_position_embeddings
                self.pos_dropout = nn.Dropout(config.hidden_dropout_prob)

                # amino acid to position
                if "aa2pos" in self.pos_att_type:
                    self.pos_proj = nn.Linear(
                        config.hidden_size, self.all_head_size, bias=False
                    )  # Key

                if "pos2aa" in self.pos_att_type:
                    self.pos_q_proj = nn.Linear(
                        config.hidden_size, self.all_head_size
                    )  # Query

                if "aa2ss" in self.pos_att_type:
                    self.ss_proj = nn.Linear(
                        config.hidden_size, self.all_head_size, bias=False
                    )

                if "ss2aa" in self.pos_att_type:
                    self.ss_q_proj = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        # x [batch_size, seq_len, all_head_size]
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, -1)
        # x [batch_size, seq_len, num_attention_heads, attention_head_size]
        x = x.view(new_x_shape)
        # x [batch_size, num_attention_heads, seq_len, attention_head_size]
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states,
        attention_mask,
        output_attentions=False,
        query_states=None,
        relative_pos=None,
        rel_embeddings=None,
        ss_hidden_states=None,
    ):
        query_layer = self.transpose_for_scores(self.query(hidden_states))
        key_layer = self.transpose_for_scores(self.key(hidden_states))
        value_layer = self.transpose_for_scores(self.value(hidden_states))

        if self.position_embedding_type == "rotary":
            query_layer, key_layer = self.rotary_embeddings(query_layer, key_layer)

        rel_att = None
        scale_factor = 1 + len(self.pos_att_type)
        scale = torch.sqrt(
            torch.tensor(query_layer.size(-1), dtype=torch.float) * scale_factor
        )
        query_layer = query_layer / scale.to(dtype=query_layer.dtype)

        # [batch_size, num_attention_heads, seq_len, seq_len]
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        if self.relative_attention:
            if self.position_embedding_type == "relative":
                rel_embeddings = self.pos_dropout(rel_embeddings)
            rel_att = self.disentangled_att_bias(
                query_layer,
                key_layer,
                relative_pos,
                rel_embeddings,
                scale_factor,
                ss_hidden_states,
            )

        if rel_att is not None:
            attention_scores = attention_scores + rel_att

        rmask = ~(attention_mask.to(torch.bool))
        attention_probs = attention_scores.masked_fill(rmask, float("-inf"))
        attention_probs = torch.softmax(attention_probs, -1)
        attention_probs = attention_probs.masked_fill(rmask, 0.0)
        # attention_probs = XSoftmax.apply(attention_scores, attention_mask, -1)
        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (-1,)
        context_layer = context_layer.view(new_context_layer_shape)
        if output_attentions:
            return (context_layer, attention_probs)
        else:
            return context_layer

    def disentangled_att_bias(
        self,
        query_layer,
        key_layer,
        relative_pos,
        rel_embeddings,
        scale_factor,
        ss_hidden_states,
    ):
        if self.position_embedding_type == "relative":
            if relative_pos is None:
                q = query_layer.size(-2)
                relative_pos = build_relative_position(
                    q, key_layer.size(-2), query_layer.device
                )
            if relative_pos.dim() == 2:
                relative_pos = relative_pos.unsqueeze(0).unsqueeze(0)
            elif relative_pos.dim() == 3:
                relative_pos = relative_pos.unsqueeze(1)
            # bxhxqxk
            elif relative_pos.dim() != 4:
                raise ValueError(
                    f"Relative position ids must be of dim 2 or 3 or 4. {relative_pos.dim()}"
                )

            att_span = min(
                max(query_layer.size(-2), key_layer.size(-2)),
                self.max_relative_positions,
            )
            relative_pos = relative_pos.long().to(query_layer.device)
            rel_embeddings = rel_embeddings[
                self.max_relative_positions
                - att_span : self.max_relative_positions
                + att_span,
                :,
            ].unsqueeze(0)

            score = 0

            if "aa2pos" in self.pos_att_type:
                pos_key_layer = self.pos_proj(rel_embeddings)
                pos_key_layer = self.transpose_for_scores(pos_key_layer)
                aa2p_att = torch.matmul(query_layer, pos_key_layer.transpose(-1, -2))
                aa2p_pos = torch.clamp(relative_pos + att_span, 0, att_span * 2 - 1)
                aa2p_att = torch.gather(
                    aa2p_att,
                    dim=-1,
                    index=c2p_dynamic_expand(aa2p_pos, query_layer, relative_pos),
                )
                score += aa2p_att

            if "pos2aa" in self.pos_att_type:
                pos_query_layer = self.pos_q_proj(rel_embeddings)
                pos_query_layer = self.transpose_for_scores(pos_query_layer)
                pos_query_layer /= torch.sqrt(
                    torch.tensor(pos_query_layer.size(-1), dtype=torch.float)
                    * scale_factor
                )
                if query_layer.size(-2) != key_layer.size(-2):
                    r_pos = build_relative_position(
                        key_layer.size(-2), key_layer.size(-2), query_layer.device
                    )
                else:
                    r_pos = relative_pos
                p2aa_pos = torch.clamp(-r_pos + att_span, 0, att_span * 2 - 1)
                p2aa_att = torch.matmul(
                    key_layer,
                    pos_query_layer.transpose(-1, -2).to(dtype=key_layer.dtype),
                )
                p2aa_att = torch.gather(
                    p2aa_att,
                    dim=-1,
                    index=p2c_dynamic_expand(p2aa_pos, query_layer, key_layer),
                ).transpose(-1, -2)

                if query_layer.size(-2) != key_layer.size(-2):
                    pos_index = relative_pos[:, :, :, 0].unsqueeze(-1)
                    p2aa_att = torch.gather(
                        p2aa_att,
                        dim=-2,
                        index=pos_dynamic_expand(pos_index, p2aa_att, key_layer),
                    )
                score += p2aa_att

            # content -> structure
            if "aa2ss" in self.pos_att_type:
                assert ss_hidden_states is not None
                ss_key_layer = self.ss_proj(ss_hidden_states)
                ss_key_layer = self.transpose_for_scores(ss_key_layer)
                # [batch_size, num_attention_heads, seq_len, seq_len]
                aa2ss_att = torch.matmul(query_layer, ss_key_layer.transpose(-1, -2))
                score += aa2ss_att

            if "ss2aa" in self.pos_att_type:
                assert ss_hidden_states is not None
                ss_query_layer = self.ss_q_proj(ss_hidden_states)
                ss_query_layer = self.transpose_for_scores(ss_query_layer)
                ss_query_layer /= torch.sqrt(
                    torch.tensor(ss_query_layer.size(-1), dtype=torch.float)
                    * scale_factor
                )
                ss2aa_att = torch.matmul(
                    key_layer, query_layer.transpose(-1, -2).to(dtype=key_layer.dtype)
                )
                score += ss2aa_att
            return score
        elif self.position_embedding_type == "rotary":
            score = 0
            if "aa2ss" in self.pos_att_type:
                assert ss_hidden_states is not None
                ss_key_layer = self.ss_proj(ss_hidden_states)
                ss_key_layer = self.transpose_for_scores(ss_key_layer)
                aa2ss_att = torch.matmul(query_layer, ss_key_layer.transpose(-1, -2))
                score += aa2ss_att

            if "ss2aa" in self.pos_att_type:
                assert ss_hidden_states is not None
                ss_query_layer = self.ss_q_proj(ss_hidden_states)
                ss_query_layer = self.transpose_for_scores(ss_query_layer)
                ss_query_layer /= torch.sqrt(
                    torch.tensor(ss_query_layer.size(-1), dtype=torch.float)
                    * scale_factor
                )
                ss2aa_att = torch.matmul(
                    key_layer, query_layer.transpose(-1, -2).to(dtype=key_layer.dtype)
                )
                score += ss2aa_att
            return score


class DeprotSelfOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = DeprotLayerNorm(config.hidden_size, config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class DeprotAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.self = DisentangledSelfAttention(config)
        self.output = DeprotSelfOutput(config)
        self.config = config

    def forward(
        self,
        hidden_states,
        attention_mask,
        output_attentions=False,
        query_states=None,
        relative_pos=None,
        rel_embeddings=None,
        ss_hidden_states=None,
    ):
        self_output = self.self(
            hidden_states,
            attention_mask,
            output_attentions,
            query_states=query_states,
            relative_pos=relative_pos,
            rel_embeddings=rel_embeddings,
            ss_hidden_states=ss_hidden_states,
        )
        if output_attentions:
            self_output, att_matrix = self_output
        if query_states is None:
            query_states = hidden_states
        attention_output = self.output(self_output, query_states)

        if output_attentions:
            return (attention_output, att_matrix)
        else:
            return attention_output


class DeprotIntermediate(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        if isinstance(config.hidden_act, str):
            self.intermediate_act_fn = ACT2FN[config.hidden_act]
        else:
            self.intermediate_act_fn = config.hidden_act

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        hidden_states = self.dense(hidden_states)
        hidden_states = self.intermediate_act_fn(hidden_states)
        return hidden_states


class DeprotOutput(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = DeprotLayerNorm(config.hidden_size, config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.config = config

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class DeprotLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.attention = DeprotAttention(config)
        self.intermediate = DeprotIntermediate(config)
        self.output = DeprotOutput(config)

    def forward(
        self,
        hidden_states,
        attention_mask,
        query_states=None,
        relative_pos=None,
        rel_embeddings=None,
        output_attentions=False,
        ss_hidden_states=None,
    ):
        attention_output = self.attention(
            hidden_states,
            attention_mask,
            output_attentions=output_attentions,
            query_states=query_states,
            relative_pos=relative_pos,
            rel_embeddings=rel_embeddings,
            ss_hidden_states=ss_hidden_states,
        )
        if output_attentions:
            attention_output, att_matrix = attention_output
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        if output_attentions:
            return (layer_output, att_matrix)
        else:
            return layer_output


class DeprotEncoder(nn.Module):
    """Modified BertEncoder with relative position bias support"""

    def __init__(self, config):
        super().__init__()
        self.layer = nn.ModuleList(
            [DeprotLayer(config) for _ in range(config.num_hidden_layers)]
        )
        self.relative_attention = getattr(config, "relative_attention", False)
        if self.relative_attention:
            self.max_relative_positions = getattr(config, "max_relative_positions", -1)
            if self.max_relative_positions < 1:
                self.max_relative_positions = config.max_position_embeddings
            self.rel_embeddings = nn.Embedding(
                self.max_relative_positions * 2, config.hidden_size
            )
        self.gradient_checkpointing = False

    def get_rel_embedding(self):
        rel_embeddings = self.rel_embeddings.weight if self.relative_attention else None
        return rel_embeddings

    def get_attention_mask(self, attention_mask):
        if attention_mask.dim() <= 2:
            extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
            attention_mask = extended_attention_mask * extended_attention_mask.squeeze(
                -2
            ).unsqueeze(-1)
        elif attention_mask.dim() == 3:
            attention_mask = attention_mask.unsqueeze(1)

        return attention_mask

    def get_rel_pos(self, hidden_states, query_states=None, relative_pos=None):
        if self.relative_attention and relative_pos is None:
            q = (
                query_states.size(-2)
                if query_states is not None
                else hidden_states.size(-2)
            )
            relative_pos = build_relative_position(
                q, hidden_states.size(-2), hidden_states.device
            )
        return relative_pos

    def forward(
        self,
        hidden_states,
        attention_mask,
        output_hidden_states=True,
        output_attentions=False,
        query_states=None,
        relative_pos=None,
        ss_hidden_states=None,
        return_dict=True,
    ):
        attention_mask = self.get_attention_mask(attention_mask)
        relative_pos = self.get_rel_pos(hidden_states, query_states, relative_pos)

        all_hidden_states = () if output_hidden_states else None
        all_attentions = () if output_attentions else None

        if isinstance(hidden_states, Sequence):
            next_kv = hidden_states[0]
        else:
            next_kv = hidden_states
        rel_embeddings = self.get_rel_embedding()
        for i, layer_module in enumerate(self.layer):
            if output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states,)

            if self.gradient_checkpointing and self.training:

                def create_custom_forward(module):
                    def custom_forward(*inputs):
                        return module(*inputs, output_attentions)

                    return custom_forward

                hidden_states = torch.utils.checkpoint.checkpoint(
                    create_custom_forward(layer_module),
                    next_kv,
                    attention_mask,
                    query_states,
                    relative_pos,
                    rel_embeddings,
                    ss_hidden_states,
                )
            else:
                hidden_states = layer_module(
                    next_kv,
                    attention_mask,
                    query_states=query_states,
                    relative_pos=relative_pos,
                    rel_embeddings=rel_embeddings,
                    output_attentions=output_attentions,
                    ss_hidden_states=ss_hidden_states,
                )

            if output_attentions:
                hidden_states, att_m = hidden_states

            if query_states is not None:
                query_states = hidden_states
                if isinstance(hidden_states, Sequence):
                    next_kv = hidden_states[i + 1] if i + 1 < len(self.layer) else None
            else:
                next_kv = hidden_states

            if output_attentions:
                all_attentions = all_attentions + (att_m,)

        if output_hidden_states:
            all_hidden_states = all_hidden_states + (hidden_states,)

        if not return_dict:
            return tuple(
                v
                for v in [hidden_states, all_hidden_states, all_attentions]
                if v is not None
            )
        return BaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=all_hidden_states,
            attentions=all_attentions,
        )


class DeprotEmbeddings(nn.Module):
    """Construct the embeddings from word, position and token_type embeddings."""

    def __init__(self, config):
        super().__init__()
        pad_token_id = getattr(config, "pad_token_id", 0)
        self.embedding_size = getattr(config, "embedding_size", config.hidden_size)
        self.word_embeddings = nn.Embedding(
            config.vocab_size, self.embedding_size, padding_idx=pad_token_id
        )

        self.position_biased_input = getattr(config, "position_biased_input", False)
        if not self.position_biased_input:
            self.position_embeddings = None
        else:
            # assert getattr(config, "position_embedding_type", "relative") == "absolute"
            self.position_embeddings = nn.Embedding(
                config.max_position_embeddings, self.embedding_size
            )

        if config.type_vocab_size > 0:
            self.token_type_embeddings = nn.Embedding(
                config.type_vocab_size, self.embedding_size
            )

        if config.ss_vocab_size > 0:
            self.ss_embeddings = nn.Embedding(config.ss_vocab_size, self.embedding_size)
            self.ss_layer_norm = DeprotLayerNorm(
                config.hidden_size, config.layer_norm_eps
            )

        if self.embedding_size != config.hidden_size:
            self.embed_proj = nn.Linear(
                self.embedding_size, config.hidden_size, bias=False
            )
        self.LayerNorm = DeprotLayerNorm(config.hidden_size, config.layer_norm_eps)

        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.config = config

        # position_ids (1, len position emb) is contiguous in memory and exported when serialized
        if self.position_biased_input:
            self.register_buffer(
                "position_ids",
                torch.arange(config.max_position_embeddings).expand((1, -1)),
                persistent=False,
            )

    def forward(
        self,
        input_ids=None,
        ss_input_ids=None,
        token_type_ids=None,
        position_ids=None,
        mask=None,
        inputs_embeds=None,
    ):
        if input_ids is not None:
            input_shape = input_ids.size()
        else:
            input_shape = inputs_embeds.size()[:-1]

        seq_length = input_shape[1]

        if position_ids is None and self.position_biased_input:
            position_ids = self.position_ids[:, :seq_length]
            if seq_length > position_ids.size(1):
                zero_padding = (
                    torch.zeros(
                        (input_shape[0], seq_length - position_ids.size(1)),
                        dtype=torch.long,
                        device=position_ids.device,
                    )
                    + 2047
                )
                position_ids = torch.cat([position_ids, zero_padding], dim=1)

        if token_type_ids is None:
            token_type_ids = torch.zeros(
                input_shape, dtype=torch.long, device=self.position_ids.device
            )

        if inputs_embeds is None:
            if self.config.token_dropout:
                inputs_embeds = self.word_embeddings(input_ids)
                inputs_embeds.masked_fill_(
                    (input_ids == self.config.mask_token_id).unsqueeze(-1), 0.0
                )
                mask_ratio_train = self.config.mlm_probability * 0.8
                src_lengths = mask.sum(dim=-1)
                mask_ratio_observed = (input_ids == self.config.mask_token_id).sum(
                    -1
                ).to(inputs_embeds.dtype) / src_lengths
                inputs_embeds = (
                    inputs_embeds
                    * (1 - mask_ratio_train)
                    / (1 - mask_ratio_observed)[:, None, None]
                )
            else:
                inputs_embeds = self.word_embeddings(input_ids)

        if self.position_embeddings is not None and self.position_biased_input:
            position_embeddings = self.position_embeddings(position_ids.long())
        else:
            position_embeddings = torch.zeros_like(inputs_embeds)

        embeddings = inputs_embeds
        if self.position_biased_input:
            embeddings += position_embeddings
        if self.config.type_vocab_size > 0:
            token_type_embeddings = self.token_type_embeddings(token_type_ids)
            embeddings += token_type_embeddings

        if self.embedding_size != self.config.hidden_size:
            embeddings = self.embed_proj(embeddings)

        embeddings = self.LayerNorm(embeddings)

        if mask is not None:
            if mask.dim() != embeddings.dim():
                if mask.dim() == 4:
                    mask = mask.squeeze(1).squeeze(1)
                mask = mask.unsqueeze(2)
            mask = mask.to(embeddings.dtype)
            embeddings = embeddings * mask

        embeddings = self.dropout(embeddings)

        if self.config.ss_vocab_size > 0:
            ss_embeddings = self.ss_embeddings(ss_input_ids)
            ss_embeddings = self.ss_layer_norm(ss_embeddings)
            if mask is not None:
                if mask.dim() != ss_embeddings.dim():
                    if mask.dim() == 4:
                        mask = mask.squeeze(1).squeeze(1)
                    mask = mask.unsqueeze(2)
                mask = mask.to(ss_embeddings.dtype)
                ss_embeddings = ss_embeddings * mask
                ss_embeddings = self.dropout(ss_embeddings)
            return embeddings, ss_embeddings

        return embeddings, None


class DeprotPreTrainedModel(PreTrainedModel):
    """
    An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
    models.
    """

    config_class = DeprotConfig
    base_model_prefix = "deprot"
    _keys_to_ignore_on_load_unexpected = ["position_embeddings"]
    supports_gradient_checkpointing = True

    def _init_weights(self, module):
        """Initialize the weights."""
        if isinstance(module, nn.Linear):
            # Slightly different from the TF version which uses truncated_normal for initialization
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()

    def _set_gradient_checkpointing(self, module, value=False):
        if isinstance(module, DeprotEncoder):
            module.gradient_checkpointing = value


class DeprotModel(DeprotPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)

        self.embeddings = DeprotEmbeddings(config)
        self.encoder = DeprotEncoder(config)
        self.config = config
        # Initialize weights and apply final processing
        self.post_init()

    def get_input_embeddings(self):
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, new_embeddings):
        self.embeddings.word_embeddings = new_embeddings

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base
        class PreTrainedModel
        """
        raise NotImplementedError(
            "The prune function is not implemented in DeBERTa model."
        )

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        ss_input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, BaseModelOutput]:
        output_attentions = (
            output_attentions
            if output_attentions is not None
            else self.config.output_attentions
        )
        output_hidden_states = (
            output_hidden_states
            if output_hidden_states is not None
            else self.config.output_hidden_states
        )
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        if input_ids is not None and inputs_embeds is not None:
            raise ValueError(
                "You cannot specify both input_ids and inputs_embeds at the same time"
            )
        elif input_ids is not None:
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            input_shape = input_ids.size()
        elif inputs_embeds is not None:
            input_shape = inputs_embeds.size()[:-1]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        device = input_ids.device if input_ids is not None else inputs_embeds.device

        if attention_mask is None:
            attention_mask = torch.ones(input_shape, device=device)
        if token_type_ids is None:
            token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=device)

        embedding_output, ss_embeddings = self.embeddings(
            input_ids=input_ids,
            ss_input_ids=ss_input_ids,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            mask=attention_mask,
            inputs_embeds=inputs_embeds,
        )

        encoder_outputs = self.encoder(
            embedding_output,
            attention_mask,
            output_hidden_states=True,
            output_attentions=output_attentions,
            return_dict=return_dict,
            ss_hidden_states=ss_embeddings,
        )
        encoded_layers = encoder_outputs[1]

        sequence_output = encoded_layers[-1]

        if not return_dict:
            return (sequence_output,) + encoder_outputs[
                (1 if output_hidden_states else 2) :
            ]

        return BaseModelOutput(
            last_hidden_state=sequence_output,
            hidden_states=(
                encoder_outputs.hidden_states if output_hidden_states else None
            ),
            attentions=encoder_outputs.attentions,
        )


class DeprotPredictionHeadTransform(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embedding_size = getattr(config, "embedding_size", config.hidden_size)

        self.dense = nn.Linear(config.hidden_size, self.embedding_size)
        if isinstance(config.hidden_act, str):
            self.transform_act_fn = ACT2FN[config.hidden_act]
        else:
            self.transform_act_fn = config.hidden_act
        self.LayerNorm = nn.LayerNorm(self.embedding_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.transform_act_fn(hidden_states)
        hidden_states = self.LayerNorm(hidden_states)
        return hidden_states


class DeprotLMPredictionHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.transform = DeprotPredictionHeadTransform(config)

        self.embedding_size = getattr(config, "embedding_size", config.hidden_size)
        # The output weights are the same as the input embeddings, but there is
        # an output-only bias for each token.
        self.decoder = nn.Linear(self.embedding_size, config.vocab_size, bias=False)

        # self.bias = nn.Parameter(torch.zeros(config.vocab_size))
        # Need a link between the two variables so that the bias is correctly resized with `resize_token_embeddings`
        # self.decoder.bias = self.bias

    def forward(self, hidden_states):
        hidden_states = self.transform(hidden_states)
        hidden_states = self.decoder(hidden_states)
        return hidden_states


class DeprotOnlyMLMHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.predictions = DeprotLMPredictionHead(config)

    def forward(self, sequence_output):
        prediction_scores = self.predictions(sequence_output)
        return prediction_scores


class DeprotPreTrainedModel(PreTrainedModel):
    """
    An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
    models.
    """

    config_class = DeprotConfig
    base_model_prefix = "deprot"
    _keys_to_ignore_on_load_unexpected = ["position_embeddings"]
    supports_gradient_checkpointing = True

    def _init_weights(self, module):
        """Initialize the weights."""
        if isinstance(module, nn.Linear):
            # Slightly different from the TF version which uses truncated_normal for initialization
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()

    def _set_gradient_checkpointing(self, module, value=False):
        if isinstance(module, DeprotEncoder):
            module.gradient_checkpointing = value


class DeprotForMaskedLM(DeprotPreTrainedModel):
    _tied_weights_keys = [
        "cls.predictions.decoder.weight",
        "cls.predictions.decoder.bias",
    ]

    def __init__(self, config):
        super().__init__(config)

        self.deprot = DeprotModel(config)
        self.cls = DeprotOnlyMLMHead(config)

        # Initialize weights and apply final processing
        self.post_init()

    def get_output_embeddings(self):
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        self.cls.predictions.decoder = new_embeddings

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        ss_input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, MaskedLMOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ...,
            config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the
            loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`
        """

        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        outputs = self.deprot(
            input_ids,
            ss_input_ids=ss_input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        prediction_scores = self.cls(sequence_output)

        masked_lm_loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()  # -100 index = padding token
            masked_lm_loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size), labels.view(-1)
            )

        if not return_dict:
            output = (prediction_scores,) + outputs[1:]
            return (
                ((masked_lm_loss,) + output) if masked_lm_loss is not None else output
            )

        return MaskedLMOutput(
            loss=masked_lm_loss,
            logits=prediction_scores,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


class DeprotForSequenceClassification(DeprotPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)

        num_labels = getattr(config, "num_labels", 2)
        self.num_labels = num_labels
        self.scale_hidden = getattr(config, "scale_hidden", 1)
        self.deprot = DeprotModel(config)
        self.pooler = ContextPooler(config)
        output_dim = self.pooler.output_dim * self.scale_hidden

        self.classifier = nn.Linear(output_dim, num_labels)
        drop_out = getattr(config, "cls_dropout", None)
        drop_out = self.config.hidden_dropout_prob if drop_out is None else drop_out
        self.dropout = nn.Dropout(drop_out)

        # Initialize weights and apply final processing
        self.post_init()

    def get_input_embeddings(self):
        return self.deprot.get_input_embeddings()

    def set_input_embeddings(self, new_embeddings):
        self.deprot.set_input_embeddings(new_embeddings)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        ss_input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        outputs = self.deprot(
            input_ids,
            ss_input_ids=ss_input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        encoder_layer = outputs[0]
        pooled_output = self.pooler(encoder_layer, attention_mask)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    # regression task
                    loss_fn = nn.MSELoss()
                    logits = logits.view(-1).to(labels.dtype)
                    loss = loss_fn(logits, labels.view(-1))
                elif labels.dim() == 1 or labels.size(-1) == 1:
                    label_index = (labels >= 0).nonzero()
                    labels = labels.long()
                    if label_index.size(0) > 0:
                        labeled_logits = torch.gather(
                            logits,
                            0,
                            label_index.expand(label_index.size(0), logits.size(1)),
                        )
                        labels = torch.gather(labels, 0, label_index.view(-1))
                        loss_fct = CrossEntropyLoss()
                        loss = loss_fct(
                            labeled_logits.view(-1, self.num_labels).float(),
                            labels.view(-1),
                        )
                    else:
                        loss = torch.tensor(0).to(logits)
                else:
                    log_softmax = nn.LogSoftmax(-1)
                    loss = -((log_softmax(logits) * labels).sum(-1)).mean()
            elif self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "binary_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits.squeeze(), labels.squeeze().to(logits.dtype))
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels.to(logits.dtype))
        if not return_dict:
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


class DeprotForTokenClassification(DeprotPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.deprot = DeprotModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, TokenClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
        """
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        outputs = self.deprot(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]

        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


DeprotModel.register_for_auto_class("AutoModel")
DeprotForMaskedLM.register_for_auto_class("AutoModelForMaskedLM")
DeprotForSequenceClassification.register_for_auto_class(
    "AutoModelForSequenceClassification"
)
DeprotForTokenClassification.register_for_auto_class("AutoModelForTokenClassification")