赞
踩
import abc
import base64
import random
from enum import Enum

from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.Hash import SHA256, SHA1
from Crypto.PublicKey import RSA
from Crypto.Signature import pss
from gmssl import sm2, sm4


# If the input is not a multiple of 16 bytes long, pad it up to one.
# Example key: '1234567890123456'
# An AES key must be 16, 24 or 32 bytes long (AES-128, AES-192, AES-256).
def pad_to_16(key):
    """Encode *key* and NUL-pad it to a multiple of 16 bytes.

    The padding is computed on the encoded bytes rather than on the character
    count, so text containing multi-byte characters also ends up with a
    length that is a multiple of 16. For pure-ASCII input the result is the
    same as padding the string first and encoding afterwards.

    :param key: text to encode and pad (despite the name it is used for both
        keys and payloads in this module)
    :return: the encoded bytes followed by zero to fifteen ``b'\\0'`` bytes;
        an empty string yields ``b''``
    """
    data = str.encode(key)
    # -len % 16 is the distance to the next multiple of 16 (0 when aligned).
    return data + b'\0' * (-len(data) % 16)


class BaseEncryption(metaclass=abc.ABCMeta):
    """Abstract interface every cipher wrapper in this module implements."""

    @abc.abstractmethod
    def encrypt(self, *args, **kwargs):
        """Encrypt; the concrete signature is defined by each subclass."""
        pass

    @abc.abstractmethod
    def decrypt(self, *args, **kwargs):
        """Decrypt; the concrete signature is defined by each subclass."""
        pass
class RsaEncryption(BaseEncryption):
    """RSA-OAEP encryption with chunking for messages longer than one block.

    OAEP is configured with SHA-256 as the hash and MGF1 over SHA-1 as the
    mask generation function (the original author labels this scheme
    ``rsa/ecb/oaepwithsha-256andmgf1padding``). Chunk sizes are derived from
    the imported key, so keys other than 2048 bits work as well.
    """

    @staticmethod
    def _new_cipher(rsa_key):
        """Build the OAEP cipher shared by :meth:`encrypt` and :meth:`decrypt`."""
        return PKCS1_OAEP.new(
            rsa_key,
            hashAlgo=SHA256,
            mgfunc=lambda x, y: pss.MGF1(x, y, SHA1),
        )

    @staticmethod
    def encrypt(plaintext, public_key):
        """Encrypt *plaintext* chunk by chunk with the public key.

        :param plaintext: text to encrypt; it is UTF-8 encoded first
        :param public_key: public key in any format ``RSA.import_key`` accepts
        :return: Base64 text of the concatenated ciphertext blocks
        :raises ValueError: if the key is too small to carry any payload with
            this padding
        """
        public_key = RSA.import_key(public_key)
        data = plaintext.encode('utf-8')
        cipher = RsaEncryption._new_cipher(public_key)

        # OAEP payload limit is k - 2*hLen - 2 bytes (190 for a 2048-bit key
        # with SHA-256, the value that used to be hard-coded).
        max_encrypt_block = public_key.size_in_bytes() - 2 * SHA256.digest_size - 2
        if max_encrypt_block <= 0:
            raise ValueError('RSA key is too short for OAEP with SHA-256')

        # max(..., 1) keeps the historical behaviour of emitting one block
        # for an empty plaintext.
        blocks = [
            cipher.encrypt(data[start:start + max_encrypt_block])
            for start in range(0, max(len(data), 1), max_encrypt_block)
        ]
        return base64.b64encode(b''.join(blocks)).decode()

    @staticmethod
    def decrypt(encrypt_text, private_key):
        """Decrypt text produced by :meth:`encrypt` with the private key.

        :param encrypt_text: Base64 text of one or more ciphertext blocks
        :param private_key: private key in any format ``RSA.import_key`` accepts
        :return: the recovered plaintext, decoded as UTF-8
        """
        b64_data = base64.b64decode(encrypt_text)
        private_key = RSA.import_key(private_key)
        cipher = RsaEncryption._new_cipher(private_key)

        # Every ciphertext block is exactly as long as the modulus
        # (256 bytes for a 2048-bit key, the value that used to be hard-coded).
        max_decrypt_block = private_key.size_in_bytes()

        # Join the raw chunks before decoding so that a multi-byte character
        # split across two blocks is still decoded correctly.
        chunks = [
            cipher.decrypt(b64_data[start:start + max_decrypt_block])
            for start in range(0, max(len(b64_data), 1), max_decrypt_block)
        ]
        return b''.join(chunks).decode('utf-8')
class AesEncryption(BaseEncryption):
    """AES in ECB mode over a Base64-wrapped, NUL-padded payload.

    Both the key and the payload go through ``pad_to_16`` before being handed
    to the cipher.
    """

    @staticmethod
    def encrypt(plaintext, key: str = "api-test@test-api"):
        """Encrypt *plaintext* and return the ciphertext as Base64 text.

        :param plaintext: text to encrypt
        :param key: key text; padded by ``pad_to_16`` before use
        :return: Base64 text of the ciphertext with surrounding whitespace
            stripped
        """
        # Wrap the plaintext in Base64 first so the payload handed to
        # pad_to_16 is plain ASCII.
        wrapped = base64.b64encode(plaintext.encode('utf-8')).decode('ascii')
        cipher = AES.new(pad_to_16(key), AES.MODE_ECB)
        raw = cipher.encrypt(pad_to_16(wrapped))
        # encodebytes appends a newline, hence the strip().
        return base64.encodebytes(raw).decode('utf-8').strip()

    @staticmethod
    def decrypt(encrypted_text, key: str = "api-test@test-api"):
        """Reverse :meth:`encrypt`.

        :param encrypted_text: Base64 text produced by :meth:`encrypt`
        :param key: the key text that was used for encryption
        :return: the original plaintext
        """
        cipher = AES.new(pad_to_16(key), AES.MODE_ECB)
        raw = base64.decodebytes(encrypted_text.encode(encoding='utf-8'))
        wrapped = cipher.decrypt(raw).decode('utf-8')
        # The NUL padding is not in the Base64 alphabet; b64decode in its
        # default (non-validating) mode discards it.
        return base64.b64decode(wrapped.encode('utf-8')).decode('utf-8')
class Sm2Encryption(BaseEncryption):
    """SM2 public-key encryption via ``gmssl`` with a leading 0x04 marker byte."""

    @staticmethod
    def encrypt(plaintext: str, public_key: str) -> str:
        """Encrypt *plaintext* with an SM2 public key.

        :param plaintext: text to encrypt
        :param public_key: hexadecimal public key, with or without the
            leading "04" point-format marker
        :return: Base64 text of ``0x04 || ciphertext``
        """
        prefix = "04"  # point-format marker: 02/03 = compressed, 04 = uncompressed
        raw_key_hex_len = 128  # X || Y of a 256-bit curve point, no marker

        enc_byte = plaintext.encode()

        # Drop the format marker, but leave a bare 128-digit key alone even
        # when its X coordinate happens to begin with "04" (previously such
        # a key was truncated to 126 digits).
        if public_key.startswith(prefix) and len(public_key) != raw_key_hex_len:
            public_key = public_key[2:]

        sm2_crypt = sm2.CryptSM2(private_key='', public_key=public_key, mode=1, asn1=True)
        # NOTE(review): the original author observed that CryptSM2.__init__
        # applies public_key.lstrip("04"), which would also eat leading 0/4
        # digits of the key itself. Re-assigning the attribute restores the
        # key prepared above — confirm against the installed gmssl version.
        sm2_crypt.public_key = public_key

        return base64.b64encode(bytes.fromhex(prefix) + sm2_crypt.encrypt(enc_byte)).decode()

    @staticmethod
    def decrypt(encrypted_text: str, private_key: str) -> str:
        """Decrypt text produced by :meth:`encrypt`.

        :param encrypted_text: Base64 text of ``0x04 || ciphertext``
        :param private_key: hexadecimal private key
        :return: the recovered plaintext
        """
        # encrypt() prepends one marker byte (0x04); skip it before decrypting.
        b64 = base64.b64decode(encrypted_text.encode())[1:]
        sm2_crypt = sm2.CryptSM2(private_key=private_key, public_key='', mode=1, asn1=True)
        return sm2_crypt.decrypt(b64).decode()
class Sm4Encryption(BaseEncryption):
    """SM4 symmetric encryption via ``gmssl`` in ECB or CBC mode.

    CBC mode uses a fixed all-zero 16-byte IV.
    """

    class CipherModeEnum(Enum):
        ECB = "ecb"
        CBC = "cbc"

    # IV handed to crypt_cbc for both directions.
    _ZERO_IV = b'\x00' * 16

    def __init__(self, cipher_mode: str = CipherModeEnum.ECB.value):
        """
        :param cipher_mode: ``"ecb"`` or ``"cbc"``
        :raises ValueError: for any other mode
        """
        supported = {mode.value for mode in Sm4Encryption.CipherModeEnum}
        if cipher_mode not in supported:
            raise ValueError('cipher mode just support cbc or ecb')
        self.cipher_mode = cipher_mode

    def _run(self, direction, key: str, payload: bytes):
        """Key a fresh CryptSM4 for *direction* and process *payload*."""
        engine = sm4.CryptSM4()
        engine.set_key(key.encode(), direction)
        if self.cipher_mode == 'ecb':
            return engine.crypt_ecb(payload)
        return engine.crypt_cbc(self._ZERO_IV, payload)

    def encrypt(self, plaintext: str, key: str = "api-test@test-api") -> str:
        """Encrypt *plaintext* and return the ciphertext as Base64 text.

        :param plaintext: text to encrypt
        :param key: key text, encoded before being passed to ``set_key``
        :return: Base64 text of the ciphertext
        """
        encrypt_value = self._run(sm4.SM4_ENCRYPT, key, plaintext.encode())
        return base64.b64encode(encrypt_value).decode(encoding="utf-8")

    def decrypt(self, encrypted_text: str, key: str = "api-test@test-api"):
        """Reverse :meth:`encrypt`.

        :param encrypted_text: Base64 text produced by :meth:`encrypt`
        :param key: the key text that was used for encryption
        :return: the recovered plaintext, decoded as UTF-8
        """
        decrypt_value = self._run(sm4.SM4_DECRYPT, key, base64.b64decode(encrypted_text))
        return decrypt_value.decode(encoding='utf-8')
class EncryptionAlgoEnum(Enum):
    """Registry of supported algorithms.

    Each value is a ``(code, implementation class, tag)`` tuple.
    """

    RSA = (1, RsaEncryption, "rsa")
    SM2 = (2, Sm2Encryption, "sm2")
    AES = (3, AesEncryption, "aes")
    SM4 = (4, Sm4Encryption, "sm4")

    @property
    def code(self):
        """Numeric identifier of the algorithm."""
        return self.value[0]

    @property
    def interface(self):
        """Class implementing the algorithm."""
        return self.value[1]

    @property
    def tag(self):
        """Short textual name of the algorithm."""
        return self.value[2]

    @classmethod
    def get_crypt_algo_type_by_code(cls, code):
        """Return the member whose code equals ``int(code)``.

        :raises IndexError: if no member has that code
        :raises ValueError: if *code* cannot be converted to ``int``
        """
        wanted = int(code)
        return [member for member in cls if member.code == wanted][-1]

    @classmethod
    def get_crypt_algo_type_by_tag(cls, tag):
        """Return the member whose tag equals ``str(tag)``.

        The comparison is made against the tag element of the value tuple;
        it used to be made against the implementation class, which never
        matched a string, so every lookup failed.

        :raises IndexError: if no member has that tag
        """
        wanted = str(tag)
        return [member for member in cls if member.tag == wanted][-1]
if __name__ == '__main__':
    # Demo: pick one algorithm at random and round-trip a short message.
    rsa_pri_key = "\n".join([
        "-----BEGIN PRIVATE KEY-----",
        "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCR2eBgYkRLPZgs",
        "OFf2/81o5i69/S2rjGic/YW7Clm1mokiYZ1YLDEHH2c7tdvXJrmfLavc4ngruX4y",
        "Uddn9A2+Ts+B5YbsbRrzOJ9ok6CA0MaVpAYejmaT/bq3e2jBY2rwI7NBzfx7c1H/",
        "gRNY2kMTGK6l2aI4fg8jxCLt+5jOU9UbvEmwd4VNBcqfUpb9VTukdCfXt9p3jtLB",
        "ej2bc2CQwReuv8FldhuGzDgHLvRRplXeGQ1i2OwvCguJiZH8mCq4/18336Mf0Xoc",
        "bG7BmQYATPf+0OOkb1fJ9vDTtgLDB0T1BxiajoUNP5F4RLWer0eIDnF6iO46zwjC",
        "czasKs3XAgMBAAECggEAEDog7ChSt9pRAW59XvZD6M1fWvT9rU3wX7MXO2Gh7IIW",
        "itQ4eNRl2giE6FBJ9syQr2SZ7/fkbdzz7O5NHHw7QTasgKPEWK6k5nWgyrZOD7bq",
        "uUBoC+NFB2cd7IcW9xk0DyaxrXsZAbUpZNd8CxExkEDZQYuOsGFaFt0eE8M/04i2",
        "L9QPB3WQ3lN+sK8q9AfTBK3yv8MkGI8+00Ihp9TLZ1XqhtvlJSxKs6vLFAcH8rzP",
        "ZKsSnqM7KGVlv5azkIeFeNLWHIzjepjA3V4ncnbu4nOBiuNGlcNh9++ELmHF+fNk",
        "EmGO4u+GTgfuaECFtjdeq+DUH8JJ0dPXOEMhGX8p2QKBgQC5yxa3CodwW832dQ+u",
        "pQaN3J7pZfgUrHK1idGSyGxuehEU+wyjSSwOC1eUGaP4dmj6azmvz5Ng4xPa7Ew3",
        "TemNz7HGVnx8L+YMYa+cJHEfaWEvByin5nnPgy79Fy6aO49uK1egFGFNSPV7i4Sd",
        "nnGt/8ye+g65uS2MwSxAN3V/WQKBgQDI9u8G0xFNI3tg7glFTr9yy/WihCZ495BZ",
        "Bq7MAQWsaNUOR0Rh6/+gDDG2j0m3Yjnk9JFXpuctZGnVyWi1fBGjjW25e8NiyBjz",
        "BJbpPNrkxJWpdeG51gKaOUgz2YfnooHKrt/3Cn1LO6KrNBPZMw7++NpWWh+aRE6A",
        "cVWFOUbArwKBgHYLQdBkQS8zNQYc1CzrLuHdRZ4XKmrAMlWDTgNLkhETP10sMJhi",
        "OjpmHGu9ar/HQ+WvUMSSRxoszIWw0q/ksbpnNpVEh+1DZY+CVVgIk1MY2iVOEBe1",
        "SLl+qNEm9HYL15JwuUi9CiFXdJjdSRH4BFRADRsI12hK7lTauynF0sJpAoGBAIt/",
        "fWRLVxMoshgSo7TMePCCy7tH6DzH/BcQIH3Tp9CJ5HrI1Zrzarn6PPfwdscpE+7u",
        "JYWmgYpszVptOJXhzYyuO6ApXPNQ2qC2atr0Ny3dl9XN0iZhe+T6Jjh3o3MSxu2e",
        "fjDJwzdZ6gzVVYHLg3lR3J1cadt31tYp1adonshZAoGAWTgi+QE1ts0HwztCw2CV",
        "3AeKB6wZNfFNYEE4bwZhwWUQMOEjnlwh9IHwjMh5POyFD5BR/YAgihW3/u052/9O",
        "1w7I18QvqjwSyPRVMXhItT51P5QNsugRRcsCNTHvWdozjgKuIP5pLTK+nwE7PPl3",
        "UUNybmttjqJCyfndIOUOVlQ=",
        "-----END PRIVATE KEY-----",
    ])
    rsa_pub_key = "\n".join([
        "-----BEGIN PUBLIC KEY-----",
        "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAkdngYGJESz2YLDhX9v/N",
        "aOYuvf0tq4xonP2FuwpZtZqJImGdWCwxBx9nO7Xb1ya5ny2r3OJ4K7l+MlHXZ/QN",
        "vk7PgeWG7G0a8zifaJOggNDGlaQGHo5mk/26t3towWNq8COzQc38e3NR/4ETWNpD",
        "ExiupdmiOH4PI8Qi7fuYzlPVG7xJsHeFTQXKn1KW/VU7pHQn17fad47SwXo9m3Ng",
        "kMEXrr/BZXYbhsw4By70UaZV3hkNYtjsLwoLiYmR/JgquP9fN9+jH9F6HGxuwZkG",
        "AEz3/tDjpG9Xyfbw07YCwwdE9QcYmo6FDT+ReES1nq9HiA5xeojuOs8IwnM2rCrN",
        "1wIDAQAB",
        "-----END PUBLIC KEY-----",
    ])
    sm_pri_key = 'c124889fb35a23cd3092a62f92ad9f3aafd07876c5bb08bf147ac27b43e15ec7'
    sm_pub_key = (
        '04aaf5c364472a7b26ab254a834f5b8104ef06387ea7cc9104dd183c6ace0a64'
        '7a70f19e704721919bcf955a1be8c69c9a9ead286a7d1fdfe067145244c377e1ce'
    )
    test_data = "test"

    # Codes 1..4 map to the members of EncryptionAlgoEnum.
    algo = random.choice(list(range(1, 5)))
    encryption_algo = EncryptionAlgoEnum.get_crypt_algo_type_by_code(algo)
    print(f"加密算法为:{encryption_algo.tag}")

    # Asymmetric algorithms get their key pair; the symmetric ones share a
    # single passphrase for both directions.
    key_pairs = {
        1: (rsa_pub_key, rsa_pri_key),
        2: (sm_pub_key, sm_pri_key),
    }
    shared_key = "test-api@api-test"
    pub_key, pri_key = key_pairs.get(encryption_algo.code, (shared_key, shared_key))

    encryption = encryption_algo.interface()
    encrypt_str = encryption.encrypt(test_data, pub_key)
    print("加密结果:", encrypt_str)
    decrypt_str = encryption.decrypt(encrypt_str, pri_key)
    print("解密结果:", decrypt_str)

    # Sample output of one run (RSA was picked):
    #   加密算法为:rsa
    #   加密结果: aUeC06g7ub3KBcEQ3Fhhl7y9oNk/8pF2Ha2rbXC9s+i5IAB+upEbcie8ZTfG8xug7u/FkjJOPXOzLw7C5YwEZKe8M5cQ1GGa1AyyeQkkb1s2qR3ndx9KomlajgtvRC1Dq/HImKQ3Y3YXF+HXQFYpJYEnWjzaKwx6F9KbdhdMUIHCPx3tzCi/hERO6U7dz2QpMXEfOk4MgrOTPEEnJhxWDe3JZu4woTo8OoWOubyLtTdu+MSQOpj1FwUEDjH+qNkitR8kQTe2rjKBoZZj17Qh+4YFzs3IvRkhrer5v0XOPB2VSHEf+fmCi3b7OT9BzMXLupg121rXOYd90ZAi1K46cQ==
    #   解密结果: test
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。