# Spaces: Running
import gradio as gr | |
import subprocess | |
import tempfile | |
import itertools | |
import os | |
import sys | |
import hashlib | |
import json | |
# Directory that holds the Ghidra project used by every analyzeHeadless call
# in this file. It is created at import time so later calls can assume it
# exists.
#
# os.path.expanduser("~") returns $HOME when it is set and otherwise falls
# back to the platform's own home-directory lookup. Formatting
# os.getenv('HOME') directly would produce the relative path
# "None/ghidra_project" whenever HOME is unset.
GHIDRA_PROJECT_DIR = os.path.join(os.path.expanduser("~"), "ghidra_project")
os.makedirs(GHIDRA_PROJECT_DIR, exist_ok=True)
def hash_file(file):
    """Return the hexadecimal SHA-256 digest of the file at path *file*.

    The file is opened in binary mode and fed to the hash in 4096-byte
    chunks, so the whole file is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(file, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if not chunk:
                # read() returns b"" at end of file.
                break
            digest.update(chunk)
    return digest.hexdigest()
def _run_analyze_headless(args):
    """Run Ghidra's analyzeHeadless with *args* and return the CompletedProcess.

    The command is passed as an argument list (no shell), so paths containing
    spaces or shell metacharacters are handed to Ghidra verbatim. stderr is
    merged into stdout so ``.stdout`` holds the complete log.

    Raises:
        gr.Error: if the analyzeHeadless executable cannot be started.
    """
    try:
        return subprocess.run(
            ["/ghidra/support/analyzeHeadless", *args],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding="utf8",
        )
    except OSError as e:
        raise gr.Error(f"Unable to run Ghidra: {e}") from e


def get_functions(file):
    """Import *file* into the Ghidra project and return its dumped functions.

    The binary is imported under a project name equal to its SHA-256 hash,
    then the ``dump_functions.py`` post-script is run and asked to write
    ``funcs.json`` into a temporary directory.

    Args:
        file: Path of the executable to analyze.

    Returns:
        The parsed contents of the ``funcs.json`` written by the post-script.

    Raises:
        gr.Error: if the import fails for any reason other than the program
            already being present in the project, or if the post-script does
            not produce ``funcs.json``.
    """
    file_hash = hash_file(file)

    with tempfile.TemporaryDirectory() as temp_dir:
        funcs_path = os.path.join(temp_dir, "funcs.json")

        # First import the file.
        imported = _run_analyze_headless(
            [GHIDRA_PROJECT_DIR, file_hash, "-import", file]
        )
        # A non-zero exit is tolerated only when Ghidra reports that the
        # program is already in the project (i.e. it was imported earlier).
        if (
            imported.returncode != 0
            and "Found conflicting program file in project:" not in imported.stdout
        ):
            raise gr.Error(f"Unable to run Ghidra on {file}: {imported.stdout}")

        dumped = _run_analyze_headless(
            [
                GHIDRA_PROJECT_DIR,
                file_hash,
                "-process",
                "-postscript",
                "/home/user/app/scripts/dump_functions.py",
                funcs_path,
            ]
        )

        # Success is judged by the output file, not by the exit status.
        if not os.path.exists(funcs_path):
            raise gr.Error(f"DIRTY Ghidra failed to produce output: {dumped.stdout}")

        # Read inside the context manager: the directory is deleted on exit.
        with open(funcs_path) as funcs_file:
            return json.load(funcs_file)
with gr.Blocks() as demo:
    # Per-session storage: file_change_fn writes the uploaded file, its hash
    # and the "cfs" mapping here; function_change_fn reads "file_hash" back.
    state = gr.State()

    # The Markdown literals below are kept flush-left so the text itself
    # carries no leading indentation.
    intro = gr.Markdown(
        """
# DIRTY-Ghidra Inference Demo
Welcome! This is a demo of DIRTY-Ghidra, a tool that predict names and types for variables for Ghidra's decompiler.
To get started, upload a binary or select one of the example binaries below. Uploading a binary requires decompiling each function in the binary, which can take a few minutes.
## TODOs
* Make predictions for variables in non-unique storage locations
"""
    )

    file_widget = gr.File(label="Executable file")

    # Hidden until file_change_fn has analyzed an executable.
    # NOTE(review): the nesting of the row inside this column was
    # reconstructed from a source without indentation - confirm.
    with gr.Column(visible=False) as col:
        gr.Markdown(
            """
Great, you selected an executable! Now pick the function you would like
to analyze.
"""
        )

        # The placeholder choice is replaced by file_change_fn.
        fun_dropdown = gr.Dropdown(
            label="Select a function", choices=["Woohoo!"], interactive=True
        )

        gr.Markdown(
            """
Below you can find some information.
"""
        )

        with gr.Row(visible=True) as result:
            disassembly = gr.Code(
                label="Disassembly",
                lines=20,
            )
            original_decompile = gr.Code(
                language="c",
                label="Original Decompilation",
                lines=20,
            )
            decompile = gr.Code(
                language="c",
                label="Renamed and retyped Decompilation",
                lines=20,
            )
            model_output = gr.JSON(
                label="Model Output",
            )

    # Every entry of the "examples" directory next to this file is offered as
    # an example input. Sorted so the order does not depend on the order in
    # which os.scandir happens to yield entries.
    examples_dir = os.path.join(os.path.dirname(__file__), "examples")
    example_widget = gr.Examples(
        examples=sorted(entry.path for entry in os.scandir(examples_dir)),
        inputs=file_widget,
        outputs=[state, disassembly, original_decompile, decompile, model_output],
    )

    def file_change_fn(file, progress=gr.Progress()):
        """Analyze a newly selected executable and populate the function list.

        Args:
            file: Value of ``file_widget``; ``None`` when no file is selected.
                Its ``.name`` attribute is used as the path to analyze.
            progress: Gradio progress tracker. It is declared as a default
                argument (as in ``function_change_fn``) so Gradio can supply
                the tracker for the running event.

        Returns:
            A dict mapping output components to their new values. With no
            file, the function column is hidden and the state reset; otherwise
            the column is shown, the dropdown lists one entry per function
            (value: integer address) and the state records the file, its hash
            and a name -> ``cf`` mapping.

        Raises:
            gr.Error: if Ghidra analysis fails or reports no functions.
        """
        if file is None:
            return {col: gr.update(visible=False), state: {"file": None}}

        try:
            progress(
                0,
                desc=f"Analyzing binary {os.path.basename(file.name)} with Ghidra...",
            )
            # fun_data maps an address to a (name, cf, numvars) record.
            fun_data = get_functions(file.name)
            addrs = [
                (f"{name} ({hex(int(addr))}; {numvars} vars)", int(addr))
                for addr, (name, cf, numvars) in fun_data.items()
            ]
            cfs = {name: cf for (name, cf, _numvars) in fun_data.values()}
        except Exception as e:
            raise gr.Error(f"Unable to analyze binary with Ghidra: {e}") from e

        if not addrs:
            # Without this guard, addrs[0] below would raise a bare IndexError.
            raise gr.Error("Ghidra did not report any functions in this binary.")

        return {
            col: gr.Column(visible=True),
            fun_dropdown: gr.Dropdown(choices=addrs, value=addrs[0][1]),
            state: {
                "file": file,
                "file_hash": hash_file(file.name),
                "cfs": cfs,
            },
        }

    def function_change_fn(selected_fun, state, progress=gr.Progress()):
        """Run the DIRTY inference post-script on one function and show results.

        Args:
            selected_fun: Value of ``fun_dropdown``; the integer address set
                up by ``file_change_fn``.
            state: Session state dict; ``state["file_hash"]`` names the Ghidra
                project program to process.
            progress: Gradio progress tracker.

        Returns:
            A dict mapping the disassembly, both decompilation views and the
            model-output JSON component to their new values.

        Raises:
            gr.Error: if Ghidra cannot be run, exits with a non-zero status,
                produces no or unparsable output, or the output contains an
                "exception" entry.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            funcs_path = os.path.join(temp_dir, "funcs.json")

            progress(0, desc=f"Running DIRTY Ghidra on {hex(selected_fun)}...")

            # Argument list instead of a shell string: nothing is re-parsed
            # by a shell. stderr is merged into stdout so error messages can
            # show the complete log.
            try:
                o = subprocess.run(
                    [
                        "/ghidra/support/analyzeHeadless",
                        GHIDRA_PROJECT_DIR,
                        state["file_hash"],
                        "-process",
                        "-postscript",
                        "/DIRTY/scripts/DIRTY_infer.py",
                        funcs_path,
                        str(selected_fun),
                    ],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    encoding="utf8",
                )
            except OSError as e:
                raise gr.Error(f"Unable to run Ghidra: {e}") from e

            if o.returncode != 0:
                raise gr.Error(f"Unable to run Ghidra: {o.stdout}")

            if not os.path.exists(funcs_path):
                raise gr.Error(f"DIRTY Ghidra failed to produce output: {o.stdout}")

            try:
                with open(funcs_path) as funcs_file:
                    json_info = json.load(funcs_file)
            except Exception as e:
                raise gr.Error(
                    f"Unable to parse DIRTY Ghidra output: {e}\n{o.stdout}"
                ) from e

            if "exception" in json_info:
                raise gr.Error(f"DIRTY Ghidra failed: {json_info['exception']}")

            # Group by location: source_filtered maps a key to a location-like
            # value; invert it into {location: [keys sharing it]}. groupby
            # only merges adjacent items, hence the sort by the same key.
            def by_location(item):
                return item[1]

            source_filtered = json_info["other_info"]["example_info"]["source_filtered"]
            ordered = sorted(source_filtered.items(), key=by_location)
            dup_location_vars = {
                location: [var for var, _location in group]
                for location, group in itertools.groupby(ordered, by_location)
            }

            model_output_info = {
                "model_output": json_info["model_output"],
                "model_output_multi": json_info["model_output_multi"],
                "dup_location_vars": dup_location_vars,
                "other_outputs": json_info["other_info"]["other_outputs"],
            }

            # The three text outputs are gr.Code components, so send generic
            # value updates rather than gr.Textbox instances.
            return {
                disassembly: gr.update(value=json_info["disassembly"]),
                original_decompile: gr.update(value=json_info["original_decompile"]),
                decompile: gr.update(value=json_info["decompile"]),
                model_output: gr.JSON(value=json.dumps(model_output_info)),
            }

    # Need to put intro as output to get progress to work!
    file_widget.change(
        file_change_fn, file_widget, outputs=[intro, state, col, fun_dropdown]
    )

    fun_dropdown.change(
        function_change_fn,
        inputs=[fun_dropdown, state],
        outputs=[disassembly, original_decompile, decompile, model_output],
    )
# Merge the two standard streams for the Space's log view: after this call the
# file descriptor behind sys.stderr is a duplicate of the one behind
# sys.stdout.
# NOTE(review): the original remark here was "spaces only shows stderr..",
# yet the call points stderr at stdout rather than the reverse - confirm
# which direction is intended.
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stderr.fileno()
os.dup2(stdout_fd, stderr_fd)

# Enable the request queue, then serve on all interfaces on port 7860.
demo.queue().launch(server_name="0.0.0.0", server_port=7860)