Coverage for /home/runner/work/netbox-InterfaceNameRules-plugin/netbox-InterfaceNameRules-plugin/netbox-InterfaceNameRules-plugin/netbox_interface_name_rules/engine.py: 99%

394 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-02 15:19 +0000

1# SPDX-License-Identifier: Apache-2.0 

2# Copyright (C) 2025 Marcin Zieba <marcinpsk@gmail.com> 

3"""Core renaming engine — rule lookup and interface rename logic. 

4 

5This module is imported lazily by signals.py so that model imports happen 

6after Django is fully initialised. 

7""" 

8 

9import ast 

10import logging 

11import re 

12 

13from django.core.exceptions import ValidationError 

14from django.db import IntegrityError, transaction 

15from django.db.models.functions import Length 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20def _get_parent_module_type(module_bay): 

21 """Return the module type of the module installed in the parent bay, or None. 

22 

23 Used by ``apply_interface_name_rules`` to scope rules to a specific parent 

24 module type (e.g., SFP inside a CVR-X2-SFP converter). 

25 """ 

26 if module_bay.parent: 

27 parent_bay = module_bay.parent 

28 if hasattr(parent_bay, "installed_module") and parent_bay.installed_module: 

29 return parent_bay.installed_module.module_type 

30 return None 

31 

32 

33def _collect_unrenamed(interfaces, rule, raw_names, force_reapply): 

34 """Return the subset of *interfaces* that should be processed by the rule. 

35 

36 Normal (non-force) mode: only interfaces whose current name is still in the 

37 raw template names (idempotency guard). 

38 

39 force_reapply, non-channel: all interfaces (e.g. vc_position changed). 

40 

41 force_reapply, channel rule: one interface per base name, matching when the 

42 full base OR its last path segment is in *raw_names*; prefers the ":0" 

43 channel to avoid duplicate-name errors on re-apply. 

44 """ 

45 if not force_reapply: 

46 return [i for i in interfaces if i.name in raw_names] 

47 if rule.channel_count == 0: 

48 return interfaces 

49 # Breakout + force_reapply: deduplicate by base, prefer ":0" 

50 seen_bases: dict = {} 

51 for i in interfaces: 

52 base = i.name.rsplit(":", 1)[0] 

53 # Also match when the last path segment equals a raw name (already-renamed bases). 

54 if base in raw_names or base.rsplit("/", 1)[-1] in raw_names: 

55 if base not in seen_bases or i.name.endswith(":0"): 

56 seen_bases[base] = i 

57 return list(seen_bases.values()) 

58 

59 

def apply_interface_name_rules(module, module_bay, force_reapply=False):
    """Rename a freshly installed module's interfaces using the matching rule.

    Finds the rule for (module_type, parent_module_type, device_type, platform)
    and applies it to the interfaces that belong to *module*.

    By default only interfaces whose name is still a raw template name are
    processed, which keeps the operation idempotent. With
    ``force_reapply=True`` that guard is skipped so rules are re-applied
    (used when vc_position or other variables change).

    When a rule matched and interfaces were selected but nothing was renamed,
    the rule is flagged as potentially deprecated.

    Returns:
        Number of interfaces renamed/created, or 0 if no rule matched, the
        module has no interfaces, or nothing was selected for processing.
    """
    from dcim.models import Interface

    device = module.device
    device_type = device.device_type if device else None
    platform = device.platform if device else None
    parent_module_type = _get_parent_module_type(module_bay)

    rule = find_matching_rule(module.module_type, parent_module_type, device_type, platform)
    if not rule:
        return 0

    variables = build_variables(module_bay, device=device)
    interfaces = list(Interface.objects.filter(module=module))
    if not interfaces:
        return 0

    # Names NetBox produced from the templates; fall back to the bay position.
    raw_names = _get_raw_interface_names(module) or {variables["bay_position"]}
    pending = _collect_unrenamed(interfaces, rule, raw_names, force_reapply)
    if not pending:
        return 0  # Already renamed (idempotent guard)

    renamed = 0
    for iface in pending:
        renamed += _apply_rule_to_interface(rule, iface, {**variables, "base": iface.name}, module)

    if renamed == 0:
        # Every selected interface already carries the name the rule would
        # produce, so the rule may be obsolete.
        _flag_rule_potentially_deprecated(rule)

    return renamed

109 

110 

def _try_rename_device_interface(rule, iface, vc_position, device, renamed_pks):
    """Attempt to rename a single device-level interface using *rule*.

    The interface is skipped when its pk is already in *renamed_pks*, when
    ``rule.module_type_pattern`` is set and does not fullmatch the current
    name (or is not a valid regex), when the template cannot be evaluated, or
    when the evaluated name equals the current one.

    Template variables supplied: ``vc_position``, ``base`` (the full current
    name) and ``port`` (the segment after the last ``/``, or the full name if
    there is no ``/``).

    Validation and save failures are logged, the in-memory name is restored,
    and ``False`` is returned.

    Returns:
        ``True`` if the interface was renamed and saved, ``False`` otherwise.
        On success the interface pk is added to *renamed_pks*.
    """
    if iface.pk in renamed_pks:
        return False  # Already renamed by a higher-priority rule

    if rule.module_type_pattern:
        try:
            if not re.fullmatch(rule.module_type_pattern, iface.name):
                return False
        except re.error:
            return False

    old_name = iface.name
    # rsplit returns the whole name when there is no "/", so no separate check is needed.
    port = old_name.rsplit("/", 1)[-1]
    variables = {"vc_position": vc_position, "base": old_name, "port": port}

    try:
        new_name = evaluate_name_template(rule.name_template, variables)
    except (ValueError, TypeError, re.error):
        logger.exception(
            "Failed to evaluate template %r for interface %s (rule %s)",
            rule.name_template,
            old_name,
            rule.pk,
        )
        return False

    if new_name == old_name:
        # NOTE(review): a no-op match is not recorded in renamed_pks, so a
        # lower-priority rule may still rename this interface — confirm intended.
        return False

    iface.name = new_name
    try:
        iface.full_clean()
    except ValidationError:
        logger.exception(
            "Validation failed for device interface %s → %s (rule %s, device %s)",
            old_name,
            new_name,
            rule.pk,
            device.pk,
        )
        iface.name = old_name
        return False

    try:
        # Run the save inside its own atomic block so that a database error is
        # rolled back to a savepoint. Swallowing IntegrityError without one
        # would leave any enclosing transaction unusable for later queries.
        with transaction.atomic():
            iface.save()
    except (IntegrityError, ValidationError):
        logger.exception(
            "DB save failed for device interface %s → %s (rule %s, device %s)",
            old_name,
            new_name,
            rule.pk,
            device.pk,
        )
        iface.name = old_name
        return False

    renamed_pks.add(iface.pk)
    logger.debug("Renamed device interface %s → %s (rule %s, device %s)", old_name, new_name, rule.pk, device.pk)
    return True

174 

175 

def apply_device_interface_rules(device):
    """Rename device-level interfaces (module=None) for a Virtual Chassis member.

    Collects every enabled rule with ``applies_to_device_interfaces=True``
    whose device type and platform either match the device or are unset, then
    tries each rule against each of the device's module-less interfaces.

    Template variables available: ``{vc_position}``, ``{base}`` (full current
    name), ``{port}`` (segment after the last ``/``, or the full name if no
    ``/`` is present).

    Returns:
        The number of interfaces renamed; 0 when the device is not in a VC,
        has no ``vc_position``, or no rules/interfaces apply.
    """
    from dcim.models import Interface
    from django.db.models import Q

    from .models import InterfaceNameRule

    # Only VC members with an assigned position are renamed.
    if not getattr(device, "virtual_chassis_id", None) or device.vc_position is None:
        return 0

    vc_position = str(device.vc_position)
    device_type = getattr(device, "device_type", None)
    platform = getattr(device, "platform", None)

    device_type_scope = Q(device_type=device_type) | Q(device_type__isnull=True)
    platform_scope = Q(platform=platform) | Q(platform__isnull=True)
    candidate_rules = (
        InterfaceNameRule.objects.filter(applies_to_device_interfaces=True, enabled=True)
        .filter(device_type_scope)
        .filter(platform_scope)
    )

    def _priority(candidate):
        # Highest specificity_score first, then longest module_type_pattern,
        # then lowest pk for a stable order. Sorting happens in Python because
        # specificity_score is a property, not a DB field.
        pattern_length = len(candidate.module_type_pattern or "") if candidate.applies_to_device_interfaces else 0
        return (-candidate.specificity_score, -pattern_length, candidate.pk)

    rules = sorted(candidate_rules, key=_priority)
    if not rules:
        return 0

    interfaces = list(Interface.objects.filter(device=device, module=None))
    if not interfaces:
        return 0

    renamed_total = 0
    renamed_pks: set[int] = set()
    for rule in rules:
        for iface in interfaces:
            if _try_rename_device_interface(rule, iface, vc_position, device, renamed_pks):
                renamed_total += 1
    return renamed_total

237 

238 

def _get_raw_interface_names(module):
    """Compute the interface names NetBox's templates resolve to for *module*.

    The module is re-fetched with its bay/parent/owner-module chain joined in
    via ``select_related`` so that ``InterfaceTemplate.resolve_name()`` does
    not trigger extra queries per template while walking that chain.

    Returns:
        A set with one resolved name per interface template of the module's
        module type (empty when the type has no interface templates).
    """
    from dcim.models import InterfaceTemplate, Module

    related_paths = (
        "module_bay",
        "module_bay__parent",
        "module_bay__module",
        "module_bay__module__module_bay",
        "module_bay__module__module_bay__parent",
        "module_bay__module__module_bay__module",
    )
    fresh_module = Module.objects.select_related(*related_paths).get(pk=module.pk)

    resolved = set()
    for template in InterfaceTemplate.objects.filter(module_type=fresh_module.module_type):
        resolved.add(template.resolve_name(fresh_module))
    return resolved

257 

258 

def _flag_rule_potentially_deprecated(rule):
    """Tag a rule as 'potentially-deprecated' when its rename is a no-op.

    Called from apply_interface_name_rules when a matching rule produces no
    renames. That may mean the rule is no longer needed (e.g. NetBox already
    generates the correct interface names), or is only needed for a subset of
    module types.

    Gets or creates the Tag with slug 'potentially-deprecated' and adds it to
    ``rule.tags`` so the rule can be reviewed by an operator.

    Failures are logged and never re-raised. The database work runs inside
    its own atomic block so that a swallowed database error is rolled back to
    a savepoint instead of leaving an enclosing transaction (the install
    path) unusable.
    """
    try:
        from extras.models import Tag

        with transaction.atomic():
            tag, _ = Tag.objects.get_or_create(
                slug="potentially-deprecated",
                defaults={"name": "potentially-deprecated", "color": "ffc107"},
            )
            rule.tags.add(tag)
        logger.info(
            "Rule '%s' flagged as potentially-deprecated: NetBox already generates the correct interface names.",
            rule,
        )
    except Exception:
        # Deliberate best-effort boundary: flagging must never break the caller.
        logger.exception("Failed to flag rule '%s' as potentially-deprecated.", rule)

285 

286 

287def _scope_filter(field: str, value) -> dict: 

288 """Return ``{field: value}`` or ``{field__isnull: True}`` for a nullable FK. 

289 

290 Avoids repeating the ``if value is None`` pattern throughout rule lookup. 

291 """ 

292 if value is None: 

293 return {f"{field}__isnull": True} 

294 return {field: value} 

295 

296 

297def _build_candidates(parent_module_type, device_type, platform) -> list: 

298 """Build ordered list of (pmt, dt, pl) tuples from most to least specific. 

299 

300 Each argument expands to ``[value, None]`` when provided, or ``[None]`` 

301 when already absent. Deduplication ensures no key appears twice (which 

302 would happen when multiple inputs are None). 

303 """ 

304 seen: set = set() 

305 candidates = [] 

306 pmt_opts = [parent_module_type, None] if parent_module_type else [None] 

307 dt_opts = [device_type, None] if device_type else [None] 

308 pl_opts = [platform, None] if platform else [None] 

309 for pmt in pmt_opts: 

310 for dt in dt_opts: 

311 for pl in pl_opts: 

312 key = (pmt, dt, pl) 

313 if key not in seen: 

314 seen.add(key) 

315 candidates.append(key) 

316 return candidates 

317 

318 

def _find_exact_match(module_type, candidates):
    """Tier 1: find an enabled, non-regex rule bound to *module_type*.

    *candidates* is walked in order; for each (pmt, dt, pl) scope the first
    rule returned by the query wins.

    Returns:
        The first matching rule, or ``None`` if no candidate scope has one.
    """
    from .models import InterfaceNameRule

    for pmt, dt, pl in candidates:
        lookup = {
            "module_type": module_type,
            "module_type_is_regex": False,
            "enabled": True,
            **_scope_filter("parent_module_type", pmt),
            **_scope_filter("device_type", dt),
            **_scope_filter("platform", pl),
        }
        match = InterfaceNameRule.objects.filter(**lookup).first()
        if match:
            return match
    return None

332 

333 

def _find_regex_match(model_name: str, candidates):
    """Tier 2: find an enabled regex rule whose pattern fullmatches *model_name*.

    *candidates* is walked in order. Within one scope, rules are tried from
    the longest ``module_type_pattern`` to the shortest (ties broken by pk).
    A rule whose pattern is not a valid regex is skipped silently.

    Returns:
        The first rule whose pattern fullmatches, or ``None``.
    """
    from .models import InterfaceNameRule

    for pmt, dt, pl in candidates:
        lookup = {
            "module_type_is_regex": True,
            "enabled": True,
            **_scope_filter("parent_module_type", pmt),
            **_scope_filter("device_type", dt),
            **_scope_filter("platform", pl),
        }
        scoped_rules = InterfaceNameRule.objects.filter(**lookup)
        by_pattern_length = scoped_rules.annotate(pattern_length=Length("module_type_pattern")).order_by(
            "-pattern_length", "pk"
        )
        for candidate_rule in by_pattern_length:
            try:
                matched = re.fullmatch(candidate_rule.module_type_pattern, model_name)
            except re.error:
                continue
            if matched:
                return candidate_rule
    return None

359 

360 

def find_matching_rule(module_type, parent_module_type, device_type, platform=None):
    """Find the most specific InterfaceNameRule matching the context.

    Two tiers are tried, both over the same scope cascade built from
    (parent_module_type, device_type, platform), from fully constrained to
    fully unconstrained (``None`` = any):

    Tier 1 — exact FK match on *module_type*.
    Tier 2 — regex match: ``module_type_pattern`` is checked with
        ``re.fullmatch()`` against ``module_type.model``, longer patterns
        first. Only consulted when tier 1 finds nothing.

    Returns:
        The first matching rule, or ``None`` if no rule matches.
    """
    candidates = _build_candidates(parent_module_type, device_type, platform)
    exact = _find_exact_match(module_type, candidates)
    if exact:
        return exact
    return _find_regex_match(module_type.model, candidates)

377 

378 

379def _extract_trailing_digits(s: str) -> str: 

380 r"""Return the trailing digit run of *s* without regex backtracking. 

381 

382 Pure O(n) string scan — eliminates the polynomial backtracking risk that 

383 arises from using ``re.search(r"(\d+)$", ...)`` on strings ending in a 

384 non-digit character (e.g. ``"1" * n + "x"`` would cause O(n²) steps). 

385 

386 Returns an empty string when *s* has no trailing digits. 

387 """ 

388 i = len(s) 

389 while i > 0 and s[i - 1].isdigit(): 

390 i -= 1 

391 return s[i:] 

392 

393 

def _resolve_bay_position(module_bay):
    """Derive ``(bay_position, bay_position_num)`` for a module bay.

    ``bay_position`` is the bay's ``position`` (``"0"`` when empty). If the
    position starts with ``"{"`` (a template expression such as ``{module}``),
    the trailing digits of the bay *name* are used instead, or ``"0"`` when
    the name has none.

    ``bay_position_num`` is the trailing digit run of ``bay_position``, or
    ``"0"`` when there is none.
    """
    position = module_bay.position or "0"
    if position.startswith("{"):
        position = _extract_trailing_digits(module_bay.name) or "0"
    return position, _extract_trailing_digits(position) or "0"

407 

408 

409def _resolve_slot(module_bay, bay_position_num, parent_bay_position): 

410 """Return the ``slot`` variable from the module bay hierarchy. 

411 

412 When the bay has a parent bay, slot comes from the parent (or grandparent 

413 when two levels of nesting exist). When the bay belongs to an installed 

414 module with its own bay, slot comes from that module's bay position. 

415 Falls back to ``bay_position_num``. 

416 """ 

417 if module_bay.parent: 

418 parent_bay = module_bay.parent 

419 if parent_bay.parent and hasattr(parent_bay.parent, "installed_module"): 

420 return parent_bay.parent.position or parent_bay_position 

421 return parent_bay_position 

422 if hasattr(module_bay, "module") and module_bay.module: 

423 owner_module = module_bay.module 

424 if hasattr(owner_module, "module_bay") and owner_module.module_bay: 

425 return owner_module.module_bay.position or bay_position_num 

426 return bay_position_num 

427 

428 

def build_variables(module_bay, device=None):
    """Assemble the template variables for a module bay.

    Returns a dict with the keys ``slot``, ``bay_position``,
    ``bay_position_num``, ``parent_bay_position`` and ``sfp_slot`` (same value
    as ``bay_position_num``), plus ``vc_position`` when applicable.

    ``parent_bay_position`` is the parent bay's ``position``, or ``"0"`` when
    there is no parent or its position is empty.

    ``vc_position`` is only added when *device* is given, its
    ``virtual_chassis_id`` is not ``None`` and its ``vc_position`` is not
    ``None``. A template that uses ``{vc_position}`` on a non-VC device will
    therefore raise ValueError during evaluation — this is intentional.
    Note: Juniper VC positions start at 0, so 0 is a valid value and cannot
    serve as a "not in VC" sentinel.
    """
    bay_position, bay_position_num = _resolve_bay_position(module_bay)

    parent_bay = module_bay.parent
    parent_bay_position = (parent_bay.position or "0") if parent_bay else "0"

    variables = {
        "slot": _resolve_slot(module_bay, bay_position_num, parent_bay_position),
        "bay_position": bay_position,
        "bay_position_num": bay_position_num,
        "parent_bay_position": parent_bay_position,
        "sfp_slot": bay_position_num,
    }

    is_vc_member = device is not None and getattr(device, "virtual_chassis_id", None) is not None
    if is_vc_member and device.vc_position is not None:
        variables["vc_position"] = str(device.vc_position)
    return variables

466 

467 

def _apply_rule_to_interface(rule, iface, variables, module):
    """Apply a single rule to an interface, handling breakout channels.

    For a plain rule (``rule.channel_count == 0``) the template is evaluated
    once and *iface* is renamed if the result differs from its current name.

    For a breakout rule the template is evaluated once per channel with the
    ``channel`` variable set to ``channel_start + n``. The first channel
    renames *iface*; every further channel creates a new Interface on the
    same device/module (copying ``type`` and ``enabled`` from *iface*) unless
    the module already has an interface with that name.

    All saves run inside one transaction, so a failure mid-breakout rolls
    back any interfaces created so far. On failure the in-memory
    ``iface.name`` is also restored, so the object does not keep a name the
    database rolled back. *variables* is never modified.

    Returns:
        The number of interfaces renamed/created.

    Raises:
        Whatever ``evaluate_name_template``, ``full_clean()`` or ``save()``
        raise; the exception propagates after the rollback.
    """
    from dcim.models import Interface

    if rule.channel_count > 0:
        # One variable set per channel; built as copies so the caller's dict
        # does not end up with a stray "channel" key.
        variable_sets = [
            {**variables, "channel": str(rule.channel_start + ch)} for ch in range(rule.channel_count)
        ]
    else:
        # Simple rename (converter offset, platform naming, etc.)
        variable_sets = [variables]

    original_name = iface.name
    count = 0
    try:
        with transaction.atomic():
            for index, name_vars in enumerate(variable_sets):
                new_name = evaluate_name_template(rule.name_template, name_vars)
                if index == 0:
                    # First (or only) name: rename the existing interface.
                    if new_name != iface.name:
                        iface.name = new_name
                        iface.full_clean()
                        iface.save()
                        count += 1
                elif not Interface.objects.filter(module=module, name=new_name).exists():
                    # Additional breakout channel: create it if missing.
                    breakout_iface = Interface(
                        device=module.device,
                        module=module,
                        name=new_name,
                        type=iface.type,
                        enabled=iface.enabled,
                    )
                    breakout_iface.full_clean()
                    breakout_iface.save()
                    count += 1
    except Exception:
        # The transaction was rolled back; keep the Python object in step.
        iface.name = original_name
        raise

    return count

514 

515 

516def _find_channel_base(rule, ifaces, variables): 

517 """Find the best 'base' interface for a channel rule on a single module. 

518 

519 Prefers an interface whose current name already equals the expected ch=0 name 

520 (i.e. it has already been renamed to channel 0 and is safe to re-process). 

521 Falls back to the first interface (alphabetically) so that on first apply, 

522 the template-created base interface becomes channel 0. 

523 

524 This ensures apply_rule_to_existing / find_interfaces_for_rule call 

525 _apply_rule_to_interface exactly ONCE per module for channel rules, preventing 

526 duplicate-name IntegrityErrors when channels already exist. 

527 """ 

528 if not ifaces: 

529 return None 

530 for iface in ifaces: 

531 vars_copy = dict(variables) 

532 vars_copy["base"] = iface.name 

533 vars_copy["channel"] = str(rule.channel_start) # ch=0 

534 try: 

535 ch0_name = evaluate_name_template(rule.name_template, vars_copy) 

536 if iface.name == ch0_name: 

537 return iface 

538 except ValueError: 

539 pass 

540 return ifaces[0] 

541 

542 

def _matching_moduletype_pks(module_type_pattern):
    """Collect the PKs of ModuleTypes whose ``model`` fullmatches the pattern.

    Raises:
        ValueError: if *module_type_pattern* is not a valid regex. The
            ``re.error`` is converted so callers can handle it the same way
            as evaluate_name_template failures.
    """
    from dcim.models import ModuleType

    try:
        pattern = re.compile(module_type_pattern)
    except re.error as exc:
        raise ValueError(f"Invalid module_type_pattern regex '{module_type_pattern}': {exc}") from exc

    matching_pks = []
    for module_type in ModuleType.objects.only("pk", "model"):
        if pattern.fullmatch(module_type.model):
            matching_pks.append(module_type.pk)
    return matching_pks

556 

557 

def has_applicable_interfaces(rule) -> bool:
    """Report whether applying *rule* now would change at least one interface.

    Delegates to ``find_interfaces_for_rule(rule, limit=1)`` and returns
    ``True`` when it yields any entry. Returns ``False`` when nothing would
    change (no matching modules/interfaces installed, or everything already
    correctly named) and also when the lookup raises ``ValueError`` or
    ``re.error``.

    This costs more than a plain EXISTS query, but it answers "would
    something change?" rather than "do interfaces exist?".
    """
    try:
        matches, _checked = find_interfaces_for_rule(rule, limit=1)
    except (ValueError, re.error):
        return False
    return bool(matches)

575 

576 

def _build_module_qs(rule):
    """Build the Module queryset that falls inside *rule*'s scope.

    Module type is matched either by regex (PKs from
    ``_matching_moduletype_pks``) or by the rule's ``module_type`` FK. Parent
    module type, device type and platform narrow the queryset only when the
    rule sets them.

    Shared by ``find_interfaces_for_rule`` and ``apply_rule_to_existing``.
    """
    from dcim.models import Module

    if rule.module_type_is_regex:
        type_lookup = {"module_type__in": _matching_moduletype_pks(rule.module_type_pattern)}
    else:
        type_lookup = {"module_type": rule.module_type}
    qs = Module.objects.filter(**type_lookup)

    optional_scopes = (
        ("module_bay__parent__installed_module__module_type", rule.parent_module_type),
        ("device__device_type", rule.device_type),
        ("device__platform", rule.platform),
    )
    for lookup, value in optional_scopes:
        if value:
            qs = qs.filter(**{lookup: value})
    return qs

596 

597 

def _evaluate_plain_interface(rule, module, iface, variables) -> dict | None:
    """Describe the rename *rule* would perform on *iface*, if any.

    The template is evaluated with ``base`` set to the interface's current
    name. A ValueError from evaluation is reported as the placeholder name
    ``"<error: ...>"`` rather than raised.

    Returns:
        A result dict (``module``, ``interface``, ``current_name``,
        ``new_names``) when the computed name differs from the current one,
        otherwise ``None``.
    """
    current_name = iface.name
    try:
        new_name = evaluate_name_template(rule.name_template, {**variables, "base": current_name})
    except ValueError as exc:
        new_name = f"<error: {exc}>"

    if new_name == current_name:
        return None
    return {"module": module, "interface": iface, "current_name": current_name, "new_names": [new_name]}

608 

609 

def _channel_rule_entry(rule, module, ifaces, variables) -> dict | None:
    """Describe what a channel rule would change on one module, if anything.

    The base interface comes from ``_find_channel_base``; the expected name
    of every channel is then computed from it. If evaluation raises
    ValueError the expected names collapse to a single ``"<error: ...>"``
    placeholder.

    Returns:
        A result dict (``module``, ``interface``, ``current_name``,
        ``new_names``) when some expected name is missing from the module's
        interfaces or the base would be renamed; ``None`` when there is no
        base interface or nothing would change.
    """
    base_iface = _find_channel_base(rule, ifaces, variables)
    if base_iface is None:
        return None

    base_vars = {**variables, "base": base_iface.name}
    try:
        expected_names = [
            evaluate_name_template(rule.name_template, {**base_vars, "channel": str(rule.channel_start + ch)})
            for ch in range(rule.channel_count)
        ]
    except ValueError as exc:
        expected_names = [f"<error: {exc}>"]

    present_names = {iface.name for iface in ifaces}
    channel_missing = any(name not in present_names for name in expected_names)
    base_needs_rename = bool(expected_names) and expected_names[0] != base_iface.name
    if not (channel_missing or base_needs_rename):
        return None
    return {"module": module, "interface": base_iface, "current_name": base_iface.name, "new_names": expected_names}

631 

632 

def _count_remaining_interfaces(module_qs, processed_pks) -> int:
    """Count the interfaces of modules in *module_qs* whose pk is not in *processed_pks*.

    Used by the find_interfaces_for_rule scan to account for modules it has
    not visited when it stops early.
    """
    from dcim.models import Interface

    unvisited_modules = module_qs.exclude(pk__in=processed_pks)
    return Interface.objects.filter(module__in=unvisited_modules).count()

638 

639 

640def _process_channel_module(rule, module, ifaces, variables, limit, results, module_qs, processed_pks): 

641 """Process one module for a channel rule. Returns (checked_count, should_stop).""" 

642 checked = len(ifaces) 

643 if not ifaces: 

644 return checked, False 

645 entry = _channel_rule_entry(rule, module, ifaces, variables) 

646 if entry: 

647 results.append(entry) 

648 if limit is not None and len(results) >= limit: 

649 return checked + _count_remaining_interfaces(module_qs, processed_pks), True 

650 return checked, False 

651 

652 

653def _process_plain_module(rule, module, ifaces, variables, limit, results, module_qs, processed_pks): 

654 """Process one module for a plain (non-channel) rule. Returns (checked_count, should_stop).""" 

655 checked = 0 

656 for iface_idx, iface in enumerate(ifaces): 

657 checked += 1 

658 entry = _evaluate_plain_interface(rule, module, iface, variables) 

659 if entry: 

660 results.append(entry) 

661 if limit is not None and len(results) >= limit: 

662 checked += len(ifaces) - (iface_idx + 1) 

663 checked += _count_remaining_interfaces(module_qs, processed_pks) 

664 return checked, True 

665 return checked, False 

666 

667 

def find_interfaces_for_rule(rule, limit=None):
    """Preview which interfaces a retroactive application of *rule* would change.

    Walks every Module in the rule's scope and computes the names its
    interfaces would receive.

    Returns:
        A tuple ``(results, total_checked)`` where *results* is a list of
        dicts::

            {
                "module": Module instance,
                "interface": Interface instance,
                "current_name": str,
                "new_names": list[str],  # one entry per channel, or single-element
            }

        Only entries where something would change are included. If *limit*
        is set, scanning stops once that many entries are collected;
        *total_checked* still counts the interfaces of modules that were not
        visited.
    """
    from collections import defaultdict

    from dcim.models import Interface

    module_qs = _build_module_qs(rule).select_related(
        "module_type",
        "device",
        "device__device_type",
        "device__platform",
        "device__virtual_chassis",
        "module_bay",
        "module_bay__parent",
    )
    if rule.channel_count > 0:
        process_module = _process_channel_module
    else:
        process_module = _process_plain_module

    # One query for all interfaces of the matching modules (avoids N+1).
    ifaces_by_module = defaultdict(list)
    all_ifaces = Interface.objects.filter(module__in=module_qs).order_by("module_id", "name")
    for iface in all_ifaces:
        ifaces_by_module[iface.module_id].append(iface)

    visited_pks = set()
    results = []
    total_checked = 0
    for module in module_qs:
        visited_pks.add(module.pk)
        variables = build_variables(module.module_bay, device=module.device)
        module_ifaces = ifaces_by_module.get(module.pk, [])
        checked, should_stop = process_module(
            rule, module, module_ifaces, variables, limit, results, module_qs, visited_pks
        )
        total_checked += checked
        if should_stop:
            break

    return results, total_checked

720 

721 

def apply_rule_to_existing(rule, limit=None, interface_ids=None):
    """Apply *rule* retroactively to every installed module in its scope.

    Unlike apply_interface_name_rules(), already-renamed interfaces are not
    skipped: every interface of each matching module is re-evaluated.

    Channel rules (``channel_count > 0``) handle each module as one unit: a
    single base interface is chosen with _find_channel_base() and the rule is
    applied once, because applying it per interface would try to create the
    same channel names repeatedly.

    If *interface_ids* is given (list/set of Interface PKs), only those
    interfaces are processed; for channel rules the base interface's PK is
    the selector. An empty collection returns 0 immediately.

    A disabled rule returns 0. Failures on individual interfaces/modules
    (ValueError, ValidationError, IntegrityError) are logged and skipped.
    *limit* is checked after each module and stops processing once the
    running count reaches it.

    Returns:
        The number of interfaces renamed/created.
    """
    from collections import defaultdict

    from dcim.models import Interface

    id_set = None if interface_ids is None else frozenset(interface_ids)
    if id_set is not None and not id_set:
        return 0
    if not rule.enabled:
        return 0

    module_qs = _build_module_qs(rule)

    # One query for all interfaces of the matching modules (avoids N+1).
    ifaces_by_module = defaultdict(list)
    for iface in Interface.objects.filter(module__in=module_qs).order_by("module_id", "name"):
        ifaces_by_module[iface.module_id].append(iface)

    is_channel_rule = rule.channel_count > 0
    count = 0
    for module in module_qs.select_related("module_bay", "module_type", "device", "device__virtual_chassis"):
        variables = build_variables(module.module_bay, device=module.device)
        module_ifaces = ifaces_by_module.get(module.pk, [])

        if is_channel_rule:
            # Process the module ONCE, through its best base interface.
            if not module_ifaces:
                continue
            base_iface = _find_channel_base(rule, module_ifaces, variables)
            if id_set is not None and base_iface.pk not in id_set:
                continue
            targets = [base_iface]
        elif id_set is None:
            targets = module_ifaces
        else:
            targets = [iface for iface in module_ifaces if iface.pk in id_set]

        for iface in targets:
            try:
                count += _apply_rule_to_interface(rule, iface, {**variables, "base": iface.name}, module)
            except (ValueError, ValidationError, IntegrityError):
                if is_channel_rule:
                    logger.exception(
                        "Failed to apply channel rule '%s' to module '%s' (id=%s); skipping.",
                        rule,
                        module,
                        module.pk,
                    )
                else:
                    logger.exception(
                        "Failed to apply rule '%s' to interface '%s' (id=%s); skipping.",
                        rule,
                        iface.name,
                        iface.pk,
                    )

        if limit is not None and count >= limit:
            return count

    return count

804 

805 

def evaluate_name_template(template: str, variables: dict) -> str:
    """Evaluate a name template with variable substitution and safe arithmetic.

    Supports templates like:
        "GigabitEthernet{slot}/{8 + ({parent_bay_position} - 1) * 2 + {sfp_slot}}"

    Processing happens in two passes:

    1. Every ``{key}`` for a key in *variables* is replaced by ``str(value)``.
    2. Every remaining brace-enclosed span is treated as an arithmetic
       expression. It may contain only digits, whitespace, parentheses and
       the operators ``+``, ``-``, ``*`` and ``//``. True division (``/``)
       is not allowed — use floor division (``//``) instead. The expression
       is parsed to an AST, checked against a node whitelist, evaluated, and
       the result is cast to ``int`` so names always contain whole numbers.

    Args:
        template: The name template.
        variables: Mapping of variable name to value; values are stringified.

    Returns:
        The template with all variables and expressions resolved.

    Raises:
        ValueError: if a brace-enclosed span contains disallowed characters
            (including an unsubstituted variable name), uses a non-whitelisted
            AST node, is not valid syntax, or divides by zero.
    """
    allowed_nodes = (
        ast.Expression,
        ast.BinOp,
        ast.UnaryOp,
        ast.Constant,
        ast.Add,
        ast.Sub,
        ast.Mult,
        ast.FloorDiv,
        ast.USub,
        ast.UAdd,
    )

    # First pass: substitute all simple variables
    result = template
    for key, value in variables.items():
        result = result.replace(f"{{{key}}}", str(value))

    # Second pass: evaluate any remaining brace-enclosed arithmetic expressions
    def _eval_expr(match):
        expr = match.group(1).strip()
        # Allow digits, arithmetic operators (excluding lone /), parens, whitespace.
        # Negative lookahead disallows a single / that is not part of //.
        if not re.match(r"^(?!.*(?<!/)/(?!/))[\d\s\+\-\*\(\/\)]+$", expr):
            raise ValueError(f"Unsafe expression in name template: {expr}")
        try:
            node = ast.parse(expr, mode="eval")
            for child in ast.walk(node):
                if not isinstance(child, allowed_nodes):
                    raise ValueError(f"Unsafe AST node in expression: {type(child).__name__}")
            # The AST is whitelisted above; builtins are withheld as an extra guard.
            value = eval(compile(node, "<template>", "eval"), {"__builtins__": {}}, {})  # noqa: S307
            return str(int(value))
        except (SyntaxError, TypeError, ZeroDivisionError) as e:
            # ZeroDivisionError (e.g. "1 // 0") is reported as ValueError like
            # every other template failure, so callers catching ValueError see it.
            raise ValueError(f"Invalid arithmetic expression '{expr}': {e}") from e

    return re.sub(r"\{([^}]+)\}", _eval_expr, result)