#!/usr/bin/env python3
"""
Brother QL Bridge - USB to Network Bridge

Exposes a USB-connected Brother QL printer (e.g., QL-570) on the network
so it can be used with network-based printing tools like InvenTree's
Brother plugin.
"""

import argparse
import asyncio
import logging
import signal
import sys

import usb.core
import usb.util

import config

# Brother printer USB vendor ID
BROTHER_VENDOR_ID = 0x04F9

# Known QL printer product IDs
PRODUCT_IDS = {
    0x2015: "QL-500",
    0x2016: "QL-550",
    0x2027: "QL-560",
    0x2028: "QL-570",
    0x2029: "QL-580N",
    0x201B: "QL-650TD",
    0x2042: "QL-700",
    0x2043: "QL-710W",
    0x2044: "QL-720NW",
    0x209B: "QL-800",
    0x209C: "QL-810W",
    0x209D: "QL-820NWB",
    0x2020: "QL-1050",
    0x202A: "QL-1060N",
}

logger = logging.getLogger("brother-ql-bridge")


class PrinterUnavailableError(Exception):
    """No printer found on USB."""


class PrinterDisconnectedError(Exception):
    """Printer was lost during a write operation."""


class USBPrinter:
    """Handles USB communication with the Brother QL printer.

    The connection is established lazily: ``write``/``write_sync`` call
    ``ensure_connected`` first, so a printer that is plugged in after
    startup is picked up on the next print job.
    """

    # Scheme prefix of an explicit device spec, e.g. "usb://0x04f9:0x2028".
    _SPEC_PREFIX = "usb://"
    # Number of bytes handed to the OUT endpoint per write call.
    _CHUNK_SIZE = 32768
    # Timeout passed to each endpoint write (pyusb timeouts are milliseconds).
    _WRITE_TIMEOUT_MS = 15000

    def __init__(self, device=None):
        """Create an unconnected printer handle.

        Args:
            device: Optional device spec of the form
                ``usb://<vendor-hex>:<product-hex>[/...]``. When falsy, the
                first USB device with the Brother vendor ID is used.
        """
        self._device_spec = device
        self.dev = None
        self.ep_out = None
        self.ep_in = None
        self._connected = False
        # Serializes writes so concurrent jobs never interleave on the wire.
        self._lock = asyncio.Lock()

    @property
    def is_connected(self) -> bool:
        """True when a device and its OUT endpoint are currently held."""
        return self._connected and self.dev is not None and self.ep_out is not None

    @classmethod
    def _parse_device_spec(cls, spec):
        """Parse a ``usb://VID:PID[/...]`` spec into ``(vendor, product)`` ints.

        Both IDs are read as hexadecimal; anything after a ``/`` in the
        product part is ignored.

        Raises:
            ValueError: If the scheme is not ``usb://``, the product ID is
                missing, or either ID is not valid hexadecimal.
        """
        if not spec.startswith(cls._SPEC_PREFIX):
            raise ValueError(f"expected {cls._SPEC_PREFIX}<vendor>:<product>")
        parts = spec[len(cls._SPEC_PREFIX):].split(":")
        if len(parts) < 2:
            raise ValueError("missing product ID")
        vendor = int(parts[0], 16)
        product = int(parts[1].split("/")[0], 16)
        return vendor, product

    def _find_device(self) -> bool:
        """Attempt to find the USB device. Returns True if found.

        An invalid device spec is logged and reported as "not found"
        instead of raising, so callers only ever see a boolean result.
        """
        self.dev = None

        if self._device_spec:
            try:
                vendor, product = self._parse_device_spec(self._device_spec)
            except ValueError as e:
                logger.error(f"Invalid USB device spec {self._device_spec!r}: {e}")
                return False
            self.dev = usb.core.find(idVendor=vendor, idProduct=product)
        else:
            self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID)

        if self.dev is None:
            return False

        product_name = PRODUCT_IDS.get(
            self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})"
        )
        logger.info(f"Found printer: {product_name}")
        return True

    def connect(self) -> bool:
        """Initialize USB connection to the printer. Returns True on success.

        Any previously held device is released first. On failure the
        handle is left disconnected.
        """
        self.disconnect()

        if not self._find_device():
            logger.warning("No Brother QL printer found on USB")
            return False

        try:
            # Detach kernel driver if necessary (best effort: not every
            # platform/backend supports this query).
            try:
                if self.dev.is_kernel_driver_active(0):
                    self.dev.detach_kernel_driver(0)
                    logger.debug("Detached kernel driver")
            except (usb.core.USBError, NotImplementedError):
                pass

            # Set configuration
            try:
                self.dev.set_configuration()
            except usb.core.USBError:
                pass  # Already configured

            # Get endpoints from interface 0, alternate setting 0
            cfg = self.dev.get_active_configuration()
            intf = cfg[(0, 0)]

            self.ep_out = usb.util.find_descriptor(
                intf,
                custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
                == usb.util.ENDPOINT_OUT,
            )
            self.ep_in = usb.util.find_descriptor(
                intf,
                custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
                == usb.util.ENDPOINT_IN,
            )

            # Only the OUT endpoint is required; ep_in may stay None.
            if self.ep_out is None:
                logger.error("Could not find USB OUT endpoint")
                self.disconnect()
                return False

            self._connected = True
            logger.info("USB connection established")
            return True
        except usb.core.USBError as e:
            logger.error(f"USB connection failed: {e}")
            self.disconnect()
            return False

    def disconnect(self):
        """Release USB resources. Safe to call repeatedly."""
        self._connected = False
        self.ep_out = None
        self.ep_in = None
        if self.dev is not None:
            try:
                usb.util.dispose_resources(self.dev)
            except Exception:
                # Best effort: the device may already be gone.
                pass
            self.dev = None

    def ensure_connected(self) -> bool:
        """Return True if connected, attempting reconnect if needed."""
        if self.is_connected:
            return True
        return self.connect()

    def write_sync(self, data: bytes) -> int:
        """Write data to the printer synchronously with proper timeout.

        The data is sent in chunks of ``_CHUNK_SIZE`` bytes.

        Returns:
            The sum of the byte counts reported by each endpoint write.

        Raises:
            PrinterUnavailableError: If no printer could be connected.
            PrinterDisconnectedError: If a USB error occurs while writing;
                the connection is dropped before raising.
        """
        if not self.ensure_connected():
            raise PrinterUnavailableError("No printer available")

        try:
            total_written = 0
            for offset in range(0, len(data), self._CHUNK_SIZE):
                chunk = data[offset : offset + self._CHUNK_SIZE]
                total_written += self.ep_out.write(
                    chunk, timeout=self._WRITE_TIMEOUT_MS
                )
            return total_written
        except usb.core.USBError as e:
            logger.error(f"USB write error: {e}")
            self.disconnect()
            raise PrinterDisconnectedError(
                f"Printer disconnected during write: {e}"
            ) from e

    async def write(self, data: bytes) -> int:
        """Write data to the printer asynchronously.

        Runs ``write_sync`` in the default executor while holding the
        write lock, so the event loop is not blocked by USB I/O.
        """
        async with self._lock:
            return await asyncio.get_running_loop().run_in_executor(
                None, self.write_sync, data
            )

    def close(self):
        """Release USB resources."""
        self.disconnect()


class PrinterBridge:
    """TCP to USB bridge server.

    Each TCP connection is read to EOF and queued as one job; a background
    task drains the queue and forwards jobs to the printer, merging jobs
    that arrive within a short window into a single write.
    """

    def __init__(self, printer: USBPrinter, host: str, port: int):
        """Store the target printer and the TCP address to listen on."""
        self.printer = printer
        self.host = host
        self.port = port
        self.server = None
        self.job_count = 0
        self._job_queue = asyncio.Queue()
        # Seconds to keep collecting further jobs after the first one arrives.
        self._batch_timeout = 0.5
        self._batch_task = None

    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ):
        """Handle incoming print job from TCP client.

        Reads until the client closes its side, then queues the received
        bytes for printing. Empty jobs are skipped. The connection is
        always closed afterwards.
        """
        addr = writer.get_extra_info("peername")
        self.job_count += 1
        job_id = self.job_count

        logger.info(f"[Job {job_id}] Connection from {addr}")

        try:
            # Buffer entire job first (Brother raster protocol works best this way)
            chunks = []
            while True:
                data = await reader.read(4096)
                if not data:
                    break
                chunks.append(data)

            job_data = b"".join(chunks)

            if not job_data:
                logger.warning(f"[Job {job_id}] Skipped: empty job (0 bytes)")
                return

            logger.info(
                f"[Job {job_id}] Received {len(job_data)} bytes, queuing for print..."
            )
            await self._job_queue.put((job_id, job_data))
        except Exception as e:
            logger.error(f"[Job {job_id}] Error receiving data: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as e:
                # A peer reset during close must not escape the handler task.
                logger.debug(f"[Job {job_id}] Ignored error while closing: {e}")

    async def _batch_sender(self):
        """Background coroutine that drains the job queue in batches.

        Runs until cancelled. Printer errors are logged per job; any other
        exception is logged and the loop continues with the next batch.
        """
        loop = asyncio.get_running_loop()

        while True:
            try:
                # Wait for the first job
                first_job = await self._job_queue.get()
                batch = [first_job]

                # Collect more jobs within the batch window
                deadline = loop.time() + self._batch_timeout
                while True:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        break
                    try:
                        job = await asyncio.wait_for(
                            self._job_queue.get(), timeout=remaining
                        )
                        batch.append(job)
                    except asyncio.TimeoutError:
                        break

                job_ids = [jid for jid, _ in batch]
                combined_data = b"".join(data for _, data in batch)

                if len(batch) > 1:
                    logger.info(
                        f"[Batch] Sending {len(batch)} jobs "
                        f"({job_ids}) as one write ({len(combined_data)} bytes)"
                    )
                else:
                    logger.info(
                        f"[Job {job_ids[0]}] Sending {len(combined_data)} bytes to printer..."
                    )

                try:
                    await self.printer.write(combined_data)
                    for jid in job_ids:
                        logger.info(f"[Job {jid}] Completed successfully")
                except PrinterUnavailableError:
                    for jid in job_ids:
                        logger.warning(f"[Job {jid}] Rejected: no printer connected")
                except PrinterDisconnectedError:
                    for jid in job_ids:
                        logger.error(
                            f"[Job {jid}] Failed: printer disconnected during print"
                        )
            except asyncio.CancelledError:
                # Cancellation is how stop() ends this loop; let it propagate.
                raise
            except Exception as e:
                logger.error(f"[Batch sender] Unexpected error: {e}")

    async def start(self):
        """Start the bridge server.

        Launches the batch sender task, binds the TCP server and serves
        until the server is closed or the coroutine is cancelled.
        """
        self._batch_task = asyncio.create_task(self._batch_sender())

        self.server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )

        addrs = ", ".join(str(sock.getsockname()) for sock in self.server.sockets)
        logger.info(f"Bridge server listening on {addrs}")

        async with self.server:
            await self.server.serve_forever()

    async def stop(self):
        """Stop the bridge server.

        Cancels the batch sender (waiting for it to finish) and then
        closes the TCP server.
        """
        if self._batch_task:
            self._batch_task.cancel()
            try:
                await self._batch_task
            except asyncio.CancelledError:
                pass
        if self.server:
            self.server.close()
            await self.server.wait_closed()


async def main():
    """Parse command-line options, set up logging and run the bridge.

    Defaults for port, host, device and log level come from ``config``.
    The bridge keeps running even if no printer is present at startup.
    """
    parser = argparse.ArgumentParser(
        description="Bridge a USB Brother QL printer to the network"
    )
    parser.add_argument(
        "-p", "--port",
        type=int,
        default=config.TCP_PORT,
        help=f"TCP port to listen on (default: {config.TCP_PORT})"
    )
    parser.add_argument(
        "-H", "--host",
        default=config.TCP_HOST,
        help=f"Host/IP to bind to (default: {config.TCP_HOST})"
    )
    parser.add_argument(
        "-d", "--device",
        default=config.USB_DEVICE,
        help="USB device (e.g., usb://0x04f9:0x2028). Auto-detect if not specified"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args()

    # Setup logging
    log_level = logging.DEBUG if args.verbose else getattr(logging, config.LOG_LEVEL)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Initialize printer (never exits — printer may appear later)
    printer = USBPrinter(args.device)
    if printer.connect():
        logger.info("Printer ready")
    else:
        logger.warning("No printer detected at startup — will retry on first print job")

    # Create and run bridge
    bridge = PrinterBridge(printer, args.host, args.port)

    # Handle shutdown signals
    loop = asyncio.get_running_loop()
    # The event loop only keeps weak references to tasks, so hold the stop
    # task here to keep it alive until it finishes.
    stop_task = None

    def shutdown():
        nonlocal stop_task
        if stop_task is not None:
            return  # Shutdown already in progress; ignore repeated signals.
        logger.info("Shutting down...")
        stop_task = loop.create_task(bridge.stop())

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, shutdown)
        except NotImplementedError:
            # Event loops without signal support: fall back to the default
            # KeyboardInterrupt handling of asyncio.run().
            logger.debug(f"Signal handler for {sig!r} not supported on this platform")

    try:
        await bridge.start()
    except asyncio.CancelledError:
        pass
    finally:
        printer.close()
        logger.info("Bridge stopped")


if __name__ == "__main__":
    asyncio.run(main())