File size: 7,690 Bytes
87e5c9c
 
 
 
77d5469
87e5c9c
8312087
77d5469
34de38e
77d5469
8312087
9d26661
8312087
77d5469
 
 
 
8312087
87e5c9c
471b053
9d26661
77d5469
 
 
 
8312087
 
 
 
 
 
 
 
87e5c9c
079d1ca
77d5469
34de38e
 
 
 
 
77d5469
 
 
 
 
079d1ca
 
87e5c9c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3ea8fe3
87e5c9c
 
 
 
 
 
 
925dd67
7452863
 
 
925dd67
3ea8fe3
87e5c9c
 
 
 
 
 
3ea8fe3
87e5c9c
 
 
 
 
 
 
 
 
 
 
 
 
 
34de38e
079d1ca
e9ed1f2
 
 
471b053
e9ed1f2
471b053
 
 
 
e9ed1f2
471b053
 
 
 
77d5469
e9ed1f2
 
 
 
77d5469
e9ed1f2
471b053
e9ed1f2
471b053
e9ed1f2
 
 
471b053
 
e9ed1f2
471b053
e9ed1f2
471b053
e9ed1f2
 
 
 
471b053
e9ed1f2
471b053
e9ed1f2
471b053
 
 
77d5469
471b053
 
 
 
 
77d5469
471b053
 
 
03e9034
 
 
34de38e
 
 
03e9034
 
 
 
34de38e
77d5469
e219aa1
34de38e
 
e219aa1
34de38e
471b053
77d5469
471b053
 
 
 
 
77d5469
34de38e
 
 
03e9034
34de38e
 
03e9034
 
 
 
 
34de38e
 
 
 
e219aa1
03e9034
34de38e
03e9034
 
 
 
e219aa1
03e9034
 
34de38e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
    utils.py - Utility functions for the project.
"""

import logging
import re
import subprocess
from collections import defaultdict, deque
from datetime import datetime
from itertools import combinations, islice
from pathlib import Path
from typing import List

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    level=logging.INFO,
)
import torch
from natsort import natsorted
from rapidfuzz import fuzz

# Define stopwords
STOPWORDS = set(
    "a about above after again against all am an and any are aren't as at be because been before being below between both but by can't cannot could couldn't did didn't do does doesn't doing don't down during each few for from further had hadn't has hasn't have haven't having he he'd he'll he's her here here's hers herself him himself his how how's i i'd i'll i'm i've if in into is isn't it it's its itself let's me more most mustn't my myself no nor not of off on once only or other ought our ours ourselves out over own same shan't she she'd she'll she's should shouldn't so some such than that that's the their theirs them themselves then there there's these they they'd they'll they're they've this those through to too under until up very was wasn't we we'd we'll we're we've were weren't what what's when when's where where's which while who who's whom why why's with won't would wouldn't you you'd you'll you're you've your yours yourself yourselves".split()
)


def validate_pytorch2(torch_version: str = None):
    torch_version = torch.__version__ if torch_version is None else torch_version

    pattern = r"^2\.\d+(\.\d+)*"

    return True if re.match(pattern, torch_version) else False


def get_timestamp(detailed=False) -> str:
    """
    get_timestamp - get a timestamp for the current time
    Returns:
        str, the timestamp
    """
    return (
        datetime.now().strftime("%b%d%Y_%H%M%S%f")
        if detailed
        else datetime.now().strftime("%b%d%Y_%H")
    )


def truncate_word_count(text, max_words=512):
    """
    truncate_word_count - a helper function for the gradio module
    Parameters
    ----------
    text : str, required, the text to be processed
    max_words : int, optional, the maximum number of words, default=512
    Returns
    -------
    dict, the text and whether it was truncated
    """
    # split on whitespace with regex
    words = re.split(r"\s+", text)
    processed = {}
    if len(words) > max_words:
        processed["was_truncated"] = True
        processed["truncated_text"] = " ".join(words[:max_words])
    else:
        processed["was_truncated"] = False
        processed["truncated_text"] = text
    return processed


def load_examples(src, filetypes=[".txt", ".pdf"]):
    """
    load_examples - a helper function for the gradio module to load examples
    Returns:
        list of str, the examples
    """
    src = Path(src)
    src.mkdir(exist_ok=True)

    pdf_url = (
        "https://www.dropbox.com/s/y92xy7o5qb88yij/all_you_need_is_attention.pdf?dl=1"
    )
    subprocess.run(["wget", pdf_url, "-O", src / "all_you_need_is_attention.pdf"])
    examples = [f for f in src.iterdir() if f.suffix in filetypes]
    examples = natsorted(examples)
    # load the examples into a list
    text_examples = []
    for example in examples:
        with open(example, "r") as f:
            text = f.read()
            text_examples.append([text, "base", 2, 1024, 0.7, 3.5, 3])

    return text_examples


def load_example_filenames(example_path: str or Path):
    """
    load_example_filenames - a helper function for the gradio module to load examples
    Returns:
        dict, the examples (filename:full path)
    """
    example_path = Path(example_path)
    # load the examples into a list
    examples = {f.name: f for f in example_path.glob("*.txt")}
    return examples


def extract_keywords(
    text: str, num_keywords: int = 3, window_size: int = 5
) -> List[str]:
    """
    Extracts keywords from a text using a simplified TextRank algorithm.

    Args:
        text: The text to extract keywords from.
        num_keywords: The number of keywords to extract. Default is 5.
        window_size: The number of words considered for co-occurrence. Default is 5.

    Returns:
        A list of strings, where each string is a keyword extracted from the input text.
    """
    logger = logging.getLogger(__name__)
    # Remove stopwords and tokenize the text into words
    words = [
        word
        for word in re.findall(r"\b\w{3,}\b", text.lower())
        if word not in STOPWORDS
    ]

    # Create a graph of word co-occurrences within a moving window of words
    cooccur = defaultdict(lambda: defaultdict(int))
    deque_words = deque(maxlen=window_size)
    for word in words:
        for w1, w2 in combinations(deque_words, 2):
            cooccur[w1][w2] += 1
            cooccur[w2][w1] += 1
        deque_words.append(word)

    # Assign scores to words using a simplified TextRank algorithm
    scores = defaultdict(float)
    for _ in range(10):
        new_scores = defaultdict(float)
        for word, co_words in cooccur.items():
            new_scores[word] = 0.15 + 0.85 * sum(
                cooccur[word][other] / sum(cooccur[other].values()) * scores[other]
                for other in co_words
            )
        scores = new_scores

    # Sort the words by score and return the top num_keywords keywords
    keywords = sorted(scores, key=scores.get, reverse=True)[:num_keywords]
    logger.debug(f"All keywords: {keywords}")
    # Use fuzzy matching to remove similar keywords
    final_keywords = []
    for keyword in keywords:
        if not any(fuzz.ratio(keyword, other) > 70 for other in final_keywords):
            final_keywords.append(keyword)
    logger.info(f"Keywords (final):\t{final_keywords}")
    return final_keywords


def saves_summary(
    summarize_output, outpath: str or Path = None, add_signature=True, **kwargs
):
    """
    saves_summary - save the summary generated from summarize_via_tokenbatches() to a text file

    summarize_output: output from summarize_via_tokenbatches()
    outpath: path to the output file
    add_signature: whether to add a signature to the output file
    kwargs: additional keyword arguments to include in the output file
    """
    logger = logging.getLogger(__name__)
    sum_text = [f"\t{s['summary'][0]}\n" for s in summarize_output]
    sum_scores = [f"\n - {round(s['summary_score'],4)}" for s in summarize_output]
    scores_text = "\n".join(sum_scores)
    full_summary = "\n".join(sum_text)

    keywords = "_".join(extract_keywords(full_summary))
    logger.info(f"kw:\t{keywords}")
    outpath = (
        Path.cwd() / f"document_summary_{get_timestamp()}_{keywords}.txt"
        if outpath is None
        else Path(outpath)
    )
    logger.info(f"Saving summary to:\t{outpath.name}")
    with open(
        outpath,
        "w",
        encoding="utf-8",
    ) as fo:
        fo.writelines(full_summary)
        fo.write("\n\n")
        if add_signature:
            fo.write("\n\n---\n\n")
            fo.write("Generated with the Document Summarization space :)\n\n")
            fo.write("https://hf.co/spaces/pszemraj/document-summarization\n\n")
    with open(
        outpath,
        "a",
    ) as fo:
        fo.write("\n")
        fo.write(f"## Section Scores:\n\n")
        fo.writelines(scores_text)
        fo.write("\n\n")
        fo.write(f"Date: {get_timestamp()}\n\n")
        if kwargs:
            fo.write("---\n\n")
            fo.write("## Parameters:\n\n")
            for key, value in kwargs.items():
                fo.write(f"{key}: {value}\n")
    return outpath