import ast
import json
import os
import sys
import time
from typing import Dict

from get_ci_error_statistics import get_jobs
from notification_service import (
    Message,
    handle_stacktraces,
    handle_test_results,
    prepare_reports,
    retrieve_artifact,
    retrieve_available_artifacts,
)
from slack_sdk import WebClient


client = WebClient(token=os.environ["CI_SLACK_BOT_TOKEN"])


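# Slack report for the quantization CI run. The shared pieces used below (`header`,
# `no_failures`, `get_device_report`, `get_reply_blocks`, `error_out`) are expected to
# come from the `Message` base class in notification_service.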
class QuantizationMessage(Message):
    def __init__(
        self,
        title: str,
        results: Dict,
    ):
        self.title = title

        # Aggregate failure/success counts across all quantization categories.
        self.n_success = sum(r["success"] for r in results.values())
        self.single_gpu_failures = sum(r["failed"]["single"] for r in results.values())
        self.multi_gpu_failures = sum(r["failed"]["multi"] for r in results.values())
        self.n_failures = self.single_gpu_failures + self.multi_gpu_failures

        self.n_tests = self.n_failures + self.n_success
        self.results = results
        self.thread_ts = None

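    # Slack "blocks" payload for the summary message: the header, then either the
    # failure overview + per-category table or the "no failures" block.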
    @property
    def payload(self) -> str:
        blocks = [self.header]

        if self.n_failures > 0:
            blocks.append(self.failures_overview)
            blocks.append(self.failures_detailed)

        if self.n_failures == 0:
            blocks.append(self.no_failures)

        return json.dumps(blocks)

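    # Total time spent by the reported test suites, rendered as "XhYmZs".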
    @property
    def time(self) -> str:
        all_results = self.results.values()
        time_spent = []
        for r in all_results:
            if len(r["time_spent"]):
                time_spent.extend([x for x in r["time_spent"].split(", ") if len(x.strip())])
        total_secs = 0

        for duration in time_spent:
            time_parts = duration.split(":")

            # Entries are either "h:mm:ss" or a bare number of seconds; normalize to three parts.
            if len(time_parts) == 1:
                time_parts = [0, 0, time_parts[0]]

            hours, minutes, seconds = int(time_parts[0]), int(time_parts[1]), float(time_parts[2])
            total_secs += hours * 3600 + minutes * 60 + seconds

        hours, minutes, seconds = total_secs // 3600, (total_secs % 3600) // 60, total_secs % 60
        return f"{int(hours)}h{int(minutes)}m{int(seconds)}s"

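    # Section block summarizing the failure count and total runtime, with a button
    # linking to the GitHub Actions run.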
    @property
    def failures_overview(self) -> Dict:
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": (
                    f"There were {self.n_failures} failures, out of {self.n_tests} tests.\n"
                    f"The suite ran in {self.time}."
                ),
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

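    # Section block with a per-category table of single-GPU and multi-GPU failure counts.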
    @property
    def failures_detailed(self) -> Dict:
        failures = {k: v["failed"] for k, v in self.results.items()}

        individual_reports = []
        for key, value in failures.items():
            device_report = self.get_device_report(value)
            if sum(value.values()):
                report = f"{device_report}{key}"
                individual_reports.append(report)

        header = "Single | Multi | Category\n"
        failures_report = prepare_reports(
            title="The following quantization tests had failures", header=header, reports=individual_reports
        )

        return {"type": "section", "text": {"type": "mrkdwn", "text": failures_report}}

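    # Post the summary message to the report channel and keep the response so that
    # failure details can be threaded under it.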
    def post(self):
        payload = self.payload
        print("Sending the following payload")
        print(json.dumps({"blocks": json.loads(payload)}))

        text = f"{self.n_failures} failures out of {self.n_tests} tests." if self.n_failures else "All tests passed."

        self.thread_ts = client.chat_postMessage(
            channel=SLACK_REPORT_CHANNEL_ID,
            blocks=payload,
            text=text,
        )

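    # Post one threaded reply per failing category/device, listing the failed tests.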
    def post_reply(self):
        if self.thread_ts is None:
            raise ValueError("Can only post reply if a post has been made.")

        for job, job_result in self.results.items():
            if len(job_result["failures"]):
                for device, failures in job_result["failures"].items():
                    blocks = self.get_reply_blocks(
                        job,
                        job_result,
                        failures,
                        device,
                        text=f'Number of failures: {job_result["failed"][device]}',
                    )

                    print("Sending the following reply")
                    print(json.dumps({"blocks": blocks}))

                    client.chat_postMessage(
                        channel="#transformers-ci-daily-quantization",
                        text=f"Results for {job}",
                        blocks=blocks,
                        thread_ts=self.thread_ts["ts"],
                    )
                    time.sleep(1)


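# Entry point: run as a script by the CI workflow, with configuration taken from
# environment variables and the quantization matrix passed on the command line.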
if __name__ == "__main__":
    setup_status = os.environ.get("SETUP_STATUS")
    SLACK_REPORT_CHANNEL_ID = os.environ["SLACK_REPORT_CHANNEL"]
    setup_failed = setup_status is not None and setup_status != "success"

    ci_event = os.environ["CI_EVENT"]

    title = f"🤗 Results of the {ci_event} tests."

    if setup_failed:
        Message.error_out(
            title, ci_title="", runner_not_available=False, runner_failed=False, setup_failed=setup_failed
        )
        sys.exit(0)

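    # The quantization test categories are passed as a single Python-literal argument
    # (presumably the job matrix); "quantization/foo" maps to the "quantization_foo"
    # artifact prefix used below.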
    arguments = sys.argv[1]
    try:
        quantization_matrix = ast.literal_eval(arguments)

        quantization_matrix = [x.replace("quantization/", "quantization_") for x in quantization_matrix]
    except SyntaxError:
        Message.error_out(title, ci_title="")
        raise ValueError("Errored out.")

    available_artifacts = retrieve_available_artifacts()

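    # One result bucket per quantization category that produced a test-report artifact.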
    quantization_results = {
        quant: {
            "failed": {"single": 0, "multi": 0},
            "success": 0,
            "time_spent": "",
            "failures": {},
            "job_link": {},
        }
        for quant in quantization_matrix
        if f"run_quantization_torch_gpu_{quant}_test_reports" in available_artifacts
    }

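    # Resolve GitHub Actions jobs to their HTML URLs and to the report artifacts they uploaded.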
    github_actions_jobs = get_jobs(
        workflow_run_id=os.environ["GITHUB_RUN_ID"], token=os.environ["ACCESS_REPO_INFO_TOKEN"]
    )
    github_actions_job_links = {job["name"]: job["html_url"] for job in github_actions_jobs}

    artifact_name_to_job_map = {}
    for job in github_actions_jobs:
        for step in job["steps"]:
            if step["name"].startswith("Test suite reports artifacts: "):
                artifact_name = step["name"][len("Test suite reports artifacts: ") :]
                artifact_name_to_job_map[artifact_name] = job
                break

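    # Fill each bucket from its report artifacts: per-device failure counts, successes,
    # time spent, job links, and the individual failed tests with their stacktraces.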
    for quant in quantization_results.keys():
        for artifact_path in available_artifacts[f"run_quantization_torch_gpu_{quant}_test_reports"].paths:
            artifact = retrieve_artifact(artifact_path["path"], artifact_path["gpu"])
            if "stats" in artifact:
                job = artifact_name_to_job_map[artifact_path["path"]]
                quantization_results[quant]["job_link"][artifact_path["gpu"]] = job["html_url"]
                failed, success, time_spent = handle_test_results(artifact["stats"])
                quantization_results[quant]["failed"][artifact_path["gpu"]] += failed
                quantization_results[quant]["success"] += success
                quantization_results[quant]["time_spent"] += time_spent[1:-1] + ", "

                stacktraces = handle_stacktraces(artifact["failures_line"])

                for line in artifact["summary_short"].split("\n"):
                    if line.startswith("FAILED "):
                        line = line[len("FAILED ") :]
                        line = line.split()[0].replace("\n", "")

                        if artifact_path["gpu"] not in quantization_results[quant]["failures"]:
                            quantization_results[quant]["failures"][artifact_path["gpu"]] = []

                        quantization_results[quant]["failures"][artifact_path["gpu"]].append(
                            {"line": line, "trace": stacktraces.pop(0)}
                        )

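    # Send the report: the summary message first, then the threaded failure details.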
    message = QuantizationMessage(
        title,
        results=quantization_results,
    )

    message.post()
    message.post_reply()