Initial commit
This commit is contained in:
commit
8897476a63
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
__pycache__
|
||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 Nasir Hossain
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
90
README.md
Normal file
90
README.md
Normal file
@ -0,0 +1,90 @@
|
||||
# Betanet Layer Prototype (L1, L2, L3, L5)
|
||||
|
||||
**Quick prototype of Betanet layers L1, L2, L3, and L5** implemented as an asyncio simulation.
|
||||
This is for educational/demo purposes only — **not** production-ready code.
|
||||
|
||||
## Disclaimer
|
||||
|
||||
This simulation **does not** use real Ed25519/X25519 cryptography.
|
||||
Crypto functions are stubbed with hash/HMAC placeholders to keep it simple and dependency-light.
|
||||
|
||||
For better implementations, replace the placeholder crypto in:
|
||||
|
||||
- `src/ashops.py`
|
||||
- `src/betanet_cryptography.py`
|
||||
- `src/htx_session.py`
|
||||
|
||||
with real libraries like:
|
||||
|
||||
- [PyNaCl](https://pynacl.readthedocs.io/)
|
||||
- [cryptography](https://cryptography.io/en/latest/)
|
||||
|
||||
---
|
||||
|
||||
## Mapping to the Betanet Spec
|
||||
|
||||
| Spec Layer | This Prototype Implements | Notes |
|
||||
| ----------------------------------------- | ---------------------------------------------------------------------------- | ---------------------------------------------------- |
|
||||
| **L1** Path selection & AS-hop validation | Simple `L1Path` & `PathSegment` with `ASHop` objects signed by AS keys | Validates SCION-like hop signatures |
|
||||
| **L2** Cover transport (HTX) | `HTXSession` with outer "fingerprint" mirroring & inner Noise-like handshake | Frame types: `STREAM`, `PING`, `KEY_UPDATE`, `CLOSE` |
|
||||
| **L3** Overlay mesh | `OverlayNode` with peerstore & content exchange (bitswap-style) | Simulates CID publication & retrieval |
|
||||
| **L5** Naming & trust | `AliasLedger` with self-certifying IDs & alias registration | Includes expiry and sequence tracking |
|
||||
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
- **Path building & validation (L1)** — verifies AS hop signatures before use.
|
||||
- **HTX transport sessions (L2)** — simulates TLS/QUIC-like outer wrapping with inner encrypted frames.
|
||||
- **Overlay network (L3)** — publish & retrieve content by CID from connected peers.
|
||||
- **Alias ledger (L5)** — map human-readable aliases to self-certifying IDs.
|
||||
|
||||
---
|
||||
|
||||
## Running the Demo
|
||||
|
||||
```bash
|
||||
# i have many mirrors thus the clone url might not match depending on where you
|
||||
# are seeing this. but I have basically same everywhere so cloning github is ok.
|
||||
git clone https://github.com/nishad-prime/betanet-layer-prototype.git
|
||||
cd betanet-layer-prototype
|
||||
python3 betanet_quick_impl.py
|
||||
```
|
||||
|
||||
Expected output (trimmed for brevity):
|
||||
|
||||
```
|
||||
[L1] Path validated
|
||||
[L3] Carol published CID ab12cd34ef56...
|
||||
[L5] Resolved carol.service -> deadbeefcafebabe...
|
||||
[L3] Alice retrieved content via overlay: b"Hello from Carol's content!"
|
||||
[L2][C] Received frame type=STREAM ...
|
||||
...
|
||||
Demo complete.
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Directory Structure
|
||||
|
||||
```
|
||||
.
|
||||
|- betanet_quick_impl.py # Main asyncio simulation
|
||||
|- src/
|
||||
|- alias_ledger.py # L5 alias registry
|
||||
|- ashop.py # L1 AS hop representation
|
||||
|- betanet_cryptography.py # Simplified keypair/signature
|
||||
|- htx_frame.py # HTX frame constants
|
||||
|- htx_session.py # L2 HTX transport session
|
||||
|- overlay_node.py # L3 overlay mesh node
|
||||
|- path.py # L1 path building/validation
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Next Steps
|
||||
|
||||
- Replace placeholder crypto with real Ed25519/X25519 implementations.
|
||||
- Swap in a real routing library for L1.
|
||||
- Expand L3 into a libp2p-compatible overlay.
|
||||
- Add tests for path validation, alias expiry, and HTX frame parsing.
|
||||
123
betanet_quick_impl.py
Normal file
123
betanet_quick_impl.py
Normal file
@ -0,0 +1,123 @@
|
||||
"""
|
||||
Quick prototype of Betanet layers L1, L2, L3 and L5 in an asyncio simulation.
|
||||
|
||||
Disclaimer: This is a simplified educational prototype. Cryptography is simulated using
|
||||
hash/HMAC placeholders (not real Ed25519/X25519). Replace placeholders with real
|
||||
crypto libraries (PyNaCl / cryptography) for production-quality code.
|
||||
|
||||
Mapping to spec:
|
||||
- L1: Path and AS-hop validation (simple path object + signature checks)
|
||||
- L2: HTX session with outer 'fingerprint' mirroring and inner Noise-like handshake
|
||||
- L3: Overlay mesh with simple peerstore and content exchange (bitswap-like)
|
||||
- L5: Self-certifying IDs and a tiny alias ledger with finality simulation
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from src.alias_ledger import AliasLedger
|
||||
from src.ashop import ASHop
|
||||
from src.betanet_cryptography import KeyPair
|
||||
from src.htx_frame import FT_CLOSE, FT_KEY_UPDATE, FT_PING, FT_STREAM
|
||||
from src.htx_session import HTXSession
|
||||
from src.overlay_node import OverlayNode
|
||||
from src.path import L1Path, PathSegment
|
||||
|
||||
|
||||
async def demo():
|
||||
# Create AS keys for L1 hop validation
|
||||
as_keys = {
|
||||
"AS1": KeyPair(),
|
||||
"AS2": KeyPair(),
|
||||
"AS3": KeyPair(),
|
||||
}
|
||||
|
||||
# Create overlay nodes (L3) with keys (these also act as service endpoints L5)
|
||||
nodeA = OverlayNode("Alice", KeyPair())
|
||||
nodeB = OverlayNode("Bob", KeyPair())
|
||||
nodeC = OverlayNode("Carol", KeyPair())
|
||||
|
||||
# Connect peers (mesh)
|
||||
nodeA.add_peer(nodeB)
|
||||
nodeB.add_peer(nodeC)
|
||||
nodeA.add_peer(nodeC)
|
||||
|
||||
# L5: register an alias for Carol
|
||||
ledger = AliasLedger()
|
||||
ledger.register_alias("carol.service", nodeC.kp, seq=1, exp=int(time.time()) + 3600)
|
||||
|
||||
# L1: Build a SCION-like path A -> AS1 -> AS2 -> C
|
||||
hop1_sig = as_keys["AS1"].sign(b"AS1")
|
||||
hop2_sig = as_keys["AS2"].sign(b"AS2")
|
||||
segment = PathSegment([ASHop("AS1", hop1_sig), ASHop("AS2", hop2_sig)])
|
||||
path = L1Path([segment])
|
||||
assert path.validate(as_keys), "L1 path validation failed"
|
||||
print("[L1] Path validated")
|
||||
|
||||
# L2: Setup HTX session between A and C (outer fingerprint mirrored)
|
||||
fingerprint = "origin:example.com:ja3hash" # placeholder
|
||||
sessionAC_local = HTXSession(nodeA.kp, nodeC.kp.pub, fingerprint)
|
||||
sessionAC_remote = HTXSession(nodeC.kp, nodeA.kp.pub, fingerprint)
|
||||
# link peers
|
||||
sessionAC_local.peer = sessionAC_remote
|
||||
sessionAC_remote.peer = sessionAC_local
|
||||
|
||||
await asyncio.gather(
|
||||
sessionAC_local.perform_handshake(), sessionAC_remote.perform_handshake()
|
||||
)
|
||||
|
||||
# Start receive loops
|
||||
async def handler_a(ftype, sid, plain):
|
||||
print(
|
||||
f"[L2][A] Received frame type={ftype} sid={sid} payload={plain.decode(errors='ignore')}"
|
||||
)
|
||||
|
||||
async def handler_c(ftype, sid, plain):
|
||||
print(
|
||||
f"[L2][C] Received frame type={ftype} sid={sid} payload={plain.decode(errors='ignore')}"
|
||||
)
|
||||
|
||||
asyncio.create_task(sessionAC_local.recv_loop(handler_a))
|
||||
asyncio.create_task(sessionAC_remote.recv_loop(handler_c))
|
||||
|
||||
# L3: publish content from Carol
|
||||
cid = nodeC.publish(b"Hello from Carol's content!")
|
||||
print(f"[L3] Carol published CID {cid[:16]}...")
|
||||
|
||||
# A resolves 'carol.service' via L5
|
||||
resolved_pk_hex = ledger.resolve("carol.service")
|
||||
if not resolved_pk_hex:
|
||||
print("[L5] Alias resolution failed")
|
||||
return
|
||||
print(f"[L5] Resolved carol.service -> {resolved_pk_hex[:16]}...")
|
||||
|
||||
# A asks overlay for CID (bitswap)
|
||||
content = await nodeA.bitswap_get(cid)
|
||||
if content:
|
||||
print("[L3] Alice retrieved content via overlay:", content.data)
|
||||
else:
|
||||
print("[L3] Alice failed to retrieve content from overlay")
|
||||
|
||||
# A sends a message to C over HTX inner frames (L2)
|
||||
await sessionAC_local.send_frame(
|
||||
FT_STREAM, 1, b"GET /resource HTTP/1.1\r\nHost: carol.service\r\n\r\n"
|
||||
)
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Simulate KEY_UPDATE
|
||||
await sessionAC_local.send_frame(FT_KEY_UPDATE, None, b"KEY_UPDATE")
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Send a ping
|
||||
await sessionAC_local.send_frame(FT_PING, None, b"PING")
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# Close
|
||||
await sessionAC_local.send_frame(FT_CLOSE, None, b"GOODBYE")
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
print("\nDemo complete.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(demo())
|
||||
31
src/alias_ledger.py
Normal file
31
src/alias_ledger.py
Normal file
@ -0,0 +1,31 @@
|
||||
# ------------------------- L5: Naming & Alias Ledger --------------------------
|
||||
import base64
|
||||
from typing import Dict, Optional
|
||||
|
||||
from .alias_record import AliasRecord
|
||||
from .betanet_cryptography import KeyPair
|
||||
|
||||
|
||||
class AliasLedger:
|
||||
"""A tiny in-memory alias ledger that simulates finality across 3 chains.
|
||||
For demo, we assume we get finality on 2 chains when registering.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.aliases: Dict[str, AliasRecord] = {}
|
||||
|
||||
def register_alias(self, name: str, kp: KeyPair, seq: int, exp: int) -> bool:
|
||||
# Build payload and signature
|
||||
payload = f"{name} pk={kp.public_hex()} seq={seq} exp={exp}".encode("utf-8")
|
||||
sig = kp.sign(payload)
|
||||
sig_b64 = base64.b64encode(sig).decode("ascii")
|
||||
# Simulate 2-of-3 finality by accepting after minor delay
|
||||
self.aliases[name] = AliasRecord(kp.public_hex(), seq, sig_b64, exp)
|
||||
print(f"[L5] Alias '{name}' registered (simulated 2-of-3 finality)")
|
||||
return True
|
||||
|
||||
def resolve(self, name: str) -> Optional[str]:
|
||||
rec = self.aliases.get(name)
|
||||
if not rec:
|
||||
return None
|
||||
return rec.pk_hex
|
||||
9
src/alias_record.py
Normal file
9
src/alias_record.py
Normal file
@ -0,0 +1,9 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class AliasRecord:
|
||||
pk_hex: str
|
||||
seq: int
|
||||
sig_b64: str
|
||||
exp: int
|
||||
8
src/ashop.py
Normal file
8
src/ashop.py
Normal file
@ -0,0 +1,8 @@
|
||||
# --------------------------------- L1: AS-hop ---------------------------------
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ASHop:
|
||||
as_id: str
|
||||
sig: bytes # placeholder signature of the hop data by AS key
|
||||
33
src/betanet_cryptography.py
Normal file
33
src/betanet_cryptography.py
Normal file
@ -0,0 +1,33 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import secrets
|
||||
|
||||
|
||||
def sha256(data: bytes) -> bytes:
|
||||
return hashlib.sha256(data).digest()
|
||||
|
||||
|
||||
class KeyPair:
|
||||
"""Placeholder keypair. Use real Ed25519/X25519 in real code.
|
||||
Here 'secret' is random bytes; pubkey is SHA256(secret) (non-standard!).
|
||||
Signing is HMAC-SHA256(secret, message) as a stand-in.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.secret = secrets.token_bytes(32)
|
||||
self.pub = sha256(self.secret)
|
||||
|
||||
def sign(self, message: bytes) -> bytes:
|
||||
return hmac.new(self.secret, message, hashlib.sha256).digest()
|
||||
|
||||
def verify(self, message: bytes, sig: bytes) -> bool:
|
||||
expected = hmac.new(self.secret, message, hashlib.sha256).digest()
|
||||
return hmac.compare_digest(expected, sig)
|
||||
|
||||
def public_hex(self) -> str:
|
||||
return self.pub.hex()
|
||||
|
||||
|
||||
# Self-certifying ID: hex SHA-256(pubkey)
|
||||
def self_cert_id(pubkey: bytes) -> str:
|
||||
return sha256(pubkey).hex()
|
||||
7
src/content.py
Normal file
7
src/content.py
Normal file
@ -0,0 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Content:
|
||||
cid: str
|
||||
data: bytes
|
||||
65
src/htx_frame.py
Normal file
65
src/htx_frame.py
Normal file
@ -0,0 +1,65 @@
|
||||
# ------------------------------- L2: HTX Frames -------------------------------
|
||||
import struct
|
||||
from typing import Optional, Tuple
|
||||
|
||||
# Frame types
|
||||
FT_STREAM = 0
|
||||
FT_PING = 1
|
||||
FT_CLOSE = 2
|
||||
FT_KEY_UPDATE = 3
|
||||
FT_WINDOW_UPDATE = 4
|
||||
|
||||
|
||||
class HTXFrame:
|
||||
"""Simple pack/unpack implementation for the inner frame format.
|
||||
struct Frame {
|
||||
uint24 length; // ciphertext length (excl. tag)
|
||||
uint8 type; // 0=STREAM, ...
|
||||
varint stream_id; // present if type==STREAM or WINDOW_UPDATE
|
||||
uint8[] ciphertext;
|
||||
}
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def pack(frame_type: int, stream_id: int, payload: bytes) -> bytes:
|
||||
# uint24 length
|
||||
l = len(payload)
|
||||
if l >= 2**24:
|
||||
raise ValueError("payload too large")
|
||||
length_bytes = l.to_bytes(3, "big")
|
||||
type_byte = struct.pack("B", frame_type)
|
||||
# varint (quick simple encoding: 1 or more bytes, here we use 1 byte for small ints)
|
||||
if stream_id is None:
|
||||
varint_bytes = b""
|
||||
else:
|
||||
if stream_id < 0x80:
|
||||
varint_bytes = struct.pack("B", stream_id)
|
||||
else:
|
||||
# simple multibyte varint (not fully RFC-complete)
|
||||
parts = []
|
||||
v = stream_id
|
||||
while v:
|
||||
parts.append((v & 0x7F) | 0x80)
|
||||
v >>= 7
|
||||
parts[0] &= 0x7F
|
||||
varint_bytes = bytes(parts)
|
||||
return length_bytes + type_byte + varint_bytes + payload
|
||||
|
||||
@staticmethod
|
||||
def unpack(stream: bytes) -> Tuple[int, Optional[int], bytes]:
|
||||
if len(stream) < 4:
|
||||
raise ValueError("stream too short")
|
||||
length = int.from_bytes(stream[0:3], "big")
|
||||
ftype = stream[3]
|
||||
idx = 4
|
||||
# read varint if needed
|
||||
stream_id = None
|
||||
if ftype in (FT_STREAM, FT_WINDOW_UPDATE):
|
||||
# read one byte varint for demo
|
||||
if idx >= len(stream):
|
||||
raise ValueError("missing varint")
|
||||
b = stream[idx]
|
||||
idx += 1
|
||||
stream_id = b
|
||||
payload = stream[idx : idx + length]
|
||||
return ftype, stream_id, payload
|
||||
65
src/htx_session.py
Normal file
65
src/htx_session.py
Normal file
@ -0,0 +1,65 @@
|
||||
# ------------------------------ L2: HTX Sessions ------------------------------
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
from typing import Optional
|
||||
|
||||
from .betanet_cryptography import KeyPair
|
||||
from .htx_frame import HTXFrame
|
||||
|
||||
|
||||
class HTXSession:
|
||||
"""Simulates an HTX session between two nodes. Uses asyncio queues to emulate a stream.
|
||||
Outer TLS mirroring is represented by storing a 'fingerprint' that must match.
|
||||
Inner 'Noise' handshake is simulated (derive shared secret via HMAC)."""
|
||||
|
||||
def __init__(self, local_kp: KeyPair, remote_pub: bytes, fingerprint: str):
|
||||
self.local_kp = local_kp
|
||||
self.remote_pub = remote_pub
|
||||
self.fingerprint = fingerprint
|
||||
self.inbox = asyncio.Queue()
|
||||
self.peer: Optional["HTXSession"] = None
|
||||
self.k = None # symmetric shared key (placeholder)
|
||||
self.nonce = 0
|
||||
|
||||
async def perform_handshake(self):
|
||||
# Outer TLS mirroring check (pretend we checked JA3/ALPN)
|
||||
print(
|
||||
f"[L2] Performing outer TLS pre-flight (mirror fingerprint={self.fingerprint})"
|
||||
)
|
||||
await asyncio.sleep(0.05)
|
||||
# inner handshake: derive shared key as HMAC(local_secret, remote_pub)
|
||||
self.k = hmac.new(
|
||||
self.local_kp.secret, self.remote_pub, hashlib.sha256
|
||||
).digest()
|
||||
print("[L2] Inner handshake complete (shared key derived)")
|
||||
|
||||
async def send_frame(self, ftype: int, stream_id: Optional[int], plaintext: bytes):
|
||||
# 'encrypt' = xor with shared key stream (simplified)
|
||||
if self.k is None:
|
||||
raise RuntimeError("handshake not performed")
|
||||
cipher = bytes(
|
||||
[
|
||||
plaintext[i] ^ self.k[(self.nonce + i) % len(self.k)]
|
||||
for i in range(len(plaintext))
|
||||
]
|
||||
)
|
||||
self.nonce += len(plaintext)
|
||||
packed = HTXFrame.pack(ftype, stream_id if stream_id is not None else 0, cipher)
|
||||
if self.peer is None:
|
||||
raise RuntimeError("no peer connected")
|
||||
await self.peer.inbox.put(packed)
|
||||
|
||||
async def recv_loop(self, handler):
|
||||
while True:
|
||||
packed = await self.inbox.get()
|
||||
try:
|
||||
ftype, stream_id, cipher = HTXFrame.unpack(packed)
|
||||
except Exception as e:
|
||||
print("[L2] Failed unpack:", e)
|
||||
continue
|
||||
# decrypt
|
||||
plain = bytes(
|
||||
[cipher[i] ^ self.k[(0 + i) % len(self.k)] for i in range(len(cipher))]
|
||||
)
|
||||
await handler(ftype, stream_id, plain)
|
||||
36
src/overlay_node.py
Normal file
36
src/overlay_node.py
Normal file
@ -0,0 +1,36 @@
|
||||
# ------------------------- L3: Overlay Mesh (simple) --------------------------
|
||||
from typing import Dict, Optional
|
||||
|
||||
from .betanet_cryptography import KeyPair, self_cert_id, sha256
|
||||
from .content import Content
|
||||
|
||||
|
||||
class OverlayNode:
|
||||
def __init__(self, name: str, kp: KeyPair):
|
||||
self.name = name
|
||||
self.kp = kp
|
||||
self.peerstore: Dict[str, "OverlayNode"] = {}
|
||||
self.store: Dict[str, Content] = {}
|
||||
|
||||
def id(self) -> str:
|
||||
return self_cert_id(self.kp.pub)
|
||||
|
||||
def add_peer(self, peer: "OverlayNode"):
|
||||
self.peerstore[peer.id()] = peer
|
||||
|
||||
def publish(self, data: bytes) -> str:
|
||||
cid = sha256(data).hex()
|
||||
self.store[cid] = Content(cid, data)
|
||||
return cid
|
||||
|
||||
async def bitswap_get(self, cid: str) -> Optional[Content]:
|
||||
# Try local
|
||||
c = self.store.get(cid)
|
||||
if c:
|
||||
return c
|
||||
# ask peers in peerstore (simple flooding)
|
||||
for pid, peer in list(self.peerstore.items()):
|
||||
c = peer.store.get(cid)
|
||||
if c:
|
||||
return c
|
||||
return None
|
||||
36
src/path.py
Normal file
36
src/path.py
Normal file
@ -0,0 +1,36 @@
|
||||
# ---------------------------------- L1: Path ----------------------------------
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List
|
||||
|
||||
from .ashop import ASHop
|
||||
from .overlay_node import KeyPair
|
||||
|
||||
|
||||
@dataclass
|
||||
class PathSegment:
|
||||
hops: List[ASHop]
|
||||
|
||||
def verify(self, as_pubkeys: Dict[str, KeyPair]) -> bool:
|
||||
# Each hop must validate using the AS pubkey (placeholder)
|
||||
for hop in self.hops:
|
||||
kp = as_pubkeys.get(hop.as_id)
|
||||
if not kp:
|
||||
print(f"[L1] Missing pubkey for AS {hop.as_id}")
|
||||
return False
|
||||
# hop sig should be HMAC(as_secret, as_id || ...) in our placeholder
|
||||
if not kp.verify(hop.as_id.encode("utf-8"), hop.sig):
|
||||
print(f"[L1] Invalid hop signature for AS {hop.as_id}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class L1Path:
|
||||
segments: List[PathSegment]
|
||||
|
||||
def validate(self, as_pubkeys: Dict[str, KeyPair]) -> bool:
|
||||
# For demo: validate every segment
|
||||
for seg in self.segments:
|
||||
if not seg.verify(as_pubkeys):
|
||||
return False
|
||||
return True
|
||||
Loading…
x
Reference in New Issue
Block a user