Update app.py
Browse files
app.py
CHANGED
@@ -7,11 +7,20 @@ from audioseal import AudioSeal
|
|
7 |
import random
|
8 |
import string
|
9 |
from pathlib import Path
|
|
|
|
|
|
|
10 |
|
11 |
# Initialize logging
|
12 |
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
13 |
logger = logging.getLogger(__name__)
|
14 |
|
|
|
|
|
|
|
|
|
|
|
|
|
15 |
# Helper function for generating a unique alphanumeric message
|
16 |
def generate_unique_message(length=16):
|
17 |
characters = string.ascii_letters + string.digits
|
@@ -22,11 +31,6 @@ def message_to_binary(message, bit_length=16):
|
|
22 |
binary_message = ''.join(format(ord(c), '08b') for c in message)
|
23 |
return binary_message[:bit_length].ljust(bit_length, '0')
|
24 |
|
25 |
-
# Converts binary string back to ASCII message
|
26 |
-
def binary_to_message(binary_str):
|
27 |
-
chars = [chr(int(binary_str[i:i+8], 2)) for i in range(0, len(binary_str), 8)]
|
28 |
-
return ''.join(chars).rstrip('\x00')
|
29 |
-
|
30 |
# Converts binary string to hexadecimal
|
31 |
def binary_to_hex(binary_str):
|
32 |
return hex(int(binary_str, 2))[2:].zfill(4)
|
@@ -39,8 +43,36 @@ def load_and_resample_audio(audio_file_path, target_sample_rate=16000):
|
|
39 |
waveform = resampler(waveform)
|
40 |
return waveform, target_sample_rate
|
41 |
|
42 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
43 |
def watermark_audio(audio_file_path, unique_message):
|
|
|
44 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
45 |
waveform = torch.clamp(waveform, min=-1.0, max=1.0)
|
46 |
generator = AudioSeal.load_generator("audioseal_wm_16bits")
|
@@ -52,7 +84,12 @@ def watermark_audio(audio_file_path, unique_message):
|
|
52 |
watermarked_audio = generator(waveform.unsqueeze(0), sample_rate=sample_rate, message=message_tensor).squeeze(0)
|
53 |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
|
54 |
torchaudio.save(temp_file.name, watermarked_audio, sample_rate)
|
55 |
-
|
|
|
|
|
|
|
|
|
|
|
56 |
|
57 |
# Function to detect watermark in audio
|
58 |
def detect_watermark(audio_file_path, original_hex_message=None):
|
@@ -70,45 +107,29 @@ def detect_watermark(audio_file_path, original_hex_message=None):
|
|
70 |
|
71 |
return result, detected_hex_message, match_result
|
72 |
|
73 |
-
# Setup for Gradio interface
|
74 |
# Load the CSS styles
|
75 |
style_path = Path("style.css")
|
76 |
-
|
|
|
|
|
|
|
77 |
|
|
|
78 |
with gr.Blocks(css=style) as demo:
|
79 |
with gr.Tab("Watermark Audio"):
|
80 |
with gr.Column(scale=6):
|
81 |
-
gr.Markdown("### How to Watermark Your Audio", elem_id="how_to_watermark")
|
82 |
-
gr.Markdown("""
|
83 |
-
This tool allows you to embed a unique, invisible watermark into your audio files. Here's how it works:
|
84 |
-
- **Upload Audio File**: Select the audio file you want to protect.
|
85 |
-
- **Generate Unique Message**: Click this button to create a unique code that identifies the audio as yours.
|
86 |
-
- **Unique Message**: This is the unique code generated for your audio. It's used to create the watermark.
|
87 |
-
- **Apply Watermark**: Embed the unique code into your audio file. This step converts the code into a format (hexadecimal) that's embedded into the audio without altering its quality.
|
88 |
-
- **Watermarked Audio**: After the watermark is applied, you can download and listen to your watermarked audio here. It will sound just like the original, but now it has your unique watermark.
|
89 |
-
- **Message Used for Watermarking**: Shows the code that was embedded into your audio. The actual embedded code is a hexadecimal version of this message, which is a more secure representation.
|
90 |
-
""", elem_id="watermark_process_explanation")
|
91 |
audio_input_watermark = gr.Audio(label="Upload Audio File for Watermarking", type="filepath")
|
92 |
-
|
93 |
-
unique_message_output = gr.Textbox(label="Unique Message", value="Press Generate")
|
94 |
-
watermark_button = gr.Button("Apply Watermark")
|
95 |
watermarked_audio_output = gr.Audio(label="Watermarked Audio")
|
96 |
message_output = gr.Textbox(label="Message Used for Watermarking")
|
|
|
|
|
|
|
97 |
generate_message_button.click(fn=generate_unique_message, inputs=None, outputs=unique_message_output)
|
98 |
-
watermark_button.click(fn=watermark_audio, inputs=[audio_input_watermark, unique_message_output], outputs=[watermarked_audio_output, message_output])
|
99 |
|
100 |
with gr.Tab("Detect Watermark"):
|
101 |
with gr.Column(scale=6):
|
102 |
-
gr.Markdown("### How to Detect a Watermark in Your Audio", elem_id="how_to_detect")
|
103 |
-
gr.Markdown("""
|
104 |
-
Use this tool to check if an audio file contains a specific watermark. Here's the process:
|
105 |
-
- **Upload Audio File**: Choose the audio file you suspect contains a watermark.
|
106 |
-
- **Original Hex Message for Comparison**: If you know the hexadecimal code of the watermark you're looking for, enter it here. This helps verify the specific watermark.
|
107 |
-
- **Detect Watermark**: Analyzes the audio to find any embedded watermarks.
|
108 |
-
- **Watermark Detection Result**: Indicates whether a watermark was found and its confidence level.
|
109 |
-
- **Detected Hex Message**: If a watermark is detected, this shows the found code in hexadecimal format.
|
110 |
-
- **Match Result**: Compares the detected hex code to the one you entered, indicating if they match or not. This confirms whether the detected watermark is the one you're looking for.
|
111 |
-
""", elem_id="detection_process_explanation")
|
112 |
audio_input_detect_watermark = gr.Audio(label="Upload Audio File for Watermark Detection", type="filepath")
|
113 |
original_hex_input = gr.Textbox(label="Original Hex Message for Comparison", placeholder="Enter the original hex message here")
|
114 |
detect_watermark_button = gr.Button("Detect Watermark")
|
@@ -117,4 +138,4 @@ Use this tool to check if an audio file contains a specific watermark. Here's th
|
|
117 |
match_result_output = gr.Textbox(label="Match Result")
|
118 |
detect_watermark_button.click(fn=detect_watermark, inputs=[audio_input_detect_watermark, original_hex_input], outputs=[watermark_detection_output, detected_message_output, match_result_output])
|
119 |
|
120 |
-
demo.launch()
|
|
|
7 |
import random
|
8 |
import string
|
9 |
from pathlib import Path
|
10 |
+
from datetime import datetime
|
11 |
+
import json
|
12 |
+
import os
|
13 |
|
14 |
# Initialize logging
|
15 |
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
16 |
logger = logging.getLogger(__name__)
|
17 |
|
18 |
+
# File to store audio metadata
|
19 |
+
metadata_file = 'audio_metadata.json'
|
20 |
+
if not os.path.exists(metadata_file):
|
21 |
+
with open(metadata_file, 'w') as f:
|
22 |
+
json.dump({}, f)
|
23 |
+
|
24 |
# Helper function for generating a unique alphanumeric message
|
25 |
def generate_unique_message(length=16):
|
26 |
characters = string.ascii_letters + string.digits
|
|
|
31 |
binary_message = ''.join(format(ord(c), '08b') for c in message)
|
32 |
return binary_message[:bit_length].ljust(bit_length, '0')
|
33 |
|
|
|
|
|
|
|
|
|
|
|
34 |
# Converts binary string to hexadecimal
|
35 |
def binary_to_hex(binary_str):
|
36 |
return hex(int(binary_str, 2))[2:].zfill(4)
|
|
|
43 |
waveform = resampler(waveform)
|
44 |
return waveform, target_sample_rate
|
45 |
|
46 |
+
# Function to generate enhanced unique identifier with timestamp and sequential number
|
47 |
+
def generate_enhanced_identifier():
|
48 |
+
timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
|
49 |
+
sequential_number = str(get_next_sequential_number()).zfill(6)
|
50 |
+
return f"{timestamp}-{sequential_number}"
|
51 |
+
|
52 |
+
# Function to increment and get the next sequential number from the metadata file
|
53 |
+
def get_next_sequential_number():
|
54 |
+
with open(metadata_file, 'r+') as f:
|
55 |
+
data = json.load(f)
|
56 |
+
next_number = data.get('next_sequential_number', 1)
|
57 |
+
data['next_sequential_number'] = next_number + 1
|
58 |
+
f.seek(0)
|
59 |
+
json.dump(data, f, indent=4)
|
60 |
+
f.truncate()
|
61 |
+
return next_number
|
62 |
+
|
63 |
+
# Function to save metadata for an audio file
|
64 |
+
def save_audio_metadata(unique_id, original_hex, enhanced_id):
|
65 |
+
with open(metadata_file, 'r+') as f:
|
66 |
+
data = json.load(f)
|
67 |
+
data['audio_files'] = data.get('audio_files', {})
|
68 |
+
data['audio_files'][unique_id] = {'original_hex': original_hex, 'enhanced_id': enhanced_id}
|
69 |
+
f.seek(0)
|
70 |
+
json.dump(data, f, indent=4)
|
71 |
+
f.truncate()
|
72 |
+
|
73 |
+
# Modify the watermark_audio function to include enhanced ID generation and saving metadata
|
74 |
def watermark_audio(audio_file_path, unique_message):
|
75 |
+
# Original watermarking process
|
76 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
77 |
waveform = torch.clamp(waveform, min=-1.0, max=1.0)
|
78 |
generator = AudioSeal.load_generator("audioseal_wm_16bits")
|
|
|
84 |
watermarked_audio = generator(waveform.unsqueeze(0), sample_rate=sample_rate, message=message_tensor).squeeze(0)
|
85 |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
|
86 |
torchaudio.save(temp_file.name, watermarked_audio, sample_rate)
|
87 |
+
|
88 |
+
# Enhanced ID generation and metadata saving
|
89 |
+
enhanced_id = generate_enhanced_identifier()
|
90 |
+
save_audio_metadata(unique_message, hex_message, enhanced_id)
|
91 |
+
|
92 |
+
return temp_file.name, hex_message, enhanced_id # Include enhanced ID in the return statement
|
93 |
|
94 |
# Function to detect watermark in audio
|
95 |
def detect_watermark(audio_file_path, original_hex_message=None):
|
|
|
107 |
|
108 |
return result, detected_hex_message, match_result
|
109 |
|
|
|
110 |
# Load the CSS styles
|
111 |
style_path = Path("style.css")
|
112 |
+
if style_path.exists():
|
113 |
+
style = style_path.read_text()
|
114 |
+
else:
|
115 |
+
style = ""
|
116 |
|
117 |
+
# Define Gradio interface
|
118 |
with gr.Blocks(css=style) as demo:
|
119 |
with gr.Tab("Watermark Audio"):
|
120 |
with gr.Column(scale=6):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
121 |
audio_input_watermark = gr.Audio(label="Upload Audio File for Watermarking", type="filepath")
|
122 |
+
unique_message_output = gr.Textbox(label="Unique Message")
|
|
|
|
|
123 |
watermarked_audio_output = gr.Audio(label="Watermarked Audio")
|
124 |
message_output = gr.Textbox(label="Message Used for Watermarking")
|
125 |
+
enhanced_id_output = gr.Textbox(label="Enhanced ID")
|
126 |
+
generate_message_button = gr.Button("Generate Unique Message")
|
127 |
+
watermark_button = gr.Button("Apply Watermark")
|
128 |
generate_message_button.click(fn=generate_unique_message, inputs=None, outputs=unique_message_output)
|
129 |
+
watermark_button.click(fn=watermark_audio, inputs=[audio_input_watermark, unique_message_output], outputs=[watermarked_audio_output, message_output, enhanced_id_output])
|
130 |
|
131 |
with gr.Tab("Detect Watermark"):
|
132 |
with gr.Column(scale=6):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
133 |
audio_input_detect_watermark = gr.Audio(label="Upload Audio File for Watermark Detection", type="filepath")
|
134 |
original_hex_input = gr.Textbox(label="Original Hex Message for Comparison", placeholder="Enter the original hex message here")
|
135 |
detect_watermark_button = gr.Button("Detect Watermark")
|
|
|
138 |
match_result_output = gr.Textbox(label="Match Result")
|
139 |
detect_watermark_button.click(fn=detect_watermark, inputs=[audio_input_detect_watermark, original_hex_input], outputs=[watermark_detection_output, detected_message_output, match_result_output])
|
140 |
|
141 |
+
demo.launch()
|