Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix COSMOS #198

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/aleph/sdk/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from aleph_message.models import Chain

from aleph.sdk.chains.common import get_fallback_private_key
from aleph.sdk.chains.cosmos import CSDKAccount
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.chains.evm import EVMAccount
from aleph.sdk.chains.remote import RemoteAccount
Expand Down Expand Up @@ -38,6 +39,7 @@
Chain.SOL: SOLAccount,
Chain.WORLDCHAIN: EVMAccount,
Chain.ZORA: EVMAccount,
Chain.CSDK: CSDKAccount,
}


Expand Down
45 changes: 37 additions & 8 deletions src/aleph/sdk/chains/cosmos.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import base64
import hashlib
import json
from typing import Union
from pathlib import Path
from typing import Optional, Union

import ecdsa
from cosmospy._wallet import privkey_to_address, privkey_to_pubkey
from ecdsa import BadSignatureError

from .common import BaseAccount, get_fallback_private_key, get_verification_buffer

Expand Down Expand Up @@ -52,7 +54,8 @@ def __init__(self, private_key=None, hrp=DEFAULT_HRP):
async def sign_message(self, message):
message = self._setup_sender(message)
verif = get_verification_string(message)
base64_pubkey = base64.b64encode(self.get_public_key().encode()).decode("utf-8")
pub_key = bytes.fromhex(self.get_public_key())
base64_pubkey = base64.b64encode(pub_key).decode()
signature = await self.sign_raw(verif.encode("utf-8"))

sig = {
Expand All @@ -78,17 +81,43 @@ def get_address(self) -> str:
return privkey_to_address(self.private_key)

def get_public_key(self) -> str:
return privkey_to_pubkey(self.private_key).decode()
return privkey_to_pubkey(self.private_key).hex()


def get_fallback_account(hrp=DEFAULT_HRP):
return CSDKAccount(private_key=get_fallback_private_key(), hrp=hrp)
def get_fallback_account(path: Optional[Path] = None, hrp=DEFAULT_HRP):
return CSDKAccount(private_key=get_fallback_private_key(path=path), hrp=hrp)


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""TODO: Implement this"""
raise NotImplementedError("Not implemented yet")
):
"""
Verifies a signature.
Args:
signature: The signature to verify. Can be a base64 encoded string or bytes.
public_key: The public key to use for verification. Can be a base64 encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.!
"""

if isinstance(signature, str):
signature = base64.b64decode(signature.encode("utf-8"))
if isinstance(public_key, str):
public_key = base64.b64decode(public_key)
if isinstance(message, str):
message = message.encode("utf-8")

vk = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.SECP256k1)

try:
vk.verify(
signature,
message,
hashfunc=hashlib.sha256,
)
return True
except Exception as e:
raise BadSignatureError from e
8 changes: 8 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aiohttp import ClientResponseError
from aleph_message.models import AggregateMessage, AlephMessage, PostMessage

import aleph.sdk.chains.cosmos as cosmos
import aleph.sdk.chains.ethereum as ethereum
import aleph.sdk.chains.solana as solana
import aleph.sdk.chains.substrate as substrate
Expand Down Expand Up @@ -54,6 +55,13 @@ def substrate_account() -> substrate.DOTAccount:
yield substrate.get_fallback_account(path=Path(private_key_file.name))


@pytest.fixture
def cosmos_account() -> cosmos.CSDKAccount:
with NamedTemporaryFile(delete=False) as private_key_file:
private_key_file.close()
yield cosmos.get_fallback_account(path=Path(private_key_file.name))


@pytest.fixture
def json_messages():
messages_path = Path(__file__).parent / "messages.json"
Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_chain_cosmos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import base64
import json
from dataclasses import asdict, dataclass

import pytest
from ecdsa import BadSignatureError

from aleph.sdk.chains.common import get_verification_buffer
from aleph.sdk.chains.cosmos import get_verification_string, verify_signature


@dataclass
class Message:
chain: str
sender: str
type: str
item_hash: str


@pytest.mark.asyncio
async def test_verify_signature(cosmos_account):
message = asdict(
Message(
"CSDK",
cosmos_account.get_address(),
"POST",
"SomeHash",
)
)

await cosmos_account.sign_message(message)
assert message["signature"]
signature = json.loads(message["signature"])
raw_signature = signature["signature"]
assert isinstance(raw_signature, str)

pub_key = base64.b64decode(signature["pub_key"]["value"])

verify_signature(
raw_signature,
pub_key,
get_verification_string(message),
)


@pytest.mark.asyncio
async def test_verify_signature_raw(cosmos_account):
message = asdict(
Message(
"CSDK",
cosmos_account.get_address(),
"POST",
"SomeHash",
)
)
await cosmos_account.sign_message(message)
raw_message = get_verification_buffer(message)
raw_signature = await cosmos_account.sign_raw(raw_message)
assert isinstance(raw_signature, bytes)

pub_key = bytes.fromhex(cosmos_account.get_public_key())
verify_signature(
raw_signature.decode(),
pub_key,
raw_message,
)


@pytest.mark.asyncio
async def test_bad_signature(cosmos_account):
message = asdict(
Message(
"CSDK",
cosmos_account.get_address(),
"POST",
"SomeHash",
)
)
await cosmos_account.sign_message(message)
assert message["signature"]
signature = json.loads(message["signature"])
raw_signature = "1" + signature["signature"]
assert isinstance(raw_signature, str)

pub_key = base64.b64decode(signature["pub_key"]["value"])

with pytest.raises(BadSignatureError):
verify_signature(
raw_signature,
pub_key,
get_verification_string(message),
)
Loading