Add USB hot-plug resilience and batch printing
USBPrinter no longer raises on init — the service starts and stays running regardless of whether a printer is connected, reconnecting automatically on demand. Print jobs are now queued and batched: a 500ms collection window groups consecutive labels into a single USB write, eliminating gaps when printing multiple labels from InvenTree. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
244
bridge.py
244
bridge.py
@@ -41,94 +41,147 @@ PRODUCT_IDS = {
|
|||||||
logger = logging.getLogger("brother-ql-bridge")
|
logger = logging.getLogger("brother-ql-bridge")
|
||||||
|
|
||||||
|
|
||||||
|
class PrinterUnavailableError(Exception):
|
||||||
|
"""No printer found on USB."""
|
||||||
|
|
||||||
|
|
||||||
|
class PrinterDisconnectedError(Exception):
|
||||||
|
"""Printer was lost during a write operation."""
|
||||||
|
|
||||||
|
|
||||||
class USBPrinter:
|
class USBPrinter:
|
||||||
"""Handles USB communication with the Brother QL printer."""
|
"""Handles USB communication with the Brother QL printer."""
|
||||||
|
|
||||||
def __init__(self, device=None):
|
def __init__(self, device=None):
|
||||||
|
self._device_spec = device
|
||||||
self.dev = None
|
self.dev = None
|
||||||
self.ep_out = None
|
self.ep_out = None
|
||||||
self.ep_in = None
|
self.ep_in = None
|
||||||
|
self._connected = False
|
||||||
self._lock = asyncio.Lock()
|
self._lock = asyncio.Lock()
|
||||||
|
|
||||||
if device:
|
@property
|
||||||
# Parse device string like "usb://0x04f9:0x2028"
|
def is_connected(self) -> bool:
|
||||||
if device.startswith("usb://"):
|
return self._connected and self.dev is not None and self.ep_out is not None
|
||||||
parts = device[6:].split(":")
|
|
||||||
|
def _find_device(self) -> bool:
|
||||||
|
"""Attempt to find the USB device. Returns True if found."""
|
||||||
|
self.dev = None
|
||||||
|
|
||||||
|
if self._device_spec:
|
||||||
|
if self._device_spec.startswith("usb://"):
|
||||||
|
parts = self._device_spec[6:].split(":")
|
||||||
vendor = int(parts[0], 16)
|
vendor = int(parts[0], 16)
|
||||||
product = int(parts[1].split("/")[0], 16)
|
product = int(parts[1].split("/")[0], 16)
|
||||||
self.dev = usb.core.find(idVendor=vendor, idProduct=product)
|
self.dev = usb.core.find(idVendor=vendor, idProduct=product)
|
||||||
else:
|
else:
|
||||||
# Auto-detect Brother QL printer
|
|
||||||
self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID)
|
self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID)
|
||||||
|
|
||||||
if self.dev is None:
|
if self.dev is None:
|
||||||
raise RuntimeError("No Brother QL printer found on USB")
|
return False
|
||||||
|
|
||||||
product_name = PRODUCT_IDS.get(self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})")
|
product_name = PRODUCT_IDS.get(self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})")
|
||||||
logger.info(f"Found printer: {product_name}")
|
logger.info(f"Found printer: {product_name}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def connect(self) -> bool:
|
||||||
|
"""Initialize USB connection to the printer. Returns True on success."""
|
||||||
|
self.disconnect()
|
||||||
|
|
||||||
|
if not self._find_device():
|
||||||
|
logger.warning("No Brother QL printer found on USB")
|
||||||
|
return False
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
"""Initialize USB connection to the printer."""
|
|
||||||
# Detach kernel driver if necessary
|
|
||||||
try:
|
try:
|
||||||
if self.dev.is_kernel_driver_active(0):
|
# Detach kernel driver if necessary
|
||||||
self.dev.detach_kernel_driver(0)
|
try:
|
||||||
logger.debug("Detached kernel driver")
|
if self.dev.is_kernel_driver_active(0):
|
||||||
except (usb.core.USBError, NotImplementedError):
|
self.dev.detach_kernel_driver(0)
|
||||||
pass
|
logger.debug("Detached kernel driver")
|
||||||
|
except (usb.core.USBError, NotImplementedError):
|
||||||
|
pass
|
||||||
|
|
||||||
# Set configuration
|
# Set configuration
|
||||||
try:
|
try:
|
||||||
self.dev.set_configuration()
|
self.dev.set_configuration()
|
||||||
except usb.core.USBError:
|
except usb.core.USBError:
|
||||||
pass # Already configured
|
pass # Already configured
|
||||||
|
|
||||||
# Get endpoints
|
# Get endpoints
|
||||||
cfg = self.dev.get_active_configuration()
|
cfg = self.dev.get_active_configuration()
|
||||||
intf = cfg[(0, 0)]
|
intf = cfg[(0, 0)]
|
||||||
|
|
||||||
self.ep_out = usb.util.find_descriptor(
|
self.ep_out = usb.util.find_descriptor(
|
||||||
intf,
|
intf,
|
||||||
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
||||||
== usb.util.ENDPOINT_OUT,
|
== usb.util.ENDPOINT_OUT,
|
||||||
)
|
)
|
||||||
self.ep_in = usb.util.find_descriptor(
|
self.ep_in = usb.util.find_descriptor(
|
||||||
intf,
|
intf,
|
||||||
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
||||||
== usb.util.ENDPOINT_IN,
|
== usb.util.ENDPOINT_IN,
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.ep_out is None:
|
if self.ep_out is None:
|
||||||
raise RuntimeError("Could not find USB OUT endpoint")
|
logger.error("Could not find USB OUT endpoint")
|
||||||
|
self.disconnect()
|
||||||
|
return False
|
||||||
|
|
||||||
logger.info("USB connection established")
|
self._connected = True
|
||||||
|
logger.info("USB connection established")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except usb.core.USBError as e:
|
||||||
|
logger.error(f"USB connection failed: {e}")
|
||||||
|
self.disconnect()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""Release USB resources. Safe to call repeatedly."""
|
||||||
|
self._connected = False
|
||||||
|
self.ep_out = None
|
||||||
|
self.ep_in = None
|
||||||
|
if self.dev is not None:
|
||||||
|
try:
|
||||||
|
usb.util.dispose_resources(self.dev)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self.dev = None
|
||||||
|
|
||||||
|
def ensure_connected(self) -> bool:
|
||||||
|
"""Return True if connected, attempting reconnect if needed."""
|
||||||
|
if self.is_connected:
|
||||||
|
return True
|
||||||
|
return self.connect()
|
||||||
|
|
||||||
def write_sync(self, data: bytes) -> int:
|
def write_sync(self, data: bytes) -> int:
|
||||||
"""Write data to the printer synchronously with proper timeout."""
|
"""Write data to the printer synchronously with proper timeout."""
|
||||||
# Write in chunks with longer timeout (15 seconds)
|
if not self.ensure_connected():
|
||||||
chunk_size = 32768
|
raise PrinterUnavailableError("No printer available")
|
||||||
total_written = 0
|
|
||||||
for i in range(0, len(data), chunk_size):
|
try:
|
||||||
chunk = data[i : i + chunk_size]
|
chunk_size = 32768
|
||||||
written = self.ep_out.write(chunk, timeout=15000)
|
total_written = 0
|
||||||
total_written += written
|
for i in range(0, len(data), chunk_size):
|
||||||
return total_written
|
chunk = data[i : i + chunk_size]
|
||||||
|
written = self.ep_out.write(chunk, timeout=15000)
|
||||||
|
total_written += written
|
||||||
|
return total_written
|
||||||
|
except usb.core.USBError as e:
|
||||||
|
logger.error(f"USB write error: {e}")
|
||||||
|
self.disconnect()
|
||||||
|
raise PrinterDisconnectedError(f"Printer disconnected during write: {e}") from e
|
||||||
|
|
||||||
async def write(self, data: bytes) -> int:
|
async def write(self, data: bytes) -> int:
|
||||||
"""Write data to the printer asynchronously."""
|
"""Write data to the printer asynchronously."""
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
try:
|
return await asyncio.get_event_loop().run_in_executor(
|
||||||
return await asyncio.get_event_loop().run_in_executor(
|
None, self.write_sync, data
|
||||||
None, self.write_sync, data
|
)
|
||||||
)
|
|
||||||
except usb.core.USBError as e:
|
|
||||||
logger.error(f"USB write error: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""Release USB resources."""
|
"""Release USB resources."""
|
||||||
if self.dev:
|
self.disconnect()
|
||||||
usb.util.dispose_resources(self.dev)
|
|
||||||
|
|
||||||
|
|
||||||
class PrinterBridge:
|
class PrinterBridge:
|
||||||
@@ -140,6 +193,9 @@ class PrinterBridge:
|
|||||||
self.port = port
|
self.port = port
|
||||||
self.server = None
|
self.server = None
|
||||||
self.job_count = 0
|
self.job_count = 0
|
||||||
|
self._job_queue = asyncio.Queue()
|
||||||
|
self._batch_timeout = 0.5
|
||||||
|
self._batch_task = None
|
||||||
|
|
||||||
async def handle_client(
|
async def handle_client(
|
||||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||||
@@ -160,21 +216,76 @@ class PrinterBridge:
|
|||||||
chunks.append(data)
|
chunks.append(data)
|
||||||
|
|
||||||
job_data = b"".join(chunks)
|
job_data = b"".join(chunks)
|
||||||
logger.info(f"[Job {job_id}] Received {len(job_data)} bytes, sending to printer...")
|
|
||||||
|
|
||||||
# Send complete job to printer
|
if len(job_data) == 0:
|
||||||
await self.printer.write(job_data)
|
logger.warning(f"[Job {job_id}] Skipped: empty job (0 bytes)")
|
||||||
logger.info(f"[Job {job_id}] Completed successfully")
|
return
|
||||||
|
|
||||||
|
logger.info(f"[Job {job_id}] Received {len(job_data)} bytes, queuing for print...")
|
||||||
|
await self._job_queue.put((job_id, job_data))
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[Job {job_id}] Error: {e}")
|
logger.error(f"[Job {job_id}] Error receiving data: {e}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
writer.close()
|
writer.close()
|
||||||
await writer.wait_closed()
|
await writer.wait_closed()
|
||||||
|
|
||||||
|
async def _batch_sender(self):
|
||||||
|
"""Background coroutine that drains the job queue in batches."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Wait for the first job
|
||||||
|
first_job = await self._job_queue.get()
|
||||||
|
batch = [first_job]
|
||||||
|
|
||||||
|
# Collect more jobs within the batch window
|
||||||
|
deadline = asyncio.get_event_loop().time() + self._batch_timeout
|
||||||
|
while True:
|
||||||
|
remaining = deadline - asyncio.get_event_loop().time()
|
||||||
|
if remaining <= 0:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
job = await asyncio.wait_for(
|
||||||
|
self._job_queue.get(), timeout=remaining
|
||||||
|
)
|
||||||
|
batch.append(job)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
break
|
||||||
|
|
||||||
|
job_ids = [jid for jid, _ in batch]
|
||||||
|
combined_data = b"".join(data for _, data in batch)
|
||||||
|
|
||||||
|
if len(batch) > 1:
|
||||||
|
logger.info(
|
||||||
|
f"[Batch] Sending {len(batch)} jobs "
|
||||||
|
f"({job_ids}) as one write ({len(combined_data)} bytes)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"[Job {job_ids[0]}] Sending {len(combined_data)} bytes to printer..."
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.printer.write(combined_data)
|
||||||
|
for jid in job_ids:
|
||||||
|
logger.info(f"[Job {jid}] Completed successfully")
|
||||||
|
except PrinterUnavailableError:
|
||||||
|
for jid in job_ids:
|
||||||
|
logger.warning(f"[Job {jid}] Rejected: no printer connected")
|
||||||
|
except PrinterDisconnectedError:
|
||||||
|
for jid in job_ids:
|
||||||
|
logger.error(f"[Job {jid}] Failed: printer disconnected during print")
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[Batch sender] Unexpected error: {e}")
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""Start the bridge server."""
|
"""Start the bridge server."""
|
||||||
|
self._batch_task = asyncio.create_task(self._batch_sender())
|
||||||
|
|
||||||
self.server = await asyncio.start_server(
|
self.server = await asyncio.start_server(
|
||||||
self.handle_client, self.host, self.port
|
self.handle_client, self.host, self.port
|
||||||
)
|
)
|
||||||
@@ -187,6 +298,12 @@ class PrinterBridge:
|
|||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
"""Stop the bridge server."""
|
"""Stop the bridge server."""
|
||||||
|
if self._batch_task:
|
||||||
|
self._batch_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._batch_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
if self.server:
|
if self.server:
|
||||||
self.server.close()
|
self.server.close()
|
||||||
await self.server.wait_closed()
|
await self.server.wait_closed()
|
||||||
@@ -227,13 +344,12 @@ async def main():
|
|||||||
datefmt="%Y-%m-%d %H:%M:%S"
|
datefmt="%Y-%m-%d %H:%M:%S"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize printer
|
# Initialize printer (never exits — printer may appear later)
|
||||||
try:
|
printer = USBPrinter(args.device)
|
||||||
printer = USBPrinter(args.device)
|
if printer.connect():
|
||||||
printer.connect()
|
logger.info("Printer ready")
|
||||||
except RuntimeError as e:
|
else:
|
||||||
logger.error(f"Failed to initialize printer: {e}")
|
logger.warning("No printer detected at startup — will retry on first print job")
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Create and run bridge
|
# Create and run bridge
|
||||||
bridge = PrinterBridge(printer, args.host, args.port)
|
bridge = PrinterBridge(printer, args.host, args.port)
|
||||||
|
|||||||
Reference in New Issue
Block a user