File size: 3,314 Bytes
ef3d4ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import uuid
import hashlib
import warnings
import itertools as it
import functools as ft
from pathlib import Path

class FileObject:
    """Read-only binary handle on a file plus a lazily computed content hash.

    Args:
        path: A ``pathlib.Path`` (anything with an ``open(mode)`` method).
            It is opened in ``'rb'`` mode immediately; call ``close()`` when done.
    """

    # Hashing reads 2 ** _window bytes (1 MiB) per chunk.
    _window = 20

    def __init__(self, path):
        self.fp = path.open('rb')
        self.chunk = 2 ** self._window

    def close(self):
        """Close the underlying file handle."""
        self.fp.close()

    @ft.cached_property
    def checksum(self):
        """BLAKE2b hex digest of the whole file, computed once and cached.

        The handle is rewound to offset 0 before hashing, so earlier reads
        cannot truncate the digest. It is rewound again afterwards, so the
        handle can be passed to code that reads it from the start.
        """
        csum = hashlib.blake2b()

        self.fp.seek(0)
        while data := self.fp.read(self.chunk):
            csum.update(data)
        self.fp.seek(0)

        return csum.hexdigest()

class FileStream:
    """Open each path lazily as a ``FileObject`` while it is iterated.

    Meant to be used as a context manager: every handle opened during
    iteration is closed, and the bookkeeping list emptied, when the
    ``with`` block exits.

    Args:
        paths: Iterable of paths handed one by one to ``FileObject``.
    """

    def __init__(self, paths):
        self.paths = paths
        # FileObjects opened so far, in the order they were opened.
        self.streams = []

    def __len__(self):
        # Number of handles opened so far, not the number of input paths.
        return len(self.streams)

    def __iter__(self):
        for path in self.paths:
            handle = FileObject(path)
            self.streams.append(handle)
            yield handle

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close in opening order, then forget them. Returns None, so any
        # exception raised inside the ``with`` block still propagates.
        for handle in self.streams:
            handle.close()
        del self.streams[:]

class FileManager:
    """Upload local files into one lazily created vector store.

    Files are de-duplicated by content checksum across every call made on
    the same instance.

    Args:
        client: API client exposing ``beta.vector_stores`` and ``files``
            (presumably an OpenAI SDK client -- confirm with callers).
        prefix: String prepended to the random vector-store name.
        batch_size: Maximum number of paths opened and uploaded per batch.
    """

    def __init__(self, client, prefix, batch_size=20):
        self.client = client
        self.prefix = prefix
        self.batch_size = batch_size

        self.storage = set()  # checksums of files uploaded successfully
        self.vector_store_id = None  # set by test_and_setup()

    def __bool__(self):
        # Truthy once a vector store has been created (or assigned).
        return self.vector_store_id is not None

    def __iter__(self):
        """Yield the filename of every file in the vector store.

        Yields nothing when no vector store has been set up yet.
        """
        if not self:
            return

        kwargs = {}
        while True:
            vs_files = self.client.beta.vector_stores.files.list(
                vector_store_id=self.vector_store_id,
                **kwargs,
            )
            for f in vs_files.data:
                result = self.client.files.retrieve(f.id)
                yield result.filename

            if not vs_files.has_more or not vs_files.data:
                break
            # Cursor pagination: the next page starts after the last id seen.
            # The page object carries no ``after`` attribute of its own.
            kwargs['after'] = vs_files.data[-1].id

    def __call__(self, paths):
        """Upload *paths*, skipping content already uploaded, then list the store.

        Args:
            paths: Iterable of path-like objects (list, tuple, generator, ...).

        Returns:
            Newline-joined filenames of every file now in the vector store,
            including files uploaded by earlier calls.

        Raises:
            InterruptedError: If a batch upload does not complete every file.
        """
        self.test_and_setup()

        for batch in self.ls(paths):
            with FileStream(batch) as stream:
                # Map checksum -> open handle. Keying on the checksum also
                # drops identical files within the same batch (first one wins).
                pending = {}
                for s in stream:
                    if s.checksum not in self.storage:
                        pending.setdefault(s.checksum, s.fp)
                if pending:
                    self.put(list(pending.values()))
                    # Record checksums only after the upload succeeded, so a
                    # failed batch is not silently skipped on a later call.
                    self.storage.update(pending)

        return '\n'.join(self)

    def test_and_setup(self):
        """Create a vector store named ``<prefix><uuid4>`` if none is set.

        If a vector store id is already present, emit a warning and reuse it.
        """
        if self:
            msg = f'Vector store already exists ({self.vector_store_id})'
            warnings.warn(msg)
        else:
            name = f'{self.prefix}{uuid.uuid4()}'
            vector_store = self.client.beta.vector_stores.create(
                name=name,
            )
            self.vector_store_id = vector_store.id

    def ls(self, paths):
        """Yield successive lists of at most ``batch_size`` ``Path`` objects.

        Accepts any iterable. Each element is consumed exactly once, so
        batching is linear in the input size.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. Without this
                check, a batch size of 0 would loop forever.
        """
        if self.batch_size < 1:
            raise ValueError(f'batch_size must be >= 1, got {self.batch_size}')
        source = iter(paths)
        while batch := list(map(Path, it.islice(source, self.batch_size))):
            yield batch

    def put(self, files):
        """Upload open binary handles as one file batch via ``upload_and_poll``.

        Raises:
            InterruptedError: If the batch's completed count differs from the
                number of handles submitted.
        """
        batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=self.vector_store_id,
            files=files,
        )
        if batch.file_counts.completed != len(files):
            err = f'Error uploading documents: {batch.file_counts}'
            raise InterruptedError(err)