GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_base.cc Lines: 212 231 91.8 %
Date: 2017-06-14 Branches: 127 248 51.2 %

Line Branch Exec Source
1
#include "stream_base.h"
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "env.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <limits.h>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::Context;
21
using v8::FunctionCallbackInfo;
22
using v8::HandleScope;
23
using v8::Integer;
24
using v8::Local;
25
using v8::Number;
26
using v8::Object;
27
using v8::String;
28
using v8::Value;
29
30
// Explicit instantiations of StreamBase::WriteString for the ASCII, UTF8,
// UCS2 and LATIN1 encodings. The template body is defined further down in
// this file.
template int StreamBase::WriteString<ASCII>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UTF8>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<UCS2>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<LATIN1>(
37
    const FunctionCallbackInfo<Value>& args);
38
39
40
7100
// Overload taking V8 callback arguments. `args` is not read; the call is
// forwarded to the argument-less ReadStart() and its result is returned.
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
41
7100
  return ReadStart();
42
}
43
44
45
248
// Overload taking V8 callback arguments. `args` is not read; the call is
// forwarded to the argument-less ReadStop() and its result is returned.
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
46
248
  return ReadStop();
47
}
48
49
50
2096
// Starts a shutdown of this stream.
//
// args[0] must be an object (enforced with CHECK); it is handed to a newly
// allocated ShutdownWrap together with `this` and AfterShutdown.
//
// Returns the value of DoShutdown(). When that value is non-zero the
// ShutdownWrap is deleted here before returning.
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
51
2096
  Environment* env = Environment::GetCurrent(args);
52
53
4192
  CHECK(args[0]->IsObject());
54
4192
  Local<Object> req_wrap_obj = args[0].As<Object>();
55
56
2096
  AsyncWrap* wrap = GetAsyncWrap();
57
2096
  CHECK_NE(wrap, nullptr);
58
2096
  // Record this stream's async id on the environment before the request
  // wrap is constructed.
  env->set_init_trigger_id(wrap->get_id());
59
  ShutdownWrap* req_wrap = new ShutdownWrap(env,
60
                                            req_wrap_obj,
61
                                            this,
62
2096
                                            AfterShutdown);
63
64
2096
  int err = DoShutdown(req_wrap);
65
2096
  // On failure the request wrap is freed here.
  // NOTE(review): presumably AfterShutdown is not invoked in that case --
  // confirm against the DoShutdown implementations.
  if (err)
66
    delete req_wrap;
67
2096
  return err;
68
}
69
70
71
2090
// Handles completion of a shutdown request.
//
// req_wrap: the ShutdownWrap created in Shutdown(); it is deleted at the end
//           of this function.
// status:   passed through as the first callback argument.
//
// If the request object has the property named by env->oncomplete_string(),
// that property is invoked through MakeCallback with
// (status, stream object, request object).
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
72
2090
  StreamBase* wrap = req_wrap->wrap();
73
2090
  Environment* env = req_wrap->env();
74
75
  // The wrap and request objects should still be there.
76
4180
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
77
78
2090
  HandleScope handle_scope(env->isolate());
79
2090
  Context::Scope context_scope(env->context());
80
81
2090
  Local<Object> req_wrap_obj = req_wrap->object();
82
  Local<Value> argv[3] = {
83
    Integer::New(env->isolate(), status),
84
2090
    wrap->GetObject(),
85
    req_wrap_obj
86
10450
  };
87
88
8360
  // The callback is optional: it is only invoked when the property exists.
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
89
2090
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
90
91
4180
  delete req_wrap;
92
2090
}
93
94
95
13046
// Writes a list of chunks to the stream in one call.
//
// args[0]: request object (CHECKed to be an object).
// args[1]: array of chunks (CHECKed to be an array).
// args[2]: when strictly `true`, every element of args[1] is treated as a
//          buffer. Otherwise the array is read as (chunk, encoding) pairs:
//          element 2*i is the chunk and element 2*i+1 is passed to
//          ParseEncoding().
//
// Returns UV_ENOBUFS when the computed string storage exceeds INT_MAX,
// otherwise the result of DoTryWrite()/DoWrite(). Except on the UV_ENOBUFS
// path, the request object receives the byte count under
// env->bytes_string() and, if Error() is non-null, the message under
// env->error_string().
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
96
13046
  Environment* env = Environment::GetCurrent(args);
97
98
26092
  CHECK(args[0]->IsObject());
99
26092
  CHECK(args[1]->IsArray());
100
101
26092
  Local<Object> req_wrap_obj = args[0].As<Object>();
102
26092
  Local<Array> chunks = args[1].As<Array>();
103
26092
  bool all_buffers = args[2]->IsTrue();
104
105
  size_t count;
106
13046
  if (all_buffers)
107
1575
    count = chunks->Length();
108
  else
109
11471
    // Pair layout: half as many chunks as array elements.
    count = chunks->Length() >> 1;
110
111
13046
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
112
13046
  uv_buf_t* buf_list = *bufs;
113
114
13046
  size_t storage_size = 0;
115
13046
  // NOTE(review): `bytes` is a uint32_t accumulator while the lengths added
  // to it are size_t-sized; a very large total could wrap -- confirm whether
  // that is reachable.
  uint32_t bytes = 0;
116
  size_t offset;
117
  AsyncWrap* wrap;
118
  WriteWrap* req_wrap;
119
  int err;
120
121
13046
  if (!all_buffers) {
122
    // Determine storage size first
123
63802
    for (size_t i = 0; i < count; i++) {
124
52331
      // Rounded up on every iteration (buffer chunks included) so the total
      // accounts for the per-string alignment applied in the write pass.
      storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
125
126
104662
      Local<Value> chunk = chunks->Get(i * 2);
127
128
52331
      if (Buffer::HasInstance(chunk))
129
22182
        continue;
130
        // Buffer chunk, no additional storage required
131
132
      // String chunk
133
60298
      Local<String> string = chunk->ToString(env->isolate());
134
      enum encoding encoding = ParseEncoding(env->isolate(),
135
60298
                                             chunks->Get(i * 2 + 1));
136
      size_t chunk_size;
137

32724
      // Same size heuristic as WriteString(): long UTF8 strings use
      // StringBytes::Size(), everything else StringBytes::StorageSize().
      if (encoding == UTF8 && string->Length() > 65535)
138
        chunk_size = StringBytes::Size(env->isolate(), string, encoding);
139
      else
140
30149
        chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
141
142
30149
      storage_size += chunk_size;
143
    }
144
145
11471
    // Early return: nothing is recorded on req_wrap_obj on this path, since
    // the `done:` bookkeeping below is skipped.
    if (storage_size > INT_MAX)
146
      return UV_ENOBUFS;
147
  } else {
148
851328
    for (size_t i = 0; i < count; i++) {
149
1699506
      Local<Value> chunk = chunks->Get(i);
150
849753
      bufs[i].base = Buffer::Data(chunk);
151
849753
      bufs[i].len = Buffer::Length(chunk);
152
849753
      bytes += bufs[i].len;
153
    }
154
155
    // Try writing immediately without allocation
156
1575
    err = DoTryWrite(&buf_list, &count);
157

1575
    // Either an error occurred or no buffers remain: skip the request
    // allocation and go straight to the bookkeeping at `done:`.
    if (err != 0 || count == 0)
158
      goto done;
159
  }
160
161
11866
  wrap = GetAsyncWrap();
162
11866
  CHECK_NE(wrap, nullptr);
163
11866
  env->set_init_trigger_id(wrap->get_id());
164
11866
  // storage_size is non-zero only on the string/pair path; it is passed to
  // WriteWrap::New and later addressed through req_wrap->Extra(offset).
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
165
166
11866
  offset = 0;
167
11866
  if (!all_buffers) {
168
63802
    for (size_t i = 0; i < count; i++) {
169
104662
      Local<Value> chunk = chunks->Get(i * 2);
170
171
      // Write buffer
172
52331
      if (Buffer::HasInstance(chunk)) {
173
22182
        bufs[i].base = Buffer::Data(chunk);
174
22182
        bufs[i].len = Buffer::Length(chunk);
175
22182
        bytes += bufs[i].len;
176
22182
        continue;
177
      }
178
179
      // Write string
180
30149
      offset = ROUND_UP(offset, WriteWrap::kAlignSize);
181
30149
      CHECK_LE(offset, storage_size);
182
30149
      char* str_storage = req_wrap->Extra(offset);
183
30149
      // Space remaining in the extra storage, used as the write limit.
      size_t str_size = storage_size - offset;
184
185
60298
      Local<String> string = chunk->ToString(env->isolate());
186
      enum encoding encoding = ParseEncoding(env->isolate(),
187
60298
                                             chunks->Get(i * 2 + 1));
188
      // str_size is overwritten with the value returned by
      // StringBytes::Write().
      str_size = StringBytes::Write(env->isolate(),
189
                                    str_storage,
190
                                    str_size,
191
                                    string,
192
30149
                                    encoding);
193
30149
      bufs[i].base = str_storage;
194
30149
      bufs[i].len = str_size;
195
30149
      offset += str_size;
196
30149
      bytes += str_size;
197
    }
198
  }
199
200
11866
  err = DoWrite(req_wrap, buf_list, count, nullptr);
201
35598
  // Flag the request object under env->async(); set regardless of `err`.
  req_wrap_obj->Set(env->async(), True(env->isolate()));
202
203
11866
  if (err)
204
    req_wrap->Dispose();
205
206
 done:
207
13046
  const char* msg = Error();
208
13046
  if (msg != nullptr) {
209
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
210
    ClearError();
211
  }
212
39138
  req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
213
214
13046
  return err;
215
}
216
217
218
219
220
1400
// Writes a single buffer to the stream.
//
// args[0]: request object (CHECKed to be an object).
// args[1]: must be a Uint8Array; otherwise a TypeError is thrown on `env`
//          and 0 is returned without touching the request object.
//
// DoTryWrite() is attempted first. A WriteWrap is only allocated when that
// call reports no error and leaves a buffer outstanding (count != 0).
// Returns the result of DoTryWrite()/DoWrite(). The request object receives
// the full buffer length under env->bytes_string() and, if Error() is
// non-null, the message under env->error_string().
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
221
2800
  CHECK(args[0]->IsObject());
222
223
1400
  Environment* env = Environment::GetCurrent(args);
224
225
2800
  if (!args[1]->IsUint8Array()) {
226
1
    env->ThrowTypeError("Second argument must be a buffer");
227
1
    return 0;
228
  }
229
230
2798
  Local<Object> req_wrap_obj = args[0].As<Object>();
231
1399
  const char* data = Buffer::Data(args[1]);
232
1399
  size_t length = Buffer::Length(args[1]);
233
234
  AsyncWrap* wrap;
235
  WriteWrap* req_wrap;
236
  uv_buf_t buf;
237
1399
  // uv_buf_t::base is a non-const char*, hence the const_cast.
  buf.base = const_cast<char*>(data);
238
1399
  buf.len = length;
239
240
  // Try writing immediately without allocation
241
1399
  uv_buf_t* bufs = &buf;
242
1399
  size_t count = 1;
243
1399
  int err = DoTryWrite(&bufs, &count);
244
1399
  if (err != 0)
245
    goto done;
246
1399
  // count == 0: nothing is left to write, so no request wrap is needed.
  if (count == 0)
247
1042
    goto done;
248
357
  CHECK_EQ(count, 1);
249
250
357
  wrap = GetAsyncWrap();
251
357
  // Unlike Writev(), a null AsyncWrap is tolerated here rather than CHECKed.
  if (wrap != nullptr)
252
357
    env->set_init_trigger_id(wrap->get_id());
253
  // Allocate, or write rest
254
357
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
255
256
357
  err = DoWrite(req_wrap, bufs, count, nullptr);
257
1071
  req_wrap_obj->Set(env->async(), True(env->isolate()));
258
714
  // Store the source buffer on the request object.
  // NOTE(review): presumably this keeps the buffer alive until the write
  // completes -- confirm.
  req_wrap_obj->Set(env->buffer_string(), args[1]);
259
260
357
  if (err)
261
    req_wrap->Dispose();
262
263
 done:
264
1399
  const char* msg = Error();
265
1399
  if (msg != nullptr) {
266
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
267
    ClearError();
268
  }
269
  // Always reports the full input length, whichever path was taken.
  req_wrap_obj->Set(env->bytes_string(),
270
4197
                    Integer::NewFromUnsigned(env->isolate(), length));
271
1399
  return err;
272
}
273
274
275
// Writes a string to the stream, encoded as `enc`.
//
// args[0]: request object (CHECKed to be an object).
// args[1]: the string to write (CHECKed to be a string).
// args[2]: optional object used as the send handle; only consulted when
//          IsIPCPipe() is true.
//
// Returns UV_ENOBUFS when the computed storage size exceeds INT_MAX (nothing
// is recorded on the request object on that path), otherwise the result of
// DoTryWrite()/DoWrite(). At `done:` the request object receives `data_size`
// under env->bytes_string() and, if Error() is non-null, the message under
// env->error_string().
template <enum encoding enc>
276
25038
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
277
25038
  Environment* env = Environment::GetCurrent(args);
278


50076
  CHECK(args[0]->IsObject());
279


75114
  CHECK(args[1]->IsString());
280
281
50076
  Local<Object> req_wrap_obj = args[0].As<Object>();
282
50076
  Local<String> string = args[1].As<String>();
283
  Local<Object> send_handle_obj;
284
  AsyncWrap* wrap;
285


50076
  // send_handle_obj stays empty unless a third object argument was supplied.
  if (args[2]->IsObject())
286
180
    send_handle_obj = args[2].As<Object>();
287
288
  int err;
289
290
  // Compute the size of the storage that the string will be flattened into.
291
  // For UTF8 strings that are very long, go ahead and take the hit for
292
  // computing their actual size, rather than tripling the storage.
293
  size_t storage_size;
294
23292
  if (enc == UTF8 && string->Length() > 65535)
295
12
    storage_size = StringBytes::Size(env->isolate(), string, enc);
296
  else
297
25026
    storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
298
299


25038
  if (storage_size > INT_MAX)
300
    return UV_ENOBUFS;
301
302
  // Try writing immediately if write size isn't too big
303
  WriteWrap* req_wrap;
304
  char* data;
305
  char stack_storage[16384];  // 16kb
306
  size_t data_size;
307
  uv_buf_t buf;
308
309
  // The immediate write is attempted only when the string fits in the stack
  // buffer and there is no send handle to pass over an IPC pipe.
  bool try_write = storage_size <= sizeof(stack_storage) &&
310






26301
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
311


25038
  if (try_write) {
312
24598
    data_size = StringBytes::Write(env->isolate(),
313
                                   stack_storage,
314
                                   storage_size,
315
                                   string,
316
49196
                                   enc);
317
24598
    buf = uv_buf_init(stack_storage, data_size);
318
319
24598
    uv_buf_t* bufs = &buf;
320
24598
    size_t count = 1;
321
24598
    err = DoTryWrite(&bufs, &count);
322
323
    // Failure
324


24598
    if (err != 0)
325
24350
      goto done;
326
327
    // Success
328


24597
    if (count == 0)
329
24348
      goto done;
330
331
    // Partial write
332


249
    CHECK_EQ(count, 1);
333
  }
334
335
689
  wrap = GetAsyncWrap();
336


689
  // A null AsyncWrap is tolerated here (Writev() CHECKs it instead).
  if (wrap != nullptr)
337
689
    env->set_init_trigger_id(wrap->get_id());
338
689
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
339
340
689
  data = req_wrap->Extra();
341
342


689
  if (try_write) {
343
    // Copy partial data
344
249
    // NOTE(review): relies on DoTryWrite() having updated `buf` in place to
    // describe the unwritten remainder -- confirm against implementations.
    memcpy(data, buf.base, buf.len);
345
249
    data_size = buf.len;
346
  } else {
347
    // Write it
348
440
    data_size = StringBytes::Write(env->isolate(),
349
                                   data,
350
                                   storage_size,
351
                                   string,
352
880
                                   enc);
353
  }
354
355


689
  CHECK_LE(data_size, storage_size);
356
357
689
  buf = uv_buf_init(data, data_size);
358
359


689
  if (!IsIPCPipe()) {
360
597
    err = DoWrite(req_wrap, &buf, 1, nullptr);
361
  } else {
362
92
    uv_handle_t* send_handle = nullptr;
363
364


92
    if (!send_handle_obj.IsEmpty()) {
365
      // This HandleWrap* shadows the outer `AsyncWrap* wrap` for the rest of
      // this inner scope.
      HandleWrap* wrap;
366


90
      // NOTE(review): if this macro returns UV_EINVAL early, the req_wrap
      // allocated above is not disposed on that path -- confirm whether this
      // leaks.
      ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
367
90
      send_handle = wrap->GetHandle();
368
      // Reference StreamWrap instance to prevent it from being garbage
369
      // collected before `AfterWrite` is called.
370


180
      CHECK_EQ(false, req_wrap->persistent().IsEmpty());
371
180
      req_wrap_obj->Set(env->handle_string(), send_handle_obj);
372
    }
373
374
92
    err = DoWrite(
375
        req_wrap,
376
        &buf,
377
        1,
378
92
        reinterpret_cast<uv_stream_t*>(send_handle));
379
  }
380
381
2067
  req_wrap_obj->Set(env->async(), True(env->isolate()));
382
383


689
  if (err)
384
    req_wrap->Dispose();
385
386
 done:
387
25038
  const char* msg = Error();
388


25038
  if (msg != nullptr) {
389
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
390
    ClearError();
391
  }
392
25038
  // data_size is whatever the last executed path assigned: the encoded size
  // for an immediate or full write, or buf.len after a partial write.
  req_wrap_obj->Set(env->bytes_string(),
393
100152
                    Integer::NewFromUnsigned(env->isolate(), data_size));
394
25038
  return err;
395
}
396
397
398
12382
// Handles completion of a write request.
//
// req_wrap: the WriteWrap passed to DoWrite(); Dispose() is called on it at
//           the end of this function.
// status:   passed through as the first callback argument.
//
// Removes the env->handle_string() property from the request object, calls
// OnAfterWrite() on the stream, and, if the request object has the property
// named by env->oncomplete_string(), invokes it through MakeCallback with
// (status, stream object, request object, error message or undefined).
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
399
12382
  StreamBase* wrap = req_wrap->wrap();
400
12382
  Environment* env = req_wrap->env();
401
402
12382
  HandleScope handle_scope(env->isolate());
403
12382
  Context::Scope context_scope(env->context());
404
405
  // The wrap and request objects should still be there.
406
24764
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
407
408
  // Unref handle property
409
12382
  Local<Object> req_wrap_obj = req_wrap->object();
410
49528
  req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
411
12382
  wrap->OnAfterWrite(req_wrap);
412
413
  // argv[3] defaults to undefined and is replaced below when the stream
  // reports an error message.
  Local<Value> argv[] = {
414
    Integer::New(env->isolate(), status),
415
12382
    wrap->GetObject(),
416
    req_wrap_obj,
417
    Undefined(env->isolate())
418
74292
  };
419
420
12382
  const char* msg = wrap->Error();
421
12382
  if (msg != nullptr) {
422
    argv[3] = OneByteString(env->isolate(), msg);
423
    wrap->ClearError();
424
  }
425
426
49528
  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
427
12382
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
428
429
24760
  req_wrap->Dispose();
430
12380
}
431
432
433
23648
// Invokes the callback named by env->onread_string() on this stream's
// AsyncWrap with (nread, buf, handle).
//
// nread:  converted with Integer::New and passed as the first argument.
// buf:    passed as the second argument; an empty handle is replaced with
//         undefined.
// handle: passed as the third argument; an empty handle is replaced with
//         undefined.
//
// The AsyncWrap must be non-null (enforced with CHECK_NE).
void StreamBase::EmitData(ssize_t nread,
434
                          Local<Object> buf,
435
                          Local<Object> handle) {
436
23648
  Environment* env = env_;
437
438
  Local<Value> argv[] = {
439
    Integer::New(env->isolate(), nread),
440
    buf,
441
    handle
442
94592
  };
443
444
23648
  if (argv[1].IsEmpty())
445
6826
    argv[1] = Undefined(env->isolate());
446
447
23648
  if (argv[2].IsEmpty())
448
47012
    argv[2] = Undefined(env->isolate());
449
450
23648
  AsyncWrap* wrap = GetAsyncWrap();
451
23648
  CHECK_NE(wrap, nullptr);
452
23648
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
453
23586
}
454
455
456
4
// Always returns false in this implementation.
// NOTE(review): presumably overridden by stream types that can be IPC
// pipes -- confirm in the class declaration.
bool StreamBase::IsIPCPipe() {
457
4
  return false;
458
}
459
460
461
// Always returns -1 in this implementation.
// NOTE(review): presumably -1 means "no file descriptor" and subclasses
// override this -- confirm in the class declaration.
int StreamBase::GetFD() {
462
  return -1;
463
}
464
465
466
14472
// Returns the object() of this stream's AsyncWrap.
// The GetAsyncWrap() result is dereferenced without a null check here, while
// other callers in this file CHECK or test it first.
Local<Object> StreamBase::GetObject() {
467
14472
  return GetAsyncWrap()->object();
468
}
469
470
471
669
// Default DoTryWrite: returns 0 and leaves `*bufs` and `*count` untouched,
// so callers in this file see the full buffer list still pending and fall
// through to DoWrite().
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
472
  // No TryWrite by default
473
669
  return 0;
474
}
475
476
477
50290
// Default Error(): always returns nullptr. Callers in this file treat a
// null result as "no error message to report".
const char* StreamResource::Error() const {
478
50290
  return nullptr;
479
}
480
481
482
// Default ClearError(): does nothing, matching the default Error() in this
// file, which never reports a message.
void StreamResource::ClearError() {
483
  // No-op
484
}
485
486
}  // namespace node