import json
import os

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import HfApi, upload_file

import chatchain
import crypt  # used below for address()/generate_keys()/encrypt_text()/decrypt_text()
from chatchain import MyChainRec, MyChainSend

# Seconds to wait on the chain host before a download is treated as failed.
# Without a timeout a stalled connection would block the UI callback forever.
REQUEST_TIMEOUT = 30

# Password that unlocks the UI; a missing PASS variable raises KeyError at import.
pa = os.environ['PASS']
# Base URL under which every chain lives as ``<name>/<name>.json``.
main_chain = 'https://huggingface.co/datasets/Omnibus/chat-at/raw/main/chat/'
rec_list, rec_drop = chatchain.update_rec_list(main_chain)


def checkp(inp):
    """Compare the typed password with ``pa`` and toggle two containers.

    Args:
        inp: Text entered in the password box.

    Returns:
        A pair of ``gr.update`` objects. The first is visible only when the
        password is wrong, the second only when it is right.
    """
    unlocked = inp == pa
    return gr.update(visible=not unlocked), gr.update(visible=unlocked)


def get_my_chain_send(sender_name=None):
    """Download a sender chain and store it in the global ``mychain_send``.

    Args:
        sender_name: Chain name; the file is fetched from
            ``{main_chain}{sender_name}/{sender_name}.json``.

    Returns:
        ``(response, message)``. On success ``response`` is
        ``{'chain': ..., 'length': ...}``; on any failure it is
        ``{"MESSAGE": "Error Loading Chain"}`` and ``mychain_send`` is left
        untouched.
    """
    global mychain_send
    print(sender_name)
    url = f'{main_chain}{sender_name}/{sender_name}.json'
    try:
        r1 = requests.get(url, timeout=REQUEST_TIMEOUT)
        # A 4xx/5xx body is an error page, not chain data: fail here rather
        # than hand it to the chain parser.
        r1.raise_for_status()
        print(f's={r1.text}')
        mychain_send = chatchain.MyChainSend(
            chain_load=f'{main_chain}{sender_name}/',
            load=r1.text,
        )
        response = {'chain': mychain_send.chain, 'length': len(mychain_send.chain)}
        message = f"Blockchain loaded from: {url}"
        return response, message
    except Exception as exc:
        # Best-effort boundary: callers receive an error payload, never a raise.
        message = f"Error loading from: {sender_name}"
        print(message)
        print(f'load error ::: {exc}')
        return {"MESSAGE": "Error Loading Chain"}, message


def get_my_chain_rec(recipient_name=None):
    """Download a recipient chain, creating it when it cannot be loaded.

    The result is stored in the global ``mychain_rec``.

    Args:
        recipient_name: Chain name; the file is fetched from
            ``{main_chain}{recipient_name}/{recipient_name}.json``.

    Returns:
        ``(response, message)``. On success ``response`` is
        ``{'chain': ..., 'length': ...}``. When both loading and creating
        fail it is ``{"MESSAGE": "Error Loading Chain"}``, the same error
        shape ``get_my_chain_send`` uses.
    """
    global mychain_rec
    print(recipient_name)
    url = f'{main_chain}{recipient_name}/{recipient_name}.json'
    loaded_message = f"Blockchain loaded from: {url}"
    try:
        r2 = requests.get(url, timeout=REQUEST_TIMEOUT)
        # Treat HTTP errors as "chain missing" so the create fallback runs.
        r2.raise_for_status()
        print(f'r={r2.text}')
        mychain_rec = MyChainRec(
            chain_load=f'{main_chain}{recipient_name}/',
            load=r2.text,
        )
        response = {'chain': mychain_rec.chain, 'length': len(mychain_rec.chain)}
        return response, loaded_message
    except Exception as load_exc:
        print(f'load error ::: {load_exc}')

    try:
        # NOTE(review): the fallback passes the bare ``main_chain`` as
        # chain_load, unlike the load path above - confirm this is intended.
        mychain_rec = MyChainRec(chain_load=main_chain, create=recipient_name)
        response = {'chain': mychain_rec.chain, 'length': len(mychain_rec.chain)}
        return response, loaded_message
    except Exception as create_exc:
        print(f'create error ::: {create_exc}')
        message = f"Error loading from: {recipient_name}"
        return {"MESSAGE": "Error Loading Chain"}, message
# Chains longer than this many blocks are reset right after mining.
MAX_CHAIN_LENGTH = 1000


def _chain_summary(chain_obj):
    """Return the ``{'chain', 'length'}`` dict shown in the JSON panels."""
    return {'chain': chain_obj.chain, 'length': len(chain_obj.chain)}


def _chain_loaded(response):
    """Tell a loader's success payload apart from its error payload."""
    return isinstance(response, dict) and 'chain' in response


def _clean_address(address):
    """Remove a ``b'...'`` bytes-repr wrapper (or bare quotes) from an address.

    Only the wrapper is removed, so an address that itself starts or ends
    with the letter ``b`` is kept intact.
    """
    text = str(address)
    wrapped = (
        len(text) >= 3
        and text[0] == "b"
        and text[1] in "'\""
        and text[-1] == text[1]
    )
    if wrapped:
        return text[2:-1]
    return text.strip("'")


def _mine_block(chain_obj, chain_r, chain_n):
    """Mine one block on ``chain_obj`` and reset the chain if it grew too long.

    Returns:
        ``(response, show_chain, message)`` where ``response`` describes the
        mined block (``None`` after a reset), ``show_chain`` is the chain
        summary and ``message`` is a status string.
    """
    previous_block = chain_obj.print_previous_block()
    proof = chain_obj.proof_of_work(previous_block['proof'])
    previous_hash = chain_obj.hash(previous_block)
    block = chain_obj.create_block(proof, previous_hash, chain_r, chain_n)
    response = {
        'message': 'A block is MINED',
        'index': block['index'],
        'timestamp': block['timestamp'],
        'proof': block['proof'],
        'previous_hash': block['previous_hash'],
    }
    message = "A block is MINED"
    if len(chain_obj.chain) > MAX_CHAIN_LENGTH:
        chain_obj.reset()
        response = None
        # The message reports the real threshold instead of a stale number.
        message = f"New Chain Created at Max {MAX_CHAIN_LENGTH} Blocks"
    return response, _chain_summary(chain_obj), message


def display_chain_send():
    """Return the summary dict of the global sender chain."""
    return _chain_summary(mychain_send)


def display_chain_rec():
    """Return the summary dict of the global recipient chain."""
    return _chain_summary(mychain_rec)


def valid_send():
    """Validate the global sender chain.

    Returns:
        ``(response, z, mes, ind)``: a status sentence, a bool flag, and the
        message and index reported by ``chain_valid``.
    """
    valid, ind, mes = mychain_send.chain_valid(mychain_send.chain)
    if valid:
        response = 'The Blockchain is valid.'
    else:
        response = f'Sender Blockchain is not valid. {mes} at Index {ind}'
    return response, bool(valid), mes, ind


def valid_rec():
    """Validate the global recipient chain.

    Returns:
        ``(response, z, mes, ind)``: a status sentence, a bool flag, and the
        message and index reported by ``chain_valid``.
    """
    valid, ind, mes = mychain_rec.chain_valid(mychain_rec.chain)
    if valid:
        response = 'The Blockchain is valid.'
    else:
        response = f'Blockchain is not valid. {mes} at Index {ind}'
    return response, bool(valid), mes, ind


def mychain_mine_block_send(chain_r=None, chain_n=None):
    """Mine a block on the global sender chain.

    Returns:
        ``(response, show_chain, message)``; ``response`` is ``None`` when
        the chain was reset for exceeding ``MAX_CHAIN_LENGTH``.
    """
    return _mine_block(mychain_send, chain_r, chain_n)


def mychain_mine_block_rec(chain_r=None, chain_n=None):
    """Mine a block on the global recipient chain.

    Returns:
        ``(response, show_chain, message)``; ``response`` is ``None`` when
        the chain was reset for exceeding ``MAX_CHAIN_LENGTH``.
    """
    return _mine_block(mychain_rec, chain_r, chain_n)


def send_message(send, rec, message):
    """Record a message on both the sender and the recipient chain.

    Both chains are loaded, the message is queued as a transaction on each,
    and one block is mined per chain.

    Args:
        send: Sender chain name.
        rec: Recipient chain name.
        message: Message text.

    Returns:
        ``(status_text, sender_chain_summary)``, always exactly two values
        to match the two output components wired to the send button. The
        summary is ``None`` when a chain could not be loaded.
    """
    try:
        send_response, send_status = get_my_chain_send(send)
        print(f'response1:: {send_response}')
        print(f'message1:: {send_status}')
        rec_response, rec_status = get_my_chain_rec(rec)
        print(f'response2:: {rec_response}')
        print(f'message2:: {rec_status}')
    except Exception as e:
        return f"An Error Occured: {e}", None

    # The loaders report failure through their return value, not by raising.
    # Stop here so nothing is mined on a stale chain from an earlier call or
    # on a global that was never assigned.
    if not _chain_loaded(send_response):
        return send_status, None
    if not _chain_loaded(rec_response):
        return rec_status, None

    print(mychain_send.chain[0]["message"])
    mychain_send.new_transaction(
        f"{send}", f"{rec}", f"{message}", mychain_send.chain[0]["message"]
    )
    mychain_rec.new_transaction(
        f"{send}", f"{rec}", f"{message}", mychain_rec.chain[0]["message"]
    )

    _, show_chain_send, message_send = mychain_mine_block_send(chain_r=None, chain_n=send)
    _, _, message_rec = mychain_mine_block_rec(chain_r=None, chain_n=rec)
    mes = f'Send: {message_send} :: Recieve: {message_rec}'

    # NOTE(review): the result is unused; the call is kept only in case
    # update_rec_list has side effects - confirm and drop if it has none.
    chatchain.update_rec_list(main_chain)
    return mes, show_chain_send


def display_messages(im):
    """Decrypt and list the messages stored on the chain owned by a key image.

    Args:
        im: File path of the key image, or ``None`` when the image input
            was cleared.

    Returns:
        ``(address, status_text, chain_response, messages)`` for the four
        components wired to the key image's change event.
    """
    if im is None:
        # Nothing to derive an address from; skip the chain lookup.
        return "", "No key loaded", None, []

    out = []
    address, key = crypt.address(im)
    response, message_out = get_my_chain_send(address)

    # A failed load yields an error payload without a 'chain' entry.
    blocks = response['chain'] if _chain_loaded(response) else []
    for i, block in enumerate(blocks):
        if i == 0:
            # Block 0 is skipped, as before; its "message" entry is what
            # send_message passes to new_transaction.
            continue
        try:
            ts = block['timestamp']
            ea = block['message'][0]
            dec = crypt.decrypt_trans(ea, im)
            print(f'dec === {dec}')
            # SECURITY: eval() runs whatever text decryption yields, and the
            # chain is fetched from a remote host. Flagged for replacement
            # with a safe parser (ast.literal_eval / json.loads) once the
            # payload format is confirmed.
            dec = eval(dec)
            out.append({
                'timestamp': ts,
                'sender': dec['sender'] if dec['sender'] != address else 'Sender',
                'recipient': dec['recipient'] if dec['recipient'] != address else 'Recipient',
                'message': dec['message'],
            })
        except Exception as e:
            # Best effort: blocks that fail to decrypt or parse are skipped.
            print(f'decrypt error ::: {e}')
    return address, message_out, response, out


def test_fn(im):
    """Return the input unchanged (copies one component's value to another)."""
    return im


def create_new_chain(address, pub_key, im):
    """Create a recipient chain for a new wallet and store its key image.

    Args:
        address: Wallet address, possibly wrapped as a ``b'...'`` repr.
        pub_key: Public key passed through to ``MyChainRec``.
        im: Image passed to ``chatchain.store_image``.

    Returns:
        ``(response, message, rec_drop)``: the chain summary, a status
        string and the refreshed recipient dropdown update.
    """
    address = _clean_address(address)
    # Local on purpose: the original never rebinds the global mychain_rec here.
    new_chain = chatchain.MyChainRec(
        chain_load=f'{main_chain}{address}/',
        create=address,
        pub_key=pub_key,
    )
    chatchain.store_image(img=im, chain_load=f'{main_chain}', address=address)
    response = _chain_summary(new_chain)
    message = f"Blockchain loaded from: {main_chain}{address}.json"
    # Pass main_chain, as every other update_rec_list call site does.
    _, rec_drop = chatchain.update_rec_list(main_chain)
    return response, message, rec_drop


def rec_list_up(inp):
    """Return the refreshed recipient dropdown for chain base URL ``inp``."""
    _, out = chatchain.update_rec_list(inp)
    return out


with gr.Blocks() as app:
    # NOTE(review): the source reached review with all indentation flattened;
    # the container nesting below is reconstructed from statement order.
    # Confirm against the rendered layout.
    with gr.Row(visible=True) as invalid:
        pass_box = gr.Textbox()
        pass_btn = gr.Button()
    with gr.Group(visible=False) as valida:
        gr.Column()
        with gr.Column():
            with gr.Row():
                with gr.Tab("Messages"):
                    with gr.Accordion("Key"):
                        input_key = gr.Image(label="Key", type="filepath")
                    with gr.Row():
                        with gr.Column():
                            sender = gr.Textbox(label="Sender", interactive=False)
                            rec = gr.Dropdown(
                                label="Recipient",
                                choices=[f for f in rec_list],
                                allow_custom_value=True,
                                interactive=True,
                            )
                            send_mes = gr.Textbox(label="Message", lines=6)
                            send_mes_btn = gr.Button()
                        with gr.Column():
                            block_text = gr.Textbox(label="System", interactive=False)
                            rec_mes = gr.JSON()
                            response_json = gr.JSON()
                with gr.Tab("BC"):
                    with gr.Row():
                        with gr.Tab("Gen Wal"):
                            gen_wal_btn = gr.Button()
                            seed = gr.Textbox(label='Seed Phrase')
                            img1 = gr.Image(label='Private Key', type='filepath')
                            out1 = gr.Textbox(label='Private Key', max_lines=4)
                            img2 = gr.Pil(label='Public Key')
                            out2 = gr.Textbox(label='Public Key', max_lines=4)
                            img3 = gr.Pil(label='Address')
                            out3 = gr.Textbox(label='Address')
                        with gr.Tab("Encrypt"):
                            rsa_to_enc = gr.Textbox(label="txt to encrypt")
                            pub_key_in = gr.Image(label="Public Key", type="filepath")
                            priv_key_in1 = gr.Image(label="Private Key(sig)", type="filepath")
                            rsa_enc_btn = gr.Button("RSA Encrypt")
                            rsa_enc_mes = gr.Textbox(label="encoded", max_lines=4)
                            qr_enc_mes = gr.Image(type="filepath")
                        with gr.Tab("Decrypt"):
                            mes_in = gr.Image(label="Message", type="filepath")
                            priv_key_in = gr.Image(label="Private Key", type="filepath")
                            rsa_dec_btn = gr.Button("RSA Decrypt")
                            rsa_dec_mes = gr.Textbox(label="decoded")
            # Hidden field that feeds main_chain into rec_list_up.
            in_chain = gr.Textbox(value=main_chain, visible=False)
        gr.Column()

    # Event wiring has to happen inside the Blocks context.
    send_mes_btn.click(send_message, [sender, rec, send_mes], [block_text, response_json])
    input_key.change(
        display_messages, input_key, [sender, block_text, response_json, rec_mes]
    ).then(rec_list_up, in_chain, rec)
    pass_btn.click(checkp, pass_box, [invalid, valida])
    gen_wal_btn.click(
        crypt.generate_keys, None, [out2, out1, img3, out3, img1, img2]
    ).then(
        create_new_chain, [out3, out2, img1], [response_json, block_text, rec]
    ).then(test_fn, [img1], [input_key])
    rsa_enc_btn.click(
        crypt.encrypt_text,
        [rsa_to_enc, pub_key_in, priv_key_in1, out3],
        [rsa_enc_mes, qr_enc_mes],
    )
    rsa_dec_btn.click(crypt.decrypt_text, [mes_in, priv_key_in], rsa_dec_mes)

app.launch()