--- /dev/null
+/*
+ * Simple semaphore implementation, P() and V().
+ *
+ * Copyright (C) 2011 Simon Ruderich
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "sem.h"
+
+
+struct SEM {
+ pthread_mutex_t mutex;
+ pthread_cond_t condition;
+ int value;
+};
+
+SEM *sem_init(int init_value) {
+ SEM *sem = (SEM *)malloc(sizeof(SEM));
+ if (NULL == sem) {
+ return NULL;
+ }
+
+ if (0 != pthread_mutex_init(&sem->mutex, NULL)) {
+ free(sem);
+ return NULL;
+ }
+ if (0 != pthread_cond_init(&sem->condition, NULL)) {
+ pthread_mutex_destroy(&sem->mutex);
+ free(sem);
+ return NULL;
+ }
+
+ sem->value = init_value;
+ return sem;
+}
+
+int sem_del(SEM *sem) {
+ if (NULL == sem) {
+ return 0;
+ }
+
+ if (0 != pthread_mutex_destroy(&sem->mutex)) {
+ free(sem);
+ return -1;
+ }
+ if (0 != pthread_cond_destroy(&sem->condition)) {
+ free(sem);
+ return -1;
+ }
+
+ free(sem);
+ return 0;
+}
+
+void P(SEM *sem) {
+ pthread_mutex_lock(&sem->mutex);
+ while (0 == sem->value) {
+ pthread_cond_wait(&sem->condition, &sem->mutex);
+ }
+ sem->value--;
+ pthread_mutex_unlock(&sem->mutex);
+}
+
+void V(SEM *sem) {
+ pthread_mutex_lock(&sem->mutex);
+ sem->value++;
+ pthread_cond_broadcast(&sem->condition);
+ pthread_mutex_unlock(&sem->mutex);
+}
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <config.h>
+
#include <stdlib.h>
#include <stdio.h>
/* socket(), bind(), accept(), listen() */
#include <signal.h>
/* poll() */
#include <poll.h>
+/* errno */
+#include <errno.h>
+/* pthread_*() */
+#include <pthread.h>
-#include <config.h>
-
+#include "sem.h"
/* Maximum line of the request line. Longer request lines are aborted with an
* error. The standard doesn't specify a maximum line length but this should
* be a good limit to make processing simpler. */
#define MAX_REQUEST_LINE 4096
+/* Size of ringbuffer. */
+#define RINGBUFFER_SIZE 10
+
/* Server should shut down. Set by SIGINT handler. */
static volatile int done;
+/* Number of threads. */
+static size_t thread_count;
+/* Synchronized ring buffer storing accept()ed client sockets. */
+static int ringbuffer[RINGBUFFER_SIZE];
+static int ringbuffer_read;
+static int ringbuffer_write;
+static SEM *ringbuffer_full; /* At least one element in the buffer? */
+static SEM *ringbuffer_free; /* Space for another element in the buffer? */
+static SEM *ringbuffer_lock; /* Read lock. */
/* Proxy hostname and port if specified on the command line. */
static char *use_proxy_host;
static char *use_proxy_port;
static void parse_arguments(int argc, char **argv);
static void print_usage(const char *argv);
+static void worker_thread(void);
static void handle_connection(int socket);
static int read_http_request(FILE *client_fd, char *request, size_t length);
static void send_close_bad_request(FILE *client_fd);
int client_socket, server_socket;
struct sockaddr_in6 server_in;
+ size_t i;
+ pthread_t *threads;
+
struct sigaction action;
parse_arguments(argc, argv);
action.sa_flags = 0;
sigaction(SIGINT, &action, NULL);
+ /* Initialize ring buffer. */
+ ringbuffer_read = 0;
+ ringbuffer_write = 0;
+ ringbuffer_full = sem_init(0);
+ ringbuffer_free = sem_init(RINGBUFFER_SIZE);
+ ringbuffer_lock = sem_init(1);
+ if (NULL == ringbuffer_full
+ || NULL == ringbuffer_free
+ || NULL == ringbuffer_lock) {
+ perror("sem_init()");
+ return EXIT_FAILURE;
+ }
+
+ /* Spawn worker threads to handle requests. */
+ threads = (pthread_t *)malloc(thread_count * sizeof(pthread_t));
+ if (NULL == threads) {
+ perror("thread malloc failed");
+ return EXIT_FAILURE;
+ }
+ for (i = 0; i < thread_count; i++) {
+ int result;
+ pthread_t thread;
+
+ result = pthread_create(&thread, NULL,
+ (void * (*)(void *))&worker_thread,
+ NULL);
+ if (0 != result) {
+ printf("failed to create worker thread: %s\n", strerror(result));
+ return EXIT_FAILURE;
+ }
+
+ threads[i] = thread;
+ }
+
server_socket = socket(PF_INET6, SOCK_STREAM, 0);
if (-1 == server_socket) {
perror("socket()");
break;
}
- handle_connection(client_socket);
+ /* No lock, we only have one producer! */
+ P(ringbuffer_free);
+ ringbuffer[ringbuffer_write] = client_socket;
+ ringbuffer_write = (ringbuffer_write + 1) % RINGBUFFER_SIZE;
+ V(ringbuffer_full);
}
close(server_socket);
+ /* Poison all threads and shut them down. */
+ for (i = 0; i < thread_count; i++) {
+ P(ringbuffer_free);
+ ringbuffer[ringbuffer_write] = -1; /* poison */
+ ringbuffer_write = (ringbuffer_write + 1) % RINGBUFFER_SIZE;
+ V(ringbuffer_full);
+ }
+ for (i = 0; i < thread_count; i++) {
+ errno = pthread_join(threads[i], NULL);
+ if (0 != errno) {
+ perror("pthread_join()");
+ continue;
+ }
+ }
+
+ free(ringbuffer_full);
+ free(ringbuffer_free);
+ free(ringbuffer_lock);
+
+ free(threads);
+
free(use_proxy_host);
free(use_proxy_port);
static void parse_arguments(int argc, char **argv) {
int option;
- while (-1 != (option = getopt(argc, argv, "p:h?"))) {
+ /* Default values. */
+ thread_count = 10;
+
+ while (-1 != (option = getopt(argc, argv, "p:t:h?"))) {
switch (option) {
case 'p': {
char *position;
break;
}
+ case 't': {
+ if (0 >= atoi(optarg)) {
+ fprintf(stderr, "-t positive number required\n");
+ exit(EXIT_FAILURE);
+ }
+ thread_count = (size_t)atoi(optarg);
+ break;
+ }
case 'h':
default: /* '?' */
print_usage(argv[0]);
fprintf(stderr, "Usage: %s [-p host:port] port\n", argv);
fprintf(stderr, "\n");
fprintf(stderr, "-p proxy hostname and port\n");
+ fprintf(stderr, "-t number of threads [default: 10]\n");
+}
+
+static void worker_thread(void) {
+ int client_socket;
+
+ for (;;) {
+ /* Get next element from ring buffer. */
+ P(ringbuffer_full);
+ P(ringbuffer_lock);
+ client_socket = ringbuffer[ringbuffer_read];
+ ringbuffer_read = (ringbuffer_read + 1) % RINGBUFFER_SIZE;
+ V(ringbuffer_lock);
+ V(ringbuffer_free);
+
+ /* Negative value indicates we should shut down our thread. */
+ if (client_socket < 0) {
+ break;
+ }
+
+ handle_connection(client_socket);
+ }
}
static void handle_connection(int client_socket) {