128 lines
3.3 KiB
Python
Executable File
128 lines
3.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import socket
|
|
from struct import pack
|
|
from time import sleep
|
|
from sys import exit
|
|
|
|
# Four magic bytes placed at the very start of every packet (see gen_packet).
MAGIC_HEADER = b"zyx\0"

# Names accepted by the `-t` option, mapped to the one-byte code that
# gen_packet writes into the packet header.  Each code is a distinct single
# bit, spelled here as a shift to make that visible.
PARTITION_TYPES = {
    "bootloader": 1 << 0,
    "usrConfig": 1 << 1,
    "ras": 1 << 2,
    "rom-d": 1 << 3,
    "ras-2": 1 << 4,
}
|
|
|
|
def checksum(data):
    """Return the 16-bit checksum of *data* packed as two big-endian bytes.

    Each element of *data* is converted with ``int()`` and summed.  The sum
    shifted right by 16 bits is then added to the sum itself, and the result
    is truncated to its low 16 bits.
    """
    total = sum(int(item) for item in data)
    # `+` binds tighter than `&`, so the mask covers the whole addition and
    # the value always fits the unsigned 16-bit "H" field.
    folded = ((total >> 16) + total) & 0xFFFF
    return pack(">H", folded)
|
|
|
|
|
|
def l(i: int):
    """Encode *i* as a 4-byte big-endian unsigned integer.

    ``struct.error`` is raised by ``pack`` when *i* does not fit in 32 bits.
    """
    encoded = pack(">L", i)
    return encoded
|
|
|
|
|
|
def split_data(data, n=0x400):
    """Cut *data* into consecutive slices of at most *n* items each.

    The final slice is shorter when ``len(data)`` is not a multiple of *n*.
    Empty input produces an empty list.
    """
    offsets = range(0, len(data), n)
    return [data[start : start + n] for start in offsets]
|
|
|
|
|
|
def gen_packet(part, recvID, data, total=None):
    """Build one packet: a fixed header followed by the payload *data*.

    Header fields, in the order they are written:

    * ``MAGIC_HEADER``
    * the 2-byte ``checksum`` of *data*
    * *recvID* as a 32-bit big-endian value
    * ``len(data)`` as a 32-bit big-endian value
    * *total* as a 32-bit big-endian value
    * two zero bytes
    * *part* as a single byte, written twice
    * a 32-bit zero and a 32-bit ``0xFFFFFFFF``

    Args:
        part: Partition type code; must fit in one byte.
        recvID: Packet identifier (``send_file`` passes the chunk index).
        data: Payload bytes carried by this packet.
        total: Length value for the header's third 32-bit field.  When
            omitted, ``len(data)`` is used.

    Returns:
        A ``bytearray`` containing the header immediately followed by *data*.
    """
    # Identity comparison is the correct test for the None sentinel; `==`
    # would defer to whatever `__eq__` the argument happens to define.
    if total is None:
        total = len(data)

    buf = bytearray(MAGIC_HEADER)
    buf.extend(checksum(data))
    buf.extend(l(recvID))
    buf.extend(l(len(data)))
    buf.extend(l(total))
    buf.extend(b"\x00\x00")
    # The partition code occupies two consecutive header bytes.
    # NOTE(review): the distinct meaning of the second byte is not visible
    # in this file -- confirm against the protocol before changing it.
    buf.extend(part.to_bytes(1, "little"))
    buf.extend(part.to_bytes(1, "little"))
    buf.extend(l(0))
    buf.extend(l(0xFFFFFFFF))
    buf.extend(data)

    return buf
|
|
|
|
|
|
def send_file(part, file, address="225.0.0.0", port=5631, ttl=2, progress=False, sleeptime=0.1):
    """Read *file* completely and send it as a sequence of UDP packets.

    The content is split with ``split_data``; each chunk is wrapped by
    ``gen_packet`` (using the chunk index as packet ID and the full content
    length as total) and sent to ``(address, port)``.

    Args:
        part: Partition type code passed through to ``gen_packet``.
        file: Object whose ``read()`` returns the bytes to send.
        address: Destination address for every packet.
        port: Destination UDP port.
        ttl: Value set for the socket's ``IP_MULTICAST_TTL`` option.
        progress: When true, show an ``enlighten`` progress bar.  If that
            package cannot be imported, a hint is printed and the upload
            continues without a bar.
        sleeptime: Seconds to sleep after each packet is sent.
    """
    data = file.read()

    total = len(data)
    chunks = split_data(bytearray(data))

    # The context manager closes the socket even if sending is interrupted
    # (e.g. by KeyboardInterrupt), so the descriptor is never leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

        pbar = None
        if progress:
            try:
                # Optional third-party dependency, imported only on demand.
                from enlighten import Counter
            except ImportError:
                print("Install the Python package 'enlighten' for a progress bar")
                print("Hint: Use the flag `-q` to suppress messages")
                print("Uploading file...")
            else:
                pbar = Counter(
                    total=float(total),
                    bar_format="{percentage:3.0f}%|{bar}| {count:!.2j}B / {total:!.2j}B "
                    "[{elapsed} | {eta}, {rate:!.2j}B/s]",
                )

        for i, chunk in enumerate(chunks):
            packet = gen_packet(part, i, chunk, total=total)
            sock.sendto(packet, (address, port))
            sleep(sleeptime)
            if pbar is not None:
                pbar.update(float(len(chunk)))
|
|
|
|
|
|
# Command-line entry point: parse the options, then upload the given file.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload file to Zyxel device using the Multiboot procedure",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("file", type=argparse.FileType("rb"), help="file to upload")
    parser.add_argument(
        "-t",
        metavar="TYPE",
        default="ras",
        choices=list(PARTITION_TYPES),
        help="type of file",
    )
    parser.add_argument(
        "-s",
        metavar="SECONDS",
        default=0.1,
        type=float,
        help="time in seconds between sent packets",
    )
    parser.add_argument(
        "-a", metavar="ADDRESS", default="225.0.0.0", help="multicast group for packets"
    )
    # type=int: without it a user-supplied value stays a str, which the
    # socket calls in send_file reject.
    parser.add_argument(
        "-p", metavar="PORT", default=5631, type=int, help="destination port for packets"
    )
    parser.add_argument("--ttl", default=2, type=int, help="TTL for packets")
    parser.add_argument("-q", action="store_true", help="operate silently")

    args = parser.parse_args()

    try:
        # argparse opened the file for us; make sure it is closed again.
        with args.file:
            send_file(
                # `choices` above guarantees that args.t is a valid key.
                PARTITION_TYPES[args.t],
                args.file,
                # Forward -a; previously the option was parsed but ignored.
                address=args.a,
                port=args.p,
                ttl=args.ttl,
                progress=(not args.q),
                sleeptime=args.s,
            )
    except KeyboardInterrupt:
        # 130 = 128 + SIGINT, the conventional exit status after Ctrl-C.
        exit(130)
|