import json
import re
import math
import time

import numpy as np
import cv2
import matplotlib.pyplot as plt
from matplotlib.path import Path
from shapely import wkt

from eval_referring import referring_expression
from eval_classification import accuracy_precision_recall


def convert_geochat_string(build, img_size=256):
    """
    Convert a raw GeoChat output string such as
    "{<40><89><56><100>|<57>}{<0><89><56><100>|<57>}" into a list of rotated
    bounding boxes, each a 4x2 array of corner coordinates in pixels.
    """
    build = build.strip('{}')
    bbox_segments = build.split("}{")

    # Each value is wrapped in angle brackets, e.g. <40>.
    pattern = r"<(\d+)>"

    bboxes = [
        list(map(int, re.findall(pattern, segment)))
        for segment in bbox_segments]

    rotated_bboxes = []
    for bbox in bboxes:
        try:
            xmin, ymin, xmax, ymax, angle = [float(v) for v in bbox]
        except ValueError:
            # Skip segments that do not contain exactly five values.
            continue

        # The coordinates are on a 0-100 scale; convert them to pixels.
        xmin = xmin * img_size / 100
        ymin = ymin * img_size / 100
        xmax = xmax * img_size / 100
        ymax = ymax * img_size / 100

        rect_width = xmax - xmin
        rect_height = ymax - ymin
        center_x = xmin + rect_width / 2
        center_y = ymin + rect_height / 2

        corners = np.array([
            [xmin, ymin],
            [xmax, ymin],
            [xmax, ymax],
            [xmin, ymax]
        ])

        # Rotate the axis-aligned corners around the box center.
        angle_rad = math.radians(angle)
        cos_angle = math.cos(angle_rad)
        sin_angle = math.sin(angle_rad)
        rotated_corners = []
        for x, y in corners:
            tx = x - center_x
            ty = y - center_y
            rotated_x = tx * cos_angle - ty * sin_angle + center_x
            rotated_y = tx * sin_angle + ty * cos_angle + center_y
            rotated_corners.append([rotated_x, rotated_y])

        rotated_bboxes.append(np.array(rotated_corners))

    return rotated_bboxes
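
# Illustrative usage (not executed by the pipeline): a single GeoChat box
# string parses into one 4x2 array of rotated corner coordinates in pixels.
#
#   corners = convert_geochat_string("{<10><20><50><60>|<30>}", img_size=256)
#   len(corners)        # -> 1
#   corners[0].shape    # -> (4, 2)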


def get_changed_buildings(build1, build2, img_size=256, task=None):
    """
    Given the predicted buildings in image 1 and image 2, this function
    - rasterizes both predictions into img_size x img_size numpy arrays,
    - computes the mask difference between the two arrays,
    - returns a list of bounding boxes reflecting those differences, together
      with the difference mask.
    Input:
    - build1, build2: raw GeoChat strings; each decoded building is a
      [[x, y], [x, y], [x, y], [x, y]] array of the four corner coordinates
      of its (possibly rotated) bounding box
    - task: None, "constructed" or "destructed"
    """
    image1 = np.zeros((img_size, img_size), np.uint8)
    image2 = np.zeros((img_size, img_size), np.uint8)

    build1 = convert_geochat_string(build1)
    build2 = convert_geochat_string(build2)

    # Rasterize each building polygon into the corresponding binary mask.
    for b in build1:
        path = Path(b)
        x, y = np.meshgrid(np.arange(img_size), np.arange(img_size))
        points = np.vstack((x.flatten(), y.flatten())).T
        image1[path.contains_points(points).reshape(img_size, img_size)] = 1

    for b in build2:
        path = Path(b)
        x, y = np.meshgrid(np.arange(img_size), np.arange(img_size))
        points = np.vstack((x.flatten(), y.flatten())).T
        image2[path.contains_points(points).reshape(img_size, img_size)] = 1

    if task is None:
        # Any change, in either direction.
        diff = cv2.bitwise_xor(image1, image2)
    elif task == "constructed":
        # Present in image 2 but not in image 1.
        diff = cv2.bitwise_and(image2, cv2.bitwise_not(image1))
    elif task == "destructed":
        # Present in image 1 but not in image 2.
        diff = cv2.bitwise_and(image1, cv2.bitwise_not(image2))
    else:
        raise ValueError(f"Unknown task: {task}")

    contours, _ = cv2.findContours(diff, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    bboxes = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        # Swap x/y (and width/height) to row/column ordering.
        x, y, w, h = y, x, h, w
        bboxes.append([x, y, x+w, y+h])

    return bboxes, diff
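
# Illustrative usage: with task=None the mask is the XOR of the two rasterized
# building sets, so both constructed and destructed buildings are returned.
#
#   boxes, diff = get_changed_buildings("{<10><10><30><30>|<0>}", "{}")
#   diff.shape    # -> (256, 256)
#   len(boxes)    # -> 1 (the building present only in the first image)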


def get_canonical_answer_dataset(answers):
    """
    Create a new dataset of questions and answers for GeoChat, ready to be
    parsed by the evaluation metrics. Raw answers are keyed as
    "<num>_<quadrant>_<image index>" and paired across the two image indices.
    """
    new_dataset = {}

    for key, answer in answers.items():
        num, quadrant, geovlmid = key.split("_")
        # Only iterate over the first image of each pair; its counterpart is
        # looked up explicitly below.
        if geovlmid == "1":
            continue

        id2 = num + "_" + quadrant + "_" + "1"
        answer1 = answers[key]
        try:
            answer2 = answers[id2]
        except KeyError:
            print(f"The associated image to {key} wasn't present in the dataset")
            continue

        change_bboxes, mask = get_changed_buildings(answer1['predicted'], answer2['predicted'])

        new_line = {}

        new_line['predicted'] = ""
        if len(change_bboxes) > 0:
            for bbox in change_bboxes:
                new_line['predicted'] += str(bbox) + ", "
            new_line['predicted'] = new_line['predicted'][:-2]
        new_line['predicted_mask'] = mask.tolist()

        new_line['ground_truth'] = answer1['original_answer']
        new_line['question'] = answer1['original_question']
        new_line['task'] = answer1['task']
        new_line['original_input_polygon'] = answer1['original_input_polygon']

        new_key = num + "_" + quadrant
        new_dataset[new_key] = new_line

    return new_dataset
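
# Illustrative input/output shape: raw answers are keyed as
# "<num>_<quadrant>_<image index>" and paired across image indices 0 and 1;
# the canonical dataset is keyed as "<num>_<quadrant>".
#
#   raw = {
#       "12_3_0": {"predicted": "{<10><10><30><30>|<0>}", "task": "...",
#                  "original_answer": "...", "original_question": "...",
#                  "original_input_polygon": "..."},
#       "12_3_1": {"predicted": "{}", ...},
#   }
#   get_canonical_answer_dataset(raw)   # -> {"12_3": {"predicted": ..., ...}}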


def postprocess_auxiliary_qa(key, answer, original_answers):
    """
    Turn a pair of per-image predictions into a Yes/No change answer.
    """
    new_line = {}
    new_line['ground_truth'] = answer['ground_truth']
    new_line['question'] = answer['question']
    new_line['task'] = answer['task']
    new_line['original_input_polygon'] = answer['original_input_polygon']

    answer1 = original_answers[key + '_0']['predicted']
    answer2 = original_answers[key + '_1']['predicted']

    # Decide whether the question asks about constructed or destructed buildings.
    question_text = answer['question']
    setting = None
    if "constructed" in question_text or "built" in question_text:
        setting = "constructed"
    elif "destructed" in question_text or "torn down" in question_text:
        setting = "destructed"
    else:
        print("The task is not recognized")
        print("Original question: ", question_text)
        print()

    change_bboxes, mask = get_changed_buildings(answer1, answer2, task=setting)

    new_line['predicted_mask'] = mask.tolist()
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Answer "Yes" if any change region can be approximated by a quadrilateral,
    # i.e. it looks like a building footprint rather than noise.
    found_convex_polygon = False
    for contour in contours:
        epsilon = 0.04 * cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, epsilon, True)
        if len(approx) == 4:
            found_convex_polygon = True
            break

    if found_convex_polygon:
        new_line['predicted'] = "Yes"
    else:
        new_line['predicted'] = "No"

    return new_line


def postprocess_auxiliary_region_qa(key, answer, original_answers, img_size=256):
    """
    The input question contains a bbox; we need to find the changed buildings
    inside that bbox.
    """
    new_line = {}
    new_line['ground_truth'] = answer['ground_truth']
    new_line['question'] = answer['question']
    new_line['task'] = answer['task']
    new_line['original_input_polygon'] = answer['original_input_polygon']

    answer1 = original_answers[key + '_0']['predicted']
    answer2 = original_answers[key + '_1']['predicted']

    change_bboxes, mask = get_changed_buildings(answer1, answer2)

    question = new_line['question']

    # Parse the bracketed bbox from the question text (0-100 scale) and
    # convert it to pixel coordinates.
    start = question.find('[')
    end = question.find(']')
    bbox = question[start+1:end].split(',')
    bbox = [int(b) * img_size // 100 for b in bbox]

    # Zero out the change mask outside the queried region.
    mask[:bbox[0], :] = 0
    mask[bbox[2]:, :] = 0
    mask[:, :bbox[1]] = 0
    mask[:, bbox[3]:] = 0

    # Answer "Yes" if any remaining change region can be approximated by a
    # quadrilateral.
    found_convex_polygon = False
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for contour in contours:
        epsilon = 0.04 * cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, epsilon, True)
        if len(approx) == 4:
            found_convex_polygon = True
            break

    new_line['predicted_mask'] = mask.tolist()

    if found_convex_polygon:
        new_line['predicted'] = "Yes"
    else:
        new_line['predicted'] = "No"

    return new_line
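
# Illustrative region crop: the bbox is read from the bracketed part of the
# question (0-100 scale) and converted to pixels. For a question containing
# "[10, 20, 60, 80]" on a 256-pixel image, the values become [25, 51, 153, 204]
# and only mask[25:153, 51:204] is kept; everything outside is zeroed.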


def postprocess_auxiliary_referring(key, answer, original_answers):
    """
    Turn a pair of per-image predictions into a referring-expression answer
    listing the constructed or destructed buildings.
    """
    new_line = {}
    new_line['ground_truth'] = answer['ground_truth']
    new_line['question'] = answer['question']
    new_line['task'] = answer['task']
    new_line['original_input_polygon'] = answer['original_input_polygon']

    answer1 = original_answers[key + '_0']['predicted']
    answer2 = original_answers[key + '_1']['predicted']

    # Decide whether the question asks about constructed or destructed buildings.
    question_text = answer['question']
    setting = None
    if "constructed" in question_text or "built" in question_text:
        setting = "constructed"
    elif "destructed" in question_text or "torn down" in question_text:
        setting = "destructed"
    else:
        print("The task is not recognized")
        print("Original question: ", question_text)
        print()

    change_bboxes, mask = get_changed_buildings(answer1, answer2, task=setting)

    new_line['predicted_mask'] = mask.tolist()
    new_line['predicted'] = ""
    if len(change_bboxes) > 0:
        for bbox in change_bboxes:
            new_line['predicted'] += str(bbox) + ", "
        new_line['predicted'] = new_line['predicted'][:-2]

    return new_line


def postprocess_auxiliary_geochat_s2looking(canonical_answers, original_answers):
    """
    Postprocess the auxiliary file for geochat_s2looking.
    The supported questions are:
    question1 = 'temporal_question_answering: Are there any buildings in the first image which were {destructed,torn down} in the second?'
    question2 = 'temporal_referring_expression: Identify the buildings in the first image which were {built,constructed,destructed,torn down} as seen in the second image.'
    question3 = 'localization_task: Identify all changed buildings.'
    question4 = 'referring_expression: identify the {constructed, destructed} buildings in the image.'
    question5 = 'question_answering: Have any buildings been task in the area? Please answer with Yes or No'

    The goal is to update the 'predicted' field with the correct bounding boxes of the changed buildings.
    - Localization is kept as is.
    - For question answering tasks, the 'predicted' field is set to 'Yes' or 'No';
      we output 'Yes' if a convex polygon is found in the change mask.
    - For referring expression, we first identify whether the task is 'constructed'
      or 'destructed' and then fill the 'predicted' field with the boxes and mask
      of the changed buildings.
    Input:
    - canonical_answers: dictionary of answers paired by the get_canonical_answer_dataset function
    - original_answers: dictionary of the raw per-image answers
    Output:
    - postprocessed_answers: dictionary with 'predicted' and 'predicted_mask' fields updated
    """
    postprocessed_answers = {}

    for key, answer in canonical_answers.items():
        task = answer['task']

        if 'localization' in task:
            postprocessed_answers[key] = answer
            continue
        # Check the region-based task first: its name also contains
        # 'question_answering'.
        if 'region_based_question_answering' in task:
            answer = postprocess_auxiliary_region_qa(key, answer, original_answers)
            postprocessed_answers[key] = answer
            continue
        if 'question_answering' in task:
            answer = postprocess_auxiliary_qa(key, answer, original_answers)
            postprocessed_answers[key] = answer
            continue
        if 'referring_expression' in task:
            answer = postprocess_auxiliary_referring(key, answer, original_answers)
            postprocessed_answers[key] = answer
            continue

    return postprocessed_answers
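
# Illustrative pipeline (this mirrors evaluate_geochat_s2looking below):
#
#   canonical = get_canonical_answer_dataset(raw_answers)
#   final = postprocess_auxiliary_geochat_s2looking(canonical, raw_answers)
#   # 'final' is then passed to referring_expression / accuracy_precision_recall.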


def evaluate_geochat_s2looking(answer_file, dataset_file, split):
    # Load the JSONL answer file: one {key: answer} record per line.
    answers = {}
    with open(answer_file, 'r') as f:
        for line in f:
            record = json.loads(line)
            key = list(record.keys())[0]
            answers[key] = record[key]

    dataset = dataset_file.split("/")[-1]
    if dataset == "dataset_canonical.json":

        postprocessed_answers = get_canonical_answer_dataset(answers)

        referring_expression(postprocessed_answers, "geochat_s2looking", False, "s2looking/answers/geochat_canonical_test", split=split)

    elif dataset == "dataset_v01_v02_canonical_filtered.json" or dataset == "dataset_RQA.json":

        postprocessed_answers = get_canonical_answer_dataset(answers)
        postprocessed_answers = postprocess_auxiliary_geochat_s2looking(postprocessed_answers, answers)

        print("Referring expression")
        referring_expression(postprocessed_answers, "geochat_s2looking", False, "s2looking/answers/geochat_v01_v02_canonical_filtered_test", split=split)
        print()
        print("Accuracy")
        accuracy_precision_recall(postprocessed_answers, "s2looking", verbose=False)
        print()

        # Per-question breakdown over the five supported question templates.
        question1 = 'temporal_question_answering: Are there any buildings in the first image which were {destructed,torn down} in the second?'
        question2 = 'temporal_referring_expression: Identify the buildings in the first image which were {built,constructed,destructed,torn down} as seen in the second image.'
        question3 = 'localization_task: Identify all changed buildings.'
        question4 = 'referring_expression: identify the {constructed, destructed} buildings in the image.'
        question5 = 'question_answering: Have any buildings been task in the area? Please answer with Yes or No'

        for question in [question1, question2, question3, question4, question5]:
            dataset_question = {}
            for data in postprocessed_answers:
                if postprocessed_answers[data]['task'] == question:
                    dataset_question[data] = postprocessed_answers[data]

            if len(dataset_question) > 0:
                print('Evaluating for question ', question)
                print('Size of the dataset is ', len(dataset_question))
                referring_expression(dataset_question, "geochat_s2looking", False, "s2looking/answers/geochat_v01_v02_canonical_filtered_test", split=split)
                print()

    else:
        print("Evaluation is not supported for this dataset. Please provide a valid dataset.")
        print("The supported datasets are: dataset_canonical.json, dataset_v01_v02_canonical_filtered.json, dataset_RQA.json")