1use crate::ClientKind;
2use crate::SimulationInitOptions;
3use crate::constructors::ConstructCustomStruct;
4use crate::diff_ser::DiffSerializer;
5use crate::multiplayer_tradeoff::{AnyTradeOff, Impl};
6use crate::networked_types::primitive::usize32;
7use crate::simulation::{Input, InputAge, State};
8use crate::snapshot_serdes::NewClientHeader;
9use crate::tick::{TickID, TickInfo};
10use crate::untracked::UntrackedState;
11use log::debug;
12use std::collections::VecDeque;
13use std::rc::Rc;
14use std::sync::mpsc::channel as sync_unbounded_channel;
15use std::time::Duration;
16use web_time::Instant;
17
18#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
19use crate::thread_comms::*;
20
21#[cfg(feature = "server")]
22use {
23 crate::tick::UnrollbackableNetEvent,
24 std::collections::HashMap,
25 std::sync::mpsc::{Receiver as SyncReceiver, Sender as SyncSender},
26 tokio::sync::mpsc::UnboundedSender as AsyncSender,
27};
28
29#[cfg(feature = "client")]
30use {
31 crate::presentation::PresentationContext, crate::snapshot_serdes, atomicbox::AtomicOptionBox,
32 std::sync::Arc,
33};
34
35#[cfg(not(feature = "singlethreaded"))]
36use wasm_thread as thread;
37
38#[cfg(feature = "client")]
39mod client;
40mod seek;
41#[cfg(feature = "server")]
42mod server;
43
/// Compile-time switch for the `debug!` traces emitted around tick advancement
/// (server build). Leave `false` unless diagnosing tick scheduling.
#[cfg(feature = "server")]
const TRACE_TICK_ADVANCEMENT: bool = false;
/// Compile-time switch for the `debug!` traces emitted around tick advancement
/// (client build). Leave `false` unless diagnosing tick scheduling.
#[cfg(feature = "client")]
const TRACE_TICK_ADVANCEMENT: bool = false;
49
/// Caller-side handle to a simulation controller, as returned by
/// `init_multithreaded` / `init_singlethreaded`.
#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
pub struct SimControllerExternals {
    /// How the simulation is being driven (spawned thread or in-place state).
    pub internals: SimThreading,

    /// Server only: sending half of the channel whose receiving half is owned
    /// by the simulation; each message is an `AsyncSender<SimToClientCommand>`.
    #[cfg(feature = "server")]
    pub new_connection_sender: SyncSender<AsyncSender<SimToClientCommand>>,
    /// Client only: presentation-side endpoints paired with the simulation's
    /// `SimToPresentationChannel` (both are built by `make_client_comms`).
    #[cfg(feature = "client")]
    pub comms: PresentationToSimChannel,
}
61
/// Bundle of everything `init_multithreaded` moves into the simulation thread;
/// it is consumed by `loop_multithreaded`.
#[cfg(not(feature = "singlethreaded"))]
struct SimMoveAcrossThreads {
    /// Initialisation options, later stored in `SimControllerInternals::o`.
    o: SimulationInitOptions,

    /// Client only: serialized snapshot passed on to `make_context`.
    #[cfg(feature = "client")]
    new_client_snapshot: Vec<u8>,

    /// Server only: receiving half matching
    /// `SimControllerExternals::new_connection_sender`.
    #[cfg(feature = "server")]
    new_connection_receiver: SyncReceiver<AsyncSender<SimToClientCommand>>,
    /// Client only: simulation-side endpoints of the client comms pair.
    #[cfg(feature = "client")]
    comms: SimToPresentationChannel,
}
77
/// Execution mode of the simulation behind a `SimControllerExternals`.
#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
pub enum SimThreading {
    /// The simulation runs on its own thread; this is the handle returned by
    /// `thread::spawn` in `init_multithreaded`.
    #[cfg(not(feature = "singlethreaded"))]
    Multithreaded(thread::JoinHandle<()>),

    /// The simulation state is held inline and ticked by the caller
    /// (see `loop_singlethreaded` and `replay_session`).
    #[cfg(any(feature = "singlethreaded", feature = "session_replay"))]
    #[allow(private_interfaces)]
    Singlethreaded(SimControllerInternals),
}
87
/// Simulation-side state of the controller: the game context plus the
/// feature-specific input history and communication channels.
struct SimControllerInternals {
    /// Game state, tick bookkeeping and diff serializer.
    ctx: GameContext<Impl>,
    /// Options the simulation was initialised with.
    o: SimulationInitOptions,

    /// Server only: one input history per `usize32` key
    /// (presumably the client id — TODO confirm against the server module).
    #[cfg(feature = "server")]
    input_history: HashMap<usize32, InternalInputHistory>,
    /// Client only: the single input history kept by this client.
    #[cfg(feature = "client")]
    input_history: InternalInputHistory,

    /// Server only: receives an `AsyncSender<SimToClientCommand>` per message
    /// sent through `SimControllerExternals::new_connection_sender`.
    #[cfg(feature = "server")]
    new_connection_receiver: SyncReceiver<AsyncSender<SimToClientCommand>>,
    /// Server only: channels keyed by `usize32`
    /// (presumably the client id — TODO confirm); starts empty.
    #[cfg(feature = "server")]
    comms: HashMap<usize32, SimToClientChannel>,
    /// Client only: simulation-side endpoints towards the presentation layer.
    #[cfg(feature = "client")]
    comms: SimToPresentationChannel,

    /// Server only: queued `UnrollbackableNetEvent`s; seeded with
    /// `ServerStart` by `loop_multithreaded`.
    #[cfg(feature = "server")]
    server_events: VecDeque<UnrollbackableNetEvent>,

    /// Client only: `client_id` taken from the new-client header.
    #[cfg(feature = "client")]
    local_client_id: usize32,
    /// Client only: starts empty.
    /// NOTE(review): exact meaning of the samples is defined outside this chunk.
    #[cfg(feature = "client")]
    calibration_samples: VecDeque<i16>,
    /// Client only: initialised to `true` when the controller is created.
    #[cfg(feature = "client")]
    initial_calibration: bool,
}
119
/// The data a simulation tick operates on, generic over the multiplayer
/// trade-off used by the diff serializer.
pub struct GameContext<TradeOff: AnyTradeOff> {
    /// The simulated game state.
    pub state: State,
    /// Tick bookkeeping (current/target tick ids and timing).
    pub tick: TickInfo,
    /// Diff serializer parameterised by `TradeOff`.
    pub diff: DiffSerializer<TradeOff>,
}
125
/// Queue of input entries, plus server-only bookkeeping fields.
#[derive(Default, Debug)]
struct InternalInputHistory {
    /// Stored input entries; `generate_bogus_inputs` appends defaults here.
    entries: VecDeque<InternalInputEntry>,

    /// Server only. NOTE(review): presumably a count of ticks spent timed
    /// out — confirm against the server module.
    #[cfg(feature = "server")]
    timed_out_ticks: TickID,

    /// Server only. NOTE(review): presumably the most recent input received
    /// from the client — confirm against the server module.
    #[cfg(feature = "server")]
    latest_received: Input,

    /// Server only. NOTE(review): presumably whether the client is currently
    /// considered timed out — confirm against the server module.
    #[cfg(feature = "server")]
    is_timed_out: bool,
}
156
/// One element of `InternalInputHistory::entries`.
#[derive(Default, Debug, Clone)]
struct InternalInputEntry {
    /// The input stored in this entry.
    input: Input,

    /// Server only: age information attached to the input.
    #[cfg(feature = "server")]
    age: InputAge,

    /// Server only: optional `i16` ping value.
    /// NOTE(review): units and meaning of `None` are not visible here.
    #[cfg(feature = "server")]
    ping: Option<i16>,
    /// Client only: boolean ping flag.
    /// NOTE(review): exact meaning is defined by the client module.
    #[cfg(feature = "client")]
    ping: bool,
}
188
/// The two matching halves of the client's simulation/presentation channels,
/// as produced by `make_client_comms`.
#[cfg(feature = "client")]
struct ClientComms {
    /// Endpoints kept by the simulation.
    sim_comms: SimToPresentationChannel,
    /// Endpoints handed to the presentation layer.
    presentation_comms: PresentationToSimChannel,
}
194
/// Creates the paired channel endpoints used between the simulation and the
/// presentation layer on a client.
///
/// Two unbounded std channels are created (one per direction) together with a
/// single `AtomicOptionBox`, initially empty, that both halves share via `Arc`.
#[cfg(feature = "client")]
fn make_client_comms() -> ClientComms {
    let (to_presentation, from_sim) = sync_unbounded_channel();
    let (to_sim, from_presentation) = sync_unbounded_channel();
    let shared_out = Arc::new(AtomicOptionBox::none());

    ClientComms {
        sim_comms: SimToPresentationChannel {
            to_presentation,
            from_presentation,
            sim_out: Arc::clone(&shared_out),
        },
        presentation_comms: PresentationToSimChannel {
            to_sim,
            from_sim,
            sim_out: shared_out,
        },
    }
}
217
/// Return value of `make_context`.
struct MakeContext {
    /// The freshly constructed game context.
    ctx: GameContext<Impl>,

    /// Client only: header produced while deserializing the new-client snapshot.
    #[cfg(feature = "client")]
    new_client_header: NewClientHeader,
}
224
225fn make_context(
226 o: &SimulationInitOptions,
227 #[cfg(feature = "client")] new_client_snapshot: Vec<u8>,
228) -> MakeContext {
229 let mut state = State::construct(&Rc::default(), ClientKind::NA);
230 o.init_static_level_geom.map(|init_level| {
231 init_level(&mut state);
232 state.reset_untracked(); });
234
235 #[cfg(feature = "client")]
236 let new_client_header = snapshot_serdes::des_new_client(&mut state, new_client_snapshot).unwrap();
237
238 #[cfg(feature = "server")]
239 let tick_info = TickInfo::new(0, 0);
240 #[cfg(feature = "client")]
241 let tick_info = TickInfo::new(
242 new_client_header.tick_id_snapshot,
243 new_client_header.fast_forward_ticks,
244 );
245
246 MakeContext {
247 ctx: GameContext {
248 state,
249 tick: tick_info,
250 diff: DiffSerializer::default(),
251 },
252
253 #[cfg(feature = "client")]
254 new_client_header,
255 }
256}
257
258#[cfg(not(feature = "singlethreaded"))]
259#[cfg_attr(not(any(feature = "server", feature = "client")), doc(hidden))]
260pub fn init_multithreaded(
261 o: SimulationInitOptions,
262 #[cfg(feature = "client")] new_client_snapshot: Vec<u8>,
263) -> SimControllerExternals {
264 #[cfg(feature = "server")]
265 let (new_connection_sender, new_connection_receiver) = sync_unbounded_channel();
266
267 #[cfg(feature = "client")]
268 let ClientComms {
269 sim_comms,
270 presentation_comms,
271 } = make_client_comms();
272
273 let move_me = SimMoveAcrossThreads {
274 o,
275
276 #[cfg(feature = "client")]
277 new_client_snapshot,
278
279 #[cfg(feature = "server")]
280 new_connection_receiver,
281 #[cfg(feature = "client")]
282 comms: sim_comms,
283 };
284
285 let thread = thread::spawn(move || loop_multithreaded(move_me));
286
287 SimControllerExternals {
288 internals: SimThreading::Multithreaded(thread),
289
290 #[cfg(feature = "server")]
291 new_connection_sender,
292 #[cfg(feature = "client")]
293 comms: presentation_comms,
294 }
295}
296
297#[cfg(not(feature = "singlethreaded"))]
298fn loop_multithreaded(moved_data: SimMoveAcrossThreads) {
299 let MakeContext {
300 ctx,
301 #[cfg(feature = "client")]
302 new_client_header,
303 } = make_context(
304 &moved_data.o,
305 #[cfg(feature = "client")]
306 moved_data.new_client_snapshot,
307 );
308
309 let mut sim = SimControllerInternals {
310 ctx,
311 o: moved_data.o,
312
313 #[cfg(feature = "server")]
314 input_history: HashMap::new(),
315 #[cfg(feature = "client")]
316 input_history: InternalInputHistory::default(),
317
318 #[cfg(feature = "server")]
319 new_connection_receiver: moved_data.new_connection_receiver,
320 #[cfg(feature = "server")]
321 comms: HashMap::new(),
322 #[cfg(feature = "client")]
323 comms: moved_data.comms,
324
325 #[cfg(feature = "server")]
326 server_events: VecDeque::from([UnrollbackableNetEvent::ServerStart]),
327
328 #[cfg(feature = "client")]
329 local_client_id: new_client_header.client_id,
330 #[cfg(feature = "client")]
331 calibration_samples: VecDeque::new(),
332 #[cfg(feature = "client")]
333 initial_calibration: true,
334 };
335
336 #[cfg(feature = "client")]
337 sim.initial_fast_forward(new_client_header.fast_forward_ticks);
338
339 loop {
340 if sim.scheduled_tick(
341 #[cfg(feature = "session_replay")]
342 false,
343 ) == None
344 {
345 break;
346 }
347
348 let next_tick_time = sim.ctx.tick.get_now();
349 let now = Instant::now();
350
351 if next_tick_time > now {
352 thread::sleep(next_tick_time - now);
357 } else if TRACE_TICK_ADVANCEMENT && now > next_tick_time {
358 debug!("simulation tick hiccuped");
362 }
363 }
364}
365
/// Creates a simulation that is ticked by the caller instead of a dedicated
/// thread.
///
/// Builds the client comms pair and the game context from
/// `new_client_snapshot`, performs the initial fast-forward given by the
/// new-client header, and returns the handle holding the simulation inline.
#[cfg(any(feature = "singlethreaded", feature = "session_replay"))]
pub fn init_singlethreaded(o: SimulationInitOptions, new_client_snapshot: Vec<u8>) -> SimControllerExternals {
    let channels = make_client_comms();
    let built = make_context(&o, new_client_snapshot);
    let header = built.new_client_header;

    let mut sim = SimControllerInternals {
        ctx: built.ctx,
        o,
        input_history: InternalInputHistory::default(),
        comms: channels.sim_comms,
        local_client_id: header.client_id,
        calibration_samples: VecDeque::new(),
        initial_calibration: true,
    };
    sim.initial_fast_forward(header.fast_forward_ticks);

    SimControllerExternals {
        internals: SimThreading::Singlethreaded(sim),
        comms: channels.presentation_comms,
    }
}
395
#[cfg(feature = "singlethreaded")]
impl SimControllerExternals {
    /// Advances the inline simulation up to the present.
    ///
    /// Computes the tick id corresponding to `Instant::now()` and runs
    /// scheduled ticks while `tick.id_cur` is below it, stopping early if a
    /// tick returns `None`.
    pub fn loop_singlethreaded(&mut self) {
        let SimThreading::Singlethreaded(sim) = &mut self.internals;

        let tick_id_accumulator = sim.ctx.tick.get_tick_at(Instant::now());
        while sim.ctx.tick.id_cur < tick_id_accumulator {
            let outcome = sim.scheduled_tick(
                #[cfg(feature = "session_replay")]
                false,
            );
            if outcome.is_none() {
                break;
            }
        }
    }
}
413
/// Replays a recorded session on a single-threaded simulation.
///
/// The first action must be `SessionReplayAction::Init` carrying the
/// new-client snapshot. Every following action is applied in order:
/// `ScheduledTick` runs one tick in replay mode and `ReceiveComm` forwards the
/// message to the simulation. After each action, everything the simulation
/// sent back is drained and discarded. One final tick is run after the last
/// action.
///
/// # Errors
/// Returns `Err(())` if the first action is not `Init`, if a later `Init` is
/// encountered, or if forwarding a message to the simulation fails.
///
/// # Panics
/// Panics if a replayed tick returns `None`.
#[cfg(feature = "session_replay")]
pub fn replay_session(o: SimulationInitOptions, actions: Vec<SessionReplayAction>) -> Result<(), ()> {
    let mut actions = actions.into_iter();
    let Some(SessionReplayAction::Init(new_client_snapshot)) = actions.next() else {
        return Err(());
    };

    let mut externals = init_singlethreaded(o, new_client_snapshot);
    let SimThreading::Singlethreaded(sim) = &mut externals.internals else {
        return Err(());
    };

    for action in actions {
        match action {
            SessionReplayAction::ScheduledTick => sim.scheduled_tick(true).unwrap(),
            SessionReplayAction::ReceiveComm(msg) => externals.comms.to_sim.send(msg).map_err(|_| ())?,
            SessionReplayAction::Init(_) => return Err(()),
        }

        // Discard whatever the simulation emitted; nothing consumes it during
        // a replay.
        while externals.comms.from_sim.try_recv().is_ok() {}
    }

    sim.scheduled_tick(true).unwrap();

    Ok(())
}
444
445impl SimControllerInternals {
446 #[must_use]
447 fn scheduled_tick(&mut self, #[cfg(feature = "session_replay")] is_replay: bool) -> Option<()> {
449 self.ctx.tick.id_target += 1;
450 if TRACE_TICK_ADVANCEMENT {
451 debug!("begin scheduled tick @{:?}", self.ctx.tick);
452 }
453
454 self.scheduled_tick_impl()?;
480
481 if TRACE_TICK_ADVANCEMENT {
482 debug!("end scheduled tick");
483 }
484
485 #[cfg(feature = "session_replay")]
486 if !is_replay {
487 self.comms
488 .to_presentation
489 .send(SimToPresentationCommand::SessionReplayAction(
490 SessionReplayAction::ScheduledTick,
491 ))
492 .unwrap();
493 }
494
495 Some(())
496 }
497}
498
499impl InternalInputHistory {
500 fn generate_bogus_inputs(&mut self, amount: TickID) {
510 self.entries
511 .extend((0..amount + 1).map(|_| InternalInputEntry::default()));
512 }
513}