Initial release: RMA Automation plugin with repair parts tracking

Features:
- Automatic stock status updates based on return order outcomes
- Repair parts allocation and consumption tracking
- Configurable status mappings for each outcome type
- React-based UI panel for managing repair parts
- Location display for easy part retrieval
- Available stock filtering (excludes allocated items)
This commit is contained in:
Tim Hadwen
2026-01-18 20:56:07 +10:00
commit 2477fd1539
13 changed files with 1854 additions and 0 deletions

48
.gitignore vendored Normal file
View File

@@ -0,0 +1,48 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
ENV/
env/
.venv/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# Local settings
.claude/
*.local.json
# OS files
.DS_Store
Thumbs.db

145
README.md Normal file
View File

@@ -0,0 +1,145 @@
# InvenTree RMA Automation Plugin
An InvenTree plugin that automates stock item status changes when return orders are completed, with support for tracking and consuming repair parts.
## Features
- **Automatic Status Updates**: Updates stock item status based on return order line item outcomes
- **Repair Parts Tracking**: Allocate replacement parts to repairs and consume them on order completion
- **Configurable Status Mapping**: Customize the stock status for each outcome type
- **Customer Reassignment**: Optionally reassign repaired/returned items to the original customer
- **Audit Trail**: Tracking notes added to stock items for full history
## Repair Parts Panel
The plugin adds a "Repair Parts" panel to Return Order detail pages where you can:
- View allocated repair/replacement parts
- Add new part allocations with part search and stock selection
- See the location of parts for easy retrieval
- Parts are automatically consumed when the return order is completed
## Outcome to Stock Status Mapping
| Line Item Outcome | Default Stock Status | Description |
|-------------------|---------------------|-------------|
| Return | OK (10) | Item returned as-is, ready for stock |
| Repair | OK (10) | Item repaired, ready for stock |
| Replace | Attention (50) | Original item - needs processing |
| Refund | Attention (50) | Item kept, needs review |
| Reject | Rejected (65) | Item rejected |
All mappings are configurable in the plugin settings.
## Installation
### From PyPI (when published)
```bash
pip install inventree-rma-plugin
```
### From Source
```bash
pip install -e /path/to/inventree-rma-plugin
```
### In Docker Environment
Add to your InvenTree `plugins.txt`:
```
inventree-rma-plugin
```
Or install from git:
```
git+https://github.com/timmyhadwen/inventree-rma-plugin.git
```
## Configuration
After installation, enable the plugin in InvenTree's admin panel under Settings > Plugins.
### Required InvenTree Settings
Ensure these settings are enabled in InvenTree:
- **ENABLE_PLUGINS_APP**: Required for the plugin's database models
- **ENABLE_PLUGINS_URL**: Required for the plugin's API endpoints
- **ENABLE_PLUGINS_INTERFACE**: Required for the Repair Parts panel UI
### Plugin Settings
| Setting | Default | Description |
|---------|---------|-------------|
| Enable Auto Status Change | True | Automatically update stock status on RO completion |
| Enable Customer Reassignment | False | Reassign repaired/returned items to original customer |
| Add Tracking Notes | True | Add notes to stock item history |
| Consume Repair Parts on Complete | True | Consume allocated repair parts when order completes |
| Status for RETURN Outcome | OK | Stock status when outcome is "Return" |
| Status for REPAIR Outcome | OK | Stock status when outcome is "Repair" |
| Status for REPLACE Outcome | Attention | Stock status when outcome is "Replace" |
| Status for REFUND Outcome | Attention | Stock status when outcome is "Refund" |
| Status for REJECT Outcome | Rejected | Stock status when outcome is "Reject" |
## Prerequisites
- InvenTree >= 0.15.0
- Return Orders feature enabled in InvenTree settings
- Event Integration enabled for plugins
## How It Works
### Status Automation
1. When a Return Order is marked as "Complete", InvenTree triggers the `returnorder.completed` event
2. The plugin listens for this event and processes each line item
3. For each line item with a defined outcome, the plugin:
- Updates the stock item's status according to the configured mapping
- Optionally reassigns the item back to the customer (for Return/Repair outcomes)
- Adds a tracking note to the stock item history
### Repair Parts Consumption
1. Before completing a return order, allocate repair/replacement parts via the "Repair Parts" panel
2. Select the line item being repaired, search for the replacement part, and choose from available stock
3. When the return order is completed, allocated parts are automatically consumed (quantity reduced)
4. A tracking entry is added to each consumed stock item
## Development
### Running Tests
```bash
python -m pytest tests/ -v
```
### Project Structure
```
inventree-rma-plugin/
├── pyproject.toml
├── README.md
├── inventree_rma_plugin/
│ ├── __init__.py
│ ├── apps.py
│ ├── api.py
│ ├── models.py
│ ├── rma_automation.py
│ ├── migrations/
│ └── static/
│ └── repair_panel.js
└── tests/
├── __init__.py
└── test_rma_automation.py
```
## License
MIT License
## Author
Timmy Hadwen (https://github.com/timmyhadwen)

View File

@@ -0,0 +1,7 @@
"""InvenTree RMA Automation Plugin."""
from inventree_rma_plugin.rma_automation import RMAAutomationPlugin
__all__ = ['RMAAutomationPlugin']
__version__ = '0.2.0'
default_app_config = 'inventree_rma_plugin.apps.RMAAutomationConfig'

153
inventree_rma_plugin/api.py Normal file
View File

@@ -0,0 +1,153 @@
"""API endpoints for the RMA Automation plugin."""
from rest_framework import serializers, generics, permissions
from rest_framework.response import Response
from inventree_rma_plugin.models import RepairStockAllocation
class RepairAllocationSerializer(serializers.ModelSerializer):
"""Serializer for RepairStockAllocation model."""
# Read-only fields for display
stock_item_detail = serializers.SerializerMethodField(read_only=True)
line_item_detail = serializers.SerializerMethodField(read_only=True)
return_order_line_detail = serializers.SerializerMethodField(read_only=True)
return_order_id = serializers.SerializerMethodField(read_only=True)
class Meta:
"""Serializer metadata."""
model = RepairStockAllocation
fields = [
'id',
'return_order_line',
'return_order_line_detail',
'line_item_detail',
'return_order_id',
'stock_item',
'stock_item_detail',
'quantity',
'consumed',
'created',
'notes',
]
read_only_fields = ['id', 'consumed', 'created']
def get_stock_item_detail(self, obj):
"""Get stock item details for display."""
stock_item = obj.stock_item
return {
'pk': stock_item.pk,
'part': stock_item.part.pk,
'part_name': stock_item.part.name,
'quantity': float(stock_item.quantity),
'serial': stock_item.serial,
'batch': stock_item.batch,
'location': stock_item.location.pk if stock_item.location else None,
'location_name': str(stock_item.location) if stock_item.location else None,
}
def get_line_item_detail(self, obj):
"""Get the line item details (the item being repaired)."""
line = obj.return_order_line
item = line.item if line else None
if not item:
return {
'pk': None,
'part_name': 'Unknown',
'serial': None,
}
return {
'pk': item.pk,
'part_name': item.part.name if item.part else 'Unknown',
'serial': item.serial,
'batch': item.batch,
}
def get_return_order_line_detail(self, obj):
"""Get return order line item details for display."""
line = obj.return_order_line
return {
'pk': line.pk,
'item_pk': line.item.pk if line.item else None,
'item_name': str(line.item) if line.item else None,
}
def get_return_order_id(self, obj):
"""Get the parent return order ID."""
return obj.return_order_line.order.pk
def validate(self, data):
"""Validate the allocation data."""
stock_item = data.get('stock_item')
quantity = data.get('quantity', 1)
if stock_item:
# Check available quantity
available = stock_item.quantity
# Subtract existing allocations for this stock item
existing_allocations = RepairStockAllocation.objects.filter(
stock_item=stock_item,
consumed=False,
)
# Exclude current instance if updating
if self.instance:
existing_allocations = existing_allocations.exclude(pk=self.instance.pk)
allocated = sum(float(a.quantity) for a in existing_allocations)
available -= allocated
if quantity > available:
raise serializers.ValidationError({
'quantity': f'Only {available} available (already allocated: {allocated})',
})
return data
class RepairAllocationList(generics.ListCreateAPIView):
"""List and create repair stock allocations."""
serializer_class = RepairAllocationSerializer
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self):
"""Filter allocations based on query parameters."""
queryset = RepairStockAllocation.objects.all()
# Filter by return order
return_order = self.request.query_params.get('return_order')
if return_order:
queryset = queryset.filter(return_order_line__order__pk=return_order)
# Filter by return order line
return_order_line = self.request.query_params.get('return_order_line')
if return_order_line:
queryset = queryset.filter(return_order_line__pk=return_order_line)
# Filter by consumed status
consumed = self.request.query_params.get('consumed')
if consumed is not None:
consumed_bool = consumed.lower() in ('true', '1', 'yes')
queryset = queryset.filter(consumed=consumed_bool)
return queryset.select_related(
'stock_item',
'stock_item__part',
'stock_item__location',
'return_order_line',
'return_order_line__order',
'return_order_line__item',
'return_order_line__item__part',
)
class RepairAllocationDetail(generics.RetrieveUpdateDestroyAPIView):
"""Retrieve, update, or delete a repair stock allocation."""
serializer_class = RepairAllocationSerializer
permission_classes = [permissions.IsAuthenticated]
queryset = RepairStockAllocation.objects.all()

View File

@@ -0,0 +1,11 @@
"""Django app configuration for RMA Automation plugin."""
from django.apps import AppConfig
class RMAAutomationConfig(AppConfig):
"""App configuration for the RMA Automation plugin."""
name = 'inventree_rma_plugin'
verbose_name = 'RMA Automation'
default_auto_field = 'django.db.models.AutoField'

View File

@@ -0,0 +1,33 @@
# Generated by Django 4.2.26 on 2026-01-18 09:34
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('order', '0114_purchaseorderextraline_project_code_and_more'),
('stock', '0116_alter_stockitem_link'),
]
operations = [
migrations.CreateModel(
name='RepairStockAllocation',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('quantity', models.DecimalField(decimal_places=5, default=1, help_text='Quantity of stock allocated', max_digits=15)),
('consumed', models.BooleanField(default=False, help_text='Whether this allocation has been consumed')),
('created', models.DateTimeField(auto_now_add=True, help_text='When this allocation was created')),
('notes', models.CharField(blank=True, default='', help_text='Optional notes about this allocation', max_length=500)),
('return_order_line', models.ForeignKey(help_text='The return order line item this allocation is for', on_delete=django.db.models.deletion.CASCADE, related_name='repair_allocations', to='order.returnorderlineitem')),
('stock_item', models.ForeignKey(help_text='The stock item being allocated for repair', on_delete=django.db.models.deletion.CASCADE, related_name='repair_allocations', to='stock.stockitem')),
],
options={
'verbose_name': 'Repair Stock Allocation',
'verbose_name_plural': 'Repair Stock Allocations',
},
),
]

View File

@@ -0,0 +1,85 @@
"""Database models for the RMA Automation plugin."""
from django.db import models
class RepairStockAllocation(models.Model):
"""Tracks stock items allocated for repair work on return order line items.
When a return order line item requires repair, stock items (replacement parts)
can be allocated to it. These are consumed when the return order is completed.
"""
return_order_line = models.ForeignKey(
'order.ReturnOrderLineItem',
on_delete=models.CASCADE,
related_name='repair_allocations',
help_text='The return order line item this allocation is for',
)
stock_item = models.ForeignKey(
'stock.StockItem',
on_delete=models.CASCADE,
related_name='repair_allocations',
help_text='The stock item being allocated for repair',
)
quantity = models.DecimalField(
max_digits=15,
decimal_places=5,
default=1,
help_text='Quantity of stock allocated',
)
consumed = models.BooleanField(
default=False,
help_text='Whether this allocation has been consumed',
)
created = models.DateTimeField(
auto_now_add=True,
help_text='When this allocation was created',
)
notes = models.CharField(
max_length=500,
blank=True,
default='',
help_text='Optional notes about this allocation',
)
class Meta:
"""Model metadata."""
app_label = 'inventree_rma_plugin'
verbose_name = 'Repair Stock Allocation'
verbose_name_plural = 'Repair Stock Allocations'
def __str__(self):
"""String representation."""
return f'{self.quantity} x {self.stock_item} for {self.return_order_line}'
@property
def return_order(self):
"""Get the parent return order."""
return self.return_order_line.order
def clean(self):
"""Validate the allocation."""
from django.core.exceptions import ValidationError
# Check stock item has sufficient quantity
if self.stock_item and not self.consumed:
available = self.stock_item.quantity
# Subtract other allocations for this stock item (excluding self)
other_allocations = RepairStockAllocation.objects.filter(
stock_item=self.stock_item,
consumed=False,
).exclude(pk=self.pk)
allocated = sum(a.quantity for a in other_allocations)
available -= allocated
if self.quantity > available:
raise ValidationError({
'quantity': f'Only {available} available (already allocated: {allocated})',
})

View File

@@ -0,0 +1,448 @@
"""RMA Automation Plugin for InvenTree.
Automates stock item status changes when return orders are completed,
based on the outcome set for each return order line item.
Also provides repair parts tracking - allocate stock items to be consumed
when repairs are completed.
"""
from typing import Optional
import structlog
from plugin import InvenTreePlugin
from plugin.mixins import AppMixin, EventMixin, SettingsMixin, UrlsMixin, UserInterfaceMixin
logger = structlog.get_logger('inventree')
class RMAAutomationPlugin(AppMixin, UserInterfaceMixin, UrlsMixin, EventMixin, SettingsMixin, InvenTreePlugin):
"""Plugin that automates actions on return order line items when a return order is completed.
When a return order is marked as complete, this plugin:
1. Iterates through all line items
2. Updates each stock item's status based on the line item outcome
3. Optionally reassigns stock items back to the customer
4. Consumes any allocated repair parts
Outcome-to-Status Mapping:
- RETURN (20): Stock status -> OK (ready to return to customer)
- REPAIR (30): Stock status -> OK (repaired and ready)
- REPLACE (40): Stock status -> configurable (original item handling)
- REFUND (50): Stock status -> ATTENTION (item needs review)
- REJECT (60): Stock status -> REJECTED
"""
NAME = 'RMA Automation'
SLUG = 'rma-automation'
TITLE = 'RMA Workflow Automation'
DESCRIPTION = 'Automates stock status changes and repair parts tracking for return orders'
VERSION = '0.2.0'
AUTHOR = 'Timmy Hadwen'
# Status code constants (from InvenTree)
# ReturnOrderLineStatus
OUTCOME_PENDING = 10
OUTCOME_RETURN = 20
OUTCOME_REPAIR = 30
OUTCOME_REPLACE = 40
OUTCOME_REFUND = 50
OUTCOME_REJECT = 60
# StockStatus
STOCK_OK = 10
STOCK_ATTENTION = 50
STOCK_DAMAGED = 55
STOCK_DESTROYED = 60
STOCK_REJECTED = 65
STOCK_LOST = 70
STOCK_QUARANTINED = 75
STOCK_RETURNED = 85
# All available stock status choices
ALL_STATUS_CHOICES = [
('10', 'OK'),
('50', 'Attention'),
('55', 'Damaged'),
('60', 'Destroyed'),
('65', 'Rejected'),
('70', 'Lost'),
('75', 'Quarantined'),
('85', 'Returned'),
]
SETTINGS = {
'ENABLE_AUTO_STATUS': {
'name': 'Enable Auto Status Change',
'description': 'Automatically update stock item status when return order is completed',
'validator': bool,
'default': True,
},
'ENABLE_CUSTOMER_REASSIGN': {
'name': 'Enable Customer Reassignment',
'description': 'Reassign repaired/returned items back to the original customer',
'validator': bool,
'default': False,
},
'ADD_TRACKING_NOTES': {
'name': 'Add Tracking Notes',
'description': 'Add tracking notes to stock items when status changes',
'validator': bool,
'default': True,
},
'CONSUME_REPAIR_PARTS': {
'name': 'Consume Repair Parts on Complete',
'description': 'Automatically consume allocated repair parts when return order is completed',
'validator': bool,
'default': True,
},
'STATUS_FOR_RETURN': {
'name': 'Status for RETURN Outcome',
'description': 'Stock status to set when line item outcome is RETURN',
'default': '10',
'choices': ALL_STATUS_CHOICES,
},
'STATUS_FOR_REPAIR': {
'name': 'Status for REPAIR Outcome',
'description': 'Stock status to set when line item outcome is REPAIR',
'default': '10',
'choices': ALL_STATUS_CHOICES,
},
'STATUS_FOR_REPLACE': {
'name': 'Status for REPLACE Outcome',
'description': 'Stock status to set for original item when outcome is REPLACE',
'default': '50',
'choices': ALL_STATUS_CHOICES,
},
'STATUS_FOR_REFUND': {
'name': 'Status for REFUND Outcome',
'description': 'Stock status to set when line item outcome is REFUND',
'default': '50',
'choices': ALL_STATUS_CHOICES,
},
'STATUS_FOR_REJECT': {
'name': 'Status for REJECT Outcome',
'description': 'Stock status to set when line item outcome is REJECT',
'default': '65',
'choices': ALL_STATUS_CHOICES,
},
}
def setup_urls(self):
"""Set up URL patterns for the plugin API."""
from django.urls import path
from inventree_rma_plugin import api
return [
path('allocations/', api.RepairAllocationList.as_view(), name='repair-allocation-list'),
path('allocations/<int:pk>/', api.RepairAllocationDetail.as_view(), name='repair-allocation-detail'),
]
def get_ui_panels(self, request, context, **kwargs):
"""Return custom panels to display on Return Order pages."""
panels = []
target_model = context.get('target_model', None) if context else None
# Add repair parts panel to return order detail pages
if target_model == 'returnorder':
panels.append({
'key': 'repair-parts',
'title': 'Repair Parts',
'description': 'Stock items allocated for repairs',
'icon': 'ti:tools:outline',
'source': self.plugin_static_file('repair_panel.js:renderRepairPartsPanel'),
})
return panels
def wants_process_event(self, event: str) -> bool:
"""Determine if this plugin wants to process the given event.
Only process return order completion events.
"""
return event == 'returnorder.completed'
def process_event(self, event: str, *args, **kwargs) -> None:
"""Process a return order completion event.
Args:
event: The event name (should be 'returnorder.completed')
*args: Additional positional arguments
**kwargs: Keyword arguments containing:
- id: The primary key of the completed ReturnOrder
"""
order_id = kwargs.get('id')
if not order_id:
logger.warning('RMA Automation: No order ID provided in event kwargs')
return
logger.info('RMA Automation: Processing return order completion', order_id=order_id)
try:
# Process status changes
if self.get_setting('ENABLE_AUTO_STATUS'):
self._process_return_order(order_id)
# Consume allocated repair parts
if self.get_setting('CONSUME_REPAIR_PARTS'):
self._consume_repair_parts(order_id)
except Exception as e:
logger.error(
'RMA Automation: Error processing return order',
order_id=order_id,
error=str(e),
exc_info=True,
)
def _process_return_order(self, order_id: int) -> None:
"""Process all line items for a completed return order.
Args:
order_id: The primary key of the ReturnOrder
"""
# Import models here to avoid circular imports
from order.models import ReturnOrder
try:
return_order = ReturnOrder.objects.get(pk=order_id)
except ReturnOrder.DoesNotExist:
logger.error('RMA Automation: Return order not found', order_id=order_id)
return
customer = return_order.customer
enable_reassign = self.get_setting('ENABLE_CUSTOMER_REASSIGN')
add_notes = self.get_setting('ADD_TRACKING_NOTES')
# Process each line item
for line_item in return_order.lines.all():
self._process_line_item(
line_item,
return_order,
customer,
enable_reassign,
add_notes,
)
def _process_line_item(
self,
line_item,
return_order,
customer,
enable_reassign: bool,
add_notes: bool,
) -> None:
"""Process a single return order line item.
Args:
line_item: The ReturnOrderLineItem instance
return_order: The parent ReturnOrder
customer: The customer Company (may be None)
enable_reassign: Whether to reassign items to customer
add_notes: Whether to add tracking notes
"""
stock_item = line_item.item
if not stock_item:
logger.warning(
'RMA Automation: Line item has no stock item',
line_item_id=line_item.pk,
)
return
outcome = line_item.outcome
new_status = self._get_status_for_outcome(outcome)
if new_status is None:
logger.debug(
'RMA Automation: No status change for outcome',
outcome=outcome,
stock_item_id=stock_item.pk,
)
return
# Determine if we should reassign to customer
should_reassign = (
enable_reassign
and customer
and outcome in (self.OUTCOME_RETURN, self.OUTCOME_REPAIR)
)
# Build the tracking note
note = self._build_tracking_note(outcome, return_order, new_status, line_item) if add_notes else None
# Update the stock item
self._update_stock_item(
stock_item,
new_status,
customer if should_reassign else None,
note,
)
logger.info(
'RMA Automation: Updated stock item',
stock_item_id=stock_item.pk,
outcome=outcome,
new_status=new_status,
reassigned_to_customer=should_reassign,
)
def _get_status_for_outcome(self, outcome: int) -> Optional[int]:
"""Get the stock status to set based on line item outcome.
Args:
outcome: The ReturnOrderLineStatus value
Returns:
The StockStatus value to set, or None if no change needed
"""
status_mapping = {
self.OUTCOME_RETURN: self.get_setting('STATUS_FOR_RETURN'),
self.OUTCOME_REPAIR: self.get_setting('STATUS_FOR_REPAIR'),
self.OUTCOME_REPLACE: self.get_setting('STATUS_FOR_REPLACE'),
self.OUTCOME_REFUND: self.get_setting('STATUS_FOR_REFUND'),
self.OUTCOME_REJECT: self.get_setting('STATUS_FOR_REJECT'),
}
status = status_mapping.get(outcome)
if status is not None:
return int(status)
return None
def _build_tracking_note(self, outcome: int, return_order, new_status: int, line_item) -> str:
"""Build a tracking note for the stock item.
Args:
outcome: The line item outcome
return_order: The ReturnOrder instance
new_status: The new stock status
line_item: The ReturnOrderLineItem instance
Returns:
A descriptive note string
"""
outcome_names = {
self.OUTCOME_PENDING: 'Pending',
self.OUTCOME_RETURN: 'Return',
self.OUTCOME_REPAIR: 'Repair',
self.OUTCOME_REPLACE: 'Replace',
self.OUTCOME_REFUND: 'Refund',
self.OUTCOME_REJECT: 'Reject',
}
status_names = {
self.STOCK_OK: 'OK',
self.STOCK_ATTENTION: 'Attention',
self.STOCK_DAMAGED: 'Damaged',
self.STOCK_DESTROYED: 'Destroyed',
self.STOCK_REJECTED: 'Rejected',
self.STOCK_LOST: 'Lost',
self.STOCK_QUARANTINED: 'Quarantined',
self.STOCK_RETURNED: 'Returned',
}
outcome_name = outcome_names.get(outcome, f'#{outcome}')
status_name = status_names.get(new_status, f'#{new_status}')
# Format: "RMA-0003: Repair → OK"
note = f'{return_order.reference}: {outcome_name}{status_name}'
# Append any notes from the line item
if line_item.notes:
note += f'\n{line_item.notes}'
return note
def _update_stock_item(
self,
stock_item,
new_status: int,
customer=None,
note: Optional[str] = None,
) -> None:
"""Update a stock item's status and optionally reassign to customer.
Args:
stock_item: The StockItem instance
new_status: The new status code to set
customer: Optional customer Company to reassign to
note: Optional tracking note to add
"""
from stock.status_codes import StockHistoryCode
# Only update if status is changing
if stock_item.status != new_status:
stock_item.set_status(new_status)
# Reassign to customer if specified
if customer is not None:
stock_item.customer = customer
# Save the stock item
stock_item.save(add_note=False)
# Add tracking note if provided
if note:
stock_item.add_tracking_entry(
StockHistoryCode.EDITED,
None, # User - will be None for automated actions
notes=note,
deltas={'status': new_status},
)
def _consume_repair_parts(self, order_id: int) -> None:
"""Consume all allocated repair parts for a return order.
Args:
order_id: The primary key of the ReturnOrder
"""
from inventree_rma_plugin.models import RepairStockAllocation
from order.models import ReturnOrder
from stock.status_codes import StockHistoryCode
try:
return_order = ReturnOrder.objects.get(pk=order_id)
except ReturnOrder.DoesNotExist:
logger.error('RMA Automation: Return order not found for parts consumption', order_id=order_id)
return
# Get all unconsumed allocations for this order's line items
allocations = RepairStockAllocation.objects.filter(
return_order_line__order=return_order,
consumed=False,
)
for allocation in allocations:
stock_item = allocation.stock_item
quantity = allocation.quantity
logger.info(
'RMA Automation: Consuming repair part',
stock_item_id=stock_item.pk,
quantity=quantity,
return_order=return_order.reference,
)
# Subtract the quantity from stock
if stock_item.quantity >= quantity:
stock_item.quantity -= quantity
stock_item.save(add_note=False)
# Add tracking entry
stock_item.add_tracking_entry(
StockHistoryCode.EDITED,
None,
notes=f'Consumed for repair: {return_order.reference}',
deltas={'removed': float(quantity)},
)
# Mark allocation as consumed
allocation.consumed = True
allocation.save()
else:
logger.warning(
'RMA Automation: Insufficient stock to consume',
stock_item_id=stock_item.pk,
available=stock_item.quantity,
requested=quantity,
)

View File

@@ -0,0 +1,499 @@
/**
* Repair Parts Panel for RMA Automation Plugin
* React-based implementation using InvenTree's native components and form system
*/
// Format quantity - show whole number if no decimal, otherwise show decimal
function formatQty(value) {
if (value == null) return '0';
const num = parseFloat(value);
return Number.isInteger(num) ? num.toString() : num.toFixed(2).replace(/\.?0+$/, '');
}
// API helper using the context's api instance or fetch fallback
async function apiFetch(ctx, url, options = {}) {
// If we have the InvenTree api instance, use it
if (ctx?.api) {
try {
const response = await ctx.api({
url: url,
method: options.method || 'GET',
data: options.body ? JSON.parse(options.body) : undefined,
});
return response.data;
} catch (error) {
throw new Error(error.response?.data?.detail || error.message || 'Request failed');
}
}
// Fallback to fetch
const response = await fetch(url, {
...options,
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': getCsrfToken(),
...options.headers,
},
});
if (!response.ok) {
const error = await response.json().catch(() => ({ detail: 'Request failed' }));
throw new Error(error.detail || JSON.stringify(error));
}
if (response.status === 204) return null;
return response.json();
}
function getCsrfToken() {
const name = 'csrftoken';
let cookieValue = null;
if (document.cookie && document.cookie !== '') {
const cookies = document.cookie.split(';');
for (let i = 0; i < cookies.length; i++) {
const cookie = cookies[i].trim();
if (cookie.substring(0, name.length + 1) === (name + '=')) {
cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
break;
}
}
}
return cookieValue;
}
// Main Panel Component
function RepairPartsPanel({ ctx }) {
const React = window.React;
const { useState, useEffect, useCallback, useMemo } = React;
const { Button, Table, Text, Group, Badge, Stack, Loader, Alert, Paper, ActionIcon, Tooltip } = window.MantineCore;
const IconTrash = window.TablerIcons?.IconTrash;
const [allocations, setAllocations] = useState([]);
const [lines, setLines] = useState([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState(null);
const [showForm, setShowForm] = useState(false);
const returnOrderId = ctx.id;
const isComplete = ctx.instance?.status === 30;
// Fetch data
const fetchData = useCallback(async () => {
try {
setLoading(true);
setError(null);
const [allocData, linesData] = await Promise.all([
apiFetch(ctx, `/plugin/rma-automation/allocations/?return_order=${returnOrderId}`),
apiFetch(ctx, `/api/order/ro-line/?order=${returnOrderId}`),
]);
setAllocations(allocData || []);
setLines(linesData?.results || linesData || []);
} catch (err) {
setError(err.message);
} finally {
setLoading(false);
}
}, [returnOrderId]);
useEffect(() => {
fetchData();
}, [fetchData]);
// Delete allocation
const handleDelete = async (id) => {
if (!confirm('Remove this repair part allocation?')) return;
try {
await apiFetch(ctx, `/plugin/rma-automation/allocations/${id}/`, { method: 'DELETE' });
fetchData();
} catch (err) {
alert('Failed to remove: ' + err.message);
}
};
// Handle form success
const handleFormSuccess = () => {
setShowForm(false);
fetchData();
};
if (loading) {
return React.createElement(Group, { justify: 'center', p: 'xl' },
React.createElement(Loader, { size: 'sm' }),
React.createElement(Text, { size: 'sm', c: 'dimmed' }, 'Loading repair parts...')
);
}
if (error) {
return React.createElement(Alert, { color: 'red', title: 'Error' }, error);
}
// Render delete button with or without icon
const renderDeleteButton = (allocId) => {
if (IconTrash) {
return React.createElement(Tooltip, { label: 'Remove allocation' },
React.createElement(ActionIcon, {
color: 'red',
variant: 'subtle',
onClick: () => handleDelete(allocId)
}, React.createElement(IconTrash, { size: 16 }))
);
}
return React.createElement(Button, {
color: 'red',
size: 'xs',
variant: 'subtle',
onClick: () => handleDelete(allocId)
}, '×');
};
return React.createElement(Stack, { gap: 'md' },
// Header with description and add button
React.createElement(Group, { justify: 'space-between', align: 'center' },
React.createElement(Text, { size: 'sm', c: 'dimmed' },
isComplete
? 'This order is complete. Allocated parts have been consumed.'
: 'Allocate stock items to be consumed when this return order is completed.'
),
!isComplete && React.createElement(Button, {
onClick: () => setShowForm(true),
size: 'compact-sm',
variant: 'light',
}, '+ Add Part')
),
// Allocations table
allocations.length === 0
? React.createElement(Paper, { p: 'lg', withBorder: true },
React.createElement(Text, { c: 'dimmed', ta: 'center', fs: 'italic' },
'No repair parts allocated yet.'
)
)
: React.createElement(Table, {
striped: true,
highlightOnHover: true,
withTableBorder: true,
verticalSpacing: 'sm',
},
React.createElement(Table.Thead, null,
React.createElement(Table.Tr, null,
React.createElement(Table.Th, { style: { width: '30%' } }, 'Repair Item'),
React.createElement(Table.Th, { style: { width: '30%' } }, 'Replacement Part'),
React.createElement(Table.Th, { style: { textAlign: 'center', width: '10%' } }, 'Qty'),
React.createElement(Table.Th, { style: { textAlign: 'center', width: '20%' } }, 'Status'),
!isComplete && React.createElement(Table.Th, { style: { width: '10%' } }, '')
)
),
React.createElement(Table.Tbody, null,
allocations.map(alloc => {
const detail = alloc.stock_item_detail || {};
const lineDetail = alloc.line_item_detail || {};
return React.createElement(Table.Tr, { key: alloc.id },
// Repair Item column - show the line item being repaired
React.createElement(Table.Td, null,
React.createElement(Stack, { gap: 0 },
React.createElement(Text, { size: 'sm', fw: 500 }, lineDetail.part_name || 'Unknown'),
lineDetail.serial && React.createElement(Text, { size: 'xs', c: 'dimmed' }, `SN: ${lineDetail.serial}`)
)
),
// Replacement Part column - show the stock item being consumed with location
React.createElement(Table.Td, null,
React.createElement(Stack, { gap: 0 },
React.createElement(Text, { size: 'sm' }, detail.part_name || 'Unknown'),
(detail.serial || detail.batch) && React.createElement(Text, { size: 'xs', c: 'dimmed' },
detail.serial ? `SN: ${detail.serial}` : `Batch: ${detail.batch}`
),
detail.location_name && React.createElement(Text, { size: 'xs', c: 'teal', fw: 500 },
detail.location_name
)
)
),
React.createElement(Table.Td, { style: { textAlign: 'center' } },
React.createElement(Text, { size: 'sm' }, formatQty(alloc.quantity))
),
React.createElement(Table.Td, { style: { textAlign: 'center' } },
React.createElement(Badge, {
color: alloc.consumed ? 'green' : 'blue',
variant: 'light',
size: 'sm'
}, alloc.consumed ? 'Consumed' : 'Allocated')
),
!isComplete && React.createElement(Table.Td, { style: { textAlign: 'center' } },
!alloc.consumed && renderDeleteButton(alloc.id)
)
);
})
)
),
// Add form modal
showForm && React.createElement(AddAllocationForm, {
ctx: ctx,
lines: lines,
returnOrderId: returnOrderId,
onSuccess: handleFormSuccess,
onCancel: () => setShowForm(false),
})
);
}
// Add Allocation Form Component (Modal)
function AddAllocationForm({ ctx, lines, returnOrderId, onSuccess, onCancel }) {
const React = window.React;
const { useState, useEffect } = React;
const { Modal, TextInput, Select, NumberInput, Button, Text, Group, Badge, Stack, Loader, Alert, Paper, ScrollArea, UnstyledButton, Box, Divider } = window.MantineCore;
const [selectedLine, setSelectedLine] = useState('');
const [searchQuery, setSearchQuery] = useState('');
const [searchResults, setSearchResults] = useState([]);
const [selectedPart, setSelectedPart] = useState(null);
const [stockItems, setStockItems] = useState([]);
const [selectedStock, setSelectedStock] = useState(null);
const [quantity, setQuantity] = useState(1);
const [loading, setLoading] = useState(false);
const [error, setError] = useState(null);
const [searching, setSearching] = useState(false);
const [loadingStock, setLoadingStock] = useState(false);
// Search parts
useEffect(() => {
if (searchQuery.length < 2) {
setSearchResults([]);
return;
}
const timer = setTimeout(async () => {
setSearching(true);
try {
const data = await apiFetch(ctx, `/api/part/?search=${encodeURIComponent(searchQuery)}&limit=20`);
setSearchResults(data?.results || data || []);
} catch (err) {
console.error('Search failed:', err);
} finally {
setSearching(false);
}
}, 300);
return () => clearTimeout(timer);
}, [searchQuery]);
// Load stock when part selected - only show available stock
useEffect(() => {
if (!selectedPart) {
setStockItems([]);
return;
}
setLoadingStock(true);
(async () => {
try {
// Filter for in_stock=true and available=true to exclude fully allocated stock
const data = await apiFetch(ctx, `/api/stock/?part=${selectedPart.pk}&in_stock=true&available=true&limit=50`);
const items = data?.results || data || [];
// Calculate available quantity for each item
const itemsWithAvailable = items.map(item => ({
...item,
available_quantity: Math.max(0, (item.quantity || 0) - (item.allocated || 0))
}));
// Filter out items with no available quantity
setStockItems(itemsWithAvailable.filter(item => item.available_quantity > 0));
} catch (err) {
console.error('Failed to load stock:', err);
} finally {
setLoadingStock(false);
}
})();
}, [selectedPart]);
// Submit
const handleSubmit = async () => {
if (!selectedLine || !selectedStock || !quantity) {
setError('Please fill in all fields');
return;
}
setLoading(true);
setError(null);
try {
await apiFetch(ctx, '/plugin/rma-automation/allocations/', {
method: 'POST',
body: JSON.stringify({
return_order_line: parseInt(selectedLine),
stock_item: selectedStock.pk,
quantity: quantity,
}),
});
onSuccess();
} catch (err) {
setError(err.message);
} finally {
setLoading(false);
}
};
const lineOptions = lines.map(line => ({
value: String(line.pk),
label: `${line.item_detail?.part_detail?.name || 'Unknown'}${line.item_detail?.serial ? ` (SN: ${line.item_detail.serial})` : ''}`
}));
return React.createElement(Modal, {
opened: true,
onClose: onCancel,
title: 'Add Repair Part',
size: 'lg',
centered: true,
overlayProps: { backgroundOpacity: 0.55, blur: 3 },
},
React.createElement(Stack, { gap: 'md' },
// Step 1: Line item select
React.createElement(Select, {
label: 'Line Item Being Repaired',
description: 'Select which return order item this repair part is for',
placeholder: 'Select line item...',
data: lineOptions,
value: selectedLine,
onChange: setSelectedLine,
searchable: true,
required: true,
withAsterisk: true,
}),
React.createElement(Divider, { label: 'Repair Part Selection', labelPosition: 'center' }),
// Step 2: Part search
!selectedPart ? React.createElement(Stack, { gap: 'xs' },
React.createElement(TextInput, {
label: 'Search for Part',
placeholder: 'Type at least 2 characters to search...',
value: searchQuery,
onChange: (e) => setSearchQuery(e.target.value),
rightSection: searching ? React.createElement(Loader, { size: 'xs' }) : null,
}),
searchResults.length > 0 && React.createElement(Paper, { withBorder: true, p: 0, radius: 'sm' },
React.createElement(ScrollArea, { h: 200 },
searchResults.map((part, index) => React.createElement(React.Fragment, { key: part.pk },
React.createElement(UnstyledButton, {
onClick: () => {
setSelectedPart(part);
setSearchQuery('');
setSearchResults([]);
},
style: { display: 'block', width: '100%' },
},
React.createElement(Box, {
p: 'sm',
style: {
cursor: 'pointer',
transition: 'background-color 150ms ease',
},
onMouseEnter: (e) => e.currentTarget.style.backgroundColor = 'var(--mantine-color-gray-1)',
onMouseLeave: (e) => e.currentTarget.style.backgroundColor = 'transparent',
},
React.createElement(Text, { size: 'sm', fw: 500 }, part.full_name || part.name),
part.description && React.createElement(Text, { size: 'xs', c: 'dimmed', lineClamp: 1 }, part.description)
)
),
index < searchResults.length - 1 && React.createElement(Divider, null)
))
)
)
) : React.createElement(Paper, { withBorder: true, p: 'sm', radius: 'sm', bg: 'var(--mantine-color-blue-light)' },
React.createElement(Group, { justify: 'space-between' },
React.createElement(Stack, { gap: 0 },
React.createElement(Text, { size: 'xs', c: 'dimmed' }, 'Selected Part'),
React.createElement(Text, { fw: 500 }, selectedPart.full_name || selectedPart.name)
),
React.createElement(Button, {
onClick: () => {
setSelectedPart(null);
setSelectedStock(null);
setStockItems([]);
setQuantity(1);
},
variant: 'subtle',
size: 'xs',
}, 'Change')
)
),
// Step 3: Stock items
selectedPart && React.createElement(Stack, { gap: 'xs' },
React.createElement(Group, { justify: 'space-between' },
React.createElement(Text, { size: 'sm', fw: 500 }, 'Select Stock Item'),
React.createElement(Text, { size: 'xs', c: 'dimmed' }, 'Only unallocated stock shown')
),
loadingStock
? React.createElement(Group, { justify: 'center', p: 'md' },
React.createElement(Loader, { size: 'sm' })
)
: stockItems.length === 0
? React.createElement(Alert, { color: 'yellow', variant: 'light' }, 'No available stock for this part')
: React.createElement(Paper, { withBorder: true, p: 0, radius: 'sm' },
React.createElement(ScrollArea, { h: 200 },
stockItems.map((stock, index) => React.createElement(React.Fragment, { key: stock.pk },
React.createElement(UnstyledButton, {
onClick: () => {
setSelectedStock(stock);
setQuantity(Math.min(1, stock.available_quantity));
},
style: { display: 'block', width: '100%' },
},
React.createElement(Box, {
p: 'sm',
style: {
backgroundColor: selectedStock?.pk === stock.pk ? 'var(--mantine-color-blue-light)' : undefined,
cursor: 'pointer',
},
},
React.createElement(Group, { justify: 'space-between', wrap: 'nowrap' },
React.createElement(Stack, { gap: 0 },
React.createElement(Text, { size: 'sm', fw: selectedStock?.pk === stock.pk ? 600 : 400 },
stock.serial ? `SN: ${stock.serial}` :
stock.batch ? `Batch: ${stock.batch}` :
`Stock #${stock.pk}`
),
React.createElement(Text, { size: 'xs', c: 'dimmed' },
stock.location_detail?.pathstring || 'Unknown location'
)
),
React.createElement(Badge, {
color: 'green',
variant: 'light',
size: 'lg'
}, formatQty(stock.available_quantity))
)
)
),
index < stockItems.length - 1 && React.createElement(Divider, null)
))
)
)
),
// Step 4: Quantity
selectedStock && React.createElement(NumberInput, {
label: 'Quantity to Allocate',
description: `Maximum available: ${formatQty(selectedStock.available_quantity)}`,
value: quantity,
onChange: setQuantity,
min: 0.00001,
max: selectedStock.available_quantity,
step: 1,
required: true,
withAsterisk: true,
}),
// Error
error && React.createElement(Alert, { color: 'red', variant: 'light', withCloseButton: true, onClose: () => setError(null) }, error),
// Buttons
React.createElement(Group, { justify: 'flex-end', mt: 'md' },
React.createElement(Button, { variant: 'default', onClick: onCancel }, 'Cancel'),
React.createElement(Button, {
onClick: handleSubmit,
loading: loading,
disabled: !selectedLine || !selectedStock || !quantity,
}, 'Add Repair Part')
)
)
);
}
// Export the render function for InvenTree plugin system
// Using the new single-argument signature - InvenTree handles context wrapping
export function renderRepairPartsPanel(ctx) {
return window.React.createElement(RepairPartsPanel, { ctx: ctx });
}

47
pyproject.toml Normal file
View File

@@ -0,0 +1,47 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "inventree-rma-plugin"
version = "0.2.0"
description = "InvenTree plugin for automating RMA workflow with repair parts tracking"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Timmy Hadwen", email = "timmy@hadwen.net"}
]
requires-python = ">=3.9"
classifiers = [
"Development Status :: 3 - Alpha",
"Framework :: Django",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
keywords = ["inventree", "plugin", "rma", "return-order"]
[project.urls]
Homepage = "https://github.com/timmyhadwen/inventree-rma-plugin"
Repository = "https://github.com/timmyhadwen/inventree-rma-plugin"
[project.entry-points."inventree_plugins"]
RMAAutomationPlugin = "inventree_rma_plugin.rma_automation:RMAAutomationPlugin"
[tool.setuptools.packages.find]
where = ["."]
include = ["inventree_rma_plugin*"]
[tool.setuptools.package-data]
"inventree_rma_plugin" = [
"static/**/*",
"migrations/*",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = "test_*.py"

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests for InvenTree RMA Automation Plugin."""

View File

@@ -0,0 +1,377 @@
"""Tests for RMA Automation Plugin."""
import sys
from unittest import TestCase
from unittest.mock import MagicMock, patch, PropertyMock
# Mock InvenTree modules before importing the plugin
sys.modules['plugin'] = MagicMock()
sys.modules['plugin.mixins'] = MagicMock()
sys.modules['structlog'] = MagicMock()
class MockEventMixin:
"""Mock EventMixin class."""
pass
class MockSettingsMixin:
"""Mock SettingsMixin class."""
def get_setting(self, key):
"""Return mock settings."""
return getattr(self, f'_setting_{key}', None)
class MockInvenTreePlugin:
"""Mock InvenTreePlugin class."""
pass
# Patch the mixins before import
with patch.dict(sys.modules, {
'plugin': MagicMock(),
'plugin.mixins': MagicMock(),
}):
sys.modules['plugin'].InvenTreePlugin = MockInvenTreePlugin
sys.modules['plugin.mixins'].EventMixin = MockEventMixin
sys.modules['plugin.mixins'].SettingsMixin = MockSettingsMixin
from inventree_rma_plugin.rma_automation import RMAAutomationPlugin
class TestRMAAutomationPlugin(TestCase):
"""Test cases for RMAAutomationPlugin."""
def setUp(self):
"""Set up test fixtures."""
self.plugin = RMAAutomationPlugin()
# Set default settings
self.plugin._setting_ENABLE_AUTO_STATUS = True
self.plugin._setting_ENABLE_CUSTOMER_REASSIGN = False
self.plugin._setting_ADD_TRACKING_NOTES = True
self.plugin._setting_STATUS_FOR_RETURN = 10 # OK
self.plugin._setting_STATUS_FOR_REPAIR = 10 # OK
self.plugin._setting_STATUS_FOR_REPLACE = 50 # Attention
self.plugin._setting_STATUS_FOR_REFUND = 50 # Attention
self.plugin._setting_STATUS_FOR_REJECT = 65 # Rejected
def test_plugin_metadata(self):
"""Test plugin metadata is correctly defined."""
self.assertEqual(self.plugin.NAME, 'RMA Automation')
self.assertEqual(self.plugin.SLUG, 'rma-automation')
self.assertEqual(self.plugin.VERSION, '0.2.0')
self.assertEqual(self.plugin.AUTHOR, 'Timmy Hadwen')
def test_wants_process_event_returns_true_for_returnorder_completed(self):
"""Test that plugin wants to process returnorder.completed events."""
result = self.plugin.wants_process_event('returnorder.completed')
self.assertTrue(result)
def test_wants_process_event_returns_false_for_other_events(self):
"""Test that plugin ignores other events."""
self.assertFalse(self.plugin.wants_process_event('returnorder.created'))
self.assertFalse(self.plugin.wants_process_event('salesorder.completed'))
self.assertFalse(self.plugin.wants_process_event('stock.changed'))
self.assertFalse(self.plugin.wants_process_event(''))
def test_process_event_returns_early_when_disabled(self):
"""Test that processing is skipped when auto status is disabled."""
self.plugin._setting_ENABLE_AUTO_STATUS = False
# Should not raise any errors and return early
self.plugin.process_event('returnorder.completed', id=1)
def test_process_event_returns_early_without_order_id(self):
"""Test that processing is skipped when no order ID provided."""
# Should not raise any errors and return early
self.plugin.process_event('returnorder.completed')
def test_get_status_for_outcome_return(self):
"""Test status mapping for RETURN outcome."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_RETURN)
self.assertEqual(result, 10) # OK
def test_get_status_for_outcome_repair(self):
"""Test status mapping for REPAIR outcome."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_REPAIR)
self.assertEqual(result, 10) # OK
def test_get_status_for_outcome_replace(self):
"""Test status mapping for REPLACE outcome."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_REPLACE)
self.assertEqual(result, 50) # Attention
def test_get_status_for_outcome_refund(self):
"""Test status mapping for REFUND outcome."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_REFUND)
self.assertEqual(result, 50) # Attention
def test_get_status_for_outcome_reject(self):
"""Test status mapping for REJECT outcome."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_REJECT)
self.assertEqual(result, 65) # Rejected
def test_get_status_for_outcome_pending(self):
"""Test status mapping for PENDING outcome returns None."""
result = self.plugin._get_status_for_outcome(self.plugin.OUTCOME_PENDING)
self.assertIsNone(result)
def test_get_status_for_outcome_unknown(self):
"""Test status mapping for unknown outcome returns None."""
result = self.plugin._get_status_for_outcome(999)
self.assertIsNone(result)
def test_build_tracking_note(self):
"""Test tracking note generation."""
mock_return_order = MagicMock()
mock_return_order.reference = 'RMA-001'
mock_line_item = MagicMock()
mock_line_item.notes = None
note = self.plugin._build_tracking_note(
self.plugin.OUTCOME_REPAIR,
mock_return_order,
self.plugin.STOCK_OK,
mock_line_item,
)
self.assertEqual(note, 'RMA-001: Repair → OK')
def test_build_tracking_note_with_line_item_note(self):
"""Test tracking note includes line item notes."""
mock_return_order = MagicMock()
mock_return_order.reference = 'RMA-001'
mock_line_item = MagicMock()
mock_line_item.notes = 'Customer reported screen flickering'
note = self.plugin._build_tracking_note(
self.plugin.OUTCOME_REPAIR,
mock_return_order,
self.plugin.STOCK_OK,
mock_line_item,
)
self.assertEqual(note, 'RMA-001: Repair → OK\nCustomer reported screen flickering')
def test_build_tracking_note_unknown_outcome(self):
"""Test tracking note with unknown outcome code."""
mock_return_order = MagicMock()
mock_return_order.reference = 'RMA-002'
mock_line_item = MagicMock()
mock_line_item.notes = None
note = self.plugin._build_tracking_note(999, mock_return_order, 10, mock_line_item)
self.assertIn('#999', note)
def test_settings_defined(self):
"""Test that all expected settings are defined."""
expected_settings = [
'ENABLE_AUTO_STATUS',
'ENABLE_CUSTOMER_REASSIGN',
'ADD_TRACKING_NOTES',
'STATUS_FOR_RETURN',
'STATUS_FOR_REPAIR',
'STATUS_FOR_REPLACE',
'STATUS_FOR_REFUND',
'STATUS_FOR_REJECT',
]
for setting in expected_settings:
self.assertIn(setting, self.plugin.SETTINGS)
def test_settings_have_defaults(self):
"""Test that all settings have default values."""
for key, config in self.plugin.SETTINGS.items():
self.assertIn('default', config, f"Setting {key} missing default")
def test_outcome_constants(self):
"""Test outcome constant values match InvenTree."""
self.assertEqual(self.plugin.OUTCOME_PENDING, 10)
self.assertEqual(self.plugin.OUTCOME_RETURN, 20)
self.assertEqual(self.plugin.OUTCOME_REPAIR, 30)
self.assertEqual(self.plugin.OUTCOME_REPLACE, 40)
self.assertEqual(self.plugin.OUTCOME_REFUND, 50)
self.assertEqual(self.plugin.OUTCOME_REJECT, 60)
def test_stock_status_constants(self):
"""Test stock status constant values match InvenTree."""
self.assertEqual(self.plugin.STOCK_OK, 10)
self.assertEqual(self.plugin.STOCK_ATTENTION, 50)
self.assertEqual(self.plugin.STOCK_DAMAGED, 55)
self.assertEqual(self.plugin.STOCK_DESTROYED, 60)
self.assertEqual(self.plugin.STOCK_REJECTED, 65)
self.assertEqual(self.plugin.STOCK_QUARANTINED, 75)
self.assertEqual(self.plugin.STOCK_RETURNED, 85)
class TestProcessLineItem(TestCase):
"""Test cases for _process_line_item method."""
def setUp(self):
"""Set up test fixtures."""
self.plugin = RMAAutomationPlugin()
self.plugin._setting_ENABLE_AUTO_STATUS = True
self.plugin._setting_ENABLE_CUSTOMER_REASSIGN = True
self.plugin._setting_ADD_TRACKING_NOTES = True
self.plugin._setting_STATUS_FOR_RETURN = 10
self.plugin._setting_STATUS_FOR_REPAIR = 10
self.plugin._setting_STATUS_FOR_REPLACE = 50
self.plugin._setting_STATUS_FOR_REFUND = 50
self.plugin._setting_STATUS_FOR_REJECT = 65
def test_process_line_item_with_no_stock_item(self):
"""Test handling of line item with no stock item."""
line_item = MagicMock()
line_item.item = None
line_item.pk = 1
return_order = MagicMock()
customer = MagicMock()
# Should not raise any errors
self.plugin._process_line_item(
line_item, return_order, customer, True, True
)
def test_process_line_item_with_pending_outcome(self):
"""Test handling of line item with PENDING outcome (no status change)."""
stock_item = MagicMock()
stock_item.pk = 1
line_item = MagicMock()
line_item.item = stock_item
line_item.outcome = self.plugin.OUTCOME_PENDING
return_order = MagicMock()
customer = MagicMock()
with patch.object(self.plugin, '_update_stock_item') as mock_update:
self.plugin._process_line_item(
line_item, return_order, customer, True, True
)
# Should not call update for PENDING outcome
mock_update.assert_not_called()
def test_process_line_item_repair_with_customer_reassign(self):
"""Test REPAIR outcome with customer reassignment enabled."""
stock_item = MagicMock()
stock_item.pk = 1
line_item = MagicMock()
line_item.item = stock_item
line_item.outcome = self.plugin.OUTCOME_REPAIR
return_order = MagicMock()
return_order.reference = 'RMA-001'
customer = MagicMock()
with patch.object(self.plugin, '_update_stock_item') as mock_update:
self.plugin._process_line_item(
line_item, return_order, customer,
enable_reassign=True, add_notes=True
)
mock_update.assert_called_once()
call_args = mock_update.call_args
# Check that customer is passed for reassignment
self.assertEqual(call_args[0][0], stock_item)
self.assertEqual(call_args[0][1], 10) # OK status
self.assertEqual(call_args[0][2], customer)
def test_process_line_item_reject_no_customer_reassign(self):
"""Test REJECT outcome does not reassign customer."""
stock_item = MagicMock()
stock_item.pk = 1
line_item = MagicMock()
line_item.item = stock_item
line_item.outcome = self.plugin.OUTCOME_REJECT
return_order = MagicMock()
return_order.reference = 'RMA-001'
customer = MagicMock()
with patch.object(self.plugin, '_update_stock_item') as mock_update:
self.plugin._process_line_item(
line_item, return_order, customer,
enable_reassign=True, add_notes=True
)
mock_update.assert_called_once()
call_args = mock_update.call_args
# Check that customer is NOT passed for rejected items
self.assertEqual(call_args[0][1], 65) # Rejected status
self.assertIsNone(call_args[0][2]) # No customer
class TestUpdateStockItem(TestCase):
"""Test cases for _update_stock_item method."""
def setUp(self):
"""Set up test fixtures."""
self.plugin = RMAAutomationPlugin()
def test_update_stock_item_status_change(self):
"""Test updating stock item status."""
stock_item = MagicMock()
stock_item.status = 75 # QUARANTINED
stock_item.customer = None
with patch.dict(sys.modules, {'stock.status_codes': MagicMock()}):
self.plugin._update_stock_item(
stock_item,
new_status=10, # OK
customer=None,
note=None,
)
stock_item.set_status.assert_called_once_with(10)
stock_item.save.assert_called_once_with(add_note=False)
def test_update_stock_item_with_customer_reassign(self):
"""Test updating stock item with customer reassignment."""
stock_item = MagicMock()
stock_item.status = 75
stock_item.customer = None
customer = MagicMock()
customer.name = 'Test Customer'
with patch.dict(sys.modules, {'stock.status_codes': MagicMock()}):
self.plugin._update_stock_item(
stock_item,
new_status=10,
customer=customer,
note=None,
)
self.assertEqual(stock_item.customer, customer)
stock_item.save.assert_called_once_with(add_note=False)
def test_update_stock_item_with_tracking_note(self):
"""Test updating stock item with tracking note."""
stock_item = MagicMock()
stock_item.status = 75
mock_history_code = MagicMock()
mock_history_code.EDITED = 5
with patch.dict(sys.modules, {'stock.status_codes': MagicMock(StockHistoryCode=mock_history_code)}):
self.plugin._update_stock_item(
stock_item,
new_status=10,
customer=None,
note='Test tracking note',
)
stock_item.add_tracking_entry.assert_called_once()
call_args = stock_item.add_tracking_entry.call_args
self.assertEqual(call_args[1]['notes'], 'Test tracking note')
if __name__ == '__main__':
import unittest
unittest.main()