From: Simon Ruderich Date: Sun, 6 Mar 2011 01:39:05 +0000 (+0100) Subject: src/tlsproxy.c: Add thread support. X-Git-Tag: 0.1~35 X-Git-Url: https://ruderich.org/simon/gitweb/?p=tlsproxy%2Ftlsproxy.git;a=commitdiff_plain;h=b5aeb6f8ea2147f64be9a8ce750917aed4bf7cef src/tlsproxy.c: Add thread support. By default 10 threads are spawned. --- diff --git a/configure.ac b/configure.ac index bf75ebb..e52a12e 100644 --- a/configure.ac +++ b/configure.ac @@ -12,6 +12,11 @@ if test "x$GCC" = xyes; then CFLAGS="-std=c89 -pedantic -Wall -Wextra -Werror -D_XOPEN_SOURCE=500 $CFLAGS" fi +AC_CHECK_LIB([pthread], [pthread_create]) +if test "x$ac_cv_lib_pthread_pthread_create" != xyes; then + AC_MSG_ERROR([pthread is required]) +fi + AC_CHECK_LIB([gnutls], [gnutls_certificate_verify_peers2]) if test "x$ac_cv_lib_gnutls_gnutls_certificate_verify_peers2" != xyes; then AC_MSG_ERROR([GnuTLS is required]) diff --git a/src/Makefile.am b/src/Makefile.am index 677f369..f7fd084 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,2 +1,2 @@ bin_PROGRAMS = tlsproxy -tlsproxy_SOURCES = tlsproxy.c +tlsproxy_SOURCES = tlsproxy.c sem.h sem.c diff --git a/src/sem.c b/src/sem.c new file mode 100644 index 0000000..86fe398 --- /dev/null +++ b/src/sem.c @@ -0,0 +1,84 @@ +/* + * Simple semaphore implementation, P() and V(). + * + * Copyright (C) 2011 Simon Ruderich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include + +#include "sem.h" + + +struct SEM { + pthread_mutex_t mutex; + pthread_cond_t condition; + int value; +}; + +SEM *sem_init(int init_value) { + SEM *sem = (SEM *)malloc(sizeof(SEM)); + if (NULL == sem) { + return NULL; + } + + if (0 != pthread_mutex_init(&sem->mutex, NULL)) { + free(sem); + return NULL; + } + if (0 != pthread_cond_init(&sem->condition, NULL)) { + pthread_mutex_destroy(&sem->mutex); + free(sem); + return NULL; + } + + sem->value = init_value; + return sem; +} + +int sem_del(SEM *sem) { + if (NULL == sem) { + return 0; + } + + if (0 != pthread_mutex_destroy(&sem->mutex)) { + free(sem); + return -1; + } + if (0 != pthread_cond_destroy(&sem->condition)) { + free(sem); + return -1; + } + + free(sem); + return 0; +} + +void P(SEM *sem) { + pthread_mutex_lock(&sem->mutex); + while (0 == sem->value) { + pthread_cond_wait(&sem->condition, &sem->mutex); + } + sem->value--; + pthread_mutex_unlock(&sem->mutex); +} + +void V(SEM *sem) { + pthread_mutex_lock(&sem->mutex); + sem->value++; + pthread_cond_broadcast(&sem->condition); + pthread_mutex_unlock(&sem->mutex); +} diff --git a/src/sem.h b/src/sem.h new file mode 100644 index 0000000..6108d22 --- /dev/null +++ b/src/sem.h @@ -0,0 +1,31 @@ +/* + * Simple semaphore implementation, P() and V(). + * + * Copyright (C) 2011 Simon Ruderich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef SEM_H +#define SEM_H + +typedef struct SEM SEM; + +SEM *sem_init(int init_value); +int sem_del(SEM *sem); + +void P(SEM *sem); +void V(SEM *sem); + +#endif diff --git a/src/tlsproxy.c b/src/tlsproxy.c index 508b09d..6fe9dfa 100644 --- a/src/tlsproxy.c +++ b/src/tlsproxy.c @@ -17,6 +17,8 @@ * along with this program. If not, see . */ +#include + #include #include /* socket(), bind(), accept(), listen() */ @@ -34,19 +36,34 @@ #include /* poll() */ #include +/* errno */ +#include +/* pthread_*() */ +#include -#include - +#include "sem.h" /* Maximum line of the request line. Longer request lines are aborted with an * error. The standard doesn't specify a maximum line length but this should * be a good limit to make processing simpler. */ #define MAX_REQUEST_LINE 4096 +/* Size of ringbuffer. */ +#define RINGBUFFER_SIZE 10 + /* Server should shut down. Set by SIGINT handler. */ static volatile int done; +/* Number of threads. */ +static size_t thread_count; +/* Synchronized ring buffer storing accept()ed client sockets. */ +static int ringbuffer[RINGBUFFER_SIZE]; +static int ringbuffer_read; +static int ringbuffer_write; +static SEM *ringbuffer_full; /* At least one element in the buffer? */ +static SEM *ringbuffer_free; /* Space for another element in the buffer? */ +static SEM *ringbuffer_lock; /* Read lock. */ /* Proxy hostname and port if specified on the command line. */ static char *use_proxy_host; static char *use_proxy_port; @@ -57,6 +74,7 @@ static void sigint_handler(int signal); static void parse_arguments(int argc, char **argv); static void print_usage(const char *argv); +static void worker_thread(void); static void handle_connection(int socket); static int read_http_request(FILE *client_fd, char *request, size_t length); static void send_close_bad_request(FILE *client_fd); @@ -76,6 +94,9 @@ int main(int argc, char **argv) { int client_socket, server_socket; struct sockaddr_in6 server_in; + size_t i; + pthread_t *threads; + struct sigaction action; parse_arguments(argc, argv); @@ -94,6 +115,40 @@ int main(int argc, char **argv) { action.sa_flags = 0; sigaction(SIGINT, &action, NULL); + /* Initialize ring buffer. */ + ringbuffer_read = 0; + ringbuffer_write = 0; + ringbuffer_full = sem_init(0); + ringbuffer_free = sem_init(RINGBUFFER_SIZE); + ringbuffer_lock = sem_init(1); + if (NULL == ringbuffer_full + || NULL == ringbuffer_free + || NULL == ringbuffer_lock) { + perror("sem_init()"); + return EXIT_FAILURE; + } + + /* Spawn worker threads to handle requests. */ + threads = (pthread_t *)malloc(thread_count * sizeof(pthread_t)); + if (NULL == threads) { + perror("thread malloc failed"); + return EXIT_FAILURE; + } + for (i = 0; i < thread_count; i++) { + int result; + pthread_t thread; + + result = pthread_create(&thread, NULL, + (void * (*)(void *))&worker_thread, + NULL); + if (0 != result) { + printf("failed to create worker thread: %s\n", strerror(result)); + return EXIT_FAILURE; + } + + threads[i] = thread; + } + server_socket = socket(PF_INET6, SOCK_STREAM, 0); if (-1 == server_socket) { perror("socket()"); @@ -141,11 +196,36 @@ int main(int argc, char **argv) { break; } - handle_connection(client_socket); + /* No lock, we only have one producer! */ + P(ringbuffer_free); + ringbuffer[ringbuffer_write] = client_socket; + ringbuffer_write = (ringbuffer_write + 1) % RINGBUFFER_SIZE; + V(ringbuffer_full); } close(server_socket); + /* Poison all threads and shut them down. */ + for (i = 0; i < thread_count; i++) { + P(ringbuffer_free); + ringbuffer[ringbuffer_write] = -1; /* poison */ + ringbuffer_write = (ringbuffer_write + 1) % RINGBUFFER_SIZE; + V(ringbuffer_full); + } + for (i = 0; i < thread_count; i++) { + errno = pthread_join(threads[i], NULL); + if (0 != errno) { + perror("pthread_join()"); + continue; + } + } + + free(ringbuffer_full); + free(ringbuffer_free); + free(ringbuffer_lock); + + free(threads); + free(use_proxy_host); free(use_proxy_port); @@ -161,7 +241,10 @@ static void sigint_handler(int signal_number) { static void parse_arguments(int argc, char **argv) { int option; - while (-1 != (option = getopt(argc, argv, "p:h?"))) { + /* Default values. */ + thread_count = 10; + + while (-1 != (option = getopt(argc, argv, "p:t:h?"))) { switch (option) { case 'p': { char *position; @@ -193,6 +276,14 @@ static void parse_arguments(int argc, char **argv) { break; } + case 't': { + if (0 >= atoi(optarg)) { + fprintf(stderr, "-t positive number required\n"); + exit(EXIT_FAILURE); + } + thread_count = (size_t)atoi(optarg); + break; + } case 'h': default: /* '?' */ print_usage(argv[0]); @@ -209,6 +300,28 @@ static void print_usage(const char *argv) { fprintf(stderr, "Usage: %s [-p host:port] port\n", argv); fprintf(stderr, "\n"); fprintf(stderr, "-p proxy hostname and port\n"); + fprintf(stderr, "-t number of threads [default: 10]\n"); +} + +static void worker_thread(void) { + int client_socket; + + for (;;) { + /* Get next element from ring buffer. */ + P(ringbuffer_full); + P(ringbuffer_lock); + client_socket = ringbuffer[ringbuffer_read]; + ringbuffer_read = (ringbuffer_read + 1) % RINGBUFFER_SIZE; + V(ringbuffer_lock); + V(ringbuffer_free); + + /* Negative value indicates we should shut down our thread. */ + if (client_socket < 0) { + break; + } + + handle_connection(client_socket); + } } static void handle_connection(int client_socket) {