#! /usr/bin/env python3
#
# Copyright (C) 2018 Fx Bricks Inc.
# This file is part of the pfxbrick python module.
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# PFx Brick file system helpers
import os
import time
from pfxbrick import *
from pfxbrick.pfxdict import file_attr_dict, fileid_dict
from pfxbrick.pfxhelpers import *
from pfxbrick.pfxmsg import cmd_file_dir, usb_transaction
# fmt: off
try:
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
TextColumn, TimeRemainingColumn,
TransferSpeedColumn)
has_rich = True
except:
has_rich = False
# fmt: on
if has_rich:
progress = Progress(
TextColumn("[white]{task.fields[filename]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"•",
DownloadColumn(),
"•",
TransferSpeedColumn(),
"•",
TimeRemainingColumn(),
)
def fs_get_fileid_from_name(hdev, name):
"""
Resolves a text string filename into a numeric file ID.
:param hdev: device I/O handle
:returns: integer file ID or 0xFF if not found
"""
fileid = 0xFF
fb = bytes(name, "utf-8")
p = [len(fb)]
p.extend(fb)
res = cmd_file_dir(hdev, PFX_DIR_REQ_GET_NAMED_FILE_ID, p)
if len(res) >= 3 and not res[2] == PFX_ERR_FILE_NOT_FOUND:
fileid = int(res[2])
return fileid
def fs_error_check(res, silent=False):
"""
Convenience error status lookup function used by other file system functions.
:param res: result status code byte returned by almost all file system ICD messages
:returns: True if there is an error, False on success
"""
if res > 62:
if not silent:
print("File system error: [%02X] %s" % (res, get_error_str(res)))
return True
else:
return False
def fs_format(hdev, quick=False):
"""
Sends an ICD message to format the PFx Brick file system.
:param hdev: USB HID session handle
:param boolean quick: If True, only occupied sectors are erased. If False, every sector is erased, i.e. a complete format.
"""
msg = [PFX_CMD_FILE_FORMAT_FS, PFX_FORMAT_BYTE0, PFX_FORMAT_BYTE1, PFX_FORMAT_BYTE2]
if quick:
msg.append(0)
else:
msg.append(1)
res = usb_transaction(hdev, msg)
fs_error_check(res[1])
def fs_remove_file(hdev, fid, silent=False):
"""
Sends an ICD message to remove a file from the PFx Brick file system.
:param hdev: USB HID session handle
:param fid: the file ID of the file to remove
"""
msg = [PFX_CMD_FILE_REMOVE]
msg.append(fid)
res = usb_transaction(hdev, msg)
fs_error_check(res[1], silent=silent)
def fs_copy_file_to(brick, fid, fn, show_progress=True, with_bytes=None):
"""
File copy handler to put a file on the PFx Brick.
This function handles the process of opening and transferring
file data from the host to the PFx Brick file system. A copy session
may involve many message transactions with the PFx Brick and could
be time consuming. Therefore, a progress bar can be optionally shown
on the console to monitor the transfer.
:param brick: :obj:`PFxBrick` object
:param fid: a unique file ID to assign the copied file.
:param fn: the host filename (optionally including path) to copy
:param boolean show_progress: a flag to show the progress bar indicator during transfer.
"""
if with_bytes is not None:
nBytes = len(with_bytes)
else:
nBytes = os.path.getsize(fn)
if is_version_less_than(brick.icd_rev, "3.39"):
tx_chunk = 61
fast_write = False
else:
tx_chunk = 62
fast_write = True
if nBytes > 0:
msg = [PFX_CMD_FILE_OPEN]
msg.append(fid)
msg.append(0x06) # CREATE | WRITE mode
msg.extend(uint32_to_bytes(nBytes))
name = os.path.basename(fn)
nd = bytes(name, "utf-8")
for b in nd:
msg.append(b)
for i in range(32 - len(nd)):
msg.append(0)
res = usb_transaction(brick.dev, msg)
if not res:
return
if fs_error_check(res[1]):
return
if has_rich and show_progress:
with progress:
if with_bytes is None:
f = open(fn, "rb")
nCount = 0
err = False
transfer = progress.add_task("copy_to", filename=name, total=nBytes)
while (nCount < nBytes) and not err:
if with_bytes is None:
buf = f.read(tx_chunk)
else:
remain = len(with_bytes) - nCount
remain = min(tx_chunk, remain)
buf = with_bytes[nCount : nCount + remain]
nRead = len(buf)
nCount += nRead
if nRead > 0:
if fast_write:
msg = [PFX_CMD_FILE_WRITE_FAST]
msg.append(nRead)
else:
msg = [PFX_CMD_FILE_WRITE]
msg.append(fid)
msg.append(nRead)
for b in buf:
msg.append(b)
res = usb_transaction(brick.dev, msg)
err = fs_error_check(res[1])
progress.update(transfer, advance=nRead)
if with_bytes is None:
f.close()
msg = [PFX_CMD_FILE_CLOSE]
msg.append(fid)
res = usb_transaction(brick.dev, msg)
fs_error_check(res[1])
progress.remove_task(transfer)
else:
if with_bytes is None:
f = open(fn, "rb")
nCount = 0
err = False
while (nCount < nBytes) and not err:
if with_bytes is None:
buf = f.read(tx_chunk)
else:
remain = len(with_bytes) - nCount
remain = min(tx_chunk, remain)
buf = with_bytes[nCount : nCount + remain]
nRead = len(buf)
nCount += nRead
if nRead > 0:
if fast_write:
msg = [PFX_CMD_FILE_WRITE_FAST]
msg.append(nRead)
else:
msg = [PFX_CMD_FILE_WRITE]
msg.append(fid)
msg.append(nRead)
for b in buf:
msg.append(b)
res = usb_transaction(brick.dev, msg)
err = fs_error_check(res[1])
if show_progress:
printProgressBar(
nCount,
nBytes,
prefix="Copying:",
suffix="Complete",
length=50,
)
if with_bytes is None:
f.close()
msg = [PFX_CMD_FILE_CLOSE]
msg.append(fid)
res = usb_transaction(brick.dev, msg)
fs_error_check(res[1])
def fs_copy_file_from(
brick, pfile, fn=None, show_progress=True, as_bytes=False, to_console=False
):
"""
File copy handler to get a file from the PFx Brick.
This function handles the process of opening and transferring
file data from the PFx Brick file system to the host. A copy session
may involve many message transactions with the PFx Brick and could
be time consuming. Therefore, a progress bar can be optionally shown
on the console to monitor the transfer.
:param hdev: USB HID session handle
:param PFxFile pfile: a PFxFile object specifying the file to copy.
:param fn: optional name to override the filename of the host's copy.
:param boolean show_progress: a flag to show the progress bar indicator during transfer.
"""
if pfile is None:
return None
rx_chunk = 62
msg = [PFX_CMD_FILE_OPEN]
msg.append(pfile.id)
msg.append(0x01) # READ mode
res = usb_transaction(brick.dev, msg)
if not res:
return None
if fs_error_check(res[1]):
return None
rbytes = bytearray()
if has_rich and show_progress:
with progress:
nf = pfile.name
if fn is not None:
nf = fn
nCount = 0
err = False
transfer = progress.add_task("copy_from", filename=nf, total=pfile.size)
while (nCount < pfile.size) and not err:
msg = [PFX_CMD_FILE_READ]
msg.append(pfile.id)
nToRead = pfile.size - nCount
if nToRead > rx_chunk:
nToRead = rx_chunk
msg.append(nToRead)
res = usb_transaction(brick.dev, msg)
err = fs_error_check(res[1])
if not err:
nCount += res[1]
b = bytes(res[2 : 2 + res[1]])
rbytes.extend(b)
if to_console and not show_progress:
if as_bytes:
pprint_bytes(b)
else:
s = []
for bc in b:
s.append("%c" % (bc))
print("".join(s), end="")
progress.update(transfer, advance=res[1])
msg = [PFX_CMD_FILE_CLOSE]
msg.append(pfile.id)
res = usb_transaction(brick.dev, msg)
fs_error_check(res[1])
progress.remove_task(transfer)
if not as_bytes:
with open(nf, "wb") as f:
f.write(rbytes)
return rbytes
else:
nf = pfile.name
if fn is not None:
nf = fn
nCount = 0
err = False
while (nCount < pfile.size) and not err:
msg = [PFX_CMD_FILE_READ]
msg.append(pfile.id)
nToRead = pfile.size - nCount
if nToRead > rx_chunk:
nToRead = rx_chunk
msg.append(nToRead)
res = usb_transaction(brick.dev, msg)
err = fs_error_check(res[1])
if not err:
nCount += res[1]
b = bytes(res[2 : 2 + res[1]])
rbytes.extend(b)
if to_console and not show_progress:
if as_bytes:
pprint_bytes(b)
else:
s = []
for bc in b:
s.append("%c" % (bc))
print("".join(s), end="")
if show_progress:
printProgressBar(
nCount,
pfile.size,
prefix="Copying:",
suffix="Complete",
length=50,
)
msg = [PFX_CMD_FILE_CLOSE]
msg.append(pfile.id)
res = usb_transaction(brick.dev, msg)
fs_error_check(res[1])
if not as_bytes:
with open(nf, "wb") as f:
f.write(rbytes)
return rbytes
return None
[docs]class PFxFile:
"""
File directory entry container class.
This class contains directory entry data for a file on the PFx file system.
Attributes:
id (:obj:`int`): unique file ID
size (:obj:`int`): size in bytes
firstSector (:obj:`int`): the first 4k sector index in flash memory
attributes (:obj:`int`): 16-bit attributes field
userData1 (:obj:`int`): 32-bit user defined data field
userData2 (:obj:`int`): 32-bit user defined data field
crc32 (:obj:`int`): CRC32 hash of the file (auto computed after write)
name (:obj:`str`): UTF-8 filename up to 32 bytes
"""
def __init__(self):
self.id = 0
self.size = 0
self.firstSector = 0
self.attributes = 0
self.userData1 = 0
self.userData2 = 0
self.crc32 = 0
self.name = ""
[docs] def is_audio_file(self):
"""
Checks the file attributes to see if this file is a valid audio WAV file.
:returns: True if it is valid audio WAV file
"""
if (self.attributes & PFX_FILE_FMT_MASK) == PFX_FILE_FMT_WAV:
if self.userData1 != 0 and self.userData2 != 0:
return True
return False
[docs] def is_script_file(self):
"""
Checks the file attributes to see if this file is a valid script file.
:returns: True if it is valid script file
"""
if (self.attributes & PFX_FILE_ATTR_MASK) == PFX_FILE_ATTR_SCRIPT:
return True
if (self.attributes & PFX_FILE_FMT_MASK) == PFX_FILE_FMT_PFX:
return True
return False
[docs] def has_same_crc32_as_file(self, other):
"""
Checks if the CRC32 of this file is the same as a specified file on the local filesystem
:returns: True if CRC32 hash codes match
"""
other_crc = get_file_crc32(other)
if self.crc32 == other_crc:
return True
return False
[docs] def from_bytes(self, msg):
"""
Converts the message string bytes read from the PFx Brick into
the corresponding data members of this class.
"""
self.id = msg[3]
self.size = uint32_toint(msg[4:8])
self.firstSector = uint16_toint(msg[8:10])
self.attributes = uint16_toint(msg[10:12])
self.userData1 = uint32_toint(msg[12:16])
self.userData2 = uint32_toint(msg[16:20])
self.crc32 = uint32_toint(msg[20:24])
self.name = safe_unicode_str(msg[24:56])
[docs] def __str__(self):
"""
Convenient human readable string of a file directory entry. This allows
a :py:class:`PFxFile` object to be used with :obj:`str` and :obj:`print` methods.
"""
attr_str = ""
if self.attributes in file_attr_dict:
attr_str = file_attr_dict[self.attributes]
elif self.id in fileid_dict:
attr_str = fileid_dict[self.id]
s = "%3d %-24s %6.1f kB %04X %08X %08X %08X %04X %02X %s" % (
self.id,
self.name,
float(self.size / 1000),
self.attributes,
self.userData1,
self.userData2,
self.crc32,
self.firstSector,
self.id,
attr_str,
)
return s
[docs] def colour_str(self):
"""
Convenient human readable colour string of a file directory entry.
"""
attr_str = ""
if self.attributes in file_attr_dict:
attr_str = file_attr_dict[self.attributes]
elif self.id in fileid_dict:
attr_str = fileid_dict[self.id]
s = "%3d %-24s [bold white]%6.1f kB[/bold white] [bold blue]%04X[/bold blue] [bold black]%08X %08X[/bold black] [bold aquamarine3]%08X[/bold aquamarine3] [bold black]%04X %02X[/bold black] %s" % (
self.id,
self.name,
float(self.size / 1000),
self.attributes,
self.userData1,
self.userData2,
self.crc32,
self.firstSector,
self.id,
attr_str,
)
return s
[docs]class PFxDir:
"""
File directory container class.
This class contains PFx file system directory.
Attributes:
numFiles (:obj:`int`): number of files in the file system
files ([:obj:`PFxFile`]): a list of PFxFile objects corresponding to directory entries
bytesUsed (:obj:`int`): bytes occupied by files
bytesLeft (:obj:`int`): remaining space in bytes
"""
def __init__(self):
self.numFiles = 0
self.files = []
self.bytesUsed = 0
self.bytesLeft = 0
[docs] def get_file_dir_entry(self, fid):
"""
Returns a file directory entry containined in a :py:class:`PFxFile` class.
:param int fid: the unique file ID of desired directory entry
:returns: :py:class:`PFxFile` directory entry
"""
for f in self.files:
if f.id == fid:
return f
return None
[docs] def get_filename(self, fid):
"""
Returns a filename with a numeric file ID
:param int fid: the unique file ID of desired directory entry
:returns: :obj:`str` filename of file ID, or None
"""
f = self.get_file_dir_entry(fid)
if f is not None:
return f.name
return None
[docs] def find_available_file_id(self):
"""
Returns the next available unique file ID from the file system.
The directory is scanned for all currently used file ID values and
returns an un-used/available file ID value.
:returns: :obj:`int` next available file ID value, or None
"""
used_ids = [x.id for x in self.files]
for x in range(255):
if x not in used_ids:
return x
return None
[docs] def has_file(self, fileID):
"""
Determines if a specified file is on the PFx Brick file system either by
filename or numeric file ID.
:returns: :obj:`boolean` True or False if the file is found
"""
for file in self.files:
if isinstance(fileID, int):
if file.id == fileID:
return True
elif isinstance(fileID, str):
if file.name == fileID:
return True
return False
[docs] def __str__(self):
"""
Convenient human readable string of the file directory. This allows
a :py:class:`PFxDir` object to be used with :obj:`str` and :obj:`print` methods.
"""
sb = []
sb.append(
"%3s %-24s %6s %4s %8s %8s %8s %5s %s"
% (
"ID",
"Name",
"Size",
"Attr",
"User1",
"User2",
"CRC32",
"Start",
"Ext Attr",
)
)
for f in self.files:
sb.append(str(f))
sb.append(
"%d files, %.1f kB used, %.1f kB remaining"
% (
len(self.files),
float(self.bytesUsed / 1000),
float(self.bytesLeft / 1000),
)
)
s = "\n".join(sb)
return s
[docs] def colour_dir(self):
"""
Convenient human readable string of the file directory with colour.
"""
sb = []
sb.append(
"[bold yellow]%3s %-24s %6s %4s %8s %8s %8s %5s %s[/bold yellow]"
% (
"ID",
"Name",
"Size",
"Attr",
"User1",
"User2",
"CRC32",
"Start",
"Ext Attr",
)
)
for f in self.files:
sb.append(f.colour_str())
sb.append(
"%d files, %.1f kB used, %.1f kB remaining"
% (
len(self.files),
float(self.bytesUsed / 1000),
float(self.bytesLeft / 1000),
)
)
s = "\n".join(sb)
return s