diff --git a/pyproject.toml b/pyproject.toml index 24606f3..a516837 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,7 +99,7 @@ target-version = "py310" # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or # McCabe complexity (`C901`) by default. -select = ["E4", "E7", "E9", "F", "B", "UP"] +select = ["E4", "E7", "E9", "F", "B", "UP", "ANN"] ignore = ["UP031", "UP025", "UP032"] # Allow fix for all enabled rules (when `--fix`) is provided. @@ -109,6 +109,10 @@ unfixable = [] # Allow unused variables when underscore-prefixed. dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" +[tool.ruff.lint.per-file-ignores] +"scripts/*.py" = ["ANN"] +"tests/*.py" = ["ANN"] + [tool.ruff.format] # Like Black, use double quotes for strings. quote-style = "double" @@ -135,3 +139,13 @@ docstring-code-format = false # This only has an effect when the `docstring-code-format` setting is # enabled. docstring-code-line-length = "dynamic" + +[tool.ty.environment] +python-version = "3.10" +root = ["."] + +[tool.ty.src] +exclude = ["./lib", "./tests"] + +[tool.ty.rules] +all = "warn" diff --git a/tests/test_mldsa.py b/tests/test_mldsa.py index 023b734..822f1a5 100644 --- a/tests/test_mldsa.py +++ b/tests/test_mldsa.py @@ -183,7 +183,7 @@ def test_sign_with_seed(mldsa_type, rng): # test that the seed type is checked (should be bytes-like, not string) with pytest.raises(TypeError): - _ = mldsa_priv.sign_with_seed(message, "") + _ = mldsa_priv.sign_with_seed(message, " " * ML_DSA_SIGNATURE_SEED_LENGTH) def test_sign_with_seed_and_context(mldsa_type, rng): signature_seed = rng.bytes(ML_DSA_SIGNATURE_SEED_LENGTH) diff --git a/wolfcrypt/__init__.py b/wolfcrypt/__init__.py index 99a27e8..c2514df 100644 --- a/wolfcrypt/__init__.py +++ b/wolfcrypt/__init__.py @@ -17,6 +17,9 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +from typing import cast + +from _cffi_backend import Lib from wolfcrypt._version import __version__, __wolfssl_version__ @@ -50,7 +53,7 @@ if hasattr(_lib, 'WC_RNG_SEED_CB_ENABLED'): if _lib.WC_RNG_SEED_CB_ENABLED: - ret = _lib.wc_SetSeed_Cb(_ffi.addressof(_lib, "wc_GenerateSeed")) + ret = _lib.wc_SetSeed_Cb(_ffi.addressof(cast(Lib, _lib), "wc_GenerateSeed")) if ret < 0: raise WolfCryptApiError("wc_SetSeed_Cb failed", ret) if _lib.FIPS_ENABLED and _lib.FIPS_VERSION >= 5: diff --git a/wolfcrypt/_ffi/lib.pyi b/wolfcrypt/_ffi/lib.pyi index 047e5cf..521bebf 100644 --- a/wolfcrypt/_ffi/lib.pyi +++ b/wolfcrypt/_ffi/lib.pyi @@ -15,6 +15,7 @@ DES3_ENABLED: int ECC_ENABLED: int ED25519_ENABLED: int ED448_ENABLED: int +ERROR_STRINGS_ENABLED: int FIPS_ENABLED: int HMAC_ENABLED: int KEYGEN_ENABLED: int @@ -24,6 +25,7 @@ ML_KEM_ENABLED: int MPAPI_ENABLED: int PWDBASED_ENABLED: int RSA_ENABLED: int +RSA_BLINDING_ENABLED: int RSA_PSS_ENABLED: int SHA_ENABLED: int SHA3_ENABLED: int @@ -68,9 +70,261 @@ WC_ML_DSA_87: int WC_KEYTYPE_ALL: int +PRIVATEKEY_TYPE: int +PUBLICKEY_TYPE: int +CERT_TYPE: int +MAX_DER_DIGEST_SZ: int +SHAh: int +SHA256h: int +SHA384h: int +SHA512h: int + +DILITHIUM_SEED_SZ: int +ECC_TIMING_RESISTANCE_ENABLED: int +WC_RSA_OAEP_PAD: int + RNG: TypeAlias = FFI.CData +def wc_SetSeed_Cb(cb: FFI.CData) -> int: ... +def wolfCrypt_SetPrivateKeyReadEnable_fips(enable: int, key_type: int) -> int: ... +def wc_GetErrorString(error: int) -> FFI.CData: ... + def wc_InitRngNonce_ex(rng: RNG, nonce: bytes, nonce_size: int, heap: FFI.CData, device_id: int) -> int: ... def wc_RNG_GenerateByte(rng: RNG, buffer: FFI.CData) -> int: ... def wc_RNG_GenerateBlock(rng: RNG, buffer: FFI.CData, len: int) -> int: ... def wc_FreeRng(rng: RNG) -> None: ... + +DerBufferPtr: TypeAlias = FFI.CData +EncryptedInfo: TypeAlias = FFI.CData +IntPtr: TypeAlias = FFI.CData +BytePtr: TypeAlias = FFI.CData + +def wc_PemToDer(buff: bytes, buf_size: int, type: int, der: DerBufferPtr, heap: FFI.CData, info: EncryptedInfo, + key_format: IntPtr) -> int: ... +def wc_DerToPemEx(der: bytes, der_size: int, output: FFI.CData, output_size: int, cipher_info: FFI.CData, + type: int) -> int: ... +def wc_FreeDer(der: DerBufferPtr) -> None: ... +def wc_EncodeSignature(out: BytePtr, digest: bytes, digest_size: int, hash_oid: int) -> int: ... + +def wc_PBKDF2(out: BytePtr, passwd: bytes, pass_len: int, salt: bytes, salt_len: int, iterations: int, keylen: int, + hash_type: int) -> int: ... + +def wc_InitSha(obj: FFI.CData) -> int: ... +def wc_ShaCopy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_ShaUpdate(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_ShaFinal(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_ShaFree(obj: FFI.CData) -> None: ... + +def wc_InitSha256(obj: FFI.CData) -> int: ... +def wc_Sha256Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha256Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha256Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha256Free(obj: FFI.CData) -> None: ... + +def wc_InitSha384(obj: FFI.CData) -> int: ... +def wc_Sha384Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha384Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha384Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha384Free(obj: FFI.CData) -> None: ... + +def wc_InitSha512(obj: FFI.CData) -> int: ... +def wc_Sha512Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha512Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha512Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha512Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_224(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_224_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_224_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_224_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_224_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_256(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_256_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_256_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_256_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_256_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_384(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_384_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_384_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_384_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_384_Free(obj: FFI.CData) -> None: ... + +def wc_InitSha3_512(obj: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_Sha3_512_Copy(src: FFI.CData, dst: FFI.CData) -> int: ... +def wc_Sha3_512_Update(obj: FFI.CData, data: bytes, size: int) -> int: ... +def wc_Sha3_512_Final(obj: FFI.CData, ret: FFI.CData) -> int: ... +def wc_Sha3_512_Free(obj: FFI.CData) -> None: ... + +def wc_HmacInit(hmac: FFI.CData, heap: FFI.CData, dev_id: int) -> int: ... +def wc_HmacSetKey(hmac: FFI.CData, type: int, key: bytes, length: int) -> int: ... +def wc_HmacUpdate(hmac: FFI.CData, data: bytes, size: int) -> int: ... +def wc_HmacFinal(hmac: FFI.CData, hash: FFI.CData) -> int: ... +def wc_HmacFree(hmac: FFI.CData) -> None: ... + +def wc_HKDF(type: int, in_key: bytes, in_key_size: int, salt: bytes, salt_size: int, info: bytes, info_size: int, out: FFI.CData, out_size: int) -> int: ... +def wc_HKDF_Extract(type: int, salt: bytes, salt_size: int, in_key: bytes, in_key_size: int, out: FFI.CData) -> int: ... +def wc_HKDF_Expand(type: int, in_key: bytes, in_key_size: int, info: bytes, info_size: int, out: FFI.CData, out_size: int) -> int: ... + +CAes: TypeAlias = FFI.CData +CAesSivAssoc: TypeAlias = FFI.CData + +def wc_AesInit(aes: CAes, heap: FFI.CData, dev_id: int) -> int: ... +def wc_AesFree(aes: CAes) -> int: ... +def wc_AesSetKey(aes: CAes, key: bytes, len: int, iv: bytes, dir: int) -> int: ... +def wc_AesCbcEncrypt(aes: CAes, ciphertext_out: BytePtr, plaintext_in: bytes, plaintext_len: int) -> int: ... +def wc_AesCbcDecrypt(aes: CAes, plaintext_out: BytePtr, ciphertext_in: bytes, ciphertext_len: int) -> int: ... +def wc_AesCtrEncrypt(aes: CAes, ciphertext_out: BytePtr, plaintext_in: bytes, plaintext_len: int) -> int: ... +def wc_AesSivEncrypt_ex(key: bytes, key_len: int, assoc: CAesSivAssoc, num_assoc: int, nonce: bytes, nonce_len: int, plaintext: bytes, plaintext_len: int, siv: BytePtr, ciphertext: BytePtr) -> int: ... +def wc_AesSivDecrypt_ex(key: bytes, key_len: int, assoc: CAesSivAssoc, num_assoc: int, nonce: bytes, nonce_len: int, ciphertext: bytes, ciphertext_len: int, siv: bytes, plaintext: BytePtr) -> int: ... +def wc_AesGcmInit(aes: CAes, key: bytes, key_len: int, iv: bytes, iv_len: int) -> int: ... +def wc_AesGcmEncryptUpdate(aes: CAes, ciphertext: BytePtr, plaintext: bytes, plaintext_len: int, auth: bytes, auth_len: int) -> int: ... +def wc_AesGcmEncryptFinal(aes: CAes, auth_tag: BytePtr, auth_tag_len: int) -> int: ... +def wc_AesGcmDecryptUpdate(aes: CAes, plaintext: BytePtr, ciphertext: bytes, ciphertext_len: int, auth: bytes, auth_len: int) -> int: ... +def wc_AesGcmDecryptFinal(aes: CAes, auth: bytes, auth_len: int) -> int: ... + +ChaCha: TypeAlias = FFI.CData + +def wc_Chacha_SetKey(ctx: ChaCha, key: bytes, key_len: int) -> int: ... +def wc_Chacha_SetIV(ctx: ChaCha, iv: bytes, counter: int) -> int: ... +def wc_Chacha_Process(ctx: ChaCha, cipher: BytePtr, plain: bytes, msg_len: int) -> int: ... +def wc_ChaCha20Poly1305_Encrypt(key: bytes, iv: bytes, aad: bytes, aad_len: int, plaintext: bytes, plaintext_len: int, cipher_out: BytePtr, auth_tag: BytePtr) -> int: ... +def wc_ChaCha20Poly1305_Decrypt(key: bytes, iv: bytes, aad: bytes, aad_len: int, ciphertext: bytes, ciphertext_len: int, auth_tag: bytes, plaintext_out: BytePtr) -> int: ... + +DesKey: TypeAlias = FFI.CData + +def wc_Des3_SetKey(des: DesKey, key: bytes, iv: bytes, dir: int) -> int: ... +def wc_Des3_CbcEncrypt(des: DesKey, out: BytePtr, message: bytes, message_len: int) -> int: ... +def wc_Des3_CbcDecrypt(des: DesKey, out: BytePtr, encrypted: bytes, encrypted_len: int) -> int: ... + +RsaKey: TypeAlias = FFI.CData + +def wc_InitRsaKey(key: RsaKey, heap: FFI.CData) -> int: ... +def wc_RsaSetRNG(key: RsaKey, rng: RNG) -> int: ... +def wc_FreeRsaKey(key: RsaKey) -> int: ... +def wc_RsaPublicKeyDecode(input: bytes, in_out_idx: IntPtr, key: RsaKey, input_len:int ) -> int: ... +def wc_RsaPublicEncrypt(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG) -> int: ... +def wc_RsaPublicEncrypt_ex(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG, pad_type: int, hash_type: int, mgf: int, label: bytes | BytePtr, label_len: int) -> int: ... +def wc_RsaSSL_Verify(signature: bytes, signature_len: int, out: BytePtr, out_len: int, key: RsaKey) -> int: ... +def wc_RsaPSS_Verify(signature: bytes, signature_len: int, out: BytePtr, out_len: int, hash_type: int, mgf: int, key: RsaKey) -> int: ... +def wc_RsaPSS_CheckPadding(digest: bytes, digest_len: int, sig: bytes | BytePtr, sig_len: int, hash_type: int) -> int: ... +def wc_MakeRsaKey(key: RsaKey, size: int, e: int, rng: RNG) -> int: ... +def wc_RsaPrivateKeyDecode(input: bytes, in_out_idx: IntPtr, key: RsaKey, in_len: int) -> int: ... +def wc_RsaEncryptSize(key: RsaKey) -> int: ... +def wc_RsaKeyToDer(key: RsaKey, output: BytePtr, in_len: int) -> int: ... +def wc_RsaPrivateDecrypt(ciphertext: bytes, ciphertext_len: int, out: BytePtr, out_len: int, key: RsaKey) -> int: ... +def wc_RsaPrivateDecrypt_ex(ciphertext: bytes, ciphertext_len: int, out: BytePtr, out_len: int, key: RsaKey, pad_type: int, hash_type: int, mgf: int, label: bytes | BytePtr, label_len: int) -> int: ... +def wc_RsaSSL_Sign(plaintext: bytes, plaintext_len: int, out: BytePtr, out_len: int, key: RsaKey, rng: RNG) -> int: ... +def wc_RsaPSS_Sign(digest: bytes, digest_len: int, out: BytePtr, out_len: int, hash_type: int, mgf: int, key: RsaKey, rng: RNG) -> int: ... + +def wc_GetPkcs8TraditionalOffset(input: BytePtr, in_out_idx: IntPtr, size: int) -> int: ... +def wc_RsaKeyToPublicDer(key: RsaKey, output: BytePtr, in_len: int) -> int: ... + + +MpInt: TypeAlias = FFI.CData + +def mp_init(a: MpInt) -> int: ... +def mp_clear(a: MpInt) -> None: ... +def mp_read_unsigned_bin(a: MpInt, uns: bytes, uns_len: int) -> int: ... +def mp_to_unsigned_bin_len(a: MpInt, out: BytePtr, out_len: int) -> int: ... + +EccKey: TypeAlias = FFI.CData + +def wc_ecc_init(key: EccKey) -> int: ... +def wc_ecc_free(key: EccKey) -> int: ... +def wc_ecc_size(key: EccKey) -> int: ... +def wc_ecc_sig_size(key: EccKey) -> int: ... +def wc_ecc_get_curve_size_from_id(curve_id: int) -> int: ... +def wc_ecc_import_unsigned(key: EccKey, qx: bytes, qy: bytes, d: bytes | FFI.CData, curve_id: int) -> int: ... +def wc_ecc_export_public_raw(key: EccKey, qx: BytePtr, qx_len: IntPtr, qy: BytePtr, qy_len: IntPtr) -> int: ... +def wc_ecc_import_x963(x963: bytes, x963_len: int, key: EccKey) -> int: ... +def wc_ecc_export_x963(key: EccKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_ecc_verify_hash(sig: bytes, sig_len: int, hash: bytes, hash_len: int, res: IntPtr, key: EccKey) -> int: ... +def wc_ecc_verify_hash_ex(r: MpInt, s: MpInt, hash: bytes, hash_len: int, res: IntPtr, key: EccKey) -> int: ... +def wc_ecc_make_key(rng: RNG, key_size: int, key: EccKey) -> int: ... +def wc_ecc_set_rng(key: EccKey, rng: RNG) -> int: ... +def wc_ecc_sign_hash(hash: bytes, hash_len: int, out: BytePtr, out_len: IntPtr, rng: RNG, key: EccKey) -> int: ... +def wc_ecc_sign_hash_ex(hash: bytes, hash_len: int, rng: RNG, key: EccKey, r: MpInt, s: MpInt) -> int: ... +def wc_ecc_shared_secret(private_key: EccKey, public_key: EccKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_ecc_export_private_raw(key: EccKey, qx: BytePtr, qx_len: IntPtr, qy: BytePtr, qy_len: IntPtr, d: BytePtr, d_len: IntPtr) -> int: ... + +def wc_EccPublicKeyDecode(pub_der: bytes, in_out_idx: IntPtr, key: EccKey, pub_der_len: int) -> int: ... +def wc_EccPrivateKeyDecode(priv_der: bytes, in_out_idx: IntPtr, key: EccKey, priv_der_len: int) -> int: ... +def wc_EccPublicKeyToDer(key: EccKey, pub_der: BytePtr, pub_der_len: int, with_alg_curve: int) -> int: ... +def wc_EccKeyToDer(key: EccKey, priv_key_der: BytePtr, priv_key_len: int) -> int: ... + +Ed25519Key: TypeAlias = FFI.CData + +def wc_ed25519_make_public(key: Ed25519Key, pub_key: BytePtr, pub_key_len: int) -> int: ... +def wc_ed25519_make_key(rng: RNG, keysize: int, key: Ed25519Key) -> int: ... +def wc_ed25519_sign_msg(msg: bytes, msg_len: int, out: BytePtr, out_len: IntPtr, key: Ed25519Key) -> int: ... +def wc_ed25519_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: Ed25519Key) -> int: ... +def wc_ed25519_init(key: Ed25519Key) -> int: ... +def wc_ed25519_free(key: Ed25519Key) -> None: ... +def wc_ed25519_import_public(pub: bytes | BytePtr, pub_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_import_private_only(priv: bytes | BytePtr, priv_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_import_private_key(priv: bytes, priv_len: int, pub: bytes, pub_len: int, key: Ed25519Key) -> int: ... +def wc_ed25519_export_public(key: Ed25519Key, pub: BytePtr, pub_len: IntPtr) -> int: ... +def wc_ed25519_export_private_only(key: Ed25519Key, priv: BytePtr, priv_len: IntPtr) -> int: ... +def wc_ed25519_size(key: Ed25519Key) -> int: ... +def wc_ed25519_priv_size(key: Ed25519Key) -> int: ... +def wc_ed25519_pub_size(key: Ed25519Key) -> int: ... +def wc_ed25519_sig_size(key: Ed25519Key) -> int: ... + +Ed448Key: TypeAlias = FFI.CData + +def wc_ed448_make_public(key: Ed448Key, pub_key: BytePtr, pub_key_len: int) -> int: ... +def wc_ed448_make_key(rng: RNG, keysize: int, key: Ed448Key) -> int: ... +def wc_ed448_sign_msg(msg: bytes, msg_len: int, out: BytePtr, out_len: IntPtr, key: Ed448Key, context: bytes | FFI.CData, context_len: int) -> int: ... +def wc_ed448_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: Ed448Key, context: bytes | FFI.CData, context_len: int) -> int: ... +def wc_ed448_init(key: Ed448Key) -> int: ... +def wc_ed448_free(key: Ed448Key) -> None: ... +def wc_ed448_import_public(pub: bytes | BytePtr, pub_len: int, key: Ed448Key) -> int: ... +def wc_ed448_import_private_only(priv: bytes | BytePtr, priv_len: int, key: Ed448Key) -> int: ... +def wc_ed448_import_private_key(priv: bytes, priv_len: int, pub: bytes, pub_len: int, key: Ed448Key) -> int: ... +def wc_ed448_export_public(key: Ed448Key, pub: BytePtr, pub_len: IntPtr) -> int: ... +def wc_ed448_export_private_only(key: Ed448Key, priv: BytePtr, priv_len: IntPtr) -> int: ... +def wc_ed448_size(key: Ed448Key) -> int: ... +def wc_ed448_priv_size(key: Ed448Key) -> int: ... +def wc_ed448_pub_size(key: Ed448Key) -> int: ... +def wc_ed448_sig_size(key: Ed448Key) -> int: ... + +MlKemKey: TypeAlias = FFI.CData + +def wc_KyberKey_Init(type: int, key: MlKemKey, heap: FFI.CData, dev_id: int) -> int: ... +def wc_KyberKey_Free(key: MlKemKey) -> int: ... +def wc_KyberKey_MakeKey(key: MlKemKey, rng: RNG) -> int: ... +def wc_KyberKey_MakeKeyWithRandom(key: MlKemKey, rand: bytes, len: int) -> int: ... +def wc_KyberKey_CipherTextSize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_SharedSecretSize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_Encapsulate(key: MlKemKey, ct: BytePtr, ss: BytePtr, rng: RNG) -> int: ... +def wc_KyberKey_EncapsulateWithRandom(key: MlKemKey, ct: BytePtr, ss: BytePtr, rand: bytes, len: int) -> int: ... +def wc_KyberKey_Decapsulate(key: MlKemKey, ss: BytePtr, ct: bytes, len: int) -> int: ... +def wc_KyberKey_DecodePrivateKey(key: MlKemKey, private_key: bytes, private_key_len: int) -> int: ... +def wc_KyberKey_DecodePublicKey(key: MlKemKey, public_key: bytes, public_key_len: int) -> int: ... +def wc_KyberKey_PublicKeySize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_PrivateKeySize(key: MlKemKey, len: IntPtr) -> int: ... +def wc_KyberKey_EncodePrivateKey(key: MlKemKey, out: BytePtr, len: int) -> int: ... +def wc_KyberKey_EncodePublicKey(key: MlKemKey, out: BytePtr, len: int) -> int: ... + +DilithiumKey: TypeAlias = FFI.CData + +def wc_dilithium_init_ex(key: DilithiumKey, heap: FFI.CData, dev_id: int) -> int: ... +def wc_dilithium_set_level(key: DilithiumKey, level: int) -> int: ... +def wc_dilithium_free(key: DilithiumKey) -> None: ... +def wc_dilithium_import_public(public: bytes, public_len: int, key: DilithiumKey) -> int: ... +def wc_dilithium_export_public(key: DilithiumKey, public: BytePtr, public_len: IntPtr) -> int: ... +def wc_dilithium_verify_ctx_msg(sig: bytes, sig_len: int, ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, res: IntPtr, key: DilithiumKey) -> int: ... +def wc_dilithium_verify_msg(sig: bytes, sig_len: int, msg: bytes, msg_len: int, res: IntPtr, key: DilithiumKey) -> int: ... +def wc_dilithium_make_key(key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_make_key_from_seed(key: DilithiumKey, seed: bytes) -> int: ... +def wc_dilithium_export_private(key: DilithiumKey, out: BytePtr, out_len: IntPtr) -> int: ... +def wc_dilithium_import_private(priv: bytes, priv_size: int, key: DilithiumKey) -> int: ... +def wc_dilithium_sign_ctx_msg(ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_sign_msg(msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, rng: RNG) -> int: ... +def wc_dilithium_sign_ctx_msg_with_seed(ctx: bytes, ctx_len: int, msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, seed: bytes) -> int: ... +def wc_dilithium_sign_msg_with_seed(msg: bytes, msg_len: int, sig: BytePtr, sig_len: IntPtr, key: DilithiumKey, seed: bytes) -> int: ... +def wc_MlDsaKey_GetPrivLen(key: DilithiumKey, len: IntPtr) -> int: ... +def wc_MlDsaKey_GetPubLen(key: DilithiumKey, len: IntPtr) -> int: ... +def wc_MlDsaKey_GetSigLen(key: DilithiumKey, len: IntPtr) -> int: ... diff --git a/wolfcrypt/asn.py b/wolfcrypt/asn.py index dc702d1..a726b78 100644 --- a/wolfcrypt/asn.py +++ b/wolfcrypt/asn.py @@ -20,23 +20,26 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + import hmac as _hmac from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.exceptions import WolfCryptError, WolfCryptApiError +from wolfcrypt.hashes import _Hash if _lib.SHA_ENABLED: - from wolfcrypt.hashes import Sha + from wolfcrypt.hashes import Sha # ty: ignore[possibly-missing-import] if _lib.SHA256_ENABLED: - from wolfcrypt.hashes import Sha256 + from wolfcrypt.hashes import Sha256 # ty: ignore[possibly-missing-import] if _lib.SHA384_ENABLED: - from wolfcrypt.hashes import Sha384 + from wolfcrypt.hashes import Sha384 # ty: ignore[possibly-missing-import] if _lib.SHA512_ENABLED: - from wolfcrypt.hashes import Sha512 + from wolfcrypt.hashes import Sha512 # ty: ignore[possibly-missing-import] if _lib.ASN_ENABLED: - def pem_to_der(pem, pem_type): + def pem_to_der(pem: bytes, pem_type: int) -> bytes: der = _ffi.new("DerBuffer**") ret = _lib.wc_PemToDer(pem, len(pem), pem_type, der, _ffi.NULL, _ffi.NULL, _ffi.NULL) @@ -49,7 +52,7 @@ def pem_to_der(pem, pem_type): _lib.wc_FreeDer(der) return result - def der_to_pem(der, pem_type): + def der_to_pem(der: bytes, pem_type: int) -> bytes: pem_length = _lib.wc_DerToPemEx(der, len(der), _ffi.NULL, 0, _ffi.NULL, pem_type) if pem_length <= 0: @@ -63,7 +66,7 @@ def der_to_pem(der, pem_type): return _ffi.buffer(pem, pem_length)[:] - def hash_oid_from_class(hash_cls): + def hash_oid_from_class(hash_cls: type[_Hash]) -> int: if _lib.SHA_ENABLED and hash_cls == Sha: return _lib.SHAh elif _lib.SHA256_ENABLED and hash_cls == Sha256: @@ -75,7 +78,7 @@ def hash_oid_from_class(hash_cls): else: raise WolfCryptError(f"Unknown hash class {hash_cls.__name__}") - def make_signature(data, hash_cls, key=None): + def make_signature(data: bytes, hash_cls: type[_Hash], key = None) -> bytes: hash_obj = hash_cls() hash_obj.update(data) digest = hash_obj.digest() @@ -93,7 +96,7 @@ def make_signature(data, hash_cls, key=None): else: return plaintext_sig - def check_signature(signature, data, hash_cls, pub_key): + def check_signature(signature: bytes, data: bytes, hash_cls: type[_Hash], pub_key) -> bool: computed_signature = make_signature(data, hash_cls) decrypted_signature = pub_key.verify(signature) return _hmac.compare_digest(computed_signature, decrypted_signature) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 562aabd..c802932 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -20,11 +20,14 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + +from abc import ABC, abstractmethod from enum import IntEnum from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib -from wolfcrypt.utils import t2b +from wolfcrypt.utils import BytesOrStr, t2b from wolfcrypt.random import Random from wolfcrypt.asn import pem_to_der from wolfcrypt.hashes import hash_type_to_cls @@ -111,12 +114,12 @@ -class _Cipher: +class _Cipher(ABC): """ A **PEP 272: Block Encryption Algorithms** compliant **Symmetric Key Cipher**. """ - def __init__(self, key, mode, IV=None): + def __init__(self, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None) -> None: if mode not in _FEEDBACK_MODES: raise ValueError("this mode is not supported") @@ -152,10 +155,35 @@ def __init__(self, key, mode, IV=None): if IV: self._IV = IV else: # pragma: no cover - self._IV = _ffi.new(f"byte[{self.block_size}]") + self._IV = bytes(self.block_size) + + @property + @abstractmethod + def _native_type(self) -> str: ... + + @property + @abstractmethod + def block_size(self) -> int: ... + + @property + @abstractmethod + def key_size(self) -> int: ... + + @property + @abstractmethod + def _key_sizes(self) -> list[int]: ... + + @abstractmethod + def _set_key(self, direction: int) -> int: ... + + @abstractmethod + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: ... + + @abstractmethod + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: ... @classmethod - def new(cls, key, mode, IV=None, **kwargs): # pylint: disable=W0613 + def new(cls, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None) -> _Cipher: # pylint: disable=W0613 """ Returns a ciphering object, using the secret key contained in the string **key**, and using the feedback mode **mode**, which @@ -168,7 +196,7 @@ def new(cls, key, mode, IV=None, **kwargs): # pylint: disable=W0613 """ return cls(key, mode, IV) - def encrypt(self, string): + def encrypt(self, string: BytesOrStr) -> bytes: """ Encrypts a non-empty string, using the key-dependent data in the object, and with the appropriate feedback mode. @@ -200,7 +228,7 @@ def encrypt(self, string): return _ffi.buffer(result)[:] - def decrypt(self, string): + def decrypt(self, string: BytesOrStr) -> bytes: """ Decrypts **string**, using the key-dependent data in the object and with the appropriate feedback mode. @@ -244,17 +272,20 @@ class Aes(_Cipher): _key_sizes = [16, 24, 32] _native_type = "Aes *" - def _set_key(self, direction): + def _set_key(self, direction: int) -> int: if direction == _ENCRYPTION: + assert self._enc is not None return _lib.wc_AesSetKey( self._enc, self._key, len(self._key), self._IV, _ENCRYPTION) + assert self._dec is not None if self.mode == MODE_CTR: return _lib.wc_AesSetKey( self._dec, self._key, len(self._key), self._IV, _ENCRYPTION) return _lib.wc_AesSetKey( self._dec, self._key, len(self._key), self._IV, _DECRYPTION) - def _encrypt(self, destination, source): + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None if self.mode == MODE_CBC: return _lib.wc_AesCbcEncrypt(self._enc, destination, source, len(source)) @@ -264,7 +295,8 @@ def _encrypt(self, destination, source): else: raise ValueError("Invalid mode associated to cipher") - def _decrypt(self, destination, source): + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None if self.mode == MODE_CBC: return _lib.wc_AesCbcDecrypt(self._dec, destination, source, len(source)) @@ -283,12 +315,12 @@ class AesSiv: _key_sizes = [32, 48, 64] block_size = 16 - def __init__(self, key): + def __init__(self, key: BytesOrStr) -> None: self._key = t2b(key) if len(self._key) not in AesSiv._key_sizes: raise ValueError(f"key must be {AesSiv._key_sizes} in length, not {len(self._key)}") - def encrypt(self, associated_data, nonce, plaintext): + def encrypt(self, associated_data: BytesOrStr | list[BytesOrStr], nonce: BytesOrStr, plaintext: BytesOrStr) -> tuple[bytes, bytes]: """ Encrypt plaintext data using the nonce provided. The associated data is not encrypted but is included in the authentication tag. @@ -302,20 +334,20 @@ def encrypt(self, associated_data, nonce, plaintext): # Prepare the associated data blocks. Make sure to hold on to the # returned references until the C function has been called in order # to prevent garbage collection of them until the function is done. - associated_data, _refs = ( + prep_associated_data, _refs = ( AesSiv._prepare_associated_data(associated_data)) nonce = t2b(nonce) plaintext = t2b(plaintext) siv = _ffi.new(f"byte[{AesSiv.block_size}]") ciphertext = _ffi.new(f"byte[{len(plaintext)}]") ret = _lib.wc_AesSivEncrypt_ex(self._key, len(self._key), - associated_data, len(associated_data), nonce, len(nonce), + prep_associated_data, len(prep_associated_data), nonce, len(nonce), plaintext, len(plaintext), siv, ciphertext) if ret < 0: # pragma: no cover raise WolfCryptApiError("AES-SIV encryption error", ret) return _ffi.buffer(siv)[:], _ffi.buffer(ciphertext)[:] - def decrypt(self, associated_data, nonce, siv, ciphertext): + def decrypt(self, associated_data: BytesOrStr | list[BytesOrStr], nonce: BytesOrStr, siv: BytesOrStr, ciphertext: BytesOrStr) -> bytes: """ Decrypt the ciphertext using the nonce and SIV provided. The integrity of the associated data is checked. @@ -329,7 +361,7 @@ def decrypt(self, associated_data, nonce, siv, ciphertext): # Prepare the associated data blocks. Make sure to hold on to the # returned references until the C function has been called in order # to prevent garbage collection of them until the function is done. - associated_data, _refs = ( + prep_associated_data, _refs = ( AesSiv._prepare_associated_data(associated_data)) nonce = t2b(nonce) siv = t2b(siv) @@ -338,14 +370,14 @@ def decrypt(self, associated_data, nonce, siv, ciphertext): ciphertext = t2b(ciphertext) plaintext = _ffi.new(f"byte[{len(ciphertext)}]") ret = _lib.wc_AesSivDecrypt_ex(self._key, len(self._key), - associated_data, len(associated_data), nonce, len(nonce), + prep_associated_data, len(prep_associated_data), nonce, len(nonce), ciphertext, len(ciphertext), siv, plaintext) if ret < 0: raise WolfCryptApiError("AES-SIV decryption error", ret) return _ffi.buffer(plaintext)[:] @staticmethod - def _prepare_associated_data(associated_data): + def _prepare_associated_data(associated_data: BytesOrStr | list[BytesOrStr]) -> tuple[_ffi.CData, bytes | list[bytes]]: """ Prepare associated data for sending to C library. @@ -362,9 +394,9 @@ def _prepare_associated_data(associated_data): if isinstance(associated_data, (str, bytes, bytearray, memoryview)): # A single block is provided. # Make sure we have bytes. - associated_data = t2b(associated_data) + associated_data_bytes = t2b(associated_data) result = _ffi.new("AesSivAssoc[1]") - result[0].assoc = _ffi.from_buffer(associated_data) + result[0].assoc = _ffi.from_buffer(associated_data_bytes) result[0].assocSz = len(associated_data) else: # It is assumed that a list is provided. @@ -373,14 +405,14 @@ def _prepare_associated_data(associated_data): raise WolfCryptError("AES-SIV does not support more than 126 blocks " f"of associated data, got: {num_blocks}") # Make sure we have bytes. - associated_data = [t2b(block) for block in associated_data] + associated_data_bytes = [t2b(block) for block in associated_data] result = _ffi.new("AesSivAssoc[]", num_blocks) - for index, block in enumerate(associated_data): + for index, block in enumerate(associated_data_bytes): result[index].assoc = _ffi.from_buffer(block) result[index].assocSz = len(block) # Return the converted associated data blocks so the caller can # hold on to them until the function has been called. - return result, associated_data + return result, associated_data_bytes if _lib.AESGCM_STREAM_ENABLED: @@ -394,7 +426,7 @@ class AesGcmStream: # making sure _lib.wc_AesFree outlives Aes instances _delete = staticmethod(_lib.wc_AesFree) - def __init__(self, key, IV, tag_bytes=16): + def __init__(self, key: BytesOrStr, IV: BytesOrStr, tag_bytes: int = 16) -> None: """ tag_bytes is the number of bytes to use for the authentication tag during encryption """ @@ -420,12 +452,12 @@ def __init__(self, key, IV, tag_bytes=16): if ret < 0: raise WolfCryptApiError("Init error", ret) - def __del__(self): + def __del__(self) -> None: if getattr(self, '_init_done', False): self._delete(self._native_object) self._init_done = False - def set_aad(self, data): + def set_aad(self, data: BytesOrStr) -> None: """ Set the additional authentication data for the stream """ @@ -433,10 +465,10 @@ def set_aad(self, data): raise WolfCryptError("AAD can only be set before encrypt() or decrypt() is called") self._aad = t2b(data) - def get_aad(self): + def get_aad(self) -> bytes: return self._aad - def encrypt(self, data): + def encrypt(self, data: BytesOrStr) -> bytes: """ Add more data to the encryption stream """ @@ -453,7 +485,7 @@ def encrypt(self, data): raise WolfCryptApiError("Encryption error", ret) return bytes(buf) - def decrypt(self, data): + def decrypt(self, data: BytesOrStr) -> bytes: """ Add more data to the decryption stream """ @@ -470,7 +502,7 @@ def decrypt(self, data): raise WolfCryptApiError("Decryption error", ret) return bytes(buf) - def final(self, authTag=None): + def final(self, authTag: BytesOrStr | None = None) -> bytes | None: """ When encrypting, finalize the stream and return an authentication tag for the stream. When decrypting, verify the authentication tag for the stream. @@ -479,11 +511,11 @@ def final(self, authTag=None): if self._mode is None: raise WolfCryptError("Final called with no encryption or decryption") elif self._mode == _ENCRYPTION: - authTag = _ffi.new(f"byte[{self._tag_bytes}]") - ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self._tag_bytes) + authTag_out = _ffi.new(f"byte[{self._tag_bytes}]") + ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag_out, self._tag_bytes) if ret < 0: raise WolfCryptApiError("Encryption error", ret) - return _ffi.buffer(authTag)[:] + return _ffi.buffer(authTag_out)[:] else: if authTag is None: raise WolfCryptError("authTag parameter required") @@ -509,7 +541,7 @@ class ChaCha(_Cipher): _IV_nonce = b"" _IV_counter = 0 - def __init__(self, key="", size=32): # pylint: disable=unused-argument + def __init__(self, key: BytesOrStr="", _size: int= 32) -> None: # pylint: disable=unused-argument # size is kept for backwards compatibility; key length is now # derived from the actual key and validated against _key_sizes. self._native_object = _ffi.new(self._native_type) @@ -528,7 +560,7 @@ def __init__(self, key="", size=32): # pylint: disable=unused-argument # collide with _ENCRYPTION (0) or _DECRYPTION (1). _REKEY_BOTH = -1 - def _set_key(self, direction): + def _set_key(self, direction: int) -> int: if self._key is None: return -1 # _REKEY_BOTH re-keys whichever contexts are already allocated, @@ -539,12 +571,14 @@ def _set_key(self, direction): do_enc = self._enc and direction in (self._REKEY_BOTH, _ENCRYPTION) do_dec = self._dec and direction in (self._REKEY_BOTH, _DECRYPTION) if do_enc: + assert self._enc is not None ret = _lib.wc_Chacha_SetKey(self._enc, self._key, len(self._key)) if ret == 0: ret = _lib.wc_Chacha_SetIV(self._enc, self._IV_nonce, self._IV_counter) if ret != 0: return ret if do_dec: + assert self._dec is not None ret = _lib.wc_Chacha_SetKey(self._dec, self._key, len(self._key)) if ret == 0: ret = _lib.wc_Chacha_SetIV(self._dec, self._IV_nonce, self._IV_counter) @@ -552,17 +586,19 @@ def _set_key(self, direction): return ret return 0 - def _encrypt(self, destination, source): + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None return _lib.wc_Chacha_Process(self._enc, destination, source, len(source)) - def _decrypt(self, destination, source): + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None return _lib.wc_Chacha_Process(self._dec, destination, source, len(source)) _NONCE_SIZE = 12 - def set_iv(self, nonce, counter = 0): + def set_iv(self, nonce: BytesOrStr, counter: int = 0) -> None: self._IV_nonce = t2b(nonce) if len(self._IV_nonce) != self._NONCE_SIZE: raise ValueError(f"nonce must be {self._NONCE_SIZE} bytes, got {len(self._IV_nonce)}") @@ -581,12 +617,12 @@ class ChaCha20Poly1305: _key_sizes = [32] _tag_bytes = 16 - def __init__(self, key): + def __init__(self, key: BytesOrStr) -> None: self._key = t2b(key) if len(self._key) not in self._key_sizes: raise ValueError(f"key must be {self._key_sizes} in length, not {len(self._key)}") - def encrypt(self, aad, iv, plaintext): + def encrypt(self, aad: BytesOrStr, iv: BytesOrStr, plaintext: BytesOrStr) -> tuple[bytes, bytes]: """ Encrypt plaintext data using the IV/nonce provided. The associated data (aad) is not encrypted but is included in the @@ -602,11 +638,11 @@ def encrypt(self, aad, iv, plaintext): ciphertext = _ffi.new(f"byte[{len(plaintext)}]") authTag = _ffi.new(f"byte[{self._tag_bytes}]") ret = _lib.wc_ChaCha20Poly1305_Encrypt( - _ffi.from_buffer(self._key), - _ffi.from_buffer(iv), - _ffi.from_buffer(aad), + self._key, + iv, + aad, len(aad), - _ffi.from_buffer(plaintext), + plaintext, len(plaintext), ciphertext, authTag @@ -615,7 +651,7 @@ def encrypt(self, aad, iv, plaintext): raise WolfCryptApiError("Encryption error", ret) return bytes(ciphertext), bytes(authTag) - def decrypt(self, aad, iv, authTag, ciphertext): + def decrypt(self, aad: BytesOrStr, iv: BytesOrStr, authTag: BytesOrStr, ciphertext: BytesOrStr) -> bytes: """ Decrypt the ciphertext using the IV/nonce and authentication tag provided. The integrity of the associated data (aad) is checked. @@ -632,13 +668,13 @@ def decrypt(self, aad, iv, authTag, ciphertext): ciphertext = t2b(ciphertext) plaintext = _ffi.new(f"byte[{len(ciphertext)}]") ret = _lib.wc_ChaCha20Poly1305_Decrypt( - _ffi.from_buffer(self._key), - _ffi.from_buffer(iv), - _ffi.from_buffer(aad), + self._key, + iv, + aad, len(aad), - _ffi.from_buffer(ciphertext), + ciphertext, len(ciphertext), - _ffi.from_buffer(authTag), + authTag, plaintext ) if ret < 0: @@ -655,9 +691,10 @@ class Des3(_Cipher): """ block_size = 8 key_size = 24 + _key_sizes = [24] _native_type = "Des3 *" - def __init__(self, key, mode, IV=None): + def __init__(self, key: BytesOrStr, mode: int, IV: BytesOrStr | None = None) -> None: # Intentionally stricter than _Cipher.__init__, which accepts both # CBC and CTR. wolfCrypt has no 3DES-CTR implementation, so reject # MODE_CTR here with a clearer error before delegating. @@ -665,30 +702,30 @@ def __init__(self, key, mode, IV=None): raise ValueError("Des3 only supports MODE_CBC") super().__init__(key, mode, IV) - def _set_key(self, direction): + def _set_key(self, direction: int) -> int: if direction == _ENCRYPTION: - return _lib.wc_Des3_SetKey(self._enc, self._key, - self._IV, _ENCRYPTION) + assert self._enc is not None + return _lib.wc_Des3_SetKey(self._enc, self._key, self._IV, _ENCRYPTION) - return _lib.wc_Des3_SetKey(self._dec, self._key, - self._IV, _DECRYPTION) + assert self._dec is not None + return _lib.wc_Des3_SetKey(self._dec, self._key, self._IV, _DECRYPTION) - def _encrypt(self, destination, source): - return _lib.wc_Des3_CbcEncrypt(self._enc, destination, - source, len(source)) + def _encrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._enc is not None + return _lib.wc_Des3_CbcEncrypt(self._enc, destination, source, len(source)) - def _decrypt(self, destination, source): - return _lib.wc_Des3_CbcDecrypt(self._dec, destination, - source, len(source)) + def _decrypt(self, destination: _ffi.CData, source: bytes) -> int: + assert self._dec is not None + return _lib.wc_Des3_CbcDecrypt(self._dec, destination, source, len(source)) if _lib.RSA_ENABLED: class _Rsa: # pylint: disable=too-few-public-methods RSA_MIN_PAD_SIZE = 11 - _mgf = None + _mgf: int | None = None _hash_type = None - def __init__(self, rng=None): + def __init__(self, rng: Random | None = None) -> None: if rng is None: rng = Random() @@ -707,14 +744,14 @@ def __init__(self, rng=None): # making sure _lib.wc_FreeRsaKey outlives RsaKey instances _delete = staticmethod(_lib.wc_FreeRsaKey) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) - def set_mgf(self, mgf): + def set_mgf(self, mgf: int) -> None: self._mgf = mgf - def _get_mgf(self): + def _get_mgf(self) -> None: if self._hash_type == _lib.WC_HASH_TYPE_SHA: self._mgf = _lib.WC_MGF1SHA1 elif self._hash_type == _lib.WC_HASH_TYPE_SHA224: @@ -731,7 +768,7 @@ def _get_mgf(self): class RsaPublic(_Rsa): - def __init__(self, key=None, hash_type=None, rng=None): + def __init__(self, key: BytesOrStr, hash_type: int | None = None, rng: Random | None = None) -> None: super().__init__(rng) if key is not None: @@ -753,11 +790,11 @@ def __init__(self, key=None, hash_type=None, rng=None): if _lib.ASN_ENABLED: @classmethod - def from_pem(cls, file, hash_type=None, rng=None): + def from_pem(cls, file: bytes, hash_type: int | None = None, rng: Random | None = None) -> RsaPublic: der = pem_to_der(file, _lib.PUBLICKEY_TYPE) return cls(key=der, hash_type=hash_type, rng=rng) - def encrypt(self, plaintext): + def encrypt(self, plaintext: BytesOrStr) -> bytes: """ Encrypts **plaintext**, using the public key data in the object. The plaintext's length must not be greater than: @@ -780,7 +817,7 @@ def encrypt(self, plaintext): return _ffi.buffer(ciphertext)[:] - def encrypt_oaep(self, plaintext, label=""): + def encrypt_oaep(self, plaintext: BytesOrStr, label: BytesOrStr = "") -> bytes: if not self._hash_type: raise WolfCryptError("Hash type not set. Cannot use OAEP padding without a hash type.") plaintext = t2b(plaintext) @@ -788,6 +825,7 @@ def encrypt_oaep(self, plaintext, label=""): ciphertext = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPublicEncrypt_ex(plaintext, len(plaintext), ciphertext, self.output_size, self.native_object, @@ -800,7 +838,7 @@ def encrypt_oaep(self, plaintext, label=""): return _ffi.buffer(ciphertext)[:] - def verify(self, signature): + def verify(self, signature: BytesOrStr) -> bytes: """ Verifies **signature**, using the public key data in the object. The signature's length must be equal to: @@ -822,7 +860,7 @@ def verify(self, signature): return _ffi.buffer(plaintext, ret)[:] if _lib.RSA_PSS_ENABLED: - def verify_pss(self, plaintext, signature): + def verify_pss(self, plaintext: BytesOrStr, signature: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. The signature's length must be equal to: @@ -842,6 +880,7 @@ def verify_pss(self, plaintext, signature): signature = t2b(signature) if self._mgf is None: self._get_mgf() + assert self._mgf is not None verify = _ffi.new(f"byte[{self.output_size}]") ret = _lib.wc_RsaPSS_Verify(signature, len(signature), @@ -865,13 +904,13 @@ def verify_pss(self, plaintext, signature): class RsaPrivate(RsaPublic): if _lib.KEYGEN_ENABLED: @classmethod - def make_key(cls, size, rng=None, hash_type=None): + def make_key(cls, size: int, rng: Random | None = None, hash_type: int | None = None) -> RsaPrivate: """ Generates a new key pair of desired length **size**. """ if rng is None: rng = Random() - rsa = cls(hash_type=hash_type) + rsa = cls(hash_type=hash_type, rng=rng) ret = _lib.wc_MakeRsaKey(rsa.native_object, size, 65537, rng.native_object) @@ -883,12 +922,9 @@ def make_key(cls, size, rng=None, hash_type=None): if rsa.output_size < 0: # pragma: no cover raise WolfCryptApiError("Invalid key size error", rsa.output_size) - # Retain RNG reference defensively. - rsa._rng = rng - return rsa - def __init__(self, key=None, hash_type=None, rng=None): # pylint: disable=super-init-not-called + def __init__(self, key: BytesOrStr | None = None, hash_type: int | None = None, rng: Random | None = None) -> None: # pylint: disable=super-init-not-called _Rsa.__init__(self, rng) # pylint: disable=non-parent-init-called self._hash_type = hash_type @@ -922,12 +958,12 @@ def __init__(self, key=None, hash_type=None, rng=None): # pylint: disable=super if _lib.ASN_ENABLED: @classmethod - def from_pem(cls, file, hash_type=None, rng=None): + def from_pem(cls, file: bytes, hash_type: int | None = None, rng: Random | None = None) -> RsaPrivate: der = pem_to_der(file, _lib.PRIVATEKEY_TYPE) return cls(key=der, hash_type=hash_type, rng=rng) if _lib.KEYGEN_ENABLED: - def encode_key(self): + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the RSA private and public keys in an ASN sequence. @@ -949,7 +985,7 @@ def encode_key(self): return _ffi.buffer(priv, privlen)[:], _ffi.buffer(pub, publen)[:] - def decrypt(self, ciphertext): + def decrypt(self, ciphertext: BytesOrStr) -> bytes: """ Decrypts **ciphertext**, using the private key data in the object. The ciphertext's length must be equal to: @@ -970,7 +1006,7 @@ def decrypt(self, ciphertext): return _ffi.buffer(plaintext, ret)[:] - def decrypt_oaep(self, ciphertext, label=""): + def decrypt_oaep(self, ciphertext: BytesOrStr, label: BytesOrStr = "") -> bytes: """ Decrypts **ciphertext**, using the private key data in the object. The ciphertext's length must be equal to: @@ -986,6 +1022,7 @@ def decrypt_oaep(self, ciphertext, label=""): plaintext = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPrivateDecrypt_ex(ciphertext, len(ciphertext), plaintext, self.output_size, self.native_object, @@ -997,7 +1034,7 @@ def decrypt_oaep(self, ciphertext, label=""): return _ffi.buffer(plaintext, ret)[:] - def sign(self, plaintext): + def sign(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. The plaintext's length must not be greater than: @@ -1020,7 +1057,7 @@ def sign(self, plaintext): return _ffi.buffer(signature, self.output_size)[:] if _lib.RSA_PSS_ENABLED: - def sign_pss(self, plaintext): + def sign_pss(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. The plaintext's length must not be greater than: @@ -1042,6 +1079,7 @@ def sign_pss(self, plaintext): signature = _ffi.new(f"byte[{self.output_size}]") if self._mgf is None: self._get_mgf() + assert self._mgf is not None ret = _lib.wc_RsaPSS_Sign(digest, len(digest), signature, self.output_size, @@ -1057,7 +1095,7 @@ def sign_pss(self, plaintext): if _lib.ECC_ENABLED: class _Ecc: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ecc_key *") ret = _lib.wc_ecc_init(self.native_object) if ret < 0: # pragma: no cover @@ -1066,27 +1104,27 @@ def __init__(self): # making sure _lib.wc_ecc_free outlives ecc_key instances _delete = staticmethod(_lib.wc_ecc_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ecc_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ecc_sig_size(self.native_object) class EccPublic(_Ecc): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ecc.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ECC public key from an ASN sequence. """ @@ -1104,7 +1142,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def decode_key_raw(self, qx, qy, curve_id=ECC_SECP256R1): + def decode_key_raw(self, qx: BytesOrStr, qy: BytesOrStr, curve_id: int = ECC_SECP256R1) -> None: """ Decodes an ECC public key from its raw elements: (Qx,Qy) """ @@ -1122,7 +1160,7 @@ def decode_key_raw(self, qx, qy, curve_id=ECC_SECP256R1): if ret != 0: raise WolfCryptApiError("Key decode error", ret) - def encode_key(self, with_curve=True): + def encode_key(self, with_curve: bool = True) -> bytes: """ Encodes the ECC public key in an ASN sequence. @@ -1137,7 +1175,7 @@ def encode_key(self, with_curve=True): return _ffi.buffer(key, ret)[:] - def encode_key_raw(self): + def encode_key_raw(self) -> tuple[bytes, bytes]: """ Encodes the ECC public key in its two raw elements @@ -1158,7 +1196,7 @@ def encode_key_raw(self): return _ffi.buffer(Qx, qx_size[0])[:], _ffi.buffer(Qy, qy_size[0])[:] - def import_x963(self, x963): + def import_x963(self, x963: bytes) -> None: """ Imports an ECC public key in ANSI X9.63 format. """ @@ -1166,7 +1204,7 @@ def import_x963(self, x963): if ret != 0: raise WolfCryptApiError("x963 import error", ret) - def export_x963(self): + def export_x963(self) -> bytes: """ Exports the public key data of the object in ANSI X9.63 format. @@ -1182,7 +1220,7 @@ def export_x963(self): return _ffi.buffer(x963, x963_size[0])[:] - def verify(self, signature, data): + def verify(self, signature: bytes, data: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1201,7 +1239,7 @@ def verify(self, signature, data): return status[0] == 1 if _lib.MPAPI_ENABLED: - def verify_raw(self, R, S, data): + def verify_raw(self, R: bytes, S: bytes, data: BytesOrStr) -> bool: """ Verifies signature from its raw elements **R** and **S**, using the public key data in the object. @@ -1245,18 +1283,19 @@ def verify_raw(self, R, S, data): class EccPrivate(EccPublic): - def __init__(self, key=None, rng=None): + def __init__(self, key: BytesOrStr | None = None, rng: Random | None = None) -> None: super().__init__(key) self._rng = rng @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> EccPrivate: """ Generates a new key pair of desired length **size**. """ if rng is None: rng = Random() ecc = cls(rng=rng) + assert ecc._rng is not None ret = _lib.wc_ecc_make_key(ecc._rng.native_object, size, ecc.native_object) @@ -1271,7 +1310,7 @@ def make_key(cls, size, rng=None): return ecc - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ECC private key from an ASN sequence. """ @@ -1289,7 +1328,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def decode_key_raw(self, qx, qy, d, curve_id=ECC_SECP256R1): + def decode_key_raw(self, qx: BytesOrStr, qy: BytesOrStr, d: BytesOrStr, curve_id: int = ECC_SECP256R1) -> None: """ Decodes an ECC private key from its raw elements: public (Qx,Qy) and private(d) @@ -1310,7 +1349,7 @@ def decode_key_raw(self, qx, qy, d, curve_id=ECC_SECP256R1): if ret != 0: raise WolfCryptApiError("Key decode error", ret) - def encode_key(self): + def encode_key(self) -> bytes: """ Encodes the ECC private key in an ASN sequence. @@ -1324,7 +1363,7 @@ def encode_key(self): return _ffi.buffer(key, ret)[:] - def encode_key_raw(self): + def encode_key_raw(self) -> tuple[bytes, bytes, bytes]: """ Encodes the ECC private key in its three raw elements @@ -1348,7 +1387,7 @@ def encode_key_raw(self): return _ffi.buffer(Qx, qx_size[0])[:], _ffi.buffer(Qy, qy_size[0])[:], _ffi.buffer(d, d_size[0])[:] - def shared_secret(self, peer): + def shared_secret(self, peer: EccPrivate) -> bytes: """ Generates a new secret key using the private key data in the object and the peer's public key. @@ -1368,7 +1407,7 @@ def shared_secret(self, peer): return _ffi.buffer(shared_secret, secret_size[0])[:] - def sign(self, plaintext, rng=None): + def sign(self, plaintext: BytesOrStr, rng: Random | None = None) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1393,7 +1432,7 @@ def sign(self, plaintext, rng=None): return _ffi.buffer(signature, signature_size[0])[:] if _lib.MPAPI_ENABLED: - def sign_raw(self, plaintext, rng=None): + def sign_raw(self, plaintext: BytesOrStr, rng: Random | None = None) -> tuple[bytes, bytes]: """ Signs **plaintext**, using the private key data in the object. @@ -1441,7 +1480,7 @@ def sign_raw(self, plaintext, rng=None): if _lib.ED25519_ENABLED: class _Ed25519: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ed25519_key *") ret = _lib.wc_ed25519_init(self.native_object) if ret < 0: # pragma: no cover @@ -1450,27 +1489,27 @@ def __init__(self): # making sure _lib.wc_ed25519_free outlives ed25519_key instances _delete = staticmethod(_lib.wc_ed25519_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ed25519_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ed25519_sig_size(self.native_object) class Ed25519Public(_Ed25519): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ed25519.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ED25519 public key """ @@ -1489,7 +1528,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> bytes: """ Encodes the ED25519 public key @@ -1506,7 +1545,7 @@ def encode_key(self): return _ffi.buffer(key, size[0])[:] - def verify(self, signature, data): + def verify(self, signature: bytes, data: BytesOrStr) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1527,16 +1566,18 @@ def verify(self, signature, data): class Ed25519Private(Ed25519Public): - def __init__(self, key=None, pub=None): + def __init__(self, key: BytesOrStr | None = None, pub: bytes | None = None) -> None: _Ed25519.__init__(self) + self._rng = None + if key and not pub: self.decode_key(key) if key and pub: self.decode_key(key,pub) @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> Ed25519Private: """ Generates a new key pair of desired length **size**. """ @@ -1555,7 +1596,7 @@ def make_key(cls, size, rng=None): return ed25519 - def decode_key(self, key, pub = None): + def decode_key(self, key: BytesOrStr, pub: bytes | None = None) -> None: """ Decodes an ED25519 private + pub key """ @@ -1591,7 +1632,7 @@ def decode_key(self, key, pub = None): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the ED25519 private key. @@ -1616,7 +1657,7 @@ def encode_key(self): return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] - def sign(self, plaintext): + def sign(self, plaintext: BytesOrStr) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1639,7 +1680,7 @@ def sign(self, plaintext): if _lib.ED448_ENABLED: class _Ed448: # pylint: disable=too-few-public-methods - def __init__(self): + def __init__(self) -> None: self.native_object = _ffi.new("ed448_key *") ret = _lib.wc_ed448_init(self.native_object) if ret < 0: # pragma: no cover @@ -1648,27 +1689,27 @@ def __init__(self): # making sure _lib.wc_ed448_free outlives ed448_key instances _delete = staticmethod(_lib.wc_ed448_free) - def __del__(self): + def __del__(self) -> None: if self.native_object: self._delete(self.native_object) @property - def size(self): + def size(self) -> int: return _lib.wc_ed448_size(self.native_object) @property - def max_signature_size(self): + def max_signature_size(self) -> int: return _lib.wc_ed448_sig_size(self.native_object) class Ed448Public(_Ed448): - def __init__(self, key=None): + def __init__(self, key: BytesOrStr | None = None) -> None: _Ed448.__init__(self) if key: self.decode_key(key) - def decode_key(self, key): + def decode_key(self, key: BytesOrStr) -> None: """ Decodes an ED448 public key """ @@ -1687,7 +1728,7 @@ def decode_key(self, key): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> bytes: """ Encodes the ED448 public key @@ -1704,7 +1745,7 @@ def encode_key(self): return _ffi.buffer(key, size[0])[:] - def verify(self, signature, data, ctx=None): + def verify(self, signature: bytes, data: BytesOrStr, ctx: BytesOrStr | None = None) -> bool: """ Verifies **signature**, using the public key data in the object. @@ -1733,16 +1774,17 @@ def verify(self, signature, data, ctx=None): class Ed448Private(Ed448Public): - def __init__(self, key=None, pub=None): + def __init__(self, key: BytesOrStr | None = None, pub: bytes | None = None) -> None: _Ed448.__init__(self) + self._rng = None if key and not pub: self.decode_key(key) if key and pub: - self.decode_key(key,pub) + self.decode_key(key, pub) @classmethod - def make_key(cls, size, rng=None): + def make_key(cls, size: int, rng: Random | None = None) -> Ed448Private: """ Generates a new key pair of desired length **size**. """ @@ -1761,7 +1803,7 @@ def make_key(cls, size, rng=None): return ed448 - def decode_key(self, key, pub = None): + def decode_key(self, key: BytesOrStr, pub: bytes | None = None) -> None: """ Decodes an ED448 private + pub key """ @@ -1797,7 +1839,7 @@ def decode_key(self, key, pub = None): if self.max_signature_size <= 0: # pragma: no cover raise WolfCryptError(f"Key decode error ({self.max_signature_size})") - def encode_key(self): + def encode_key(self) -> tuple[bytes, bytes]: """ Encodes the ED448 private key. @@ -1822,7 +1864,7 @@ def encode_key(self): return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] - def sign(self, plaintext, ctx=None): + def sign(self, plaintext: BytesOrStr, ctx : BytesOrStr | None = None) -> bytes: """ Signs **plaintext**, using the private key data in the object. @@ -1873,7 +1915,7 @@ class MlKemType(IntEnum): class _MlKemBase: INVALID_DEVID = _lib.INVALID_DEVID - def __init__(self, mlkem_type): + def __init__(self, mlkem_type: MlKemType) -> None: self.init_done = False self.native_object = _ffi.new("KyberKey *") ret = _lib.wc_KyberKey_Init( @@ -1884,13 +1926,14 @@ def __init__(self, mlkem_type): raise WolfCryptApiError("wc_KyberKey_Init() error", ret) self.init_done = True + self._rng = None - def __del__(self): + def __del__(self) -> None: if self.init_done: _lib.wc_KyberKey_Free(self.native_object) @property - def ct_size(self): + def ct_size(self) -> int: """ :return: cipher text size in bytes :rtype: int @@ -1904,7 +1947,7 @@ def ct_size(self): return len[0] @property - def ss_size(self): + def ss_size(self) -> int: """ :return: shared secret size in bytes :rtype: int @@ -1918,7 +1961,7 @@ def ss_size(self): return len[0] @property - def _pub_key_size(self): + def _pub_key_size(self) -> int: len = _ffi.new("word32 *") ret = _lib.wc_KyberKey_PublicKeySize(self.native_object, len) @@ -1927,7 +1970,7 @@ def _pub_key_size(self): return len[0] - def _encode_pub_key(self): + def _encode_pub_key(self) -> bytes: pub_key_size = self._pub_key_size pub_key = _ffi.new(f"unsigned char[{pub_key_size}]") ret = _lib.wc_KyberKey_EncodePublicKey( @@ -1941,21 +1984,21 @@ def _encode_pub_key(self): class MlKemPublic(_MlKemBase): @property - def key_size(self): + def key_size(self) -> int: """ :return: public key size in bytes :rtype: int """ return self._pub_key_size - def encode_key(self): + def encode_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def decode_key(self, pub_key): + def decode_key(self, pub_key: BytesOrStr) -> None: """ :param pub_key: public key to be imported :type pub_key: bytes or str @@ -1970,7 +2013,7 @@ def decode_key(self, pub_key): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_DecodePublicKey() error", ret) - def encapsulate(self, rng=None): + def encapsulate(self, rng: Random | None = None) -> tuple[bytes, bytes]: """ :param rng: random number generator for an encupsulation :type rng: Random @@ -1992,7 +2035,7 @@ def encapsulate(self, rng=None): return _ffi.buffer(ss, ss_size)[:], _ffi.buffer(ct, ct_size)[:] - def encapsulate_with_random(self, rand): + def encapsulate_with_random(self, rand: bytes) -> tuple[bytes, bytes]: """ :param rand: random number for an encapsulation :type rand: bytes @@ -2004,7 +2047,7 @@ def encapsulate_with_random(self, rand): ct = _ffi.new(f"unsigned char[{ct_size}]") ss = _ffi.new(f"unsigned char[{ss_size}]") ret = _lib.wc_KyberKey_EncapsulateWithRandom( - self.native_object, ct, ss, _ffi.from_buffer(rand), len(rand) + self.native_object, ct, ss, rand, len(rand) ) if ret < 0: # pragma: no cover @@ -2014,7 +2057,7 @@ def encapsulate_with_random(self, rand): class MlKemPrivate(_MlKemBase): @classmethod - def make_key(cls, mlkem_type, rng=None): + def make_key(cls, mlkem_type: MlKemType, rng: Random | None = None) -> MlKemPrivate: """ :param mlkem_type: ML-KEM type :type mlkem_type: MlKemType @@ -2037,7 +2080,7 @@ def make_key(cls, mlkem_type, rng=None): return mlkem_priv @classmethod - def make_key_with_random(cls, mlkem_type, rand): + def make_key_with_random(cls, mlkem_type: MlKemType, rand: bytes) -> MlKemPrivate: """ :param mlkem_type: ML-KEM type :type mlkem_type: MlKemType @@ -2047,9 +2090,7 @@ def make_key_with_random(cls, mlkem_type, rand): :rtype: MlKemPrivate """ mlkem_priv = cls(mlkem_type) - ret = _lib.wc_KyberKey_MakeKeyWithRandom( - mlkem_priv.native_object, _ffi.from_buffer(rand), len(rand) - ) + ret = _lib.wc_KyberKey_MakeKeyWithRandom(mlkem_priv.native_object, rand, len(rand)) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_MakeKeyWithRandom() error", ret) @@ -2057,7 +2098,7 @@ def make_key_with_random(cls, mlkem_type, rand): return mlkem_priv @property - def pub_key_size(self): + def pub_key_size(self) -> int: """ :return: public key size in bytes :rtype: int @@ -2065,7 +2106,7 @@ def pub_key_size(self): return self._pub_key_size @property - def priv_key_size(self): + def priv_key_size(self) -> int: """ :return: private key size in bytes :rtype: int @@ -2078,14 +2119,14 @@ def priv_key_size(self): return len[0] - def encode_pub_key(self): + def encode_pub_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def encode_priv_key(self): + def encode_priv_key(self) -> bytes: """ :return: exported private key :rtype: bytes @@ -2101,7 +2142,7 @@ def encode_priv_key(self): return _ffi.buffer(priv_key, priv_key_size)[:] - def decode_key(self, priv_key: tuple[bytes, str]): + def decode_key(self, priv_key: BytesOrStr) -> None: """ :param priv_key: private key to be imported :type priv_key: bytes or str @@ -2109,14 +2150,14 @@ def decode_key(self, priv_key: tuple[bytes, str]): priv_key_bytestype = t2b(priv_key) ret = _lib.wc_KyberKey_DecodePrivateKey( self.native_object, - _ffi.from_buffer(priv_key_bytestype), + priv_key_bytestype, len(priv_key_bytestype), ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_KyberKey_DecodePrivateKey() error", ret) - def decapsulate(self, ct): + def decapsulate(self, ct: BytesOrStr) -> bytes: """ :param ct: cipher text :type ct: bytes or str @@ -2129,7 +2170,7 @@ def decapsulate(self, ct): ret = _lib.wc_KyberKey_Decapsulate( self.native_object, ss, - _ffi.from_buffer(ct_bytestype), + ct_bytestype, len(ct_bytestype), ) @@ -2164,7 +2205,7 @@ class _MlDsaBase: INVALID_DEVID = _lib.INVALID_DEVID ML_DSA_KEYGEN_SEED_LENGTH = _lib.DILITHIUM_SEED_SZ - def __init__(self, mldsa_type): + def __init__(self, mldsa_type: MlDsaType) -> None: self._init_done = False self.native_object = _ffi.new("dilithium_key *") ret = _lib.wc_dilithium_init_ex( @@ -2174,6 +2215,7 @@ def __init__(self, mldsa_type): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_init_ex() error", ret) + self._rng = None self._init_done = True ret = _lib.wc_dilithium_set_level(self.native_object, mldsa_type) @@ -2181,12 +2223,12 @@ def __init__(self, mldsa_type): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_set_level() error", ret) - def __del__(self): + def __del__(self) -> None: if self._init_done: _lib.wc_dilithium_free(self.native_object) @property - def _pub_key_size(self): + def _pub_key_size(self) -> int: size = _ffi.new("int *") ret = _lib.wc_MlDsaKey_GetPubLen(self.native_object, size) @@ -2196,7 +2238,7 @@ def _pub_key_size(self): return size[0] @property - def sig_size(self): + def sig_size(self) -> int: """ :return: signature size in bytes :rtype: int @@ -2209,10 +2251,10 @@ def sig_size(self): return size[0] - def _decode_pub_key(self, pub_key): + def _decode_pub_key(self, pub_key: BytesOrStr) -> None: pub_key_bytestype = t2b(pub_key) ret = _lib.wc_dilithium_import_public( - _ffi.from_buffer(pub_key_bytestype), + pub_key_bytestype, len(pub_key_bytestype), self.native_object, ) @@ -2220,7 +2262,7 @@ def _decode_pub_key(self, pub_key): if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_import_public() error", ret) - def _encode_pub_key(self): + def _encode_pub_key(self) -> bytes: in_size = self._pub_key_size pub_key = _ffi.new(f"byte[{in_size}]") out_size = _ffi.new("word32 *") @@ -2235,7 +2277,7 @@ def _encode_pub_key(self): return _ffi.buffer(pub_key, out_size[0])[:] - def verify(self, signature, message, ctx=None): + def verify(self, signature: BytesOrStr, message: BytesOrStr, ctx: BytesOrStr | None = None) -> bool: """ :param signature: signature to be verified :type signature: bytes or str @@ -2253,11 +2295,11 @@ def verify(self, signature, message, ctx=None): if ctx is not None: ctx_bytestype = t2b(ctx) ret = _lib.wc_dilithium_verify_ctx_msg( - _ffi.from_buffer(sig_bytestype), + sig_bytestype, len(sig_bytestype), - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), res, self.native_object, @@ -2266,9 +2308,9 @@ def verify(self, signature, message, ctx=None): raise WolfCryptApiError("wc_dilithium_verify_ctx_msg() error", ret) else: ret = _lib.wc_dilithium_verify_msg( - _ffi.from_buffer(sig_bytestype), + sig_bytestype, len(sig_bytestype), - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), res, self.native_object, @@ -2281,7 +2323,7 @@ def verify(self, signature, message, ctx=None): class MlDsaPrivate(_MlDsaBase): @classmethod - def make_key(cls, mldsa_type, rng=None): + def make_key(cls, mldsa_type: MlDsaType, rng: Random | None = None) -> MlDsaPrivate: """ :param mldsa_type: ML-DSA type :type mldsa_type: MlDsaType @@ -2306,7 +2348,7 @@ def make_key(cls, mldsa_type, rng=None): return mldsa_priv @classmethod - def make_key_from_seed(cls, mldsa_type, seed): + def make_key_from_seed(cls, mldsa_type: MlDsaType, seed: bytes) -> MlDsaPrivate: """ Deterministically generate the key from a seed. @@ -2316,19 +2358,13 @@ def make_key_from_seed(cls, mldsa_type, seed): :type seed: bytes """ mldsa_priv = cls(mldsa_type) - try: - seed_view = memoryview(seed) - except TypeError as exception: - raise TypeError( - "seed must support the buffer protocol, such as `bytes` or `bytearray`" - ) from exception - if len(seed_view) != cls.ML_DSA_KEYGEN_SEED_LENGTH: + seed_bytes = t2b(seed) + if len(seed_bytes) != cls.ML_DSA_KEYGEN_SEED_LENGTH: raise ValueError( f"Seed for generating ML-DSA key must be {cls.ML_DSA_KEYGEN_SEED_LENGTH} bytes" ) - ret = _lib.wc_dilithium_make_key_from_seed(mldsa_priv.native_object, - _ffi.from_buffer(seed_view)) + ret = _lib.wc_dilithium_make_key_from_seed(mldsa_priv.native_object, seed_bytes) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_make_key_from_seed() error", ret) @@ -2336,7 +2372,7 @@ def make_key_from_seed(cls, mldsa_type, seed): return mldsa_priv @property - def pub_key_size(self): + def pub_key_size(self) -> int: """ :return: public key size in bytes :rtype: int @@ -2344,7 +2380,7 @@ def pub_key_size(self): return self._pub_key_size @property - def priv_key_size(self): + def priv_key_size(self) -> int: """ :return: private key size in bytes :rtype: int @@ -2357,14 +2393,14 @@ def priv_key_size(self): return size[0] - self.pub_key_size - def encode_pub_key(self): + def encode_pub_key(self) -> bytes: """ :return: exported public key :rtype: bytes """ return self._encode_pub_key() - def encode_priv_key(self): + def encode_priv_key(self) -> bytes: """ :return: exported private key :rtype: bytes @@ -2385,7 +2421,7 @@ def encode_priv_key(self): return _ffi.buffer(priv_key, out_size[0])[:] - def decode_key(self, priv_key, pub_key=None): + def decode_key(self, priv_key: BytesOrStr, pub_key: BytesOrStr | None = None) -> None: """ :param priv_key: private key to be imported :type priv_key: bytes or str @@ -2394,7 +2430,7 @@ def decode_key(self, priv_key, pub_key=None): """ priv_key_bytestype = t2b(priv_key) ret = _lib.wc_dilithium_import_private( - _ffi.from_buffer(priv_key_bytestype), + priv_key_bytestype, len(priv_key_bytestype), self.native_object, ) @@ -2405,7 +2441,7 @@ def decode_key(self, priv_key, pub_key=None): if pub_key is not None: self._decode_pub_key(pub_key) - def sign(self, message, rng=None, ctx=None): + def sign(self, message: BytesOrStr, rng: Random | None = None, ctx: BytesOrStr | None = None) -> bytes: """ :param message: message to be signed :type message: bytes or str @@ -2429,9 +2465,9 @@ def sign(self, message, rng=None, ctx=None): if len(ctx_bytestype) > 255: raise ValueError(f"context length {len(ctx_bytestype)} too large: must be 255 bytes or less") ret = _lib.wc_dilithium_sign_ctx_msg( - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), # length must be < 256 bytes - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, @@ -2442,7 +2478,7 @@ def sign(self, message, rng=None, ctx=None): raise WolfCryptApiError("wc_dilithium_sign_ctx_msg() error", ret) else: ret = _lib.wc_dilithium_sign_msg( - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, @@ -2457,7 +2493,7 @@ def sign(self, message, rng=None, ctx=None): return _ffi.buffer(signature, out_size[0])[:] - def sign_with_seed(self, message, seed, ctx=None): + def sign_with_seed(self, message: BytesOrStr, seed: bytes, ctx: BytesOrStr | None = None) -> bytes: """ :param message: message to be signed :type message: bytes or str @@ -2474,13 +2510,7 @@ def sign_with_seed(self, message, seed, ctx=None): out_size = _ffi.new("word32 *") out_size[0] = in_size - try: - seed_view = memoryview(seed) - except TypeError as exception: - raise TypeError( - "seed must support the buffer protocol, such as `bytes` or `bytearray`" - ) from exception - if len(seed_view) != ML_DSA_SIGNATURE_SEED_LENGTH: + if len(seed) != ML_DSA_SIGNATURE_SEED_LENGTH: raise ValueError( f"Seed for generating a signature must be {ML_DSA_SIGNATURE_SEED_LENGTH}" "bytes." @@ -2493,25 +2523,25 @@ def sign_with_seed(self, message, seed, ctx=None): f"context length {len(ctx_bytestype)} too large: must be 255 or less" ) ret = _lib.wc_dilithium_sign_ctx_msg_with_seed( - _ffi.from_buffer(ctx_bytestype), + ctx_bytestype, len(ctx_bytestype), # length must be < 256 bytes - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, self.native_object, - _ffi.from_buffer(seed_view), + seed, ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_sign_ctx_msg_with_seed() error", ret) else: ret = _lib.wc_dilithium_sign_msg_with_seed( - _ffi.from_buffer(msg_bytestype), + msg_bytestype, len(msg_bytestype), signature, out_size, self.native_object, - _ffi.from_buffer(seed_view), + seed, ) if ret < 0: # pragma: no cover raise WolfCryptApiError("wc_dilithium_sign_msg_with_seed() error", ret) @@ -2524,21 +2554,21 @@ def sign_with_seed(self, message, seed, ctx=None): class MlDsaPublic(_MlDsaBase): @property - def key_size(self): + def key_size(self) -> int: """ :return: public key size in bytes :rtype: int """ return self._pub_key_size - def decode_key(self, pub_key): + def decode_key(self, pub_key: BytesOrStr) -> None: """ :param pub_key: public key to be imported :type pub_key: bytes or str """ - return self._decode_pub_key(pub_key) + self._decode_pub_key(pub_key) - def encode_key(self): + def encode_key(self) -> bytes: """ :return: exported public key :rtype: bytes diff --git a/wolfcrypt/exceptions.py b/wolfcrypt/exceptions.py index 5b41bd7..c691a18 100644 --- a/wolfcrypt/exceptions.py +++ b/wolfcrypt/exceptions.py @@ -18,6 +18,9 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +from __future__ import annotations + +from typing import cast from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib @@ -56,6 +59,6 @@ def error_string(err_code: int) -> str: :return: error string """ if _lib.ERROR_STRINGS_ENABLED: - return _ffi.string(_lib.wc_GetErrorString(err_code)).decode() + return cast(bytes, _ffi.string(_lib.wc_GetErrorString(err_code))).decode() else: return "" diff --git a/wolfcrypt/hashes.py b/wolfcrypt/hashes.py index eac0f22..8de8881 100644 --- a/wolfcrypt/hashes.py +++ b/wolfcrypt/hashes.py @@ -20,19 +20,23 @@ # pylint: disable=no-member,no-name-in-module, no-self-use +from __future__ import annotations + +from abc import ABC, abstractmethod + +from _cffi_backend import FFI from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib -from wolfcrypt.utils import t2b, b2h - from wolfcrypt.exceptions import WolfCryptApiError +from wolfcrypt.utils import t2b, b2h, BytesOrStr -class _Hash: +class _Hash(ABC): """ A **PEP 247: Cryptographic Hash Functions** compliant **Hash Function Interface**. """ - def __init__(self, string=None): + def __init__(self, string: BytesOrStr | None = None) -> None: self._native_object = _ffi.new(self._native_type) self._shallow_copy = False ret = self._init() @@ -42,17 +46,33 @@ def __init__(self, string=None): if string: self.update(string) + @abstractmethod + def _init(self) -> int: ... + + @abstractmethod + def _update(self, data: bytes) -> int: ... + + @abstractmethod + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: ... + + @property + @abstractmethod + def _native_size(self) -> int: ... + + @property + @abstractmethod + def _native_type(self) -> str: ... + + @property + @abstractmethod + def digest_size(self) -> int: ... + @classmethod - def new(cls, string=None): - """ - Creates a new hashing object and returns it. The optional - **string** parameter, if supplied, will be immediately - hashed into the object's starting state, as if - obj.update(string) was called. - """ - return cls(string) + @abstractmethod + def new(cls, string: BytesOrStr | None) -> _Hash: ... + - def copy(self): + def copy(self) -> _Hash: """ Returns a separate copy of this hashing object. An update to this copy won't affect the original object. @@ -86,7 +106,7 @@ def copy(self): return copy - def update(self, string): + def update(self, string: BytesOrStr) -> None: """ Hashes **string** into the current state of the hashing object. update() can be called any number of times during @@ -98,7 +118,7 @@ def update(self, string): if ret < 0: # pragma: no cover raise WolfCryptApiError("Hash update error", ret) - def digest(self): + def digest(self) -> bytes: """ Returns the hash value of this hashing object as a string containing 8-bit data. The object is not altered in any @@ -137,7 +157,7 @@ def digest(self): return _ffi.buffer(result, self.digest_size)[:] - def hexdigest(self): + def hexdigest(self) -> bytes: """ Returns the hash value of this hashing object as a string containing hexadecimal digits. Lowercase letters are used @@ -147,8 +167,20 @@ def hexdigest(self): return b2h(self.digest()) +class _Sha(_Hash): + @classmethod + def new(cls, string: BytesOrStr | None = None) -> _Hash: + """ + Creates a new hashing object and returns it. The optional + **string** parameter, if supplied, will be immediately + hashed into the object's starting state, as if + obj.update(string) was called. + """ + return cls(string) + + if _lib.SHA_ENABLED: - class Sha(_Hash): + class Sha(_Sha): """ **SHA-1** is a cryptographic hash function standardized by **NIST**. @@ -160,22 +192,22 @@ class Sha(_Hash): _delete = staticmethod(_lib.wc_ShaFree) _copy = staticmethod(_lib.wc_ShaCopy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + def _init(self) -> int: return _lib.wc_InitSha(self._native_object) - def _update(self, data): + def _update(self, data: bytes) -> int: return _lib.wc_ShaUpdate(self._native_object, data, len(data)) - def _final(self, obj, ret): + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_ShaFinal(obj, ret) if _lib.SHA256_ENABLED: - class Sha256(_Hash): + class Sha256(_Sha): """ **SHA-256** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -188,22 +220,22 @@ class Sha256(_Hash): _delete = staticmethod(_lib.wc_Sha256Free) _copy = staticmethod(_lib.wc_Sha256Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + def _init(self) -> int: return _lib.wc_InitSha256(self._native_object) - def _update(self, data): + def _update(self, data: bytes) -> int: return _lib.wc_Sha256Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha256Final(obj, ret) if _lib.SHA384_ENABLED: - class Sha384(_Hash): + class Sha384(_Sha): """ **SHA-384** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -216,22 +248,22 @@ class Sha384(_Hash): _delete = staticmethod(_lib.wc_Sha384Free) _copy = staticmethod(_lib.wc_Sha384Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + def _init(self) -> int: return _lib.wc_InitSha384(self._native_object) - def _update(self, data): + def _update(self, data: bytes) -> int: return _lib.wc_Sha384Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha384Final(obj, ret) if _lib.SHA512_ENABLED: - class Sha512(_Hash): + class Sha512(_Sha): """ **SHA-512** is a cryptographic hash function from the **SHA-2 family** and is standardized by **NIST**. @@ -244,21 +276,21 @@ class Sha512(_Hash): _delete = staticmethod(_lib.wc_Sha512Free) _copy = staticmethod(_lib.wc_Sha512Copy) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def _init(self): + def _init(self) -> int: return _lib.wc_InitSha512(self._native_object) - def _update(self, data): + def _update(self, data: bytes) -> int: return _lib.wc_Sha512Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_Sha512Final(obj, ret) if _lib.SHA3_ENABLED: - class Sha3(_Hash): + class Sha3(_Sha): """ **SHA3 ** is a cryptographic hash function family standardized by **NIST**. @@ -267,6 +299,7 @@ class Sha3(_Hash): Using SHA3-384 by default, unless a different digest size is passed through __init__. """ + digest_size = None _native_type = "wc_Sha3 *" _native_size = _ffi.sizeof("wc_Sha3") SHA3_224_DIGEST_SIZE = 28 @@ -288,16 +321,19 @@ class Sha3(_Hash): 64: _lib.wc_Sha3_512_Copy, } - def __del__(self): + def __del__(self) -> None: # Unlike the SHA-1/2 classes, Sha3's _delete is set per-instance # from a size->function dict and is None for invalid sizes, so # we need the extra truthiness check. - if (hasattr(self, '_native_object') - and not getattr(self, '_shallow_copy', False) - and getattr(self, '_delete', None)): + if ( + hasattr(self, '_native_object') + and not getattr(self, '_shallow_copy', False) + and getattr(self, '_delete', None) + and self._delete is not None + ): self._delete(self._native_object) - def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W0231 + def __init__(self, string: BytesOrStr | None = None, size: int = SHA3_384_DIGEST_SIZE) -> None: # pylint: disable=W0231 self._native_object = _ffi.new(self._native_type) self._shallow_copy = False self.digest_size = size @@ -310,10 +346,10 @@ def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W self.update(string) @classmethod - def new(cls, string=None, size=SHA3_384_DIGEST_SIZE): + def new(cls, string: BytesOrStr | None = None, size: int = SHA3_384_DIGEST_SIZE) -> Sha3: return cls(string, size) - def copy(self): + def copy(self) -> Sha3: # Bypass __init__ to avoid calling _init() on a state that _copy # immediately overwrites (which would leak internal resources in # async/HW-accelerated builds). Mark as shallow up front so @@ -337,12 +373,7 @@ def copy(self): # Keep _shallow_copy = True: memmove shares state with self. return c - def _init(self): - if (self.digest_size != Sha3.SHA3_224_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_256_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_384_DIGEST_SIZE and - self.digest_size != Sha3.SHA3_512_DIGEST_SIZE): - return -1 + def _init(self) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_InitSha3_224(self._native_object, _ffi.NULL, 0) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -351,7 +382,9 @@ def _init(self): return _lib.wc_InitSha3_384(self._native_object, _ffi.NULL, 0) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_InitSha3_512(self._native_object, _ffi.NULL, 0) - def _update(self, data): + return -1 + + def _update(self, data: bytes) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_Sha3_224_Update(self._native_object, data, len(data)) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -360,7 +393,9 @@ def _update(self, data): return _lib.wc_Sha3_384_Update(self._native_object, data, len(data)) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_Sha3_512_Update(self._native_object, data, len(data)) - def _final(self, obj, ret): + return -1 + + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: return _lib.wc_Sha3_224_Final(obj, ret) if self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: @@ -369,6 +404,7 @@ def _final(self, obj, ret): return _lib.wc_Sha3_384_Final(obj, ret) if self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: return _lib.wc_Sha3_512_Final(obj, ret) + return -1 # Hmac types @@ -405,24 +441,27 @@ class _Hmac(_Hash): _native_size = _ffi.sizeof("Hmac") _delete = staticmethod(_lib.wc_HmacFree) - def __del__(self): + def __del__(self) -> None: if hasattr(self, '_native_object') and not getattr(self, '_shallow_copy', False): self._delete(self._native_object) - def __init__(self, key, string=None): # pylint: disable=W0231 + def __init__(self, key: BytesOrStr, string: BytesOrStr | None = None) -> None: # pylint: disable=W0231 key = t2b(key) self._native_object = _ffi.new(self._native_type) self._shallow_copy = False - ret = self._init(self._type, key) + ret = self._hmac_init(self._type, key) if ret < 0: # pragma: no cover raise WolfCryptApiError("Hmac init error", ret) if string: self.update(string) + def _init(self) -> int: + return -1 + @classmethod - def new(cls, key, string=None): # pylint: disable=W0221 + def new(cls, key: BytesOrStr, string: BytesOrStr | None = None) -> _Hash: # pylint: disable=W0221 # ty: ignore[invalid-method-override] """ Creates a new hashing object and returns it. **key** is a required parameter containing a string giving the key @@ -432,7 +471,12 @@ def new(cls, key, string=None): # pylint: disable=W0221 """ return cls(key, string) - def _init(self, hmac, key): + + @property + @abstractmethod + def _type(self) -> int: ... + + def _hmac_init(self, hmac: int, key: bytes) -> int: ret = _lib.wc_HmacInit(self._native_object, _ffi.NULL, -2) if ret < 0: raise WolfCryptApiError("wc_HmacInit error", ret) @@ -446,10 +490,10 @@ def _init(self, hmac, key): raise WolfCryptApiError("wc_HmacSetKey error", ret) return ret - def _update(self, data): + def _update(self, data: bytes) -> int: return _lib.wc_HmacUpdate(self._native_object, data, len(data)) - def _final(self, obj, ret): + def _final(self, obj: FFI.CData, ret: FFI.CData) -> int: return _lib.wc_HmacFinal(obj, ret) @@ -462,7 +506,7 @@ class HmacSha(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA - digest_size = Sha.digest_size + digest_size = Sha.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA256_ENABLED: @@ -474,7 +518,7 @@ class HmacSha256(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA256 - digest_size = Sha256.digest_size + digest_size = Sha256.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA384_ENABLED: @@ -486,7 +530,7 @@ class HmacSha384(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA384 - digest_size = Sha384.digest_size + digest_size = Sha384.digest_size # ty: ignore[possibly-unresolved-reference] if _lib.SHA512_ENABLED: @@ -498,9 +542,9 @@ class HmacSha512(_Hmac): It produces a [ **512-bit | 64 bytes** ] message digest. """ _type = _TYPE_SHA512 - digest_size = Sha512.digest_size + digest_size = Sha512.digest_size # ty: ignore[possibly-unresolved-reference] -def hash_type_to_cls(hash_type): +def hash_type_to_cls(hash_type: int) -> type[_Hash] | None: if _lib.SHA_ENABLED and hash_type == _lib.WC_HASH_TYPE_SHA: hash_cls = Sha elif _lib.SHA256_ENABLED and hash_type == _lib.WC_HASH_TYPE_SHA256: diff --git a/wolfcrypt/hkdf.py b/wolfcrypt/hkdf.py index c06ab87..a4ddcdb 100644 --- a/wolfcrypt/hkdf.py +++ b/wolfcrypt/hkdf.py @@ -28,8 +28,9 @@ if _lib.HKDF_ENABLED: + from wolfcrypt.hashes import _Hmac # ty: ignore[possibly-missing-import] - def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): + def HKDF(hash_cls: _Hmac, in_key: bytes | str, salt: bytes | str | None = None, info: bytes | str | None = None, out_len: int | None = None) -> bytes: """ Perform HKDF Extract-and-Expand in one call (wraps wc_HKDF). @@ -55,6 +56,7 @@ def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): if out_len is None: out_len = hash_cls.digest_size + assert out_len is not None out = _ffi.new(f"byte[{out_len}]") ret = _lib.wc_HKDF( @@ -73,7 +75,7 @@ def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): return _ffi.buffer(out, out_len)[:] - def HKDF_Extract(hash_cls, salt, in_key): + def HKDF_Extract(hash_cls: _Hmac, salt: bytes | str | None, in_key: bytes | str) -> bytes: """ HKDF-Extract: PRK = HMAC-Hash(salt, IKM) Wraps wc_HKDF_Extract. @@ -100,7 +102,7 @@ def HKDF_Extract(hash_cls, salt, in_key): return _ffi.buffer(out, out_len)[:] - def HKDF_Expand(hash_cls, prk, info, out_len): + def HKDF_Expand(hash_cls: _Hmac, prk: bytes | str, info: bytes | str | None, out_len: int) -> bytes: """ HKDF-Expand: OKM = HKDF-Expand(PRK, info, L) Wraps wc_HKDF_Expand. diff --git a/wolfcrypt/pwdbased.py b/wolfcrypt/pwdbased.py index 56c83b0..ecdbd41 100644 --- a/wolfcrypt/pwdbased.py +++ b/wolfcrypt/pwdbased.py @@ -20,13 +20,15 @@ # pylint: disable=no-member,no-name-in-module +from __future__ import annotations + from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.exceptions import WolfCryptApiError if _lib.PWDBASED_ENABLED: - def PBKDF2(password, salt, iterations, key_length, hash_type): + def PBKDF2(password: bytes | str, salt: bytes | str, iterations: int, key_length: int, hash_type: int) -> bytes: if isinstance(salt, str): salt = str.encode(salt) diff --git a/wolfcrypt/random.py b/wolfcrypt/random.py index 1c4da84..67b9639 100644 --- a/wolfcrypt/random.py +++ b/wolfcrypt/random.py @@ -34,11 +34,10 @@ class Random: """ def __init__(self, nonce: __builtins__.bytes = b"", device_id: int = -2) -> None: - self.native_object: _lib.RNG | None = _ffi.new("WC_RNG *") + self.native_object: _lib.RNG = _ffi.new("WC_RNG *") ret = _lib.wc_InitRngNonce_ex(self.native_object, nonce, len(nonce), _ffi.NULL, device_id) if ret < 0: # pragma: no cover - self.native_object = None raise WolfCryptApiError("RNG init error", ret) # making sure _lib.wc_FreeRng outlives WC_RNG instances @@ -58,7 +57,6 @@ def byte(self) -> __builtins__.bytes: """ result = _ffi.new("byte[1]") - assert self.native_object is not None ret = _lib.wc_RNG_GenerateByte(self.native_object, result) if ret < 0: # pragma: no cover raise WolfCryptApiError("RNG generate byte error", ret) @@ -71,7 +69,6 @@ def bytes(self, length: int) -> __builtins__.bytes: """ result = _ffi.new(f"byte[{length}]") - assert self.native_object is not None ret = _lib.wc_RNG_GenerateBlock(self.native_object, result, length) if ret < 0: # pragma: no cover raise WolfCryptApiError("RNG generate block error", ret) diff --git a/wolfcrypt/utils.py b/wolfcrypt/utils.py index 5de6363..6793488 100644 --- a/wolfcrypt/utils.py +++ b/wolfcrypt/utils.py @@ -22,10 +22,13 @@ from __future__ import annotations +from typing import TypeAlias + from binascii import hexlify as b2h, unhexlify as h2b # noqa: F401 +BytesOrStr: TypeAlias = bytes | bytearray | memoryview | str -def t2b(string: bytes | bytearray | memoryview | str) -> bytes: +def t2b(string: BytesOrStr) -> bytes: """ Converts text to bytes.