1# SPDX-License-Identifier: Apache-2.0
2# Copyright (C) 2025 Marcin Zieba <marcinpsk@gmail.com>
3"""Core renaming engine — rule lookup and interface rename logic.
4
5This module is imported lazily by signals.py so that model imports happen
6after Django is fully initialised.
7"""
8
9import ast
10import logging
11import re
12
13from django.core.exceptions import ValidationError
14from django.db import IntegrityError, transaction
15from django.db.models.functions import Length
16
17logger = logging.getLogger(__name__)
18
19
20def _get_parent_module_type(module_bay):
21 """Return the module type of the module installed in the parent bay, or None.
22
23 Used by ``apply_interface_name_rules`` to scope rules to a specific parent
24 module type (e.g., SFP inside a CVR-X2-SFP converter).
25 """
26 if module_bay.parent:
27 parent_bay = module_bay.parent
28 if hasattr(parent_bay, "installed_module") and parent_bay.installed_module:
29 return parent_bay.installed_module.module_type
30 return None
31
32
33def _collect_unrenamed(interfaces, rule, raw_names, force_reapply):
34 """Return the subset of *interfaces* that should be processed by the rule.
35
36 Normal (non-force) mode: only interfaces whose current name is still in the
37 raw template names (idempotency guard).
38
39 force_reapply, non-channel: all interfaces (e.g. vc_position changed).
40
41 force_reapply, channel rule: one interface per base name, matching when the
42 full base OR its last path segment is in *raw_names*; prefers the ":0"
43 channel to avoid duplicate-name errors on re-apply.
44 """
45 if not force_reapply:
46 return [i for i in interfaces if i.name in raw_names]
47 if rule.channel_count == 0:
48 return interfaces
49 # Breakout + force_reapply: deduplicate by base, prefer ":0"
50 seen_bases: dict = {}
51 for i in interfaces:
52 base = i.name.rsplit(":", 1)[0]
53 # Also match when the last path segment equals a raw name (already-renamed bases).
54 if base in raw_names or base.rsplit("/", 1)[-1] in raw_names:
55 if base not in seen_bases or i.name.endswith(":0"):
56 seen_bases[base] = i
57 return list(seen_bases.values())
58
59
60def apply_interface_name_rules(module, module_bay, force_reapply=False):
61 """Apply InterfaceNameRule rename after module installation.
62
63 Looks up a matching rule for (module_type, parent_module_type, device_type, platform)
64 and renames interfaces created by NetBox's template instantiation.
65
66 Only processes interfaces whose name still matches the raw bay position
67 (i.e., haven't been renamed yet), ensuring idempotency. Pass
68 ``force_reapply=True`` to skip this check and re-apply rules to ALL
69 module interfaces (used when vc_position or other variables change).
70
71 Returns:
72 Number of interfaces renamed/created, or 0 if no rule matched.
73
74 """
75 from dcim.models import Interface
76
77 device_type = module.device.device_type if module.device else None
78 platform = module.device.platform if module.device else None
79 rule = find_matching_rule(module.module_type, _get_parent_module_type(module_bay), device_type, platform)
80
81 if not rule:
82 return 0
83
84 variables = build_variables(module_bay, device=module.device)
85 interfaces = list(Interface.objects.filter(module=module))
86
87 if not interfaces:
88 return 0
89
90 # Determine raw names NetBox assigned from templates; fall back to bay_position.
91 raw_names = _get_raw_interface_names(module) or {variables["bay_position"]}
92 unrenamed = _collect_unrenamed(interfaces, rule, raw_names, force_reapply)
93
94 if not unrenamed:
95 return 0 # Already renamed (idempotent guard)
96
97 renamed = 0
98 for iface in unrenamed:
99 vars_copy = dict(variables)
100 vars_copy["base"] = iface.name
101 renamed += _apply_rule_to_interface(rule, iface, vars_copy, module)
102
103 if unrenamed and renamed == 0:
104 # All interfaces already have the names the rule would produce — flag as
105 # potentially obsolete (e.g., newer NetBox generates correct names natively).
106 _flag_rule_potentially_deprecated(rule)
107
108 return renamed
109
110
111def _try_rename_device_interface(rule, iface, vc_position, device, renamed_pks):
112 """Attempt to rename a single device-level interface using *rule*.
113
114 Returns ``True`` if the interface was successfully renamed, ``False`` otherwise.
115 Mutates ``renamed_pks`` on success.
116 """
117 if iface.pk in renamed_pks:
118 return False # Already renamed by a higher-priority rule
119
120 if rule.module_type_pattern:
121 try:
122 if not re.fullmatch(rule.module_type_pattern, iface.name):
123 return False
124 except re.error:
125 return False
126
127 port = iface.name.rsplit("/", 1)[-1] if "/" in iface.name else iface.name
128 variables = {"vc_position": vc_position, "base": iface.name, "port": port}
129
130 try:
131 new_name = evaluate_name_template(rule.name_template, variables)
132 except (ValueError, TypeError, re.error):
133 logger.exception(
134 "Failed to evaluate template %r for interface %s (rule %s)",
135 rule.name_template,
136 iface.name,
137 rule.pk,
138 )
139 return False
140
141 if new_name == iface.name:
142 return False
143
144 old_name = iface.name
145 iface.name = new_name
146 try:
147 iface.full_clean()
148 except ValidationError:
149 logger.exception(
150 "Validation failed for device interface %s → %s (rule %s, device %s)",
151 old_name,
152 new_name,
153 rule.pk,
154 device.pk,
155 )
156 iface.name = old_name
157 return False
158 try:
159 iface.save()
160 except (IntegrityError, ValidationError):
161 logger.exception(
162 "DB save failed for device interface %s → %s (rule %s, device %s)",
163 old_name,
164 new_name,
165 rule.pk,
166 device.pk,
167 )
168 iface.name = old_name
169 return False
170
171 renamed_pks.add(iface.pk)
172 logger.debug("Renamed device interface %s → %s (rule %s, device %s)", old_name, new_name, rule.pk, device.pk)
173 return True
174
175
176def apply_device_interface_rules(device):
177 """Rename device-level interfaces (module=None) when a device joins/changes position in a VC.
178
179 Finds all enabled rules with ``applies_to_device_interfaces=True`` that match the device's
180 type and platform, then renames any matching interfaces using the name_template.
181
182 Template variables available: ``{vc_position}``, ``{base}`` (full current name),
183 ``{port}`` (segment after the last ``/``, or the full name if no ``/`` present).
184
185 Returns the number of interfaces renamed.
186 """
187 from dcim.models import Interface
188
189 from .models import InterfaceNameRule
190
191 if not getattr(device, "virtual_chassis_id", None):
192 return 0 # Only rename for VC members (vc_position must be set)
193
194 if device.vc_position is None:
195 return 0 # vc_position unset (e.g. VC master before position assigned)
196
197 vc_position = str(device.vc_position)
198 device_type = getattr(device, "device_type", None)
199 platform = getattr(device, "platform", None)
200
201 from django.db.models import Q
202
203 rules = list(
204 InterfaceNameRule.objects.filter(
205 applies_to_device_interfaces=True,
206 enabled=True,
207 )
208 .filter(Q(device_type=device_type) | Q(device_type__isnull=True))
209 .filter(Q(platform=platform) | Q(platform__isnull=True))
210 )
211 # Sort Python-side: specificity_score descending, then module_type_pattern length
212 # descending (for device-interface rules with ties), then pk ascending for stability.
213 # (InterfaceNameRule has no DB 'priority' field; specificity_score is a property.)
214 rules.sort(
215 key=lambda r: (
216 -r.specificity_score,
217 -(len(r.module_type_pattern or "") if r.applies_to_device_interfaces else 0),
218 r.pk,
219 )
220 )
221
222 if not rules:
223 return 0
224
225 interfaces = list(Interface.objects.filter(device=device, module=None))
226 if not interfaces:
227 return 0
228
229 total = 0
230 renamed_pks: set[int] = set()
231 for rule in rules:
232 for iface in interfaces:
233 if _try_rename_device_interface(rule, iface, vc_position, device, renamed_pks):
234 total += 1
235
236 return total
237
238
239def _get_raw_interface_names(module):
240 """Return the original interface names NetBox assigned from templates.
241
242 Prefetches module relationships to avoid per-template queries when
243 InterfaceTemplate.resolve_name() dereferences the module bay chain.
244 """
245 from dcim.models import InterfaceTemplate, Module
246
247 module_fresh = Module.objects.select_related(
248 "module_bay",
249 "module_bay__parent",
250 "module_bay__module",
251 "module_bay__module__module_bay",
252 "module_bay__module__module_bay__parent",
253 "module_bay__module__module_bay__module",
254 ).get(pk=module.pk)
255 templates = InterfaceTemplate.objects.filter(module_type=module_fresh.module_type)
256 return {tmpl.resolve_name(module_fresh) for tmpl in templates}
257
258
259def _flag_rule_potentially_deprecated(rule):
260 """Tag a rule as 'potentially-deprecated' when its rename is a no-op.
261
262 Called from apply_interface_name_rules when a matching rule produces no
263 renames because NetBox already generates the correct interface names. This
264 may indicate the rule is no longer needed (e.g. after a NetBox upgrade that
265 improved template resolution), or only needed for a subset of module types.
266
267 Adds a NetBox Tag 'potentially-deprecated' so the rule is visually flagged
268 in the UI for operator review. Failures are logged but never re-raised so
269 the install path is not disrupted.
270 """
271 try:
272 from extras.models import Tag
273
274 tag, _ = Tag.objects.get_or_create(
275 slug="potentially-deprecated",
276 defaults={"name": "potentially-deprecated", "color": "ffc107"},
277 )
278 rule.tags.add(tag)
279 logger.info(
280 "Rule '%s' flagged as potentially-deprecated: NetBox already generates the correct interface names.",
281 rule,
282 )
283 except Exception:
284 logger.exception("Failed to flag rule '%s' as potentially-deprecated.", rule)
285
286
287def _scope_filter(field: str, value) -> dict:
288 """Return ``{field: value}`` or ``{field__isnull: True}`` for a nullable FK.
289
290 Avoids repeating the ``if value is None`` pattern throughout rule lookup.
291 """
292 if value is None:
293 return {f"{field}__isnull": True}
294 return {field: value}
295
296
297def _build_candidates(parent_module_type, device_type, platform) -> list:
298 """Build ordered list of (pmt, dt, pl) tuples from most to least specific.
299
300 Each argument expands to ``[value, None]`` when provided, or ``[None]``
301 when already absent. Deduplication ensures no key appears twice (which
302 would happen when multiple inputs are None).
303 """
304 seen: set = set()
305 candidates = []
306 pmt_opts = [parent_module_type, None] if parent_module_type else [None]
307 dt_opts = [device_type, None] if device_type else [None]
308 pl_opts = [platform, None] if platform else [None]
309 for pmt in pmt_opts:
310 for dt in dt_opts:
311 for pl in pl_opts:
312 key = (pmt, dt, pl)
313 if key not in seen:
314 seen.add(key)
315 candidates.append(key)
316 return candidates
317
318
319def _find_exact_match(module_type, candidates):
320 """Tier 1: return the first enabled exact-FK rule in specificity order, or None."""
321 from .models import InterfaceNameRule
322
323 for pmt, dt, pl in candidates:
324 filters = {"module_type": module_type, "module_type_is_regex": False, "enabled": True}
325 filters.update(_scope_filter("parent_module_type", pmt))
326 filters.update(_scope_filter("device_type", dt))
327 filters.update(_scope_filter("platform", pl))
328 rule = InterfaceNameRule.objects.filter(**filters).first()
329 if rule:
330 return rule
331 return None
332
333
334def _find_regex_match(model_name: str, candidates):
335 """Tier 2: return the first enabled regex rule whose pattern fullmatches *model_name*, or None.
336
337 Tries candidates in specificity order; within each level longer patterns
338 are tried first (more specific). Invalid regex patterns are silently skipped.
339 """
340 from .models import InterfaceNameRule
341
342 for pmt, dt, pl in candidates:
343 filters = {"module_type_is_regex": True, "enabled": True}
344 filters.update(_scope_filter("parent_module_type", pmt))
345 filters.update(_scope_filter("device_type", dt))
346 filters.update(_scope_filter("platform", pl))
347 qs = (
348 InterfaceNameRule.objects.filter(**filters)
349 .annotate(pattern_length=Length("module_type_pattern"))
350 .order_by("-pattern_length", "pk")
351 )
352 for rule in qs:
353 try:
354 if re.fullmatch(rule.module_type_pattern, model_name):
355 return rule
356 except re.error:
357 continue
358 return None
359
360
361def find_matching_rule(module_type, parent_module_type, device_type, platform=None):
362 """Find the most specific InterfaceNameRule matching the context.
363
364 Uses a two-tier strategy:
365 Tier 1 — Exact FK match (priority order, most specific first):
366 Iterates all combinations of (parent_module_type, device_type, platform)
367 from fully-constrained to fully-unconstrained (None = any).
368 Tier 2 — Regex pattern match (same priority order, longer patterns first):
369 Same specificity cascade, but module_type_pattern is matched via
370 re.fullmatch() against module_type.model. Patterns are iterated
371 from longest to shortest to prefer more specific patterns.
372
373 Returns the first matching rule, or None if no rule matches.
374 """
375 candidates = _build_candidates(parent_module_type, device_type, platform)
376 return _find_exact_match(module_type, candidates) or _find_regex_match(module_type.model, candidates)
377
378
379def _extract_trailing_digits(s: str) -> str:
380 r"""Return the trailing digit run of *s* without regex backtracking.
381
382 Pure O(n) string scan — eliminates the polynomial backtracking risk that
383 arises from using ``re.search(r"(\d+)$", ...)`` on strings ending in a
384 non-digit character (e.g. ``"1" * n + "x"`` would cause O(n²) steps).
385
386 Returns an empty string when *s* has no trailing digits.
387 """
388 i = len(s)
389 while i > 0 and s[i - 1].isdigit():
390 i -= 1
391 return s[i:]
392
393
394def _resolve_bay_position(module_bay):
395 """Return (bay_position, bay_position_num) from a module bay's position field.
396
397 Handles template expressions like ``{module}`` by extracting the trailing
398 digit from the bay name. Falls back to ``"0"`` if no digit is found.
399 """
400 bay_position = module_bay.position or "0"
401 if bay_position.startswith("{"):
402 digits = _extract_trailing_digits(module_bay.name)
403 bay_position = digits if digits else "0"
404 digits = _extract_trailing_digits(bay_position)
405 bay_position_num = digits if digits else "0"
406 return bay_position, bay_position_num
407
408
409def _resolve_slot(module_bay, bay_position_num, parent_bay_position):
410 """Return the ``slot`` variable from the module bay hierarchy.
411
412 When the bay has a parent bay, slot comes from the parent (or grandparent
413 when two levels of nesting exist). When the bay belongs to an installed
414 module with its own bay, slot comes from that module's bay position.
415 Falls back to ``bay_position_num``.
416 """
417 if module_bay.parent:
418 parent_bay = module_bay.parent
419 if parent_bay.parent and hasattr(parent_bay.parent, "installed_module"):
420 return parent_bay.parent.position or parent_bay_position
421 return parent_bay_position
422 if hasattr(module_bay, "module") and module_bay.module:
423 owner_module = module_bay.module
424 if hasattr(owner_module, "module_bay") and owner_module.module_bay:
425 return owner_module.module_bay.position or bay_position_num
426 return bay_position_num
427
428
429def build_variables(module_bay, device=None):
430 """Build template variable dict from a module bay's position context.
431
432 Extracts numeric and raw position values from the bay and its parent chain,
433 producing the variables available for name_template substitution.
434
435 Returns a dict with keys: slot, bay_position, bay_position_num,
436 parent_bay_position, sfp_slot, and optionally vc_position.
437
438 ``vc_position`` is only injected when *device* is a Virtual Chassis member
439 (device.virtual_chassis_id is set). Templates using ``{vc_position}`` on a
440 non-VC device will raise ValueError during evaluation — this is intentional.
441 Note: Juniper VC positions start at 0, so 0 is a valid real-world value and
442 cannot be used as a "not in VC" sentinel.
443 """
444 bay_position, bay_position_num = _resolve_bay_position(module_bay)
445
446 parent_bay_position = "0"
447 if module_bay.parent:
448 parent_bay_position = module_bay.parent.position or "0"
449
450 slot = _resolve_slot(module_bay, bay_position_num, parent_bay_position)
451
452 result = {
453 "slot": slot,
454 "bay_position": bay_position,
455 "bay_position_num": bay_position_num,
456 "parent_bay_position": parent_bay_position,
457 "sfp_slot": bay_position_num,
458 }
459 if (
460 device is not None
461 and getattr(device, "virtual_chassis_id", None) is not None
462 and device.vc_position is not None
463 ):
464 result["vc_position"] = str(device.vc_position)
465 return result
466
467
468def _apply_rule_to_interface(rule, iface, variables, module):
469 """Apply a single rule to an interface, handling breakout channels.
470
471 All saves are wrapped in a transaction so a failure mid-breakout rolls
472 back any partially created interfaces.
473
474 Returns the number of interfaces renamed/created.
475 """
476 from dcim.models import Interface
477
478 count = 0
479
480 with transaction.atomic():
481 if rule.channel_count > 0:
482 # Breakout: rename base interface and create additional channel interfaces
483 for ch in range(rule.channel_count):
484 variables["channel"] = str(rule.channel_start + ch)
485 new_name = evaluate_name_template(rule.name_template, variables)
486 if ch == 0:
487 if new_name != iface.name:
488 iface.name = new_name
489 iface.full_clean()
490 iface.save()
491 count += 1
492 else:
493 if not Interface.objects.filter(module=module, name=new_name).exists():
494 breakout_iface = Interface(
495 device=module.device,
496 module=module,
497 name=new_name,
498 type=iface.type,
499 enabled=iface.enabled,
500 )
501 breakout_iface.full_clean()
502 breakout_iface.save()
503 count += 1
504 else:
505 # Simple rename (converter offset, platform naming, etc.)
506 new_name = evaluate_name_template(rule.name_template, variables)
507 if new_name != iface.name:
508 iface.name = new_name
509 iface.full_clean()
510 iface.save()
511 count += 1
512
513 return count
514
515
516def _find_channel_base(rule, ifaces, variables):
517 """Find the best 'base' interface for a channel rule on a single module.
518
519 Prefers an interface whose current name already equals the expected ch=0 name
520 (i.e. it has already been renamed to channel 0 and is safe to re-process).
521 Falls back to the first interface (alphabetically) so that on first apply,
522 the template-created base interface becomes channel 0.
523
524 This ensures apply_rule_to_existing / find_interfaces_for_rule call
525 _apply_rule_to_interface exactly ONCE per module for channel rules, preventing
526 duplicate-name IntegrityErrors when channels already exist.
527 """
528 if not ifaces:
529 return None
530 for iface in ifaces:
531 vars_copy = dict(variables)
532 vars_copy["base"] = iface.name
533 vars_copy["channel"] = str(rule.channel_start) # ch=0
534 try:
535 ch0_name = evaluate_name_template(rule.name_template, vars_copy)
536 if iface.name == ch0_name:
537 return iface
538 except ValueError:
539 pass
540 return ifaces[0]
541
542
543def _matching_moduletype_pks(module_type_pattern):
544 """Return PKs of ModuleTypes whose model name matches the given regex pattern.
545
546 Raises ValueError for invalid regex patterns, mirroring evaluate_name_template's
547 error-handling convention so callers can treat both as ValueError.
548 """
549 from dcim.models import ModuleType
550
551 try:
552 compiled = re.compile(module_type_pattern)
553 except re.error as exc:
554 raise ValueError(f"Invalid module_type_pattern regex '{module_type_pattern}': {exc}") from exc
555 return [mt.pk for mt in ModuleType.objects.only("pk", "model") if compiled.fullmatch(mt.model)]
556
557
558def has_applicable_interfaces(rule) -> bool:
559 """Check whether applying this rule right now would rename at least one interface.
560
561 Calls find_interfaces_for_rule(limit=1) to determine if any currently installed
562 interface would receive a new name. Returns False when:
563 - no matching modules/interfaces are installed, OR
564 - all matching interfaces are already correctly named.
565
566 This is more expensive than a plain EXISTS query but ensures the Applicable
567 column in the Apply Rules list accurately reflects "would something change?"
568 rather than the misleading "do interfaces exist?".
569 """
570 try:
571 results, _ = find_interfaces_for_rule(rule, limit=1)
572 return len(results) > 0
573 except (ValueError, re.error):
574 return False
575
576
577def _build_module_qs(rule):
578 """Return a Module queryset filtered to the rule's scope (module type, parent, device, platform).
579
580 Shared by ``find_interfaces_for_rule`` and ``apply_rule_to_existing`` to avoid
581 duplicating the filtering logic.
582 """
583 from dcim.models import Module
584
585 if rule.module_type_is_regex:
586 qs = Module.objects.filter(module_type__in=_matching_moduletype_pks(rule.module_type_pattern))
587 else:
588 qs = Module.objects.filter(module_type=rule.module_type)
589 if rule.parent_module_type:
590 qs = qs.filter(module_bay__parent__installed_module__module_type=rule.parent_module_type)
591 if rule.device_type:
592 qs = qs.filter(device__device_type=rule.device_type)
593 if rule.platform:
594 qs = qs.filter(device__platform=rule.platform)
595 return qs
596
597
598def _evaluate_plain_interface(rule, module, iface, variables) -> dict | None:
599 """Return a result dict if *iface* would be renamed by *rule*, else None."""
600 vars_copy = {**variables, "base": iface.name}
601 try:
602 new_name = evaluate_name_template(rule.name_template, vars_copy)
603 except ValueError as exc:
604 new_name = f"<error: {exc}>"
605 if new_name == iface.name:
606 return None
607 return {"module": module, "interface": iface, "current_name": iface.name, "new_names": [new_name]}
608
609
610def _channel_rule_entry(rule, module, ifaces, variables) -> dict | None:
611 """Return a result dict if the channel rule would change any name for this module, else None."""
612 base_iface = _find_channel_base(rule, ifaces, variables)
613 if base_iface is None:
614 return None
615 vars_copy = {**variables, "base": base_iface.name}
616 expected_names = []
617 try:
618 for ch in range(rule.channel_count):
619 expected_names.append(
620 evaluate_name_template(rule.name_template, {**vars_copy, "channel": str(rule.channel_start + ch)})
621 )
622 except ValueError as exc:
623 expected_names = [f"<error: {exc}>"]
624 existing_names = {i.name for i in ifaces}
625 # Report if any channel name is missing or the base itself needs renaming
626 if any(n not in existing_names for n in expected_names) or (
627 expected_names and expected_names[0] != base_iface.name
628 ):
629 return {"module": module, "interface": base_iface, "current_name": base_iface.name, "new_names": expected_names}
630 return None
631
632
633def _count_remaining_interfaces(module_qs, processed_pks) -> int:
634 """Count interfaces in modules not yet visited during a find_interfaces_for_rule scan."""
635 from dcim.models import Interface
636
637 return Interface.objects.filter(module__in=module_qs.exclude(pk__in=processed_pks)).count()
638
639
640def _process_channel_module(rule, module, ifaces, variables, limit, results, module_qs, processed_pks):
641 """Process one module for a channel rule. Returns (checked_count, should_stop)."""
642 checked = len(ifaces)
643 if not ifaces:
644 return checked, False
645 entry = _channel_rule_entry(rule, module, ifaces, variables)
646 if entry:
647 results.append(entry)
648 if limit is not None and len(results) >= limit:
649 return checked + _count_remaining_interfaces(module_qs, processed_pks), True
650 return checked, False
651
652
653def _process_plain_module(rule, module, ifaces, variables, limit, results, module_qs, processed_pks):
654 """Process one module for a plain (non-channel) rule. Returns (checked_count, should_stop)."""
655 checked = 0
656 for iface_idx, iface in enumerate(ifaces):
657 checked += 1
658 entry = _evaluate_plain_interface(rule, module, iface, variables)
659 if entry:
660 results.append(entry)
661 if limit is not None and len(results) >= limit:
662 checked += len(ifaces) - (iface_idx + 1)
663 checked += _count_remaining_interfaces(module_qs, processed_pks)
664 return checked, True
665 return checked, False
666
667
668def find_interfaces_for_rule(rule, limit=None):
669 """Find interfaces that would be renamed by applying the given rule retroactively.
670
671 Searches for all Module instances matching the rule's criteria and computes
672 what their interfaces would be renamed to.
673
674 Returns a tuple ``(results, total_checked)`` where *results* is a list of dicts::
675
676 {
677 "module": Module instance,
678 "interface": Interface instance,
679 "current_name": str,
680 "new_names": list[str], # one entry per channel, or single-element
681 }
682
683 Only includes entries where at least one new_name differs from current_name.
684 If *limit* is set the list is truncated after that many changed entries, but
685 *total_checked* always reflects the full count of interfaces examined.
686 """
687 from collections import defaultdict
688
689 from dcim.models import Interface
690
691 module_qs = _build_module_qs(rule).select_related(
692 "module_type",
693 "device",
694 "device__device_type",
695 "device__platform",
696 "device__virtual_chassis",
697 "module_bay",
698 "module_bay__parent",
699 )
700 process_fn = _process_channel_module if rule.channel_count > 0 else _process_plain_module
701
702 # Batch-load all interfaces for matching modules to avoid N+1 queries.
703 ifaces_by_module = defaultdict(list)
704 for iface in Interface.objects.filter(module__in=module_qs).order_by("module_id", "name"):
705 ifaces_by_module[iface.module_id].append(iface)
706
707 processed_pks = set()
708 results = []
709 total_checked = 0
710 for module in module_qs:
711 processed_pks.add(module.pk)
712 variables = build_variables(module.module_bay, device=module.device)
713 ifaces = ifaces_by_module.get(module.pk, [])
714 checked, stop = process_fn(rule, module, ifaces, variables, limit, results, module_qs, processed_pks)
715 total_checked += checked
716 if stop:
717 return results, total_checked
718
719 return results, total_checked
720
721
722def apply_rule_to_existing(rule, limit=None, interface_ids=None):
723 """Apply a rule retroactively to all matching installed modules.
724
725 Unlike apply_interface_name_rules(), this does not skip already-renamed
726 interfaces — it re-evaluates every interface on each matching module.
727
728 For channel rules (channel_count > 0), each module is processed as a single
729 unit using _find_channel_base() to pick the base interface. Calling
730 _apply_rule_to_interface for every interface in the module would produce
731 duplicate-name IntegrityErrors when channel interfaces already exist.
732
733 If *interface_ids* is provided (list/set of Interface PKs), only those
734 interfaces are processed; all others are skipped. For channel rules the
735 base interface PK is used as the selector. An empty *interface_ids*
736 collection returns 0 immediately without touching the database.
737
738 Returns the number of interfaces renamed/created.
739 """
740 from collections import defaultdict
741
742 from dcim.models import Interface
743
744 id_set = frozenset(interface_ids) if interface_ids is not None else None
745 if id_set is not None and not id_set:
746 return 0
747
748 if not rule.enabled:
749 return 0
750
751 module_qs = _build_module_qs(rule)
752
753 # Batch-load interfaces to avoid N+1 queries in the module loop.
754 ifaces_by_module = defaultdict(list)
755 for iface in Interface.objects.filter(module__in=module_qs).order_by("module_id", "name"):
756 ifaces_by_module[iface.module_id].append(iface)
757
758 count = 0
759 for module in module_qs.select_related("module_bay", "module_type", "device", "device__virtual_chassis"):
760 variables = build_variables(module.module_bay, device=module.device)
761 ifaces = ifaces_by_module.get(module.pk, [])
762
763 if rule.channel_count > 0:
764 # Channel rule: process module ONCE using the best base interface.
765 # Calling _apply_rule_to_interface for each existing interface would
766 # attempt to create the same channel names multiple times.
767 if not ifaces:
768 continue
769 base_iface = _find_channel_base(rule, ifaces, variables)
770 if id_set is not None and base_iface.pk not in id_set:
771 continue
772 vars_copy = dict(variables)
773 vars_copy["base"] = base_iface.name
774 try:
775 count += _apply_rule_to_interface(rule, base_iface, vars_copy, module)
776 except (ValueError, ValidationError, IntegrityError):
777 logger.exception(
778 "Failed to apply channel rule '%s' to module '%s' (id=%s); skipping.",
779 rule,
780 module,
781 module.pk,
782 )
783 else:
784 for iface in ifaces:
785 if id_set is not None and iface.pk not in id_set:
786 continue
787 vars_copy = dict(variables)
788 vars_copy["base"] = iface.name
789 try:
790 count += _apply_rule_to_interface(rule, iface, vars_copy, module)
791 except (ValueError, ValidationError, IntegrityError):
792 logger.exception(
793 "Failed to apply rule '%s' to interface '%s' (id=%s); skipping.",
794 rule,
795 iface.name,
796 iface.pk,
797 )
798 continue
799
800 if limit is not None and count >= limit:
801 return count
802
803 return count
804
805
806def evaluate_name_template(template: str, variables: dict) -> str:
807 """Evaluate a name template with variable substitution and safe arithmetic.
808
809 Supports templates like:
810 "GigabitEthernet{slot}/{8 + ({parent_bay_position} - 1) * 2 + {sfp_slot}}"
811
812 Variables are substituted first, then any brace-enclosed expression
813 containing arithmetic operators is safely evaluated via AST. True division
814 (/) is not allowed — use floor division (//) instead. Results are cast to
815 int to ensure interface names are always whole numbers.
816 """
817 # First pass: substitute all simple variables
818 result = template
819 for key, value in variables.items():
820 result = result.replace(f"{{{key}}}", str(value))
821
822 # Second pass: evaluate any remaining brace-enclosed arithmetic expressions
823 def _eval_expr(match):
824 expr = match.group(1).strip()
825 # Allow digits, arithmetic operators (excluding lone /), parens, whitespace.
826 # Negative lookahead disallows a single / that is not part of //.
827 if not re.match(r"^(?!.*(?<!/)/(?!/))[\d\s\+\-\*\(\/\)]+$", expr):
828 raise ValueError(f"Unsafe expression in name template: {expr}")
829 try:
830 node = ast.parse(expr, mode="eval")
831 for child in ast.walk(node):
832 if not isinstance(
833 child,
834 (
835 ast.Expression,
836 ast.BinOp,
837 ast.UnaryOp,
838 ast.Constant,
839 ast.Add,
840 ast.Sub,
841 ast.Mult,
842 ast.FloorDiv,
843 ast.USub,
844 ast.UAdd,
845 ),
846 ):
847 raise ValueError(f"Unsafe AST node in expression: {type(child).__name__}")
848 return str(int(eval(compile(node, "<template>", "eval")))) # noqa: S307
849 except (SyntaxError, TypeError) as e:
850 raise ValueError(f"Invalid arithmetic expression '{expr}': {e}") from e
851
852 return re.sub(r"\{([^}]+)\}", _eval_expr, result)