2020年5月31日 星期日

RSA - Signature feature

Copy the code from here:
Key Generation:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# Create a new RSA key pair: 4096-bit modulus, public exponent 65537.
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=4096,
    backend=default_backend(),
)

# Write the private half as PEM (traditional OpenSSL layout).
# NOTE: NoEncryption() means the key is stored on disk without a passphrase.
with open('private.key', 'wb') as private_out:
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    private_out.write(private_pem)

# Write the matching public half as PEM (SubjectPublicKeyInfo layout).
with open('public.pem', 'wb') as public_out:
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    public_out.write(public_pem)

Signing:
#!/usr/bin/env python3 
import base64
import os
import sys
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from multiprocessing import Pool
from multiprocessing import Process

class SignFiles:
    """Sign files with an RSA private key.

    For every input file ``X`` a detached signature is written to ``X.sig``.
    The signature is RSA-PSS (MGF1 with SHA-256, maximum salt length) over
    the whole file content, stored base64-encoded.
    """

    def __init__(self):
        # Set by open_private_key(); sign_file() needs it to be loaded first.
        self.private_key = None

    def open_private_key(self, key_path='private.key'):
        """Load an unencrypted PEM private key into ``self.private_key``.

        Args:
            key_path: Path of the PEM file; defaults to ``private.key`` in
                the current working directory.
        """
        with open(key_path, 'rb') as key_file:
            self.private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend(),
            )

    def sign_file(self, listFullPath):
        """Sign each file in *listFullPath*, writing ``<file>.sig`` beside it.

        Args:
            listFullPath: Iterable of file paths to sign.
        """
        for path in listFullPath:
            with open(path, 'rb') as f:
                payload = f.read()
            signature = base64.b64encode(
                self.private_key.sign(
                    payload,
                    padding.PSS(
                        mgf=padding.MGF1(hashes.SHA256()),
                        salt_length=padding.PSS.MAX_LENGTH,
                    ),
                    hashes.SHA256(),
                )
            )
            with open(path + ".sig", 'wb') as f:
                f.write(signature)

    def getFullFilePath(self, path):
        """Return the paths of all files below *path* (recursive).

        Files ending in ``.sig`` are skipped: they are signatures left by a
        previous run, and signing them again would only create ``.sig.sig``
        files.
        """
        fullpath = []
        for root, _dirs, names in os.walk(path):
            for name in names:
                if name.endswith('.sig'):
                    continue
                fullpath.append(os.path.join(root, name))
        return fullpath

    def getOnlyRootFilePath(self, path):
        """Split the direct children of *path* into directories and files.

        Returns:
            A ``(dirs, files)`` tuple of full paths. Files ending in ``.sig``
            are left out for the same reason as in getFullFilePath().
        """
        files = []
        dirs = []
        for entry in os.listdir(path):
            full = os.path.join(path, entry)
            if os.path.isfile(full):
                if not full.endswith('.sig'):
                    files.append(full)
            elif os.path.isdir(full):
                dirs.append(full)
        return dirs, files

if __name__ == "__main__":
    # Usage: <script> ROOT_PATH
    # Signs every file under ROOT_PATH, one worker process per top-level
    # subdirectory plus one for the files directly inside ROOT_PATH.

    # Check the arguments first so a missing argument yields the usage
    # message rather than an error from loading the key.
    if len(sys.argv) < 2:
        print("enter a root path")
        # sys.exit instead of the bare exit() helper, which is only injected
        # by the site module and is not meant for use in programs.
        sys.exit(1)

    sf = SignFiles()
    sf.open_private_key()

    dirs, files = sf.getOnlyRootFilePath(sys.argv[1])

    # One batch of paths per subdirectory, then the root-level files.
    batches = [sf.getFullFilePath(ddir) for ddir in dirs]
    batches.append(files)

    processes = [Process(target=sf.sign_file, args=(batch,)) for batch in batches]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # A worker that died from an exception has a non-zero exit code; reflect
    # that in this script's own exit status instead of always exiting 0.
    if any(p.exitcode != 0 for p in processes):
        sys.exit(1)

Verification:
import base64
import os
import sys
import cryptography.exceptions
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from multiprocessing import Process

class VerifySig:
    """Verify detached ``<file>.sig`` signatures with an RSA public key.

    A signature file is expected to hold the base64-encoded RSA-PSS signature
    (MGF1 with SHA-256, maximum salt length) of the whole file content.
    """

    def __init__(self):
        # Set by open_public_key(); verify_file() needs it to be loaded first.
        self.public_key = None

    def open_public_key(self, key_path='public.pem'):
        """Load a PEM public key into ``self.public_key``.

        Args:
            key_path: Path of the PEM file; defaults to ``public.pem`` in the
                current working directory.
        """
        with open(key_path, 'rb') as f:
            self.public_key = load_pem_public_key(f.read(), default_backend())

    def verify_file(self, listFullPath):
        """Verify each file in *listFullPath* against its ``.sig`` file.

        Stops at the first problem, prints an ``ERROR:`` line describing it
        and returns False. Returns True when every file verified (including
        the case of an empty list).
        """
        for path in listFullPath:
            sig_path = path + ".sig"

            # Read the payload and its signature; report what really went
            # wrong instead of blaming the .sig file for every error.
            try:
                with open(path, 'rb') as f:
                    payload_contents = f.read()
                with open(sig_path, 'rb') as f:
                    signature = base64.b64decode(f.read())
            except OSError as e:
                print("ERROR: cannot read {}: {}".format(e.filename, e.strerror))
                return False
            except ValueError as e:
                # binascii.Error (malformed base64) is a ValueError subclass.
                print("ERROR: signature {} is not valid base64: {}"
                      .format(sig_path, e))
                return False

            # Perform the verification.
            try:
                self.public_key.verify(
                    signature,
                    payload_contents,
                    padding.PSS(
                        mgf=padding.MGF1(hashes.SHA256()),
                        salt_length=padding.PSS.MAX_LENGTH,
                    ),
                    hashes.SHA256(),
                )
            except cryptography.exceptions.InvalidSignature:
                print('ERROR: signature {} failed verification!'
                      .format(sig_path))
                return False
            except Exception as e:
                # Keep the "never raises, returns False" contract for any
                # other failure (e.g. key not loaded), but show the cause.
                print("ERROR: could not verify {}: {}".format(path, e))
                return False
        return True

    def getFullFilePath(self, path):
        """Return the paths of all files below *path* (recursive).

        Files ending in ``.sig`` are skipped; they are the signatures, not
        the payloads to verify.
        """
        fullpath = []
        for root, _dirs, names in os.walk(path):
            for name in names:
                if name.endswith('.sig'):
                    continue
                fullpath.append(os.path.join(root, name))
        return fullpath

    def getOnlyRootFilePath(self, path):
        """Split the direct children of *path* into directories and files.

        Returns:
            A ``(dirs, files)`` tuple of full paths, with ``.sig`` files left
            out. If the directory cannot be listed, an ``ERROR:`` line is
            printed and False is returned instead of a tuple, so callers must
            check the result before unpacking it.
        """
        try:
            files = []
            dirs = []
            for entry in os.listdir(path):
                full = os.path.join(path, entry)
                if os.path.isfile(full):
                    if not full.endswith('.sig'):
                        files.append(full)
                elif os.path.isdir(full):
                    dirs.append(full)
            return dirs, files
        except Exception as e:
            print('ERROR: signature files failed verification! '
                  '(cannot list {}: {})'.format(path, e))
            return False

def _verify_batch(verifier, paths):
    """Worker entry point: verify *paths* and exit non-zero on failure.

    verify_file() signals failure by returning False; the result is turned
    into the worker's exit code so the parent can read it from
    ``Process.exitcode``. Defined at module level, outside the ``__main__``
    guard, so it is not tied to the parent's script-only code.
    """
    sys.exit(1 if verifier.verify_file(paths) is False else 0)


if __name__ == "__main__":
    # Usage: <script> ROOT_PATH
    # Verifies every file under ROOT_PATH, one worker process per top-level
    # subdirectory plus one for the files directly inside ROOT_PATH.

    # Check the arguments first so a missing argument yields the usage
    # message rather than an error from loading the key.
    if len(sys.argv) < 2:
        print("enter a root path")
        # sys.exit instead of the bare exit() helper, which is only injected
        # by the site module and is not meant for use in programs.
        sys.exit(1)

    vs = VerifySig()
    vs.open_public_key()

    listing = vs.getOnlyRootFilePath(sys.argv[1])
    if listing is False:
        # The directory could not be listed (error already printed);
        # unpacking False would raise TypeError.
        sys.exit(1)
    dirs, files = listing

    # One batch of paths per subdirectory, then the root-level files.
    batches = [vs.getFullFilePath(ddir) for ddir in dirs]
    batches.append(files)

    processes = [
        Process(target=_verify_batch, args=(vs, batch)) for batch in batches
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # Fail the whole run when any worker reported a bad/missing signature
    # or crashed, instead of always exiting 0.
    if any(p.exitcode != 0 for p in processes):
        sys.exit(1)


Copy it is from Here

沒有留言:

張貼留言