Skip to main content

borger/
simulation_controller.rs

1use crate::ClientKind;
2use crate::SimulationInitOptions;
3use crate::constructors::ConstructCustomStruct;
4use crate::diff_ser::DiffSerializer;
5use crate::multiplayer_tradeoff::{AnyTradeOff, Impl};
6use crate::networked_types::primitive::usize32;
7use crate::simulation::{Input, InputAge, State};
8use crate::snapshot_serdes::NewClientHeader;
9use crate::tick::{TickID, TickInfo};
10use crate::untracked::UntrackedState;
11use log::debug;
12use std::collections::VecDeque;
13use std::rc::Rc;
14use std::sync::mpsc::channel as sync_unbounded_channel;
15use std::time::Duration;
16use web_time::Instant;
17
18#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
19use crate::thread_comms::*;
20
21#[cfg(feature = "server")]
22use {
23	crate::tick::UnrollbackableNetEvent,
24	std::collections::HashMap,
25	std::sync::mpsc::{Receiver as SyncReceiver, Sender as SyncSender},
26	tokio::sync::mpsc::UnboundedSender as AsyncSender,
27};
28
29#[cfg(feature = "client")]
30use {
31	crate::presentation::PresentationContext, crate::snapshot_serdes, atomicbox::AtomicOptionBox,
32	std::sync::Arc,
33};
34
35#[cfg(not(feature = "singlethreaded"))]
36use wasm_thread as thread;
37
38#[cfg(feature = "client")]
39mod client;
40mod seek;
41#[cfg(feature = "server")]
42mod server;
43
44//recommend decreasing SIM_DT with this feature on
45#[cfg(feature = "server")]
46const TRACE_TICK_ADVANCEMENT: bool = false;
47#[cfg(feature = "client")]
48const TRACE_TICK_ADVANCEMENT: bool = false;
49
50//communications between the simulation thread
51//and the owning parent thread
52#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
53pub struct SimControllerExternals {
54	pub internals: SimThreading,
55
56	#[cfg(feature = "server")]
57	pub new_connection_sender: SyncSender<AsyncSender<SimToClientCommand>>,
58	#[cfg(feature = "client")]
59	pub comms: PresentationToSimChannel,
60}
61
62//data that needs to be moved across threads
63//during initialization of simulation thread.
64//all fields must be Send
65#[cfg(not(feature = "singlethreaded"))]
66struct SimMoveAcrossThreads {
67	o: SimulationInitOptions,
68
69	#[cfg(feature = "client")]
70	new_client_snapshot: Vec<u8>,
71
72	#[cfg(feature = "server")]
73	new_connection_receiver: SyncReceiver<AsyncSender<SimToClientCommand>>,
74	#[cfg(feature = "client")]
75	comms: SimToPresentationChannel,
76}
77
78#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
79pub enum SimThreading {
80	#[cfg(not(feature = "singlethreaded"))]
81	Multithreaded(thread::JoinHandle<()>),
82
83	#[cfg(any(feature = "singlethreaded", feature = "session_replay"))]
84	#[allow(private_interfaces)]
85	Singlethreaded(SimControllerInternals),
86}
87
88//on client, owned by simulation thread
89struct SimControllerInternals {
90	ctx: GameContext<Impl>,
91	o: SimulationInitOptions,
92
93	//inputs associated with ticks that haven't reached consensus yet
94	//are stored here. when more info is received ticks will be
95	//resimulated using this input history. server is aware of all
96	//clients' inputs; client only stores its own
97	#[cfg(feature = "server")]
98	input_history: HashMap<usize32, InternalInputHistory>,
99	#[cfg(feature = "client")]
100	input_history: InternalInputHistory,
101
102	#[cfg(feature = "server")]
103	new_connection_receiver: SyncReceiver<AsyncSender<SimToClientCommand>>,
104	#[cfg(feature = "server")]
105	comms: HashMap<usize32, SimToClientChannel>,
106	#[cfg(feature = "client")]
107	comms: SimToPresentationChannel,
108
109	#[cfg(feature = "server")]
110	server_events: VecDeque<UnrollbackableNetEvent>,
111
112	#[cfg(feature = "client")]
113	local_client_id: usize32,
114	#[cfg(feature = "client")]
115	calibration_samples: VecDeque<i16>,
116	#[cfg(feature = "client")]
117	initial_calibration: bool,
118}
119
120pub struct GameContext<TradeOff: AnyTradeOff> {
121	pub state: State,
122	pub tick: TickInfo,
123	pub diff: DiffSerializer<TradeOff>,
124}
125
126#[derive(Default, Debug)]
127struct InternalInputHistory {
128	//element at index 0 corresponds to the most recently
129	//completed consensus tick, and each subsequent element
130	//is a progressively more recent tick. between each
131	//scheduled tick (during the sleep period) it is
132	//guaranteed that every client has at least one element
133	//in order to compare the new input to the previous.
134	//some logic asserts at least 2 elements when consensus
135	//is taking place, in order to guarantee pop_front will
136	//leave at least 1 left by the end of the scheduled tick
137	entries: VecDeque<InternalInputEntry>,
138
139	//how many ticks timed out and reached consensus while
140	//waiting for this client's inputs. used to prevent
141	//newly received inputs from pushing to the inputs
142	//buffer until there are no more missing
143	#[cfg(feature = "server")]
144	timed_out_ticks: TickID,
145
146	//the diff received from the client is always applied to
147	//this, regardless of whether there are .timed_out inputs
148	#[cfg(feature = "server")]
149	latest_received: Input,
150
151	//if true, server will not wait for this client before
152	//coming to consensus
153	#[cfg(feature = "server")]
154	is_timed_out: bool,
155}
156
157#[derive(Default, Debug, Clone)]
158struct InternalInputEntry {
159	input: Input,
160
161	#[cfg(feature = "server")]
162	age: InputAge,
163
164	//ping is measured in 2 different ways depending on
165	//whether this is the server or client:
166	//1. server: time in ticks between the tick that this entry
167	//is associated with and the server's scheduled tick.id_cur
168	//at the time of receiving. shipped to the corresponding
169	//client as part of its state diff packet header (see the
170	//signaling scheme). normally negative but can be positive
171	//if client is ahead
172	//2. client: rtt in ticks between the time the client sends
173	//this input and the time it receives the corresponding
174	//authoritative state diff from server. can only be
175	//positive
176	//together these 2 numbers are used to calibrate the
177	//client's tick.id_target to try to match the server's
178	//tick.id_target in real world time, in case they have
179	//become desynced with each other. the time between
180	//the client receiving the initial state snapshot and
181	//actually starting the simulation always causes a
182	//noticeable desync that this can fix
183	#[cfg(feature = "server")]
184	ping: Option<i16>, //offset
185	#[cfg(feature = "client")]
186	ping: bool, //whether waiting to be acked for the first time
187}
188
189#[cfg(feature = "client")]
190struct ClientComms {
191	sim_comms: SimToPresentationChannel,
192	presentation_comms: PresentationToSimChannel,
193}
194
195#[cfg(feature = "client")]
196fn make_client_comms() -> ClientComms {
197	let sim_out = Arc::new(AtomicOptionBox::none());
198	let (to_presentation, from_sim) = sync_unbounded_channel();
199	let (to_sim, from_presentation) = sync_unbounded_channel();
200
201	let sim_comms = SimToPresentationChannel {
202		to_presentation,
203		from_presentation,
204		sim_out: sim_out.clone(),
205	};
206	let presentation_comms = PresentationToSimChannel {
207		to_sim,
208		from_sim,
209		sim_out,
210	};
211
212	ClientComms {
213		sim_comms,
214		presentation_comms,
215	}
216}
217
218struct MakeContext {
219	ctx: GameContext<Impl>,
220
221	#[cfg(feature = "client")]
222	new_client_header: NewClientHeader,
223}
224
225fn make_context(
226	o: &SimulationInitOptions,
227	#[cfg(feature = "client")] new_client_snapshot: Vec<u8>,
228) -> MakeContext {
229	let mut state = State::construct(&Rc::default(), ClientKind::NA);
230	o.init_static_level_geom.map(|init_level| {
231		init_level(&mut state);
232		state.reset_untracked(); //in case init_static_level_geom did anything nefarious
233	});
234
235	#[cfg(feature = "client")]
236	let new_client_header = snapshot_serdes::des_new_client(&mut state, new_client_snapshot).unwrap();
237
238	#[cfg(feature = "server")]
239	let tick_info = TickInfo::new(0, 0);
240	#[cfg(feature = "client")]
241	let tick_info = TickInfo::new(
242		new_client_header.tick_id_snapshot,
243		new_client_header.fast_forward_ticks,
244	);
245
246	MakeContext {
247		ctx: GameContext {
248			state,
249			tick: tick_info,
250			diff: DiffSerializer::default(),
251		},
252
253		#[cfg(feature = "client")]
254		new_client_header,
255	}
256}
257
258#[cfg(not(feature = "singlethreaded"))]
259#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
260pub fn init_multithreaded(
261	o: SimulationInitOptions,
262	#[cfg(feature = "client")] new_client_snapshot: Vec<u8>,
263) -> SimControllerExternals {
264	#[cfg(feature = "server")]
265	let (new_connection_sender, new_connection_receiver) = sync_unbounded_channel();
266
267	#[cfg(feature = "client")]
268	let ClientComms {
269		sim_comms,
270		presentation_comms,
271	} = make_client_comms();
272
273	let move_me = SimMoveAcrossThreads {
274		o,
275
276		#[cfg(feature = "client")]
277		new_client_snapshot,
278
279		#[cfg(feature = "server")]
280		new_connection_receiver,
281		#[cfg(feature = "client")]
282		comms: sim_comms,
283	};
284
285	let thread = thread::spawn(move || loop_multithreaded(move_me));
286
287	SimControllerExternals {
288		internals: SimThreading::Multithreaded(thread),
289
290		#[cfg(feature = "server")]
291		new_connection_sender,
292		#[cfg(feature = "client")]
293		comms: presentation_comms,
294	}
295}
296
297#[cfg(not(feature = "singlethreaded"))]
298fn loop_multithreaded(moved_data: SimMoveAcrossThreads) {
299	let MakeContext {
300		ctx,
301		#[cfg(feature = "client")]
302		new_client_header,
303	} = make_context(
304		&moved_data.o,
305		#[cfg(feature = "client")]
306		moved_data.new_client_snapshot,
307	);
308
309	let mut sim = SimControllerInternals {
310		ctx,
311		o: moved_data.o,
312
313		#[cfg(feature = "server")]
314		input_history: HashMap::new(),
315		#[cfg(feature = "client")]
316		input_history: InternalInputHistory::default(),
317
318		#[cfg(feature = "server")]
319		new_connection_receiver: moved_data.new_connection_receiver,
320		#[cfg(feature = "server")]
321		comms: HashMap::new(),
322		#[cfg(feature = "client")]
323		comms: moved_data.comms,
324
325		#[cfg(feature = "server")]
326		server_events: VecDeque::from([UnrollbackableNetEvent::ServerStart]),
327
328		#[cfg(feature = "client")]
329		local_client_id: new_client_header.client_id,
330		#[cfg(feature = "client")]
331		calibration_samples: VecDeque::new(),
332		#[cfg(feature = "client")]
333		initial_calibration: true,
334	};
335
336	#[cfg(feature = "client")]
337	sim.initial_fast_forward(new_client_header.fast_forward_ticks);
338
339	loop {
340		if sim.scheduled_tick(
341			#[cfg(feature = "session_replay")]
342			false,
343		) == None
344		{
345			break;
346		}
347
348		let next_tick_time = sim.ctx.tick.get_now();
349		let now = Instant::now();
350
351		if next_tick_time > now {
352			//unfortunately this is blocking, so using repl console on this thread
353			//is probably a no go. not that you could do much anyway since it's
354			//written in rust. also seems to sleep too long+cause clock drift if
355			//tick rate is fast?
356			thread::sleep(next_tick_time - now);
357		} else if TRACE_TICK_ADVANCEMENT && now > next_tick_time {
358			//simulation tick is running behind. possible death spiral.
359			//intentionally not handling this because game is unplayable
360			//and player will just quit
361			debug!("simulation tick hiccuped");
362		}
363	}
364}
365
366#[cfg(any(feature = "singlethreaded", feature = "session_replay"))]
367pub fn init_singlethreaded(o: SimulationInitOptions, new_client_snapshot: Vec<u8>) -> SimControllerExternals {
368	let ClientComms {
369		sim_comms,
370		presentation_comms,
371	} = make_client_comms();
372
373	let MakeContext {
374		ctx,
375		new_client_header,
376	} = make_context(&o, new_client_snapshot);
377
378	let mut internals = SimControllerInternals {
379		ctx,
380		o,
381		input_history: InternalInputHistory::default(),
382		comms: sim_comms,
383		local_client_id: new_client_header.client_id,
384		calibration_samples: VecDeque::new(),
385		initial_calibration: true,
386	};
387
388	internals.initial_fast_forward(new_client_header.fast_forward_ticks);
389
390	SimControllerExternals {
391		internals: SimThreading::Singlethreaded(internals),
392		comms: presentation_comms,
393	}
394}
395
396#[cfg(feature = "singlethreaded")]
397impl SimControllerExternals {
398	pub fn loop_singlethreaded(&mut self) {
399		let SimThreading::Singlethreaded(sim) = &mut self.internals;
400
401		let tick_id_accumulator = sim.ctx.tick.get_tick_at(Instant::now());
402		while sim.ctx.tick.id_cur < tick_id_accumulator {
403			if sim.scheduled_tick(
404				#[cfg(feature = "session_replay")]
405				false,
406			) == None
407			{
408				break;
409			}
410		}
411	}
412}
413
414#[cfg(feature = "session_replay")]
415pub fn replay_session(o: SimulationInitOptions, actions: Vec<SessionReplayAction>) -> Result<(), ()> {
416	let mut actions = actions.into_iter();
417	let Some(SessionReplayAction::Init(new_client_snapshot)) = actions.next() else {
418		return Err(());
419	};
420
421	let mut externals = init_singlethreaded(o, new_client_snapshot);
422	let SimThreading::Singlethreaded(sim) = &mut externals.internals else {
423		return Err(());
424	};
425
426	//ok to unwrap scheduled_tick() in here. no risk of disconnection
427	for action in actions {
428		match action {
429			SessionReplayAction::ScheduledTick => sim.scheduled_tick(true).unwrap(),
430			SessionReplayAction::ReceiveComm(msg) => externals.comms.to_sim.send(msg).map_err(|_| ())?,
431			SessionReplayAction::Init(_) => return Err(()),
432		};
433
434		//play the part of presentation thread to avoid mem leak
435		while let Ok(_) = externals.comms.from_sim.try_recv() {}
436	}
437
438	//one more time. in the event of a crash, the final
439	//ReplayAction::ScheduledTick is not recorded
440	sim.scheduled_tick(true).unwrap();
441
442	Ok(())
443}
444
445impl SimControllerInternals {
446	#[must_use]
447	//on a client, this returns none upon disconnection
448	fn scheduled_tick(&mut self, #[cfg(feature = "session_replay")] is_replay: bool) -> Option<()> {
449		self.ctx.tick.id_target += 1;
450		if TRACE_TICK_ADVANCEMENT {
451			debug!("begin scheduled tick @{:?}", self.ctx.tick);
452		}
453
454		/*
455		server to client tick signaling protocol:
456		- when server receives a client's input, as long as it hasn't timed out yet (INPUT_TOO_LATE) it rolls back to the tick associated with it and resimulates
457		- send separate state diff packets for simulation-driven vs. server events
458		- when server executes server events, the associated diffs are considered to happen between id_consensus and the predicted tick after it
459		- first value written to every state diff packet is the type of tick that this buffer contains state diffs for:
460		  TickType::ServerEvents -> client does not increment either of the tick id's. this diff is applied to the end of the most recent consensus tick to avoid rollback
461		  TickType::Consensus -> client increments both tick id's. pop the oldest element from input_history
462		  TickType::Predicted -> client increments id_cur only
463		- second value written depends on tick type:
464		  TickType::ServerEvents -> nothing. no client inputs are associated with server events
465		  TickType::Consensus -> whether the receiving client's inputs are acked (true) or a timeout occurred (false)
466		  TickType::Predicted -> the associated tick id. clients who receive predicted ticks are guaranteed to be acked in this packet
467		- the first time a client's inputs are acked, a third value is written:
468		  it is the number of ticks between server's tick.id_cur at reception time and the acked input's associated tick id
469		  TickType::ServerEvents -> n/a, server events don't have associated inputs
470		  TickType::Consensus -> only write if acked for the first time (implies previous value was true)
471		  TickType::Predicted -> only write if acked for the first time
472		- client will roll back to the correct id upon arrival. for ServerEvents and Consensus, this means rolling back as far as possible (until the rollback buffer is empty)
473		- after all state diff packets are processed, client then locally simulates/predicts up to id_target
474		*/
475
476		//remember tick.id_cur is the number of completed ticks. the
477		//target/goal of this iteration of the loop is to simulate 1
478		//more tick than has currently finished simulating
479		self.scheduled_tick_impl()?;
480
481		if TRACE_TICK_ADVANCEMENT {
482			debug!("end scheduled tick");
483		}
484
485		#[cfg(feature = "session_replay")]
486		if !is_replay {
487			self.comms
488				.to_presentation
489				.send(SimToPresentationCommand::SessionReplayAction(
490					SessionReplayAction::ScheduledTick,
491				))
492				.unwrap();
493		}
494
495		Some(())
496	}
497}
498
499impl InternalInputHistory {
500	//new client will attempt to rapidly fast forward
501	//from id_consensus to id_cur, so it won't have time
502	//to populate real input states. generate a bunch of
503	//bogus ones locally without transmitting to avoid
504	//wasting bandwidth. it is expected to start sending
505	//inputs at id_cur. the +1 is so that there is always
506	//at least 1 last known input, in the event that a
507	//client hasn't sent anything at all since the last
508	//consensus tick
509	fn generate_bogus_inputs(&mut self, amount: TickID) {
510		self.entries
511			.extend((0..amount + 1).map(|_| InternalInputEntry::default()));
512	}
513}