File size: 3,752 Bytes
4a51346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 13 18:17:15 2014

@author: takluyver
"""
import sys
PY3 = (sys.version_info[0] >= 3)

try:
    from inspect import signature, Parameter  # Python >= 3.3
except ImportError:
    from ._signatures import signature, Parameter

if not PY3:
    from functools import wraps as _wraps

    def wraps(f):
        """Python 2 stand-in for ``functools.wraps`` that also sets ``__wrapped__``.

        Returns a decorator which copies the metadata of *f* onto the
        decorated function and records *f* as its ``__wrapped__`` attribute.
        """
        def dec(func):
            # Copy name/docstring/etc. from f, then expose the original
            # function explicitly on the result.
            decorated = _wraps(f)(func)
            decorated.__wrapped__ = f
            return decorated

        return dec
else:
    from functools import wraps

def callback_prototype(prototype):
    """Decorator to process a callback prototype.

    A callback prototype is a function whose signature includes all the values
    that will be passed by the callback API in question.

    Prototype parameters without a default value are treated as positional
    arguments; parameters with a default value, and keyword-only parameters,
    are treated as keyword arguments.

    The original function will be returned, with a ``prototype.adapt`` attribute
    which can be used to prepare third party callbacks.

    Raises:
        TypeError: if the prototype declares ``*args`` or ``**kwargs``.
    """
    protosig = signature(prototype)
    positional, keyword = [], []
    for name, param in protosig.parameters.items():
        if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
            raise TypeError("*args/**kwargs not supported in prototypes")

        if (param.default is not Parameter.empty) \
                or (param.kind == Parameter.KEYWORD_ONLY):
            keyword.append(name)
        else:
            positional.append(name)

    # Placeholder keyword arguments (name -> None), used both for the trial
    # bind below and as the starting set of not-yet-matched keywords.
    proto_kwargs = dict.fromkeys(keyword)

    def adapt(callback):
        """Introspect and prepare a third party callback.

        If *callback* can already be called with every argument of the
        prototype, it is returned unchanged.  Otherwise a wrapper is returned
        which discards the trailing positional arguments and the keyword
        arguments that *callback* does not accept.

        Raises:
            TypeError: if *callback* has parameters which cannot be matched
                to any argument of the prototype.
        """
        sig = signature(callback)
        try:
            # Trial bind using the parameter names themselves as dummy values.
            # XXX: callback can have extra optional parameters - OK?
            sig.bind(*positional, **proto_kwargs)
        except TypeError:
            pass
        else:
            return callback

        # Match up arguments
        unmatched_pos = positional[:]
        unmatched_kw = proto_kwargs.copy()
        unrecognised = []
        # TODO: unrecognised parameters with default values - OK?
        for name, param in sig.parameters.items():
            if param.kind == Parameter.POSITIONAL_ONLY:
                # Consumes the next positional argument, whatever its name.
                if unmatched_pos:
                    unmatched_pos.pop(0)
                else:
                    unrecognised.append(name)
            elif param.kind == Parameter.POSITIONAL_OR_KEYWORD:
                # Parameters with a default are matched to a prototype keyword
                # of the same name first; otherwise they consume the next
                # positional argument.
                if (param.default is not Parameter.empty) \
                        and (name in unmatched_kw):
                    unmatched_kw.pop(name)
                elif unmatched_pos:
                    unmatched_pos.pop(0)
                else:
                    unrecognised.append(name)
            elif param.kind == Parameter.VAR_POSITIONAL:
                # *args absorbs every remaining positional argument.
                unmatched_pos = []
            elif param.kind == Parameter.KEYWORD_ONLY:
                if name in unmatched_kw:
                    unmatched_kw.pop(name)
                else:
                    unrecognised.append(name)
            else:  # VAR_KEYWORD
                # **kwargs absorbs every remaining keyword argument.
                unmatched_kw = {}

        if unrecognised:
            raise TypeError("Function {!r} had unmatched arguments: {}".format(callback, unrecognised))

        # Number of leading positional arguments the callback consumes.
        n_positional = len(positional) - len(unmatched_pos)
        # Names of keyword arguments that must be stripped before the call.
        discard_kw = tuple(unmatched_kw)

        @wraps(callback)
        def adapted(*args, **kwargs):
            """Wrapper for third party callbacks that discards excess arguments"""
            args = args[:n_positional]
            for name in discard_kw:
                # Prototype keywords are optional, so the caller may have
                # omitted this one; a bare pop(name) would raise KeyError.
                kwargs.pop(name, None)
            return callback(*args, **kwargs)

        return adapted

    prototype.adapt = adapt
    return prototype