Initial commit: Brother QL USB-to-Network Bridge
- TCP server exposing USB Brother QL printers on port 9100 - Supports all Brother QL series label printers - Async architecture for handling concurrent print jobs - Raspberry Pi deployment script and systemd service - Documentation for setup and InvenTree integration
This commit is contained in:
261
bridge.py
Normal file
261
bridge.py
Normal file
@@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Brother QL Bridge - USB to Network Bridge
|
||||
|
||||
Exposes a USB-connected Brother QL printer (e.g., QL-570) on the network
|
||||
so it can be used with network-based printing tools like InvenTree's Brother plugin.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
import config
|
||||
|
||||
# Brother printer USB vendor ID
|
||||
BROTHER_VENDOR_ID = 0x04F9
|
||||
|
||||
# Known QL printer product IDs
|
||||
PRODUCT_IDS = {
|
||||
0x2015: "QL-500",
|
||||
0x2016: "QL-550",
|
||||
0x2027: "QL-560",
|
||||
0x2028: "QL-570",
|
||||
0x2029: "QL-580N",
|
||||
0x201B: "QL-650TD",
|
||||
0x2042: "QL-700",
|
||||
0x2043: "QL-710W",
|
||||
0x2044: "QL-720NW",
|
||||
0x209B: "QL-800",
|
||||
0x209C: "QL-810W",
|
||||
0x209D: "QL-820NWB",
|
||||
0x2020: "QL-1050",
|
||||
0x202A: "QL-1060N",
|
||||
}
|
||||
|
||||
logger = logging.getLogger("brother-ql-bridge")
|
||||
|
||||
|
||||
class USBPrinter:
|
||||
"""Handles USB communication with the Brother QL printer."""
|
||||
|
||||
def __init__(self, device=None):
|
||||
self.dev = None
|
||||
self.ep_out = None
|
||||
self.ep_in = None
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
if device:
|
||||
# Parse device string like "usb://0x04f9:0x2028"
|
||||
if device.startswith("usb://"):
|
||||
parts = device[6:].split(":")
|
||||
vendor = int(parts[0], 16)
|
||||
product = int(parts[1].split("/")[0], 16)
|
||||
self.dev = usb.core.find(idVendor=vendor, idProduct=product)
|
||||
else:
|
||||
# Auto-detect Brother QL printer
|
||||
self.dev = usb.core.find(idVendor=BROTHER_VENDOR_ID)
|
||||
|
||||
if self.dev is None:
|
||||
raise RuntimeError("No Brother QL printer found on USB")
|
||||
|
||||
product_name = PRODUCT_IDS.get(self.dev.idProduct, f"Unknown (0x{self.dev.idProduct:04x})")
|
||||
logger.info(f"Found printer: {product_name}")
|
||||
|
||||
def connect(self):
|
||||
"""Initialize USB connection to the printer."""
|
||||
# Detach kernel driver if necessary
|
||||
try:
|
||||
if self.dev.is_kernel_driver_active(0):
|
||||
self.dev.detach_kernel_driver(0)
|
||||
logger.debug("Detached kernel driver")
|
||||
except (usb.core.USBError, NotImplementedError):
|
||||
pass
|
||||
|
||||
# Set configuration
|
||||
try:
|
||||
self.dev.set_configuration()
|
||||
except usb.core.USBError:
|
||||
pass # Already configured
|
||||
|
||||
# Get endpoints
|
||||
cfg = self.dev.get_active_configuration()
|
||||
intf = cfg[(0, 0)]
|
||||
|
||||
self.ep_out = usb.util.find_descriptor(
|
||||
intf,
|
||||
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
||||
== usb.util.ENDPOINT_OUT,
|
||||
)
|
||||
self.ep_in = usb.util.find_descriptor(
|
||||
intf,
|
||||
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
|
||||
== usb.util.ENDPOINT_IN,
|
||||
)
|
||||
|
||||
if self.ep_out is None:
|
||||
raise RuntimeError("Could not find USB OUT endpoint")
|
||||
|
||||
logger.info("USB connection established")
|
||||
|
||||
def write_sync(self, data: bytes) -> int:
|
||||
"""Write data to the printer synchronously with proper timeout."""
|
||||
# Write in chunks with longer timeout (15 seconds)
|
||||
chunk_size = 32768
|
||||
total_written = 0
|
||||
for i in range(0, len(data), chunk_size):
|
||||
chunk = data[i : i + chunk_size]
|
||||
written = self.ep_out.write(chunk, timeout=15000)
|
||||
total_written += written
|
||||
return total_written
|
||||
|
||||
async def write(self, data: bytes) -> int:
|
||||
"""Write data to the printer asynchronously."""
|
||||
async with self._lock:
|
||||
try:
|
||||
return await asyncio.get_event_loop().run_in_executor(
|
||||
None, self.write_sync, data
|
||||
)
|
||||
except usb.core.USBError as e:
|
||||
logger.error(f"USB write error: {e}")
|
||||
raise
|
||||
|
||||
def close(self):
|
||||
"""Release USB resources."""
|
||||
if self.dev:
|
||||
usb.util.dispose_resources(self.dev)
|
||||
|
||||
|
||||
class PrinterBridge:
|
||||
"""TCP to USB bridge server."""
|
||||
|
||||
def __init__(self, printer: USBPrinter, host: str, port: int):
|
||||
self.printer = printer
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.server = None
|
||||
self.job_count = 0
|
||||
|
||||
async def handle_client(
|
||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
):
|
||||
"""Handle incoming print job from TCP client."""
|
||||
addr = writer.get_extra_info("peername")
|
||||
self.job_count += 1
|
||||
job_id = self.job_count
|
||||
logger.info(f"[Job {job_id}] Connection from {addr}")
|
||||
|
||||
try:
|
||||
# Buffer entire job first (Brother raster protocol works best this way)
|
||||
chunks = []
|
||||
while True:
|
||||
data = await reader.read(4096)
|
||||
if not data:
|
||||
break
|
||||
chunks.append(data)
|
||||
|
||||
job_data = b"".join(chunks)
|
||||
logger.info(f"[Job {job_id}] Received {len(job_data)} bytes, sending to printer...")
|
||||
|
||||
# Send complete job to printer
|
||||
await self.printer.write(job_data)
|
||||
logger.info(f"[Job {job_id}] Completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Job {job_id}] Error: {e}")
|
||||
|
||||
finally:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
async def start(self):
|
||||
"""Start the bridge server."""
|
||||
self.server = await asyncio.start_server(
|
||||
self.handle_client, self.host, self.port
|
||||
)
|
||||
|
||||
addrs = ", ".join(str(sock.getsockname()) for sock in self.server.sockets)
|
||||
logger.info(f"Bridge server listening on {addrs}")
|
||||
|
||||
async with self.server:
|
||||
await self.server.serve_forever()
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the bridge server."""
|
||||
if self.server:
|
||||
self.server.close()
|
||||
await self.server.wait_closed()
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Bridge a USB Brother QL printer to the network"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p", "--port",
|
||||
type=int,
|
||||
default=config.TCP_PORT,
|
||||
help=f"TCP port to listen on (default: {config.TCP_PORT})"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-H", "--host",
|
||||
default=config.TCP_HOST,
|
||||
help=f"Host/IP to bind to (default: {config.TCP_HOST})"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d", "--device",
|
||||
default=config.USB_DEVICE,
|
||||
help="USB device (e.g., usb://0x04f9:0x2028). Auto-detect if not specified"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--verbose",
|
||||
action="store_true",
|
||||
help="Enable verbose logging"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Setup logging
|
||||
log_level = logging.DEBUG if args.verbose else getattr(logging, config.LOG_LEVEL)
|
||||
logging.basicConfig(
|
||||
level=log_level,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
# Initialize printer
|
||||
try:
|
||||
printer = USBPrinter(args.device)
|
||||
printer.connect()
|
||||
except RuntimeError as e:
|
||||
logger.error(f"Failed to initialize printer: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Create and run bridge
|
||||
bridge = PrinterBridge(printer, args.host, args.port)
|
||||
|
||||
# Handle shutdown signals
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def shutdown():
|
||||
logger.info("Shutting down...")
|
||||
loop.create_task(bridge.stop())
|
||||
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, shutdown)
|
||||
|
||||
try:
|
||||
await bridge.start()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
printer.close()
|
||||
logger.info("Bridge stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user