dirty-ghidra / main.py
ejschwartz's picture
Update for multiple predictions
036f81f
raw
history blame
7.71 kB
import gradio as gr
import subprocess
import tempfile
import itertools
import os
import sys
import hashlib
import json
# Directory passed to analyzeHeadless as the project location by every Ghidra
# invocation in this file. It is created at import time if it does not exist.
GHIDRA_PROJECT_DIR = "{}/ghidra_project".format(os.getenv("HOME"))
os.makedirs(GHIDRA_PROJECT_DIR, exist_ok=True)
def hash_file(file):
    """Return the SHA-256 hex digest of the file at path ``file``.

    The file is read in 4096-byte chunks so it never has to fit in memory.
    """
    digest = hashlib.sha256()
    with open(file, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def get_functions(file):
    """Import ``file`` into Ghidra and return the function table it dumps.

    analyzeHeadless is run twice, each time with GHIDRA_PROJECT_DIR and the
    file's SHA-256 digest as its first two arguments: once with ``-import`` to
    add the binary, and once with ``-process -postscript dump_functions.py`` to
    write ``funcs.json`` into a temporary directory.

    Args:
        file: Path of the executable to analyze.

    Returns:
        The parsed contents of the ``funcs.json`` written by the post-script.

    Raises:
        gr.Error: If Ghidra cannot be launched, if the import fails for any
            reason other than the program already being in the project, or if
            the post-script produces no output file.
    """
    file_hash = hash_file(file)

    def run_headless(*extra_args):
        # An argv list (no shell) keeps a hostile or merely odd file name from
        # being interpreted as shell syntax. stderr is merged into stdout, as
        # the former "2>&1" did, so error messages carry the full Ghidra log.
        try:
            return subprocess.run(
                [
                    "/ghidra/support/analyzeHeadless",
                    GHIDRA_PROJECT_DIR,
                    file_hash,
                    *extra_args,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf8",
            )
        except OSError as e:
            # Without a shell, a missing/non-executable launcher raises instead
            # of yielding a non-zero exit status; report it the same way.
            raise gr.Error(f"Unable to run Ghidra on {file}: {e}") from e

    with tempfile.TemporaryDirectory() as temp_dir:
        funcs_path = os.path.join(temp_dir, "funcs.json")

        # First import the file. A failure caused by the program already being
        # present in the project is tolerated; any other failure is fatal.
        imported = run_headless("-import", file)
        if (
            imported.returncode != 0
            and "Found conflicting program file in project:" not in imported.stdout
        ):
            raise gr.Error(f"Unable to run Ghidra on {file}: {imported.stdout}")

        # Then dump the functions. Success is judged by the output file
        # existing, not by the exit status.
        dumped = run_headless(
            "-process",
            "-postscript",
            "/home/user/app/scripts/dump_functions.py",
            funcs_path,
        )
        if not os.path.exists(funcs_path):
            raise gr.Error(f"DIRTY Ghidra failed to produce output: {dumped.stdout}")

        with open(funcs_path) as funcs_file:
            return json.load(funcs_file)
# Top-level Gradio UI definition.
# NOTE(review): this copy of the file has lost its indentation, so which
# components are nested inside `col` and `result` cannot be read off the text
# below -- confirm against the running layout before restructuring.
with gr.Blocks() as demo:
# State component: file_change_fn stores a dict with "file", "file_hash" and
# "cfs" keys in it, and function_change_fn reads state['file_hash'] back.
state = gr.State()
# Introductory text. It is also listed as an output of file_widget.change
# further down in this file.
intro = gr.Markdown(
"""
# DIRTY-Ghidra Inference Demo
Welcome! This is a demo of DIRTY-Ghidra, a tool that predict names and types for variables for Ghidra's decompiler.
To get started, upload a binary or select one of the example binaries below. Uploading a binary requires decompiling each function in the binary, which can take a few minutes.
## TODOs
* Make predictions for variables in non-unique storage locations
"""
)
# Upload widget; its change event triggers file_change_fn.
file_widget = gr.File(label="Executable file")
# Created hidden; file_change_fn makes it visible once a file has been
# analyzed and hides it again when the file is cleared.
with gr.Column(visible=False) as col:
# output = gr.Textbox("Output")
gr.Markdown(
"""
Great, you selected an executable! Now pick the function you would like
to analyze.
"""
)
# The placeholder choice is replaced by file_change_fn with one
# (label, address) entry per function reported by Ghidra.
fun_dropdown = gr.Dropdown(
label="Select a function", choices=["Woohoo!"], interactive=True
)
gr.Markdown(
"""
Below you can find some information.
"""
)
# The four components below are the outputs filled in by function_change_fn.
with gr.Row(visible=True) as result:
disassembly = gr.Code(
label="Disassembly", lines=20,
#min_width=400
)
original_decompile = gr.Code(
language="c",
label="Original Decompilation", lines=20,
#min_width=400
)
decompile = gr.Code(
language="c",
label="Renamed and retyped Decompilation",
lines=20,
#min_width=400
)
model_output = gr.JSON(
label="Model Output",
#min_width=400
)
# with gr.Column():
# clazz = gr.Label()
# interpret_button = gr.Button("Interpret (very slow)")
# interpretation = gr.components.Interpretation(disassembly)
# Offers every entry of the "examples" directory next to this file as a
# selectable input for file_widget.
# NOTE(review): `outputs` is given without an `fn`; confirm whether these
# outputs have any effect in the Gradio version in use.
example_widget = gr.Examples(
examples=[f.path for f in os.scandir(os.path.join(os.path.dirname(__file__), "examples"))],
inputs=file_widget,
outputs=[state, disassembly, original_decompile, decompile, model_output],
)
def file_change_fn(file, progress=gr.Progress()):
    """Analyze a newly selected executable and populate the function picker.

    Args:
        file: Value of the file widget, or None when it has been cleared.
            Only its ``name`` attribute (used as a filesystem path) is read.
        progress: Progress tracker. It is declared as a default argument,
            the same way function_change_fn declares it, which is the form
            Gradio documents for progress tracking; callers never pass it.

    Returns:
        A dict of updates keyed by component. When ``file`` is None it hides
        ``col`` and resets ``state``. Otherwise it shows ``col``, fills
        ``fun_dropdown`` with one ``(label, address)`` choice per function
        (the first one selected), and stores the file, its SHA-256 digest and
        the name -> ``cf`` mapping in ``state``.

    Raises:
        gr.Error: If the Ghidra analysis fails, its output does not have the
            expected shape, or it reports no functions at all.
    """
    if file is None:
        return {col: gr.update(visible=False), state: {"file": None}}

    try:
        progress(
            0,
            desc=f"Analyzing binary {os.path.basename(file.name)} with Ghidra...",
        )
        fun_data = get_functions(file.name)
        # fun_data maps an address (decimal string) to a
        # (name, cf, numvars) triple, as unpacked below.
        addrs = [
            (f"{name} ({hex(int(addr))}; {numvars} vars)", int(addr))
            for addr, (name, cf, numvars) in fun_data.items()
        ]
        cfs = {name: cf for (name, cf, _numvars) in fun_data.values()}
    except Exception as e:
        raise gr.Error(f"Unable to analyze binary with Ghidra: {e}") from e

    # Without this guard an empty result would surface as a bare IndexError
    # from addrs[0] below instead of a readable message.
    if not addrs:
        raise gr.Error(
            "Unable to analyze binary with Ghidra: no functions were found"
        )

    return {
        col: gr.Column(visible=True),
        fun_dropdown: gr.Dropdown(choices=addrs, value=addrs[0][1]),
        state: {
            "file": file,
            "file_hash": hash_file(file.name),
            "cfs": cfs,
        },
    }
def function_change_fn(selected_fun, state, progress=gr.Progress()):
    """Run the DIRTY inference post-script on one function and build the UI updates.

    Args:
        selected_fun: Address of the chosen function, as an int (it is passed
            to ``hex()`` and forwarded to the post-script).
        state: Dict stored by file_change_fn; only ``state['file_hash']`` is
            read here.
        progress: Gradio progress tracker, supplied by Gradio.

    Returns:
        A dict of updates for ``disassembly``, ``original_decompile``,
        ``decompile`` and ``model_output``.

    Raises:
        gr.Error: If Ghidra cannot be launched or exits non-zero, if no output
            file is produced or it cannot be parsed, or if the post-script
            reported an ``"exception"`` entry in its output.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        result_path = os.path.join(temp_dir, "funcs.json")
        progress(0, desc=f"Running DIRTY Ghidra on {hex(selected_fun)}...")

        # argv list instead of a shell string: no quoting pitfalls, and
        # stderr is merged into stdout exactly as "2>&1" did.
        try:
            o = subprocess.run(
                [
                    "/ghidra/support/analyzeHeadless",
                    GHIDRA_PROJECT_DIR,
                    state["file_hash"],
                    "-process",
                    "-postscript",
                    "/DIRTY/scripts/DIRTY_infer.py",
                    result_path,
                    str(selected_fun),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf8",
            )
        except OSError as e:
            # A launcher that cannot be started used to show up as a non-zero
            # shell exit status; keep reporting it through the same error.
            raise gr.Error(f"Unable to run Ghidra: {e}") from e

        if o.returncode != 0:
            raise gr.Error(f"Unable to run Ghidra: {o.stdout}")
        if not os.path.exists(result_path):
            raise gr.Error(f"DIRTY Ghidra failed to produce output: {o.stdout}")

        try:
            with open(result_path) as result_file:
                json_info = json.load(result_file)
        except Exception as e:
            raise gr.Error(
                f"Unable to parse DIRTY Ghidra output: {e}\n{o.stdout}"
            ) from e

    if "exception" in json_info:
        raise gr.Error(f"DIRTY Ghidra failed: {json_info['exception']}")

    # Group by location: invert the source_filtered mapping so that each
    # distinct value lists the keys that map to it. groupby only merges
    # adjacent items, hence the sort on the same key first.
    # (Presumably the keys are variable names and the values their storage
    # locations -- confirm against DIRTY_infer.py.)
    def location_of(item):
        return item[1]

    src_filtered = json_info['other_info']['example_info']['source_filtered']
    by_location = sorted(src_filtered.items(), key=location_of)
    dup_location_vars = {
        location: [name for name, _ in group]
        for location, group in itertools.groupby(by_location, location_of)
    }

    model_output_info = {
        'model_output': json_info["model_output"],
        'model_output_multi': json_info["model_output_multi"],
        'dup_location_vars': dup_location_vars,
        'other_outputs': json_info['other_info']['other_outputs'],
    }

    return {
        disassembly: gr.Textbox(value=json_info["disassembly"]),
        original_decompile: gr.Textbox(value=json_info["original_decompile"]),
        decompile: gr.Textbox(value=json_info["decompile"]),
        model_output: gr.JSON(value=json.dumps(model_output_info)),
    }
# `intro` is listed among the outputs on purpose: it is needed there to get
# progress reporting to work for this handler.
file_widget.change(
    fn=file_change_fn,
    inputs=file_widget,
    outputs=[intro, state, col, fun_dropdown],
)
# Picking a function in the dropdown runs inference and refreshes the panes.
fun_dropdown.change(
    fn=function_change_fn,
    inputs=[fun_dropdown, state],
    outputs=[disassembly, original_decompile, decompile, model_output],
)
# spaces only shows stderr..
# NOTE(review): os.dup2(fd, fd2) makes fd2 refer to fd's file, so the call
# below points the stderr descriptor at stdout's destination. That reads as
# the opposite of what the comment above implies -- confirm which stream the
# hosting environment actually displays.
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stderr.fileno()
os.dup2(stdout_fd, stderr_fd)
# Enable the request queue, then serve on all interfaces, port 7860.
demo.queue().launch(server_name="0.0.0.0", server_port=7860)