/* Copyright (c) 2012 The Chromium OS Authors. All rights reserved. * Use of this source code is governed by a BSD-style license that can be * found in the LICENSE file. */ #ifndef _GNU_SOURCE #define _GNU_SOURCE /* for ppoll */ #endif #include <pthread.h> #include <poll.h> #include <stdbool.h> #include <sys/param.h> #include <syslog.h> #include "audio_thread_log.h" #include "cras_audio_thread_monitor.h" #include "cras_config.h" #include "cras_fmt_conv.h" #include "cras_iodev.h" #include "cras_rstream.h" #include "cras_system_state.h" #include "cras_types.h" #include "cras_util.h" #include "dev_stream.h" #include "audio_thread.h" #include "utlist.h" #define MIN_PROCESS_TIME_US 500 /* 0.5ms - min amount of time to mix/src. */ #define SLEEP_FUZZ_FRAMES 10 /* # to consider "close enough" to sleep frames. */ #define MIN_READ_WAIT_US 2000 /* 2ms */ /* * # to check whether a busyloop event happens */ #define MAX_CONTINUOUS_ZERO_SLEEP_COUNT 2 /* Messages that can be sent from the main context to the audio thread. */ enum AUDIO_THREAD_COMMAND { AUDIO_THREAD_ADD_OPEN_DEV, AUDIO_THREAD_RM_OPEN_DEV, AUDIO_THREAD_IS_DEV_OPEN, AUDIO_THREAD_ADD_STREAM, AUDIO_THREAD_DISCONNECT_STREAM, AUDIO_THREAD_STOP, AUDIO_THREAD_DUMP_THREAD_INFO, AUDIO_THREAD_DRAIN_STREAM, AUDIO_THREAD_CONFIG_GLOBAL_REMIX, AUDIO_THREAD_DEV_START_RAMP, AUDIO_THREAD_REMOVE_CALLBACK, AUDIO_THREAD_AEC_DUMP, }; struct audio_thread_msg { size_t length; enum AUDIO_THREAD_COMMAND id; }; struct audio_thread_config_global_remix { struct audio_thread_msg header; struct cras_fmt_conv *fmt_conv; }; struct audio_thread_open_device_msg { struct audio_thread_msg header; struct cras_iodev *dev; }; struct audio_thread_rm_callback_msg { struct audio_thread_msg header; int fd; }; struct audio_thread_add_rm_stream_msg { struct audio_thread_msg header; struct cras_rstream *stream; struct cras_iodev **devs; unsigned int num_devs; }; struct audio_thread_dump_debug_info_msg { struct audio_thread_msg header; struct audio_debug_info *info; }; struct audio_thread_dev_start_ramp_msg { struct audio_thread_msg header; struct cras_iodev *dev; enum CRAS_IODEV_RAMP_REQUEST request; }; struct audio_thread_aec_dump_msg { struct audio_thread_msg header; cras_stream_id_t stream_id; unsigned int start; /* */ int fd; }; /* Audio thread logging. */ struct audio_thread_event_log *atlog; static struct iodev_callback_list *iodev_callbacks; static struct timespec longest_wake; struct iodev_callback_list { int fd; int is_write; int enabled; thread_callback cb; void *cb_data; struct pollfd *pollfd; struct iodev_callback_list *prev, *next; }; static void _audio_thread_add_callback(int fd, thread_callback cb, void *data, int is_write) { struct iodev_callback_list *iodev_cb; /* Don't add iodev_cb twice */ DL_FOREACH(iodev_callbacks, iodev_cb) if (iodev_cb->fd == fd && iodev_cb->cb_data == data) return; iodev_cb = (struct iodev_callback_list *)calloc(1, sizeof(*iodev_cb)); iodev_cb->fd = fd; iodev_cb->cb = cb; iodev_cb->cb_data = data; iodev_cb->enabled = 1; iodev_cb->is_write = is_write; DL_APPEND(iodev_callbacks, iodev_cb); } void audio_thread_add_callback(int fd, thread_callback cb, void *data) { _audio_thread_add_callback(fd, cb, data, 0); } void audio_thread_add_write_callback(int fd, thread_callback cb, void *data) { _audio_thread_add_callback(fd, cb, data, 1); } void audio_thread_rm_callback(int fd) { struct iodev_callback_list *iodev_cb; DL_FOREACH(iodev_callbacks, iodev_cb) { if (iodev_cb->fd == fd) { DL_DELETE(iodev_callbacks, iodev_cb); free(iodev_cb); return; } } } void audio_thread_enable_callback(int fd, int enabled) { struct iodev_callback_list *iodev_cb; DL_FOREACH(iodev_callbacks, iodev_cb) { if (iodev_cb->fd == fd) { iodev_cb->enabled = !!enabled; return; } } } /* Sends a response (error code) from the audio thread to the main thread. * Indicates that the last message sent to the audio thread has been handled * with an error code of rc. * Args: * thread - thread responding to command. * rc - Result code to send back to the main thread. * Returns: * The number of bytes written to the main thread. */ static int audio_thread_send_response(struct audio_thread *thread, int rc) { return write(thread->to_main_fds[1], &rc, sizeof(rc)); } /* Reads from a file descriptor until all bytes are read. * * Args: * fd - file descriptor to read * buf - the buffer to be written. * count - the number of bytes to read from fd * Returns: * |count| on success, negative error code on failure. */ static int read_until_finished(int fd, void *buf, size_t count) { int nread, count_left = count; while (count_left > 0) { nread = read(fd, (uint8_t *)buf + count - count_left, count_left); if (nread < 0) { if (errno == EINTR) continue; else return nread; } else if (nread == 0) { syslog(LOG_ERR, "Pipe has been closed."); return -EPIPE; } count_left -= nread; } return count; } /* Reads a command from the main thread. Called from the playback/capture * thread. This will read the next available command from the main thread and * put it in buf. * Args: * thread - thread reading the command. * buf - Message is stored here on return. * max_len - maximum length of message to put into buf. * Returns: * 0 on success, negative error code on failure. */ static int audio_thread_read_command(struct audio_thread *thread, uint8_t *buf, size_t max_len) { int to_read, nread, rc; struct audio_thread_msg *msg = (struct audio_thread_msg *)buf; /* Get the length of the message first */ nread = read_until_finished( thread->to_thread_fds[0], buf, sizeof(msg->length)); if (nread < 0) return nread; if (msg->length > max_len) return -ENOMEM; to_read = msg->length - sizeof(msg->length); rc = read_until_finished(thread->to_thread_fds[0], &buf[0] + sizeof(msg->length), to_read); if (rc < 0) return rc; return 0; } /* Builds an initial buffer to avoid an underrun. Adds min_level of latency. */ static void fill_odevs_zeros_min_level(struct cras_iodev *odev) { cras_iodev_fill_odev_zeros(odev, odev->min_buffer_level); } /* Append a new stream to a specified set of iodevs. */ static int append_stream(struct audio_thread *thread, struct cras_rstream *stream, struct cras_iodev **iodevs, unsigned int num_iodevs) { struct open_dev *open_dev; struct cras_iodev *dev; struct dev_stream *out; struct timespec init_cb_ts; const struct timespec *stream_ts; unsigned int i; bool cb_ts_set = false; int rc = 0; for (i = 0; i < num_iodevs; i++) { DL_SEARCH_SCALAR(thread->open_devs[stream->direction], open_dev, dev, iodevs[i]); if (!open_dev) continue; dev = iodevs[i]; DL_SEARCH_SCALAR(dev->streams, out, stream, stream); if (out) continue; /* For output, if open device already has stream, get the earliest next * callback time from these streams to align with. Otherwise, use the * timestamp now as the initial callback time for new stream so dev_stream * can set its own schedule. * If next callback time is too far from now, it will block writing and * lower hardware level. Else if we fetch the new stream immediately, it * may cause device buffer level stack up. */ if (stream->direction == CRAS_STREAM_OUTPUT && dev->streams) { DL_FOREACH(dev->streams, out) { stream_ts = dev_stream_next_cb_ts(out); if (stream_ts && (!cb_ts_set || timespec_after(&init_cb_ts, stream_ts))) { init_cb_ts = *stream_ts; cb_ts_set = true; } } } if (!cb_ts_set) clock_gettime(CLOCK_MONOTONIC_RAW, &init_cb_ts); out = dev_stream_create(stream, dev->info.idx, dev->ext_format, dev, &init_cb_ts); if (!out) { rc = -EINVAL; break; } /* When the first input stream is added, flush the input buffer * so that we can read from multiple input devices of the same * buffer level. */ if ((stream->direction == CRAS_STREAM_INPUT) && !dev->streams) { int num_flushed = dev->flush_buffer(dev); if (num_flushed < 0) { rc = num_flushed; break; } } cras_iodev_add_stream(dev, out); /* For multiple inputs case, if the new stream is not the first * one to append, copy the 1st stream's offset to it so that * future read offsets can be aligned across all input streams * to avoid the deadlock scenario when multiple streams reading * from multiple devices. */ if ((stream->direction == CRAS_STREAM_INPUT) && (dev->streams != out)) { unsigned int offset = cras_iodev_stream_offset(dev, dev->streams); if (offset > stream->cb_threshold) offset = stream->cb_threshold; cras_iodev_stream_written(dev, out, offset); offset = cras_rstream_dev_offset(dev->streams->stream, dev->info.idx); if (offset > stream->cb_threshold) offset = stream->cb_threshold; cras_rstream_dev_offset_update(stream, offset, dev->info.idx); } } if (rc) { DL_FOREACH(thread->open_devs[stream->direction], open_dev) { dev = open_dev->dev; DL_SEARCH_SCALAR(dev->streams, out, stream, stream); if (!out) continue; cras_iodev_rm_stream(dev, stream); dev_stream_destroy(out); } } return rc; } /* Handles messages from main thread to add a new active device. */ static int thread_add_open_dev(struct audio_thread *thread, struct cras_iodev *iodev) { struct open_dev *adev; DL_SEARCH_SCALAR(thread->open_devs[iodev->direction], adev, dev, iodev); if (adev) return -EEXIST; adev = (struct open_dev *)calloc(1, sizeof(*adev)); adev->dev = iodev; /* * Start output devices by padding the output. This avoids a burst of * audio callbacks when the stream starts */ if (iodev->direction == CRAS_STREAM_OUTPUT) fill_odevs_zeros_min_level(iodev); ATLOG(atlog, AUDIO_THREAD_DEV_ADDED, iodev->info.idx, 0, 0); DL_APPEND(thread->open_devs[iodev->direction], adev); return 0; } /* Handles messages from the main thread to remove an active device. */ static int thread_rm_open_dev(struct audio_thread *thread, struct cras_iodev *iodev) { struct open_dev *adev = dev_io_find_open_dev( thread->open_devs[iodev->direction], iodev); if (!adev) return -EINVAL; dev_io_rm_open_dev(&thread->open_devs[iodev->direction], adev); return 0; } /* * Handles message from the main thread to check if an iodev is in the * open dev list. */ static int thread_is_dev_open(struct audio_thread *thread, struct cras_iodev *iodev) { struct open_dev *adev = dev_io_find_open_dev( thread->open_devs[iodev->direction], iodev); return !!adev; } /* Handles messages from the main thread to start ramping on a device. */ static int thread_dev_start_ramp(struct audio_thread *thread, struct cras_iodev *iodev, enum CRAS_IODEV_RAMP_REQUEST request) { /* Do nothing if device wasn't already in the active dev list. */ struct open_dev *adev = dev_io_find_open_dev( thread->open_devs[iodev->direction], iodev); if (!adev) return -EINVAL; return cras_iodev_start_ramp(iodev, request); } /* Return non-zero if the stream is attached to any device. */ static int thread_find_stream(struct audio_thread *thread, struct cras_rstream *rstream) { struct open_dev *open_dev; struct dev_stream *s; DL_FOREACH(thread->open_devs[rstream->direction], open_dev) { DL_FOREACH(open_dev->dev->streams, s) { if (s->stream == rstream) return 1; } } return 0; } /* Handles the disconnect_stream message from the main thread. */ static int thread_disconnect_stream(struct audio_thread* thread, struct cras_rstream* stream, struct cras_iodev *dev) { int rc; if (!thread_find_stream(thread, stream)) return 0; rc = dev_io_remove_stream(&thread->open_devs[stream->direction], stream, dev); return rc; } /* Initiates draining of a stream or returns the status of a draining stream. * If the stream has completed draining the thread forfeits ownership and must * never reference it again. Returns the number of milliseconds it will take to * finish draining, a minimum of one ms if any samples remain. */ static int thread_drain_stream_ms_remaining(struct audio_thread *thread, struct cras_rstream *rstream) { int fr_in_buff; struct cras_audio_shm *shm; if (rstream->direction != CRAS_STREAM_OUTPUT) return 0; shm = cras_rstream_output_shm(rstream); fr_in_buff = cras_shm_get_frames(shm); if (fr_in_buff <= 0) return 0; cras_rstream_set_is_draining(rstream, 1); return 1 + cras_frames_to_ms(fr_in_buff, rstream->format.frame_rate); } /* Handles a request to begin draining and return the amount of time left to * draing a stream. */ static int thread_drain_stream(struct audio_thread *thread, struct cras_rstream *rstream) { int ms_left; if (!thread_find_stream(thread, rstream)) return 0; ms_left = thread_drain_stream_ms_remaining(thread, rstream); if (ms_left == 0) dev_io_remove_stream(&thread->open_devs[rstream->direction], rstream, NULL); return ms_left; } /* Handles the add_stream message from the main thread. */ static int thread_add_stream(struct audio_thread *thread, struct cras_rstream *stream, struct cras_iodev **iodevs, unsigned int num_iodevs) { int rc; rc = append_stream(thread, stream, iodevs, num_iodevs); if (rc < 0) return rc; ATLOG(atlog, AUDIO_THREAD_STREAM_ADDED, stream->stream_id, num_iodevs ? iodevs[0]->info.idx : 0, num_iodevs); return 0; } /* Starts or stops aec dump task. */ static int thread_set_aec_dump(struct audio_thread *thread, cras_stream_id_t stream_id, unsigned int start, int fd) { struct open_dev *idev_list = thread->open_devs[CRAS_STREAM_INPUT]; struct open_dev *adev; struct dev_stream *stream; DL_FOREACH(idev_list, adev) { if (!cras_iodev_is_open(adev->dev)) continue; DL_FOREACH(adev->dev->streams, stream) { if ((stream->stream->apm_list == NULL) || (stream->stream->stream_id != stream_id)) continue; cras_apm_list_set_aec_dump(stream->stream->apm_list, adev->dev, start, fd); } } return 0; } /* Stop the playback thread */ static void terminate_pb_thread() { pthread_exit(0); } static void append_dev_dump_info(struct audio_dev_debug_info *di, struct open_dev *adev) { struct cras_audio_format *fmt = adev->dev->ext_format; strncpy(di->dev_name, adev->dev->info.name, sizeof(di->dev_name)); di->buffer_size = adev->dev->buffer_size; di->min_buffer_level = adev->dev->min_buffer_level; di->min_cb_level = adev->dev->min_cb_level; di->max_cb_level = adev->dev->max_cb_level; di->direction = adev->dev->direction; di->num_underruns = cras_iodev_get_num_underruns(adev->dev); di->num_severe_underruns = cras_iodev_get_num_severe_underruns( adev->dev); di->highest_hw_level = adev->dev->highest_hw_level; if (fmt) { di->frame_rate = fmt->frame_rate; di->num_channels = fmt->num_channels; di->est_rate_ratio = cras_iodev_get_est_rate_ratio(adev->dev); } else { di->frame_rate = 0; di->num_channels = 0; di->est_rate_ratio = 0; } } /* Put stream info for the given stream into the info struct. */ static void append_stream_dump_info(struct audio_debug_info *info, struct dev_stream *stream, unsigned int dev_idx, int index) { struct audio_stream_debug_info *si; si = &info->streams[index]; si->stream_id = stream->stream->stream_id; si->dev_idx = dev_idx; si->direction = stream->stream->direction; si->stream_type = stream->stream->stream_type; si->buffer_frames = stream->stream->buffer_frames; si->cb_threshold = stream->stream->cb_threshold; si->frame_rate = stream->stream->format.frame_rate; si->num_channels = stream->stream->format.num_channels; memcpy(si->channel_layout, stream->stream->format.channel_layout, sizeof(si->channel_layout)); si->longest_fetch_sec = stream->stream->longest_fetch_interval.tv_sec; si->longest_fetch_nsec = stream->stream->longest_fetch_interval.tv_nsec; si->num_overruns = cras_shm_num_overruns(&stream->stream->shm); si->effects = cras_apm_list_get_effects(stream->stream->apm_list); longest_wake.tv_sec = 0; longest_wake.tv_nsec = 0; } /* Handle a message sent to the playback thread */ static int handle_playback_thread_message(struct audio_thread *thread) { uint8_t buf[256]; struct audio_thread_msg *msg = (struct audio_thread_msg *)buf; int ret = 0; int err; err = audio_thread_read_command(thread, buf, 256); if (err < 0) return err; ATLOG(atlog, AUDIO_THREAD_PB_MSG, msg->id, 0, 0); switch (msg->id) { case AUDIO_THREAD_ADD_STREAM: { struct audio_thread_add_rm_stream_msg *amsg; amsg = (struct audio_thread_add_rm_stream_msg *)msg; ATLOG(atlog, AUDIO_THREAD_WRITE_STREAMS_WAIT, amsg->stream->stream_id, 0, 0); ret = thread_add_stream(thread, amsg->stream, amsg->devs, amsg->num_devs); break; } case AUDIO_THREAD_DISCONNECT_STREAM: { struct audio_thread_add_rm_stream_msg *rmsg; rmsg = (struct audio_thread_add_rm_stream_msg *)msg; ret = thread_disconnect_stream(thread, rmsg->stream, rmsg->devs[0]); break; } case AUDIO_THREAD_ADD_OPEN_DEV: { struct audio_thread_open_device_msg *rmsg; rmsg = (struct audio_thread_open_device_msg *)msg; ret = thread_add_open_dev(thread, rmsg->dev); break; } case AUDIO_THREAD_RM_OPEN_DEV: { struct audio_thread_open_device_msg *rmsg; rmsg = (struct audio_thread_open_device_msg *)msg; ret = thread_rm_open_dev(thread, rmsg->dev); break; } case AUDIO_THREAD_IS_DEV_OPEN: { struct audio_thread_open_device_msg *rmsg; rmsg = (struct audio_thread_open_device_msg *)msg; ret = thread_is_dev_open(thread, rmsg->dev); break; } case AUDIO_THREAD_STOP: ret = 0; err = audio_thread_send_response(thread, ret); if (err < 0) return err; terminate_pb_thread(); break; case AUDIO_THREAD_DUMP_THREAD_INFO: { struct dev_stream *curr; struct open_dev *adev; struct audio_thread_dump_debug_info_msg *dmsg; struct audio_debug_info *info; unsigned int num_streams = 0; unsigned int num_devs = 0; ret = 0; dmsg = (struct audio_thread_dump_debug_info_msg *)msg; info = dmsg->info; /* Go through all open devices. */ DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) { append_dev_dump_info(&info->devs[num_devs], adev); if (++num_devs == MAX_DEBUG_DEVS) break; DL_FOREACH(adev->dev->streams, curr) { if (num_streams == MAX_DEBUG_STREAMS) break; append_stream_dump_info(info, curr, adev->dev->info.idx, num_streams++); } } DL_FOREACH(thread->open_devs[CRAS_STREAM_INPUT], adev) { if (num_devs == MAX_DEBUG_DEVS) break; append_dev_dump_info(&info->devs[num_devs], adev); DL_FOREACH(adev->dev->streams, curr) { if (num_streams == MAX_DEBUG_STREAMS) break; append_stream_dump_info(info, curr, adev->dev->info.idx, num_streams++); } ++num_devs; } info->num_devs = num_devs; info->num_streams = num_streams; memcpy(&info->log, atlog, sizeof(info->log)); break; } case AUDIO_THREAD_DRAIN_STREAM: { struct audio_thread_add_rm_stream_msg *rmsg; rmsg = (struct audio_thread_add_rm_stream_msg *)msg; ret = thread_drain_stream(thread, rmsg->stream); break; } case AUDIO_THREAD_REMOVE_CALLBACK: { struct audio_thread_rm_callback_msg *rmsg; rmsg = (struct audio_thread_rm_callback_msg *)msg; audio_thread_rm_callback(rmsg->fd); break; } case AUDIO_THREAD_CONFIG_GLOBAL_REMIX: { struct audio_thread_config_global_remix *rmsg; void *rsp; /* Respond the pointer to the old remix converter, so it can be * freed later in main thread. */ rsp = (void *)thread->remix_converter; rmsg = (struct audio_thread_config_global_remix *)msg; thread->remix_converter = rmsg->fmt_conv; return write(thread->to_main_fds[1], &rsp, sizeof(rsp)); } case AUDIO_THREAD_DEV_START_RAMP: { struct audio_thread_dev_start_ramp_msg *rmsg; rmsg = (struct audio_thread_dev_start_ramp_msg*)msg; ret = thread_dev_start_ramp(thread, rmsg->dev, rmsg->request); break; } case AUDIO_THREAD_AEC_DUMP: { struct audio_thread_aec_dump_msg *rmsg; rmsg = (struct audio_thread_aec_dump_msg *)msg; ret = thread_set_aec_dump(thread, rmsg->stream_id, rmsg->start, rmsg->fd); break; } default: ret = -EINVAL; break; } err = audio_thread_send_response(thread, ret); if (err < 0) return err; return ret; } /* Fills the time that the next stream needs to be serviced. */ static int get_next_stream_wake_from_list(struct dev_stream *streams, struct timespec *min_ts) { struct dev_stream *dev_stream; int ret = 0; /* The total number of streams to wait on. */ DL_FOREACH(streams, dev_stream) { const struct timespec *next_cb_ts; if (cras_rstream_get_is_draining(dev_stream->stream) && dev_stream_playback_frames(dev_stream) <= 0) continue; if (!dev_stream_can_fetch(dev_stream)) continue; next_cb_ts = dev_stream_next_cb_ts(dev_stream); if (!next_cb_ts) continue; ATLOG(atlog, AUDIO_THREAD_STREAM_SLEEP_TIME, dev_stream->stream->stream_id, next_cb_ts->tv_sec, next_cb_ts->tv_nsec); if (timespec_after(min_ts, next_cb_ts)) *min_ts = *next_cb_ts; ret++; } return ret; } static int get_next_output_wake(struct open_dev **odevs, struct timespec *min_ts, const struct timespec *now) { struct open_dev *adev; int ret = 0; DL_FOREACH(*odevs, adev) ret += get_next_stream_wake_from_list( adev->dev->streams, min_ts); DL_FOREACH(*odevs, adev) { if (!cras_iodev_odev_should_wake(adev->dev)) continue; ret++; if (timespec_after(min_ts, &adev->wake_ts)) *min_ts = adev->wake_ts; } return ret; } /* Returns the number of active streams plus the number of active devices. */ static int fill_next_sleep_interval(struct audio_thread *thread, struct timespec *ts) { struct timespec min_ts; struct timespec now; int ret; ts->tv_sec = 0; ts->tv_nsec = 0; /* Limit the sleep time to 20 seconds. */ min_ts.tv_sec = 20; min_ts.tv_nsec = 0; clock_gettime(CLOCK_MONOTONIC_RAW, &now); add_timespecs(&min_ts, &now); ret = get_next_output_wake(&thread->open_devs[CRAS_STREAM_OUTPUT], &min_ts, &now); ret += dev_io_next_input_wake(&thread->open_devs[CRAS_STREAM_INPUT], &min_ts); if (timespec_after(&min_ts, &now)) subtract_timespecs(&min_ts, &now, ts); return ret; } static struct pollfd *add_pollfd(struct audio_thread *thread, int fd, int is_write) { thread->pollfds[thread->num_pollfds].fd = fd; if (is_write) thread->pollfds[thread->num_pollfds].events = POLLOUT; else thread->pollfds[thread->num_pollfds].events = POLLIN; thread->num_pollfds++; if (thread->num_pollfds >= thread->pollfds_size) { thread->pollfds_size *= 2; thread->pollfds = (struct pollfd *)realloc(thread->pollfds, sizeof(*thread->pollfds) * thread->pollfds_size); return NULL; } return &thread->pollfds[thread->num_pollfds - 1]; } static int continuous_zero_sleep_count = 0; static void check_busyloop(struct timespec* wait_ts) { if(wait_ts->tv_sec == 0 && wait_ts->tv_nsec == 0) { continuous_zero_sleep_count ++; if(continuous_zero_sleep_count == MAX_CONTINUOUS_ZERO_SLEEP_COUNT) cras_audio_thread_busyloop(); } else { continuous_zero_sleep_count = 0; } } /* For playback, fill the audio buffer when needed, for capture, pull out * samples when they are ready. * This thread will attempt to run at a high priority to allow for low latency * streams. This thread sleeps while the device plays back or captures audio, * it will wake up as little as it can while avoiding xruns. It can also be * woken by sending it a message using the "audio_thread_post_message" function. */ static void *audio_io_thread(void *arg) { struct audio_thread *thread = (struct audio_thread *)arg; struct open_dev *adev; struct dev_stream *curr; struct timespec ts, now, last_wake; int msg_fd; int rc; msg_fd = thread->to_thread_fds[0]; /* Attempt to get realtime scheduling */ if (cras_set_rt_scheduling(CRAS_SERVER_RT_THREAD_PRIORITY) == 0) cras_set_thread_priority(CRAS_SERVER_RT_THREAD_PRIORITY); last_wake.tv_sec = 0; longest_wake.tv_sec = 0; longest_wake.tv_nsec = 0; thread->pollfds[0].fd = msg_fd; thread->pollfds[0].events = POLLIN; while (1) { struct timespec *wait_ts; struct iodev_callback_list *iodev_cb; wait_ts = NULL; thread->num_pollfds = 1; /* device opened */ dev_io_run(&thread->open_devs[CRAS_STREAM_OUTPUT], &thread->open_devs[CRAS_STREAM_INPUT], thread->remix_converter); if (fill_next_sleep_interval(thread, &ts)) wait_ts = &ts; restart_poll_loop: thread->num_pollfds = 1; DL_FOREACH(iodev_callbacks, iodev_cb) { if (!iodev_cb->enabled) continue; iodev_cb->pollfd = add_pollfd(thread, iodev_cb->fd, iodev_cb->is_write); if (!iodev_cb->pollfd) goto restart_poll_loop; } /* TODO(dgreid) - once per rstream not per dev_stream */ DL_FOREACH(thread->open_devs[CRAS_STREAM_OUTPUT], adev) { DL_FOREACH(adev->dev->streams, curr) { int fd = dev_stream_poll_stream_fd(curr); if (fd < 0) continue; if (!add_pollfd(thread, fd, 0)) goto restart_poll_loop; } } DL_FOREACH(thread->open_devs[CRAS_STREAM_INPUT], adev) { DL_FOREACH(adev->dev->streams, curr) { int fd = dev_stream_poll_stream_fd(curr); if (fd < 0) continue; if (!add_pollfd(thread, fd, 0)) goto restart_poll_loop; } } if (last_wake.tv_sec) { struct timespec this_wake; clock_gettime(CLOCK_MONOTONIC_RAW, &now); subtract_timespecs(&now, &last_wake, &this_wake); if (timespec_after(&this_wake, &longest_wake)) longest_wake = this_wake; } ATLOG(atlog, AUDIO_THREAD_SLEEP, wait_ts ? wait_ts->tv_sec : 0, wait_ts ? wait_ts->tv_nsec : 0, longest_wake.tv_nsec); if(wait_ts) check_busyloop(wait_ts); rc = ppoll(thread->pollfds, thread->num_pollfds, wait_ts, NULL); clock_gettime(CLOCK_MONOTONIC_RAW, &last_wake); ATLOG(atlog, AUDIO_THREAD_WAKE, rc, 0, 0); if (rc <= 0) continue; if (thread->pollfds[0].revents & POLLIN) { rc = handle_playback_thread_message(thread); if (rc < 0) syslog(LOG_INFO, "handle message %d", rc); } DL_FOREACH(iodev_callbacks, iodev_cb) { if (iodev_cb->pollfd && iodev_cb->pollfd->revents & (POLLIN | POLLOUT)) { ATLOG(atlog, AUDIO_THREAD_IODEV_CB, iodev_cb->is_write, 0, 0); iodev_cb->cb(iodev_cb->cb_data); } } } return NULL; } /* Write a message to the playback thread and wait for an ack, This keeps these * operations synchronous for the main server thread. For instance when the * RM_STREAM message is sent, the stream can be deleted after the function * returns. Making this synchronous also allows the thread to return an error * code that can be handled by the caller. * Args: * thread - thread to receive message. * msg - The message to send. * Returns: * A return code from the message handler in the thread. */ static int audio_thread_post_message(struct audio_thread *thread, struct audio_thread_msg *msg) { int err, rsp; err = write(thread->to_thread_fds[1], msg, msg->length); if (err < 0) { syslog(LOG_ERR, "Failed to post message to thread."); return err; } /* Synchronous action, wait for response. */ err = read_until_finished(thread->to_main_fds[0], &rsp, sizeof(rsp)); if (err < 0) { syslog(LOG_ERR, "Failed to read reply from thread."); return err; } return rsp; } static void init_open_device_msg(struct audio_thread_open_device_msg *msg, enum AUDIO_THREAD_COMMAND id, struct cras_iodev *dev) { memset(msg, 0, sizeof(*msg)); msg->header.id = id; msg->header.length = sizeof(*msg); msg->dev = dev; } static void init_add_rm_stream_msg(struct audio_thread_add_rm_stream_msg *msg, enum AUDIO_THREAD_COMMAND id, struct cras_rstream *stream, struct cras_iodev **devs, unsigned int num_devs) { memset(msg, 0, sizeof(*msg)); msg->header.id = id; msg->header.length = sizeof(*msg); msg->stream = stream; msg->devs = devs; msg->num_devs = num_devs; } static void init_dump_debug_info_msg( struct audio_thread_dump_debug_info_msg *msg, struct audio_debug_info *info) { memset(msg, 0, sizeof(*msg)); msg->header.id = AUDIO_THREAD_DUMP_THREAD_INFO; msg->header.length = sizeof(*msg); msg->info = info; } static void init_config_global_remix_msg( struct audio_thread_config_global_remix *msg) { memset(msg, 0, sizeof(*msg)); msg->header.id = AUDIO_THREAD_CONFIG_GLOBAL_REMIX; msg->header.length = sizeof(*msg); } static void init_device_start_ramp_msg( struct audio_thread_dev_start_ramp_msg *msg, enum AUDIO_THREAD_COMMAND id, struct cras_iodev *dev, enum CRAS_IODEV_RAMP_REQUEST request) { memset(msg, 0, sizeof(*msg)); msg->header.id = id; msg->header.length = sizeof(*msg); msg->dev = dev; msg->request = request; } /* Exported Interface */ int audio_thread_add_stream(struct audio_thread *thread, struct cras_rstream *stream, struct cras_iodev **devs, unsigned int num_devs) { struct audio_thread_add_rm_stream_msg msg; assert(thread && stream); if (!thread->started) return -EINVAL; init_add_rm_stream_msg(&msg, AUDIO_THREAD_ADD_STREAM, stream, devs, num_devs); return audio_thread_post_message(thread, &msg.header); } int audio_thread_disconnect_stream(struct audio_thread *thread, struct cras_rstream *stream, struct cras_iodev *dev) { struct audio_thread_add_rm_stream_msg msg; assert(thread && stream); init_add_rm_stream_msg(&msg, AUDIO_THREAD_DISCONNECT_STREAM, stream, &dev, 0); return audio_thread_post_message(thread, &msg.header); } int audio_thread_drain_stream(struct audio_thread *thread, struct cras_rstream *stream) { struct audio_thread_add_rm_stream_msg msg; assert(thread && stream); init_add_rm_stream_msg(&msg, AUDIO_THREAD_DRAIN_STREAM, stream, NULL, 0); return audio_thread_post_message(thread, &msg.header); } int audio_thread_dump_thread_info(struct audio_thread *thread, struct audio_debug_info *info) { struct audio_thread_dump_debug_info_msg msg; init_dump_debug_info_msg(&msg, info); return audio_thread_post_message(thread, &msg.header); } int audio_thread_set_aec_dump(struct audio_thread *thread, cras_stream_id_t stream_id, unsigned int start, int fd) { struct audio_thread_aec_dump_msg msg; memset(&msg, 0, sizeof(msg)); msg.header.id = AUDIO_THREAD_AEC_DUMP; msg.header.length = sizeof(msg); msg.stream_id = stream_id; msg.start = start; msg.fd = fd; return audio_thread_post_message(thread, &msg.header); } int audio_thread_rm_callback_sync(struct audio_thread *thread, int fd) { struct audio_thread_rm_callback_msg msg; memset(&msg, 0, sizeof(msg)); msg.header.id = AUDIO_THREAD_REMOVE_CALLBACK; msg.header.length = sizeof(msg); msg.fd = fd; return audio_thread_post_message(thread, &msg.header); } int audio_thread_config_global_remix(struct audio_thread *thread, unsigned int num_channels, const float *coefficient) { int err; int identity_remix = 1; unsigned int i, j; struct audio_thread_config_global_remix msg; void *rsp; init_config_global_remix_msg(&msg); /* Check if the coefficients represent an identity matrix for remix * conversion, which means no remix at all. If so then leave the * converter as NULL. */ for (i = 0; i < num_channels; i++) { if (coefficient[i * num_channels + i] != 1.0f) { identity_remix = 0; break; } for (j = i + 1; j < num_channels; j++) { if (coefficient[i * num_channels + j] != 0 || coefficient[j * num_channels + i] != 0) { identity_remix = 0; break; } } } if (!identity_remix) { msg.fmt_conv = cras_channel_remix_conv_create(num_channels, coefficient); if (NULL == msg.fmt_conv) return -ENOMEM; } err = write(thread->to_thread_fds[1], &msg, msg.header.length); if (err < 0) { syslog(LOG_ERR, "Failed to post message to thread."); return err; } /* Synchronous action, wait for response. */ err = read_until_finished(thread->to_main_fds[0], &rsp, sizeof(rsp)); if (err < 0) { syslog(LOG_ERR, "Failed to read reply from thread."); return err; } if (rsp) cras_fmt_conv_destroy((struct cras_fmt_conv **)&rsp); return 0; } struct audio_thread *audio_thread_create() { int rc; struct audio_thread *thread; thread = (struct audio_thread *)calloc(1, sizeof(*thread)); if (!thread) return NULL; thread->to_thread_fds[0] = -1; thread->to_thread_fds[1] = -1; thread->to_main_fds[0] = -1; thread->to_main_fds[1] = -1; /* Two way pipes for communication with the device's audio thread. */ rc = pipe(thread->to_thread_fds); if (rc < 0) { syslog(LOG_ERR, "Failed to pipe"); free(thread); return NULL; } rc = pipe(thread->to_main_fds); if (rc < 0) { syslog(LOG_ERR, "Failed to pipe"); free(thread); return NULL; } atlog = audio_thread_event_log_init(); thread->pollfds_size = 32; thread->pollfds = (struct pollfd *)malloc(sizeof(*thread->pollfds) * thread->pollfds_size); return thread; } int audio_thread_add_open_dev(struct audio_thread *thread, struct cras_iodev *dev) { struct audio_thread_open_device_msg msg; assert(thread && dev); if (!thread->started) return -EINVAL; init_open_device_msg(&msg, AUDIO_THREAD_ADD_OPEN_DEV, dev); return audio_thread_post_message(thread, &msg.header); } int audio_thread_rm_open_dev(struct audio_thread *thread, struct cras_iodev *dev) { struct audio_thread_open_device_msg msg; assert(thread && dev); if (!thread->started) return -EINVAL; init_open_device_msg(&msg, AUDIO_THREAD_RM_OPEN_DEV, dev); return audio_thread_post_message(thread, &msg.header); } int audio_thread_is_dev_open(struct audio_thread *thread, struct cras_iodev *dev) { struct audio_thread_open_device_msg msg; if (!dev) return 0; init_open_device_msg(&msg, AUDIO_THREAD_IS_DEV_OPEN, dev); return audio_thread_post_message(thread, &msg.header); } int audio_thread_dev_start_ramp(struct audio_thread *thread, struct cras_iodev *dev, enum CRAS_IODEV_RAMP_REQUEST request) { struct audio_thread_dev_start_ramp_msg msg; assert(thread && dev); if (!thread->started) return -EINVAL; init_device_start_ramp_msg(&msg, AUDIO_THREAD_DEV_START_RAMP, dev, request); return audio_thread_post_message(thread, &msg.header); } int audio_thread_start(struct audio_thread *thread) { int rc; rc = pthread_create(&thread->tid, NULL, audio_io_thread, thread); if (rc) { syslog(LOG_ERR, "Failed pthread_create"); return rc; } thread->started = 1; return 0; } void audio_thread_destroy(struct audio_thread *thread) { if (thread->started) { struct audio_thread_msg msg; msg.id = AUDIO_THREAD_STOP; msg.length = sizeof(msg); audio_thread_post_message(thread, &msg); pthread_join(thread->tid, NULL); } free(thread->pollfds); audio_thread_event_log_deinit(atlog); if (thread->to_thread_fds[0] != -1) { close(thread->to_thread_fds[0]); close(thread->to_thread_fds[1]); } if (thread->to_main_fds[0] != -1) { close(thread->to_main_fds[0]); close(thread->to_main_fds[1]); } if (thread->remix_converter) cras_fmt_conv_destroy(&thread->remix_converter); free(thread); }