Key Generation:
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa # Generate the public/private key pair. private_key = rsa.generate_private_key( public_exponent = 65537, key_size = 4096, backend = default_backend(), ) # Save the private key to a file. with open('private.key', 'wb') as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) ) # Save the public key to a file. with open('public.pem', 'wb') as f: f.write( private_key.public_key().public_bytes( encoding = serialization.Encoding.PEM, format = serialization.PublicFormat.SubjectPublicKeyInfo, ) )
Signing:
#!/usr/bin/env python3 import base64 import os import sys from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import padding from multiprocessing import Pool from multiprocessing import Process class SignFiles: def __init__(self): self.private_key = None def open_private_key(self): with open('private.key', 'rb') as key_file: self.private_key = serialization.load_pem_private_key( key_file.read(), password = None, backend = default_backend(), ) #def sign_file(self, in_path, tfile, out_path): def sign_file(self, listFullPath): for files in listFullPath: with open(files, 'rb') as f: payload = f.read() signature = base64.b64encode( self.private_key.sign( payload, padding.PSS( mgf = padding.MGF1(hashes.SHA256()), salt_length = padding.PSS.MAX_LENGTH, ), hashes.SHA256(), ) ) sig=files + ".sig" #print("sig={}".format(sig)) with open(sig, 'wb') as f: f.write(signature) def getFullFilePath(self, path): fullpath=[] for root, dirs, files in os.walk(path): for f in files: fullpath.append(os.path.join(root, f)) return fullpath def getOnlyRootFilePath(self, path): files = [] dirs = [] for f in os.listdir(path): ffile = os.path.join(path, f) if os.path.isfile(ffile): files.append(ffile) elif os.path.isdir(ffile): dirs.append(ffile) return dirs, files if __name__ == "__main__": # Create multiple process to handle the request sf=SignFiles() sf.open_private_key() dirs = [] files = [] if ( len(sys.argv) < 2): print("enter a root path") exit(1) dirs, files=sf.getOnlyRootFilePath(sys.argv[1]) processes=[] for ddir in dirs: fullpath = sf.getFullFilePath(ddir) p=Process(target=sf.sign_file, args=(fullpath,)) processes.append(p) p=Process(target=sf.sign_file, args=(files,)) processes.append(p) for p in processes: p.start() for p in processes: p.join()
Verification:
import base64 import os import sys import cryptography.exceptions from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.serialization import load_pem_public_key from multiprocessing import Process class VerifySig: def __init__(self): self.public_key = None def open_public_key(self): with open('public.pem', 'rb') as f: self.public_key = load_pem_public_key(f.read(), default_backend()) def verify_file(self, listFullPath): for files in listFullPath: try: with open(files, 'rb') as f: payload_contents = f.read() with open(files+".sig", 'rb') as f: signature = base64.b64decode(f.read()) # Perform the verification. try: self.public_key.verify( signature, payload_contents, padding.PSS( mgf = padding.MGF1(hashes.SHA256()), salt_length = padding.PSS.MAX_LENGTH, ), hashes.SHA256(), ) except cryptography.exceptions.InvalidSignature as e: print('ERROR: signature {} failed verification!'\ .format(files+".sig")) return False except Exception as e: print("ERROR: can't not found {}".format(files+".sig")) return False def getFullFilePath(self, path): fullpath=[] for root, dirs, files in os.walk(path): for f in files: if f.endswith('.sig'): continue else: fullpath.append(os.path.join(root, f)) return fullpath def getOnlyRootFilePath(self, path): try: files = [] dirs = [] for f in os.listdir(path): ffile = os.path.join(path, f) if os.path.isfile(ffile): if ffile.endswith('.sig'): continue else: files.append(ffile) elif os.path.isdir(ffile): dirs.append(ffile) return dirs, files except Exception as e: print('ERROR: signature files failed verification!') return False if __name__ == "__main__": # Create multiple process to handle the request vs=VerifySig() vs.open_public_key() dirs = [] files = [] if ( len(sys.argv) < 2): print("enter a root path") exit(1) dirs, files=vs.getOnlyRootFilePath(sys.argv[1]) processes=[] for ddir in dirs: fullpath = vs.getFullFilePath(ddir) p=Process(target=vs.verify_file, args=(fullpath,)) processes.append(p) p=Process(target=vs.verify_file, args=(files,)) processes.append(p) for p in processes: p.start() for p in processes: p.join()
Copy it is from Here
沒有留言:
張貼留言