chat-ap / crypt.py
Omnibus's picture
Create crypt.py
16bac35
raw
history blame
9.69 kB
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Hash import RIPEMD160, SHA256
from Crypto.Signature import pkcs1_15
import binascii
import base58
import base64
import cv2
import numpy as np
from PIL import Image
import os
import uuid as uniq
import qrcode as qr
import math
############### qr ######################
def make_qr(txt=None,data=None,im_size=None,color_f=None,color_b=None):
qrm = qr.QRCode(box_size=10,error_correction=qr.constants.ERROR_CORRECT_H)
if color_f == None:
color_f = "#000"
if color_b == None:
color_b = "#fff"
if txt != None and txt != "" and data != None:
f = Image.open(f'{data}')
f.thumbnail((im_size,im_size))
f.save("tmp.jpg")
imr = open(f'tmp.jpg','rb')
out = f'{txt}+++{base64.b64encode(imr.read())}'
print (f'txt+data {out}')
qrm.add_data(out)
qrm.make(fit=True)
img1 = qrm.make_image(fill_color=color_f, back_color=color_b)
img1.save("im.png")
return "im.png"
if txt == None or txt == "" and data != None:
f = Image.open(f'{data}')
f.thumbnail((im_size,im_size))
f.save("tmp1.jpg")
imr = open(f'tmp1.jpg','rb')
out = f'+++{base64.b64encode(imr.read())}'
print (f'data {out}')
qrm.add_data(out)
qrm.make(fit=True)
img1 = qrm.make_image(fill_color=color_f, back_color=color_b)
img1.save("im1.png")
return "im1.png"
if txt != None and txt != "" and data == None:
out = f'{txt}'
print (f'txt {out}')
qrm.add_data(out)
qrm.make(fit=True)
img1 = qrm.make_image(fill_color=color_f, back_color=color_b)
img1.save("im2.png")
return "im2.png"
############ stegan ####################
def to_bin(data):
"""Convert `data` to binary format as string"""
if isinstance(data, str):
return ''.join([ format(ord(i), "08b") for i in data ])
elif isinstance(data, bytes):
return ''.join([ format(i, "08b") for i in data ])
elif isinstance(data, np.ndarray):
return [ format(i, "08b") for i in data ]
elif isinstance(data, int) or isinstance(data, np.uint8):
return format(data, "08b")
else:
raise TypeError("Type not supported.")
def decode(image_name,txt=None):
BGRimage = cv2.imread(image_name)
image = cv2.cvtColor(BGRimage, cv2.COLOR_BGR2RGB)
binary_data = ""
for row in image:
for pixel in row:
r, g, b = to_bin(pixel)
binary_data += r[-1]
binary_data += g[-1]
binary_data += b[-1]
all_bytes = [ binary_data[i: i+8] for i in range(0, len(binary_data), 8) ]
decoded_data = ""
for byte in all_bytes:
decoded_data += chr(int(byte, 2))
if decoded_data[-5:] == "=====":
break
this = decoded_data[:-5].split("#####",1)[0]
this = eval(this)
enc_in=this
return this
def encode(image_name, secret_data,txt=None):
BGRimage = cv2.imread(image_name)
image = cv2.cvtColor(BGRimage, cv2.COLOR_BGR2RGB)
n_bytes = image.shape[0] * image.shape[1] * 3 // 8
print("[*] Maximum bytes to encode:", n_bytes)
secret_data1=secret_data
#secret_data1=f'{secret_data}#{resultp}'
while True:
if len(secret_data1)+5 < (n_bytes):
secret_data1 = f'{secret_data1}#####'
elif len(secret_data1)+5 >= (n_bytes):
break
secret_data = secret_data1
if len(secret_data) > n_bytes:
return image_name, gr.Markdown.update("""<center><h3>Input image is too large""")
secret_data += "====="
data_index = 0
binary_secret_data = to_bin(secret_data)
data_len = len(binary_secret_data)
for row in image:
for pixel in row:
r, g, b = to_bin(pixel)
if data_index < data_len:
pixel[0] = int(r[:-1] + binary_secret_data[data_index], 2)
data_index += 1
if data_index < data_len:
pixel[1] = int(g[:-1] + binary_secret_data[data_index], 2)
data_index += 1
if data_index < data_len:
pixel[2] = int(b[:-1] + binary_secret_data[data_index], 2)
data_index += 1
if data_index >= data_len:
break
return image
def conv_im(im,data):
uniqnum = uniq.uuid4()
byte_size = len(data)
print (f'bytes:{byte_size}')
data_pixels = byte_size*4
print (f'pixels:{data_pixels}')
#data_sq=data_pixels/2
data_sq = int(math.sqrt(data_pixels))
data_pad = data_sq+100
print (f'square image:{data_pad}x{data_pad}')
qr_im = im
img1 = Image.open(qr_im)
imgw = img1.size[0]
imgh = img1.size[1]
print (f'qr Size:{img1.size}')
#img1.thumbnail((imgw*4,imgh*4), Image.Resampling.LANCZOS)
if data_pad > imgw or data_pad > imgh :
img1 = img1.resize((int(data_pad),int(data_pad)), Image.Resampling.LANCZOS)
print (img1.size)
img1.save(f'tmpim{uniqnum}.png')
with open(f'tmpim{uniqnum}.png', "rb") as image_file:
encoded_string = base64.b64encode(image_file.read())
image_file.close()
im_out = encode(f'tmpim{uniqnum}.png',data)
return im_out
###################################
################ crypt #####################
def calculate_hash(data, hash_function: str = "sha256") -> str:
if type(data) == str:
data = bytearray(data, "utf-8")
if hash_function == "sha256":
h = SHA256.new()
h.update(data)
return h.hexdigest()
if hash_function == "ripemd160":
h = RIPEMD160.new()
h.update(data)
return h.hexdigest()
def generate_keys():
secret_code="SECRET PASSWORD"
key = RSA.generate(2048)
#private_key = key.export_key('PEM')
private_key = key.export_key(passphrase=secret_code, pkcs=8,
protection="scryptAndAES128-CBC")
print(f'private_key:: {private_key}')
file_out_priv = open("private.pem", "wb")
file_out_priv.write(private_key)
file_out_priv.close()
public_key = key.publickey().export_key('PEM')
file_out_pub = open("receiver.pem", "wb")
file_out_pub.write(public_key)
file_out_pub.close()
hash_1 = calculate_hash(public_key, hash_function="sha256")
hash_2 = calculate_hash(hash_1, hash_function="ripemd160")
address = base58.b58encode(hash_2)
address_im=qr.make_qr(txt=address)
add_label = str(address)
add_label = add_label.strip("b").strip("'")
address_im = overlay.textover(address_im, "Wallet",add_label)
address_im.save("address_im.png")
#qr_link="test"
address_im = "address_im.png"
private_key_im = overlay.textover("private_key.png", "Key",add_label)
private_key_im.save("private_key_im.png")
priv_key = conv_im("private_key_im.png",data=private_key)
pub_key = conv_im("address_im.png",data=public_key)
return public_key,private_key,address_im,address,priv_key,pub_key
def encrypt_text(data,pub_im,priv_im,address):
pub_key = decode(pub_im)
data = data.encode("utf-8")
recipient_key = RSA.import_key(pub_key)
session_key = get_random_bytes(16)
# Encrypt the session key with the public RSA key
cipher_rsa = PKCS1_OAEP.new(recipient_key)
enc_session_key = cipher_rsa.encrypt(session_key)
# Encrypt the data with the AES session key
cipher_aes = AES.new(session_key, AES.MODE_EAX)
ciphertext, tag = cipher_aes.encrypt_and_digest(data)
file_out = open("encrypted_data.bin", "wb")
[ file_out.write(x) for x in (enc_session_key, cipher_aes.nonce, tag, ciphertext) ]
file_out.close()
doc_name = "encrypted_data.bin"
with open(doc_name, "rb") as file:
file_data =(file.read())
print (f'file_data::{file_data}')
qr_link="test"
#trans_im1.save("trans_im.png")
hash_1 = calculate_hash(pub_key, hash_function="sha256")
hash_2 = calculate_hash(hash_1, hash_function="ripemd160")
address = base58.b58encode(hash_2)
add_label = str(address)
add_label = add_label.strip("b").strip("'")
trans_im1=qr.make_qr(txt=address, color_b="#ECFD08")
private_key_im = overlay.textover(trans_im1, "Transaction",add_label)
private_key_im.save("private_key_im.png")
enc_qr = conv_im("private_key_im.png",data=file_data)
file.close()
return str(file_data),enc_qr
def decrypt_text(im,in2):
enc_in = decode(im)
secret_code="SECRET PASSWORD"
#private_key = RSA.import_key(open("private.pem").read())
priv_key = decode(in2)
print(f'priv_key:: {priv_key}')
private_key = RSA.import_key(priv_key,passphrase=secret_code)
print(f'private_key:: {private_key}')
enc_session_key = enc_in[:private_key.size_in_bytes()]
end1 = private_key.size_in_bytes()+16
nonce = enc_in[private_key.size_in_bytes():end1]
start1=end1+1
end2 = private_key.size_in_bytes()+32
start2=end2+1
tag = enc_in[end1:end2]
ciphertext = enc_in[end2:]
print (f'enc_session_key::{enc_session_key}')
print (f'nonce::{nonce}')
print (f'tag::{tag}')
print (f'ciphertext::{ciphertext}')
# Decrypt the session key with the private RSA key
cipher_rsa = PKCS1_OAEP.new(private_key)
session_key = cipher_rsa.decrypt(enc_session_key)
# Decrypt the data with the AES session key
cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
data = cipher_aes.decrypt_and_verify(ciphertext, tag)
return(data.decode("utf-8"))