/* * Copyright (c) 2010, Kelvin Lawson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. No personal names or organizations' names associated with the * Atomthreads project may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE ATOMTHREADS PROJECT AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include #include #include "atom.h" #include "atomqueue.h" #include "atomtimer.h" #include "atomuser.h" /* Local data types */ typedef struct queue_timer { ATOM_TCB *tcb_ptr; /* Thread which is suspended with timeout */ ATOM_QUEUE *queue_ptr; /* Queue the thread is interested in */ ATOM_TCB **suspQ; /* TCB queue which thread is suspended on */ } QUEUE_TIMER; /* Forward declarations */ static uint8_t queue_remove (ATOM_QUEUE *qptr, uint8_t* msgptr); static uint8_t queue_insert (ATOM_QUEUE *qptr, uint8_t* msgptr); static void atomQueueTimerCallback (POINTER cb_data); /** * \b atomQueueCreate * * Initialises a queue object. * * Must be called before calling any other queue library routines on a * queue. Objects can be deleted later using atomQueueDelete(). * * Does not allocate storage, the caller provides the queue object. * * Callers pass in their own buffer area for storing the queue messages while * in transit between threads. The provided storage must be large enough to * store (\c unit_size * \c max_num_mgs) bytes. i.e. the storage area will be * used for up to \c max_num_msgs messages each of size \c unit_size. * * Queues use a fixed-size message. * * This function can be called from interrupt context. * * @param[in] qptr Pointer to queue object * @param[in] buff_ptr Pointer to buffer storage area * @param[in] unit_size Size in bytes of each queue message * @param[in] max_num_msgs Maximum number of messages in the queue * * @retval ATOM_OK Success * @retval ATOM_ERR_PARAM Bad parameters */ uint8_t atomQueueCreate (ATOM_QUEUE *qptr, uint8_t *buff_ptr, uint32_t unit_size, uint32_t max_num_msgs) { uint8_t status; /* Parameter check */ if ((qptr == NULL) || (buff_ptr == NULL)) { /* Bad pointers */ status = ATOM_ERR_PARAM; } else if ((unit_size == 0) || (max_num_msgs == 0)) { /* Bad values */ status = ATOM_ERR_PARAM; } else { /* Store the queue details */ qptr->buff_ptr = buff_ptr; qptr->unit_size = unit_size; qptr->max_num_msgs = max_num_msgs; /* Initialise the suspended threads queues */ qptr->putSuspQ = NULL; qptr->getSuspQ = NULL; /* Initialise the insert/remove pointers */ qptr->insert_index = 0; qptr->remove_index = 0; qptr->num_msgs_stored = 0; /* Successful */ status = ATOM_OK; } return (status); } /** * \b atomQueueDelete * * Deletes a queue object. * * Any threads currently suspended on the queue will be woken up with * return status ATOM_ERR_DELETED. If called at thread context then the * scheduler will be called during this function which may schedule in one * of the woken threads depending on relative priorities. * * This function can be called from interrupt context, but loops internally * waking up all threads blocking on the queue, so the potential * execution cycles cannot be determined in advance. * * @param[in] qptr Pointer to queue object * * @retval ATOM_OK Success * @retval ATOM_ERR_QUEUE Problem putting a woken thread on the ready queue * @retval ATOM_ERR_TIMER Problem cancelling a timeout on a woken thread */ uint8_t atomQueueDelete (ATOM_QUEUE *qptr) { uint8_t status; CRITICAL_STORE; ATOM_TCB *tcb_ptr; uint8_t woken_threads = FALSE; /* Parameter check */ if (qptr == NULL) { /* Bad pointer */ status = ATOM_ERR_PARAM; } else { /* Default to success status unless errors occur during wakeup */ status = ATOM_OK; /* Wake up all suspended tasks */ while (1) { /* Enter critical region */ CRITICAL_START (); /* Check if any threads are suspended */ if (((tcb_ptr = tcbDequeueHead (&qptr->getSuspQ)) != NULL) || ((tcb_ptr = tcbDequeueHead (&qptr->putSuspQ)) != NULL)) { /* A thread is waiting on a suspend queue */ /* Return error status to the waiting thread */ tcb_ptr->suspend_wake_status = ATOM_ERR_DELETED; /* Put the thread on the ready queue */ if (tcbEnqueuePriority (&tcbReadyQ, tcb_ptr) != ATOM_OK) { /* Exit critical region */ CRITICAL_END (); /* Quit the loop, returning error */ status = ATOM_ERR_QUEUE; break; } /* If there's a timeout on this suspension, cancel it */ if (tcb_ptr->suspend_timo_cb) { /* Cancel the callback */ if (atomTimerCancel (tcb_ptr->suspend_timo_cb) != ATOM_OK) { /* Exit critical region */ CRITICAL_END (); /* Quit the loop, returning error */ status = ATOM_ERR_TIMER; break; } /* Flag as no timeout registered */ tcb_ptr->suspend_timo_cb = NULL; } /* Exit critical region */ CRITICAL_END (); /* Request a reschedule */ woken_threads = TRUE; } /* No more suspended threads */ else { /* Exit critical region and quit the loop */ CRITICAL_END (); break; } } /* Call scheduler if any threads were woken up */ if (woken_threads == TRUE) { /** * Only call the scheduler if we are in thread context, otherwise * it will be called on exiting the ISR by atomIntExit(). */ if (atomCurrentContext()) atomSched (FALSE); } } return (status); } /** * \b atomQueueGet * * Attempt to retrieve a message from a queue. * * If the queue is currently empty, the call will do one of the following * depending on the \c timeout value specified: * * \c timeout == 0 : Call will block until a message is available * \c timeout > 0 : Call will block until a message or the specified timeout * \c timeout == -1 : Return immediately if no message is on the queue * * If a maximum timeout value is specified (\c timeout > 0), and no message * is present on the queue for the specified number of system ticks, the * call will return with \c ATOM_TIMEOUT. * * This function can only be called from interrupt context if the \c timeout * parameter is -1 (in which case it does not block). * * @param[in] qptr Pointer to queue object * @param[in] timeout Max system ticks to block (0 = forever, -1 = no block) * @param[out] msgptr Pointer to which the received message will be copied * * @retval ATOM_OK Success * @retval ATOM_TIMEOUT Queue wait timed out before being woken * @retval ATOM_WOULDBLOCK Called with timeout == -1 but queue was empty * @retval ATOM_ERR_DELETED Queue was deleted while suspended * @retval ATOM_ERR_CONTEXT Not called in thread context and attempted to block * @retval ATOM_ERR_PARAM Bad parameter * @retval ATOM_ERR_QUEUE Problem putting the thread on the suspend queue * @retval ATOM_ERR_TIMER Problem registering the timeout */ uint8_t atomQueueGet (ATOM_QUEUE *qptr, int32_t timeout, uint8_t *msgptr) { CRITICAL_STORE; uint8_t status; QUEUE_TIMER timer_data; ATOM_TIMER timer_cb; ATOM_TCB *curr_tcb_ptr; /* Check parameters */ if ((qptr == NULL) || (msgptr == NULL)) { /* Bad pointer */ status = ATOM_ERR_PARAM; } else { /* Protect access to the queue object and OS queues */ CRITICAL_START (); /* If no messages on the queue, block the calling thread */ if (qptr->num_msgs_stored == 0) { /* If called with timeout >= 0, we should block */ if (timeout >= 0) { /* Queue is empty, block the calling thread */ /* Get the current TCB */ curr_tcb_ptr = atomCurrentContext(); /* Check we are actually in thread context */ if (curr_tcb_ptr) { /* Add current thread to the list suspended on receives */ if (tcbEnqueuePriority (&qptr->getSuspQ, curr_tcb_ptr) == ATOM_OK) { /* Set suspended status for the current thread */ curr_tcb_ptr->suspended = TRUE; /* Track errors */ status = ATOM_OK; /* Register a timer callback if requested */ if (timeout) { /** * Fill out the data needed by the callback to * wake us up. */ timer_data.tcb_ptr = curr_tcb_ptr; timer_data.queue_ptr = qptr; timer_data.suspQ = &qptr->getSuspQ; /* Fill out the timer callback request structure */ timer_cb.cb_func = atomQueueTimerCallback; timer_cb.cb_data = (POINTER)&timer_data; timer_cb.cb_ticks = timeout; /** * Store the timer details in the TCB so that we * can cancel the timer callback if the queue is * put before the timeout occurs. */ curr_tcb_ptr->suspend_timo_cb = &timer_cb; /* Register a callback on timeout */ if (atomTimerRegister (&timer_cb) != ATOM_OK) { /* Timer registration failed */ status = ATOM_ERR_TIMER; /* Clean up and return to the caller */ (void)tcbDequeueEntry (&qptr->getSuspQ, curr_tcb_ptr); curr_tcb_ptr->suspended = FALSE; curr_tcb_ptr->suspend_timo_cb = NULL; } } /* Set no timeout requested */ else { /* No need to cancel timeouts on this one */ curr_tcb_ptr->suspend_timo_cb = NULL; } /* Exit critical region */ CRITICAL_END (); /* Check no errors occurred */ if (status == ATOM_OK) { /** * Current thread now blocking, schedule in a new * one. We already know we are in thread context * so can call the scheduler from here. */ atomSched (FALSE); /** * Normal atomQueuePut() wakeups will set ATOM_OK * status, while timeouts will set ATOM_TIMEOUT * and queue deletions will set ATOM_ERR_DELETED. */ status = curr_tcb_ptr->suspend_wake_status; /** * Check suspend_wake_status. If it is ATOM_OK * then we were woken because a message has been * put on the queue and we can now copy it out. * Otherwise we were woken because we timed out * waiting for a message, or the queue was * deleted, so we should just quit. */ if (status == ATOM_OK) { /* Enter critical region */ CRITICAL_START(); /* Copy the message out of the queue */ status = queue_remove (qptr, msgptr); /* Exit critical region */ CRITICAL_END(); } } } else { /* There was an error putting this thread on the suspend list */ CRITICAL_END (); status = ATOM_ERR_QUEUE; } } else { /* Not currently in thread context, can't suspend */ CRITICAL_END (); status = ATOM_ERR_CONTEXT; } } else { /* timeout == -1, requested not to block and queue is empty */ CRITICAL_END(); status = ATOM_WOULDBLOCK; } } else { /* No need to block, there is a message to copy out of the queue */ status = queue_remove (qptr, msgptr); /* Exit critical region */ CRITICAL_END (); /** * The scheduler may now make a policy decision to thread * switch if we are currently in thread context. If we are * in interrupt context it will be handled by atomIntExit(). */ if (atomCurrentContext()) atomSched (FALSE); } } return (status); } /** * \b atomQueuePut * * Attempt to put a message onto a queue. * * If the queue is currently full, the call will do one of the following * depending on the \c timeout value specified: * * \c timeout == 0 : Call will block until space is available * \c timeout > 0 : Call will block until space or the specified timeout * \c timeout == -1 : Return immediately if the queue is full * * If a maximum timeout value is specified (\c timeout > 0), and no space * is available on the queue for the specified number of system ticks, the * call will return with \c ATOM_TIMEOUT. * * This function can only be called from interrupt context if the \c timeout * parameter is -1 (in which case it does not block and may fail to post a * message if the queue is full). * * @param[in] qptr Pointer to queue object * @param[in] timeout Max system ticks to block (0 = forever, -1 = no block) * @param[out] msgptr Pointer from which the message should be copied out * * @retval ATOM_OK Success * @retval ATOM_WOULDBLOCK Called with timeout == -1 but queue was full * @retval ATOM_TIMEOUT Queue wait timed out before being woken * @retval ATOM_ERR_DELETED Queue was deleted while suspended * @retval ATOM_ERR_CONTEXT Not called in thread context and attempted to block * @retval ATOM_ERR_PARAM Bad parameter * @retval ATOM_ERR_QUEUE Problem putting the thread on the suspend queue * @retval ATOM_ERR_TIMER Problem registering the timeout */ uint8_t atomQueuePut (ATOM_QUEUE *qptr, int32_t timeout, uint8_t *msgptr) { CRITICAL_STORE; uint8_t status; QUEUE_TIMER timer_data; ATOM_TIMER timer_cb; ATOM_TCB *curr_tcb_ptr; /* Check parameters */ if ((qptr == NULL) || (msgptr == NULL)) { /* Bad pointer */ status = ATOM_ERR_PARAM; } else { /* Protect access to the queue object and OS queues */ CRITICAL_START (); /* If queue is full, block the calling thread */ if (qptr->num_msgs_stored == qptr->max_num_msgs) { /* If called with timeout >= 0, we should block */ if (timeout >= 0) { /* Queue is full, block the calling thread */ /* Get the current TCB */ curr_tcb_ptr = atomCurrentContext(); /* Check we are actually in thread context */ if (curr_tcb_ptr) { /* Add current thread to the suspend list on sends */ if (tcbEnqueuePriority (&qptr->putSuspQ, curr_tcb_ptr) == ATOM_OK) { /* Set suspended status for the current thread */ curr_tcb_ptr->suspended = TRUE; /* Track errors */ status = ATOM_OK; /* Register a timer callback if requested */ if (timeout) { /** * Fill out the data needed by the callback to * wake us up. */ timer_data.tcb_ptr = curr_tcb_ptr; timer_data.queue_ptr = qptr; timer_data.suspQ = &qptr->putSuspQ; /* Fill out the timer callback request structure */ timer_cb.cb_func = atomQueueTimerCallback; timer_cb.cb_data = (POINTER)&timer_data; timer_cb.cb_ticks = timeout; /** * Store the timer details in the TCB so that we * can cancel the timer callback if a message is * removed from the queue before the timeout * occurs. */ curr_tcb_ptr->suspend_timo_cb = &timer_cb; /* Register a callback on timeout */ if (atomTimerRegister (&timer_cb) != ATOM_OK) { /* Timer registration failed */ status = ATOM_ERR_TIMER; /* Clean up and return to the caller */ (void)tcbDequeueEntry (&qptr->putSuspQ, curr_tcb_ptr); curr_tcb_ptr->suspended = FALSE; curr_tcb_ptr->suspend_timo_cb = NULL; } } /* Set no timeout requested */ else { /* No need to cancel timeouts on this one */ curr_tcb_ptr->suspend_timo_cb = NULL; } /* Exit critical region */ CRITICAL_END (); /* Check timer registration was successful */ if (status == ATOM_OK) { /** * Current thread now blocking, schedule in a new * one. We already know we are in thread context * so can call the scheduler from here. */ atomSched (FALSE); /** * Normal atomQueueGet() wakeups will set ATOM_OK * status, while timeouts will set ATOM_TIMEOUT * and queue deletions will set ATOM_ERR_DELETED. */ status = curr_tcb_ptr->suspend_wake_status; /** * Check suspend_wake_status. If it is ATOM_OK * then we were woken because a message has been * removed from the queue and we can now add ours. * Otherwise we were woken because we timed out * waiting for a message, or the queue was * deleted, so we should just quit. */ if (status == ATOM_OK) { /* Enter critical region */ CRITICAL_START(); /* Copy the message into the queue */ status = queue_insert (qptr, msgptr); /* Exit critical region */ CRITICAL_END(); } } } else { /* There was an error putting this thread on the suspend list */ CRITICAL_END (); status = ATOM_ERR_QUEUE; } } else { /* Not currently in thread context, can't suspend */ CRITICAL_END (); status = ATOM_ERR_CONTEXT; } } else { /* timeout == -1, cannot block. Just return queue is full */ CRITICAL_END(); status = ATOM_WOULDBLOCK; } } else { /* No need to block, there is space to copy into the queue */ status = queue_insert (qptr, msgptr); /* Exit critical region */ CRITICAL_END (); /** * The scheduler may now make a policy decision to thread * switch if we are currently in thread context. If we are * in interrupt context it will be handled by atomIntExit(). */ if (atomCurrentContext()) atomSched (FALSE); } } return (status); } /** * \b atomQueueTimerCallback * * This is an internal function not for use by application code. * * Timeouts on suspended threads are notified by the timer system through * this generic callback. The timer system calls us back with a pointer to * the relevant \c QUEUE_TIMER object which is used to retrieve the * queue details. * * @param[in] cb_data Pointer to a QUEUE_TIMER object */ static void atomQueueTimerCallback (POINTER cb_data) { QUEUE_TIMER *timer_data_ptr; CRITICAL_STORE; /* Get the QUEUE_TIMER structure pointer */ timer_data_ptr = (QUEUE_TIMER *)cb_data; /* Check parameter is valid */ if (timer_data_ptr) { /* Enter critical region */ CRITICAL_START (); /* Set status to indicate to the waiting thread that it timed out */ timer_data_ptr->tcb_ptr->suspend_wake_status = ATOM_TIMEOUT; /* Flag as no timeout registered */ timer_data_ptr->tcb_ptr->suspend_timo_cb = NULL; /** * Remove this thread from the queue's suspend list. Handles threads * suspended on the receive list as well as the send list. */ (void)tcbDequeueEntry (timer_data_ptr->suspQ, timer_data_ptr->tcb_ptr); /* Put the thread on the ready queue */ (void)tcbEnqueuePriority (&tcbReadyQ, timer_data_ptr->tcb_ptr); /* Exit critical region */ CRITICAL_END (); /** * Note that we don't call the scheduler now as it will be called * when we exit the ISR by atomIntExit(). */ } } /** * \b queue_remove * * This is an internal function not for use by application code. * * Removes a message from a queue. Assumes that there is a message present, * which is already checked by the calling functions with interrupts locked * out. * * Also wakes up a suspended thread if there are any waiting to send on the * queue. * * Assumes interrupts are already locked out. * * @param[in] qptr Pointer to an ATOM_QUEUE object * @param[in] msgptr Destination pointer for the message to be copied into * * @retval ATOM_OK Success * @retval ATOM_ERR_PARAM Bad parameter * @retval ATOM_ERR_QUEUE Problem putting a thread on the ready queue * @retval ATOM_ERR_TIMER Problem cancelling a timeout */ static uint8_t queue_remove (ATOM_QUEUE *qptr, uint8_t* msgptr) { uint8_t status; ATOM_TCB *tcb_ptr; /* Check parameters */ if ((qptr == NULL) || (msgptr == NULL)) { /* Bad pointer */ status = ATOM_ERR_PARAM; } else { /* There is a message on the queue, copy it out */ memcpy (msgptr, (qptr->buff_ptr + qptr->remove_index), qptr->unit_size); qptr->remove_index += qptr->unit_size; qptr->num_msgs_stored--; /* Check if the remove index should now wrap to the beginning */ if (qptr->remove_index >= (qptr->unit_size * qptr->max_num_msgs)) qptr->remove_index = 0; /** * If there are threads waiting to send, wake one up now. Waiting * threads are woken up in priority order, with same-priority * threads woken up in FIFO order. */ tcb_ptr = tcbDequeueHead (&qptr->putSuspQ); if (tcb_ptr) { /* Move the waiting thread to the ready queue */ if (tcbEnqueuePriority (&tcbReadyQ, tcb_ptr) == ATOM_OK) { /* Set OK status to be returned to the waiting thread */ tcb_ptr->suspend_wake_status = ATOM_OK; /* If there's a timeout on this suspension, cancel it */ if ((tcb_ptr->suspend_timo_cb != NULL) && (atomTimerCancel (tcb_ptr->suspend_timo_cb) != ATOM_OK)) { /* There was a problem cancelling a timeout */ status = ATOM_ERR_TIMER; } else { /* Flag as no timeout registered */ tcb_ptr->suspend_timo_cb = NULL; /* Successful */ status = ATOM_OK; } } else { /** * There was a problem putting the thread on the ready * queue. */ status = ATOM_ERR_QUEUE; } } else { /* There were no threads waiting to send */ status = ATOM_OK; } } return (status); } /** * \b queue_insert * * This is an internal function not for use by application code. * * Inserts a message onto a queue. Assumes that the queue has space for one * message, which has already been checked by the calling function with * interrupts locked out. * * Also wakes up a suspended thread if there are any waiting to receive on the * queue. * * Assumes interrupts are already locked out. * * @param[in] qptr Pointer to an ATOM_QUEUE object * @param[in] msgptr Source pointer for the message to be copied out of * * @retval ATOM_OK Success * @retval ATOM_ERR_PARAM Bad parameter * @retval ATOM_ERR_QUEUE Problem putting a thread on the ready queue * @retval ATOM_ERR_TIMER Problem cancelling a timeout */ static uint8_t queue_insert (ATOM_QUEUE *qptr, uint8_t* msgptr) { uint8_t status; ATOM_TCB *tcb_ptr; /* Check parameters */ if ((qptr == NULL) || (msgptr == NULL)) { /* Bad pointer */ status = ATOM_ERR_PARAM; } else { /* There is space in the queue, copy it in */ memcpy ((qptr->buff_ptr + qptr->insert_index), msgptr, qptr->unit_size); qptr->insert_index += qptr->unit_size; qptr->num_msgs_stored++; /* Check if the insert index should now wrap to the beginning */ if (qptr->insert_index >= (qptr->unit_size * qptr->max_num_msgs)) qptr->insert_index = 0; /** * If there are threads waiting to receive, wake one up now. Waiting * threads are woken up in priority order, with same-priority * threads woken up in FIFO order. */ tcb_ptr = tcbDequeueHead (&qptr->getSuspQ); if (tcb_ptr) { /* Move the waiting thread to the ready queue */ if (tcbEnqueuePriority (&tcbReadyQ, tcb_ptr) == ATOM_OK) { /* Set OK status to be returned to the waiting thread */ tcb_ptr->suspend_wake_status = ATOM_OK; /* If there's a timeout on this suspension, cancel it */ if ((tcb_ptr->suspend_timo_cb != NULL) && (atomTimerCancel (tcb_ptr->suspend_timo_cb) != ATOM_OK)) { /* There was a problem cancelling a timeout */ status = ATOM_ERR_TIMER; } else { /* Flag as no timeout registered */ tcb_ptr->suspend_timo_cb = NULL; /* Successful */ status = ATOM_OK; } } else { /** * There was a problem putting the thread on the ready * queue. */ status = ATOM_ERR_QUEUE; } } else { /* There were no threads waiting to send */ status = ATOM_OK; } } return (status); }