#!/usr/bin/env python3
"""
Brother QL Bridge - USB to Network Bridge

Exposes a USB-connected Brother QL printer (e.g., QL-570) on the network
so it can be used with network-based printing tools like InvenTree's
Brother plugin.
"""

import argparse
import asyncio
import logging
import signal
import sys
from typing import Optional, Tuple

import usb.core
import usb.util

import config

# Brother printer USB vendor ID
BROTHER_VENDOR_ID = 0x04F9

# Known QL printer product IDs
PRODUCT_IDS = {
    0x2015: "QL-500",
    0x2016: "QL-550",
    0x2027: "QL-560",
    0x2028: "QL-570",
    0x2029: "QL-580N",
    0x201B: "QL-650TD",
    0x2042: "QL-700",
    0x2043: "QL-710W",
    0x2044: "QL-720NW",
    0x209B: "QL-800",
    0x209C: "QL-810W",
    0x209D: "QL-820NWB",
    0x2020: "QL-1050",
    0x202A: "QL-1060N",
}

# Prefix of an explicit device specifier such as "usb://0x04f9:0x2028"
USB_SCHEME = "usb://"

# Maximum number of bytes handed to a single USB bulk write
USB_CHUNK_SIZE = 32768

# Timeout for a single USB bulk write, in milliseconds (15 seconds)
USB_WRITE_TIMEOUT_MS = 15000

logger = logging.getLogger("brother-ql-bridge")


def _parse_device_spec(device: str) -> Tuple[int, int]:
    """Parse a ``usb://<vendor>:<product>[/...]`` string into two integers.

    Both IDs are read as hexadecimal (a ``0x`` prefix is accepted).

    Raises:
        RuntimeError: if the string lacks the ``usb://`` prefix, has no
            product part, or contains non-hexadecimal IDs.
    """
    if not device.startswith(USB_SCHEME):
        raise RuntimeError(
            f"Unsupported device specifier {device!r}; "
            "expected usb://<vendor>:<product>"
        )

    parts = device[len(USB_SCHEME):].split(":")
    try:
        vendor = int(parts[0], 16)
        # Anything after a "/" in the product part is ignored.
        product = int(parts[1].split("/")[0], 16)
    except (IndexError, ValueError) as err:
        raise RuntimeError(
            f"Malformed device specifier {device!r}; "
            "expected usb://<vendor>:<product> with hexadecimal IDs"
        ) from err
    return vendor, product


class USBPrinter:
    """Handles USB communication with the Brother QL printer."""

    def __init__(self, device: Optional[str] = None):
        """Locate the printer on the USB bus.

        Args:
            device: Optional specifier like ``"usb://0x04f9:0x2028"``.
                When empty or ``None`` the printer is auto-detected.

        Raises:
            RuntimeError: if the specifier is malformed or no matching
                device is present.
        """
        self.dev = None
        self.ep_out = None
        self.ep_in = None
        # Serializes jobs so concurrent clients cannot interleave data.
        self._lock = asyncio.Lock()

        if device:
            vendor, product = _parse_device_spec(device)
            self.dev = usb.core.find(idVendor=vendor, idProduct=product)
            if self.dev is None:
                raise RuntimeError(
                    f"No USB device {vendor:#06x}:{product:#06x} found"
                )
        else:
            # Auto-detect: prefer a known QL model so that another Brother
            # device on the same bus is not picked up by accident.
            self.dev = usb.core.find(
                idVendor=BROTHER_VENDOR_ID,
                custom_match=lambda d: d.idProduct in PRODUCT_IDS,
            )
            if self.dev is None:
                # Fall back to any Brother device so that QL models missing
                # from PRODUCT_IDS keep working.
                self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID)
            if self.dev is None:
                raise RuntimeError("No Brother QL printer found on USB")

        product_name = PRODUCT_IDS.get(
            self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})"
        )
        logger.info("Found printer: %s", product_name)

    def connect(self):
        """Initialize USB connection to the printer.

        Raises:
            RuntimeError: if the interface exposes no OUT endpoint.
        """
        # Detach kernel driver if necessary
        try:
            if self.dev.is_kernel_driver_active(0):
                self.dev.detach_kernel_driver(0)
                logger.debug("Detached kernel driver")
        except (usb.core.USBError, NotImplementedError) as err:
            # Best effort: not every platform/backend supports this call.
            logger.debug("Kernel driver check skipped: %s", err)

        # Set configuration
        try:
            self.dev.set_configuration()
        except usb.core.USBError as err:
            # Typically means the device is already configured.
            logger.debug("set_configuration failed, continuing: %s", err)

        # Get endpoints from interface 0, alternate setting 0
        cfg = self.dev.get_active_configuration()
        intf = cfg[(0, 0)]

        self.ep_out = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_OUT,
        )
        self.ep_in = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_IN,
        )

        if self.ep_out is None:
            raise RuntimeError("Could not find USB OUT endpoint")

        logger.info("USB connection established")

    def write_sync(self, data: bytes) -> int:
        """Write data to the printer synchronously with proper timeout.

        The payload is sent in chunks of at most ``USB_CHUNK_SIZE`` bytes.
        If the endpoint reports a short write, sending resumes at the first
        unsent byte, so no part of the job is skipped.

        Returns:
            Total number of bytes written (``len(data)`` on success).

        Raises:
            RuntimeError: if ``connect()`` has not found an OUT endpoint.
            usb.core.USBError: on a USB failure or a write with no progress.
        """
        if self.ep_out is None:
            raise RuntimeError("Printer is not connected; call connect() first")

        total_written = 0
        while total_written < len(data):
            chunk = data[total_written : total_written + USB_CHUNK_SIZE]
            written = self.ep_out.write(chunk, timeout=USB_WRITE_TIMEOUT_MS)
            if written <= 0:
                # Guard against spinning forever on a stalled endpoint.
                raise usb.core.USBError("USB write made no progress")
            total_written += written
        return total_written

    async def write(self, data: bytes) -> int:
        """Write data to the printer asynchronously.

        Runs the blocking ``write_sync`` in the default executor while
        holding the printer lock. USB errors are logged and re-raised.
        """
        async with self._lock:
            try:
                return await asyncio.get_running_loop().run_in_executor(
                    None, self.write_sync, data
                )
            except usb.core.USBError as e:
                logger.error("USB write error: %s", e)
                raise

    def close(self):
        """Release USB resources."""
        if self.dev is not None:
            usb.util.dispose_resources(self.dev)


class PrinterBridge:
    """TCP to USB bridge server."""

    def __init__(self, printer: USBPrinter, host: str, port: int):
        self.printer = printer
        self.host = host
        self.port = port
        self.server = None
        # Number of connections accepted so far; used for job IDs in logs.
        self.job_count = 0

    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ):
        """Handle incoming print job from TCP client.

        Reads until the client closes its side, then forwards the complete
        job to the printer. Errors are logged, and the connection is always
        closed afterwards.
        """
        addr = writer.get_extra_info("peername")
        self.job_count += 1
        job_id = self.job_count
        logger.info("[Job %s] Connection from %s", job_id, addr)

        try:
            # Buffer entire job first (Brother raster protocol works best
            # this way); read() with no size returns everything up to EOF.
            job_data = await reader.read()

            if not job_data:
                # E.g. a port probe: there is nothing to send to the printer.
                logger.warning(
                    "[Job %s] Connection closed without data, nothing to print",
                    job_id,
                )
                return

            logger.info(
                "[Job %s] Received %s bytes, sending to printer...",
                job_id,
                len(job_data),
            )

            # Send complete job to printer
            await self.printer.write(job_data)

            logger.info("[Job %s] Completed successfully", job_id)

        except Exception as e:
            # Connection-level boundary: one failed job must not stop the
            # server, so log and carry on.
            logger.error("[Job %s] Error: %s", job_id, e)
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as err:
                # Peer reset the connection while closing; nothing to do.
                logger.debug("[Job %s] Error while closing: %s", job_id, err)

    async def start(self):
        """Start the bridge server and serve until it is closed."""
        self.server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )

        addrs = ", ".join(str(sock.getsockname()) for sock in self.server.sockets)
        logger.info("Bridge server listening on %s", addrs)

        async with self.server:
            await self.server.serve_forever()

    async def stop(self):
        """Stop the bridge server."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()


async def main():
    """Parse arguments, connect to the printer and run the bridge."""
    parser = argparse.ArgumentParser(
        description="Bridge a USB Brother QL printer to the network"
    )
    parser.add_argument(
        "-p", "--port",
        type=int,
        default=config.TCP_PORT,
        help=f"TCP port to listen on (default: {config.TCP_PORT})",
    )
    parser.add_argument(
        "-H", "--host",
        default=config.TCP_HOST,
        help=f"Host/IP to bind to (default: {config.TCP_HOST})",
    )
    parser.add_argument(
        "-d", "--device",
        default=config.USB_DEVICE,
        help="USB device (e.g., usb://0x04f9:0x2028). Auto-detect if not specified",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging",
    )
    args = parser.parse_args()

    # Setup logging; tolerate a lowercase or invalid level name in config.
    configured_level = getattr(logging, str(config.LOG_LEVEL).upper(), None)
    level_is_valid = isinstance(configured_level, int)
    if args.verbose:
        log_level = logging.DEBUG
    elif level_is_valid:
        log_level = configured_level
    else:
        log_level = logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if not level_is_valid:
        logger.warning(
            "Unknown LOG_LEVEL %r in config, using INFO", config.LOG_LEVEL
        )

    # Initialize printer. USBError covers e.g. permission problems;
    # ValueError covers PyUSB's "no backend available" error.
    try:
        printer = USBPrinter(args.device)
        printer.connect()
    except (RuntimeError, ValueError, usb.core.USBError) as e:
        logger.error("Failed to initialize printer: %s", e)
        sys.exit(1)

    # Create and run bridge
    bridge = PrinterBridge(printer, args.host, args.port)

    # Handle shutdown signals
    loop = asyncio.get_running_loop()
    # Strong references so a pending stop task cannot be garbage-collected.
    stop_tasks = set()

    def shutdown():
        logger.info("Shutting down...")
        task = loop.create_task(bridge.stop())
        stop_tasks.add(task)
        task.add_done_callback(stop_tasks.discard)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, shutdown)
        except NotImplementedError:
            # Event loops without Unix signal support (e.g. on Windows).
            logger.debug("Signal handlers are not supported on this platform")
            break

    try:
        await bridge.start()
    except asyncio.CancelledError:
        # Closing the server cancels serve_forever(); treat as clean exit.
        pass
    finally:
        printer.close()
        logger.info("Bridge stopped")


if __name__ == "__main__":
    asyncio.run(main())