1use std::fmt;
39use std::os::raw::{c_int, c_void};
40
41use crate::error::{Error, Result};
42use crate::function::Function;
43use crate::state::RawLua;
44use crate::traits::{FromLuaMulti, IntoLuaMulti};
45use crate::types::{LuaType, ValueRef};
46use crate::util::{StackGuard, check_stack, error_traceback_thread, pop_error};
47
48#[cfg(not(feature = "luau"))]
49use crate::{
50 debug::{Debug, HookTriggers},
51 types::HookKind,
52};
53
54#[cfg(feature = "async")]
55use {
56 futures_util::stream::Stream,
57 std::{
58 future::Future,
59 marker::PhantomData,
60 pin::Pin,
61 ptr::NonNull,
62 task::{Context, Poll, Waker},
63 },
64};
65
66#[derive(Debug, Copy, Clone, Eq, PartialEq)]
68pub enum ThreadStatus {
69 Resumable,
73 Running,
75 Finished,
77 Error,
79}
80
81#[derive(Clone, Copy)]
86enum ThreadStatusInner {
87 New(c_int),
88 Running,
89 Yielded(c_int),
90 Finished,
91 Error,
92}
93
94impl ThreadStatusInner {
95 #[cfg(feature = "async")]
96 #[inline(always)]
97 fn is_resumable(self) -> bool {
98 matches!(self, ThreadStatusInner::New(_) | ThreadStatusInner::Yielded(_))
99 }
100
101 #[cfg(feature = "async")]
102 #[inline(always)]
103 fn is_yielded(self) -> bool {
104 matches!(self, ThreadStatusInner::Yielded(_))
105 }
106}
107
108#[derive(Clone, PartialEq)]
110pub struct Thread(pub(crate) ValueRef, pub(crate) *mut ffi::lua_State);
111
112#[cfg(feature = "send")]
113unsafe impl Send for Thread {}
114#[cfg(feature = "send")]
115unsafe impl Sync for Thread {}
116
117#[cfg(feature = "async")]
122#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
123#[must_use = "futures do nothing unless you `.await` or poll them"]
124pub struct AsyncThread<R> {
125 thread: Thread,
126 ret: PhantomData<fn() -> R>,
127 recycle: bool,
128}
129
130impl Thread {
131 #[inline(always)]
133 pub fn state(&self) -> *mut ffi::lua_State {
134 self.1
135 }
136
137 pub fn resume<R>(&self, args: impl IntoLuaMulti) -> Result<R>
182 where
183 R: FromLuaMulti,
184 {
185 let lua = self.0.lua.lock();
186 let mut pushed_nargs = match self.status_inner(&lua) {
187 ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
188 _ => return Err(Error::CoroutineUnresumable),
189 };
190
191 let state = lua.state();
192 let thread_state = self.state();
193 unsafe {
194 let _sg = StackGuard::new(state);
195
196 let nargs = args.push_into_stack_multi(&lua)?;
197 if nargs > 0 {
198 check_stack(thread_state, nargs)?;
199 ffi::lua_xmove(state, thread_state, nargs);
200 pushed_nargs += nargs;
201 }
202
203 let _thread_sg = StackGuard::with_top(thread_state, 0);
204 let (_, nresults) = self.resume_inner(&lua, pushed_nargs)?;
205 check_stack(state, nresults + 1)?;
206 ffi::lua_xmove(thread_state, state, nresults);
207
208 R::from_stack_multi(nresults, &lua)
209 }
210 }
211
212 #[cfg(feature = "luau")]
216 #[cfg_attr(docsrs, doc(cfg(feature = "luau")))]
217 pub fn resume_error<R>(&self, error: impl crate::IntoLua) -> Result<R>
218 where
219 R: FromLuaMulti,
220 {
221 let lua = self.0.lua.lock();
222 match self.status_inner(&lua) {
223 ThreadStatusInner::New(_) | ThreadStatusInner::Yielded(_) => {}
224 _ => return Err(Error::CoroutineUnresumable),
225 };
226
227 let state = lua.state();
228 let thread_state = self.state();
229 unsafe {
230 let _sg = StackGuard::new(state);
231
232 check_stack(state, 1)?;
233 error.push_into_stack(&lua)?;
234 ffi::lua_xmove(state, thread_state, 1);
235
236 let _thread_sg = StackGuard::with_top(thread_state, 0);
237 let (_, nresults) = self.resume_inner(&lua, ffi::LUA_RESUMEERROR)?;
238 check_stack(state, nresults + 1)?;
239 ffi::lua_xmove(thread_state, state, nresults);
240
241 R::from_stack_multi(nresults, &lua)
242 }
243 }
244
245 unsafe fn resume_inner(&self, lua: &RawLua, nargs: c_int) -> Result<(ThreadStatusInner, c_int)> {
249 let state = lua.state();
250 let thread_state = self.state();
251 let mut nresults = 0;
252 #[cfg(not(feature = "luau"))]
253 let ret = ffi::lua_resume(thread_state, state, nargs, &mut nresults as *mut c_int);
254 #[cfg(feature = "luau")]
255 let ret = ffi::lua_resumex(thread_state, state, nargs, &mut nresults as *mut c_int);
256 match ret {
257 ffi::LUA_OK => Ok((ThreadStatusInner::Finished, nresults)),
258 ffi::LUA_YIELD => Ok((ThreadStatusInner::Yielded(0), nresults)),
259 ffi::LUA_ERRMEM => {
260 Err(pop_error(thread_state, ret))
262 }
263 _ => {
264 check_stack(state, 3)?;
265 protect_lua!(state, 0, 1, |state| error_traceback_thread(state, thread_state))?;
266 Err(pop_error(state, ret))
267 }
268 }
269 }
270
271 pub fn status(&self) -> ThreadStatus {
273 match self.status_inner(&self.0.lua.lock()) {
274 ThreadStatusInner::New(_) | ThreadStatusInner::Yielded(_) => ThreadStatus::Resumable,
275 ThreadStatusInner::Running => ThreadStatus::Running,
276 ThreadStatusInner::Finished => ThreadStatus::Finished,
277 ThreadStatusInner::Error => ThreadStatus::Error,
278 }
279 }
280
281 fn status_inner(&self, lua: &RawLua) -> ThreadStatusInner {
283 let thread_state = self.state();
284 if thread_state == lua.state() {
285 return ThreadStatusInner::Running;
287 }
288 let status = unsafe { ffi::lua_status(thread_state) };
289 let top = unsafe { ffi::lua_gettop(thread_state) };
290 match status {
291 ffi::LUA_YIELD => ThreadStatusInner::Yielded(top),
292 ffi::LUA_OK if top > 0 => ThreadStatusInner::New(top - 1),
293 ffi::LUA_OK => ThreadStatusInner::Finished,
294 _ => ThreadStatusInner::Error,
295 }
296 }
297
298 #[inline(always)]
301 pub fn is_resumable(&self) -> bool {
302 self.status() == ThreadStatus::Resumable
303 }
304
305 #[inline(always)]
307 pub fn is_running(&self) -> bool {
308 self.status() == ThreadStatus::Running
309 }
310
311 #[inline(always)]
313 pub fn is_finished(&self) -> bool {
314 self.status() == ThreadStatus::Finished
315 }
316
317 #[inline(always)]
319 pub fn is_error(&self) -> bool {
320 self.status() == ThreadStatus::Error
321 }
322
323 #[cfg(not(feature = "luau"))]
332 #[cfg_attr(docsrs, doc(cfg(not(feature = "luau"))))]
333 pub fn set_hook<F>(&self, triggers: HookTriggers, callback: F) -> Result<()>
334 where
335 F: Fn(&crate::Lua, &Debug) -> Result<crate::VmState> + crate::MaybeSend + 'static,
336 {
337 let lua = self.0.lua.lock();
338 unsafe {
339 lua.set_thread_hook(
340 self.state(),
341 HookKind::Thread(triggers, crate::types::XRc::new(callback)),
342 )
343 }
344 }
345
346 #[cfg(not(feature = "luau"))]
348 #[cfg_attr(docsrs, doc(cfg(not(feature = "luau"))))]
349 pub fn remove_hook(&self) {
350 let _lua = self.0.lua.lock();
351 unsafe {
352 ffi::lua_sethook(self.state(), None, 0, 0);
353 }
354 }
355
356 pub fn reset(&self, func: Function) -> Result<()> {
371 let lua = self.0.lua.lock();
372 let thread_state = self.state();
373 unsafe {
374 let status = self.status_inner(&lua);
375 self.reset_inner(status)?;
376
377 ffi::lua_xpush(lua.ref_thread(), thread_state, func.0.index);
379
380 #[cfg(feature = "luau")]
381 {
382 ffi::lua_xpush(lua.main_state(), thread_state, ffi::LUA_GLOBALSINDEX);
384 ffi::lua_replace(thread_state, ffi::LUA_GLOBALSINDEX);
385 }
386
387 Ok(())
388 }
389 }
390
391 unsafe fn reset_inner(&self, status: ThreadStatusInner) -> Result<()> {
392 match status {
393 ThreadStatusInner::New(_) => {
394 ffi::lua_settop(self.state(), 0);
396 Ok(())
397 }
398 ThreadStatusInner::Running => Err(Error::runtime("cannot reset a running thread")),
399 ThreadStatusInner::Finished => Ok(()),
400 #[cfg(not(any(feature = "lua55", feature = "lua54", feature = "luau")))]
401 ThreadStatusInner::Yielded(_) | ThreadStatusInner::Error => {
402 Err(Error::runtime("cannot reset non-finished thread"))
403 }
404 #[cfg(any(feature = "lua55", feature = "lua54", feature = "luau"))]
405 ThreadStatusInner::Yielded(_) | ThreadStatusInner::Error => {
406 let thread_state = self.state();
407
408 #[cfg(all(feature = "lua54", not(feature = "vendored")))]
409 let status = ffi::lua_resetthread(thread_state);
410 #[cfg(any(feature = "lua55", all(feature = "lua54", feature = "vendored")))]
411 let status = {
412 let lua = self.0.lua.lock();
413 ffi::lua_closethread(thread_state, lua.state())
414 };
415 #[cfg(any(feature = "lua55", feature = "lua54"))]
416 if status != ffi::LUA_OK {
417 return Err(pop_error(thread_state, status));
418 }
419 #[cfg(feature = "luau")]
420 ffi::lua_resetthread(thread_state);
421
422 Ok(())
423 }
424 }
425 }
426
427 #[cfg(feature = "async")]
474 #[cfg_attr(docsrs, doc(cfg(feature = "async")))]
475 pub fn into_async<R>(self, args: impl IntoLuaMulti) -> Result<AsyncThread<R>>
476 where
477 R: FromLuaMulti,
478 {
479 let lua = self.0.lua.lock();
480 if !self.status_inner(&lua).is_resumable() {
481 return Err(Error::CoroutineUnresumable);
482 }
483
484 let state = lua.state();
485 let thread_state = self.state();
486 unsafe {
487 let _sg = StackGuard::new(state);
488
489 let nargs = args.push_into_stack_multi(&lua)?;
490 if nargs > 0 {
491 check_stack(thread_state, nargs)?;
492 ffi::lua_xmove(state, thread_state, nargs);
493 }
494
495 Ok(AsyncThread {
496 thread: self,
497 ret: PhantomData,
498 recycle: false,
499 })
500 }
501 }
502
503 #[cfg(any(feature = "luau", doc))]
539 #[cfg_attr(docsrs, doc(cfg(feature = "luau")))]
540 pub fn sandbox(&self) -> Result<()> {
541 let lua = self.0.lua.lock();
542 let state = lua.state();
543 let thread_state = self.state();
544 unsafe {
545 check_stack(thread_state, 3)?;
546 check_stack(state, 3)?;
547 protect_lua!(state, 0, 0, |_| ffi::luaL_sandboxthread(thread_state))
548 }
549 }
550
551 #[inline]
557 pub fn to_pointer(&self) -> *const c_void {
558 self.0.to_pointer()
559 }
560}
561
562impl fmt::Debug for Thread {
563 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
564 fmt.debug_tuple("Thread").field(&self.0).finish()
565 }
566}
567
568impl LuaType for Thread {
569 const TYPE_ID: c_int = ffi::LUA_TTHREAD;
570}
571
572#[cfg(feature = "async")]
573impl<R> AsyncThread<R> {
574 #[inline(always)]
575 pub(crate) fn set_recyclable(&mut self, recyclable: bool) {
576 self.recycle = recyclable;
577 }
578}
579
580#[cfg(feature = "async")]
581impl<R> Drop for AsyncThread<R> {
582 fn drop(&mut self) {
583 #[allow(clippy::collapsible_if)]
584 if self.recycle {
585 if let Some(lua) = self.thread.0.lua.try_lock() {
586 unsafe {
587 let mut status = self.thread.status_inner(&lua);
588 if matches!(status, ThreadStatusInner::Yielded(0)) {
589 ffi::lua_pushlightuserdata(self.thread.1, crate::Lua::poll_terminate().0);
591 if let Ok((new_status, _)) = self.thread.resume_inner(&lua, 1) {
592 status = new_status;
594 }
595 }
596
597 if self.thread.reset_inner(status).is_ok() {
599 lua.recycle_thread(&mut self.thread);
600 }
601 }
602 }
603 }
604 }
605}
606
607#[cfg(feature = "async")]
608impl<R: FromLuaMulti> Stream for AsyncThread<R> {
609 type Item = Result<R>;
610
611 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
612 let lua = self.thread.0.lua.lock();
613 let nargs = match self.thread.status_inner(&lua) {
614 ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
615 _ => return Poll::Ready(None),
616 };
617
618 let state = lua.state();
619 let thread_state = self.thread.state();
620 unsafe {
621 let _sg = StackGuard::new(state);
622 let _thread_sg = StackGuard::with_top(thread_state, 0);
623 let _wg = WakerGuard::new(&lua, cx.waker());
624
625 let (status, nresults) = (self.thread).resume_inner(&lua, nargs)?;
626
627 if status.is_yielded() {
628 if nresults == 1 && is_poll_pending(thread_state) {
629 return Poll::Pending;
630 }
631 cx.waker().wake_by_ref();
633 }
634
635 check_stack(state, nresults + 1)?;
636 ffi::lua_xmove(thread_state, state, nresults);
637
638 Poll::Ready(Some(R::from_stack_multi(nresults, &lua)))
639 }
640 }
641}
642
643#[cfg(feature = "async")]
644impl<R: FromLuaMulti> Future for AsyncThread<R> {
645 type Output = Result<R>;
646
647 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
648 let lua = self.thread.0.lua.lock();
649 let nargs = match self.thread.status_inner(&lua) {
650 ThreadStatusInner::New(nargs) | ThreadStatusInner::Yielded(nargs) => nargs,
651 _ => return Poll::Ready(Err(Error::CoroutineUnresumable)),
652 };
653
654 let state = lua.state();
655 let thread_state = self.thread.state();
656 unsafe {
657 let _sg = StackGuard::new(state);
658 let _thread_sg = StackGuard::with_top(thread_state, 0);
659 let _wg = WakerGuard::new(&lua, cx.waker());
660
661 let (status, nresults) = self.thread.resume_inner(&lua, nargs)?;
662
663 if status.is_yielded() {
664 if !(nresults == 1 && is_poll_pending(thread_state)) {
665 cx.waker().wake_by_ref();
667 }
668 return Poll::Pending;
669 }
670
671 check_stack(state, nresults + 1)?;
672 ffi::lua_xmove(thread_state, state, nresults);
673
674 Poll::Ready(R::from_stack_multi(nresults, &lua))
675 }
676 }
677}
678
679#[cfg(feature = "async")]
680#[inline(always)]
681unsafe fn is_poll_pending(state: *mut ffi::lua_State) -> bool {
682 ffi::lua_tolightuserdata(state, -1) == crate::Lua::poll_pending().0
683}
684
685#[cfg(feature = "async")]
686struct WakerGuard<'lua, 'a> {
687 lua: &'lua RawLua,
688 prev: NonNull<Waker>,
689 _phantom: PhantomData<&'a ()>,
690}
691
692#[cfg(feature = "async")]
693impl<'lua, 'a> WakerGuard<'lua, 'a> {
694 #[inline]
695 pub fn new(lua: &'lua RawLua, waker: &'a Waker) -> Result<WakerGuard<'lua, 'a>> {
696 let prev = lua.set_waker(NonNull::from(waker));
697 Ok(WakerGuard {
698 lua,
699 prev,
700 _phantom: PhantomData,
701 })
702 }
703}
704
705#[cfg(feature = "async")]
706impl Drop for WakerGuard<'_, '_> {
707 fn drop(&mut self) {
708 self.lua.set_waker(self.prev);
709 }
710}
711
712#[cfg(test)]
713mod assertions {
714 use super::*;
715
716 #[cfg(not(feature = "send"))]
717 static_assertions::assert_not_impl_any!(Thread: Send);
718 #[cfg(feature = "send")]
719 static_assertions::assert_impl_all!(Thread: Send, Sync);
720 #[cfg(all(feature = "async", not(feature = "send")))]
721 static_assertions::assert_not_impl_any!(AsyncThread<()>: Send);
722 #[cfg(all(feature = "async", feature = "send"))]
723 static_assertions::assert_impl_all!(AsyncThread<()>: Send, Sync);
724}