|
"""Module contains the async interface to match needle against haystack in batch.""" |
|
import asyncio |
|
import heapq |
|
from typing import Any, Callable, Dict, List, Union, cast |
|
|
|
from pfzy.score import SCORE_INDICES, fzy_scorer |
|
from pfzy.types import HAYSTACKS |
|
|
|
|
|
async def _rank_task( |
|
scorer: Callable[[str, str], SCORE_INDICES], |
|
needle: str, |
|
haystacks: List[Union[str, Dict[str, Any]]], |
|
key: str, |
|
) -> List[Dict[str, Any]]: |
|
"""Calculate the score for needle against given list of haystacks and rank them. |
|
|
|
Args: |
|
scorer: Scorer to be used to do the calculation. |
|
needle: Substring to search. |
|
haystacks: List of dictionary containing the haystack to be searched. |
|
key: The key within the `haystacks` dictionary that contains the actual string value. |
|
|
|
Return: |
|
Sorted list of haystacks based on the needle score with additional keys for the `score` |
|
and `indices`. |
|
""" |
|
result = [] |
|
for haystack in haystacks: |
|
score, indices = scorer(needle, cast(Dict, haystack)[key]) |
|
if indices is None: |
|
continue |
|
result.append( |
|
{ |
|
"score": score, |
|
"indices": indices, |
|
"haystack": haystack, |
|
} |
|
) |
|
result.sort(key=lambda x: x["score"], reverse=True) |
|
return result |
|
|
|
|
|
async def fuzzy_match( |
|
needle: str, |
|
haystacks: HAYSTACKS, |
|
key: str = "", |
|
batch_size: int = 4096, |
|
scorer: Callable[[str, str], SCORE_INDICES] = None, |
|
) -> List[Dict[str, Any]]: |
|
"""Fuzzy find the needle within list of haystacks and get matched results with matching index. |
|
|
|
Note: |
|
The `key` argument is optional when the provided `haystacks` argument is a list of :class:`str`. |
|
It will be given a default key `value` if not present. |
|
|
|
Warning: |
|
The `key` argument is required when provided `haystacks` argument is a list of :class:`dict`. |
|
If not present, :class:`TypeError` will be raised. |
|
|
|
Args: |
|
needle: String to search within the `haystacks`. |
|
haystacks: List of haystack/longer strings to be searched. |
|
key: If `haystacks` is a list of dictionary, provide the key that |
|
can obtain the haystack value to search. |
|
batch_size: Number of entry to be processed together. |
|
scorer (Callable[[str, str], SCORE_indices]): Desired scorer to use. Currently only :func:`~pfzy.score.fzy_scorer` and :func:`~pfzy.score.substr_scorer` is supported. |
|
|
|
Raises: |
|
TypeError: When the argument `haystacks` is :class:`list` of :class:`dict` and the `key` argument |
|
is missing, :class:`TypeError` will be raised. |
|
|
|
Returns: |
|
List of matching `haystacks` with additional key indices and score. |
|
|
|
Examples: |
|
>>> import asyncio |
|
>>> asyncio.run(fuzzy_match("ab", ["acb", "acbabc"])) |
|
[{'value': 'acbabc', 'indices': [3, 4]}, {'value': 'acb', 'indices': [0, 2]}] |
|
""" |
|
if scorer is None: |
|
scorer = fzy_scorer |
|
|
|
for index, haystack in enumerate(haystacks): |
|
if not isinstance(haystack, dict): |
|
if not key: |
|
key = "value" |
|
haystacks[index] = {key: haystack} |
|
|
|
if not key: |
|
raise TypeError( |
|
f"${fuzzy_match.__name__} missing 1 required argument: 'key', 'key' is required when haystacks is an instance of dict" |
|
) |
|
|
|
batches = await asyncio.gather( |
|
*( |
|
_rank_task( |
|
scorer, |
|
needle, |
|
haystacks[offset : offset + batch_size], |
|
key, |
|
) |
|
for offset in range(0, len(haystacks), batch_size) |
|
) |
|
) |
|
results = heapq.merge(*batches, key=lambda x: x["score"], reverse=True) |
|
choices = [] |
|
for candidate in results: |
|
candidate["haystack"]["indices"] = candidate["indices"] |
|
choices.append(candidate["haystack"]) |
|
return choices |
|
|