1# SPDX-License-Identifier: Apache-2.0
2# Copyright (C) 2025 Marcin Zieba <marcinpsk@gmail.com>
3import dataclasses
4import logging
5import re
6
7import yaml
8from django.conf import settings
9from django.contrib import messages
10from django.core.exceptions import PermissionDenied
11from django.http import JsonResponse
12from django.shortcuts import redirect, render
13from django.urls import reverse
14from django.utils.http import url_has_allowed_host_and_scheme
15from netbox.views import generic
16from netbox.views.generic.base import BaseMultiObjectView
17from utilities.views import register_model_view
18
19from .filters import InterfaceNameRuleFilterSet
20from .forms import InterfaceNameRuleFilterForm, InterfaceNameRuleForm, InterfaceNameRuleImportForm, RuleTestForm
21from .models import InterfaceNameRule
22from .tables import InterfaceNameRuleTable
23
24logger = logging.getLogger(__name__)
25
# Upper bound on how many interfaces a single foreground "apply" request handles.
# Read from PLUGINS_CONFIG["netbox_interface_name_rules"]["apply_batch_limit"],
# clamped to a minimum of 1, and defaulting to 50 when unset or unusable.
try:
    _plugins_config = getattr(settings, "PLUGINS_CONFIG", {})
    APPLY_BATCH_LIMIT = max(1, int(_plugins_config.get("netbox_interface_name_rules", {}).get("apply_batch_limit", 50)))
except (AttributeError, ValueError, TypeError):
    # AttributeError: PLUGINS_CONFIG (or this plugin's entry) is not a mapping, e.g. None.
    # ValueError / TypeError: the configured value cannot be converted to an int.
    APPLY_BATCH_LIMIT = 50
31
32
@dataclasses.dataclass
class RulePreview:
    """Plain data holder carrying the same attributes the preview view reads from a rule.

    The test/preview view fills one of these from submitted form data and passes
    it to the engine in place of a saved InterfaceNameRule.
    """

    # --- module type matching ---
    module_type_is_regex: bool  # when true, module_type_pattern is used instead of module_type
    module_type_pattern: str
    module_type: object

    # --- optional scoping objects (may be None) ---
    parent_module_type: object
    device_type: object
    platform: object

    # --- naming ---
    name_template: str
    channel_count: int
    channel_start: int
46
47
# Build the list view's action buttons only when netbox.object_actions can be
# imported; otherwise record None so the list view leaves `actions` untouched.
try:
    from netbox.object_actions import AddObject, BulkDelete, BulkEdit, BulkExport, BulkImport, BulkRename
except ImportError:
    _LIST_VIEW_ACTIONS = None
else:

    class _YAMLOnlyExport(BulkExport):
        """Export button variant that offers YAML only (no CSV "Current View" entry)."""

        template_name = "netbox_interface_name_rules/buttons/export_yaml_only.html"

    _LIST_VIEW_ACTIONS: tuple = (
        AddObject,
        BulkImport,
        _YAMLOnlyExport,
        BulkEdit,
        BulkRename,
        BulkDelete,
    )
59
60
class InterfaceNameRuleListView(generic.ObjectListView):
    """Table listing of InterfaceNameRule objects, with a custom YAML export."""

    queryset = InterfaceNameRule.objects.all()
    filterset = InterfaceNameRuleFilterSet
    filterset_form = InterfaceNameRuleFilterForm
    table = InterfaceNameRuleTable
    template_name = "netbox_interface_name_rules/interfacenamerule_list.html"
    # Only override the action buttons when the object-actions API was importable.
    if _LIST_VIEW_ACTIONS is not None:
        actions = _LIST_VIEW_ACTIONS

    def export_yaml(self):
        """Dump the view's rules as one YAML list, ordered by primary key.

        Each rule becomes a mapping of CSV header -> value. Empty-string and
        None values are dropped, except that "name_template" is always kept.
        """
        ordered_rules = self.queryset.order_by("pk").select_related(
            "module_type", "parent_module_type", "device_type", "platform"
        )
        documents = [
            {
                header: value
                for header, value in zip(InterfaceNameRule.csv_headers, rule.to_csv())
                if (value != "" and value is not None) or header == "name_template"
            }
            for rule in ordered_rules
        ]
        return yaml.dump(documents, default_flow_style=False, allow_unicode=True, sort_keys=False)
84
85
class InterfaceNameRuleCreateView(generic.ObjectEditView):
    """Form view for adding a new InterfaceNameRule (uses InterfaceNameRuleForm)."""

    form = InterfaceNameRuleForm
    queryset = InterfaceNameRule.objects.all()
91
92
@register_model_view(InterfaceNameRule, "bulk_import", path="import", detail=False)
class InterfaceNameRuleBulkImportView(generic.BulkImportView):
    """Import many InterfaceNameRule objects at once via InterfaceNameRuleImportForm."""

    model_form = InterfaceNameRuleImportForm
    queryset = InterfaceNameRule.objects.all()
99
100
class InterfaceNameRuleView(generic.ObjectView):
    """Read-only detail page for a single InterfaceNameRule."""

    queryset = InterfaceNameRule.objects.all()
105
106
class InterfaceNameRuleEditView(generic.ObjectEditView):
    """Form view for modifying an existing InterfaceNameRule (uses InterfaceNameRuleForm)."""

    form = InterfaceNameRuleForm
    queryset = InterfaceNameRule.objects.all()
112
113
class InterfaceNameRuleDeleteView(generic.ObjectDeleteView):
    """Deletion view for a single InterfaceNameRule."""

    queryset = InterfaceNameRule.objects.all()
118
119
class InterfaceNameRuleBulkDeleteView(generic.BulkDeleteView):
    """Delete several InterfaceNameRule objects in one operation."""

    table = InterfaceNameRuleTable
    queryset = InterfaceNameRule.objects.all()
125
126
class InterfaceNameRuleChangeLogView(generic.ObjectChangeLogView):
    """Change-log page for a single InterfaceNameRule."""

    queryset = InterfaceNameRule.objects.all()
131
132
class InterfaceNameRuleDuplicateView(generic.ObjectView):
    """Send the user to the add form with fields copied from an existing rule."""

    queryset = InterfaceNameRule.objects.all()

    def get(self, request, **kwargs):
        """Look up the rule named by the URL kwargs and redirect to a pre-filled add form.

        The cloned field values travel as the query string of the add URL.
        """
        from utilities.querydict import prepare_cloned_fields

        cloned_fields = prepare_cloned_fields(self.get_object(**kwargs))
        add_url = reverse("plugins:netbox_interface_name_rules:interfacenamerule_add")
        return redirect(f"{add_url}?{cloned_fields.urlencode()}")
146
147
class RuleTestView(BaseMultiObjectView):
    """Sandbox for trying out a name template before saving it as a rule.

    GET shows the form, optionally seeded from an existing rule. POST either
    renders a preview (template evaluation plus an optional database lookup)
    or, for the "save_rule" action, redirects to the add/edit form.
    """

    queryset = InterfaceNameRule.objects.all()
    template_name = "netbox_interface_name_rules/rule_test.html"

    def get_required_permission(self):
        """Name the permission a user must hold to use the test tool."""
        return "netbox_interface_name_rules.add_interfacenamerule"

    @staticmethod
    def _channel_settings(cd):
        """Return (channel_count, channel_start) from cleaned data, treating empty values as 0."""
        return cd.get("channel_count") or 0, cd.get("channel_start") or 0

    def get(self, request):
        """Show the test form; a ``rule_id`` query parameter seeds it from that rule."""
        rule_id = request.GET.get("rule_id")
        can_view = request.user.has_perm("netbox_interface_name_rules.view_interfacenamerule")
        initial, loaded_rule = {}, None

        if rule_id and not can_view:
            messages.warning(request, "You do not have permission to load an existing rule.")
        elif rule_id:
            # Rule attributes copied verbatim into the form's initial data.
            prefill_fields = (
                "name_template",
                "module_type_is_regex",
                "module_type",
                "module_type_pattern",
                "parent_module_type",
                "device_type",
                "platform",
                "channel_count",
                "channel_start",
            )
            try:
                visible_rules = InterfaceNameRule.objects.restrict(request.user, "view")
                loaded_rule = visible_rules.select_related(
                    "module_type", "parent_module_type", "device_type", "platform"
                ).get(pk=int(rule_id))
                initial = {name: getattr(loaded_rule, name) for name in prefill_fields}
            except (InterfaceNameRule.DoesNotExist, ValueError):
                # Unknown or non-numeric rule_id: fall through to an empty form.
                pass

        context = {"form": RuleTestForm(initial=initial), "loaded_rule": loaded_rule}
        return render(request, self.template_name, context)

    def post(self, request):
        """Validate the form, then preview the template or hand off to the save flow."""
        form = RuleTestForm(request.POST)
        action = request.POST.get("action", "check")
        context = {
            "form": form,
            "preview_results": None,
            "db_preview": None,
            "db_total": 0,
            "error": None,
        }

        if form.is_valid():
            cd = form.cleaned_data
            if action == "save_rule":
                return self._handle_save_rule(request, cd)
            preview_results, error = self._evaluate_template_preview(cd)
            db_preview, db_total, db_error = self._fetch_db_preview(cd)
            # A template error takes precedence; the DB error is shown only without one.
            if not error and db_error:
                error = db_error
            context.update(
                preview_results=preview_results,
                db_preview=db_preview,
                db_total=db_total,
                error=error,
            )

        return render(request, self.template_name, context)

    def _find_existing_rule(self, cd, user=None):
        """Look for a stored rule with the same matching criteria as the form data.

        When ``user`` is given, only rules that user may view are considered.
        Returns the first match, or None.
        """
        module_type_is_regex = cd.get("module_type_is_regex", False)
        if user:
            candidates = InterfaceNameRule.objects.restrict(user, "view")
        else:
            candidates = InterfaceNameRule.objects.all()

        if module_type_is_regex:
            candidates = candidates.filter(
                module_type_is_regex=True, module_type_pattern=cd.get("module_type_pattern", "")
            )
        else:
            candidates = candidates.filter(module_type_is_regex=False, module_type=cd.get("module_type"))

        for field in ("parent_module_type", "device_type", "platform"):
            val = cd.get(field)
            # An unset scope field must match rules where that field is NULL.
            lookup = {field: val} if val else {f"{field}__isnull": True}
            candidates = candidates.filter(**lookup)
        return candidates.first()

    def _handle_save_rule(self, request, cd):
        """Redirect to the edit form of a matching rule, or to a pre-filled add form."""
        from urllib.parse import urlencode

        name_template = cd["name_template"]
        channel_count, channel_start = self._channel_settings(cd)
        module_type_is_regex = cd.get("module_type_is_regex", False)
        module_type = cd.get("module_type")

        # Duplicate detection needs view permission; without it the user always
        # lands on the create form (which may allow a duplicate to be entered).
        if request.user.has_perm("netbox_interface_name_rules.view_interfacenamerule"):
            existing = self._find_existing_rule(cd, request.user)
            if existing:
                messages.info(
                    request,
                    f"A matching rule already exists (#{existing.pk}). Redirecting to edit it.",
                )
                edit_url = reverse("plugins:netbox_interface_name_rules:interfacenamerule_edit", args=[existing.pk])
                return redirect(edit_url)

        params = {
            "name_template": name_template,
            "module_type_is_regex": "on" if module_type_is_regex else "",
            "channel_count": channel_count,
            "channel_start": channel_start,
        }
        if module_type_is_regex:
            params["module_type_pattern"] = cd.get("module_type_pattern", "")
        elif module_type:
            params["module_type"] = module_type.pk
        for field in ("parent_module_type", "device_type", "platform"):
            val = cd.get(field)
            if not val:
                continue
            params[field] = val.pk

        add_url = reverse("plugins:netbox_interface_name_rules:interfacenamerule_add")
        return redirect(f"{add_url}?{urlencode(params)}")

    def _evaluate_template_preview(self, cd):
        """Render the name template with the form's variables.

        Returns ``(preview_results, None)`` on success, or ``(None, <exception
        class name>)`` when evaluation fails. With a positive channel count one
        result is produced per channel; otherwise a single channel-less result.
        """
        from .engine import evaluate_name_template

        name_template = cd["name_template"]
        channel_count, channel_start = self._channel_settings(cd)

        # Fallback used for each template variable the user left blank.
        fallbacks = {
            "slot": "1",
            "bay_position": "1",
            "bay_position_num": "1",
            "parent_bay_position": "1",
            "sfp_slot": "1",
            "base": "Ethernet1",
        }
        variables = {name: cd.get(f"var_{name}") or fallback for name, fallback in fallbacks.items()}

        try:
            if channel_count > 0:
                channels = (str(channel_start + offset) for offset in range(channel_count))
            else:
                channels = (None,)
            preview_results = [
                {
                    "source": variables["base"],
                    "channel": channel,
                    "result": evaluate_name_template(
                        name_template,
                        variables if channel is None else {**variables, "channel": channel},
                    ),
                }
                for channel in channels
            ]
            return preview_results, None
        except Exception as exc:
            logger.exception("Template evaluation error: %s", exc)
            return None, type(exc).__name__

    def _fetch_db_preview(self, cd):
        """Ask the engine which existing interfaces the would-be rule matches.

        Returns ``(db_preview, db_total, error)``. When no module type criterion
        was supplied the lookup is skipped and ``(None, 0, None)`` is returned.
        """
        from .engine import find_interfaces_for_rule

        name_template = cd["name_template"]
        channel_count, channel_start = self._channel_settings(cd)
        module_type_is_regex = cd.get("module_type_is_regex", False)
        module_type = cd.get("module_type")
        module_type_pattern = cd.get("module_type_pattern", "")

        # The criterion that applies depends on the regex flag; skip when it is empty.
        active_criterion = module_type_pattern if module_type_is_regex else module_type
        if not active_criterion:
            return None, 0, None

        candidate = RulePreview(
            module_type_is_regex=module_type_is_regex,
            module_type_pattern=module_type_pattern,
            module_type=module_type,
            parent_module_type=cd.get("parent_module_type"),
            device_type=cd.get("device_type"),
            platform=cd.get("platform"),
            name_template=name_template,
            channel_count=channel_count,
            channel_start=channel_start,
        )
        try:
            db_preview, db_total = find_interfaces_for_rule(candidate, limit=100)
        except (re.error, ValueError) as exc:
            return [], 0, f"Invalid module type regex: {exc}"
        except Exception as exc:
            logger.exception("Unexpected error in find_interfaces_for_rule: %s", exc)
            return [], 0, f"Unexpected error: {type(exc).__name__}"
        return db_preview, db_total, None
349
350
class RuleApplyListView(BaseMultiObjectView):
    """Page listing every rule alongside its preview/apply controls."""

    queryset = InterfaceNameRule.objects.all()
    template_name = "netbox_interface_name_rules/rule_apply.html"

    def get_required_permission(self):
        """Name the permission a user must hold to open the apply-rules page."""
        return "netbox_interface_name_rules.view_interfacenamerule"

    def get(self, request):
        """Render the rules (ordered by pk) together with the configured batch limit."""
        related_fields = ("module_type", "parent_module_type", "device_type", "platform")
        rules = self.queryset.select_related(*related_fields).order_by("pk")
        context = {"rules": rules, "batch_limit": APPLY_BATCH_LIMIT}
        return render(request, self.template_name, context)
367
368
class RuleApplicableView(generic.ObjectView):
    """JSON endpoint reporting whether a rule would rename at least one interface.

    The Apply Rules page calls this on demand rather than at page load, so the
    expensive full-scan queries do not block the initial render.
    """

    queryset = InterfaceNameRule.objects.all()

    def get(self, request, **kwargs):
        """Respond with ``{"applicable": bool}``, or a 500 JSON error if the scan fails."""
        from .engine import has_applicable_interfaces

        rule = self.get_object(**kwargs)
        try:
            applicable = has_applicable_interfaces(rule)
        except Exception as exc:
            logger.exception("applicability scan failed for rule %s", kwargs.get("pk"))
            failure = {"applicable": None, "error": f"scan failed: {type(exc).__name__}"}
            return JsonResponse(failure, status=500)
        else:
            return JsonResponse({"applicable": applicable})
392
393
class RuleApplyDetailView(generic.ObjectView):
    """Preview the renames a single rule would perform and let the user apply them.

    GET renders the preview (capped at APPLY_BATCH_LIMIT entries). POST either
    applies the rule to the selected interfaces in the foreground, or enqueues
    a background job when the submitted action is "background".
    """

    queryset = InterfaceNameRule.objects.all()
    template_name = "netbox_interface_name_rules/rule_apply_detail.html"
    additional_permissions = ["dcim.change_interface"]

    @staticmethod
    def _parse_interface_ids(raw_ids):
        """Convert submitted id strings to ints, skipping anything that is not a decimal number.

        ``str.isdecimal()`` is used rather than ``str.isdigit()``: the latter
        also accepts characters such as superscript digits, which ``int()``
        rejects with ValueError, so one malformed value would abort the whole
        apply instead of simply being ignored.
        """
        return [int(raw) for raw in raw_ids if raw.isdecimal()]

    def get(self, request, **kwargs):
        """Render a preview of the interfaces that this rule would rename.

        Engine failures are logged and reported through the messages framework;
        the page is then rendered with an empty preview.
        """
        from .engine import find_interfaces_for_rule

        rule = self.get_object(**kwargs)
        try:
            preview, total_checked = find_interfaces_for_rule(rule, limit=APPLY_BATCH_LIMIT)
        except (re.error, ValueError) as exc:
            logger.exception("Failed to compute preview for rule %s: %s", rule, exc)
            messages.error(request, f"Failed to compute preview: {exc}")
            preview, total_checked = [], 0
        except Exception as exc:
            # Unexpected failure: show only the exception type to the user.
            logger.exception("Unexpected error computing preview for rule %s: %s", rule, exc)
            messages.error(request, f"Failed to compute preview: {type(exc).__name__}")
            preview, total_checked = [], 0
        return render(
            request,
            self.template_name,
            {
                "rule": rule,
                "preview": preview,
                "total_checked": total_checked,
                "batch_limit": APPLY_BATCH_LIMIT,
                # A full batch suggests further matches may exist beyond the limit.
                "has_more": len(preview) >= APPLY_BATCH_LIMIT,
                "can_apply": request.user.has_perm("dcim.change_interface"),
            },
        )

    def post(self, request, **kwargs):
        """Apply the rule (foreground batch or background job) and redirect back to this page.

        Outcomes, including failures, are reported through the messages framework.
        """
        from .engine import apply_rule_to_existing

        rule = self.get_object(**kwargs)
        action = request.POST.get("action", "apply")

        if action == "background":
            from .jobs import ApplyRuleJob

            try:
                job = ApplyRuleJob.enqueue(
                    # instance is intentionally omitted: InterfaceNameRule does not
                    # inherit JobsMixin, so passing instance= would fail full_clean().
                    # The job is still named and findable in Core → Jobs.
                    name=f"Apply rule: {rule}",
                    user=request.user,
                    rule_id=rule.pk,
                )
                messages.success(request, f"Background job enqueued (job #{job.pk}). Check Core → Jobs for status.")
            except Exception as e:
                logger.exception("Failed to enqueue background job for rule %s: %s", rule, e)
                messages.error(request, f"Failed to enqueue background job: {type(e).__name__}")
        else:
            try:
                interface_ids = self._parse_interface_ids(request.POST.getlist("interface_ids"))
                if not interface_ids:
                    messages.warning(request, "No interfaces selected; nothing was applied.")
                else:
                    count = apply_rule_to_existing(rule, limit=APPLY_BATCH_LIMIT, interface_ids=interface_ids)
                    messages.success(request, f"Applied rule: {count} interface(s) renamed.")
            except Exception as e:
                logger.exception("Failed to apply rule %s: %s", rule, e)
                messages.error(request, f"Failed to apply rule {rule}: {type(e).__name__}")

        return redirect("plugins:netbox_interface_name_rules:interfacenamerule_apply_detail", pk=rule.pk)
466
467
@register_model_view(InterfaceNameRule, name="toggle", path="toggle")
class RuleToggleView(generic.ObjectView):
    """POST /rules/<pk>/toggle/ — flip the enabled flag on a rule."""

    queryset = InterfaceNameRule.objects.all()

    def post(self, request, pk):
        """Invert ``enabled`` on the rule and answer with JSON (AJAX) or a redirect.

        The caller needs the change permission for this specific rule. AJAX
        callers (``X-Requested-With: XMLHttpRequest``) receive a 403 JSON body
        when denied; other callers get PermissionDenied. After a successful
        non-AJAX toggle the user is sent back to a same-host referer if one was
        supplied, otherwise to the rule list.
        """
        rule = self.get_object(pk=pk)
        is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest"

        # get_object() above is called without a "change" restriction, and the
        # model-level has_perm() check is not given the object. So also confirm
        # this particular rule falls inside the user's change-restricted queryset.
        can_change = (
            request.user.has_perm("netbox_interface_name_rules.change_interfacenamerule")
            and InterfaceNameRule.objects.restrict(request.user, "change").filter(pk=rule.pk).exists()
        )
        if not can_change:
            if is_ajax:
                return JsonResponse({"error": "Permission denied"}, status=403)
            raise PermissionDenied

        # Capture the pre-change state when the model offers snapshot(), so the
        # change log can record what the rule looked like before the toggle.
        # NOTE(review): assumes InterfaceNameRule provides NetBox's snapshot() — guarded in case it does not.
        if hasattr(rule, "snapshot"):
            rule.snapshot()

        rule.enabled = not rule.enabled
        rule.save(update_fields=["enabled"])

        if is_ajax:
            return JsonResponse({"enabled": rule.enabled, "pk": pk})

        state = "enabled" if rule.enabled else "disabled"
        messages.success(request, f"Rule '{rule}' {state}.")

        # Only follow the referer when it points at this host (guards against open redirects).
        referer = request.META.get("HTTP_REFERER", "")
        if referer and url_has_allowed_host_and_scheme(
            referer, allowed_hosts={request.get_host()}, require_https=request.is_secure()
        ):
            return redirect(referer)
        return redirect("plugins:netbox_interface_name_rules:interfacenamerule_list")