znation HF staff commited on
Commit
f624d68
·
1 Parent(s): c90151d

let's try this

Browse files
Files changed (6) hide show
  1. app.py +12 -0
  2. list_files.py +126 -0
  3. list_reconstructions.py +161 -0
  4. list_repos.py +52 -0
  5. list_xorbs.py +59 -0
  6. refresh_lists.py +27 -0
app.py CHANGED
@@ -1,7 +1,19 @@
1
  import gradio as gr
2
 
 
 
 
3
  def greet(name):
 
 
4
  return "Hello " + name + "!!"
5
 
 
 
 
 
 
 
 
6
  demo = gr.Interface(fn=greet, inputs="text", outputs="text")
7
  demo.launch()
 
1
  import gradio as gr
2
 
3
+ import list_repos
4
+ import refresh_lists
5
+
6
def greet(name):
    # NOTE(review): greet() first triggers a full data refresh.
    # refresh_files() below contains a `while True` loop, so this call
    # never returns and the greeting is unreachable — confirm intent.
    refresh_repos()
    refresh_files()
    return "Hello " + name + "!!"
10
 
11
def refresh_repos(progress=gr.Progress(track_tqdm=True)):
    # Full re-enumeration of public Hub repos into the repos DB.
    # `progress` is not used directly; track_tqdm=True presumably lets
    # Gradio mirror tqdm progress bars from the underlying call — confirm.
    list_repos.write_repos_to_db()
13
+
14
def refresh_files(progress=gr.Progress(track_tqdm=True)):
    """Continuously refresh file/reconstruction data, stalest repo first.

    NOTE(review): this loops forever and never returns, so any caller
    (here: greet()) blocks indefinitely. Confirm it is meant to run as a
    background worker rather than inside a request handler.
    """
    while True:
        refresh_lists.refresh_oldest_repo()
17
+
18
# Wire up the Gradio UI: a single text box whose submit calls greet().
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()
list_files.py ADDED
@@ -0,0 +1,126 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ # found on https://stackoverflow.com/a/52130355 to fix infinite recursion with ssl
3
+ # at the beginning of the script
4
+ import gevent.monkey
5
+ gevent.monkey.patch_all()
6
+
7
+ import json
8
+ from datetime import date, datetime
9
+ import sys
10
+ import time
11
+
12
+ import huggingface_hub
13
+ import sqlite3
14
+ from tqdm import tqdm
15
+
16
+ fs = huggingface_hub.HfFileSystem()
17
+
18
+ import list_repos
19
+
20
+ SQLITE3_DB = "data/files.sqlite3"
21
+
22
+
23
def json_serial(obj):
    """`json.dumps(default=...)` hook: encode date/datetime as ISO-8601."""
    if not isinstance(obj, (datetime, date)):
        raise TypeError("Type %s not serializable" % type(obj))
    return obj.isoformat()
27
+
28
+
29
def list_files_from_hub(repo, replace_model_in_url=True):
    """Recursively yield file entries for *repo* from the HfFileSystem.

    A leading "models/" is stripped on the top-level call only, since
    fs.ls treats a bare repo id as a model repo (it builds the URL as
    /api/models/<repo>). We recurse with one ls per directory rather
    than passing the undocumented recursive=True, which issues a single
    heavy request that can time out server-side and return a 500.
    """
    if replace_model_in_url and repo.startswith("models/"):
        repo = repo.replace("models/", "", 1)

    for entry in fs.ls(repo):
        if entry["type"] != "directory":
            yield entry
        else:
            yield from list_files_from_hub(entry["name"], replace_model_in_url=False)
47
+
48
+
49
def write_files_to_db(repo):
    """Replace the cached `files` rows for *repo* with a fresh Hub listing.

    Creates the table on first use, deletes the repo's previous rows,
    then inserts one row per entry yielded by list_files_from_hub().
    """
    print("Opening database", SQLITE3_DB, file=sys.stderr)
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        print("Creating files table if not exists", file=sys.stderr)
        cur.execute(
            "CREATE TABLE IF NOT EXISTS files (name TEXT PRIMARY KEY, last_updated_datetime INTEGER, repo TEXT, size INTEGER, type TEXT, blob_id TEXT, is_lfs INTEGER, lfs_size INTEGER, lfs_sha256 TEXT, lfs_pointer_size INTEGER, last_commit_oid TEXT, last_commit_title TEXT, last_commit_date TEXT)"
        )
        con.commit()
        print("Deleting existing rows for repo {}".format(repo), file=sys.stderr)
        # Parameterized queries: repo ids, file names and commit titles may
        # contain quotes; the previous string-built SQL broke on them and
        # was an injection hazard.
        cur.execute("DELETE FROM files WHERE repo = ?", (repo,))
        con.commit()
        print("Inserting new rows from HFFileSystem query for repo {}".format(repo), file=sys.stderr)
        for file in tqdm(list_files_from_hub(repo)):
            is_lfs = file["lfs"] is not None
            cur.execute(
                "INSERT INTO files VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (
                    file["name"],
                    int(time.time()),
                    repo,
                    file["size"],
                    file["type"],
                    file["blob_id"],
                    1 if is_lfs else 0,
                    # Real SQL NULLs for non-LFS files; the old code stored
                    # the literal string 'NULL' in these columns.
                    file["lfs"]["size"] if is_lfs else None,
                    file["lfs"]["sha256"] if is_lfs else None,
                    file["lfs"]["pointer_size"] if is_lfs else None,
                    file["last_commit"]["oid"],
                    file["last_commit"]["title"],
                    # Stored as its string form, matching the previous
                    # TEXT-column behavior of '{}'.format(...).
                    str(file["last_commit"]["date"]),
                ),
            )
        con.commit()
    finally:
        con.close()
81
+
82
+
83
def is_lfs(file):
    """True when the file entry carries LFS metadata (non-None "lfs" key)."""
    lfs_info = file["lfs"]
    return lfs_info is not None
85
+
86
+
87
def list_lfs_files(repo):
    """Yield only the LFS-tracked entries from the cached file list for *repo*.

    Uses list_files() (the sqlite cache), not a live Hub query.
    """
    # Iterate directly instead of binding to a local named `list`,
    # which shadowed the builtin.
    for file in list_files(repo):
        if is_lfs(file):
            yield file
92
+
93
+
94
def list_files(repo, limit=None):
    """Return cached file entries for *repo*, shaped like the HfFileSystem
    listing (name / size / type / blob_id / lfs / last_commit).

    Args:
        repo: repo key, e.g. "datasets/org/name".
        limit: optional maximum number of rows to return.
    """
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        # Parameterized to avoid SQL injection via repo names.
        if limit is None:
            res = cur.execute("SELECT * FROM files WHERE repo = ?", (repo,))
        else:
            res = cur.execute(
                "SELECT * FROM files WHERE repo = ? LIMIT ?", (repo, limit)
            )
        # Column order matches CREATE TABLE in write_files_to_db:
        # 0 name, 1 last_updated_datetime, 2 repo, 3 size, 4 type,
        # 5 blob_id, 6 is_lfs, 7 lfs_size, 8 lfs_sha256,
        # 9 lfs_pointer_size, 10 oid, 11 title, 12 date.
        # The previous mapping skipped the `repo` column (index 2) and read
        # every later field off by one — e.g. "size" was the repo string.
        return [
            {
                "name": row[0],
                "last_updated_datetime": row[1],
                "size": row[3],
                "type": row[4],
                "blob_id": row[5],
                "lfs": (
                    {"size": row[7], "sha256": row[8], "pointer_size": row[9]}
                    if row[6]
                    else None
                ),
                "last_commit": {"oid": row[10], "title": row[11], "date": row[12]},
            }
            for row in res.fetchall()
        ]
    finally:
        con.close()
118
+
119
if __name__ == "__main__":
    # Full refresh of every known repo, then print a small sample
    # (3 repos x 3 files) for manual inspection.
    for repo in list_repos.list_repos():
        write_files_to_db(repo)
    print("Done writing to DB. Sample of 9 rows:")
    for repo in list_repos.list_repos(limit=3):
        for file in list_files(repo, limit=3):
            print(json.dumps(file, default=json_serial))
126
+
list_reconstructions.py ADDED
@@ -0,0 +1,161 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import sys
4
+ import time
5
+
6
+ import grequests
7
+ import sqlite3
8
+ from tqdm import tqdm
9
+
10
+ import list_files
11
+ import list_repos
12
+
13
+ SQLITE3_DB = "data/reconstructions.sqlite3"
14
+
15
+ HF_ENDPOINT = os.getenv("HF_ENDPOINT", "https://huggingface.co")
16
+ XET_CAS_ENDPOINT = os.getenv("XET_CAS_ENDPOINT", "https://cas-server.xethub.hf.co")
17
+
18
+ RESOLVE_URL_TEMPLATE = HF_ENDPOINT + "/{}/resolve/main"
19
+
20
+
21
def exception_handler(req, exc):
    """grequests hook: report a failed request's exception and carry on."""
    print(str(exc), file=sys.stderr)
23
+
24
+
25
def list_reconstructions_from_hub(repo):
    """Fetch xorb reconstruction terms for every cached LFS file of *repo*.

    Two concurrent fan-out phases via grequests:
      1. HEAD <hub>/<repo>/resolve/main/<path> to read the xet hash and
         access token from the response headers;
      2. GET <cas>/reconstruction/<hash> to fetch the term list.

    Returns a list of dicts with start/end/file_path/xorb_id/unpacked_length.
    """
    print(
        "Listing reconstructions using:\nHF Hub Endpoint: {}\nXet CAS Endpoint: {}".format(
            HF_ENDPOINT, XET_CAS_ENDPOINT
        ),
        file=sys.stderr,
    )

    ret = []
    files = []
    resolve_reqs = []
    reconstruct_reqs = []
    err_count = 0

    print("Listing files for repo {}".format(repo), file=sys.stderr)
    total = 0
    for i, file in tqdm(enumerate(list_files.list_lfs_files(repo))):
        total += 1
        files.append(file["name"])
        # NOTE(review): `repo` is rewritten inside the loop (strip "models/"
        # so the resolve URL matches the Hub's model-URL scheme); after the
        # first iteration the replace below is a no-op.
        if repo.startswith("models/"):
            repo = repo.replace("models/", "", 1)
        # Turn the cached file name into a /resolve/main URL.
        url = file["name"].replace(repo, RESOLVE_URL_TEMPLATE.format(repo), 1)
        headers = {"Authorization": "Bearer {}".format(os.getenv("HF_TOKEN"))}
        resolve_reqs.append(
            grequests.head(url, headers=headers, allow_redirects=False)
        )

    print("", file=sys.stderr)
    print("Calling /resolve/ for repo {}".format(repo), file=sys.stderr)
    for i, resp in tqdm(
        grequests.imap_enumerated(
            resolve_reqs, size=4, exception_handler=exception_handler
        ),
        total=total,
    ):
        if resp is None:
            err_count += 1
            continue
        # todo: use refresh_route when access_token is expired
        refresh_route = resp.headers.get("x-xet-refresh-route")
        xet_hash = resp.headers.get("x-xet-hash")
        access_token = resp.headers.get("x-xet-access-token")
        # Only xet-backed files expose a hash; others are skipped entirely.
        if xet_hash is not None and xet_hash != "":
            url = "{}/reconstruction/{}".format(XET_CAS_ENDPOINT, xet_hash)
            headers = {"Authorization": "Bearer {}".format(access_token)}
            reconstruct_reqs.append(grequests.get(url, headers=headers))

    print("", file=sys.stderr)
    print(
        "Calling /reconstruct/ with grequests for repo {}".format(repo),
        file=sys.stderr,
    )

    for i, resp in tqdm(
        grequests.imap_enumerated(
            reconstruct_reqs, size=4, exception_handler=exception_handler
        ),
        total=total,
    ):
        if resp is None:
            continue
        if resp.status_code != 200:
            continue
        body = resp.json()
        for term in body["terms"]:
            entry = {
                "start": term["range"]["start"],
                "end": term["range"]["end"],
                # NOTE(review): maps reconstruct-request index i back to
                # files[] by offsetting with err_count. This is only correct
                # if every failed resolve precedes the successes and no
                # non-xet file was skipped in between — verify; otherwise
                # terms can be attributed to the wrong file.
                "file_path": files[i + err_count],
                "xorb_id": term["hash"],
                "unpacked_length": term["unpacked_length"]
            }
            ret.append(entry)

    return ret
100
+
101
+
102
def list_reconstructions(repos, limit=None):
    """Return cached reconstruction terms for each repo in *repos*.

    Each entry mirrors a row of the `reconstructions` table minus the
    autoincrement id. *limit* caps the rows fetched per repo.
    """
    ret = []
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        for repo in repos:
            # Parameterized queries: repo ids may contain quotes, and the
            # previous string-built SQL was an injection hazard.
            if limit is None:
                res = cur.execute(
                    "SELECT * FROM reconstructions WHERE repo = ?", (repo,)
                )
            else:
                res = cur.execute(
                    "SELECT * FROM reconstructions WHERE repo = ? LIMIT ?",
                    (repo, limit),
                )
            for row in res.fetchall():
                # Column order: 0 id, 1 xorb_id, 2 last_updated_datetime,
                # 3 repo, 4 file_path, 5 unpacked_length, 6 start, 7 end.
                ret.append(
                    {
                        "xorb_id": row[1],
                        "last_updated_timestamp": row[2],
                        "repo": row[3],
                        "file_path": row[4],
                        "unpacked_length": row[5],
                        "start": row[6],
                        "end": row[7],
                    }
                )
        return ret
    finally:
        con.close()
123
+
124
+
125
def write_files_to_db(repo):
    """Replace cached `reconstructions` rows for *repo* with fresh Hub data."""
    print("Opening database", SQLITE3_DB, file=sys.stderr)
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        print("Creating reconstructions table if not exists", file=sys.stderr)
        cur.execute(
            "CREATE TABLE IF NOT EXISTS reconstructions (id INTEGER PRIMARY KEY AUTOINCREMENT, xorb_id TEXT, last_updated_datetime INTEGER, repo TEXT, file_path TEXT, unpacked_length INTEGER, start INTEGER, end INTEGER)"
        )
        con.commit()
        print("Deleting existing rows for repo {}".format(repo), file=sys.stderr)
        # Parameterized: repo ids and file paths may contain quotes; the
        # previous string-built SQL broke on them and was an injection risk.
        cur.execute("DELETE FROM reconstructions WHERE repo = ?", (repo,))
        con.commit()
        print("Inserting rows from HFFileSystem query", file=sys.stderr)
        for reconstruction in list_reconstructions_from_hub(repo):
            # NULL lets the id column autoincrement.
            cur.execute(
                "INSERT INTO reconstructions VALUES (NULL, ?, ?, ?, ?, ?, ?, ?)",
                (
                    reconstruction["xorb_id"],
                    int(time.time()),
                    repo,
                    reconstruction["file_path"],
                    reconstruction["unpacked_length"],
                    reconstruction["start"],
                    reconstruction["end"],
                ),
            )
        con.commit()
    finally:
        con.close()
150
+
151
+
152
if __name__ == "__main__":
    # Refresh every repo's reconstruction cache, then dump a small sample.
    for repo in list_repos.list_repos():
        write_files_to_db(repo)
    print("Done writing to DB. Sample of 5 rows:")
    sample = list_reconstructions(list_repos.list_repos(), limit=5)
    json.dump(sample, sys.stdout, sort_keys=True, indent=4)
list_repos.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sqlite3
2
+ import huggingface_hub
3
+ import sys
4
+ import time
5
+
6
+ from tqdm import tqdm
7
+
8
+ SQLITE3_DB = "data/repos.sqlite3"
9
+
10
def list_repos_from_hub():
    """Yield "<kind>/<id>" keys for every public dataset, model and space.

    Enumeration order is datasets, then models, then spaces, matching the
    prefix each kind gets in the key.
    """
    listers = (
        ("datasets/", huggingface_hub.list_datasets),
        ("models/", huggingface_hub.list_models),
        ("spaces/", huggingface_hub.list_spaces),
    )
    for prefix, lister in listers:
        for repo in lister():
            if not repo.private:
                yield prefix + repo.id
20
+
21
def write_repos_to_db():
    """(Re)populate the repos table with every public repo on the Hub.

    Existing rows are preserved (INSERT OR IGNORE) so previously recorded
    refresh timestamps survive re-enumeration.
    """
    print("Opening database", SQLITE3_DB, file=sys.stderr)
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        print("Creating repos table if not exists", file=sys.stderr)
        cur.execute("CREATE TABLE IF NOT EXISTS repos (id TEXT PRIMARY KEY, last_updated_datetime INTEGER, last_enumerated_datetime INTEGER NULLABLE)")
        con.commit()
        print("Inserting rows from huggingface_hub query", file=sys.stderr)
        for repo in tqdm(list_repos_from_hub()):
            # Parameterized (repo ids may contain quotes); bind the
            # never-refreshed timestamp as integer 0, not the string '0'.
            cur.execute(
                "INSERT OR IGNORE INTO repos VALUES (?, ?, NULL)", (repo, 0)
            )
        con.commit()
    finally:
        con.close()
32
+
33
def list_repos(limit=None):
    """Return repo ids ordered least-recently-refreshed first.

    Args:
        limit: optional maximum number of ids to return.
    """
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        if limit is None:
            res = cur.execute("SELECT id FROM repos ORDER BY last_updated_datetime ASC")
        else:
            # Parameterized LIMIT instead of string formatting.
            res = cur.execute(
                "SELECT id FROM repos ORDER BY last_updated_datetime ASC LIMIT ?",
                (limit,),
            )
        return [row[0] for row in res.fetchall()]
    finally:
        con.close()
41
+
42
def set_updated_datetime(repo):
    """Stamp *repo* with the current time so it sorts last for refresh."""
    con = sqlite3.connect(SQLITE3_DB)
    try:
        cur = con.cursor()
        # Parameterized: repo ids may contain quotes (SQL injection risk
        # with the previous string-built statement).
        cur.execute(
            "UPDATE repos SET last_updated_datetime = ? WHERE id = ?",
            (int(time.time()), repo),
        )
        con.commit()
    finally:
        con.close()
47
+
48
if __name__ == "__main__":
    # Refresh the repo list, then show the five stalest entries.
    write_repos_to_db()
    print("Done writing to DB. Sample of 5 rows:")
    for repo in list_repos(limit=5):
        print(repo)
list_xorbs.py ADDED
@@ -0,0 +1,59 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ### A simpler rendition of reconstructions.
2
+ ### Lists only xorbs and a "dedupe factor" for the xorb
3
+ ### Where dedupe-factor 1 == no dedupe,
4
+ ### 2 == 1 chunk shared,
5
+ ### 3 == 2 chunks shared,
6
+ ### etc.
7
+
8
+ import json
9
+ import sys
10
+
11
+ import list_reconstructions
12
+ import list_repos
13
+
14
def list_xorbs(repos):
    """Summarize cached reconstruction terms into per-xorb dedupe stats.

    Returns a list of {"xorb_id", "dedupe_factor", "repo"} dicts, one per
    (xorb, referencing repo) pair found in the reconstructions for *repos*.
    """
    # first build up a mapping of {xorb_id: [(start, end), (start, end), ...]}
    xorbs = {}
    reconstructions = list_reconstructions.list_reconstructions(repos)
    for term in reconstructions:
        if not(term["xorb_id"] in xorbs):
            xorbs[term["xorb_id"]] = []
        path_parts = term["file_path"].split("/")
        if path_parts[0] != "datasets" and \
            path_parts[0] != "spaces":
            # models omit the "models" part from file path
            path_parts.insert(0, "models")
        # Repo key is the first three path components, e.g. "models/org/name".
        repo = "/".join(path_parts[:3])
        xorbs[term["xorb_id"]].append((term["start"], term["end"], repo))

    # then walk the lists and compute dedupe factor
    output = []
    for xorb_id,chunks in xorbs.items():
        # Overall [min, max) index range touched by any term of this xorb.
        min_chunk_idx = float("inf")
        max_chunk_idx = float("-inf")
        xorb_repos = set()
        dedupe_factor = 0
        for chunk in chunks:
            min_chunk_idx = min(min_chunk_idx, chunk[0])
            max_chunk_idx = max(max_chunk_idx, chunk[1])
            xorb_repos.add(chunk[2])
        xorb_repos = list(xorb_repos)
        # Sum, over each index in [min, max), how many terms reference it.
        # NOTE(review): assumes start/end are integer indices (range()
        # requires ints) — confirm against the reconstruction API.
        for i in range(min_chunk_idx, max_chunk_idx):
            ref_count = 0
            for chunk in chunks:
                if i >= chunk[0] and i < chunk[1]:
                    ref_count += 1
            dedupe_factor += ref_count
        # NOTE(review): divides by max_chunk_idx, not (max - min); this
        # matches the file header's "dedupe factor" description only when
        # ranges start at 0 — verify.
        if max_chunk_idx != 0:
            dedupe_factor /= float(max_chunk_idx)
        # Every repo that references the xorb gets the same factor.
        for repo in xorb_repos:
            output.append({
                "xorb_id": xorb_id,
                "dedupe_factor": dedupe_factor,
                "repo": repo
            })
    return output
56
+
57
+
58
if __name__ == "__main__":
    # Dump dedupe stats for every known repo as pretty-printed JSON.
    json.dump(list_xorbs(list_repos.list_repos()), sys.stdout, sort_keys=True, indent=4)
refresh_lists.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Refreshes the `files` and `reconstructions` tables for a single repo.
3
+ """
4
+
5
+ import sys
6
+
7
+ import list_files
8
+ import list_reconstructions
9
+ import list_repos
10
+
11
def refresh_oldest_repo():
    """Refresh the single repo whose cached data is most stale.

    Raises:
        RuntimeError: if the repos table is empty (nothing to refresh).
    """
    oldest_repo = list_repos.list_repos(limit=1)
    # Explicit check instead of `assert`, which is stripped under -O.
    if len(oldest_repo) != 1:
        raise RuntimeError(
            "repos table is empty; run list_repos.write_repos_to_db() first"
        )
    refresh_repo(oldest_repo[0])
15
+
16
def refresh_repo(repo):
    """Re-list files and reconstructions for *repo*, then stamp it fresh.

    Order matters: reconstructions are derived from the `files` table, and
    the timestamp is only advanced after both succeed.
    """
    print("Refreshing repo", repo, file=sys.stderr)
    print("Listing files", file=sys.stderr)
    list_files.write_files_to_db(repo)
    print("Listing reconstructions", file=sys.stderr)
    list_reconstructions.write_files_to_db(repo)
    print("Updating timestamp", file=sys.stderr)
    list_repos.set_updated_datetime(repo)
    print("Done", file=sys.stderr)
25
+
26
if __name__ == "__main__":
    # One-shot refresh of the single stalest repo.
    refresh_oldest_repo()