summaryrefslogtreecommitdiff
path: root/drivers/md/dm-vdo/dedupe.c
blob: 80628ae93fbacc938dc98539b2239aed07b799c4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

/**
 * DOC:
 *
 * Hash Locks:
 *
 * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
 * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
 * also against each other. This saves on index queries and allows those data_vios to concurrently
 * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
 * index query is needed for each hash_lock, instead of one for every data_vio.
 *
 * Hash_locks are assigned to hash_zones by computing a modulus on the hash itself. Each hash_zone
 * has a single dedicated queue and thread for performing all operations on the hash_locks assigned
 * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
 * more fine-grained locking for the hash_lock structures.
 *
 * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
 * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
 * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
 * containing the lock. An asynchronous operation is almost always performed upon entering a state,
 * and the callback from that operation triggers exiting the state and entering a new state.
 *
 * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
 * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
 * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
 * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
 * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
 * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
 * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
 * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
 *
 * The existence of lock waiters is a key factor controlling which state the lock transitions to
 * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
 * doesn't, it will try to clean up and exit.
 *
 * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
 * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
 * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
 * new copy of the data to a full data block or a slot in a compressed block (WRITING).
 *
 * Cleaning up consists of updating the index when the data location is different from the initial
 * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
 * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
 * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
 *
 * The shortest sequence of states is for non-concurrent writes of new data:
 *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
 * This sequence is short because no PBN read lock or index update is needed.
 *
 * Non-concurrent, finding valid advice looks like this (endpoints elided):
 *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
 * Or with stale advice (endpoints elided):
 *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
 *
 * When there are not enough available reference count increments available on a PBN for a data_vio
 * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
 * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
 * data_vios will be directed to it. The two locks will proceed independently, but only the new
 * lock will have the right to update the index (unless it also forks).
 *
 * Since rollover happens in a lock instance, once a valid data location has been selected, it will
 * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
 * non-endpoint states can be re-entered.
 *
 * The function names in this module follow a convention referencing the states and transitions in
 * the state machine. For example, for the LOCKING state, there are start_locking() and
 * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
 * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
 * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
 * code actually obtaining the lock. It does any bookkeeping or decision-making required and
 * invokes the appropriate start function of the state being transitioned to after LOCKING.
 *
 * ----------------------------------------------------------------------
 *
 * Index Queries:
 *
 * A query to the UDS index is handled asynchronously by the index's threads. When the query is
 * complete, a callback supplied with the query will be called from one of the those threads. Under
 * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
 * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
 * device, it is acceptable to timeout out slow index queries and proceed to fulfill a write
 * request without deduplicating. However, because the uds_request struct itself is supplied by the
 * caller, we can not simply reuse a uds_request object which we have chosen to timeout. Hence,
 * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
 * reference to the data_vio on behalf of which they are performing a query.
 *
 * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
 * its hash_zone's pool. If one is available, that context is prepared, associated with the
 * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
 * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
 * goes well, the dedupe callback will be called by the index which will change the context's state
 * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
 * zone where the query results will be processed and the context will be put back in the idle
 * state and returned to the hash_zone's available list.
 *
 * The first time an index query is launched from a given hash_zone, a timer is started. When the
 * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
 * pending list will be searched for any contexts in the pending state which have been running for
 * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
 * zone's timed_out list where they won't be examined again if there is a subsequent time out). The
 * data_vios associated with timed out contexts are sent to continue processing their write
 * operation without deduplicating. The timer is also restarted.
 *
 * When the dedupe callback is run for a context which is in the timed out state, that context is
 * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
 * associated data_vios have already been dispatched.
 *
 * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
 * be searched for any contexts which are timed out and complete. One of these will be used
 * immediately, and the rest will be returned to the available list and marked idle.
 */

#include "dedupe.h"

#include <linux/atomic.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/spinlock.h>
#include <linux/timer.h>

#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"
#include "permassert.h"
#include "string-utils.h"

#include "indexer.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "int-map.h"
#include "io-submitter.h"
#include "packer.h"
#include "physical-zone.h"
#include "slab-depot.h"
#include "statistics.h"
#include "types.h"
#include "vdo.h"
#include "wait-queue.h"

#define DEDUPE_QUERY_TIMER_IDLE 0
#define DEDUPE_QUERY_TIMER_RUNNING 1
#define DEDUPE_QUERY_TIMER_FIRED 2

enum dedupe_context_state {
	DEDUPE_CONTEXT_IDLE,
	DEDUPE_CONTEXT_PENDING,
	DEDUPE_CONTEXT_TIMED_OUT,
	DEDUPE_CONTEXT_COMPLETE,
	DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
};

/* Possible index states: closed, opened, or transitioning between those two. */
enum index_state {
	IS_CLOSED,
	IS_CHANGING,
	IS_OPENED,
};

static const char *CLOSED = "closed";
static const char *CLOSING = "closing";
static const char *ERROR = "error";
static const char *OFFLINE = "offline";
static const char *ONLINE = "online";
static const char *OPENING = "opening";
static const char *SUSPENDED = "suspended";
static const char *UNKNOWN = "unknown";

/* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
#define UDS_ADVICE_VERSION 2
/* version byte + state byte + 64-bit little-endian PBN */
#define UDS_ADVICE_SIZE (1 + 1 + sizeof(u64))

enum hash_lock_state {
	/* State for locks that are not in use or are being initialized. */
	VDO_HASH_LOCK_INITIALIZING,

	/* This is the sequence of states typically used on the non-dedupe path. */
	VDO_HASH_LOCK_QUERYING,
	VDO_HASH_LOCK_WRITING,
	VDO_HASH_LOCK_UPDATING,

	/* The remaining states are typically used on the dedupe path in this order. */
	VDO_HASH_LOCK_LOCKING,
	VDO_HASH_LOCK_VERIFYING,
	VDO_HASH_LOCK_DEDUPING,
	VDO_HASH_LOCK_UNLOCKING,

	/*
	 * Terminal state for locks returning to the pool. Must be last both because it's the final
	 * state, and also because it's used to count the states.
	 */
	VDO_HASH_LOCK_BYPASSING,
};

static const char * const LOCK_STATE_NAMES[] = {
	[VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
	[VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
	[VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
	[VDO_HASH_LOCK_LOCKING] = "LOCKING",
	[VDO_HASH_LOCK_QUERYING] = "QUERYING",
	[VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
	[VDO_HASH_LOCK_UPDATING] = "UPDATING",
	[VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
	[VDO_HASH_LOCK_WRITING] = "WRITING",
};

struct hash_lock {
	/* The block hash covered by this lock */
	struct uds_record_name hash;

	/* When the lock is unused, this list entry allows the lock to be pooled */
	struct list_head pool_node;

	/*
	 * A list containing the data VIOs sharing this lock, all having the same record name and
	 * data block contents, linked by their hash_lock_node fields.
	 */
	struct list_head duplicate_ring;

	/* The number of data_vios sharing this lock instance */
	data_vio_count_t reference_count;

	/* The maximum value of reference_count in the lifetime of this lock */
	data_vio_count_t max_references;

	/* The current state of this lock */
	enum hash_lock_state state;

	/* True if the UDS index should be updated with new advice */
	bool update_advice;

	/* True if the advice has been verified to be a true duplicate */
	bool verified;

	/* True if the lock has already accounted for an initial verification */
	bool verify_counted;

	/* True if this lock is registered in the lock map (cleared on rollover) */
	bool registered;

	/*
	 * If verified is false, this is the location of a possible duplicate. If verified is true,
	 * it is the verified location of a true duplicate.
	 */
	struct zoned_pbn duplicate;

	/* The PBN lock on the block containing the duplicate data */
	struct pbn_lock *duplicate_lock;

	/* The data_vio designated to act on behalf of the lock */
	struct data_vio *agent;

	/*
	 * Other data_vios with data identical to the agent who are currently waiting for the agent
	 * to get the information they all need to deduplicate--either against each other, or
	 * against an existing duplicate on disk.
	 */
	struct vdo_wait_queue waiters;
};

#define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS

struct hash_zones {
	struct action_manager *manager;
	struct uds_parameters parameters;
	struct uds_index_session *index_session;
	struct ratelimit_state ratelimiter;
	atomic64_t timeouts;
	atomic64_t dedupe_context_busy;

	/* This spinlock protects the state fields and the starting of dedupe requests. */
	spinlock_t lock;

	/* The fields in the next block are all protected by the lock */
	struct vdo_completion completion;
	enum index_state index_state;
	enum index_state index_target;
	struct admin_state state;
	bool changing;
	bool create_flag;
	bool dedupe_flag;
	bool error_flag;
	u64 reported_timeouts;

	/* The number of zones */
	zone_count_t zone_count;
	/* The hash zones themselves */
	struct hash_zone zones[];
};

/* These are in milliseconds. */
unsigned int vdo_dedupe_index_timeout_interval = 5000;
unsigned int vdo_dedupe_index_min_timer_interval = 100;
/* Same two variables, in jiffies for easier consumption. */
static u64 vdo_dedupe_index_timeout_jiffies;
static u64 vdo_dedupe_index_min_timer_jiffies;

static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
	return container_of(completion, struct hash_zone, completion);
}

static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
	return container_of(completion, struct hash_zones, completion);
}

static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
			    "%s called on hash zone thread", name);
}

static inline bool change_context_state(struct dedupe_context *context, int old, int new)
{
	return (atomic_cmpxchg(&context->state, old, new) == old);
}

static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
{
	return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
}

/**
 * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
 * @zone: The zone from which the lock was borrowed.
 * @lock: The lock that is no longer in use.
 */
static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
{
	memset(lock, 0, sizeof(*lock));
	INIT_LIST_HEAD(&lock->pool_node);
	INIT_LIST_HEAD(&lock->duplicate_ring);
	vdo_waitq_init(&lock->waiters);
	list_add_tail(&lock->pool_node, &zone->lock_pool);
}

/**
 * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
 *                            the hash_lock the data_vio holds (if there is one).
 * @data_vio: The data_vio to query.
 *
 * Return: The PBN lock on the data_vio's duplicate location.
 */
struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
{
	if (data_vio->hash_lock == NULL)
		return NULL;

	return data_vio->hash_lock->duplicate_lock;
}

/**
 * hash_lock_key() - Return hash_lock's record name as a hash code.
 * @lock: The hash lock.
 *
 * Return: The key to use for the int map.
 */
static inline u64 hash_lock_key(struct hash_lock *lock)
{
	return get_unaligned_le64(&lock->hash.name);
}

/**
 * get_hash_lock_state_name() - Get the string representation of a hash lock state.
 * @state: The hash lock state.
 *
 * Return: The short string representing the state
 */
static const char *get_hash_lock_state_name(enum hash_lock_state state)
{
	/* Catch if a state has been added without updating the name array. */
	BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
	return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
}

/**
 * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
 *                            is being called in the hash zone.
 * @data_vio: The data_vio expected to be the lock agent.
 * @where: A string describing the function making the assertion.
 */
static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
{
	/* Not safe to access the agent field except from the hash zone. */
	assert_data_vio_in_hash_zone(data_vio);
	VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
			    "%s must be for the hash lock agent", where);
}

/**
 * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
 *                        physical zone of the PBN lock.
 * @hash_lock: The hash lock to update.
 * @pbn_lock: The PBN read lock to use as the duplicate lock.
 */
static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
{
	VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
			    "hash lock must not already hold a duplicate lock");
	pbn_lock->holder_count += 1;
	hash_lock->duplicate_lock = pbn_lock;
}

/**
 * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
 * @lock: The lock containing the wait queue.
 *
 * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
 */
static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
{
	return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
}

/**
 * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
 * @data_vio: The data_vio to update.
 * @new_lock: The hash lock the data_vio is joining.
 *
 * Updates the hash lock (or locks) to reflect the change in membership.
 */
static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
{
	struct hash_lock *old_lock = data_vio->hash_lock;

	if (old_lock != NULL) {
		VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
				    "must have a hash zone when holding a hash lock");
		VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
				    "must be on a hash lock ring when holding a hash lock");
		VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
				    "hash lock reference must be counted");

		if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
		    (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
			/*
			 * If the reference count goes to zero in a non-terminal state, we're most
			 * likely leaking this lock.
			 */
			VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
					    "hash locks should only become unreferenced in a terminal state, not state %s",
					    get_hash_lock_state_name(old_lock->state));
		}

		list_del_init(&data_vio->hash_lock_entry);
		old_lock->reference_count -= 1;

		data_vio->hash_lock = NULL;
	}

	if (new_lock != NULL) {
		/*
		 * Keep all data_vios sharing the lock on a ring since they can complete in any
		 * order and we'll always need a pointer to one to compare data.
		 */
		list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
		new_lock->reference_count += 1;
		if (new_lock->max_references < new_lock->reference_count)
			new_lock->max_references = new_lock->reference_count;

		data_vio->hash_lock = new_lock;
	}
}

/* There are loops in the state diagram, so some forward decl's are needed. */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
			   bool agent_is_done);
static void start_locking(struct hash_lock *lock, struct data_vio *agent);
static void start_writing(struct hash_lock *lock, struct data_vio *agent);
static void unlock_duplicate_pbn(struct vdo_completion *completion);
static void transfer_allocation_lock(struct data_vio *data_vio);

/**
 * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
 *                    longer needed to be an agent for the hash lock.
 * @data_vio: The data_vio to complete and send to be cleaned up.
 */
static void exit_hash_lock(struct data_vio *data_vio)
{
	/* Release the hash lock now, saving a thread transition in cleanup. */
	vdo_release_hash_lock(data_vio);

	/* Complete the data_vio and start the clean-up path to release any locks it still holds. */
	data_vio->vio.completion.callback = complete_data_vio;

	continue_data_vio(data_vio);
}

/**
 * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
 *                            is_duplicate and duplicate fields from a zoned_pbn.
 * @data_vio: The data_vio to modify.
 * @source: The location of the duplicate.
 */
static void set_duplicate_location(struct data_vio *data_vio,
				   const struct zoned_pbn source)
{
	data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
	data_vio->duplicate = source;
}

/**
 * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
 *                       make the retired agent exit the hash lock.
 * @lock: The hash lock to update.
 *
 * Return: The new lock agent (which will be NULL if there was no waiter)
 */
static struct data_vio *retire_lock_agent(struct hash_lock *lock)
{
	struct data_vio *old_agent = lock->agent;
	struct data_vio *new_agent = dequeue_lock_waiter(lock);

	lock->agent = new_agent;
	exit_hash_lock(old_agent);
	if (new_agent != NULL)
		set_duplicate_location(new_agent, lock->duplicate);
	return new_agent;
}

/**
 * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
 * @lock: The hash lock on which to wait.
 * @data_vio: The data_vio to add to the queue.
 */
static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
{
	vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);

	/*
	 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
	 * one other data_vio waiting on it.
	 */
	if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
		return;

	/*
	 * Even though we're waiting, we also have to send ourselves as a one-way message to the
	 * packer to ensure the agent continues executing. This is safe because
	 * cancel_vio_compression() guarantees the agent won't continue executing until this
	 * message arrives in the packer, and because the wait queue link isn't used for sending
	 * the message.
	 */
	data_vio->compression.lock_holder = lock->agent;
	launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
}

/**
 * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
 *                  optimization.
 * @waiter: The data_vio's waiter link.
 * @context: Not used.
 */
static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
{
	write_data_vio(vdo_waiter_as_data_vio(waiter));
}

/**
 * start_bypassing() - Stop using the hash lock.
 * @lock: The hash lock.
 * @agent: The data_vio acting as the agent for the lock.
 *
 * Stops using the hash lock. This is the final transition for hash locks which did not get an
 * error.
 */
static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_BYPASSING;
	exit_hash_lock(agent);
}

void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
{
	struct hash_lock *lock = data_vio->hash_lock;

	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
		exit_hash_lock(data_vio);
		return;
	}

	if (lock->agent == NULL) {
		lock->agent = data_vio;
	} else if (data_vio != lock->agent) {
		exit_hash_lock(data_vio);
		return;
	}

	lock->state = VDO_HASH_LOCK_BYPASSING;

	/* Ensure we don't attempt to update advice when cleaning up. */
	lock->update_advice = false;

	vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);

	if (lock->duplicate_lock != NULL) {
		/* The agent must reference the duplicate zone to launch it. */
		data_vio->duplicate = lock->duplicate;
		launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
		return;
	}

	lock->agent = NULL;
	data_vio->is_duplicate = false;
	exit_hash_lock(data_vio);
}

/**
 * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
 *                      duplicate candidate.
 * @completion: The completion of the data_vio acting as the lock's agent.
 *
 * This continuation is registered in unlock_duplicate_pbn().
 */
static void finish_unlocking(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);

	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
			    "must have released the duplicate lock for the hash lock");

	if (!lock->verified) {
		/*
		 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
		 * block, so it must have been a lock on advice we were verifying, not on a
		 * location that was used for deduplication. Go write (or compress) the block to
		 * get a location to dedupe against.
		 */
		start_writing(lock, agent);
		return;
	}

	/*
	 * With the lock released, the verified duplicate block may already have changed and will
	 * need to be re-verified if a waiter arrived.
	 */
	lock->verified = false;

	if (vdo_waitq_has_waiters(&lock->waiters)) {
		/*
		 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
		 * agent was releasing the PBN lock. The current agent exits and the waiter has to
		 * re-lock and re-verify the duplicate location.
		 *
		 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
		 * to re-verify.
		 */
		agent = retire_lock_agent(lock);
		start_locking(lock, agent);
		return;
	}

	/*
	 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
	 * data_vios reference it, so remove it from the lock map and return it to the pool.
	 */
	start_bypassing(lock, agent);
}

/**
 * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
 *                          contained duplicate data.
 * @completion: The completion of the data_vio acting as the lock's agent.
 *
 * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
 * hash zone thread.
 */
static void unlock_duplicate_pbn(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_data_vio_in_duplicate_zone(agent);
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
			    "must have a duplicate lock to release");

	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
					   vdo_forget(lock->duplicate_lock));
	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
		complete_data_vio(completion);
		return;
	}

	launch_data_vio_hash_zone_callback(agent, finish_unlocking);
}

/**
 * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
 *                     contained duplicate data.
 * @lock: The hash lock.
 * @agent: The data_vio currently acting as the agent for the lock.
 */
static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_UNLOCKING;
	launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
}

static void release_context(struct dedupe_context *context)
{
	struct hash_zone *zone = context->zone;

	WRITE_ONCE(zone->active, zone->active - 1);
	list_move(&context->list_entry, &zone->available);
}

static void process_update_result(struct data_vio *agent)
{
	struct dedupe_context *context = agent->dedupe_context;

	if ((context == NULL) ||
	    !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
		return;

	agent->dedupe_context = NULL;
	release_context(context);
}

/**
 * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
 * @completion: The completion of the data_vio that performed the update
 *
 * This continuation is registered in start_querying().
 */
static void finish_updating(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);

	process_update_result(agent);

	/*
	 * UDS was updated successfully, so don't update again unless the duplicate location
	 * changes due to rollover.
	 */
	lock->update_advice = false;

	if (vdo_waitq_has_waiters(&lock->waiters)) {
		/*
		 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
		 * Send it on the verified dedupe path. The agent is done with the lock, but the
		 * lock may still need to use it to clean up after rollover.
		 */
		start_deduping(lock, agent, true);
		return;
	}

	if (lock->duplicate_lock != NULL) {
		/*
		 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
		 * duplicate PBN lock, so go release it.
		 */
		start_unlocking(lock, agent);
		return;
	}

	/*
	 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
	 * release.
	 */
	start_bypassing(lock, agent);
}

static void query_index(struct data_vio *data_vio, enum uds_request_type operation);

/**
 * start_updating() - Continue deduplication with the last step, updating UDS with the location of
 *                    the duplicate that should be returned as advice in the future.
 * @lock: The hash lock.
 * @agent: The data_vio currently acting as the agent for the lock.
 */
static void start_updating(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_UPDATING;

	VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
	VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");

	agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
	set_data_vio_hash_zone_callback(agent, finish_updating);
	query_index(agent, UDS_UPDATE);
}

/**
 * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
 *                     by the hash lock.
 * @lock: The hash lock.
 * @data_vio: The lock holder that has finished deduplicating.
 *
 * If there are other data_vios still sharing the lock, this will just release the data_vio's share
 * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
 * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
 * eventually be released.
 */
static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
{
	struct data_vio *agent = data_vio;

	VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
			    "shouldn't have any lock waiters in DEDUPING");

	/* Just release the lock reference if other data_vios are still deduping. */
	if (lock->reference_count > 1) {
		exit_hash_lock(data_vio);
		return;
	}

	/* The hash lock must have an agent for all other lock states. */
	lock->agent = agent;
	if (lock->update_advice) {
		/*
		 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
		 * since the initial UDS query because of compression, rollover, or because the
		 * query agent didn't have an allocation. The UDS update was delayed in case there
		 * was another change in location, but with only this data_vio using the hash lock,
		 * it's time to update the advice.
		 */
		start_updating(lock, agent);
	} else {
		/*
		 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
		 * location so the hash lock itself can be released (contingent on no new data_vios
		 * arriving in the lock before the agent returns).
		 */
		start_unlocking(lock, agent);
	}
}

/**
 * acquire_lock() - Get the lock for a record name.
 * @zone: The zone responsible for the hash.
 * @hash: The hash to lock.
 * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
 *                the new lock.
 * @lock_ptr: A pointer to receive the hash lock.
 *
 * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
 * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
 * zone. This must only be called in the correct thread for the zone.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check acquire_lock(struct hash_zone *zone,
				     const struct uds_record_name *hash,
				     struct hash_lock *replace_lock,
				     struct hash_lock **lock_ptr)
{
	struct hash_lock *lock, *new_lock;
	int result;

	/*
	 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
	 * in the common case of no lock contention.
	 */
	result = VDO_ASSERT(!list_empty(&zone->lock_pool),
			    "never need to wait for a free hash lock");
	if (result != VDO_SUCCESS)
		return result;

	new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
	list_del_init(&new_lock->pool_node);

	/*
	 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
	 * map key.
	 */
	new_lock->hash = *hash;

	result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
				 new_lock, (replace_lock != NULL), (void **) &lock);
	if (result != VDO_SUCCESS) {
		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
		return result;
	}

	if (replace_lock != NULL) {
		/* On mismatch put the old lock back and return a severe error */
		VDO_ASSERT_LOG_ONLY(lock == replace_lock,
				    "old lock must have been in the lock map");
		/* TODO: Check earlier and bail out? */
		VDO_ASSERT_LOG_ONLY(replace_lock->registered,
				    "old lock must have been marked registered");
		replace_lock->registered = false;
	}

	if (lock == replace_lock) {
		lock = new_lock;
		lock->registered = true;
	} else {
		/* There's already a lock for the hash, so we don't need the borrowed lock. */
		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
	}

	*lock_ptr = lock;
	return VDO_SUCCESS;
}

/**
 * enter_forked_lock() - Bind the data_vio to a new hash lock.
 *
 * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
 * on that lock.
 */
static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct hash_lock *new_lock = context;

	set_hash_lock(data_vio, new_lock);
	wait_on_hash_lock(new_lock, data_vio);
}

/**
 * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
 * @old_lock: The hash lock to fork.
 * @new_agent: The data_vio that will be the agent for the new lock.
 *
 * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
 * of the old lock in the lock map. The old lock remains active, but will not update advice.
 */
static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
{
	struct hash_lock *new_lock;
	int result;

	result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
			      &new_lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(new_agent, result);
		return;
	}

	/*
	 * Only one of the two locks should update UDS. The old lock is out of references, so it
	 * would be poor dedupe advice in the short term.
	 */
	old_lock->update_advice = false;
	new_lock->update_advice = true;

	set_hash_lock(new_agent, new_lock);
	new_lock->agent = new_agent;

	vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);

	new_agent->is_duplicate = false;
	start_writing(new_lock, new_agent);
}

/**
 * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
 *                   path.
 * @lock: The hash lock.
 * @data_vio: The data_vio to deduplicate using the hash lock.
 * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
 *
 * If no increments are available, this will roll over to a new hash lock and launch the data_vio
 * as the writing agent for that lock.
 */
static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
			  bool has_claim)
{
	if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		/* Out of increments, so must roll over to a new lock. */
		fork_hash_lock(lock, data_vio);
		return;
	}

	/* Deduplicate against the lock's verified location. */
	set_duplicate_location(data_vio, lock->duplicate);
	data_vio->new_mapped = data_vio->duplicate;
	update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
}

/**
 * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
 *                    true copy of their data on disk.
 * @lock: The hash lock.
 * @agent: The data_vio acting as the agent for the lock.
 * @agent_is_done: true only if the agent has already written or deduplicated against its data.
 *
 * If the agent itself needs to deduplicate, an increment for it must already have been claimed
 * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
 */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
			   bool agent_is_done)
{
	lock->state = VDO_HASH_LOCK_DEDUPING;

	/*
	 * We don't take the downgraded allocation lock from the agent unless we actually need to
	 * deduplicate against it.
	 */
	if (lock->duplicate_lock == NULL) {
		VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
				    "compression must have shared a lock");
		VDO_ASSERT_LOG_ONLY(agent_is_done,
				    "agent must have written the new duplicate");
		transfer_allocation_lock(agent);
	}

	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
			    "duplicate_lock must be a PBN read lock");

	/*
	 * This state is not like any of the other states. There is no designated agent--the agent
	 * transitioning to this state and all the waiters will be launched to deduplicate in
	 * parallel.
	 */
	lock->agent = NULL;

	/*
	 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
	 * available increments for on the dedupe path. If we run out of increments, rollover will
	 * be triggered and the remaining waiters will be transferred to the new lock.
	 */
	if (!agent_is_done) {
		launch_dedupe(lock, agent, true);
		agent = NULL;
	}
	while (vdo_waitq_has_waiters(&lock->waiters))
		launch_dedupe(lock, dequeue_lock_waiter(lock), false);

	if (agent_is_done) {
		/*
		 * In the degenerate case where all the waiters rolled over to a new lock, this
		 * will continue to use the old agent to clean up this lock, and otherwise it just
		 * lets the agent exit the lock.
		 */
		finish_deduping(lock, agent);
	}
}

/**
 * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
 * @stat: The statistic field to increment.
 */
static inline void increment_stat(u64 *stat)
{
	/*
	 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
	 * affecting other threads reading stats.
	 */
	WRITE_ONCE(*stat, *stat + 1);
}

/**
 * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
 *                      duplicate candidate.
 * @completion: The completion of the data_vio used to verify dedupe
 *
 * This continuation is registered in start_verifying().
 */
static void finish_verifying(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);

	lock->verified = agent->is_duplicate;

	/*
	 * Only count the result of the initial verification of the advice as valid or stale, and
	 * not any re-verifications due to PBN lock releases.
	 */
	if (!lock->verify_counted) {
		lock->verify_counted = true;
		if (lock->verified)
			increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
		else
			increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
	}

	/*
	 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
	 * claim a reference count increment for the agent.
	 */
	if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		agent->is_duplicate = false;
		lock->verified = false;
	}

	if (lock->verified) {
		/*
		 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
		 * deduplicating against it, if references are available.
		 */
		start_deduping(lock, agent, false);
	} else {
		/*
		 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
		 * dedupe and roll over immediately, which would fail because it would leave the
		 * lock without an agent to release the PBN lock. In both cases, the data will have
		 * to be written or compressed, but first the advice PBN must be unlocked by the
		 * VERIFYING agent.
		 */
		lock->update_advice = true;
		start_unlocking(lock, agent);
	}
}

static bool blocks_equal(char *block1, char *block2)
{
	int i;

	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
		if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
			return false;
	}

	return true;
}

static void verify_callback(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);

	agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
	launch_data_vio_hash_zone_callback(agent, finish_verifying);
}

static void uncompress_and_verify(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	int result;

	result = uncompress_data_vio(agent, agent->duplicate.state,
				     agent->scratch_block);
	if (result == VDO_SUCCESS) {
		verify_callback(completion);
		return;
	}

	agent->is_duplicate = false;
	launch_data_vio_hash_zone_callback(agent, finish_verifying);
}

static void verify_endio(struct bio *bio)
{
	struct data_vio *agent = vio_as_data_vio(bio->bi_private);
	int result = blk_status_to_errno(bio->bi_status);

	vdo_count_completed_bios(bio);
	if (result != VDO_SUCCESS) {
		agent->is_duplicate = false;
		launch_data_vio_hash_zone_callback(agent, finish_verifying);
		return;
	}

	if (vdo_is_state_compressed(agent->duplicate.state)) {
		launch_data_vio_cpu_callback(agent, uncompress_and_verify,
					     CPU_Q_COMPRESS_BLOCK_PRIORITY);
		return;
	}

	launch_data_vio_cpu_callback(agent, verify_callback,
				     CPU_Q_COMPLETE_READ_PRIORITY);
}

/**
 * start_verifying() - Begin the data verification phase.
 * @lock: The hash lock (must be LOCKING).
 * @agent: The data_vio to use to read and compare candidate data.
 *
 * Continue the deduplication path for a hash lock by using the agent to read (and possibly
 * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
 * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
 * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
 * dedupe.
 */
static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
{
	int result;
	struct vio *vio = &agent->vio;
	char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
			(char *) agent->compression.block :
			agent->scratch_block);

	lock->state = VDO_HASH_LOCK_VERIFYING;
	VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");

	agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
	result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
			       agent->duplicate.pbn);
	if (result != VDO_SUCCESS) {
		set_data_vio_hash_zone_callback(agent, finish_verifying);
		continue_data_vio_with_error(agent, result);
		return;
	}

	set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
	vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
}

/**
 * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
 *                    lock on the candidate duplicate block.
 * @completion: The completion of the data_vio that attempted to get the read lock.
 *
 * This continuation is registered in lock_duplicate_pbn().
 */
static void finish_locking(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);

	if (!agent->is_duplicate) {
		VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
				    "must not hold duplicate_lock if not flagged as a duplicate");
		/*
		 * LOCKING -> WRITING transition: The advice block is being modified or has no
		 * available references, so try to write or compress the data, remembering to
		 * update UDS later with the new advice.
		 */
		increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
		lock->update_advice = true;
		start_writing(lock, agent);
		return;
	}

	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
			    "must hold duplicate_lock if flagged as a duplicate");

	if (!lock->verified) {
		/*
		 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
		 * the candidate duplicate and comparing it to the agent's data to decide whether
		 * it is a true duplicate or stale advice.
		 */
		start_verifying(lock, agent);
		return;
	}

	if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		/*
		 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
		 * available increments left. Must first release the useless PBN read lock before
		 * rolling over to a new copy of the block.
		 */
		agent->is_duplicate = false;
		lock->verified = false;
		lock->update_advice = true;
		start_unlocking(lock, agent);
		return;
	}

	/*
	 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
	 * against a location that was previously verified or written to.
	 */
	start_deduping(lock, agent, false);
}

static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
					  struct slab_depot *depot)
{
	/* Ensure that the newly-locked block is referenced. */
	struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
	int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);

	if (result == VDO_SUCCESS)
		return true;

	vdo_log_warning_strerror(result,
				 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
	agent->is_duplicate = false;
	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
					   agent->duplicate.pbn, lock);
	continue_data_vio_with_error(agent, result);
	return false;
}

/**
 * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
 *                        duplicate data (compressed or uncompressed).
 * @completion: The completion of the data_vio attempting to acquire the physical block lock on
 *              behalf of its hash lock.
 *
 * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
 * cleared before calling back. This continuation is launched from start_locking(), and calls back
 * to finish_locking() on the hash zone thread.
 */
static void lock_duplicate_pbn(struct vdo_completion *completion)
{
	unsigned int increment_limit;
	struct pbn_lock *lock;
	int result;

	struct data_vio *agent = as_data_vio(completion);
	struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
	struct physical_zone *zone = agent->duplicate.zone;

	assert_data_vio_in_duplicate_zone(agent);

	set_data_vio_hash_zone_callback(agent, finish_locking);

	/*
	 * While in the zone that owns it, find out how many additional references can be made to
	 * the block if it turns out to truly be a duplicate.
	 */
	increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
	if (increment_limit == 0) {
		/*
		 * We could deduplicate against it later if a reference happened to be released
		 * during verification, but it's probably better to bail out now.
		 */
		agent->is_duplicate = false;
		continue_data_vio(agent);
		return;
	}

	result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
						    VIO_READ_LOCK, &lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(agent, result);
		return;
	}

	if (!vdo_is_pbn_read_lock(lock)) {
		/*
		 * There are three cases of write locks: uncompressed data block writes, compressed
		 * (packed) block writes, and block map page writes. In all three cases, we give up
		 * on trying to verify the advice and don't bother to try deduplicate against the
		 * data in the write lock holder.
		 *
		 * 1) We don't ever want to try to deduplicate against a block map page.
		 *
		 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
		 * because of the chance of matching it, and because we don't record advice for it,
		 * but for the uncompressed representation of all the fragments it contains. The
		 * only way we'd be getting lock contention is if we've written the same
		 * representation coincidentally before, had it become unreferenced, and it just
		 * happened to be packed together from compressed writes when we go to verify the
		 * lucky advice. Giving up is a minuscule loss of potential dedupe.
		 *
		 * 2b) If the advice is for a slot of a compressed block, it's about to get
		 * smashed, and the write smashing it cannot contain our data--it would have to be
		 * writing on behalf of our hash lock, but that's impossible since we're the lock
		 * agent.
		 *
		 * 3a) If the lock is held by a data_vio with different data, the advice is already
		 * stale or is about to become stale.
		 *
		 * 3b) If the lock is held by a data_vio that matches us, we may as well either
		 * write it ourselves (or reference the copy we already wrote) instead of
		 * potentially having many duplicates wait for the lock holder to write, journal,
		 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
		 * update in the very rare case of advice for a free block that just happened to be
		 * allocated to a data_vio with the same hash. There's also a chance to save on a
		 * block write, at the cost of a block verify. Saving on a full block compare in
		 * all stale advice cases almost certainly outweighs saving a UDS update and
		 * trading a write for a read in a lucky case where advice would have been saved
		 * from becoming stale.
		 */
		agent->is_duplicate = false;
		continue_data_vio(agent);
		return;
	}

	if (lock->holder_count == 0) {
		if (!acquire_provisional_reference(agent, lock, depot))
			return;

		/*
		 * The increment limit we grabbed earlier is still valid. The lock now holds the
		 * rights to acquire all those references. Those rights will be claimed by hash
		 * locks sharing this read lock.
		 */
		lock->increment_limit = increment_limit;
	}

	/*
	 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
	 */
	set_duplicate_lock(agent->hash_lock, lock);

	/*
	 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
	 * thread.
	 */
	continue_data_vio(agent);
}

/**
 * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
 *                   potential duplicate through its agent.
 * @lock: The hash lock (currently must be QUERYING).
 * @agent: The data_vio bearing the dedupe advice.
 */
static void start_locking(struct hash_lock *lock, struct data_vio *agent)
{
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
			    "must not acquire a duplicate lock when already holding it");

	lock->state = VDO_HASH_LOCK_LOCKING;

	/*
	 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
	 * accepting the advice, and don't explicitly change lock states (or use an agent-local
	 * state, or an atomic), we can avoid a thread transition here.
	 */
	agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
	launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
}

/**
 * finish_writing() - Re-entry point for the lock agent after it has finished writing or
 *                    compressing its copy of the data block.
 * @lock: The hash lock, which must be in state WRITING.
 * @agent: The data_vio that wrote its data for the lock.
 *
 * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
 * may not be finished with it, as a UDS update might still be needed.
 *
 * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
 * the lock to deduplicate against the just-written block. If there are no other lock holders, the
 * agent either exits (and later tears down the hash lock), or it remains the agent and updates
 * UDS.
 */
static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
{
	/*
	 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
	 * the write succeeded, there's no need to verify it.
	 */
	lock->duplicate = agent->new_mapped;
	lock->verified = true;

	if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
		/*
		 * Compression means the location we gave in the UDS query is not the location
		 * we're using to deduplicate.
		 */
		lock->update_advice = true;
	}

	/* If there are any waiters, we need to start deduping them. */
	if (vdo_waitq_has_waiters(&lock->waiters)) {
		/*
		 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
		 * compress, so the PBN lock on the written copy was already transferred. The agent
		 * is done with the lock, but the lock may still need to use it to clean up after
		 * rollover.
		 */
		start_deduping(lock, agent, true);
		return;
	}

	/*
	 * There are no waiters and the agent has successfully written, so take a step towards
	 * being able to release the hash lock (or just release it).
	 */
	if (lock->update_advice) {
		/*
		 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
		 * retain the WRITING agent and use it to launch the update. The happens on
		 * compression, rollover, or the QUERYING agent not having an allocation.
		 */
		start_updating(lock, agent);
	} else if (lock->duplicate_lock != NULL) {
		/*
		 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
		 * compressed write gave us a shared duplicate lock that we must release.
		 */
		set_duplicate_location(agent, lock->duplicate);
		start_unlocking(lock, agent);
	} else {
		/*
		 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
		 * duplicate lock held, so both the agent and lock have no more work to do. The
		 * agent will release its allocation lock in cleanup.
		 */
		start_bypassing(lock, agent);
	}
}

/**
 * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
 * @lock: The hash lock to modify.
 *
 * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
 * return the new agent. Otherwise, just return the current agent.
 */
static struct data_vio *select_writing_agent(struct hash_lock *lock)
{
	struct vdo_wait_queue temp_queue;
	struct data_vio *data_vio;

	vdo_waitq_init(&temp_queue);

	/*
	 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
	 * search, but it only happens when nearly out of space.
	 */
	while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
	       !data_vio_has_allocation(data_vio)) {
		/* Use the lower-level enqueue since we're just moving waiters around. */
		vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
	}

	if (data_vio != NULL) {
		/*
		 * Move the rest of the waiters over to the temp queue, preserving the order they
		 * arrived at the lock.
		 */
		vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);

		/*
		 * The current agent is being replaced and will have to wait to dedupe; make it the
		 * first waiter since it was the first to reach the lock.
		 */
		vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
		lock->agent = data_vio;
	} else {
		/* No one has an allocation, so keep the current agent. */
		data_vio = lock->agent;
	}

	/* Swap all the waiters back onto the lock's queue. */
	vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
	return data_vio;
}

/**
 * start_writing() - Begin the non-duplicate write path.
 * @lock: The hash lock (currently must be QUERYING).
 * @agent: The data_vio currently acting as the agent for the lock.
 *
 * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
 * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
 * path.
 */
static void start_writing(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_WRITING;

	/*
	 * The agent might not have received an allocation and so can't be used for writing, but
	 * it's entirely possible that one of the waiters did.
	 */
	if (!data_vio_has_allocation(agent)) {
		agent = select_writing_agent(lock);
		/* If none of the waiters had an allocation, the writes all have to fail. */
		if (!data_vio_has_allocation(agent)) {
			/*
			 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
			 * fail immediately if they don't have an allocation? It might be possible
			 * that on some path there would be non-waiters still referencing the lock,
			 * so it would remain in the map as everything is currently spelled, even
			 * if the agent and all waiters release.
			 */
			continue_data_vio_with_error(agent, VDO_NO_SPACE);
			return;
		}
	}

	/*
	 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
	 * there are any other data_vios waiting.
	 */
	if (vdo_waitq_has_waiters(&lock->waiters))
		cancel_data_vio_compression(agent);

	/*
	 * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
	 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
	 */
	launch_compress_data_vio(agent);
}

/*
 * Decode VDO duplicate advice from the old_metadata field of a UDS request.
 * Returns true if valid advice was found and decoded
 */
static bool decode_uds_advice(struct dedupe_context *context)
{
	const struct uds_request *request = &context->request;
	struct data_vio *data_vio = context->requestor;
	size_t offset = 0;
	const struct uds_record_data *encoding = &request->old_metadata;
	struct vdo *vdo = vdo_from_data_vio(data_vio);
	struct zoned_pbn *advice = &data_vio->duplicate;
	u8 version;
	int result;

	if ((request->status != UDS_SUCCESS) || !request->found)
		return false;

	version = encoding->data[offset++];
	if (version != UDS_ADVICE_VERSION) {
		vdo_log_error("invalid UDS advice version code %u", version);
		return false;
	}

	advice->state = encoding->data[offset++];
	advice->pbn = get_unaligned_le64(&encoding->data[offset]);
	offset += sizeof(u64);
	BUG_ON(offset != UDS_ADVICE_SIZE);

	/* Don't use advice that's clearly meaningless. */
	if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
		vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
			      (unsigned long long) advice->pbn, advice->state,
			      (unsigned long long) data_vio->logical.lbn);
		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
		return false;
	}

	result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
	if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
		vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
			      (unsigned long long) advice->pbn,
			      (unsigned long long) data_vio->logical.lbn);
		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
		return false;
	}

	return true;
}

static void process_query_result(struct data_vio *agent)
{
	struct dedupe_context *context = agent->dedupe_context;

	if (context == NULL)
		return;

	if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
		agent->is_duplicate = decode_uds_advice(context);
		agent->dedupe_context = NULL;
		release_context(context);
	}
}

/**
 * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
 * @completion: The completion of the data_vio that performed the query.
 *
 * This continuation is registered in start_querying().
 */
static void finish_querying(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);

	process_query_result(agent);

	if (agent->is_duplicate) {
		lock->duplicate = agent->duplicate;
		/*
		 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
		 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
		 * that the advice can be used.
		 */
		start_locking(lock, agent);
	} else {
		/*
		 * The agent will be used as the duplicate if has an allocation; if it does, that
		 * location was posted to UDS, so no update will be needed.
		 */
		lock->update_advice = !data_vio_has_allocation(agent);
		/*
		 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
		 * so try to write or compress the data.
		 */
		start_writing(lock, agent);
	}
}

/**
 * start_querying() - Start deduplication for a hash lock.
 * @lock: The initialized hash lock.
 * @data_vio: The data_vio that has just obtained the new lock.
 *
 * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
 * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
 * query on behalf of the lock.
 */
static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
{
	lock->agent = data_vio;
	lock->state = VDO_HASH_LOCK_QUERYING;
	data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
	set_data_vio_hash_zone_callback(data_vio, finish_querying);
	query_index(data_vio,
		    (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
}

/**
 * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
 *                             unimplemented or unusable state and continue the data_vio with an
 *                             error.
 * @lock: The hash lock.
 * @data_vio: The data_vio attempting to enter the lock.
 */
static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
{
	VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
			    get_hash_lock_state_name(lock->state));
	continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
}

/**
 * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
 *                            deduplicating.
 * @data_vio: The data_vio to continue processing in its hash lock.
 *
 * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
 * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
 * lock, or update the UDS index, or simply release its share of the lock.
 *
 * Context: This must only be called in the correct thread for the hash zone.
 */
void vdo_continue_hash_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct hash_lock *lock = data_vio->hash_lock;

	switch (lock->state) {
	case VDO_HASH_LOCK_WRITING:
		VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
				    "only the lock agent may continue the lock");
		finish_writing(lock, data_vio);
		break;

	case VDO_HASH_LOCK_DEDUPING:
		finish_deduping(lock, data_vio);
		break;

	case VDO_HASH_LOCK_BYPASSING:
		/* This data_vio has finished the write path and the lock doesn't need it. */
		exit_hash_lock(data_vio);
		break;

	case VDO_HASH_LOCK_INITIALIZING:
	case VDO_HASH_LOCK_QUERYING:
	case VDO_HASH_LOCK_UPDATING:
	case VDO_HASH_LOCK_LOCKING:
	case VDO_HASH_LOCK_VERIFYING:
	case VDO_HASH_LOCK_UNLOCKING:
		/* A lock in this state should never be re-entered. */
		report_bogus_lock_state(lock, data_vio);
		break;

	default:
		report_bogus_lock_state(lock, data_vio);
	}
}

/**
 * is_hash_collision() - Check to see if a hash collision has occurred.
 * @lock: The lock to check.
 * @candidate: The data_vio seeking to share the lock.
 *
 * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
 * share the lock, which should only be possible in the extremely unlikely case of a hash
 * collision.
 *
 * Return: true if the given data_vio must not share the lock because it doesn't have the same data
 *         as the lock holders.
 */
static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
{
	struct data_vio *lock_holder;
	struct hash_zone *zone;
	bool collides;

	if (list_empty(&lock->duplicate_ring))
		return false;

	lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
				       hash_lock_entry);
	zone = candidate->hash_zone;
	collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
	if (collides)
		increment_stat(&zone->statistics.concurrent_hash_collisions);
	else
		increment_stat(&zone->statistics.concurrent_data_matches);

	return collides;
}

static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
{
	int result;

	/* FIXME: BUG_ON() and/or enter read-only mode? */
	result = VDO_ASSERT(data_vio->hash_lock == NULL,
			    "must not already hold a hash lock");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
			    "must not already be a member of a hash lock ring");
	if (result != VDO_SUCCESS)
		return result;

	return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
			  "must not hold a recovery lock when getting a hash lock");
}

/**
 * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
 * @data_vio: The data_vio acquiring a lock on its record name.
 *
 * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
 * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
 * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
 * a lock reference.
 */
void vdo_acquire_hash_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct hash_lock *lock;
	int result;

	assert_data_vio_in_hash_zone(data_vio);

	result = assert_hash_lock_preconditions(data_vio);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	if (is_hash_collision(lock, data_vio)) {
		/*
		 * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
		 * corruption. Bypass optimization entirely. We can't compress a data_vio without
		 * a hash_lock as the compressed write depends on the hash_lock to manage the
		 * references for the compressed block.
		 */
		write_data_vio(data_vio);
		return;
	}

	set_hash_lock(data_vio, lock);
	switch (lock->state) {
	case VDO_HASH_LOCK_INITIALIZING:
		start_querying(lock, data_vio);
		return;

	case VDO_HASH_LOCK_QUERYING:
	case VDO_HASH_LOCK_WRITING:
	case VDO_HASH_LOCK_UPDATING:
	case VDO_HASH_LOCK_LOCKING:
	case VDO_HASH_LOCK_VERIFYING:
	case VDO_HASH_LOCK_UNLOCKING:
		/* The lock is busy, and can't be shared yet. */
		wait_on_hash_lock(lock, data_vio);
		return;

	case VDO_HASH_LOCK_BYPASSING:
		/* We can't use this lock, so bypass optimization entirely. */
		vdo_release_hash_lock(data_vio);
		write_data_vio(data_vio);
		return;

	case VDO_HASH_LOCK_DEDUPING:
		launch_dedupe(lock, data_vio, false);
		return;

	default:
		/* A lock in this state should not be acquired by new VIOs. */
		report_bogus_lock_state(lock, data_vio);
	}
}

/**
 * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
 *                           data_vio's reference to it.
 * @data_vio: The data_vio releasing its hash lock.
 *
 * If the data_vio is the only one holding the lock, this also releases any resources or locks used
 * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
 * returns the lock to the hash zone's lock pool.
 *
 * Context: This must only be called in the correct thread for the hash zone.
 */
void vdo_release_hash_lock(struct data_vio *data_vio)
{
	u64 lock_key;
	struct hash_lock *lock = data_vio->hash_lock;
	struct hash_zone *zone = data_vio->hash_zone;

	if (lock == NULL)
		return;

	set_hash_lock(data_vio, NULL);

	if (lock->reference_count > 0) {
		/* The lock is still in use by other data_vios. */
		return;
	}

	lock_key = hash_lock_key(lock);
	if (lock->registered) {
		struct hash_lock *removed;

		removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
		VDO_ASSERT_LOG_ONLY(lock == removed,
				    "hash lock being released must have been mapped");
	} else {
		VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
				    "unregistered hash lock must not be in the lock map");
	}

	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
			    "hash lock returned to zone must have no waiters");
	VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
			    "hash lock returned to zone must not reference a PBN lock");
	VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
			    "returned hash lock must not be in use with state %s",
			    get_hash_lock_state_name(lock->state));
	VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
			    "hash lock returned to zone must not be in a pool ring");
	VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
			    "hash lock returned to zone must not reference DataVIOs");

	return_hash_lock_to_pool(zone, lock);
}

/**
 * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
 *                              data_vio's hash lock, converting it to a duplicate PBN lock.
 * @data_vio: The data_vio holding the allocation lock to transfer.
 */
static void transfer_allocation_lock(struct data_vio *data_vio)
{
	struct allocation *allocation = &data_vio->allocation;
	struct hash_lock *hash_lock = data_vio->hash_lock;

	VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
			    "transferred lock must be for the block written");

	allocation->pbn = VDO_ZERO_BLOCK;

	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
			    "must have downgraded the allocation lock before transfer");

	hash_lock->duplicate = data_vio->new_mapped;
	data_vio->duplicate = data_vio->new_mapped;

	/*
	 * Since the lock is being transferred, the holder count doesn't change (and isn't even
	 * safe to examine on this thread).
	 */
	hash_lock->duplicate_lock = vdo_forget(allocation->lock);
}

/**
 * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
 *                                     on the compressed block to which its data was just written.
 * @data_vio: The data_vio which was just compressed.
 * @pbn_lock: The PBN lock on the compressed block.
 *
 * If the lock is still a write lock (as it will be for the first share), it will be converted to a
 * read lock. This also reserves a reference count increment for the data_vio.
 */
void vdo_share_compressed_write_lock(struct data_vio *data_vio,
				     struct pbn_lock *pbn_lock)
{
	bool claimed;

	VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
			    "a duplicate PBN lock should not exist when writing");
	VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
			    "lock transfer must be for a compressed write");
	assert_data_vio_in_new_mapped_zone(data_vio);

	/* First sharer downgrades the lock. */
	if (!vdo_is_pbn_read_lock(pbn_lock))
		vdo_downgrade_pbn_write_lock(pbn_lock, true);

	/*
	 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
	 * has had a chance to journal a reference.
	 */
	data_vio->duplicate = data_vio->new_mapped;
	data_vio->hash_lock->duplicate = data_vio->new_mapped;
	set_duplicate_lock(data_vio->hash_lock, pbn_lock);

	/*
	 * Claim a reference for this data_vio. Necessary since another hash_lock might start
	 * deduplicating against it before our incRef.
	 */
	claimed = vdo_claim_pbn_lock_increment(pbn_lock);
	VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
}

static void start_uds_queue(void *ptr)
{
	/*
	 * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
	 * during the UDS calls that open or close an index, but those allocations can safely sleep
	 * while reserving a large amount of memory. We could use an allocations_allowed boolean
	 * (like the base threads do), but it would be an unnecessary embellishment.
	 */
	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());

	vdo_register_allocating_thread(&thread->allocating_thread, NULL);
}

static void finish_uds_queue(void *ptr __always_unused)
{
	vdo_unregister_allocating_thread();
}

static void close_index(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	int result;

	/*
	 * Change the index state so that get_index_statistics() will not try to use the index
	 * session we are closing.
	 */
	zones->index_state = IS_CHANGING;
	/* Close the index session, while not holding the lock. */
	spin_unlock(&zones->lock);
	result = uds_close_index(zones->index_session);

	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error closing index");
	spin_lock(&zones->lock);
	zones->index_state = IS_CLOSED;
	zones->error_flag |= result != UDS_SUCCESS;
	/* ASSERTION: We leave in IS_CLOSED state. */
}

static void open_index(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	/* ASSERTION: We enter in IS_CLOSED state. */
	int result;
	bool create_flag = zones->create_flag;

	zones->create_flag = false;
	/*
	 * Change the index state so that the it will be reported to the outside world as
	 * "opening".
	 */
	zones->index_state = IS_CHANGING;
	zones->error_flag = false;

	/* Open the index session, while not holding the lock */
	spin_unlock(&zones->lock);
	result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
				&zones->parameters, zones->index_session);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error opening index");

	spin_lock(&zones->lock);
	if (!create_flag) {
		switch (result) {
		case -ENOENT:
			/*
			 * Either there is no index, or there is no way we can recover the index.
			 * We will be called again and try to create a new index.
			 */
			zones->index_state = IS_CLOSED;
			zones->create_flag = true;
			return;
		default:
			break;
		}
	}
	if (result == UDS_SUCCESS) {
		zones->index_state = IS_OPENED;
	} else {
		zones->index_state = IS_CLOSED;
		zones->index_target = IS_CLOSED;
		zones->error_flag = true;
		spin_unlock(&zones->lock);
		vdo_log_info("Setting UDS index target state to error");
		spin_lock(&zones->lock);
	}
	/*
	 * ASSERTION: On success, we leave in IS_OPENED state.
	 * ASSERTION: On failure, we leave in IS_CLOSED state.
	 */
}

static void change_dedupe_state(struct vdo_completion *completion)
{
	struct hash_zones *zones = as_hash_zones(completion);

	spin_lock(&zones->lock);

	/* Loop until the index is in the target state and the create flag is clear. */
	while (vdo_is_state_normal(&zones->state) &&
	       ((zones->index_state != zones->index_target) || zones->create_flag)) {
		if (zones->index_state == IS_OPENED)
			close_index(zones);
		else
			open_index(zones);
	}

	zones->changing = false;
	spin_unlock(&zones->lock);
}

static void start_expiration_timer(struct dedupe_context *context)
{
	u64 start_time = context->submission_jiffies;
	u64 end_time;

	if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
				DEDUPE_QUERY_TIMER_RUNNING))
		return;

	end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
		       jiffies + vdo_dedupe_index_min_timer_jiffies);
	mod_timer(&context->zone->timer, end_time);
}

/**
 * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
 *                            expiration time without getting answers, so we timed them out.
 * @zones: the hash zones.
 * @timeouts: the number of newly timed out requests.
 */
static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
{
	atomic64_add(timeouts, &zones->timeouts);
	spin_lock(&zones->lock);
	if (__ratelimit(&zones->ratelimiter)) {
		u64 unreported = atomic64_read(&zones->timeouts);

		unreported -= zones->reported_timeouts;
		vdo_log_debug("UDS index timeout on %llu requests",
			      (unsigned long long) unreported);
		zones->reported_timeouts += unreported;
	}
	spin_unlock(&zones->lock);
}

static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
{
	int result;
	off_t uds_offset;
	struct volume_geometry geometry = vdo->geometry;
	static const struct vdo_work_queue_type uds_queue_type = {
		.start = start_uds_queue,
		.finish = finish_uds_queue,
		.max_priority = UDS_Q_MAX_PRIORITY,
		.default_priority = UDS_Q_PRIORITY,
	};

	vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
	vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);

	/*
	 * Since we will save up the timeouts that would have been reported but were ratelimited,
	 * we don't need to report ratelimiting.
	 */
	ratelimit_default_init(&zones->ratelimiter);
	ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
	uds_offset = ((vdo_get_index_region_start(geometry) -
		       geometry.bio_offset) * VDO_BLOCK_SIZE);
	zones->parameters = (struct uds_parameters) {
		.bdev = vdo->device_config->owned_device->bdev,
		.offset = uds_offset,
		.size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
		.memory_size = geometry.index_config.mem,
		.sparse = geometry.index_config.sparse,
		.nonce = (u64) geometry.nonce,
	};

	result = uds_create_index_session(&zones->index_session);
	if (result != UDS_SUCCESS)
		return result;

	result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
				 1, NULL);
	if (result != VDO_SUCCESS) {
		uds_destroy_index_session(vdo_forget(zones->index_session));
		vdo_log_error("UDS index queue initialization failed (%d)", result);
		return result;
	}

	vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
	vdo_set_completion_callback(&zones->completion, change_dedupe_state,
				    vdo->thread_config.dedupe_thread);
	return VDO_SUCCESS;
}

/**
 * finish_index_operation() - This is the UDS callback for index queries.
 * @request: The uds request which has just completed.
 */
static void finish_index_operation(struct uds_request *request)
{
	struct dedupe_context *context = container_of(request, struct dedupe_context,
						      request);

	if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
				 DEDUPE_CONTEXT_COMPLETE)) {
		/*
		 * This query has not timed out, so send its data_vio back to its hash zone to
		 * process the results.
		 */
		continue_data_vio(context->requestor);
		return;
	}

	/*
	 * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
	 * data_vio has already moved on.
	 */
	if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
				  DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
		VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
				    atomic_read(&context->state));
	}

	vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
}

/**
 * check_for_drain_complete() - Check whether this zone has drained.
 * @zone: The zone to check.
 */
static void check_for_drain_complete(struct hash_zone *zone)
{
	data_vio_count_t recycled = 0;

	if (!vdo_is_state_draining(&zone->state))
		return;

	if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
	    change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
			       DEDUPE_QUERY_TIMER_IDLE)) {
		del_timer_sync(&zone->timer);
	} else {
		/*
		 * There is an in flight time-out, which must get processed before we can continue.
		 */
		return;
	}

	for (;;) {
		struct dedupe_context *context;
		struct funnel_queue_entry *entry;

		entry = vdo_funnel_queue_poll(zone->timed_out_complete);
		if (entry == NULL)
			break;

		context = container_of(entry, struct dedupe_context, queue_entry);
		atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
		list_add(&context->list_entry, &zone->available);
		recycled++;
	}

	if (recycled > 0)
		WRITE_ONCE(zone->active, zone->active - recycled);
	VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
	vdo_finish_draining(&zone->state);
}

static void timeout_index_operations_callback(struct vdo_completion *completion)
{
	struct dedupe_context *context, *tmp;
	struct hash_zone *zone = as_hash_zone(completion);
	u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
	unsigned long cutoff = jiffies - timeout_jiffies;
	unsigned int timed_out = 0;

	atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
	list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
		if (cutoff <= context->submission_jiffies) {
			/*
			 * We have reached the oldest query which has not timed out yet, so restart
			 * the timer.
			 */
			start_expiration_timer(context);
			break;
		}

		if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
					  DEDUPE_CONTEXT_TIMED_OUT)) {
			/*
			 * This context completed between the time the timeout fired, and now. We
			 * can treat it as a successful query, its requestor is already enqueued
			 * to process it.
			 */
			continue;
		}

		/*
		 * Remove this context from the pending list so we won't look at it again on a
		 * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
		 * send its requestor on its way.
		 */
		list_del_init(&context->list_entry);
		context->requestor->dedupe_context = NULL;
		continue_data_vio(context->requestor);
		timed_out++;
	}

	if (timed_out > 0)
		report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);

	check_for_drain_complete(zone);
}

static void timeout_index_operations(struct timer_list *t)
{
	struct hash_zone *zone = from_timer(zone, t, timer);

	if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
			       DEDUPE_QUERY_TIMER_FIRED))
		vdo_launch_completion(&zone->completion);
}

static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
					zone_count_t zone_number)
{
	int result;
	data_vio_count_t i;
	struct hash_zone *zone = &zones->zones[zone_number];

	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
	if (result != VDO_SUCCESS)
		return result;

	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
	zone->zone_number = zone_number;
	zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
	vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
	vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
				    zone->thread_id);
	INIT_LIST_HEAD(&zone->lock_pool);
	result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
			      &zone->lock_array);
	if (result != VDO_SUCCESS)
		return result;

	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
		return_hash_lock_to_pool(zone, &zone->lock_array[i]);

	INIT_LIST_HEAD(&zone->available);
	INIT_LIST_HEAD(&zone->pending);
	result = vdo_make_funnel_queue(&zone->timed_out_complete);
	if (result != VDO_SUCCESS)
		return result;

	timer_setup(&zone->timer, timeout_index_operations, 0);

	for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
		struct dedupe_context *context = &zone->contexts[i];

		context->zone = zone;
		context->request.callback = finish_index_operation;
		context->request.session = zones->index_session;
		list_add(&context->list_entry, &zone->available);
	}

	return vdo_make_default_thread(vdo, zone->thread_id);
}

/** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
{
	struct hash_zones *zones = context;

	return zones->zones[zone_number].thread_id;
}

/**
 * vdo_make_hash_zones() - Create the hash zones.
 *
 * @vdo: The vdo to which the zone will belong.
 * @zones_ptr: A pointer to hold the zones.
 *
 * Return: VDO_SUCCESS or an error code.
 */
int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
{
	int result;
	struct hash_zones *zones;
	zone_count_t z;
	zone_count_t zone_count = vdo->thread_config.hash_zone_count;

	if (zone_count == 0)
		return VDO_SUCCESS;

	result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
				       __func__, &zones);
	if (result != VDO_SUCCESS)
		return result;

	result = initialize_index(vdo, zones);
	if (result != VDO_SUCCESS) {
		vdo_free(zones);
		return result;
	}

	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);

	zones->zone_count = zone_count;
	for (z = 0; z < zone_count; z++) {
		result = initialize_zone(vdo, zones, z);
		if (result != VDO_SUCCESS) {
			vdo_free_hash_zones(zones);
			return result;
		}
	}

	result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
					 vdo->thread_config.admin_thread, zones, NULL,
					 vdo, &zones->manager);
	if (result != VDO_SUCCESS) {
		vdo_free_hash_zones(zones);
		return result;
	}

	*zones_ptr = zones;
	return VDO_SUCCESS;
}

void vdo_finish_dedupe_index(struct hash_zones *zones)
{
	if (zones == NULL)
		return;

	uds_destroy_index_session(vdo_forget(zones->index_session));
}

/**
 * vdo_free_hash_zones() - Free the hash zones.
 * @zones: The zone to free.
 */
void vdo_free_hash_zones(struct hash_zones *zones)
{
	zone_count_t i;

	if (zones == NULL)
		return;

	vdo_free(vdo_forget(zones->manager));

	for (i = 0; i < zones->zone_count; i++) {
		struct hash_zone *zone = &zones->zones[i];

		vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
		vdo_int_map_free(vdo_forget(zone->hash_lock_map));
		vdo_free(vdo_forget(zone->lock_array));
	}

	if (zones->index_session != NULL)
		vdo_finish_dedupe_index(zones);

	ratelimit_state_exit(&zones->ratelimiter);
	vdo_free(zones);
}

static void initiate_suspend_index(struct admin_state *state)
{
	struct hash_zones *zones = container_of(state, struct hash_zones, state);
	enum index_state index_state;

	spin_lock(&zones->lock);
	index_state = zones->index_state;
	spin_unlock(&zones->lock);

	if (index_state != IS_CLOSED) {
		bool save = vdo_is_state_saving(&zones->state);
		int result;

		result = uds_suspend_index_session(zones->index_session, save);
		if (result != UDS_SUCCESS)
			vdo_log_error_strerror(result, "Error suspending dedupe index");
	}

	vdo_finish_draining(state);
}

/**
 * suspend_index() - Suspend the UDS index prior to draining hash zones.
 *
 * Implements vdo_action_preamble_fn
 */
static void suspend_index(void *context, struct vdo_completion *completion)
{
	struct hash_zones *zones = context;

	vdo_start_draining(&zones->state,
			   vdo_get_current_manager_operation(zones->manager), completion,
			   initiate_suspend_index);
}

/**
 * initiate_drain() - Initiate a drain.
 *
 * Implements vdo_admin_initiator_fn.
 */
static void initiate_drain(struct admin_state *state)
{
	check_for_drain_complete(container_of(state, struct hash_zone, state));
}

/**
 * drain_hash_zone() - Drain a hash zone.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_hash_zone(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct hash_zones *zones = context;

	vdo_start_draining(&zones->zones[zone_number].state,
			   vdo_get_current_manager_operation(zones->manager), parent,
			   initiate_drain);
}

/** vdo_drain_hash_zones() - Drain all hash zones. */
void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
{
	vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
			       drain_hash_zone, NULL, parent);
}

static void launch_dedupe_state_change(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	/* ASSERTION: We enter with the lock held. */
	if (zones->changing || !vdo_is_state_normal(&zones->state))
		/* Either a change is already in progress, or changes are not allowed. */
		return;

	if (zones->create_flag || (zones->index_state != zones->index_target)) {
		zones->changing = true;
		vdo_launch_completion(&zones->completion);
		return;
	}

	/* ASSERTION: We exit with the lock held. */
}

/**
 * resume_index() - Resume the UDS index prior to resuming hash zones.
 *
 * Implements vdo_action_preamble_fn
 */
static void resume_index(void *context, struct vdo_completion *parent)
{
	struct hash_zones *zones = context;
	struct device_config *config = parent->vdo->device_config;
	int result;

	zones->parameters.bdev = config->owned_device->bdev;
	result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error resuming dedupe index");

	spin_lock(&zones->lock);
	vdo_resume_if_quiescent(&zones->state);

	if (config->deduplication) {
		zones->index_target = IS_OPENED;
		WRITE_ONCE(zones->dedupe_flag, true);
	} else {
		zones->index_target = IS_CLOSED;
	}

	launch_dedupe_state_change(zones);
	spin_unlock(&zones->lock);

	vdo_finish_completion(parent);
}

/**
 * resume_hash_zone() - Resume a hash zone.
 *
 * Implements vdo_zone_action_fn.
 */
static void resume_hash_zone(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);

	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
}

/**
 * vdo_resume_hash_zones() - Resume a set of hash zones.
 * @zones: The hash zones to resume.
 * @parent: The object to notify when the zones have resumed.
 */
void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
{
	if (vdo_is_read_only(parent->vdo)) {
		vdo_launch_completion(parent);
		return;
	}

	vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
			       resume_hash_zone, NULL, parent);
}

/**
 * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
 * @zone: The hash zone to query.
 * @tally: The tally
 */
static void get_hash_zone_statistics(const struct hash_zone *zone,
				     struct hash_lock_statistics *tally)
{
	const struct hash_lock_statistics *stats = &zone->statistics;

	tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
	tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
	tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
	tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
	tally->curr_dedupe_queries += READ_ONCE(zone->active);
}

static void get_index_statistics(struct hash_zones *zones,
				 struct index_statistics *stats)
{
	enum index_state state;
	struct uds_index_stats index_stats;
	int result;

	spin_lock(&zones->lock);
	state = zones->index_state;
	spin_unlock(&zones->lock);

	if (state != IS_OPENED)
		return;

	result = uds_get_index_session_stats(zones->index_session, &index_stats);
	if (result != UDS_SUCCESS) {
		vdo_log_error_strerror(result, "Error reading index stats");
		return;
	}

	stats->entries_indexed = index_stats.entries_indexed;
	stats->posts_found = index_stats.posts_found;
	stats->posts_not_found = index_stats.posts_not_found;
	stats->queries_found = index_stats.queries_found;
	stats->queries_not_found = index_stats.queries_not_found;
	stats->updates_found = index_stats.updates_found;
	stats->updates_not_found = index_stats.updates_not_found;
	stats->entries_discarded = index_stats.entries_discarded;
}

/**
 * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
 * @hash_zones: The hash zones to query
 *
 * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
 *         index
 */
void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)

{
	zone_count_t zone;

	for (zone = 0; zone < zones->zone_count; zone++)
		get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);

	get_index_statistics(zones, &stats->index);

	/*
	 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
	 * of queries not made because of earlier timeouts.
	 */
	stats->dedupe_advice_timeouts =
		(atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
}

/**
 * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
 * @zones: The hash_zones from which to select.
 * @name: The record name.
 *
 * Return: The hash zone responsible for the record name.
 */
struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
				       const struct uds_record_name *name)
{
	/*
	 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
	 * since the number of hash zones is small.
	 * TODO: Verify that the first byte is independent enough.
	 */
	u32 hash = name->name[0];

	/*
	 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
	 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
	 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
	 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
	 */
	hash = (hash * zones->zone_count) >> 8;
	return &zones->zones[hash];
}

/**
 * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
 *                    free list.
 * @lock: The hash lock to dump.
 */
static void dump_hash_lock(const struct hash_lock *lock)
{
	const char *state;

	if (!list_empty(&lock->pool_node)) {
		/* This lock is on the free list. */
		return;
	}

	/*
	 * Necessarily cryptic since we can log a lot of these. First three chars of state is
	 * unambiguous. 'U' indicates a lock not registered in the map.
	 */
	state = get_hash_lock_state_name(lock->state);
	vdo_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
		     lock, state, (lock->registered ? 'D' : 'U'),
		     (unsigned long long) lock->duplicate.pbn,
		     lock->duplicate.state, lock->reference_count,
		     vdo_waitq_num_waiters(&lock->waiters), lock->agent);
}

static const char *index_state_to_string(struct hash_zones *zones,
					 enum index_state state)
{
	if (!vdo_is_state_normal(&zones->state))
		return SUSPENDED;

	switch (state) {
	case IS_CLOSED:
		return zones->error_flag ? ERROR : CLOSED;
	case IS_CHANGING:
		return zones->index_target == IS_OPENED ? OPENING : CLOSING;
	case IS_OPENED:
		return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
	default:
		return UNKNOWN;
	}
}

/**
 * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
 * @zone: The zone to dump.
 */
static void dump_hash_zone(const struct hash_zone *zone)
{
	data_vio_count_t i;

	if (zone->hash_lock_map == NULL) {
		vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
		return;
	}

	vdo_log_info("struct hash_zone %u: mapSize=%zu",
		     zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
		dump_hash_lock(&zone->lock_array[i]);
}

/**
 * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
 * @zones: The zones to dump.
 */
void vdo_dump_hash_zones(struct hash_zones *zones)
{
	const char *state, *target;
	zone_count_t zone;

	spin_lock(&zones->lock);
	state = index_state_to_string(zones, zones->index_state);
	target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
	spin_unlock(&zones->lock);

	vdo_log_info("UDS index: state: %s", state);
	if (target != NULL)
		vdo_log_info("UDS index: changing to state: %s", target);

	for (zone = 0; zone < zones->zone_count; zone++)
		dump_hash_zone(&zones->zones[zone]);
}

void vdo_set_dedupe_index_timeout_interval(unsigned int value)
{
	u64 alb_jiffies;

	/* Arbitrary maximum value is two minutes */
	if (value > 120000)
		value = 120000;
	/* Arbitrary minimum value is 2 jiffies */
	alb_jiffies = msecs_to_jiffies(value);

	if (alb_jiffies < 2) {
		alb_jiffies = 2;
		value = jiffies_to_msecs(alb_jiffies);
	}
	vdo_dedupe_index_timeout_interval = value;
	vdo_dedupe_index_timeout_jiffies = alb_jiffies;
}

void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
{
	u64 min_jiffies;

	/* Arbitrary maximum value is one second */
	if (value > 1000)
		value = 1000;

	/* Arbitrary minimum value is 2 jiffies */
	min_jiffies = msecs_to_jiffies(value);

	if (min_jiffies < 2) {
		min_jiffies = 2;
		value = jiffies_to_msecs(min_jiffies);
	}

	vdo_dedupe_index_min_timer_interval = value;
	vdo_dedupe_index_min_timer_jiffies = min_jiffies;
}

/**
 * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
 * @zone: the hash zone
 *
 * Return: A dedupe_context or NULL if none are available
 */
static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
{
	struct dedupe_context *context;
	struct funnel_queue_entry *entry;

	assert_in_hash_zone(zone, __func__);

	if (!list_empty(&zone->available)) {
		WRITE_ONCE(zone->active, zone->active + 1);
		context = list_first_entry(&zone->available, struct dedupe_context,
					   list_entry);
		list_del_init(&context->list_entry);
		return context;
	}

	entry = vdo_funnel_queue_poll(zone->timed_out_complete);
	return ((entry == NULL) ?
		NULL : container_of(entry, struct dedupe_context, queue_entry));
}

static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
				enum uds_request_type operation)
{
	request->record_name = data_vio->record_name;
	request->type = operation;
	if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
		size_t offset = 0;
		struct uds_record_data *encoding = &request->new_metadata;

		encoding->data[offset++] = UDS_ADVICE_VERSION;
		encoding->data[offset++] = data_vio->new_mapped.state;
		put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
		offset += sizeof(u64);
		BUG_ON(offset != UDS_ADVICE_SIZE);
	}
}

/*
 * The index operation will inquire about data_vio.record_name, providing (if the operation is
 * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
 * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
 * set to the return status code of any asynchronous index processing.
 */
static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
{
	int result;
	struct dedupe_context *context;
	struct vdo *vdo = vdo_from_data_vio(data_vio);
	struct hash_zone *zone = data_vio->hash_zone;

	assert_data_vio_in_hash_zone(data_vio);

	if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
		continue_data_vio(data_vio);
		return;
	}

	context = acquire_context(zone);
	if (context == NULL) {
		atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
		continue_data_vio(data_vio);
		return;
	}

	data_vio->dedupe_context = context;
	context->requestor = data_vio;
	context->submission_jiffies = jiffies;
	prepare_uds_request(&context->request, data_vio, operation);
	atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
	list_add_tail(&context->list_entry, &zone->pending);
	start_expiration_timer(context);
	result = uds_launch_request(&context->request);
	if (result != UDS_SUCCESS) {
		context->request.status = result;
		finish_index_operation(&context->request);
	}
}

static void set_target_state(struct hash_zones *zones, enum index_state target,
			     bool change_dedupe, bool dedupe, bool set_create)
{
	const char *old_state, *new_state;

	spin_lock(&zones->lock);
	old_state = index_state_to_string(zones, zones->index_target);
	if (change_dedupe)
		WRITE_ONCE(zones->dedupe_flag, dedupe);

	if (set_create)
		zones->create_flag = true;

	zones->index_target = target;
	launch_dedupe_state_change(zones);
	new_state = index_state_to_string(zones, zones->index_target);
	spin_unlock(&zones->lock);

	if (old_state != new_state)
		vdo_log_info("Setting UDS index target state to %s", new_state);
}

const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
{
	const char *state;

	spin_lock(&zones->lock);
	state = index_state_to_string(zones, zones->index_state);
	spin_unlock(&zones->lock);

	return state;
}

/* Handle a dmsetup message relevant to the index. */
int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
{
	if (strcasecmp(name, "index-close") == 0) {
		set_target_state(zones, IS_CLOSED, false, false, false);
		return 0;
	} else if (strcasecmp(name, "index-create") == 0) {
		set_target_state(zones, IS_OPENED, false, false, true);
		return 0;
	} else if (strcasecmp(name, "index-disable") == 0) {
		set_target_state(zones, IS_OPENED, true, false, false);
		return 0;
	} else if (strcasecmp(name, "index-enable") == 0) {
		set_target_state(zones, IS_OPENED, true, true, false);
		return 0;
	}

	return -EINVAL;
}

void vdo_set_dedupe_state_normal(struct hash_zones *zones)
{
	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
}

/* If create_flag, create a new index without first attempting to load an existing index. */
void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
{
	set_target_state(zones, IS_OPENED, true, true, create_flag);
}