Danieldu
add code
a89d9fd
raw
history blame
5.83 kB
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rapidfuzz.distance import Levenshtein
from difflib import SequenceMatcher
import numpy as np
import string
class RecMetric(object):
def __init__(self,
main_indicator='acc',
is_filter=False,
ignore_space=True,
**kwargs):
self.main_indicator = main_indicator
self.is_filter = is_filter
self.ignore_space = ignore_space
self.eps = 1e-5
self.reset()
def _normalize_text(self, text):
text = ''.join(
filter(lambda x: x in (string.digits + string.ascii_letters), text))
return text.lower()
def __call__(self, pred_label, *args, **kwargs):
preds, labels = pred_label
correct_num = 0
all_num = 0
norm_edit_dis = 0.0
for (pred, pred_conf), (target, _) in zip(preds, labels):
if self.ignore_space:
pred = pred.replace(" ", "")
target = target.replace(" ", "")
if self.is_filter:
pred = self._normalize_text(pred)
target = self._normalize_text(target)
norm_edit_dis += Levenshtein.normalized_distance(pred, target)
if pred == target:
correct_num += 1
all_num += 1
self.correct_num += correct_num
self.all_num += all_num
self.norm_edit_dis += norm_edit_dis
return {
'acc': correct_num / (all_num + self.eps),
'norm_edit_dis': 1 - norm_edit_dis / (all_num + self.eps)
}
def get_metric(self):
"""
return metrics {
'acc': 0,
'norm_edit_dis': 0,
}
"""
acc = 1.0 * self.correct_num / (self.all_num + self.eps)
norm_edit_dis = 1 - self.norm_edit_dis / (self.all_num + self.eps)
self.reset()
return {'acc': acc, 'norm_edit_dis': norm_edit_dis}
def reset(self):
self.correct_num = 0
self.all_num = 0
self.norm_edit_dis = 0
class CNTMetric(object):
def __init__(self, main_indicator='acc', **kwargs):
self.main_indicator = main_indicator
self.eps = 1e-5
self.reset()
def __call__(self, pred_label, *args, **kwargs):
preds, labels = pred_label
correct_num = 0
all_num = 0
for pred, target in zip(preds, labels):
if pred == target:
correct_num += 1
all_num += 1
self.correct_num += correct_num
self.all_num += all_num
return {'acc': correct_num / (all_num + self.eps), }
def get_metric(self):
"""
return metrics {
'acc': 0,
}
"""
acc = 1.0 * self.correct_num / (self.all_num + self.eps)
self.reset()
return {'acc': acc}
def reset(self):
self.correct_num = 0
self.all_num = 0
class CANMetric(object):
def __init__(self, main_indicator='exp_rate', **kwargs):
self.main_indicator = main_indicator
self.word_right = []
self.exp_right = []
self.word_total_length = 0
self.exp_total_num = 0
self.word_rate = 0
self.exp_rate = 0
self.reset()
self.epoch_reset()
def __call__(self, preds, batch, **kwargs):
for k, v in kwargs.items():
epoch_reset = v
if epoch_reset:
self.epoch_reset()
word_probs = preds
word_label, word_label_mask = batch
line_right = 0
if word_probs is not None:
word_pred = word_probs.argmax(2)
word_pred = word_pred.cpu().detach().numpy()
word_scores = [
SequenceMatcher(
None,
s1[:int(np.sum(s3))],
s2[:int(np.sum(s3))],
autojunk=False).ratio() * (
len(s1[:int(np.sum(s3))]) + len(s2[:int(np.sum(s3))])) /
len(s1[:int(np.sum(s3))]) / 2
for s1, s2, s3 in zip(word_label, word_pred, word_label_mask)
]
batch_size = len(word_scores)
for i in range(batch_size):
if word_scores[i] == 1:
line_right += 1
self.word_rate = np.mean(word_scores) #float
self.exp_rate = line_right / batch_size #float
exp_length, word_length = word_label.shape[:2]
self.word_right.append(self.word_rate * word_length)
self.exp_right.append(self.exp_rate * exp_length)
self.word_total_length = self.word_total_length + word_length
self.exp_total_num = self.exp_total_num + exp_length
def get_metric(self):
"""
return {
'word_rate': 0,
"exp_rate": 0,
}
"""
cur_word_rate = sum(self.word_right) / self.word_total_length
cur_exp_rate = sum(self.exp_right) / self.exp_total_num
self.reset()
return {'word_rate': cur_word_rate, "exp_rate": cur_exp_rate}
def reset(self):
self.word_rate = 0
self.exp_rate = 0
def epoch_reset(self):
self.word_right = []
self.exp_right = []
self.word_total_length = 0
self.exp_total_num = 0