diff --git a/bridge.py b/bridge.py index 01d8651..3f669fe 100644 --- a/bridge.py +++ b/bridge.py @@ -41,94 +41,147 @@ PRODUCT_IDS = { logger = logging.getLogger("brother-ql-bridge") +class PrinterUnavailableError(Exception): + """No printer found on USB.""" + + +class PrinterDisconnectedError(Exception): + """Printer was lost during a write operation.""" + + class USBPrinter: """Handles USB communication with the Brother QL printer.""" def __init__(self, device=None): + self._device_spec = device self.dev = None self.ep_out = None self.ep_in = None + self._connected = False self._lock = asyncio.Lock() - if device: - # Parse device string like "usb://0x04f9:0x2028" - if device.startswith("usb://"): - parts = device[6:].split(":") + @property + def is_connected(self) -> bool: + return self._connected and self.dev is not None and self.ep_out is not None + + def _find_device(self) -> bool: + """Attempt to find the USB device. Returns True if found.""" + self.dev = None + + if self._device_spec: + if self._device_spec.startswith("usb://"): + parts = self._device_spec[6:].split(":") vendor = int(parts[0], 16) product = int(parts[1].split("/")[0], 16) self.dev = usb.core.find(idVendor=vendor, idProduct=product) else: - # Auto-detect Brother QL printer self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID) if self.dev is None: - raise RuntimeError("No Brother QL printer found on USB") + return False product_name = PRODUCT_IDS.get(self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})") logger.info(f"Found printer: {product_name}") + return True + + def connect(self) -> bool: + """Initialize USB connection to the printer. Returns True on success.""" + self.disconnect() + + if not self._find_device(): + logger.warning("No Brother QL printer found on USB") + return False - def connect(self): - """Initialize USB connection to the printer.""" - # Detach kernel driver if necessary try: - if self.dev.is_kernel_driver_active(0): - self.dev.detach_kernel_driver(0) - logger.debug("Detached kernel driver") - except (usb.core.USBError, NotImplementedError): - pass + # Detach kernel driver if necessary + try: + if self.dev.is_kernel_driver_active(0): + self.dev.detach_kernel_driver(0) + logger.debug("Detached kernel driver") + except (usb.core.USBError, NotImplementedError): + pass - # Set configuration - try: - self.dev.set_configuration() - except usb.core.USBError: - pass # Already configured + # Set configuration + try: + self.dev.set_configuration() + except usb.core.USBError: + pass # Already configured - # Get endpoints - cfg = self.dev.get_active_configuration() - intf = cfg[(0, 0)] + # Get endpoints + cfg = self.dev.get_active_configuration() + intf = cfg[(0, 0)] - self.ep_out = usb.util.find_descriptor( - intf, - custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) - == usb.util.ENDPOINT_OUT, - ) - self.ep_in = usb.util.find_descriptor( - intf, - custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) - == usb.util.ENDPOINT_IN, - ) + self.ep_out = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) + == usb.util.ENDPOINT_OUT, + ) + self.ep_in = usb.util.find_descriptor( + intf, + custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) + == usb.util.ENDPOINT_IN, + ) - if self.ep_out is None: - raise RuntimeError("Could not find USB OUT endpoint") + if self.ep_out is None: + logger.error("Could not find USB OUT endpoint") + self.disconnect() + return False - logger.info("USB connection established") + self._connected = True + logger.info("USB connection established") + return True + + except usb.core.USBError as e: + logger.error(f"USB connection failed: {e}") + self.disconnect() + return False + + def disconnect(self): + """Release USB resources. Safe to call repeatedly.""" + self._connected = False + self.ep_out = None + self.ep_in = None + if self.dev is not None: + try: + usb.util.dispose_resources(self.dev) + except Exception: + pass + self.dev = None + + def ensure_connected(self) -> bool: + """Return True if connected, attempting reconnect if needed.""" + if self.is_connected: + return True + return self.connect() def write_sync(self, data: bytes) -> int: """Write data to the printer synchronously with proper timeout.""" - # Write in chunks with longer timeout (15 seconds) - chunk_size = 32768 - total_written = 0 - for i in range(0, len(data), chunk_size): - chunk = data[i : i + chunk_size] - written = self.ep_out.write(chunk, timeout=15000) - total_written += written - return total_written + if not self.ensure_connected(): + raise PrinterUnavailableError("No printer available") + + try: + chunk_size = 32768 + total_written = 0 + for i in range(0, len(data), chunk_size): + chunk = data[i : i + chunk_size] + written = self.ep_out.write(chunk, timeout=15000) + total_written += written + return total_written + except usb.core.USBError as e: + logger.error(f"USB write error: {e}") + self.disconnect() + raise PrinterDisconnectedError(f"Printer disconnected during write: {e}") from e async def write(self, data: bytes) -> int: """Write data to the printer asynchronously.""" async with self._lock: - try: - return await asyncio.get_event_loop().run_in_executor( - None, self.write_sync, data - ) - except usb.core.USBError as e: - logger.error(f"USB write error: {e}") - raise + return await asyncio.get_event_loop().run_in_executor( + None, self.write_sync, data + ) def close(self): """Release USB resources.""" - if self.dev: - usb.util.dispose_resources(self.dev) + self.disconnect() class PrinterBridge: @@ -140,6 +193,9 @@ class PrinterBridge: self.port = port self.server = None self.job_count = 0 + self._job_queue = asyncio.Queue() + self._batch_timeout = 0.5 + self._batch_task = None async def handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter @@ -160,21 +216,76 @@ class PrinterBridge: chunks.append(data) job_data = b"".join(chunks) - logger.info(f"[Job {job_id}] Received {len(job_data)} bytes, sending to printer...") - # Send complete job to printer - await self.printer.write(job_data) - logger.info(f"[Job {job_id}] Completed successfully") + if len(job_data) == 0: + logger.warning(f"[Job {job_id}] Skipped: empty job (0 bytes)") + return + + logger.info(f"[Job {job_id}] Received {len(job_data)} bytes, queuing for print...") + await self._job_queue.put((job_id, job_data)) except Exception as e: - logger.error(f"[Job {job_id}] Error: {e}") + logger.error(f"[Job {job_id}] Error receiving data: {e}") finally: writer.close() await writer.wait_closed() + async def _batch_sender(self): + """Background coroutine that drains the job queue in batches.""" + while True: + try: + # Wait for the first job + first_job = await self._job_queue.get() + batch = [first_job] + + # Collect more jobs within the batch window + deadline = asyncio.get_event_loop().time() + self._batch_timeout + while True: + remaining = deadline - asyncio.get_event_loop().time() + if remaining <= 0: + break + try: + job = await asyncio.wait_for( + self._job_queue.get(), timeout=remaining + ) + batch.append(job) + except asyncio.TimeoutError: + break + + job_ids = [jid for jid, _ in batch] + combined_data = b"".join(data for _, data in batch) + + if len(batch) > 1: + logger.info( + f"[Batch] Sending {len(batch)} jobs " + f"({job_ids}) as one write ({len(combined_data)} bytes)" + ) + else: + logger.info( + f"[Job {job_ids[0]}] Sending {len(combined_data)} bytes to printer..." + ) + + try: + await self.printer.write(combined_data) + for jid in job_ids: + logger.info(f"[Job {jid}] Completed successfully") + except PrinterUnavailableError: + for jid in job_ids: + logger.warning(f"[Job {jid}] Rejected: no printer connected") + except PrinterDisconnectedError: + for jid in job_ids: + logger.error(f"[Job {jid}] Failed: printer disconnected during print") + + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"[Batch sender] Unexpected error: {e}") + async def start(self): """Start the bridge server.""" + self._batch_task = asyncio.create_task(self._batch_sender()) + self.server = await asyncio.start_server( self.handle_client, self.host, self.port ) @@ -187,6 +298,12 @@ class PrinterBridge: async def stop(self): """Stop the bridge server.""" + if self._batch_task: + self._batch_task.cancel() + try: + await self._batch_task + except asyncio.CancelledError: + pass if self.server: self.server.close() await self.server.wait_closed() @@ -227,13 +344,12 @@ async def main(): datefmt="%Y-%m-%d %H:%M:%S" ) - # Initialize printer - try: - printer = USBPrinter(args.device) - printer.connect() - except RuntimeError as e: - logger.error(f"Failed to initialize printer: {e}") - sys.exit(1) + # Initialize printer (never exits — printer may appear later) + printer = USBPrinter(args.device) + if printer.connect(): + logger.info("Printer ready") + else: + logger.warning("No printer detected at startup — will retry on first print job") # Create and run bridge bridge = PrinterBridge(printer, args.host, args.port)