summaryrefslogtreecommitdiff
path: root/tools/testing/selftests/sched_ext/dequeue.c
blob: 4e93262703ca8454bc8f0b19133c1dcc17f6adbd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2025 NVIDIA Corporation.
 */
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <bpf/bpf.h>
#include <scx/common.h>
#include <sys/wait.h>
#include <sched.h>
#include <pthread.h>
#include "scx_test.h"
#include "dequeue.bpf.skel.h"

#define NUM_WORKERS 8
#define AFFINITY_HAMMER_MS 500

/*
 * Worker function that creates enqueue/dequeue events via CPU work and
 * sleep.
 */
static void worker_fn(int id)
{
	int i;
	volatile int sum = 0;

	for (i = 0; i < 1000; i++) {
		volatile int j;

		/* Do some work to trigger scheduling events */
		for (j = 0; j < 10000; j++)
			sum += j;

		/* Sleep to trigger dequeue */
		usleep(1000 + (id * 100));
	}

	exit(0);
}

/*
 * This thread changes workers' affinity from outside so that some changes
 * hit tasks while they are still in the scheduler's queue and trigger
 * property-change dequeues.
 */
static void *affinity_hammer_fn(void *arg)
{
	pid_t *pids = arg;
	cpu_set_t cpuset;
	int i = 0, n = NUM_WORKERS;
	struct timespec start, now;

	clock_gettime(CLOCK_MONOTONIC, &start);
	while (1) {
		int w = i % n;
		int cpu = (i / n) % 4;

		CPU_ZERO(&cpuset);
		CPU_SET(cpu, &cpuset);
		sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
		i++;

		/* Check elapsed time every 256 iterations to limit gettime cost */
		if ((i & 255) == 0) {
			long long elapsed_ms;

			clock_gettime(CLOCK_MONOTONIC, &now);
			elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
				     (now.tv_nsec - start.tv_nsec) / 1000000;
			if (elapsed_ms >= AFFINITY_HAMMER_MS)
				break;
		}
	}
	return NULL;
}

static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
					 const char *scenario_name)
{
	struct bpf_link *link;
	pid_t pids[NUM_WORKERS];
	pthread_t hammer;

	int i, status;
	u64 enq_start, deq_start,
	    dispatch_deq_start, change_deq_start, bpf_queue_full_start;
	u64 enq_delta, deq_delta,
	    dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;

	/* Set the test scenario */
	skel->bss->test_scenario = scenario;

	/* Record starting counts */
	enq_start = skel->bss->enqueue_cnt;
	deq_start = skel->bss->dequeue_cnt;
	dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
	change_deq_start = skel->bss->change_dequeue_cnt;
	bpf_queue_full_start = skel->bss->bpf_queue_full;

	link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
	SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);

	/* Fork worker processes to generate enqueue/dequeue events */
	for (i = 0; i < NUM_WORKERS; i++) {
		pids[i] = fork();
		SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);

		if (pids[i] == 0) {
			worker_fn(i);
			/* Should not reach here */
			exit(1);
		}
	}

	/*
	 * Run an "affinity hammer" so that some property changes hit tasks
	 * while they are still in BPF custody (e.g., in user DSQ or BPF
	 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues.
	 */
	SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
		    "Failed to create affinity hammer thread");
	pthread_join(hammer, NULL);

	/* Wait for all workers to complete */
	for (i = 0; i < NUM_WORKERS; i++) {
		SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
			    "Failed to wait for worker %d", i);
		SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
	}

	bpf_link__destroy(link);

	SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));

	/* Calculate deltas */
	enq_delta = skel->bss->enqueue_cnt - enq_start;
	deq_delta = skel->bss->dequeue_cnt - deq_start;
	dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
	change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
	bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;

	printf("%s:\n", scenario_name);
	printf("  enqueues: %lu\n", (unsigned long)enq_delta);
	printf("  dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
	       (unsigned long)deq_delta,
	       (unsigned long)dispatch_deq_delta,
	       (unsigned long)change_deq_delta);
	printf("  BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);

	/*
	 * Validate enqueue/dequeue lifecycle tracking.
	 *
	 * For scenarios 0, 1, 3, 4 (local and global DSQs from
	 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues
	 * should be 0 because tasks bypass the BPF scheduler entirely:
	 * tasks never enter BPF scheduler's custody.
	 *
	 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect
	 * both enqueues and dequeues.
	 *
	 * The BPF code does strict state machine validation with
	 * scx_bpf_error() to ensure the workflow semantics are correct.
	 *
	 * If we reach this point without errors, the semantics are
	 * validated correctly.
	 */
	if (scenario == 0 || scenario == 1 ||
	    scenario == 3 || scenario == 4) {
		/* Tasks bypass BPF scheduler completely */
		SCX_EQ(enq_delta, 0);
		SCX_EQ(deq_delta, 0);
		SCX_EQ(dispatch_deq_delta, 0);
		SCX_EQ(change_deq_delta, 0);
	} else {
		/*
		 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks
		 * enter BPF scheduler's custody.
		 *
		 * Also validate 1:1 enqueue/dequeue pairing.
		 */
		SCX_GT(enq_delta, 0);
		SCX_GT(deq_delta, 0);
		SCX_EQ(enq_delta, deq_delta);
	}

	return SCX_TEST_PASS;
}

static enum scx_test_status setup(void **ctx)
{
	struct dequeue *skel;

	skel = dequeue__open();
	SCX_FAIL_IF(!skel, "Failed to open skel");
	SCX_ENUM_INIT(skel);
	SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");

	*ctx = skel;

	return SCX_TEST_PASS;
}

static enum scx_test_status run(void *ctx)
{
	struct dequeue *skel = ctx;
	enum scx_test_status status;

	status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
	if (status != SCX_TEST_PASS)
		return status;

	status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
	if (status != SCX_TEST_PASS)
		return status;

	printf("\n=== Summary ===\n");
	printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
	printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
	printf("  Dispatch dequeues: %lu (no flag, normal workflow)\n",
	       (unsigned long)skel->bss->dispatch_dequeue_cnt);
	printf("  Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
	       (unsigned long)skel->bss->change_dequeue_cnt);
	printf("  BPF queue full: %lu\n",
	       (unsigned long)skel->bss->bpf_queue_full);
	printf("\nAll scenarios passed - no state machine violations detected\n");
	printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
	printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
	printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
	printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
	printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
	printf("-> Validated: No duplicate enqueues or invalid state transitions\n");

	return SCX_TEST_PASS;
}

static void cleanup(void *ctx)
{
	struct dequeue *skel = ctx;

	dequeue__destroy(skel);
}

struct scx_test dequeue_test = {
	.name = "dequeue",
	.description = "Verify ops.dequeue() semantics",
	.setup = setup,
	.run = run,
	.cleanup = cleanup,
};

REGISTER_SCX_TEST(&dequeue_test)