Medha (Core)

The Medha class is the primary entry point. It owns the waterfall search pipeline, coordinates the vector backend, L1 cache, and embedder, and exposes all cache operations as async methods.

Use it as an async context manager to guarantee clean startup and shutdown of the backend connection and the background TTL cleanup task.

from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter

async with Medha(
    collection_name="my_cache",
    embedder=FastEmbedAdapter(),
    settings=Settings(backend_type="qdrant"),
) as cache:
    await cache.store("question", "SELECT ...")
    hit = await cache.search("question")
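
On a miss, the waterfall returns a CacheHit with strategy NO_MATCH and no generated_query, so a typical integration falls back to the LLM and writes the result back into the cache. A minimal sketch, assuming a hypothetical generate_with_llm helper standing in for your own LLM call:

async def answer(cache: Medha, question: str) -> str:
    hit = await cache.search(question)
    if hit.generated_query:                      # any tier hit: L1, template, exact, semantic, fuzzy
        return hit.generated_query
    query = await generate_with_llm(question)    # hypothetical: your LLM call goes here
    await cache.store(question, query)           # write back so the next miss becomes a hit
    return query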

Medha

Semantic Memory for AI Text-to-Query systems.

Provides a multi-tier "waterfall" cache search strategy to maximize cache hits and minimize LLM costs.

Parameters:

collection_name (str, required)
    Name of the cache collection.

embedder (BaseEmbedder, required)
    An instance of BaseEmbedder (e.g., FastEmbedAdapter).

backend (VectorStorageBackend | None, default: None)
    An instance of VectorStorageBackend (e.g., QdrantBackend). If None, a QdrantBackend is
    created from settings (see the sketch below for passing one explicitly).

settings (Settings | None, default: None)
    Configuration. If None, loaded from the environment.

templates (list[QueryTemplate] | None, default: None)
    Pre-loaded query templates. If None, loaded from settings.template_file (if configured).

l1_backend (L1CacheBackend | None, default: None)
    Pluggable L1 cache backend. If None, an in-memory LRU cache sized by
    settings.l1_cache_max_size is used.
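
To construct the backend yourself instead of letting Medha pick one from settings.backend_type, pass an instance directly. A sketch, assuming QdrantBackend accepts a Settings instance, as _build_backend does:

from medha import Medha, Settings
from medha.backends.qdrant import QdrantBackend
from medha.embeddings.fastembed_adapter import FastEmbedAdapter

settings = Settings(backend_type="qdrant")
backend = QdrantBackend(settings)    # built explicitly instead of via _build_backend

async with Medha(
    collection_name="my_cache",
    embedder=FastEmbedAdapter(),
    backend=backend,
    settings=settings,
) as cache:
    ...
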
Example

from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter

embedder = FastEmbedAdapter()
medha = Medha(collection_name="my_cache", embedder=embedder)
await medha.start()

# Store a question-query pair
await medha.store("How many users?", "SELECT COUNT(*) FROM users")

# Search
hit = await medha.search("Show me user count")
print(hit.generated_query)  # "SELECT COUNT(*) FROM users"
print(hit.strategy)         # SearchStrategy.SEMANTIC_MATCH

await medha.close()
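
Bulk warming follows the same pattern. A sketch, assuming a hypothetical seed.jsonl file in the JSONL format warm_from_file accepts (one JSON object per line with "question" and "generated_query" keys):

# seed.jsonl (one object per line):
# {"question": "How many users?", "generated_query": "SELECT COUNT(*) FROM users"}

def report(done: int, total: int) -> None:
    print(f"warmed {done}/{total}")

loaded = await medha.warm_from_file("seed.jsonl", on_progress=report)
print(f"Warmed {loaded} entries")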

Source code in src/medha/core.py
class Medha:
    """Semantic Memory for AI Text-to-Query systems.

    Provides a multi-tier "waterfall" cache search strategy to maximize
    cache hits and minimize LLM costs.

    Args:
        collection_name: Name of the cache collection.
        embedder: An instance of BaseEmbedder (e.g., FastEmbedAdapter).
        backend: An instance of VectorStorageBackend (e.g., QdrantBackend).
            If None, creates a QdrantBackend from settings.
        settings: Configuration. If None, loads from environment.
        templates: Pre-loaded query templates. If None, loads from
            settings.template_file (if configured).

    Example:
        >>> from medha import Medha, Settings
        >>> from medha.embeddings.fastembed_adapter import FastEmbedAdapter
        >>>
        >>> embedder = FastEmbedAdapter()
        >>> medha = Medha(collection_name="my_cache", embedder=embedder)
        >>> await medha.start()
        >>>
        >>> # Store a question-query pair
        >>> await medha.store("How many users?", "SELECT COUNT(*) FROM users")
        >>>
        >>> # Search
        >>> hit = await medha.search("Show me user count")
        >>> print(hit.generated_query)  # "SELECT COUNT(*) FROM users"
        >>> print(hit.strategy)         # SearchStrategy.SEMANTIC_MATCH
        >>>
        >>> await medha.close()
    """

    def __init__(
        self,
        collection_name: str,
        embedder: BaseEmbedder,
        backend: VectorStorageBackend | None = None,
        settings: Settings | None = None,
        templates: list[QueryTemplate] | None = None,
        l1_backend: L1CacheBackend | None = None,
    ):
        self._collection_name = collection_name
        self._template_collection = f"__medha_templates_{collection_name}"
        self._embedder = embedder
        self._settings = settings or Settings()
        self._templates = templates or []

        # Backend: use provided instance or build from settings.backend_type
        self._backend = backend if backend is not None else self._build_backend()

        # L1 cache (Tier 0) — pluggable: in-memory (default) or Redis
        if l1_backend is not None:
            self._l1_backend = l1_backend
        else:
            from medha.l1_cache.memory import InMemoryL1Cache
            self._l1_backend = InMemoryL1Cache(max_size=self._settings.l1_cache_max_size)

        # Embedding cache (avoids redundant embedding calls)
        self._embedding_cache: OrderedDict[str, list[float]] = OrderedDict()
        self._embedding_cache_max = 10000
        self._embedding_cache_lock = asyncio.Lock()

        # Deduplication: tracks in-flight embedding computations
        self._pending_embeddings: dict[str, asyncio.Future[list[float]]] = {}

        # NLP parameter extractor
        self._param_extractor = ParameterExtractor()

        # Stats
        self._stats = _StatsCollector(
            enabled=self._settings.collect_stats,
            max_latency_samples=self._settings.stats_max_latency_samples,
        )
        self._total_stored = 0
        self._warm_loaded = 0
        self._cleanup_task: asyncio.Task[None] | None = None
        self._known_collections = [self._collection_name]

    # --- Private helpers ---

    def _build_backend(self) -> VectorStorageBackend:
        """Instantiate the correct vector backend from settings.backend_type.

        Called only when no backend is passed to __init__. Allows users to
        configure the backend via Settings without importing backend classes.

        Returns:
            A VectorStorageBackend instance (not yet connected/initialized).

        Raises:
            ConfigurationError: If backend_type is unknown or required deps are missing.
        """
        bt = self._settings.backend_type
        if bt == "qdrant":
            from medha.backends.qdrant import QdrantBackend
            return QdrantBackend(self._settings)
        elif bt == "memory":
            from medha.backends.memory import InMemoryBackend
            return InMemoryBackend()
        elif bt == "pgvector":
            from medha.backends.pgvector import PgVectorBackend
            return PgVectorBackend(self._settings)
        elif bt == "elasticsearch":
            from medha.backends.elasticsearch import ElasticsearchBackend
            return ElasticsearchBackend(self._settings)
        elif bt == "vectorchord":
            from medha.backends.vectorchord import VectorChordBackend
            return VectorChordBackend(self._settings)
        elif bt == "chroma":
            from medha.backends.chroma import ChromaBackend
            return ChromaBackend(self._settings)
        elif bt == "weaviate":
            from medha.backends.weaviate import WeaviateBackend
            return WeaviateBackend(self._settings)
        elif bt == "redis":
            from medha.backends.redis_vector import RedisVectorBackend
            return RedisVectorBackend(self._settings)
        elif bt == "azure-search":
            from medha.backends.azure_search import AzureSearchBackend
            return AzureSearchBackend(self._settings)
        elif bt == "lancedb":
            from medha.backends.lancedb import LanceDBBackend
            return LanceDBBackend(self._settings)
        else:
            raise ConfigurationError(f"Unknown backend_type: '{bt}'")

    # --- Lifecycle ---

    async def start(self) -> None:
        """Initialize the backend and sync templates.

        Must be called before search/store operations.

        Steps:
            1. Connect to the vector backend.
            2. Initialize the main collection.
            3. Initialize the template collection.
            4. Load templates from file (if configured).
            5. Sync templates to the template collection.
        """
        logger.debug(
            "Starting Medha: collection='%s', embedder=%s, backend=%s",
            self._collection_name,
            type(self._embedder).__name__,
            type(self._backend).__name__,
        )
        if hasattr(self._backend, "connect"):
            await self._backend.connect()

        dimension = self._embedder.dimension
        logger.debug("Embedder dimension: %d", dimension)

        await self._backend.initialize(self._collection_name, dimension)
        await self._backend.initialize(self._template_collection, dimension)

        # Warn once if a legacy-named template collection still exists
        legacy_collection = f"{self._collection_name}_templates"
        try:
            legacy_count = await self._backend.count(legacy_collection)
            if legacy_count > 0:
                logger.warning(
                    "Legacy template collection '%s' found with %d entries. "
                    "Templates will be re-synced to '%s'. "
                    "Delete the old collection manually when ready.",
                    legacy_collection,
                    legacy_count,
                    self._template_collection,
                )
        except StorageError:
            pass  # Old collection does not exist — fresh deployment

        if self._settings.template_file and not self._templates:
            await self.load_templates_from_file(self._settings.template_file)

        if self._templates:
            await self._sync_templates_to_backend()

        # Load persistent embedding cache from disk (if configured)
        if self._settings.embedding_cache_path:
            self._load_embedding_cache_from_disk()

        if self._settings.cleanup_interval_seconds:
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        logger.info(
            "Medha started: collection='%s', templates=%d",
            self._collection_name,
            len(self._templates),
        )

    async def close(self) -> None:
        """Shut down the backend and release resources."""
        if self._cleanup_task is not None:
            self._cleanup_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._cleanup_task
            self._cleanup_task = None
        if self._settings.embedding_cache_path:
            self._save_embedding_cache_to_disk()
        await self._l1_backend.close()
        await self._backend.close()
        logger.info("Medha closed")

    async def __aenter__(self) -> Medha:
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> bool:
        await self.close()
        return False

    # --- Waterfall Search ---

    async def search(self, question: str) -> CacheHit:
        """Search the cache using the waterfall strategy.

        Tiers (checked in order, first hit wins):
            0. L1 In-Memory Cache (exact hash match)
            1. Template Matching (intent recognition + parameter extraction)
            2. Exact Vector Match (score >= score_threshold_exact)
            3. Semantic Similarity (score >= score_threshold_semantic)
            4. Fuzzy Matching (Levenshtein distance, optional)

        Args:
            question: Natural language question from the user.

        Returns:
            CacheHit with the matched query, confidence, and strategy.
            Returns CacheHit(strategy=NO_MATCH) if no tier matches.
        """
        t0 = time.monotonic()
        result = await self._search_impl(question)
        latency_ms = (time.monotonic() - t0) * 1000
        await self._stats.record(result.strategy, latency_ms)
        return result

    async def _search_impl(self, question: str) -> CacheHit:
        try:
            if not question or not question.strip():
                logger.warning("Search called with empty question")
                return CacheHit(strategy=SearchStrategy.ERROR)

            if len(question) > self._settings.max_question_length:
                logger.warning(
                    "Search rejected: question length %d exceeds max %d",
                    len(question),
                    self._settings.max_question_length,
                )
                return CacheHit(strategy=SearchStrategy.ERROR)

            logger.debug("Search started for: '%s'", question[:80])

            # --- Tier 0: L1 Cache ---
            l1_hit = await self._check_l1_cache(question)
            if l1_hit:
                logger.debug("Tier 0 L1 cache HIT for: '%s'", question[:50])
                return l1_hit
            logger.debug("Tier 0 L1 cache MISS")

            # --- Tier 1: Template Matching ---
            template_hit = await self._search_templates(question)
            if template_hit:
                await self._store_in_l1(question, template_hit)
                logger.debug(
                    "Tier 1 template HIT: template='%s', confidence=%.3f",
                    template_hit.template_used,
                    template_hit.confidence,
                )
                return template_hit
            logger.debug("Tier 1 template MISS")

            # --- Get embedding (shared by Tier 2, 3) ---
            embedding = await self._get_embedding(question)
            if embedding is None:
                logger.error("Embedding failed, aborting search for: '%s'", question[:50])
                return CacheHit(strategy=SearchStrategy.ERROR)

            # --- Tier 2 + 3: Exact and Semantic in parallel ---
            # Running them concurrently reduces wall-clock latency from
            # ~(t_exact + t_semantic) to ~max(t_exact, t_semantic).
            exact_hit, semantic_hit = await asyncio.gather(
                self._search_exact(embedding),
                self._search_semantic(embedding),
            )

            if exact_hit:
                await self._store_in_l1(question, exact_hit)
                logger.debug(
                    "Tier 2 exact HIT: confidence=%.4f, query='%s'",
                    exact_hit.confidence,
                    exact_hit.generated_query[:50] if exact_hit.generated_query else "",
                )
                return exact_hit
            logger.debug("Tier 2 exact MISS (threshold=%.2f)", self._settings.score_threshold_exact)

            if semantic_hit:
                await self._store_in_l1(question, semantic_hit)
                logger.debug(
                    "Tier 3 semantic HIT: confidence=%.4f, query='%s'",
                    semantic_hit.confidence,
                    semantic_hit.generated_query[:50] if semantic_hit.generated_query else "",
                )
                return semantic_hit
            logger.debug("Tier 3 semantic MISS (threshold=%.2f)", self._settings.score_threshold_semantic)

            # --- Tier 4: Fuzzy Matching ---
            fuzzy_hit = await self._search_fuzzy(question, embedding)
            if fuzzy_hit:
                await self._store_in_l1(question, fuzzy_hit)
                logger.debug("Tier 4 fuzzy HIT: confidence=%.4f", fuzzy_hit.confidence)
                return fuzzy_hit
            logger.debug("Tier 4 fuzzy MISS (threshold=%.1f)", self._settings.score_threshold_fuzzy)

            logger.debug("All tiers exhausted, NO_MATCH for: '%s'", question[:50])
            return CacheHit(strategy=SearchStrategy.NO_MATCH)

        except Exception as e:
            logger.error("Search failed for '%s': %s", question[:50], e, exc_info=True)
            return CacheHit(strategy=SearchStrategy.ERROR)

    # --- Tier Implementations ---

    async def _check_l1_cache(self, question: str) -> CacheHit | None:
        """Check the L1 cache (pluggable backend).

        Key: MD5 hash of normalized question.
        Returns: CacheHit with strategy=L1_CACHE if found and not expired, None otherwise.
        """
        key = question_hash(question)
        hit = await self._l1_backend.get(key)
        if hit is not None:
            if hit.expires_at is not None and hit.expires_at <= datetime.now(timezone.utc):
                await self._l1_backend.invalidate(key)
                return None
            return hit.model_copy(update={"strategy": SearchStrategy.L1_CACHE})
        return None

    async def _store_in_l1(self, question: str, hit: CacheHit) -> None:
        """Store a result in the L1 cache."""
        key = question_hash(question)
        await self._l1_backend.set(key, hit)
        logger.debug(
            "L1 cache store: key=%s, strategy=%s",
            key[:8],
            hit.strategy.value if hit.strategy else "?",
        )

    async def _search_templates(self, question: str) -> CacheHit | None:
        """Search for template matches using parameter extraction + keyword scoring.

        Iterates over all loaded templates and attempts parameter extraction.
        Templates where all required parameters are successfully extracted are
        scored using keyword overlap and parameter completeness.

        Steps:
            1. For each template, try to extract parameters from the question.
            2. Skip templates where extraction fails or is incomplete.
            3. Score candidates: keyword_overlap * 0.5 + param_completeness * 0.3
               + priority_bonus.
            4. Return the best match above the configured threshold.
        """
        if not self._templates:
            logger.debug("Template search skipped: no templates loaded")
            return None

        best_hit: CacheHit | None = None
        best_score = 0.0
        normalized = normalize_question(question)

        for template in self._templates:
            # Try to extract parameters
            try:
                params = self._param_extractor.extract(question, template)
            except Exception:
                logger.debug(
                    "Parameter extraction failed for template '%s'", template.intent
                )
                continue

            # All parameters must be extracted for a valid match
            if template.parameters and len(params) != len(template.parameters):
                logger.debug(
                    "Incomplete params for template '%s': got %d/%d",
                    template.intent,
                    len(params),
                    len(template.parameters),
                )
                continue

            # Compute score from keyword overlap + param completeness
            keyword_bonus = keyword_overlap_score(normalized, template.template_text)
            param_completeness = 1.0 if not template.parameters else (
                len(params) / len(template.parameters)
            )
            final_score = (
                (keyword_bonus * 0.5)
                + (param_completeness * 0.3)
            )
            # Priority bonus: priority 1 (highest) gets most bonus
            final_score += (5 - template.priority) * 0.02

            logger.debug(
                "Template '%s': keyword=%.2f, params=%d/%d, score=%.3f",
                template.intent,
                keyword_bonus,
                len(params),
                len(template.parameters) if template.parameters else 0,
                final_score,
            )

            if final_score > best_score:
                try:
                    rendered_query = self._param_extractor.render_query(
                        template, params
                    )
                except Exception:
                    # Rendering failed: skip without raising best_score, so a
                    # lower-scored but renderable template can still win.
                    continue
                best_score = final_score

                best_hit = CacheHit(
                    generated_query=rendered_query,
                    confidence=min(final_score, 1.0),
                    strategy=SearchStrategy.TEMPLATE_MATCH,
                    template_used=template.intent,
                )

        if best_hit is None:
            logger.debug("Template search: no template could extract all required parameters")
            return None
        if best_score < self._settings.score_threshold_template:
            logger.debug(
                "Template search: best_score=%.3f below threshold=%.3f",
                best_score,
                self._settings.score_threshold_template,
            )
            return None
        return best_hit

    async def _search_exact(self, embedding: list[float]) -> CacheHit | None:
        """Search for exact vector match (score >= score_threshold_exact).

        Uses settings.score_threshold_exact (default 0.99).
        Returns the top-1 result if above threshold.
        """
        results = await self._backend.search(
            collection_name=self._collection_name,
            vector=embedding,
            limit=1,
            score_threshold=self._settings.score_threshold_exact,
        )
        if results:
            r = results[0]
            return CacheHit(
                generated_query=r.generated_query,
                response_summary=r.response_summary,
                confidence=r.score,
                strategy=SearchStrategy.EXACT_MATCH,
                template_used=r.template_id,
                expires_at=r.expires_at,
            )
        return None

    async def _search_semantic(self, embedding: list[float]) -> CacheHit | None:
        """Search for semantic similarity (score >= score_threshold_semantic).

        Uses settings.score_threshold_semantic (default 0.90).
        Returns the top-1 result with a slight confidence penalty (0.9x).
        """
        results = await self._backend.search(
            collection_name=self._collection_name,
            vector=embedding,
            limit=3,
            score_threshold=self._settings.score_threshold_semantic,
        )
        if results:
            r = results[0]
            return CacheHit(
                generated_query=r.generated_query,
                response_summary=r.response_summary,
                confidence=r.score * 0.9,  # Penalize slightly
                strategy=SearchStrategy.SEMANTIC_MATCH,
                template_used=r.template_id,
                expires_at=r.expires_at,
            )
        return None

    async def _search_fuzzy(
        self, question: str, embedding: list[float] | None = None
    ) -> CacheHit | None:
        """Search using Levenshtein distance (optional, requires rapidfuzz).

        When an embedding is provided, a vector pre-filter is applied first:
        only the top-K most similar candidates (by cosine similarity) are
        considered for fuzzy scoring.  This reduces complexity from O(n) to
        O(top_k) for large collections while preserving recall.

        Falls back to a full collection scroll when no embedding is available.

        Only activated if rapidfuzz is installed.
        """
        try:
            from rapidfuzz import fuzz
        except ImportError:
            logger.debug("Fuzzy search skipped: rapidfuzz not installed")
            return None

        normalized = normalize_question(question)
        threshold = self._settings.score_threshold_fuzzy

        best_match: CacheResult | None = None
        best_score = 0.0
        early_exit_score = 99.0

        if embedding is not None:
            # Fast path: vector pre-filter → fuzzy only on top-K candidates
            candidates = await self._backend.search(
                collection_name=self._collection_name,
                vector=embedding,
                limit=self._settings.fuzzy_prefilter_top_k,
                score_threshold=self._settings.score_threshold_fuzzy_prefilter,
            )
            logger.debug(
                "Fuzzy pre-filter: %d candidates (vector threshold=%.2f, top_k=%d)",
                len(candidates),
                self._settings.score_threshold_fuzzy_prefilter,
                self._settings.fuzzy_prefilter_top_k,
            )
            for r in candidates:
                score = fuzz.ratio(normalized, r.normalized_question)
                if score > best_score and score >= threshold:
                    best_score = score
                    best_match = r
                    if best_score >= early_exit_score:
                        break
        else:
            # Slow path: full collection scroll (no embedding available)
            logger.debug("Fuzzy search: no embedding, falling back to full scroll")
            offset = None
            while True:
                results, offset = await self._backend.scroll(
                    collection_name=self._collection_name,
                    limit=500,
                    offset=offset,
                )
                for r in results:
                    score = fuzz.ratio(normalized, r.normalized_question)
                    if score > best_score and score >= threshold:
                        best_score = score
                        best_match = r
                        if best_score >= early_exit_score:
                            break

                if offset is None or best_score >= early_exit_score:
                    break

        if best_match:
            return CacheHit(
                generated_query=best_match.generated_query,
                response_summary=best_match.response_summary,
                confidence=best_score / 100.0,
                strategy=SearchStrategy.FUZZY_MATCH,
                template_used=best_match.template_id,
                expires_at=best_match.expires_at,
            )
        return None

    # --- Store Operations ---

    async def store(
        self,
        question: str,
        generated_query: str,
        response_summary: str | None = None,
        template_id: str | None = None,
        ttl: int | None = _UNSET,  # type: ignore[assignment]
    ) -> bool:
        """Store a question-query pair in the cache.

        Also stores in L1 cache for immediate subsequent hits.

        Args:
            question: The natural language question.
            generated_query: The SQL/Cypher/GraphQL query.
            response_summary: Optional response summary.
            template_id: Optional template intent identifier.
            ttl: TTL in seconds. _UNSET = use settings.default_ttl_seconds;
                None = the entry never expires.

        Returns:
            True if stored successfully, False otherwise.
        """
        if not question or not question.strip():
            logger.warning("Store skipped: question is empty or whitespace-only")
            return False
        if len(question) > self._settings.max_question_length:
            raise ValueError(
                f"Question length {len(question)} exceeds max_question_length "
                f"({self._settings.max_question_length})"
            )
        if not generated_query or not generated_query.strip():
            logger.warning("Store skipped: generated_query is empty or whitespace-only")
            return False

        try:
            logger.debug("Storing: '%s' -> '%s'", question[:50], generated_query[:50])
            embedding = await self._get_embedding(question)
            if embedding is None:
                logger.error("Store aborted: embedding failed for '%s'", question[:50])
                return False

            resolved_ttl = ttl if ttl is not _UNSET else self._settings.default_ttl_seconds
            expires_at = (
                datetime.now(timezone.utc) + timedelta(seconds=resolved_ttl)
                if resolved_ttl is not None
                else None
            )

            normalized = normalize_question(question)
            entry = CacheEntry(
                id=str(uuid.uuid4()),
                vector=embedding,
                original_question=question,
                normalized_question=normalized,
                generated_query=generated_query,
                query_hash=query_hash(generated_query),
                response_summary=response_summary,
                template_id=template_id,
                expires_at=expires_at,
            )

            await self._backend.upsert(self._collection_name, [entry])

            # Also store in L1
            await self._store_in_l1(
                question,
                CacheHit(
                    generated_query=generated_query,
                    response_summary=response_summary,
                    confidence=1.0,
                    strategy=SearchStrategy.EXACT_MATCH,
                    template_used=template_id,
                    expires_at=expires_at,
                ),
            )

            self._total_stored += 1
            logger.info("Stored: '%s' -> '%s'", question[:50], generated_query[:50])
            return True

        except Exception as e:
            logger.error("Store failed for '%s': %s", question[:50], e, exc_info=True)
            return False

    async def store_batch(self, entries: list[dict[str, Any]]) -> bool:
        """Store multiple question-query pairs efficiently.

        Uses aembed_batch() for a single round-trip to the embedder, then
        upserts all entries and populates the L1 cache.

        Args:
            entries: List of dicts with keys: question, generated_query,
                response_summary (optional), template_id (optional).

        Returns:
            True if all stored successfully, False if embedding or upsert fails.
        """
        if not entries:
            return True

        valid_entries = []
        for i, item in enumerate(entries):
            if not item.get("question", "").strip():
                logger.warning("store_batch: entry %d skipped — empty question", i)
                continue
            if not item.get("generated_query", "").strip():
                logger.warning("store_batch: entry %d skipped — empty generated_query", i)
                continue
            valid_entries.append(item)

        if not valid_entries:
            logger.warning("store_batch: no valid entries to store")
            return False
        entries = valid_entries

        try:
            logger.debug("Batch store started: %d entries", len(entries))

            questions = [item["question"] for item in entries]
            normalized_questions = [normalize_question(q) for q in questions]

            # Single batch embedding call — much faster than N sequential calls
            try:
                coro = self._embedder.aembed_batch(normalized_questions, is_document=True)
                if self._settings.embedding_timeout is not None:
                    coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
                embeddings = await coro
            except asyncio.TimeoutError:
                logger.error(
                    "Batch store: embedding timed out after %.1fs for %d entries",
                    self._settings.embedding_timeout,
                    len(entries),
                )
                return False
            except EmbeddingError as e:
                logger.error("Batch store: embedding failed: %s", e)
                return False

            # Populate embedding cache with all computed vectors
            async with self._embedding_cache_lock:
                for question, vec in zip(questions, embeddings, strict=False):
                    cache_key = question_hash(question)
                    if len(self._embedding_cache) >= self._embedding_cache_max:
                        self._embedding_cache.popitem(last=False)
                    self._embedding_cache[cache_key] = vec

            # Build CacheEntry objects
            cache_entries = []
            for item, embedding in zip(entries, embeddings, strict=False):
                question = item["question"]
                normalized = normalize_question(question)
                entry = CacheEntry(
                    id=str(uuid.uuid4()),
                    vector=embedding,
                    original_question=question,
                    normalized_question=normalized,
                    generated_query=item["generated_query"],
                    query_hash=query_hash(item["generated_query"]),
                    response_summary=item.get("response_summary"),
                    template_id=item.get("template_id"),
                )
                cache_entries.append(entry)

            await self._backend.upsert(self._collection_name, cache_entries)

            # Populate L1 cache — consistent with store()
            for item in entries:
                await self._store_in_l1(
                    item["question"],
                    CacheHit(
                        generated_query=item["generated_query"],
                        response_summary=item.get("response_summary"),
                        confidence=1.0,
                        strategy=SearchStrategy.EXACT_MATCH,
                        template_used=item.get("template_id"),
                    ),
                )

            self._total_stored += len(cache_entries)
            logger.info("Batch stored %d entries", len(cache_entries))
            return True

        except Exception as e:
            logger.error("Batch store failed: %s", e, exc_info=True)
            return False

    # --- TTL / Expiry ---

    async def expire(self, collection_name: str | None = None) -> int:
        """Delete expired entries from the collection.

        Args:
            collection_name: Target collection. None = all known collections.

        Returns:
            Total number of entries deleted.
        """
        collections = [collection_name] if collection_name else self._known_collections
        total = 0
        for coll in collections:
            try:
                expired_ids = await self._backend.find_expired(coll)
                if expired_ids:
                    await self._backend.delete(coll, expired_ids)
                    total += len(expired_ids)
            except Exception:
                logger.exception("expire() failed for collection '%s'", coll)
        return total

    # --- Invalidation API ---

    async def invalidate(self, question: str) -> bool:
        """Invalidate a single cache entry by its original question.

        Finds the entry in the vector backend via normalized_question match,
        deletes it, and removes the corresponding L1 key.

        Args:
            question: Natural language question whose cached entry to remove.

        Returns:
            True if an entry was found and deleted, False if not found.
        """
        normalized = normalize_question(question)
        try:
            result = await self._backend.search_by_normalized_question(
                self._collection_name, normalized
            )
        except Exception:
            logger.exception("invalidate: backend lookup failed for '%s'", question[:50])
            return False

        if result is None:
            logger.debug("invalidate: no entry found for '%s'", question[:50])
            return False

        try:
            await self._backend.delete(self._collection_name, [result.id])
        except Exception:
            logger.exception("invalidate: backend delete failed for id='%s'", result.id)
            return False

        key = question_hash(question)
        await self._l1_backend.invalidate(key)
        logger.info("Invalidated entry for '%s' (id=%s)", question[:50], result.id)
        return True

    async def invalidate_by_query_hash(self, query_hash: str) -> int:
        """Invalidate all entries whose generated query matches *query_hash*.

        Args:
            query_hash: MD5 hash of the generated query (as stored in the backend).

        Returns:
            Number of entries deleted.
        """
        try:
            ids = await self._backend.find_by_query_hash(self._collection_name, query_hash)
        except Exception:
            logger.exception("invalidate_by_query_hash: lookup failed for hash='%s'", query_hash)
            return 0

        if not ids:
            return 0

        try:
            await self._backend.delete(self._collection_name, ids)
        except Exception:
            logger.exception("invalidate_by_query_hash: delete failed")
            return 0

        await self._l1_backend.invalidate_all()
        logger.info("Invalidated %d entries for query_hash='%s'", len(ids), query_hash)
        return len(ids)

    async def invalidate_by_template(self, template_id: str) -> int:
        """Invalidate all entries belonging to a template.

        Args:
            template_id: Template intent identifier.

        Returns:
            Number of entries deleted.
        """
        try:
            ids = await self._backend.find_by_template_id(self._collection_name, template_id)
        except Exception:
            logger.exception("invalidate_by_template: lookup failed for template_id='%s'", template_id)
            return 0

        if not ids:
            return 0

        try:
            await self._backend.delete(self._collection_name, ids)
        except Exception:
            logger.exception("invalidate_by_template: delete failed")
            return 0

        await self._l1_backend.invalidate_all()
        logger.info("Invalidated %d entries for template_id='%s'", len(ids), template_id)
        return len(ids)

    async def invalidate_collection(self, collection_name: str | None = None) -> int:
        """Drop and re-initialize a collection, clearing all its entries.

        Args:
            collection_name: Target collection. None = main collection.

        Returns:
            Number of entries that were in the collection before deletion.
        """
        coll = collection_name or self._collection_name
        try:
            count = await self._backend.count(coll)
        except Exception:
            count = 0

        try:
            await self._backend.drop_collection(coll)
        except Exception:
            logger.exception("invalidate_collection: drop failed for '%s'", coll)
            return 0

        try:
            await self._backend.initialize(coll, self._embedder.dimension)
        except Exception:
            logger.exception("invalidate_collection: re-initialize failed for '%s'", coll)

        await self._l1_backend.invalidate_all()
        logger.info("Invalidated collection '%s' (%d entries dropped)", coll, count)
        return count

    async def _cleanup_loop(self) -> None:
        interval = self._settings.cleanup_interval_seconds
        while True:
            await asyncio.sleep(interval)
            try:
                n = await self.expire()
                if n > 0:
                    logger.info("TTL cleanup: removed %d expired entries", n)
            except Exception:
                logger.exception("TTL cleanup failed")

    # --- Template Management ---

    def _resolve_and_check_path(self, raw_path: str, label: str) -> Path:
        """Resolve path and optionally enforce allowed_file_dir restriction."""
        resolved = Path(raw_path).resolve()
        if self._settings.allowed_file_dir is not None:
            allowed = Path(self._settings.allowed_file_dir).resolve()
            try:
                resolved.relative_to(allowed)
            except ValueError as err:
                raise ValueError(
                    f"{label}: path '{resolved}' is outside allowed_file_dir '{allowed}'"
                ) from err
        return resolved

    async def load_templates(self, templates: list[QueryTemplate]) -> None:
        """Load templates into memory and sync to the template collection.

        Args:
            templates: List of QueryTemplate objects.
        """
        self._templates = templates
        await self._sync_templates_to_backend()
        logger.info("Loaded %d templates", len(templates))

    async def load_templates_from_file(self, file_path: str) -> None:
        """Load templates from a JSON file.

        Expected format: List of objects matching QueryTemplate schema.

        Args:
            file_path: Path to the JSON template file.

        Raises:
            TemplateError: If the file cannot be read or parsed.
        """
        try:
            resolved = self._resolve_and_check_path(file_path, "load_templates_from_file")
            _max_bytes = self._settings.max_file_size_mb * 1024 * 1024
            file_size = os.path.getsize(resolved)
            if file_size > _max_bytes:
                raise TemplateError(
                    f"Template file '{resolved}' is {file_size / 1_048_576:.1f} MB, "
                    f"exceeds max_file_size_mb={self._settings.max_file_size_mb}"
                )
            with open(resolved, encoding="utf-8") as f:
                data = json.load(f)
            templates = [QueryTemplate(**item) for item in data]
            self._templates = templates
            logger.info("Loaded %d templates from '%s'", len(templates), resolved)
        except TemplateError:
            raise
        except Exception as e:
            raise TemplateError(
                f"Failed to load templates from '{file_path}': {e}"
            ) from e

    async def warm_from_file(
        self,
        path: str,
        batch_size: int | None = None,
        on_progress: Any = None,
    ) -> int:
        """Warm the cache from a JSON or JSONL file.

        Supports two formats:
          - JSON array: ``[{"question": ..., "generated_query": ...}, ...]``
          - JSONL: one JSON object per line (same keys)

        Optional per-entry keys: ``response_summary``, ``template_id``.

        Args:
            path: Path to the file.
            batch_size: Override the default batch size for chunked upserts.
            on_progress: Optional callback ``(done, total)`` called after each chunk.

        Returns:
            Number of entries successfully stored.

        Raises:
            MedhaError: If the file cannot be read or parsed.
        """
        try:
            resolved = self._resolve_and_check_path(path, "warm_from_file")
            _max_bytes = self._settings.max_file_size_mb * 1024 * 1024
            file_size = os.path.getsize(resolved)
            if file_size > _max_bytes:
                raise MedhaError(
                    f"warm_from_file: '{resolved}' is {file_size / 1_048_576:.1f} MB, "
                    f"exceeds max_file_size_mb={self._settings.max_file_size_mb}"
                )
            with open(resolved, encoding="utf-8") as f:
                content = f.read().strip()

            if content.startswith("["):
                data = json.loads(content)
            else:
                data = [
                    json.loads(line)
                    for line in content.splitlines()
                    if line.strip()
                ]
        except MedhaError:
            raise
        except Exception as e:
            raise MedhaError(f"warm_from_file: cannot read '{path}': {e}") from e

        if not data:
            logger.warning("warm_from_file: no entries found in '%s'", path)
            return 0

        count = await self.store_many(data, batch_size=batch_size, on_progress=on_progress)
        if count:
            self._warm_loaded += count
            logger.info("Cache warmed: %d entries from '%s'", count, path)
        return count

    def _build_cache_entries(
        self,
        chunk: list[dict[str, Any]],
        embeddings: list[list[float]],
        resolved_ttl: int | None,
    ) -> list[CacheEntry]:
        entries = []
        for item, embedding in zip(chunk, embeddings, strict=False):
            question = item["question"]
            normalized = normalize_question(question)
            expires_at = (
                datetime.now(timezone.utc) + timedelta(seconds=resolved_ttl)
                if resolved_ttl is not None
                else None
            )
            entries.append(CacheEntry(
                id=str(uuid.uuid4()),
                vector=embedding,
                original_question=question,
                normalized_question=normalized,
                generated_query=item["generated_query"],
                query_hash=query_hash(item["generated_query"]),
                response_summary=item.get("response_summary"),
                template_id=item.get("template_id"),
                expires_at=expires_at,
            ))
        return entries

    async def store_many(
        self,
        entries: list[dict[str, Any]],
        *,
        batch_size: int | None = None,
        on_progress: Any = None,
        ttl: int | None = _UNSET,  # type: ignore[assignment]
    ) -> int:
        """Store multiple entries using chunked, optionally concurrent embedding.

        Fail-fast: raises ValueError if any entry is missing 'question' or
        'generated_query' before any embedding or upsert is performed.

        Args:
            entries: List of dicts, each with at minimum 'question' and
                'generated_query'. Optional keys: 'response_summary', 'template_id'.
            batch_size: Chunk size. Defaults to settings.batch_size.
            on_progress: Optional callback ``(done: int, total: int)`` called
                after each chunk is upserted.
            ttl: TTL in seconds for the new entries. _UNSET = use settings default.

        Returns:
            Number of entries stored.

        Raises:
            ValueError: If any entry is missing required keys.
        """
        for i, item in enumerate(entries):
            if not item.get("question", "").strip():
                raise ValueError(f"store_many: entry {i} missing or empty 'question'")
            if not item.get("generated_query", "").strip():
                raise ValueError(f"store_many: entry {i} missing or empty 'generated_query'")

        if not entries:
            return 0

        chunk_size = batch_size or self._settings.batch_size
        total = len(entries)
        done = 0
        resolved_ttl = ttl if ttl is not _UNSET else self._settings.default_ttl_seconds
        concurrency = self._settings.batch_embed_concurrency
        chunks = [entries[i: i + chunk_size] for i in range(0, total, chunk_size)]

        async def _embed_chunk(chunk: list[dict[str, Any]]) -> list[list[float]]:
            normalized = [normalize_question(item["question"]) for item in chunk]
            coro = self._embedder.aembed_batch(normalized, is_document=True)
            if self._settings.embedding_timeout is not None:
                coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
            return await coro

        async def _upsert_chunk(chunk: list[dict[str, Any]], embeddings: list[list[float]]) -> None:
            cache_entries = self._build_cache_entries(chunk, embeddings, resolved_ttl)
            await self._backend.upsert(self._collection_name, cache_entries)
            for item in chunk:
                await self._store_in_l1(
                    item["question"],
                    CacheHit(
                        generated_query=item["generated_query"],
                        response_summary=item.get("response_summary"),
                        confidence=1.0,
                        strategy=SearchStrategy.EXACT_MATCH,
                        template_used=item.get("template_id"),
                    ),
                )

        if concurrency > 1:
            for group_start in range(0, len(chunks), concurrency):
                group = chunks[group_start: group_start + concurrency]
                embeddings_list: list[list[list[float]]] = await asyncio.gather(
                    *[_embed_chunk(chunk) for chunk in group]
                )
                for chunk, embeddings in zip(group, embeddings_list, strict=False):
                    await _upsert_chunk(chunk, embeddings)
                    done += len(chunk)
                    if on_progress is not None:
                        on_progress(done, total)
        else:
            for chunk in chunks:
                embeddings = await _embed_chunk(chunk)
                await _upsert_chunk(chunk, embeddings)
                done += len(chunk)
                if on_progress is not None:
                    on_progress(done, total)

        self._total_stored += total
        logger.info("store_many: stored %d entries in %d chunks", total, len(chunks))
        return total

    async def warm_from_dataframe(
        self,
        df: Any,
        *,
        question_col: str = "question",
        query_col: str = "generated_query",
        response_col: str = "response_summary",
        template_col: str | None = None,
        batch_size: int | None = None,
        on_progress: Any = None,
        ttl: int | None = _UNSET,  # type: ignore[assignment]
    ) -> int:
        """Warm the cache from a pandas DataFrame.

        Args:
            df: A ``pandas.DataFrame`` with at least the question and query columns.
            question_col: Column name for questions.
            query_col: Column name for generated queries.
            response_col: Column name for optional response summaries.
            template_col: Column name for optional template IDs.
            batch_size: Override chunk size.
            on_progress: Optional ``(done, total)`` callback.
            ttl: TTL for new entries.

        Returns:
            Number of entries stored.

        Raises:
            ConfigurationError: If pandas is not installed.
        """
        try:
            import pandas as pd
        except ImportError as exc:
            raise ConfigurationError(
                "warm_from_dataframe requires pandas: pip install pandas"
            ) from exc

        rows: list[dict[str, Any]] = []
        for _, row in df.iterrows():
            item: dict[str, Any] = {
                "question": row[question_col],
                "generated_query": row[query_col],
            }
            if response_col in df.columns:
                val = row.get(response_col)
                if val is not None and pd.notna(val):
                    item["response_summary"] = val
            if template_col is not None and template_col in df.columns:
                val = row.get(template_col)
                if val is not None and pd.notna(val):
                    item["template_id"] = val
            rows.append(item)

        return await self.store_many(rows, batch_size=batch_size, on_progress=on_progress, ttl=ttl)

    async def export_to_dataframe(
        self,
        collection_name: str | None = None,
        *,
        include_vectors: bool = False,
    ) -> Any:
        """Export all cache entries to a pandas DataFrame.

        Scrolls the entire collection in pages of 500 and returns the result
        as a DataFrame where each row is the ``model_dump()`` of a CacheResult.

        Args:
            collection_name: Target collection. None = main collection.
            include_vectors: Whether to request vectors from the backend.

        Returns:
            ``pandas.DataFrame`` with one row per cache entry.

        Raises:
            ConfigurationError: If pandas is not installed.
        """
        try:
            import pandas as pd
        except ImportError as exc:
            raise ConfigurationError(
                "export_to_dataframe requires pandas: pip install pandas"
            ) from exc

        coll = collection_name or self._collection_name
        rows: list[dict[str, Any]] = []
        offset: str | None = None
        while True:
            results, offset = await self._backend.scroll(
                collection_name=coll,
                limit=500,
                offset=offset,
                with_vectors=include_vectors,
            )
            for r in results:
                rows.append(r.model_dump())
            if offset is None:
                break

        return pd.DataFrame(rows)

    async def dedup_collection(
        self,
        collection_name: str | None = None,
        *,
        strategy: str = "keep_latest",
    ) -> int:
        """Remove duplicate entries that share the same generated query hash.

        Args:
            collection_name: Target collection. None = main collection.
            strategy: ``"keep_latest"`` retains the most recently created entry;
                ``"keep_first"`` retains the oldest.

        Returns:
            Number of duplicate entries deleted.
        """
        coll = collection_name or self._collection_name
        seen: dict[str, Any] = {}
        to_delete: list[str] = []
        offset: str | None = None

        while True:
            results, offset = await self._backend.scroll(
                collection_name=coll,
                limit=500,
                offset=offset,
            )
            for r in results:
                qhash = r.query_hash
                if qhash not in seen:
                    seen[qhash] = r
                else:
                    existing = seen[qhash]
                    if strategy == "keep_latest":
                        r_ts = r.created_at
                        ex_ts = existing.created_at
                        if r_ts is not None and (ex_ts is None or r_ts > ex_ts):
                            to_delete.append(existing.id)
                            seen[qhash] = r
                        else:
                            to_delete.append(r.id)
                    else:
                        to_delete.append(r.id)
            if offset is None:
                break

        if to_delete:
            await self._backend.delete(coll, to_delete)
            logger.info("dedup_collection: deleted %d duplicates from '%s'", len(to_delete), coll)

        return len(to_delete)

    async def _sync_templates_to_backend(self) -> None:
        """Sync in-memory templates to the template collection in the backend.

        Idempotent: skips if templates already exist in the collection.
        """
        if not self._templates:
            return

        try:
            count = await self._backend.count(self._template_collection)
            if count > 0:
                logger.info(
                    "Template collection already has %d entries, skipping sync",
                    count,
                )
                return
        except StorageError:
            pass

        # Flatten all texts across templates into a single list for batch embedding.
        # Each item: (template, original_text, normalized_embed_text)
        # Strip {placeholder} braces so the embedder sees natural words.
        items: list[tuple[Any, ...]] = []
        for template in self._templates:
            for text in [template.template_text] + template.aliases:
                embed_text = re.sub(r"\{(\w+)\}", r"\1", text)
                items.append((template, text, normalize_question(embed_text)))

        if not items:
            return

        # Single batch embedding call instead of N sequential aembed() calls
        try:
            coro = self._embedder.aembed_batch(
                [embed_text for _, _, embed_text in items],
                is_document=True,
            )
            if self._settings.embedding_timeout is not None:
                coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
            vectors = await coro
        except asyncio.TimeoutError:
            logger.error(
                "Template sync: batch embedding timed out after %.1fs for %d texts",
                self._settings.embedding_timeout,
                len(items),
            )
            return
        except EmbeddingError as e:
            logger.error("Template sync: batch embedding failed: %s", e)
            return

        entries = [
            CacheEntry(
                id=str(uuid.uuid4()),
                vector=vec,
                original_question=original_text,
                normalized_question=normalize_question(original_text),
                generated_query=template.query_template,
                query_hash=query_hash(template.query_template),
                template_id=template.intent,
            )
            for (template, original_text, _), vec in zip(items, vectors, strict=False)
        ]

        await self._backend.upsert(self._template_collection, entries)
        logger.info("Synced %d template entries to backend", len(entries))

    # --- Embedding Cache ---

    async def _get_embedding(self, question: str) -> list[float] | None:
        """Get or compute the embedding for a question.

        Checks the internal LRU cache first. If another coroutine is already
        computing the same embedding, waits for its result instead of
        duplicating the work (deduplication via asyncio.Future).

        Returns:
            Embedding vector, or None on failure.
        """
        normalized = normalize_question(question)
        cache_key = question_hash(question)

        our_future: asyncio.Future[list[float]] | None = None
        wait_future: asyncio.Future[list[float]] | None = None

        async with self._embedding_cache_lock:
            # Cache hit → LRU bump and return
            if cache_key in self._embedding_cache:
                vec = self._embedding_cache.pop(cache_key)
                self._embedding_cache[cache_key] = vec
                logger.debug("Embedding cache HIT for key=%s", cache_key[:8])
                return vec

            # Another coroutine is already computing this key → join it
            if cache_key in self._pending_embeddings:
                wait_future = self._pending_embeddings[cache_key]
                logger.debug("Embedding deduplication: joining in-flight key=%s", cache_key[:8])
            else:
                # We are the first → register a Future so others can join
                our_future = asyncio.get_running_loop().create_future()
                # Suppress "Future exception was never retrieved" if no waiter joins
                our_future.add_done_callback(
                    lambda f: f.exception() if not f.cancelled() and f.done() and f.exception() else None
                )
                self._pending_embeddings[cache_key] = our_future
                logger.debug("Embedding cache MISS for key=%s, computing...", cache_key[:8])

        if wait_future is not None:
            try:
                return await asyncio.shield(wait_future)
            except (EmbeddingError, asyncio.CancelledError, asyncio.TimeoutError):
                logger.warning("In-flight embedding unavailable for key=%s", cache_key[:8])
                return None

        # We own this computation
        if our_future is None:
            logger.error("Embedding deduplication: invariant violated for key=%s", cache_key[:8])
            return None

        try:
            coro = self._embedder.aembed(normalized)
            if self._settings.embedding_timeout is not None:
                coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
            vec = await coro
        except asyncio.TimeoutError:
            err = EmbeddingError(
                f"Embedding timed out after {self._settings.embedding_timeout}s"
            )
            logger.error("Embedding timed out for '%s'", question[:50])
            async with self._embedding_cache_lock:
                self._pending_embeddings.pop(cache_key, None)
            if not our_future.done():
                our_future.set_exception(err)
            return None
        except EmbeddingError as e:
            logger.error("Embedding failed for '%s': %s", question[:50], e)
            async with self._embedding_cache_lock:
                self._pending_embeddings.pop(cache_key, None)
            if not our_future.done():
                our_future.set_exception(e)
            return None
        except BaseException:
            # CancelledError, KeyboardInterrupt, etc. — always unblock waiters
            async with self._embedding_cache_lock:
                self._pending_embeddings.pop(cache_key, None)
            if not our_future.done():
                our_future.cancel()
            raise

        # Store in cache and notify waiters
        async with self._embedding_cache_lock:
            if len(self._embedding_cache) >= self._embedding_cache_max:
                self._embedding_cache.popitem(last=False)
            self._embedding_cache[cache_key] = vec
            self._pending_embeddings.pop(cache_key, None)

        if not our_future.done():
            our_future.set_result(vec)

        logger.debug(
            "Embedding computed: dim=%d, cache_size=%d",
            len(vec),
            len(self._embedding_cache),
        )
        return vec

    # --- Statistics & Monitoring ---

    async def stats(self, collection_name: str | None = None) -> CacheStats:
        """Return a frozen snapshot of cache performance metrics.

        Args:
            collection_name: Collection to count entries for. None = main collection.

        Returns:
            CacheStats with hit/miss rates, latency percentiles, and backend count.
        """
        backend_count = await self._backend.count(collection_name or self._collection_name)
        return await self._stats.snapshot(backend_count)

    async def reset_stats(self) -> None:
        """Reset all collected statistics to zero."""
        await self._stats.reset()

    async def clear_caches(self) -> None:
        """Clear all caches (L1, embedding). Backend data is preserved."""
        await self._l1_backend.clear()
        self._embedding_cache.clear()
        await self._stats.reset()
        self._total_stored = 0
        self._warm_loaded = 0
        logger.info("In-memory caches cleared")

    # --- Persistent Embedding Cache ---

    def _load_embedding_cache_from_disk(self) -> None:
        """Load persisted embeddings from disk into the in-memory cache.

        Silently skips if the file does not exist yet (first run).
        """
        path = self._settings.embedding_cache_path
        if not path:
            return
        try:
            import os
            if not os.path.exists(path):
                logger.debug("Embedding cache file not found at '%s', starting empty", path)
                return
            with open(path, encoding="utf-8") as f:
                data: dict[str, list[float]] = json.load(f)
            loaded = 0
            for key, vec in data.items():
                if len(self._embedding_cache) >= self._embedding_cache_max:
                    break
                self._embedding_cache[key] = vec
                loaded += 1
            logger.info("Loaded %d embeddings from disk cache '%s'", loaded, path)
        except Exception as exc:
            logger.warning("Failed to load embedding cache from '%s': %s", path, exc)

    def _save_embedding_cache_to_disk(self) -> None:
        """Persist the current in-memory embedding cache to disk."""
        path = self._settings.embedding_cache_path
        if not path:
            return
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(dict(self._embedding_cache), f)
            logger.info(
                "Saved %d embeddings to disk cache '%s'",
                len(self._embedding_cache),
                path,
            )
        except Exception as exc:
            logger.warning("Failed to save embedding cache to '%s': %s", path, exc)

    # --- Sync Wrappers ---

    def search_sync(self, question: str) -> CacheHit:
        """Synchronous wrapper for search()."""
        return BaseEmbedder._run_sync(self.search(question))

    def store_sync(self, question: str, generated_query: str, **kwargs: Any) -> bool:
        """Synchronous wrapper for store()."""
        return BaseEmbedder._run_sync(self.store(question, generated_query, **kwargs))

    def warm_from_file_sync(self, path: str) -> int:
        """Synchronous wrapper for warm_from_file()."""
        return BaseEmbedder._run_sync(self.warm_from_file(path))

    def clear_caches_sync(self) -> None:
        """Synchronous wrapper for clear_caches()."""
        BaseEmbedder._run_sync(self.clear_caches())

start() async

Initialize the backend and sync templates.

Must be called before search/store operations.

Steps
  1. Connect to the vector backend.
  2. Initialize the main collection.
  3. Initialize the template collection.
  4. Load templates from file (if configured).
  5. Sync templates to the template collection.
Source code in src/medha/core.py
async def start(self) -> None:
    """Initialize the backend and sync templates.

    Must be called before search/store operations.

    Steps:
        1. Connect to the vector backend.
        2. Initialize the main collection.
        3. Initialize the template collection.
        4. Load templates from file (if configured).
        5. Sync templates to the template collection.
    """
    logger.debug(
        "Starting Medha: collection='%s', embedder=%s, backend=%s",
        self._collection_name,
        type(self._embedder).__name__,
        type(self._backend).__name__,
    )
    if hasattr(self._backend, "connect"):
        await self._backend.connect()

    dimension = self._embedder.dimension
    logger.debug("Embedder dimension: %d", dimension)

    await self._backend.initialize(self._collection_name, dimension)
    await self._backend.initialize(self._template_collection, dimension)

    # Warn once if a legacy-named template collection still exists
    legacy_collection = f"{self._collection_name}_templates"
    try:
        legacy_count = await self._backend.count(legacy_collection)
        if legacy_count > 0:
            logger.warning(
                "Legacy template collection '%s' found with %d entries. "
                "Templates will be re-synced to '%s'. "
                "Delete the old collection manually when ready.",
                legacy_collection,
                legacy_count,
                self._template_collection,
            )
    except StorageError:
        pass  # Old collection does not exist — fresh deployment

    if self._settings.template_file and not self._templates:
        await self.load_templates_from_file(self._settings.template_file)

    if self._templates:
        await self._sync_templates_to_backend()

    # Load persistent embedding cache from disk (if configured)
    if self._settings.embedding_cache_path:
        self._load_embedding_cache_from_disk()

    if self._settings.cleanup_interval_seconds:
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    logger.info(
        "Medha started: collection='%s', templates=%d",
        self._collection_name,
        len(self._templates),
    )

close() async

Shut down the backend and release resources.

Source code in src/medha/core.py
async def close(self) -> None:
    """Shut down the backend and release resources."""
    if self._cleanup_task is not None:
        self._cleanup_task.cancel()
        with suppress(asyncio.CancelledError):
            await self._cleanup_task
        self._cleanup_task = None
    if self._settings.embedding_cache_path:
        self._save_embedding_cache_to_disk()
    await self._l1_backend.close()
    await self._backend.close()
    logger.info("Medha closed")

search(question) async

Search the cache using the waterfall strategy.

Tiers (checked in order, first hit wins)
  0. L1 In-Memory Cache (exact hash match)
  1. Template Matching (intent recognition + parameter extraction)
  2. Exact Vector Match (score >= score_threshold_exact)
  3. Semantic Similarity (score >= score_threshold_semantic)
  4. Fuzzy Matching (Levenshtein distance, optional)

Parameters:

Name Type Description Default
question str

Natural language question from the user.

required

Returns:

Type Description
CacheHit

CacheHit with the matched query, confidence, and strategy.

CacheHit

Returns CacheHit(strategy=NO_MATCH) if no tier matches.

Source code in src/medha/core.py
async def search(self, question: str) -> CacheHit:
    """Search the cache using the waterfall strategy.

    Tiers (checked in order, first hit wins):
        0. L1 In-Memory Cache (exact hash match)
        1. Template Matching (intent recognition + parameter extraction)
        2. Exact Vector Match (score >= score_threshold_exact)
        3. Semantic Similarity (score >= score_threshold_semantic)
        4. Fuzzy Matching (Levenshtein distance, optional)

    Args:
        question: Natural language question from the user.

    Returns:
        CacheHit with the matched query, confidence, and strategy.
        Returns CacheHit(strategy=NO_MATCH) if no tier matches.
    """
    t0 = time.monotonic()
    result = await self._search_impl(question)
    latency_ms = (time.monotonic() - t0) * 1000
    await self._stats.record(result.strategy, latency_ms)
    return result
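
A minimal usage sketch (the SearchStrategy import path and the my_generate_query helper are assumptions, not part of the library): on a miss, fall back to your LLM and store the fresh pair so the next lookup hits.

from medha import SearchStrategy  # import path assumed

question = "How many active users signed up this week?"
hit = await medha.search(question)
if hit.strategy is SearchStrategy.NO_MATCH:
    # Cache miss: generate with your own LLM pipeline, then cache the result.
    query = await my_generate_query(question)  # hypothetical helper
    await medha.store(question, query)
else:
    query = hit.generated_query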

store(question, generated_query, response_summary=None, template_id=None, ttl=_UNSET) async

Store a question-query pair in the cache.

Also stores in L1 cache for immediate subsequent hits.

Parameters:

Name Type Description Default
question str

The natural language question.

required
generated_query str

The SQL/Cypher/GraphQL query.

required
response_summary str | None

Optional response summary.

None
template_id str | None

Optional template intent identifier.

None
ttl int | None

TTL in seconds for the entry. _UNSET = use settings default.

_UNSET

Returns:

Type Description
bool

True if stored successfully, False otherwise.

Source code in src/medha/core.py
async def store(
    self,
    question: str,
    generated_query: str,
    response_summary: str | None = None,
    template_id: str | None = None,
    ttl: int | None = _UNSET,  # type: ignore[assignment]
) -> bool:
    """Store a question-query pair in the cache.

    Also stores in L1 cache for immediate subsequent hits.

    Args:
        question: The natural language question.
        generated_query: The SQL/Cypher/GraphQL query.
        response_summary: Optional response summary.
        template_id: Optional template intent identifier.
        ttl: TTL in seconds for the entry. ``_UNSET`` = use settings default.

    Returns:
        True if stored successfully, False otherwise.
    """
    if not question or not question.strip():
        logger.warning("Store skipped: question is empty or whitespace-only")
        return False
    if len(question) > self._settings.max_question_length:
        raise ValueError(
            f"Question length {len(question)} exceeds max_question_length "
            f"({self._settings.max_question_length})"
        )
    if not generated_query or not generated_query.strip():
        logger.warning("Store skipped: generated_query is empty or whitespace-only")
        return False

    try:
        logger.debug("Storing: '%s' -> '%s'", question[:50], generated_query[:50])
        embedding = await self._get_embedding(question)
        if embedding is None:
            logger.error("Store aborted: embedding failed for '%s'", question[:50])
            return False

        resolved_ttl = ttl if ttl is not _UNSET else self._settings.default_ttl_seconds
        expires_at = (
            datetime.now(timezone.utc) + timedelta(seconds=resolved_ttl)
            if resolved_ttl is not None
            else None
        )

        normalized = normalize_question(question)
        entry = CacheEntry(
            id=str(uuid.uuid4()),
            vector=embedding,
            original_question=question,
            normalized_question=normalized,
            generated_query=generated_query,
            query_hash=query_hash(generated_query),
            response_summary=response_summary,
            template_id=template_id,
            expires_at=expires_at,
        )

        await self._backend.upsert(self._collection_name, [entry])

        # Also store in L1
        await self._store_in_l1(
            question,
            CacheHit(
                generated_query=generated_query,
                response_summary=response_summary,
                confidence=1.0,
                strategy=SearchStrategy.EXACT_MATCH,
                template_used=template_id,
                expires_at=expires_at,
            ),
        )

        self._total_stored += 1
        logger.info("Stored: '%s' -> '%s'", question[:50], generated_query[:50])
        return True

    except Exception as e:
        logger.error("Store failed for '%s': %s", question[:50], e, exc_info=True)
        return False
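
A short sketch of storing a pair with an explicit TTL; omitting ttl falls back to settings.default_ttl_seconds.

stored = await medha.store(
    "Revenue by region last quarter?",
    "SELECT region, SUM(amount) FROM orders GROUP BY region",
    response_summary="Revenue totals per region",
    ttl=3600,  # expire this entry after one hour
)
if not stored:
    print("store failed; check Medha logs for the embedding/upsert error")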

store_batch(entries) async

Store multiple question-query pairs efficiently.

Uses aembed_batch() for a single round-trip to the embedder, then upserts all entries and populates the L1 cache.

Parameters:

Name Type Description Default
entries list[dict[str, Any]]

List of dicts with keys: question, generated_query, response_summary (optional), template_id (optional).

required

Returns:

Type Description
bool

True if all stored successfully, False if embedding or upsert fails.

Source code in src/medha/core.py
async def store_batch(self, entries: list[dict[str, Any]]) -> bool:
    """Store multiple question-query pairs efficiently.

    Uses aembed_batch() for a single round-trip to the embedder, then
    upserts all entries and populates the L1 cache.

    Args:
        entries: List of dicts with keys: question, generated_query,
            response_summary (optional), template_id (optional).

    Returns:
        True if all stored successfully, False if embedding or upsert fails.
    """
    if not entries:
        return True

    valid_entries = []
    for i, item in enumerate(entries):
        if not item.get("question", "").strip():
            logger.warning("store_batch: entry %d skipped — empty question", i)
            continue
        if not item.get("generated_query", "").strip():
            logger.warning("store_batch: entry %d skipped — empty generated_query", i)
            continue
        valid_entries.append(item)

    if not valid_entries:
        logger.warning("store_batch: no valid entries to store")
        return False
    entries = valid_entries

    try:
        logger.debug("Batch store started: %d entries", len(entries))

        questions = [item["question"] for item in entries]
        normalized_questions = [normalize_question(q) for q in questions]

        # Single batch embedding call — much faster than N sequential calls
        try:
            coro = self._embedder.aembed_batch(normalized_questions, is_document=True)
            if self._settings.embedding_timeout is not None:
                coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
            embeddings = await coro
        except asyncio.TimeoutError:
            logger.error(
                "Batch store: embedding timed out after %.1fs for %d entries",
                self._settings.embedding_timeout,
                len(entries),
            )
            return False
        except EmbeddingError as e:
            logger.error("Batch store: embedding failed: %s", e)
            return False

        # Populate embedding cache with all computed vectors
        async with self._embedding_cache_lock:
            for question, vec in zip(questions, embeddings, strict=False):
                cache_key = question_hash(question)
                if len(self._embedding_cache) >= self._embedding_cache_max:
                    self._embedding_cache.popitem(last=False)
                self._embedding_cache[cache_key] = vec

        # Build CacheEntry objects
        cache_entries = []
        for item, embedding in zip(entries, embeddings, strict=False):
            question = item["question"]
            normalized = normalize_question(question)
            entry = CacheEntry(
                id=str(uuid.uuid4()),
                vector=embedding,
                original_question=question,
                normalized_question=normalized,
                generated_query=item["generated_query"],
                query_hash=query_hash(item["generated_query"]),
                response_summary=item.get("response_summary"),
                template_id=item.get("template_id"),
            )
            cache_entries.append(entry)

        await self._backend.upsert(self._collection_name, cache_entries)

        # Populate L1 cache — consistent with store()
        for item in entries:
            await self._store_in_l1(
                item["question"],
                CacheHit(
                    generated_query=item["generated_query"],
                    response_summary=item.get("response_summary"),
                    confidence=1.0,
                    strategy=SearchStrategy.EXACT_MATCH,
                    template_used=item.get("template_id"),
                ),
            )

        self._total_stored += len(cache_entries)
        logger.info("Batch stored %d entries", len(cache_entries))
        return True

    except Exception as e:
        logger.error("Batch store failed: %s", e, exc_info=True)
        return False
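
A short sketch of the expected entry dicts (keys as documented above); rows with an empty question or query are skipped with a warning rather than failing the whole batch.

pairs = [
    {"question": "How many users?", "generated_query": "SELECT COUNT(*) FROM users"},
    {
        "question": "Orders per day?",
        "generated_query": "SELECT created_at::date, COUNT(*) FROM orders GROUP BY 1",
        "response_summary": "Daily order counts",
    },
]
ok = await medha.store_batch(pairs)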

expire(collection_name=None) async

Delete expired entries from the collection.

Parameters:

Name Type Description Default
collection_name str | None

Target collection. None = all known collections.

None

Returns:

Type Description
int

Total number of entries deleted.

Source code in src/medha/core.py
async def expire(self, collection_name: str | None = None) -> int:
    """Delete expired entries from the collection.

    Args:
        collection_name: Target collection. None = all known collections.

    Returns:
        Total number of entries deleted.
    """
    collections = [collection_name] if collection_name else self._known_collections
    total = 0
    for coll in collections:
        try:
            expired_ids = await self._backend.find_expired(coll)
            if expired_ids:
                await self._backend.delete(coll, expired_ids)
                total += len(expired_ids)
        except Exception:
            logger.exception("expire() failed for collection '%s'", coll)
    return total
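
If cleanup_interval_seconds is not configured, you can drive expiry yourself; a minimal sketch (the interval value is illustrative):

import asyncio

async def periodic_expire(medha, interval_s: float = 300.0) -> None:
    # Delete expired entries from all known collections every few minutes.
    while True:
        deleted = await medha.expire()
        if deleted:
            print(f"expired {deleted} entries")
        await asyncio.sleep(interval_s)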

invalidate(question) async

Invalidate a single cache entry by its original question.

Finds the entry in the vector backend via normalized_question match, deletes it, and removes the corresponding L1 key.

Parameters:

Name Type Description Default
question str

Natural language question whose cached entry to remove.

required

Returns:

Type Description
bool

True if an entry was found and deleted, False if not found.

Source code in src/medha/core.py
async def invalidate(self, question: str) -> bool:
    """Invalidate a single cache entry by its original question.

    Finds the entry in the vector backend via normalized_question match,
    deletes it, and removes the corresponding L1 key.

    Args:
        question: Natural language question whose cached entry to remove.

    Returns:
        True if an entry was found and deleted, False if not found.
    """
    normalized = normalize_question(question)
    try:
        result = await self._backend.search_by_normalized_question(
            self._collection_name, normalized
        )
    except Exception:
        logger.exception("invalidate: backend lookup failed for '%s'", question[:50])
        return False

    if result is None:
        logger.debug("invalidate: no entry found for '%s'", question[:50])
        return False

    try:
        await self._backend.delete(self._collection_name, [result.id])
    except Exception:
        logger.exception("invalidate: backend delete failed for id='%s'", result.id)
        return False

    key = question_hash(question)
    await self._l1_backend.invalidate(key)
    logger.info("Invalidated entry for '%s' (id=%s)", question[:50], result.id)
    return True
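
Usage sketch: drop a single stale mapping after a schema change, then re-store the corrected query.

removed = await medha.invalidate("How many users?")
if removed:
    await medha.store("How many users?", "SELECT COUNT(*) FROM app_users")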

invalidate_by_query_hash(query_hash) async

Invalidate all entries whose generated query matches query_hash.

Parameters:

Name Type Description Default
query_hash str

MD5 hash of the generated query (as stored in the backend).

required

Returns:

Type Description
int

Number of entries deleted.

Source code in src/medha/core.py
async def invalidate_by_query_hash(self, query_hash: str) -> int:
    """Invalidate all entries whose generated query matches *query_hash*.

    Args:
        query_hash: MD5 hash of the generated query (as stored in the backend).

    Returns:
        Number of entries deleted.
    """
    try:
        ids = await self._backend.find_by_query_hash(self._collection_name, query_hash)
    except Exception:
        logger.exception("invalidate_by_query_hash: lookup failed for hash='%s'", query_hash)
        return 0

    if not ids:
        return 0

    try:
        await self._backend.delete(self._collection_name, ids)
    except Exception:
        logger.exception("invalidate_by_query_hash: delete failed")
        return 0

    await self._l1_backend.invalidate_all()
    logger.info("Invalidated %d entries for query_hash='%s'", len(ids), query_hash)
    return len(ids)
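
A hedged sketch: the hash must match what the backend stored, so compute it with the library's query_hash helper rather than hashing by hand (the import path below is an assumption).

from medha.utils import query_hash  # import path assumed

stale_sql = "SELECT COUNT(*) FROM users"
deleted = await medha.invalidate_by_query_hash(query_hash(stale_sql))
print(f"removed {deleted} entries pointing at the stale query")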

invalidate_by_template(template_id) async

Invalidate all entries belonging to a template.

Parameters:

Name Type Description Default
template_id str

Template intent identifier.

required

Returns:

Type Description
int

Number of entries deleted.

Source code in src/medha/core.py
async def invalidate_by_template(self, template_id: str) -> int:
    """Invalidate all entries belonging to a template.

    Args:
        template_id: Template intent identifier.

    Returns:
        Number of entries deleted.
    """
    try:
        ids = await self._backend.find_by_template_id(self._collection_name, template_id)
    except Exception:
        logger.exception("invalidate_by_template: lookup failed for template_id='%s'", template_id)
        return 0

    if not ids:
        return 0

    try:
        await self._backend.delete(self._collection_name, ids)
    except Exception:
        logger.exception("invalidate_by_template: delete failed")
        return 0

    await self._l1_backend.invalidate_all()
    logger.info("Invalidated %d entries for template_id='%s'", len(ids), template_id)
    return len(ids)
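
Usage sketch: evict everything generated from one template, for example after changing that template's SQL (the intent name is illustrative).

deleted = await medha.invalidate_by_template("count_users_by_country")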

invalidate_collection(collection_name=None) async

Drop and re-initialize a collection, clearing all its entries.

Parameters:

Name Type Description Default
collection_name str | None

Target collection. None = main collection.

None

Returns:

Type Description
int

Number of entries that were in the collection before deletion.

Source code in src/medha/core.py
async def invalidate_collection(self, collection_name: str | None = None) -> int:
    """Drop and re-initialize a collection, clearing all its entries.

    Args:
        collection_name: Target collection. None = main collection.

    Returns:
        Number of entries that were in the collection before deletion.
    """
    coll = collection_name or self._collection_name
    try:
        count = await self._backend.count(coll)
    except Exception:
        count = 0

    try:
        await self._backend.drop_collection(coll)
    except Exception:
        logger.exception("invalidate_collection: drop failed for '%s'", coll)
        return 0

    try:
        await self._backend.initialize(coll, self._embedder.dimension)
    except Exception:
        logger.exception("invalidate_collection: re-initialize failed for '%s'", coll)

    await self._l1_backend.invalidate_all()
    logger.info("Invalidated collection '%s' (%d entries dropped)", coll, count)
    return count

load_templates(templates) async

Load templates into memory and sync to the template collection.

Parameters:

Name Type Description Default
templates list[QueryTemplate]

List of QueryTemplate objects.

required
Source code in src/medha/core.py
async def load_templates(self, templates: list[QueryTemplate]) -> None:
    """Load templates into memory and sync to the template collection.

    Args:
        templates: List of QueryTemplate objects.
    """
    self._templates = templates
    await self._sync_templates_to_backend()
    logger.info("Loaded %d templates", len(templates))

load_templates_from_file(file_path) async

Load templates from a JSON file.

Expected format: List of objects matching QueryTemplate schema.

Parameters:

Name Type Description Default
file_path str

Path to the JSON template file.

required

Raises:

Type Description
TemplateError

If the file cannot be read or parsed.

Source code in src/medha/core.py
async def load_templates_from_file(self, file_path: str) -> None:
    """Load templates from a JSON file.

    Expected format: List of objects matching QueryTemplate schema.

    Args:
        file_path: Path to the JSON template file.

    Raises:
        TemplateError: If the file cannot be read or parsed.
    """
    try:
        resolved = self._resolve_and_check_path(file_path, "load_templates_from_file")
        _max_bytes = self._settings.max_file_size_mb * 1024 * 1024
        file_size = os.path.getsize(resolved)
        if file_size > _max_bytes:
            raise TemplateError(
                f"Template file '{resolved}' is {file_size / 1_048_576:.1f} MB, "
                f"exceeds max_file_size_mb={self._settings.max_file_size_mb}"
            )
        with open(resolved, encoding="utf-8") as f:
            data = json.load(f)
        templates = [QueryTemplate(**item) for item in data]
        self._templates = templates
        logger.info("Loaded %d templates from '%s'", len(templates), resolved)
    except TemplateError:
        raise
    except Exception as e:
        raise TemplateError(
            f"Failed to load templates from '{file_path}': {e}"
        ) from e
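
Illustrative only: writing a small template file from Python and loading it, using the same field names assumed above.

import json

with open("templates.json", "w", encoding="utf-8") as f:  # example path
    json.dump(
        [
            {
                "intent": "count_users_by_country",
                "template_text": "How many users in {country}?",
                "aliases": ["User count for {country}"],
                "query_template": "SELECT COUNT(*) FROM users WHERE country = '{country}'",
            }
        ],
        f,
    )

await medha.load_templates_from_file("templates.json")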

warm_from_file(path, batch_size=None, on_progress=None) async

Warm the cache from a JSON or JSONL file.

Supports two formats
  • JSON array: [{"question": ..., "generated_query": ...}, ...]
  • JSONL: one JSON object per line (same keys)

Optional per-entry keys: response_summary, template_id.

Parameters:

Name Type Description Default
path str

Path to the file.

required
batch_size int | None

Override the default batch size for chunked upserts.

None
on_progress Any

Optional callback (done, total) called after each chunk.

None

Returns:

Type Description
int

Number of entries successfully stored.

Raises:

Type Description
MedhaError

If the file cannot be read or parsed.

Source code in src/medha/core.py
async def warm_from_file(
    self,
    path: str,
    batch_size: int | None = None,
    on_progress: Any = None,
) -> int:
    """Warm the cache from a JSON or JSONL file.

    Supports two formats:
      - JSON array: ``[{"question": ..., "generated_query": ...}, ...]``
      - JSONL: one JSON object per line (same keys)

    Optional per-entry keys: ``response_summary``, ``template_id``.

    Args:
        path: Path to the file.
        batch_size: Override the default batch size for chunked upserts.
        on_progress: Optional callback ``(done, total)`` called after each chunk.

    Returns:
        Number of entries successfully stored.

    Raises:
        MedhaError: If the file cannot be read or parsed.
    """
    try:
        resolved = self._resolve_and_check_path(path, "warm_from_file")
        _max_bytes = self._settings.max_file_size_mb * 1024 * 1024
        file_size = os.path.getsize(resolved)
        if file_size > _max_bytes:
            raise MedhaError(
                f"warm_from_file: '{resolved}' is {file_size / 1_048_576:.1f} MB, "
                f"exceeds max_file_size_mb={self._settings.max_file_size_mb}"
            )
        with open(resolved, encoding="utf-8") as f:
            content = f.read().strip()

        if content.startswith("["):
            data = json.loads(content)
        else:
            data = [
                json.loads(line)
                for line in content.splitlines()
                if line.strip()
            ]
    except MedhaError:
        raise
    except Exception as e:
        raise MedhaError(f"warm_from_file: cannot read '{path}': {e}") from e

    if not data:
        logger.warning("warm_from_file: no entries found in '%s'", path)
        return 0

    count = await self.store_many(data, batch_size=batch_size, on_progress=on_progress)
    if count:
        self._warm_loaded += count
        logger.info("Cache warmed: %d entries from '%s'", count, path)
    return count
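
A sketch of warming from a JSONL seed file with a progress callback (the file name and contents are illustrative).

# seed_questions.jsonl, one object per line, e.g.:
# {"question": "How many users?", "generated_query": "SELECT COUNT(*) FROM users"}

def report(done: int, total: int) -> None:
    print(f"warmed {done}/{total}")

loaded = await medha.warm_from_file("seed_questions.jsonl", on_progress=report)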

store_many(entries, *, batch_size=None, on_progress=None, ttl=_UNSET) async

Store multiple entries using chunked, optionally concurrent embedding.

Fail-fast: raises ValueError if any entry is missing 'question' or 'generated_query' before any embedding or upsert is performed.

Parameters:

Name Type Description Default
entries list[dict[str, Any]]

List of dicts, each with at minimum 'question' and 'generated_query'. Optional keys: 'response_summary', 'template_id'.

required
batch_size int | None

Chunk size. Defaults to settings.batch_size.

None
on_progress Any

Optional callback (done: int, total: int) called after each chunk is upserted.

None
ttl int | None

TTL in seconds for the new entries. _UNSET = use settings default.

_UNSET

Returns:

Type Description
int

Number of entries stored.

Raises:

Type Description
ValueError

If any entry is missing required keys.

Source code in src/medha/core.py
async def store_many(
    self,
    entries: list[dict[str, Any]],
    *,
    batch_size: int | None = None,
    on_progress: Any = None,
    ttl: int | None = _UNSET,  # type: ignore[assignment]
) -> int:
    """Store multiple entries using chunked, optionally concurrent embedding.

    Fail-fast: raises ValueError if any entry is missing 'question' or
    'generated_query' before any embedding or upsert is performed.

    Args:
        entries: List of dicts, each with at minimum 'question' and
            'generated_query'. Optional keys: 'response_summary', 'template_id'.
        batch_size: Chunk size. Defaults to settings.batch_size.
        on_progress: Optional callback ``(done: int, total: int)`` called
            after each chunk is upserted.
        ttl: TTL in seconds for the new entries. _UNSET = use settings default.

    Returns:
        Number of entries stored.

    Raises:
        ValueError: If any entry is missing required keys.
    """
    for i, item in enumerate(entries):
        if not item.get("question", "").strip():
            raise ValueError(f"store_many: entry {i} missing or empty 'question'")
        if not item.get("generated_query", "").strip():
            raise ValueError(f"store_many: entry {i} missing or empty 'generated_query'")

    if not entries:
        return 0

    chunk_size = batch_size or self._settings.batch_size
    total = len(entries)
    done = 0
    resolved_ttl = ttl if ttl is not _UNSET else self._settings.default_ttl_seconds
    concurrency = self._settings.batch_embed_concurrency
    chunks = [entries[i: i + chunk_size] for i in range(0, total, chunk_size)]

    async def _embed_chunk(chunk: list[dict[str, Any]]) -> list[list[float]]:
        normalized = [normalize_question(item["question"]) for item in chunk]
        coro = self._embedder.aembed_batch(normalized, is_document=True)
        if self._settings.embedding_timeout is not None:
            coro = asyncio.wait_for(coro, timeout=self._settings.embedding_timeout)
        return await coro

    async def _upsert_chunk(chunk: list[dict[str, Any]], embeddings: list[list[float]]) -> None:
        cache_entries = self._build_cache_entries(chunk, embeddings, resolved_ttl)
        await self._backend.upsert(self._collection_name, cache_entries)
        for item in chunk:
            await self._store_in_l1(
                item["question"],
                CacheHit(
                    generated_query=item["generated_query"],
                    response_summary=item.get("response_summary"),
                    confidence=1.0,
                    strategy=SearchStrategy.EXACT_MATCH,
                    template_used=item.get("template_id"),
                ),
            )

    if concurrency > 1:
        for group_start in range(0, len(chunks), concurrency):
            group = chunks[group_start: group_start + concurrency]
            embeddings_list: list[list[list[float]]] = await asyncio.gather(
                *[_embed_chunk(chunk) for chunk in group]
            )
            for chunk, embeddings in zip(group, embeddings_list, strict=False):
                await _upsert_chunk(chunk, embeddings)
                done += len(chunk)
                if on_progress is not None:
                    on_progress(done, total)
    else:
        for chunk in chunks:
            embeddings = await _embed_chunk(chunk)
            await _upsert_chunk(chunk, embeddings)
            done += len(chunk)
            if on_progress is not None:
                on_progress(done, total)

    self._total_stored += total
    logger.info("store_many: stored %d entries in %d chunks", total, len(chunks))
    return total
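
Sketch: chunked bulk storage with a progress callback and a shared TTL. Because validation is fail-fast, one malformed entry raises ValueError before anything is embedded, so clean the input first (known_pairs below is a hypothetical iterable of (question, sql) tuples).

entries = [{"question": q, "generated_query": sql} for q, sql in known_pairs]
stored = await medha.store_many(
    entries,
    batch_size=64,
    on_progress=lambda done, total: print(f"{done}/{total}"),
    ttl=86_400,  # one day
)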

warm_from_dataframe(df, *, question_col='question', query_col='generated_query', response_col='response_summary', template_col=None, batch_size=None, on_progress=None, ttl=_UNSET) async

Warm the cache from a pandas DataFrame.

Parameters:

Name Type Description Default
df Any

A pandas.DataFrame with at least the question and query columns.

required
question_col str

Column name for questions.

'question'
query_col str

Column name for generated queries.

'generated_query'
response_col str

Column name for optional response summaries.

'response_summary'
template_col str | None

Column name for optional template IDs.

None
batch_size int | None

Override chunk size.

None
on_progress Any

Optional (done, total) callback.

None
ttl int | None

TTL in seconds for the new entries. _UNSET = use settings default.

_UNSET

Returns:

Type Description
int

Number of entries stored.

Raises:

Type Description
ConfigurationError

If pandas is not installed.

Source code in src/medha/core.py
async def warm_from_dataframe(
    self,
    df: Any,
    *,
    question_col: str = "question",
    query_col: str = "generated_query",
    response_col: str = "response_summary",
    template_col: str | None = None,
    batch_size: int | None = None,
    on_progress: Any = None,
    ttl: int | None = _UNSET,  # type: ignore[assignment]
) -> int:
    """Warm the cache from a pandas DataFrame.

    Args:
        df: A ``pandas.DataFrame`` with at least the question and query columns.
        question_col: Column name for questions.
        query_col: Column name for generated queries.
        response_col: Column name for optional response summaries.
        template_col: Column name for optional template IDs.
        batch_size: Override chunk size.
        on_progress: Optional ``(done, total)`` callback.
        ttl: TTL in seconds for the new entries. ``_UNSET`` = use settings default.

    Returns:
        Number of entries stored.

    Raises:
        ConfigurationError: If pandas is not installed.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        raise ConfigurationError(
            "warm_from_dataframe requires pandas: pip install pandas"
        ) from exc

    rows: list[dict[str, Any]] = []
    for _, row in df.iterrows():
        item: dict[str, Any] = {
            "question": row[question_col],
            "generated_query": row[query_col],
        }
        if response_col in df.columns:
            val = row.get(response_col)
            if val is not None and pd.notna(val):
                item["response_summary"] = val
        if template_col is not None and template_col in df.columns:
            val = row.get(template_col)
            if val is not None and pd.notna(val):
                item["template_id"] = val
        rows.append(item)

    return await self.store_many(rows, batch_size=batch_size, on_progress=on_progress, ttl=ttl)
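
Sketch with a small pandas DataFrame; the column names map onto the *_col parameters documented above.

import pandas as pd

df = pd.DataFrame(
    {
        "question": ["How many users?", "Orders per day?"],
        "generated_query": [
            "SELECT COUNT(*) FROM users",
            "SELECT created_at::date, COUNT(*) FROM orders GROUP BY 1",
        ],
    }
)
stored = await medha.warm_from_dataframe(df)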

export_to_dataframe(collection_name=None, *, include_vectors=False) async

Export all cache entries to a pandas DataFrame.

Scrolls the entire collection in pages of 500 and returns the result as a DataFrame where each row is the model_dump() of a CacheResult.

Parameters:

Name Type Description Default
collection_name str | None

Target collection. None = main collection.

None
include_vectors bool

Whether to request vectors from the backend.

False

Returns:

Type Description
Any

pandas.DataFrame with one row per cache entry.

Raises:

Type Description
ConfigurationError

If pandas is not installed.

Source code in src/medha/core.py
async def export_to_dataframe(
    self,
    collection_name: str | None = None,
    *,
    include_vectors: bool = False,
) -> Any:
    """Export all cache entries to a pandas DataFrame.

    Scrolls the entire collection in pages of 500 and returns the result
    as a DataFrame where each row is the ``model_dump()`` of a CacheResult.

    Args:
        collection_name: Target collection. None = main collection.
        include_vectors: Whether to request vectors from the backend.

    Returns:
        ``pandas.DataFrame`` with one row per cache entry.

    Raises:
        ConfigurationError: If pandas is not installed.
    """
    try:
        import pandas as pd
    except ImportError as exc:
        raise ConfigurationError(
            "export_to_dataframe requires pandas: pip install pandas"
        ) from exc

    coll = collection_name or self._collection_name
    rows: list[dict[str, Any]] = []
    offset: str | None = None
    while True:
        results, offset = await self._backend.scroll(
            collection_name=coll,
            limit=500,
            offset=offset,
            with_vectors=include_vectors,
        )
        for r in results:
            rows.append(r.model_dump())
        if offset is None:
            break

    return pd.DataFrame(rows)
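
Sketch: export for offline inspection or to seed another environment; the columns follow CacheResult.model_dump().

df = await medha.export_to_dataframe()
df.to_csv("cache_snapshot.csv", index=False)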

dedup_collection(collection_name=None, *, strategy='keep_latest') async

Remove duplicate entries that share the same generated query hash.

Parameters:

Name Type Description Default
collection_name str | None

Target collection. None = main collection.

None
strategy str

"keep_latest" retains the most recently created entry; "keep_first" retains the oldest.

'keep_latest'

Returns:

Type Description
int

Number of duplicate entries deleted.

Source code in src/medha/core.py
async def dedup_collection(
    self,
    collection_name: str | None = None,
    *,
    strategy: str = "keep_latest",
) -> int:
    """Remove duplicate entries that share the same generated query hash.

    Args:
        collection_name: Target collection. None = main collection.
        strategy: ``"keep_latest"`` retains the most recently created entry;
            ``"keep_first"`` retains the oldest.

    Returns:
        Number of duplicate entries deleted.
    """
    coll = collection_name or self._collection_name
    seen: dict[str, Any] = {}
    to_delete: list[str] = []
    offset: str | None = None

    while True:
        results, offset = await self._backend.scroll(
            collection_name=coll,
            limit=500,
            offset=offset,
        )
        for r in results:
            qhash = r.query_hash
            if qhash not in seen:
                seen[qhash] = r
            else:
                existing = seen[qhash]
                if strategy == "keep_latest":
                    r_ts = r.created_at
                    ex_ts = existing.created_at
                    if r_ts is not None and (ex_ts is None or r_ts > ex_ts):
                        to_delete.append(existing.id)
                        seen[qhash] = r
                    else:
                        to_delete.append(r.id)
                else:
                    to_delete.append(r.id)
        if offset is None:
            break

    if to_delete:
        await self._backend.delete(coll, to_delete)
        logger.info("dedup_collection: deleted %d duplicates from '%s'", len(to_delete), coll)

    return len(to_delete)
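
Usage sketch: after a bulk warm load that may contain repeats, keep only the newest entry per generated-query hash.

duplicates_removed = await medha.dedup_collection(strategy="keep_latest")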

stats(collection_name=None) async

Return a frozen snapshot of cache performance metrics.

Parameters:

Name Type Description Default
collection_name str | None

Collection to count entries for. None = main collection.

None

Returns:

Type Description
CacheStats

CacheStats with hit/miss rates, latency percentiles, and backend count.

Source code in src/medha/core.py
async def stats(self, collection_name: str | None = None) -> CacheStats:
    """Return a frozen snapshot of cache performance metrics.

    Args:
        collection_name: Collection to count entries for. None = main collection.

    Returns:
        CacheStats with hit/miss rates, latency percentiles, and backend count.
    """
    backend_count = await self._backend.count(collection_name or self._collection_name)
    return await self._stats.snapshot(backend_count)
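
Sketch: log a snapshot periodically. Assuming CacheStats is a pydantic model like the other result types, model_dump() gives a plain dict of the counters.

snapshot = await medha.stats()
print(snapshot.model_dump())  # hit/miss rates, latency percentiles, backend count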

reset_stats() async

Reset all collected statistics to zero.

Source code in src/medha/core.py
async def reset_stats(self) -> None:
    """Reset all collected statistics to zero."""
    await self._stats.reset()

clear_caches() async

Clear all caches (L1, embedding). Backend data is preserved.

Source code in src/medha/core.py
async def clear_caches(self) -> None:
    """Clear all caches (L1, embedding). Backend data is preserved."""
    await self._l1_backend.clear()
    self._embedding_cache.clear()
    await self._stats.reset()
    self._total_stored = 0
    self._warm_loaded = 0
    logger.info("In-memory caches cleared")

search_sync(question)

Synchronous wrapper for search().

Source code in src/medha/core.py
def search_sync(self, question: str) -> CacheHit:
    """Synchronous wrapper for search()."""
    return BaseEmbedder._run_sync(self.search(question))

store_sync(question, generated_query, **kwargs)

Synchronous wrapper for store().

Source code in src/medha/core.py
def store_sync(self, question: str, generated_query: str, **kwargs: Any) -> bool:
    """Synchronous wrapper for store()."""
    return BaseEmbedder._run_sync(self.store(question, generated_query, **kwargs))

warm_from_file_sync(path)

Synchronous wrapper for warm_from_file().

Source code in src/medha/core.py
def warm_from_file_sync(self, path: str) -> int:
    """Synchronous wrapper for warm_from_file()."""
    return BaseEmbedder._run_sync(self.warm_from_file(path))

clear_caches_sync()

Synchronous wrapper for clear_caches().

Source code in src/medha/core.py
def clear_caches_sync(self) -> None:
    """Synchronous wrapper for clear_caches()."""
    BaseEmbedder._run_sync(self.clear_caches())
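
A hedged sketch of the sync wrappers for non-async scripts. How the instance gets started (for example a one-off asyncio.run(medha.start())) depends on your setup, and these wrappers must not be called from inside a running event loop.

hit = medha.search_sync("Show me the user count")
print(hit.strategy, hit.generated_query)

medha.store_sync("Show me the user count", "SELECT COUNT(*) FROM users")
medha.clear_caches_sync()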