1use std::fmt;
39use std::os::raw::{c_int, c_void};
40
41use crate::error::{Error, Result};
42use crate::function::Function;
43use crate::state::RawLua;
44use crate::traits::{FromLuaMulti, IntoLuaMulti};
45use crate::types::{LuaType, ValueRef, XRc};
46use crate::util::{StackGuard, check_stack, error_traceback_thread, pop_error};
47
48#[cfg(not(feature = "luau"))]
49use crate::{
50 debug::{Debug, HookTriggers},
51 types::HookKind,
52};
53
54#[cfg(feature = "async")]
55use {
56 futures_util::stream::Stream,
57 std::{
58 future::Future,
59 marker::PhantomData,
60 pin::Pin,
61 ptr::NonNull,
62 task::{Context, Poll, Waker},
63 },
64};
65
/// Set of flags selecting which thread events should be reported.
///
/// Flags can be built with the chainable `const` setters
/// (`ThreadTriggers::new().on_resume().on_yield()`), taken from the associated
/// constants, or combined with `|` / `|=`.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct ThreadTriggers {
    /// Report thread creation.
    // NOTE(review): the code that consults this flag is not in this part of the file.
    pub on_create: bool,
    /// Report that a thread is about to be resumed.
    pub on_resume: bool,
    /// Report that a thread has yielded.
    pub on_yield: bool,
}

impl ThreadTriggers {
    /// Only `on_create` is set.
    pub const ON_CREATE: Self = Self {
        on_create: true,
        on_resume: false,
        on_yield: false,
    };

    /// Only `on_resume` is set.
    pub const ON_RESUME: Self = Self {
        on_create: false,
        on_resume: true,
        on_yield: false,
    };

    /// Only `on_yield` is set.
    pub const ON_YIELD: Self = Self {
        on_create: false,
        on_resume: false,
        on_yield: true,
    };

    /// Returns a trigger set with every flag cleared.
    pub const fn new() -> Self {
        Self {
            on_create: false,
            on_resume: false,
            on_yield: false,
        }
    }

    /// Returns a copy of `self` with `on_create` enabled.
    pub const fn on_create(self) -> Self {
        Self {
            on_create: true,
            ..self
        }
    }

    /// Returns a copy of `self` with `on_resume` enabled.
    pub const fn on_resume(self) -> Self {
        Self {
            on_resume: true,
            ..self
        }
    }

    /// Returns a copy of `self` with `on_yield` enabled.
    pub const fn on_yield(self) -> Self {
        Self {
            on_yield: true,
            ..self
        }
    }
}

/// Union of two trigger sets: a flag is set in the result if it is set in either operand.
impl std::ops::BitOr for ThreadTriggers {
    type Output = Self;

    fn bitor(self, rhs: Self) -> Self::Output {
        Self {
            on_create: self.on_create | rhs.on_create,
            on_resume: self.on_resume | rhs.on_resume,
            on_yield: self.on_yield | rhs.on_yield,
        }
    }
}

/// In-place union: every flag set in `rhs` becomes set in `self`.
impl std::ops::BitOrAssign for ThreadTriggers {
    fn bitor_assign(&mut self, rhs: Self) {
        self.on_create |= rhs.on_create;
        self.on_resume |= rhs.on_resume;
        self.on_yield |= rhs.on_yield;
    }
}
132
/// An event in the life of a [`Thread`], carrying a handle to the thread concerned.
///
/// Values of this type are passed to the thread event callback.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ThreadEvent {
    /// NOTE(review): presumably emitted when a thread is created; the emitting code is not
    /// in this part of the file.
    Create(Thread),
    /// The thread is about to be resumed.
    Resume(Thread),
    /// The thread has just yielded.
    Yield(Thread),
}
144
/// Public status of a [`Thread`], as reported by [`Thread::status`].
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ThreadStatus {
    /// The thread is new or yielded and can be resumed.
    Resumable,
    /// The thread's state is the state currently held by the Lua instance.
    Running,
    /// The thread reports `LUA_OK` and its stack is empty.
    Finished,
    /// The thread reports a status other than `LUA_OK` or `LUA_YIELD`.
    Error,
}
159
/// Internal, finer-grained thread status.
///
/// Unlike [`ThreadStatus`], the two resumable states are kept apart. Their `c_int` payload
/// is the number of values already sitting on the thread's stack that will be handed over
/// on the next resume (computed from the stack top in `Thread::status_inner`).
#[derive(Clone, Copy)]
enum ThreadStatusInner {
    New(c_int),
    Running,
    Yielded(c_int),
    Finished,
    Error,
}

impl ThreadStatusInner {
    /// `true` for the `New` and `Yielded` states, whatever their payload.
    #[cfg(feature = "async")]
    #[inline(always)]
    fn is_resumable(self) -> bool {
        match self {
            Self::New(_) | Self::Yielded(_) => true,
            Self::Running | Self::Finished | Self::Error => false,
        }
    }

    /// `true` only for the `Yielded` state, whatever its payload.
    #[inline(always)]
    fn is_yielded(self) -> bool {
        match self {
            Self::Yielded(_) => true,
            Self::New(_) | Self::Running | Self::Finished | Self::Error => false,
        }
    }
}
185
/// Handle to a Lua thread (coroutine).
///
/// Field `0` is the internal reference to the thread value; field `1` is the thread's raw
/// `lua_State` pointer, as returned by [`Thread::state`].
#[derive(Clone, PartialEq)]
pub struct Thread(pub(crate) ValueRef, pub(crate) *mut ffi::lua_State);

// NOTE(review): the raw pointer makes `Thread` `!Send`/`!Sync` by default. These opt-ins
// presumably rely on every access going through the lock taken via `self.0.lua.lock()`;
// confirm against the `send` feature's locking guarantees.
#[cfg(feature = "send")]
unsafe impl Send for Thread {}
#[cfg(feature = "send")]
unsafe impl Sync for Thread {}
194
/// A [`Thread`] that can be driven as a `Future` or a `Stream`.
///
/// Created by [`Thread::into_async`]. `R` is the type the thread's results are converted to.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AsyncThread<R> {
    // The wrapped thread.
    thread: Thread,
    // Carries the result type without storing a value of it.
    ret: PhantomData<fn() -> R>,
    // When `true`, `Drop` tries to reset the thread and hand it to `recycle_thread`.
    recycle: bool,
}
207
208impl Thread {
    /// Returns the raw `lua_State` pointer of this thread (tuple field `1`).
    #[inline(always)]
    pub fn state(&self) -> *mut ffi::lua_State {
        self.1
    }
214
    /// Resumes the thread, passing `args` to it, and converts what it yields or returns
    /// into `R`.
    ///
    /// Values already present on the thread's stack (as counted by `status_inner`) are
    /// passed along together with `args`.
    ///
    /// If the corresponding trigger is enabled and an event callback is installed, the
    /// callback is invoked with [`ThreadEvent::Resume`] before resuming and with
    /// [`ThreadEvent::Yield`] after the thread yields.
    ///
    /// # Errors
    ///
    /// Returns [`Error::CoroutineUnresumable`] if the thread is neither new nor yielded.
    /// Errors from the event callback, from pushing `args`, from the stack checks, from
    /// the resumed code and from the result conversion are propagated.
    pub fn resume<R>(&self, args: impl IntoLuaMulti) -> Result<R>
    where
        R: FromLuaMulti,
    {
        let lua = self.0.lua.lock();
        // Number of values already on the thread's stack that count as arguments.
        let mut pushed_nargs = match self.status_inner(&lua) {
            ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
            _ => return Err(Error::CoroutineUnresumable),
        };

        let state = lua.state();
        let thread_state = self.state();
        unsafe {
            let _sg = StackGuard::new(state);

            // NOTE(review): the `strong_count <= 2` test presumably prevents the callback
            // from being re-entered while it is already executing; confirm.
            if lua.thread_event_triggers().on_resume
                && let Some(cb) = lua.thread_event_callback()
                && XRc::strong_count(&cb) <= 2
            {
                cb(lua.lua(), ThreadEvent::Resume(self.clone()))?;
            }

            // Push the arguments on the calling state, then move them to the thread.
            let nargs = args.push_into_stack_multi(&lua)?;
            if nargs > 0 {
                check_stack(thread_state, nargs)?;
                ffi::lua_xmove(state, thread_state, nargs);
                pushed_nargs += nargs;
            }

            // NOTE(review): presumably resets the thread's stack top to 0 when dropped.
            let _thread_sg = StackGuard::with_top(thread_state, 0);
            let (status, nresults) = self.resume_inner(&lua, pushed_nargs)?;

            if lua.thread_event_triggers().on_yield
                && status.is_yielded()
                && let Some(cb) = lua.thread_event_callback()
                && XRc::strong_count(&cb) <= 2
            {
                cb(lua.lua(), ThreadEvent::Yield(self.clone()))?;
            }

            // Move the results back to the calling state before converting them.
            check_stack(state, nresults + 1)?;
            ffi::lua_xmove(thread_state, state, nresults);

            R::from_stack_multi(nresults, &lua)
        }
    }
307
308 #[cfg(feature = "luau")]
312 #[cfg_attr(docsrs, doc(cfg(feature = "luau")))]
313 pub fn resume_error<R>(&self, error: impl crate::IntoLua) -> Result<R>
314 where
315 R: FromLuaMulti,
316 {
317 let lua = self.0.lua.lock();
318 match self.status_inner(&lua) {
319 ThreadStatusInner::New(_) | ThreadStatusInner::Yielded(_) => {}
320 _ => return Err(Error::CoroutineUnresumable),
321 };
322
323 let state = lua.state();
324 let thread_state = self.state();
325 unsafe {
326 let _sg = StackGuard::new(state);
327
328 if lua.thread_event_triggers().on_resume
330 && let Some(cb) = lua.thread_event_callback()
331 && XRc::strong_count(&cb) <= 2
332 {
333 cb(lua.lua(), ThreadEvent::Resume(self.clone()))?;
334 }
335
336 check_stack(state, 1)?;
337 error.push_into_stack(&lua)?;
338 ffi::lua_xmove(state, thread_state, 1);
339
340 let _thread_sg = StackGuard::with_top(thread_state, 0);
341 let (status, nresults) = self.resume_inner(&lua, ffi::LUA_RESUMEERROR)?;
342
343 if lua.thread_event_triggers().on_yield
345 && status.is_yielded()
346 && let Some(cb) = lua.thread_event_callback()
347 && XRc::strong_count(&cb) <= 2
348 {
349 cb(lua.lua(), ThreadEvent::Yield(self.clone()))?;
350 }
351
352 check_stack(state, nresults + 1)?;
353 ffi::lua_xmove(thread_state, state, nresults);
354
355 R::from_stack_multi(nresults, &lua)
356 }
357 }
358
    /// Low-level resume shared by the synchronous and asynchronous entry points.
    ///
    /// Calls `lua_resume` (`lua_resumex` under the `luau` feature) with `nargs` and returns
    /// the resulting status together with the number of results reported by that call.
    /// The results are left on the thread's stack for the caller to collect.
    ///
    /// # Errors
    ///
    /// Any return code other than `LUA_OK`/`LUA_YIELD` is turned into an error via
    /// `pop_error`. For codes other than `LUA_ERRMEM`, `error_traceback_thread` is run
    /// first, under `protect_lua!` on the calling state.
    ///
    /// # Safety
    ///
    /// NOTE(review): callers are expected to have placed the function/arguments on the
    /// thread's stack and to hold the Lua lock; confirm the full contract.
    unsafe fn resume_inner(&self, lua: &RawLua, nargs: c_int) -> Result<(ThreadStatusInner, c_int)> {
        let state = lua.state();
        let thread_state = self.state();
        let mut nresults = 0;
        #[cfg(not(feature = "luau"))]
        let ret = ffi::lua_resume(thread_state, state, nargs, &mut nresults as *mut c_int);
        #[cfg(feature = "luau")]
        let ret = ffi::lua_resumex(thread_state, state, nargs, &mut nresults as *mut c_int);
        match ret {
            ffi::LUA_OK => Ok((ThreadStatusInner::Finished, nresults)),
            // The payload is 0 here; the results are reported through `nresults` instead.
            ffi::LUA_YIELD => Ok((ThreadStatusInner::Yielded(0), nresults)),
            ffi::LUA_ERRMEM => {
                // Memory errors skip the traceback step and are popped from the thread itself.
                Err(pop_error(thread_state, ret))
            }
            _ => {
                check_stack(state, 3)?;
                protect_lua!(state, 0, 1, |state| error_traceback_thread(state, thread_state))?;
                Err(pop_error(state, ret))
            }
        }
    }
384
385 pub fn status(&self) -> ThreadStatus {
387 match self.status_inner(&self.0.lua.lock()) {
388 ThreadStatusInner::New(_) | ThreadStatusInner::Yielded(_) => ThreadStatus::Resumable,
389 ThreadStatusInner::Running => ThreadStatus::Running,
390 ThreadStatusInner::Finished => ThreadStatus::Finished,
391 ThreadStatusInner::Error => ThreadStatus::Error,
392 }
393 }
394
395 fn status_inner(&self, lua: &RawLua) -> ThreadStatusInner {
397 let thread_state = self.state();
398 if thread_state == lua.state() {
399 return ThreadStatusInner::Running;
401 }
402 let status = unsafe { ffi::lua_status(thread_state) };
403 let top = unsafe { ffi::lua_gettop(thread_state) };
404 match status {
405 ffi::LUA_YIELD => ThreadStatusInner::Yielded(top),
406 ffi::LUA_OK if top > 0 => ThreadStatusInner::New(top - 1),
407 ffi::LUA_OK => ThreadStatusInner::Finished,
408 _ => ThreadStatusInner::Error,
409 }
410 }
411
412 #[inline(always)]
415 pub fn is_resumable(&self) -> bool {
416 self.status() == ThreadStatus::Resumable
417 }
418
419 #[inline(always)]
421 pub fn is_running(&self) -> bool {
422 self.status() == ThreadStatus::Running
423 }
424
425 #[inline(always)]
427 pub fn is_finished(&self) -> bool {
428 self.status() == ThreadStatus::Finished
429 }
430
431 #[inline(always)]
433 pub fn is_error(&self) -> bool {
434 self.status() == ThreadStatus::Error
435 }
436
437 #[cfg(not(feature = "luau"))]
446 #[cfg_attr(docsrs, doc(cfg(not(feature = "luau"))))]
447 pub fn set_hook<F>(&self, triggers: HookTriggers, callback: F) -> Result<()>
448 where
449 F: Fn(&crate::Lua, &Debug) -> Result<crate::VmState> + crate::MaybeSend + 'static,
450 {
451 let lua = self.0.lua.lock();
452 unsafe {
453 lua.set_thread_hook(
454 self.state(),
455 HookKind::Thread(triggers, crate::types::XRc::new(callback)),
456 )
457 }
458 }
459
    /// Removes the hook from this thread by calling `lua_sethook` with no function and
    /// zeroed mask/count. Not available with the `luau` feature.
    #[cfg(not(feature = "luau"))]
    #[cfg_attr(docsrs, doc(cfg(not(feature = "luau"))))]
    pub fn remove_hook(&self) {
        // Hold the Lua lock for the duration of the FFI call.
        let _lua = self.0.lua.lock();
        unsafe {
            ffi::lua_sethook(self.state(), None, 0, 0);
        }
    }
469
    /// Resets the thread and installs `func` as the function it will run.
    ///
    /// The thread is first reset according to its current status (see `reset_inner`);
    /// afterwards `func` is pushed onto the thread's stack.
    ///
    /// # Errors
    ///
    /// Propagates the error from `reset_inner`, e.g. when the thread is currently running
    /// or, on Lua versions without thread reset support, when it has not finished.
    pub fn reset(&self, func: Function) -> Result<()> {
        let lua = self.0.lua.lock();
        let thread_state = self.state();
        unsafe {
            let status = self.status_inner(&lua);
            self.reset_inner(status)?;

            // Copy the function from the reference thread onto the top of this thread's stack.
            ffi::lua_xpush(lua.ref_thread(), thread_state, func.0.index);

            #[cfg(feature = "luau")]
            {
                // Replace the thread's `LUA_GLOBALSINDEX` with the one of the main state.
                ffi::lua_xpush(lua.main_state(), thread_state, ffi::LUA_GLOBALSINDEX);
                ffi::lua_replace(thread_state, ffi::LUA_GLOBALSINDEX);
            }

            Ok(())
        }
    }
504
    /// Brings the thread back to a state in which a new function can be installed.
    ///
    /// Behaviour by `status`:
    /// - `New`: the stack is simply cleared.
    /// - `Finished`: nothing to do.
    /// - `Running`: always an error.
    /// - `Yielded` / `Error`: an error unless one of `lua55`, `lua54` or `luau` is
    ///   enabled, in which case the thread is reset/closed through the FFI.
    ///
    /// # Safety
    ///
    /// NOTE(review): `status` is expected to be the thread's current status as computed by
    /// `status_inner`; confirm the full contract with callers.
    unsafe fn reset_inner(&self, status: ThreadStatusInner) -> Result<()> {
        match status {
            ThreadStatusInner::New(_) => {
                // Nothing has run yet, so dropping the stack contents is enough.
                ffi::lua_settop(self.state(), 0);
                Ok(())
            }
            ThreadStatusInner::Running => Err(Error::runtime("cannot reset a running thread")),
            ThreadStatusInner::Finished => Ok(()),
            #[cfg(not(any(feature = "lua55", feature = "lua54", feature = "luau")))]
            ThreadStatusInner::Yielded(_) | ThreadStatusInner::Error => {
                Err(Error::runtime("cannot reset non-finished thread"))
            }
            #[cfg(any(feature = "lua55", feature = "lua54", feature = "luau"))]
            ThreadStatusInner::Yielded(_) | ThreadStatusInner::Error => {
                let thread_state = self.state();

                // Non-vendored Lua 5.4 uses `lua_resetthread`; Lua 5.5 and vendored
                // Lua 5.4 use `lua_closethread`.
                #[cfg(all(feature = "lua54", not(feature = "vendored")))]
                let status = ffi::lua_resetthread(thread_state);
                #[cfg(any(feature = "lua55", all(feature = "lua54", feature = "vendored")))]
                let status = {
                    let lua = self.0.lua.lock();
                    ffi::lua_closethread(thread_state, lua.state())
                };
                #[cfg(any(feature = "lua55", feature = "lua54"))]
                if status != ffi::LUA_OK {
                    return Err(pop_error(thread_state, status));
                }
                // Under Luau the call's result is not inspected.
                #[cfg(feature = "luau")]
                ffi::lua_resetthread(thread_state);

                Ok(())
            }
        }
    }
540
541 #[cfg(feature = "async")]
588 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
589 pub fn into_async<R>(self, args: impl IntoLuaMulti) -> Result<AsyncThread<R>>
590 where
591 R: FromLuaMulti,
592 {
593 let lua = self.0.lua.lock();
594 if !self.status_inner(&lua).is_resumable() {
595 return Err(Error::CoroutineUnresumable);
596 }
597
598 let state = lua.state();
599 let thread_state = self.state();
600 unsafe {
601 let _sg = StackGuard::new(state);
602
603 let nargs = args.push_into_stack_multi(&lua)?;
604 if nargs > 0 {
605 check_stack(thread_state, nargs)?;
606 ffi::lua_xmove(state, thread_state, nargs);
607 }
608
609 Ok(AsyncThread {
610 thread: self,
611 ret: PhantomData,
612 recycle: false,
613 })
614 }
615 }
616
    /// Sandboxes this thread by calling `luaL_sandboxthread` on it (Luau only).
    ///
    /// The call is made under `protect_lua!` on the calling state.
    ///
    /// # Errors
    ///
    /// Propagates errors from the stack checks and from the protected call.
    #[cfg(any(feature = "luau", doc))]
    #[cfg_attr(docsrs, doc(cfg(feature = "luau")))]
    pub fn sandbox(&self) -> Result<()> {
        let lua = self.0.lua.lock();
        let state = lua.state();
        let thread_state = self.state();
        unsafe {
            // Reserve stack space on both the thread and the calling state up front.
            check_stack(thread_state, 3)?;
            check_stack(state, 3)?;
            protect_lua!(state, 0, 0, |_| ffi::luaL_sandboxthread(thread_state))
        }
    }
664
    /// Returns the pointer reported by the underlying value reference's `to_pointer`.
    // NOTE(review): presumably intended only as an identity/debugging aid; confirm.
    #[inline]
    pub fn to_pointer(&self) -> *const c_void {
        self.0.to_pointer()
    }
674}
675
676impl fmt::Debug for Thread {
677 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
678 fmt.debug_tuple("Thread").field(&self.0).finish()
679 }
680}
681
/// Associates [`Thread`] with the Lua type tag `LUA_TTHREAD`.
impl LuaType for Thread {
    const TYPE_ID: c_int = ffi::LUA_TTHREAD;
}
685
#[cfg(feature = "async")]
impl<R> AsyncThread<R> {
    /// Sets whether the wrapped thread should be reset and recycled when this value is
    /// dropped.
    #[inline(always)]
    pub(crate) fn set_recyclable(&mut self, recyclable: bool) {
        self.recycle = recyclable;
    }
}
693
/// On drop, a recyclable thread is reset and handed back to the Lua instance for reuse.
///
/// This is best effort: nothing happens if the Lua lock cannot be taken without blocking
/// or if resetting the thread fails.
#[cfg(feature = "async")]
impl<R> Drop for AsyncThread<R> {
    fn drop(&mut self) {
        #[allow(clippy::collapsible_if)]
        if self.recycle {
            if let Some(lua) = self.thread.0.lua.try_lock() {
                unsafe {
                    let mut status = self.thread.status_inner(&lua);
                    if matches!(status, ThreadStatusInner::Yielded(0)) {
                        // Yielded with an empty stack: resume once more with the
                        // `poll_terminate` light userdata as the single argument.
                        ffi::lua_pushlightuserdata(self.thread.1, crate::Lua::poll_terminate().0);
                        if let Ok((new_status, _)) = self.thread.resume_inner(&lua, 1) {
                            // Use the status reported by this last resume for the reset below.
                            status = new_status;
                        }
                    }

                    // Only recycle threads that could actually be reset.
                    if self.thread.reset_inner(status).is_ok() {
                        lua.recycle_thread(&mut self.thread);
                    }
                }
            }
        }
    }
}
720
/// Stream view of the thread: each resume that produces values yields one item.
///
/// The stream ends (`None`) once the thread is no longer resumable.
#[cfg(feature = "async")]
impl<R: FromLuaMulti> Stream for AsyncThread<R> {
    type Item = Result<R>;

    /// Resumes the thread once.
    ///
    /// Returns `Pending` when the thread yields exactly one value that is the
    /// `poll_pending` marker. Any other yield, and normal completion, produce
    /// `Ready(Some(..))` with the converted values. After such a non-pending yield the
    /// waker is woken. Errors are delivered as `Ready(Some(Err(..)))`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let lua = self.thread.0.lua.lock();
        let nargs = match self.thread.status_inner(&lua) {
            ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
            _ => return Poll::Ready(None),
        };

        let state = lua.state();
        let thread_state = self.thread.state();
        unsafe {
            let _sg = StackGuard::new(state);
            // NOTE(review): presumably resets the thread's stack top to 0 when dropped.
            let _thread_sg = StackGuard::with_top(thread_state, 0);
            // Install the task's waker for the duration of this poll; restored on drop.
            let _wg = WakerGuard::new(&lua, cx.waker());

            // NOTE(review): the `strong_count <= 2` test presumably prevents the callback
            // from being re-entered while it is already executing; confirm.
            if lua.thread_event_triggers().on_resume
                && let Some(cb) = lua.thread_event_callback()
                && XRc::strong_count(&cb) <= 2
            {
                cb(lua.lua(), ThreadEvent::Resume(self.thread.clone()))?;
            }

            let (status, nresults) = (self.thread).resume_inner(&lua, nargs)?;

            if status.is_yielded() {
                if lua.thread_event_triggers().on_yield
                    && let Some(cb) = lua.thread_event_callback()
                    && XRc::strong_count(&cb) <= 2
                {
                    cb(lua.lua(), ThreadEvent::Yield(self.thread.clone()))?;
                }

                if nresults == 1 && is_poll_pending(thread_state) {
                    return Poll::Pending;
                }
                // A regular yield: hand the values out now and ask to be polled again.
                cx.waker().wake_by_ref();
            }

            // Move the results to the calling state before converting them.
            check_stack(state, nresults + 1)?;
            ffi::lua_xmove(thread_state, state, nresults);

            Poll::Ready(Some(R::from_stack_multi(nresults, &lua)))
        }
    }
}
772
/// Future view of the thread: resolves with the values of the thread's final return.
#[cfg(feature = "async")]
impl<R: FromLuaMulti> Future for AsyncThread<R> {
    type Output = Result<R>;

    /// Resumes the thread once.
    ///
    /// Every yield results in `Pending`. If the yield was not the single `poll_pending`
    /// marker, the yielded values are discarded and the waker is woken so that the thread
    /// is resumed again. When the thread finishes, its results are converted into `R`.
    /// Polling a thread that is not resumable yields [`Error::CoroutineUnresumable`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let lua = self.thread.0.lua.lock();
        let nargs = match self.thread.status_inner(&lua) {
            ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
            _ => return Poll::Ready(Err(Error::CoroutineUnresumable)),
        };

        let state = lua.state();
        let thread_state = self.thread.state();
        unsafe {
            let _sg = StackGuard::new(state);
            // NOTE(review): presumably resets the thread's stack top to 0 when dropped.
            let _thread_sg = StackGuard::with_top(thread_state, 0);
            // Install the task's waker for the duration of this poll; restored on drop.
            let _wg = WakerGuard::new(&lua, cx.waker());

            // NOTE(review): the `strong_count <= 2` test presumably prevents the callback
            // from being re-entered while it is already executing; confirm.
            if lua.thread_event_triggers().on_resume
                && let Some(cb) = lua.thread_event_callback()
                && XRc::strong_count(&cb) <= 2
            {
                cb(lua.lua(), ThreadEvent::Resume(self.thread.clone()))?;
            }

            let (status, nresults) = self.thread.resume_inner(&lua, nargs)?;

            if status.is_yielded() {
                if lua.thread_event_triggers().on_yield
                    && let Some(cb) = lua.thread_event_callback()
                    && XRc::strong_count(&cb) <= 2
                {
                    cb(lua.lua(), ThreadEvent::Yield(self.thread.clone()))?;
                }

                if !(nresults == 1 && is_poll_pending(thread_state)) {
                    // A regular yield: its values are ignored; request an immediate re-poll.
                    cx.waker().wake_by_ref();
                }
                return Poll::Pending;
            }

            // Move the results to the calling state before converting them.
            check_stack(state, nresults + 1)?;
            ffi::lua_xmove(thread_state, state, nresults);

            Poll::Ready(R::from_stack_multi(nresults, &lua))
        }
    }
}
824
/// Returns `true` if the value on top of `state`'s stack, read as a light userdata,
/// equals the `poll_pending` marker.
///
/// # Safety
///
/// NOTE(review): `state` must be a valid Lua state with at least one value on its stack;
/// confirm with callers.
#[cfg(feature = "async")]
#[inline(always)]
unsafe fn is_poll_pending(state: *mut ffi::lua_State) -> bool {
    let top_value = ffi::lua_tolightuserdata(state, -1);
    let pending_marker = crate::Lua::poll_pending().0;
    top_value == pending_marker
}
830
/// Scope guard that installs a waker on a `RawLua` and puts the previous one back on drop.
#[cfg(feature = "async")]
struct WakerGuard<'lua, 'a> {
    lua: &'lua RawLua,
    // Value returned by `set_waker` when the new waker was installed.
    prev: NonNull<Waker>,
    // Ties the guard to the lifetime of the borrowed waker.
    _phantom: PhantomData<&'a ()>,
}

#[cfg(feature = "async")]
impl<'lua, 'a> WakerGuard<'lua, 'a> {
    /// Installs `waker` on `lua` and remembers what `set_waker` returned so that it can
    /// be restored later. Always returns `Ok`.
    #[inline]
    pub fn new(lua: &'lua RawLua, waker: &'a Waker) -> Result<WakerGuard<'lua, 'a>> {
        let installed = NonNull::from(waker);
        let guard = WakerGuard {
            lua,
            prev: lua.set_waker(installed),
            _phantom: PhantomData,
        };
        Ok(guard)
    }
}

#[cfg(feature = "async")]
impl Drop for WakerGuard<'_, '_> {
    /// Re-installs the waker that was active before this guard was created.
    fn drop(&mut self) {
        let previous = self.prev;
        self.lua.set_waker(previous);
    }
}
857
/// Compile-time checks of the auto-trait story: `Thread` and `AsyncThread` must be
/// `Send + Sync` exactly when the `send` feature is enabled, and not `Send` otherwise.
#[cfg(test)]
mod assertions {
    use super::*;

    #[cfg(not(feature = "send"))]
    static_assertions::assert_not_impl_any!(Thread: Send);
    #[cfg(feature = "send")]
    static_assertions::assert_impl_all!(Thread: Send, Sync);
    #[cfg(all(feature = "async", not(feature = "send")))]
    static_assertions::assert_not_impl_any!(AsyncThread<()>: Send);
    #[cfg(all(feature = "async", feature = "send"))]
    static_assertions::assert_impl_all!(AsyncThread<()>: Send, Sync);
}