/* * simoop.c * * Copyright (C) 2016 Facebook * Chris Mason * * GPLv2, portions copied from the kernel and from Jens Axboe's fio */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "xxhash.h" /* these are part of the histogram accounting */ #define PLAT_BITS 8 #define PLAT_VAL (1 << PLAT_BITS) #define PLAT_GROUP_NR 19 #define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL) #define PLAT_LIST_MAX 20 /* how deep a directory chain to make */ #define DIR_LEVEL 64 /* buffer size for reads and writes during filler */ #define BUF_SIZE (1 * 1024 * 1024) #define NAME_LEN 256 #define FILES_SPLIT 8 #ifndef O_DIRECT # define O_DIRECT 00040000 #endif /* * we make a few different kinds of files, these are appended onto the * file name to separate them */ #define DATA_FILE NULL #define RESULT_FILE "extra" #define TMP_FILE "tmp" #define FILL_FILE "fill" /* each path in the paths array gets a thread pool hammering on it. */ char **paths; int total_paths = 0; /* -t number of workers thread */ static int worker_threads = 16; /* -r seconds */ static unsigned long runtime = 30; /* -c usec */ static unsigned long long cputime = 0; /* -f size of the files we create */ static unsigned long long file_size = 64 * 1024 * 1024; /* -n number of files we create */ static unsigned long num_files = 65536; /* -R read size */ static unsigned long read_size = 2 * 1024 * 1024; /* -W write size */ static unsigned long write_size = 2 * 1024 * 1024; /* -D number of threads running du */ static int du_threads = 0; /* memory to allocate and use during each task */ static unsigned long thinking_mem = 128 * 1024 * 1024UL; /* should we fsync sometimes? */ static int funksync = 0; static int writethrough = 0; /* should we just exit after filling? 
*/
static int fill_only = 0;

/* are we just appending bytes onto the ends of the working set files */
static int append_mode = 0;

/* -e: truncate existing data files at startup so they get refilled */
static int truncate_original = 0;

/* dont do any crc checks -I */
static int ignore_crcs = 0;

/* randomize the write size */
static int oddsizes = 0;

/* use odirect sometimes */
static int odirect = 0;

/* check contents at startup */
static int check_initial_files = 0;

/* verify files immediately after writing */
static int verify_writes = 0;

/* -M how much memory we allocate to benchmark allocations */
static unsigned long mmap_size = 0;

/* these do nothing but spin */
static int cpu_threads = 0;

/* how long we sleep while processing requests */
static int sleeptime = 0;

/* should we use fallocate instead of writing initial file contents */
static int zallocate = 0;

/* should we delete the .tmp and .results files? */
static int nocleanup = 0;

/* take extra steps to be a jerk to the filesystem */
static int unaligned_stress = 0;

/* fixed seed so the data pattern is reproducible between runs */
static uint64_t global_rand_seed = 0x89ABCEF;

/*
 * after warmup_seconds, we reset the counters to get rid of noise from
 * early in the run
 */
static unsigned long warmup_seconds = 60;

/* reporting interval */
static unsigned long interval_seconds = 120;

/* the master thread flips this to true when runtime is up */
static volatile unsigned long stopping = 0;
static volatile unsigned long warmup_done = 0;

/* wall clock seconds the initial fill took (set by run_filler_threads) */
static double filler_completion_time = 0;

/*
 * one stat struct per thread data, when the workers sleep this records the
 * latency between when they are woken up and when they actually get the
The message threads sum up the stats of all the workers and * then bubble them up to main() for printing */ struct stats { unsigned int plat[PLAT_NR]; unsigned int nr_samples; unsigned int max; unsigned int min; }; /* this defines which latency profiles get printed */ #define PLIST_P99 2 #define PLIST_P95 1 #define PLIST_P50 0 static double plist[PLAT_LIST_MAX] = { 50.0, 95.0, 99.0, }; enum { HELP_LONG_OPT = 1, }; /* this enum needs to match up with the labels array below */ enum { READ_STATS = 0, WRITE_STATS, ALLOC_STATS, TOTAL_STATS, }; char *stat_labels[] = { "Read latency", "Write latency", "Allocation latency", NULL, }; /* match this with vmstat_labels */ enum { ALLOCSTALLS, VMSCAN_NR_WRITE, TOTAL_VMSTATS, }; char *vmstat_labels[] = { "allocstall", "nr_vmscan_write", NULL, }; struct vmstat_info { double instant_rate[TOTAL_VMSTATS]; double last_rate[TOTAL_VMSTATS]; double rate[TOTAL_VMSTATS]; struct stats stats[TOTAL_VMSTATS]; }; #define VERIFY_MAGIC "simoopv1" #define VERIFY_MAGIC_LEN 8 /* our verify blocks are pretty small, which allows us to sub-page writes */ #define VERIFY_ALIGNMENT ((loff_t)512) char pattern_buffer[VERIFY_ALIGNMENT]; struct verify_header { uint64_t crc; /* how many bytes did we crc */ uint64_t length; /* starting offset of this block in the file */ uint64_t offset; /* seed for recreating the random data */ uint64_t rand_seed; uint64_t inode; uint64_t spare[1]; /* VERIFY_MAGIC above (zero filled first) */ char magic[VERIFY_MAGIC_LEN]; }; void *verify_crc_start(struct verify_header *verify_header) { return &verify_header->length; } unsigned long verify_crc_offset(void) { struct verify_header *verify_header = NULL; return (unsigned long)(&verify_header->length); } static uint64_t __calc_crc(void *xxhash_state, struct verify_header *verify_header) { int ret; if (ignore_crcs) return 0; ret = XXH32_resetState(xxhash_state, verify_header->rand_seed); if (ret) { fprintf(stderr, "error %d during XXH32_resetState\n", ret); exit(1); } ret = 
XXH32_update(xxhash_state, verify_crc_start(verify_header),
		   verify_header->length - verify_crc_offset());
	if (ret) {
		fprintf(stderr, "error %d from XXH32_update\n", ret);
		exit(1);
	}
	return XXH32_intermediateDigest(xxhash_state);
}

/*
 * fill buffer (VERIFY_ALIGNMENT bytes) with a repeating 128 byte random
 * pattern derived from seed.  rand_r() with the same seed recreates the
 * same bytes, which is how verification rebuilds the expected data.
 */
static void init_pattern_buffer(char *buffer, uint64_t seed)
{
	char pattern[128];
	int i;
	int this_copy;
	int length = VERIFY_ALIGNMENT;
	char *p = buffer;
	unsigned int randr_seed = seed;

	pattern[0] = rand_r(&randr_seed);
	for (i = 1; i < 128; i++) {
		pattern[i] = rand_r(&randr_seed);
	}
	while(length > 0) {
		if (length > 128)
			this_copy = 128;
		else
			this_copy = length;
		memcpy(p, pattern, this_copy);
		p += this_copy;
		length -= this_copy;
	}
}

/*
 * stamp a verify_header onto the start of block and fill in its crc.
 * Only the header bytes are written here; length is what the header
 * claims the crc covers.
 */
static void crc_block(void *xxhash_state, char *block, uint64_t offset,
		      uint64_t length, uint64_t rand_seed, uint64_t ino)
{
	struct verify_header *verify_header = (struct verify_header *)block;

	if (length < sizeof(*verify_header)) {
		fprintf(stderr, "blocksize too small for crc header\n");
		exit(1);
	}
	/* zero first so the crc is computed over deterministic contents */
	memset(verify_header, 0, sizeof(*verify_header));
	verify_header->length = length;
	verify_header->offset = offset;
	verify_header->rand_seed = rand_seed;
	verify_header->inode = ino;
	memcpy(verify_header->magic, VERIFY_MAGIC, VERIFY_MAGIC_LEN);
	verify_header->crc = __calc_crc(xxhash_state, verify_header);
}

/* round value down to a VERIFY_ALIGNMENT boundary */
static loff_t verify_align(loff_t value)
{
	return (value / VERIFY_ALIGNMENT) * VERIFY_ALIGNMENT;
}

/* round value up to a VERIFY_ALIGNMENT boundary */
static loff_t verify_align_up(loff_t value)
{
	value += VERIFY_ALIGNMENT - 1;
	return verify_align(value);
}

/*
 * on crc failure, rebuild the expected block from the header's seed and
 * report the first byte that differs
 */
static void dump_bad_block(char *block, uint64_t offset)
{
	struct verify_header *verify_header = (struct verify_header *)block;
	uint64_t seed = verify_header->rand_seed;
	char *tmp = malloc(VERIFY_ALIGNMENT);
	int i;

	if (!tmp) {
		fprintf(stderr, "malloc failed\n");
		exit(1);
	}

	/* recreate what the block should have contained */
	init_pattern_buffer(tmp, seed);
	memcpy(tmp, verify_header, sizeof(*verify_header));
	for (i = 0; i < VERIFY_ALIGNMENT; i++) {
		if (tmp[i] != block[i]) {
			fprintf(stderr, "bad_block offset %lu (index %d) found 0x%x wanted 0x%x\n",
				offset + i, i,
block[i], tmp[i]);
			break;
		}
	}
	free(tmp);
}

/*
 * validate one verify_header read back from disk: inode, magic, offset,
 * length and crc must all match.  Every mismatch is reported, then the
 * program exits if anything failed.
 */
static void check_block_headers(void *xxhash_state, char *filename,
				char *block, uint64_t offset,
				uint64_t length, uint64_t ino)
{
	struct verify_header *verify_header = (struct verify_header *)block;
	uint64_t crc;
	int i;
	int failed = 0;

	if (ignore_crcs)
		return;

	if (verify_header->inode != ino) {
		fprintf(stderr, "bad buffer inode file %s inode %lu found %lu offset %lu\n",
			filename, ino, verify_header->inode, offset);
		failed++;
	}
	if (memcmp(verify_header->magic, VERIFY_MAGIC, VERIFY_MAGIC_LEN)) {
		fprintf(stderr, "bad magic file %s offset %lu\n",
			filename, offset);
		fprintf(stderr, "found: ");
		for (i = 0; i < VERIFY_MAGIC_LEN; i++) {
			fprintf(stderr, "[%x %x] ",
				verify_header->magic[i], VERIFY_MAGIC[i]);
		}
		fprintf(stderr, "\n");
		failed++;
	}
	if (verify_header->offset != offset) {
		fprintf(stderr, "bad offset file %s offset %lu found %lu\n",
			filename, offset, verify_header->offset);
		failed++;
	}
	if (verify_header->length != length) {
		fprintf(stderr, "bad buffer length file %s length %lu found %lu offset %lu\n",
			filename, length, verify_header->length, offset);
		failed++;
	}
	crc = __calc_crc(xxhash_state, verify_header);
	if (crc != verify_header->crc) {
		fprintf(stderr, "bad crc file %s crc %lx found %lx\n",
			filename, crc, verify_header->crc);
		dump_bad_block(block, offset);
		failed++;
	}
	if (failed) {
		exit(1);
	}
}

/*
 * A not-so-good version fls64.
No fascinating optimization since
 * no one except parse_size uses it
 */
/* returns the 1-based position of the highest set bit, 0 when x == 0 */
static int fls64(unsigned long long x)
{
	int i;

	for (i = 0; i <64; i++)
		if (x << i & (1ULL << 63))
			return 64 - i;
	return 64 - i;
}

/*
 * parse a decimal size with an optional b/k/m/g/t/p/e suffix (powers of
 * 1024) and return it in bytes.  Exits on malformed or overflowing input.
 */
unsigned long long parse_size(char *s)
{
	char c;
	char *endptr;
	unsigned long long mult = 1;
	unsigned long long ret;

	if (!s) {
		fprintf(stderr, "size value is empty\n");
		exit(1);
	}
	if (s[0] == '-') {
		fprintf(stderr, "size value '%s' is less equal than 0\n", s);
		exit(1);
	}

	/*
	 * bug fix: clear errno before the call.  strtoull only sets it on
	 * overflow, so a stale ERANGE left by an earlier library call
	 * would trigger a bogus "too large" failure below.
	 */
	errno = 0;
	ret = strtoull(s, &endptr, 10);
	if (endptr == s) {
		fprintf(stderr, "size value '%s' is invalid\n", s);
		exit(1);
	}
	/* at most one suffix character is allowed */
	if (endptr[0] && endptr[1]) {
		fprintf(stderr, "illegal suffix contains character '%c' in wrong position\n",
			endptr[1]);
		exit(1);
	}
	/* strtoull returns ULLONG_MAX and sets ERANGE when it overflows */
	if (errno == ERANGE && ret == ULLONG_MAX) {
		fprintf(stderr, "size value '%s' is too large for unsigned long long", s);
		exit(1);
	}
	if (endptr[0]) {
		c = tolower(endptr[0]);
		switch (c) {
		case 'e':
			mult *= 1024;
			/* fallthrough */
		case 'p':
			mult *= 1024;
			/* fallthrough */
		case 't':
			mult *= 1024;
			/* fallthrough */
		case 'g':
			mult *= 1024;
			/* fallthrough */
		case 'm':
			mult *= 1024;
			/* fallthrough */
		case 'k':
			mult *= 1024;
			/* fallthrough */
		case 'b':
			break;
		default:
			fprintf(stderr, "unknown size descriptor '%c'", c);
			exit(1);
		}
	}
	/* Check whether ret * mult overflow */
	if (fls64(ret) + fls64(mult) - 1 > 64) {
		fprintf(stderr, "size value '%s' is too large for unsigned long long\n", s);
		exit(1);
	}
	ret *= mult;
	return ret;
}

char *option_string = "t:s:C:c:r:n:f:FR:T:m:W:M:w:i:D:oaOVzNIevUuE";
static struct option long_options[] = {
	/*
	 * bug fix: -a takes no argument; required_argument here made
	 * "--appendmode" silently swallow the next argv entry
	 */
	{"appendmode", no_argument, 0, 'a'},
	{"mmapsize", required_argument, 0, 'M'},
	{"filesize", required_argument, 0, 'f'},
	{"numfiles", required_argument, 0, 'n'},
	{"readsize", required_argument, 0, 'R'},
	{"writesize", required_argument, 0, 'W'},
	{"readthreads", required_argument, 0, 'T'},
	{"duthreads", required_argument, 0, 'D'},
{"threads", required_argument, 0, 't'}, {"runtime", required_argument, 0, 'r'}, {"warmuptime", required_argument, 0, 'w'}, {"sleeptime", required_argument, 0, 's'}, {"interval", required_argument, 0, 'i'}, {"ignorecrcs", required_argument, 0, 'I'}, {"cputime", required_argument, 0, 'c'}, {"cputhreads", required_argument, 0, 'C'}, {"memory", required_argument, 0, 'm'}, {"funksync", no_argument, 0, 'F'}, {"oddsizes", no_argument, 0, 'o'}, {"odirect", no_argument, 0, 'O'}, {"verify-writes", no_argument, 0, 'v'}, {"verify-startup", no_argument, 0, 'V'}, {"unalignedstress", no_argument, 0, 'u'}, {"zallocate", no_argument, 0, 'z'}, {"nocleanup", no_argument, 0, 'N'}, {"erase", no_argument, 0, 'e'}, {"fillonly", no_argument, 0, 'E'}, {"help", no_argument, 0, HELP_LONG_OPT}, {0, 0, 0, 0} }; static void print_usage(void) { fprintf(stderr, "simoop usage:\n" "\t-a (--appendmode): append onto working files\n" "\t-t (--threads): worker threads (def: 16)\n" "\t-m (--memory): memory to allocate during think time in each worker (def 0)\n" "\t-M (--mmapsize): amount to mmap to time allocator (def 0)\n" "\t-r (--runtime): How long to run before exiting (seconds, def: 30)\n" "\t-w (--warmuptime): How long to warmup before resetting the stats (seconds, def: 60)\n" "\t-i (--interval): Sleep time in seconds between latency reports (sec, def: 120\n" "\t-I (--ignorecrcs): don't verify crcs\n" "\t-v (--verify-writes): immediately verify files written (def: no)\n" "\t-V (--verify-startup): Verify all files on startup (def: no)\n" "\t-s (--sleeptime): Sleep time in usecs between worker loops (usec, def: 0\n" "\t-c (--cputime): How long to think during each worker loop (seconds, def: 0)\n" "\t-C (--cputhreads): How many threads do the cpu time loop (0)\n" "\t-n (--numfiles): Number of files per directory tree (65536)\n" "\t-f (--filesize): Size of each file (64M)\n" "\t-R (--readsize): amount to read from each file (2M)\n" "\t-W (--writesize): amount to write to tmp files (2M)\n" "\t-D 
(--duthraeds): how many threads to scanning the working dirs (0)\n" "\t-F (--funksync): fsync sometimes\n" "\t-o (--oddsizes): randomize sizes to unaligned values\n" "\t-O (--odirect): use O_DIRECT sometimes\n" "\t-z (--zallocate): use fallocate for initial file creation\n" "\t-N (--nocleanup): don't cleanup temp files from the last run\n" "\t-e (--erase): delete the data files at the start of the run\n" "\t-E (--fillonly): exit after filling\n" "\t-U (--writethrough): sync_file_range every write\n" "\t-u (--unalignedstress): take extra steps to be mean to the FS\n" "\t dir1 [dir2 ... dirN]\n" "\nall sizes are in bytes k,m,g,t modifiers can be used\n" ); exit(1); } static void parse_options(int ac, char **av) { int c; int found_sleeptime = -1; int found_cputime = -1; int i; while (1) { int option_index = 0; c = getopt_long(ac, av, option_string, long_options, &option_index); if (c == -1) break; switch(c) { case 'a': append_mode = 1; break; case 'e': truncate_original = 1; break; case 'E': fill_only = 1; break; case 's': found_sleeptime = atoi(optarg); break; case 'm': thinking_mem = parse_size(optarg); fprintf(stderr, "thinking_mem %lu\n", thinking_mem); break; case 'M': mmap_size = parse_size(optarg); break; case 'c': found_cputime = atoi(optarg); break; case 'C': cpu_threads = atoi(optarg); break; case 't': worker_threads = atoi(optarg); break; case 'r': runtime = atoi(optarg); break; case 'w': warmup_seconds = atoi(optarg); break; case 'i': interval_seconds = atoi(optarg); break; case 'I': ignore_crcs = 1; break; case 'F': funksync = 1; break; case 'f': file_size = parse_size(optarg); file_size = verify_align_up(file_size); break; case 'n': num_files = atoi(optarg); num_files = ((num_files + FILES_SPLIT - 1)/ FILES_SPLIT) * FILES_SPLIT; fprintf(stderr, "Creating %ld files\n", num_files); break; case 'R': read_size = parse_size(optarg); read_size = verify_align_up(read_size); break; case 'W': write_size = parse_size(optarg); write_size = 
verify_align_up(write_size);
			break;
		case 'D':
			du_threads = atoi(optarg);
			break;
		case 'o':
			oddsizes = 1;
			break;
		case 'O':
			odirect = 1;
			break;
		case 'U':
			writethrough = 1;
			break;
		case 'u':
			unaligned_stress = 1;
			break;
		case 'v':
			verify_writes = 1;
			break;
		case 'V':
			check_initial_files = 1;
			break;
		case 'z':
			zallocate = 1;
			break;
		case '?':
		case HELP_LONG_OPT:
			print_usage();
			break;
		default:
			break;
		}
	}

	/* everything left over on the command line is a working directory */
	total_paths = ac - optind;
	if (total_paths <= 0) {
		fprintf(stderr, "No directories specified\n");
		print_usage();
		exit(1);
	}
	paths = malloc(sizeof(char *) * (total_paths + 1));
	paths[total_paths] = NULL;
	for (i = 0; i < total_paths; i++) {
		paths[i] = strdup(av[optind++]);
		fprintf(stderr, "adding path %s\n", paths[i]);
	}
	/*
	 * by default pipe mode zeros out cputime and sleep time.  This
	 * sets them to any args that were actually passed in
	 */
	if (found_sleeptime >= 0)
		sleeptime = found_sleeptime;
	if (found_cputime >= 0)
		cputime = found_cputime * 1000000;

	/* spinning needs both a duration and threads to do it */
	if (cputime == 0 || cpu_threads == 0) {
		cputime = 0;
		cpu_threads = 0;
		/*
		 * NOTE(review): found_sleeptime is -1 when -s was absent,
		 * so !found_sleeptime only fires for an explicit "-s 0";
		 * sleeptime already defaults to 0 either way
		 */
		if (!found_sleeptime)
			sleeptime = 0;
	}

	/*
	 * NOTE(review): the strdup loop above already advanced optind to
	 * ac, so this check can never trigger -- dead code
	 */
	if (optind < ac) {
		fprintf(stderr, "Error Extra arguments '%s'\n", av[optind]);
		exit(1);
	}
}

/* tdiff = t1 - t0, clamped to zero when time appears to run backwards */
void tvsub(struct timeval * tdiff, struct timeval * t1, struct timeval * t0)
{
	tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
	tdiff->tv_usec = t1->tv_usec - t0->tv_usec;
	if (tdiff->tv_usec < 0 && tdiff->tv_sec > 0) {
		tdiff->tv_sec--;
		tdiff->tv_usec += 1000000;
		if (tdiff->tv_usec < 0) {
			fprintf(stderr, "lat_fs: tvsub shows test time ran backwards!\n");
			exit(1);
		}
	}

	/* time shouldn't go backwards!!! */
	if (tdiff->tv_usec < 0 || t1->tv_sec < t0->tv_sec) {
		tdiff->tv_sec = 0;
		tdiff->tv_usec = 0;
	}
}

/*
 * returns the difference between start and stop in usecs.
Negative values
 * are turned into 0
 */
unsigned long long tvdelta(struct timeval *start, struct timeval *stop)
{
	struct timeval td;
	unsigned long long usecs;

	/* tvsub() clamps negative deltas to 0 */
	tvsub(&td, stop, start);
	usecs = td.tv_sec;
	usecs *= 1000000;
	usecs += td.tv_usec;
	return (usecs);
}

/* mr axboe's magic latency histogram */
static unsigned int plat_val_to_idx(unsigned int val)
{
	unsigned int msb, error_bits, base, offset;

	/* Find MSB starting from bit 0 */
	if (val == 0)
		msb = 0;
	else
		msb = sizeof(val)*8 - __builtin_clz(val) - 1;

	/*
	 * MSB <= (PLAT_BITS-1), cannot be rounded off. Use
	 * all bits of the sample as index
	 */
	if (msb <= PLAT_BITS)
		return val;

	/* Compute the number of error bits to discard*/
	error_bits = msb - PLAT_BITS;

	/* Compute the number of buckets before the group */
	base = (error_bits + 1) << PLAT_BITS;

	/*
	 * Discard the error bits and apply the mask to find the
	 * index for the buckets in the group
	 */
	offset = (PLAT_VAL - 1) & (val >> error_bits);

	/* Make sure the index does not exceed (array size - 1) */
	return (base + offset) < (PLAT_NR - 1) ?
		(base + offset) : (PLAT_NR - 1);
}

/*
 * Convert the given index of the bucket array to the value
 * represented by the bucket
 */
static unsigned int plat_idx_to_val(unsigned int idx)
{
	unsigned int error_bits, k, base;

	if (idx >= PLAT_NR) {
		fprintf(stderr, "idx %u is too large\n", idx);
		exit(1);
	}

	/* MSB <= (PLAT_BITS-1), cannot be rounded off.
Use * all bits of the sample as index */ if (idx < (PLAT_VAL << 1)) return idx; /* Find the group and compute the minimum value of that group */ error_bits = (idx >> PLAT_BITS) - 1; base = 1 << (error_bits + PLAT_BITS); /* Find its bucket number of the group */ k = idx % PLAT_VAL; /* Return the mean of the range of the bucket */ return base + ((k + 0.5) * (1 << error_bits)); } static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr, unsigned int **output) { unsigned long sum = 0; unsigned int len, i, j = 0; unsigned int oval_len = 0; unsigned int *ovals = NULL; int is_last; len = 0; while (len < PLAT_LIST_MAX && plist[len] != 0.0) len++; if (!len) return 0; /* * Calculate bucket values, note down max and min values */ is_last = 0; for (i = 0; i < PLAT_NR && !is_last; i++) { sum += io_u_plat[i]; while (sum >= (plist[j] / 100.0 * nr)) { if (j == oval_len) { oval_len += 100; ovals = realloc(ovals, oval_len * sizeof(unsigned int)); } ovals[j] = plat_idx_to_val(i); is_last = (j == len - 1); if (is_last) break; j++; } } *output = ovals; return len; } static void calc_p99(struct stats *s, unsigned int *p50, unsigned int *p95, unsigned int *p99) { unsigned int *ovals = NULL; int len; *p50 = 0; *p95 = 0; *p99 = 0; len = calc_percentiles(s->plat, s->nr_samples, &ovals); if (len && len > PLIST_P99) *p99 = ovals[PLIST_P99]; if (len && len > PLIST_P99) *p95 = ovals[PLIST_P95]; if (len && len > PLIST_P50) *p50 = ovals[PLIST_P50]; if (ovals) free(ovals); } /* fold latency info from s into d */ void combine_stats(struct stats *d, struct stats *s) { int i; for (i = 0; i < PLAT_NR; i++) d->plat[i] += s->plat[i]; d->nr_samples += s->nr_samples; if (s->max > d->max) d->max = s->max; if (s->min < d->min) d->min = s->min; } /* record a latency result into the histogram */ static void add_lat(struct stats *s, unsigned int us) { int lat_index = 0; if (us > s->max) s->max = us; if (us < s->min) s->min = us; lat_index = plat_val_to_idx(us); 
__sync_fetch_and_add(&s->plat[lat_index], 1);
	__sync_fetch_and_add(&s->nr_samples, 1);
}

/*
 * every thread has one of these, it comes out to about 19K thanks to the
 * giant stats struct
 */
struct thread_data {
	pthread_t tid;
	/* per-thread count of worker loops over the life of the run */
	unsigned long long work_done;
	char *read_buf;
	char *write_buf;
	/* latency histogram */
	struct stats stats[TOTAL_STATS];
};

#define nop __asm__ __volatile__("rep;nop": : :"memory")

/* busy-wait for spin_time usecs, polling gettimeofday between nops */
static void usec_spin(unsigned long spin_time)
{
	struct timeval now;
	struct timeval start;
	unsigned long long delta;

	if (spin_time == 0)
		return;

	gettimeofday(&start, NULL);
	while (1) {
		gettimeofday(&now, NULL);
		delta = tvdelta(&start, &now);
		if (delta > spin_time)
			return;
		nop;
	}
}

/*
 * runs during initial file creation to create one dir
 * in the tree
 */
static void make_one_dir(char *path, int a, int b)
{
	char subdir[NAME_LEN];
	int ret;

	/* b < 0 means make the first level directory only */
	if (b >= 0)
		ret = snprintf(subdir, NAME_LEN, "%s/%d/%d", path, a, b);
	else
		ret = snprintf(subdir, NAME_LEN, "%s/%d", path, a);
	if (ret >= NAME_LEN || ret < 0) {
		/* NOTE(review): perror() here appends a stale errno string */
		perror("file name too long\n");
		exit(1);
	}

	/* an already existing directory is fine */
	ret = mkdir(subdir, 0700);
	if (ret && errno != EEXIST) {
		perror("mkdir");
		exit(1);
	}
}

/* create the subdir tree (no files) */
static void make_dirs(char *path)
{
	int first;
	int second;

	for (first = 0; first < 64; first++) {
		make_one_dir(path, first, -1);
		for (second = 0; second < 64; second++) {
			make_one_dir(path, first, second);
		}
	}
}

/*
 * helper to form pathnames, if postfix isn't NULL, it'll be tossed
 * onto the end of the filename
 */
static void join_path(char *name, char *path, int seq, char *postfix)
{
	int a;
	int b;
	int ret;

	/* the two directory levels are derived from the file's seq number */
	a = seq % DIR_LEVEL;
	b = (seq / DIR_LEVEL) % DIR_LEVEL;

	if (postfix)
		ret = snprintf(name, NAME_LEN, "%s/%d/%d/%d-%s",
			       path, a, b, seq, postfix);
	else
		ret = snprintf(name, NAME_LEN, "%s/%d/%d/%d",
			       path, a, b, seq);
	if (ret >= NAME_LEN || ret < 0) {
		perror("file name too long\n");
		exit(1);
	}
}

static void read_whole_file(char *path, int seq, char
*postfix, char *buf, size_t buf_size);
static void read_whole_fd(int fd, char *name, char *buf, size_t buf_size);

/* unlink working files not part of the main dataset for a given filename. */
static void unlink_extra(char *path, int seq, char *buf, size_t buf_size)
{
	char name[NAME_LEN];
	int ret;

	if (nocleanup)
		return;

	join_path(name, path, seq, RESULT_FILE);
	/* -V: verify leftovers from the last run before deleting them */
	if (check_initial_files)
		read_whole_file(path, seq, RESULT_FILE, buf, buf_size);
	ret = unlink(name);
	if (ret < 0 && errno != ENOENT) {
		perror("unlink");
		exit(1);
	}

	join_path(name, path, seq, TMP_FILE);
	if (check_initial_files)
		read_whole_file(path, seq, TMP_FILE, buf, buf_size);
	ret = unlink(name);
	if (ret < 0 && errno != ENOENT) {
		perror("unlink");
		exit(1);
	}
}

/* construct a filename and return the fd */
static int open_path(char *path, int seq, char *postfix, int flags)
{
	int fd;
	char name[NAME_LEN];

	join_path(name, path, seq, postfix);
	fd = open(name, O_RDWR | O_CREAT | flags, 0600);
	if (fd < 0) {
		/* only possible when the caller passed O_EXCL */
		if (errno == EEXIST)
			return -EEXIST;
		perror("open");
		exit(1);
	}
	return fd;
}

/* with -o, shrink sz to a random value (never 0, never more than sz) */
static loff_t randomize_size(loff_t sz)
{
	loff_t val;

	if (!oddsizes)
		return sz;

	val = rand() % sz;
	if (val == 0)
		val = sz;
	return val;
}

/* with -F, kick off async writeback and (1 time in 5) a real fsync */
static void maybe_fsync(int fd)
{
	int ret;

	if (!funksync)
		return;

	ret = sync_file_range(fd, 0, 0, SYNC_FILE_RANGE_WRITE);
	if (ret < 0) {
		perror("sync_file_range");
		exit(1);
	}

	if ((rand() % 5) != 0)
		return;

	ret = fsync(fd);
	if (ret < 0) {
		perror("fsync");
		exit(1);
	}
}

/* with -O, randomly flip O_DIRECT on and off for this fd */
static void maybe_toggle_odirect(int fd, unsigned long start, unsigned long len)
{
	int flags;
	int ret;

	if (!odirect)
		return;

	flags = fcntl(fd, F_GETFL);
	/*
	 * if we're doing an unaligned IO, turn off O_DIRECT and
	 * exit
	 */
	if ((start & 511) || (len & 511)) {
		if (flags & O_DIRECT) {
			ret = fcntl(fd, F_SETFL, flags & (~O_DIRECT));
			if (ret) {
				perror("fcntl");
				exit(1);
			}
		}
		return;
	}

	/* aligned IO: only toggle the flag 1 time in 128 */
	if ((rand() % 128) != 0)
		return;

	if (flags & O_DIRECT) {
		ret = fcntl(fd, F_SETFL, flags & (~O_DIRECT));
	} else {
		ret = fcntl(fd, F_SETFL, flags | O_DIRECT);
	}
	if
(ret) { perror("fcntl"); exit(1); } } static void maybe_write_through(int fd, loff_t start, loff_t bytes) { int ret; if (!writethrough) return; ret = sync_file_range(fd, start, bytes, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER); if (ret) { perror("sync_file_range"); exit(1); } } static void send_pwrite(int fd, char *buf, loff_t start, ssize_t bytes) { ssize_t this_write; int ret; int i; if (unaligned_stress) { for (i = 0; i < 3; i++) { maybe_toggle_odirect(fd, start, this_write); /* * the goal here is to break up our huge IO into * something that isn't completely page aligned. */ this_write = VERIFY_ALIGNMENT; while (this_write > 0) { if (this_write > bytes) break; ret = pwrite(fd, buf, this_write, start); if (ret <= 0) { perror("pwrite"); exit(1); } maybe_write_through(fd, start, this_write); start += ret; this_write -= ret; buf += ret; bytes -= ret; } maybe_fsync(fd); if (bytes <= 0) break; } } while (bytes > 0) { ret = pwrite(fd, buf, bytes, start); if (ret <= 0) { perror("pwrite"); exit(1); } start += ret; bytes -= ret; buf += ret; } maybe_write_through(fd, start, bytes); maybe_fsync(fd); } static void write_pattern(int fd, void *xxhash_state, char *buf, int buffer_len, loff_t start, off_t length, uint64_t ino) { loff_t aligned_start; char *p; ssize_t this_write; ssize_t cur_len; /* round down the start */ aligned_start = verify_align(start); length += start - aligned_start; /* round up the length */ length = verify_align_up(length); while (length > 0) { if (length > buffer_len) this_write = buffer_len; else this_write = length; /* fill the buffer with the pattern and headers */ cur_len = 0; p = buf; while (cur_len < this_write) { memcpy(p, pattern_buffer, VERIFY_ALIGNMENT); crc_block(xxhash_state, p, aligned_start + cur_len, VERIFY_ALIGNMENT, global_rand_seed, ino); cur_len += VERIFY_ALIGNMENT; p += VERIFY_ALIGNMENT; } send_pwrite(fd, buf, aligned_start, this_write); aligned_start += this_write; length -= this_write; if (stopping) break; } } static void 
read_and_crc(int fd, char *filename, void **xxhash_state, char *buf, int buffer_len, loff_t start, off_t length, uint64_t ino) { ssize_t ret; loff_t aligned_start; char *p; ssize_t this_read; ssize_t cur_len; if (!read_size) return; aligned_start = verify_align(start); length += start - aligned_start; length = verify_align_up(length); while (length > 0) { if (length > buffer_len) this_read = buffer_len; else this_read = length; maybe_toggle_odirect(fd, aligned_start, this_read); ret = posix_fadvise(fd, aligned_start, this_read, POSIX_FADV_DONTNEED); if (ret) { perror("fadvise"); } ret = pread(fd, buf, this_read, aligned_start); if (ret != this_read) { fprintf(stderr, "bad read length %ld wanted %ld offset %lu file %s\n", ret, this_read, aligned_start, filename); exit(1); } p = buf; cur_len = 0; while (cur_len < this_read) { check_block_headers(xxhash_state, filename, p, aligned_start + cur_len, VERIFY_ALIGNMENT, ino); cur_len += VERIFY_ALIGNMENT; p += VERIFY_ALIGNMENT; } aligned_start += this_read; length -= this_read; if (stopping) break; } } /* helper for startup, do initial writes to a given fd */ static void fill_one_file(int fd, void *xxhash_state, char *buf, size_t buf_size) { struct stat st; int ret; loff_t cur_size; loff_t this_size = randomize_size(file_size); ret = fstat(fd, &st); if (ret < 0) { perror("stat"); exit(1); } cur_size = st.st_size; if (oddsizes && cur_size != 0) return; if (cur_size >= this_size) { ftruncate(fd, this_size); return; } write_pattern(fd, xxhash_state, buf, buf_size, cur_size, this_size - cur_size, st.st_ino); } /* * The du thread runs every so often and stats every single file in a * given path. This puts a lot of stress on the slab caches, and at * least for XFS sets a bunch of radix bits used to track which allocation * groups need to have their inodes cleaned. It creates stress inside * the shrinker. 
*/
static void *du_thread(void *arg)
{
	unsigned long seq;
	char *path = arg;
	struct stat st;
	int fd;
	int ret;

	while (!stopping) {
		fprintf(stderr, "du thread is running %s\n", path);
		for (seq = 0; seq < num_files; seq++) {
			fd = open_path(path, seq, DATA_FILE, 0);
			ret = fstat(fd, &st);
			if (ret < 0 && errno != ENOENT) {
				perror("fstat");
				exit(1);
			}
			close(fd);
		}
		fprintf(stderr, "du thread is done %s\n", path);
		/*
		 * we need some jitter in here so all the du threads are
		 * staggered
		 */
		sleep(45 + (rand() % 90));
	}
	return NULL;
}

/* record one start/finish pair into stat, dropping zero deltas */
static void record_one_lat(struct stats *stat, struct timeval *start,
			   struct timeval *finish)
{
	unsigned long long delta;

	delta = tvdelta(start, finish);
	if (delta > 0)
		add_lat(stat, delta);
}

/* reads from a random (well aligned) offset in one of the main data files */
static void read_from_file(char *path, int seq, char *buf)
{
	int fd;
	int ret;
	off_t offset;
	ssize_t read_bytes = read_size;
	struct stat st;
	void *xxhash_state = XXH32_init(global_rand_seed);
	char name[NAME_LEN];

	join_path(name, path, seq, DATA_FILE);

	fd = open_path(path, seq, DATA_FILE, 0);
	ret = fstat(fd, &st);
	if (ret < 0) {
		perror("stat");
		exit(1);
	}

	/* pick a random percentage into the file, block aligned */
	offset = rand() % 100;
	offset = (offset * st.st_size) / 100;
	offset = verify_align(offset);

	/* we are too big?
*/
	if (offset + read_bytes > st.st_size)
		offset = 0;
	/* whole file is smaller than read_size: shrink the read instead */
	if (offset + read_bytes > st.st_size)
		read_bytes = verify_align(st.st_size);

	read_and_crc(fd, name, xxhash_state, buf, read_bytes, offset,
		     read_bytes, st.st_ino);
	close(fd);
	XXH32_digest(xxhash_state);
}

/* read and verify the full contents of an already-open file */
static void read_whole_fd(int fd, char *name, char *buf, size_t buf_size)
{
	int ret;
	off_t offset;
	ssize_t read_bytes = buf_size;
	struct stat st;
	void *xxhash_state;

	if (read_size == 0)
		return;

	ret = fstat(fd, &st);
	if (ret < 0)
		return;

	xxhash_state = XXH32_init(global_rand_seed);
	offset = 0;
	read_and_crc(fd, name, xxhash_state, buf, read_bytes, offset,
		     st.st_size, st.st_ino);
	XXH32_digest(xxhash_state);
}

/* open path/seq(-postfix) and verify all of it; missing files are OK */
static void read_whole_file(char *path, int seq, char *postfix,
			    char *buf, size_t buf_size)
{
	int fd;
	char name[NAME_LEN];

	if (buf_size == 0)
		return;

	join_path(name, path, seq, postfix);
	fd = open(name, O_RDONLY, 0600);
	if (fd < 0)
		return;
	read_whole_fd(fd, name, buf, buf_size);
	close(fd);
}

/* creates a temp file in one of the subdirs and sends down write_bytes to it */
static void write_to_file(char *path, int seq, char *buf)
{
	int fd;
	int ret;
	int write_bytes = randomize_size(write_size);
	loff_t offset;
	void *xxhash_state = XXH32_init(global_rand_seed);
	char *postfix;
	struct stat st;
	char name[NAME_LEN];

	if (append_mode) {
		/* -a: grow the main data file instead of a result file */
		postfix = DATA_FILE;
		fd = open_path(path, seq, DATA_FILE, O_CREAT);
		offset = lseek(fd, 0, SEEK_END);
		if (offset < 0) {
			perror("lseek");
			exit(1);
		}
		join_path(name, path, seq, postfix);
	} else {
		postfix = RESULT_FILE;
		join_path(name, path, seq, postfix);
		/*
		 * make sure that we're making new inodes for the result
		 * files
		 */
		ret = unlink(name);
		if (ret < 0 && errno != ENOENT) {
			perror("unlink");
			exit(1);
		}
		fd = open_path(path, seq, RESULT_FILE, O_CREAT|O_EXCL);
		/* lost the O_EXCL race with another worker; let them have it */
		if (fd < 0) {
			return;
		}
		offset = 0;
	}
	ret = fstat(fd, &st);
	if (ret < 0) {
		perror("stat");
		exit(1);
	}

	write_pattern(fd, xxhash_state, buf, write_size, offset, write_bytes,
		      st.st_ino);
	XXH32_digest(xxhash_state);
	maybe_write_through(fd,
0, 0);

	/* -v: optionally read back and verify what we just wrote */
	if (verify_writes && write_size >= BUF_SIZE)
		read_whole_fd(fd, name, buf, BUF_SIZE);
	close(fd);
}

/* make all the worker files under a main path */
static void make_files(char *path, unsigned long seq_start, unsigned long seq_num)
{
	unsigned long seq;
	int fd;
	void *xxhash_state = XXH32_init(global_rand_seed);
	int ret;
	char *buf;

	ret = posix_memalign((void **)(&buf), getpagesize(), BUF_SIZE);
	if (ret) {
		perror("posix_memalign");
		exit(1);
	}

	for (seq = seq_start; seq < seq_start + seq_num; seq++) {
		/* -V: verify any existing data files before reusing them */
		if (read_size && check_initial_files)
			read_whole_file(path, seq, DATA_FILE, buf, BUF_SIZE);
		if (zallocate) {
			char *kind;
			loff_t this_size = randomize_size(file_size);

			/*
			 * fallocate'd contents carry no verify pattern, so
			 * they can only pose as data files when crc
			 * checking is off (-I)
			 */
			if (ignore_crcs)
				kind = DATA_FILE;
			else
				kind = FILL_FILE;
			fd = open_path(path, seq, kind, O_CREAT);
			ret = fallocate(fd, 0, 0, this_size);
			if (ret) {
				perror("fallocate");
				exit(1);
			}
		} else {
			int flags = O_CREAT|O_APPEND;

			if (truncate_original)
				flags |= O_TRUNC;
			fd = open_path(path, seq, DATA_FILE, flags);
			fill_one_file(fd, xxhash_state, buf, BUF_SIZE);
		}
		close(fd);
		/* cleanup from the last run */
		unlink_extra(path, seq, buf, BUF_SIZE);
	}
	free(buf);
	/* just to free the state */
	XXH32_digest(xxhash_state);
}

/* arguments handed to one filler thread (freed by the thread itself) */
struct filler {
	char *path;
	unsigned long seq_start;
	unsigned long seq_num;
};

/* build the directory tree and our slice of the files inside it */
void *filler_thread(void *arg)
{
	struct filler *filler = arg;

	make_dirs(filler->path);
	make_files(filler->path, filler->seq_start, filler->seq_num);
	free(filler);
	return 0;
}

/* start one thread per path, create the directory tree */
void run_filler_threads(void)
{
	int i;
	int ret;
	int j;
	pthread_t *tids;
	pthread_t *this_tid;
	struct timeval now;
	struct timeval start;
	unsigned long long delta;
	double seconds;

	/* FILES_SPLIT filler threads per path, each owning a seq range */
	tids = malloc(sizeof(*tids) * total_paths * FILES_SPLIT);
	if (!tids) {
		perror("malloc");
		exit(1);
	}
	gettimeofday(&start, NULL);
	fprintf(stderr, "Creating working files\n");

	this_tid = tids;
	for (i = 0; i < total_paths; i++) {
		fprintf(stderr, "filling %s\n", paths[i]);
		for (j = 0; j < FILES_SPLIT; j++) {
			pthread_t tid;
struct filler *filler; filler = malloc(sizeof(*filler)); filler->path = paths[i]; filler->seq_start = j * (num_files / FILES_SPLIT); filler->seq_num = num_files / FILES_SPLIT; ret = pthread_create(&tid, NULL, filler_thread, filler); if (ret) { fprintf(stderr, "error %d from pthread_create\n", ret); exit(1); } *this_tid = tid; this_tid++; } } this_tid = tids; for (i = 0; i < total_paths; i++) { for (j = 0; j < FILES_SPLIT; j++) { pthread_join(*this_tid, NULL); this_tid++; } } gettimeofday(&now, NULL); delta = tvdelta(&start, &now); seconds = (double)delta / 1000000; filler_completion_time = seconds; fprintf(stderr, "done creating working files %.2f seconds total time\n", seconds); free(tids); } void read_one(char *buf) { int index = rand() % total_paths; int seq = rand() % num_files; char *path = paths[index]; read_from_file(path, seq, buf); } void write_one(char *buf) { int index = rand() % total_paths; int seq = rand() % num_files; char *path = paths[index]; write_to_file(path, seq, buf); } char *aligned_memory_alloc(unsigned long size) { char *ptr; unsigned long aligned_size = (size + 4095) & ~4095UL; int ret; ptr = mmap(NULL, aligned_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); if (ptr == MAP_FAILED) { perror("mmap"); exit(1); } ret = madvise(ptr, aligned_size, MADV_HUGEPAGE); if (ret) { perror("madvise huge"); exit(1); } return ptr; } void aligned_memory_free(char *ptr, unsigned long size) { unsigned long aligned_size = (size + 4095) & ~4095UL; if (ptr) munmap(ptr, aligned_size); } /* main work loop */ void *worker_thread(void *arg) { struct timeval now; struct timeval start; struct thread_data *td = arg; char *read_buf = NULL; char *write_buf = NULL; char *mem = NULL; char *mmap_ptr; int warmup_zerod = 0; unsigned long i; if (thinking_mem) { mem = malloc(thinking_mem); if (!mem) { perror("allocation failed\n"); exit(1); } } while(!stopping) { /* * reset our stats after warmup so we don't have noise * from initial thread creation */ if 
(warmup_done && !warmup_zerod) { memset(td->stats, 0, sizeof(*td->stats) * TOTAL_STATS); warmup_zerod = 1; } if (read_size) { if (!read_buf) read_buf = aligned_memory_alloc(read_size); } else { read_buf = NULL; } if (write_size) { if (!write_buf) write_buf = aligned_memory_alloc(write_size); } else { write_buf = NULL; } /* if someone swapped out our thinking mem, bring it back */ if (thinking_mem) memset(mem, 0, thinking_mem); /* Start the threads to read files */ if (read_size) { gettimeofday(&start, NULL); read_one(read_buf); gettimeofday(&now, NULL); /* * record how long the reading stage took. This * includes all of the latencies for thread creation, * doing the reads and waiting for completeion */ record_one_lat(&td->stats[READ_STATS], &start, &now); } /* write out the (pretend) results */ if (write_size) { gettimeofday(&start, NULL); write_one(write_buf); gettimeofday(&now, NULL); record_one_lat(&td->stats[WRITE_STATS], &start, &now); } /* * we also track the latency to allocate and fault in * a chunk of pages. This is basicaly the user-visible * impact of allocation stalls */ if (mmap_size) { gettimeofday(&start, NULL); mmap_ptr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); if (mmap_ptr == MAP_FAILED) { perror("mmap"); exit(1); } /* fault in all those pages */ for (i = 0; i < mmap_size; i += 4096) { mmap_ptr[i] = 'a'; } /* measure how long all of this took */ gettimeofday(&now, NULL); record_one_lat(&td->stats[ALLOC_STATS], &start, &now); munmap(mmap_ptr, mmap_size); } usec_spin(cputime); td->work_done++; if (sleeptime) usleep(sleeptime); } aligned_memory_free(read_buf, read_size); aligned_memory_free(write_buf, write_size); free(mem); return NULL; } /* * we want to keep the CPUs saturated so kswapd has to compete for CPU time * these cpu threads don't do IO. 
*/ static void *cpu_thread(void *arg) { char *unused = arg; arg = unused; while(!stopping) { usec_spin(cputime); usleep(1); } return NULL; } static void save_vmstat_rates(struct vmstat_info *vmstat_info) { int i; for (i = 0; i < TOTAL_VMSTATS; i++) { vmstat_info->last_rate[i] = vmstat_info->rate[i]; } } static void save_instant_vmstat_rates(struct vmstat_info *vmstat_info) { int i; for (i = 0; i < TOTAL_VMSTATS; i++) { vmstat_info->instant_rate[i] = vmstat_info->rate[i]; } } /* * read in /proc/vmstat so we can sum the allocation stall lines and * print them out */ static void read_vmstat(struct vmstat_info *vmstat_info) { int val; FILE * fp; char * line = NULL; size_t len = 0; ssize_t read; int i; fp = fopen("/proc/vmstat", "r"); if (fp == NULL) return; memset(vmstat_info->rate, 0, sizeof(double) * TOTAL_VMSTATS); while ((read = getline(&line, &len, fp)) != -1) { /* * newer kernels break out different types of allocstall, * just add them all together */ for (i = 0; i < TOTAL_VMSTATS; i++) { if (strstr(line, vmstat_labels[i])) { char *p = strchr(line, ' '); if (p && p[1] != '\0') { val = atoi(p + 1); vmstat_info->rate[i] += val; } } } } if (line) free(line); fclose(fp); } /* * every worker thread tracks latencies individually. 
This pulls them all * into a single destination stat array for printing */ static void collect_stats(struct stats *dest, struct thread_data *worker_threads_mem) { int i; int j; memset(dest, 0, sizeof(*dest) * TOTAL_STATS); for (i = 0; i < TOTAL_STATS; i++) { for (j = 0; j < worker_threads; j++) combine_stats(&dest[i], &worker_threads_mem[j].stats[i]); } for (i = 0; i < TOTAL_STATS; i++) { unsigned int p50 = 0, p95 = 0, p99 = 0; calc_p99(&dest[i], &p50, &p95, &p99); printf("%s usec: (p50: %'d) (p95: %'d) (p99: %'d) (max: %'d)\n", stat_labels[i], p50, p95, p99, dest[i].max); } } /* * print out the current stats, along with averages and latency histogram * numbers */ static void print_latencies(struct thread_data *worker_threads_mem, struct stats *stats, struct stats *work_done_stats, struct vmstat_info *vmstat_info, double work_done, double instant_work_done, unsigned long long delta, unsigned long long instant_delta) { double rate; double instant_rate; double seconds = (double)delta / 1000000; unsigned int p50, p95, p99; int i; printf("___\n"); printf("Run time: %.0f seconds\n", seconds); /* this also prints the histogram results from the workers */ collect_stats(stats, worker_threads_mem); /* calculate the work done over this period, add to histogram */ rate = (work_done * 1000000) / delta; instant_rate = (instant_work_done * 1000000) / instant_delta; add_lat(work_done_stats, rate * 100); calc_p99(work_done_stats, &p50, &p95, &p99); printf("work rate = %.2f/sec (avg %.2f/sec) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n", instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00); for (i = 0; i < TOTAL_VMSTATS; i++) { rate = vmstat_info->rate[i] - vmstat_info->last_rate[i]; if (rate < 0) rate = 0; instant_rate = vmstat_info->rate[i] - vmstat_info->instant_rate[i]; if (instant_rate < 0) instant_rate = 0; rate = (rate * 1000000) / delta; instant_rate = (instant_rate * 1000000) / delta; add_lat(&vmstat_info->stats[i], rate * 100); 
calc_p99(&vmstat_info->stats[i], &p50, &p95, &p99); printf("%s rate = %.2f/sec (avg: %.2f) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n", vmstat_labels[i], instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00); } } void mysigint(int sig) { fprintf(stderr, "Stopping due to signal %d...\n", sig); __sync_synchronize(); stopping = 1; } /* runtime from the command line is in seconds. Sleep until its up */ static void sleep_for_runtime(struct thread_data *worker_threads_mem) { struct timeval now; struct timeval start; struct timeval rate_start; struct timeval instant_start; unsigned long long delta; unsigned long long rate_delta; unsigned long long instant_delta; unsigned long long runtime_usec = runtime * 1000000; unsigned long long warmup_usec = warmup_seconds * 1000000; double work_done = 0; double instant_work_done = 0; double last_work_done = 0; struct stats stats[TOTAL_STATS]; struct vmstat_info vmstat_info; struct stats work_done_stats; int i; gettimeofday(&start, NULL); rate_start = start; memset(&work_done_stats, 0, sizeof(work_done_stats)); memset(&vmstat_info, 0, sizeof(vmstat_info)); read_vmstat(&vmstat_info); save_vmstat_rates(&vmstat_info); save_instant_vmstat_rates(&vmstat_info); if (interval_seconds > runtime && runtime > 0) interval_seconds = runtime; while(!stopping) { gettimeofday(&now, NULL); instant_start = now; delta = tvdelta(&start, &now); if (!warmup_done && delta > warmup_usec) { printf("Warmup complete (%lu seconds)\n", warmup_seconds); __sync_synchronize(); warmup_done = 1; memset(&work_done_stats, 0, sizeof(work_done_stats)); memset(&vmstat_info, 0, sizeof(vmstat_info)); read_vmstat(&vmstat_info); save_vmstat_rates(&vmstat_info); save_instant_vmstat_rates(&vmstat_info); last_work_done = work_done; rate_start = now; } instant_work_done = work_done; if (runtime_usec == 0 || delta < runtime_usec) sleep(interval_seconds); else break; gettimeofday(&now, NULL); rate_delta = tvdelta(&rate_start, &now); instant_delta = 
tvdelta(&instant_start, &now); work_done = 0; for (i = 0; i < worker_threads; i++) work_done += worker_threads_mem[i].work_done; read_vmstat(&vmstat_info); print_latencies(worker_threads_mem, stats, &work_done_stats, &vmstat_info, work_done - last_work_done, work_done - instant_work_done, rate_delta, instant_delta); save_instant_vmstat_rates(&vmstat_info); } __sync_synchronize(); stopping = 1; fprintf(stderr, "\n------\nFinal stats:\n\n"); fprintf(stderr, "Initial filler completion time %.2f seconds\n", filler_completion_time); for (i = 0; i < cpu_threads + worker_threads; i++) { pthread_join(worker_threads_mem[i].tid, NULL); } work_done = 0; for (i = 0; i < worker_threads; i++) work_done += worker_threads_mem[i].work_done; gettimeofday(&now, NULL); rate_delta = tvdelta(&rate_start, &now); instant_delta = tvdelta(&instant_start, &now); read_vmstat(&vmstat_info); print_latencies(worker_threads_mem, stats, &work_done_stats, &vmstat_info, work_done - last_work_done, work_done - instant_work_done, rate_delta, instant_delta); } int main(int ac, char **av) { int i; int ret; int index; struct thread_data *worker_threads_mem = NULL; pthread_t *du_tids; setlocale(LC_NUMERIC, ""); parse_options(ac, av); if (du_threads > total_paths) du_threads = total_paths; init_pattern_buffer(pattern_buffer, global_rand_seed); /* du threads might be zero */ du_tids = calloc(du_threads + 1, sizeof(pthread_t)); worker_threads_mem = calloc(worker_threads + cpu_threads, sizeof(struct thread_data)); if (!worker_threads_mem || !du_tids) { perror("calloc"); exit(1); } /* fill up our directory tree. 
This might take a really long time */ run_filler_threads(); if (fill_only) exit(0); stopping = 0; if (signal(SIGINT, mysigint) == SIG_ERR) { perror("signal"); exit(1); } /* worker threads do the IO and the real stuff */ for (i = 0; i < worker_threads; i++) { pthread_t tid; ret = pthread_create(&tid, NULL, worker_thread, worker_threads_mem + i); if (ret) { perror("pthread_create"); exit(1); } worker_threads_mem[i].tid = tid; } /* CPU threads just soak up cycles */ for (i = 0; i < cpu_threads; i++) { pthread_t tid; ret = pthread_create(&tid, NULL, cpu_thread, worker_threads_mem + i + worker_threads); if (ret) { perror("pthread_create"); exit(1); } worker_threads_mem[i + worker_threads].tid = tid; } /* * du threads read in inodes, the goal is to have it happen on just * a couple of paths */ index = rand(); for (i = 0; i < du_threads; i++) { ret = pthread_create(&du_tids[i], NULL, du_thread, paths[index++ % total_paths]); if (ret) { fprintf(stderr, "error %d from pthread_create\n", ret); exit(1); } } /* let all the magic happen and collect results */ sleep_for_runtime(worker_threads_mem); for (i = 0; i < du_threads; i++) { pthread_join(du_tids[i], NULL); } free(worker_threads_mem); return 0; }