GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 24 28 85.7 %
Date: 2020-05-27 22:15:15 Branches: 1 2 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class Environment;
17
class ShutdownWrap;
18
class WriteWrap;
19
class StreamBase;
20
class StreamResource;
21
22
struct StreamWriteResult {
23
  bool async;
24
  int err;
25
  WriteWrap* wrap;
26
  size_t bytes;
27
};
28
29
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
30
31
class StreamReq {
32
 public:
33
  // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
34
  // because instances derived from StreamReq will also derive from
35
  // BaseObject, and the slots are used for the identical purpose.
36
  enum InternalFields {
37
    kSlot = BaseObject::kSlot,
38
    kStreamReqField = BaseObject::kInternalFieldCount,
39
    kInternalFieldCount
40
  };
41
42
  inline explicit StreamReq(
43
      StreamBase* stream,
44
      v8::Local<v8::Object> req_wrap_obj);
45
46
13836
  virtual ~StreamReq() = default;
47
  virtual AsyncWrap* GetAsyncWrap() = 0;
48
  inline v8::Local<v8::Object> object();
49
50
  inline void Done(int status, const char* error_str = nullptr);
51
  inline void Dispose();
52
53
13837
  StreamBase* stream() const { return stream_; }
54
55
  static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
56
57
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
58
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
59
  // and what we use in C++ after creating these objects from their
60
  // v8::ObjectTemplates, to avoid the overhead of calling the
61
  // constructor explicitly.
62
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
63
64
 protected:
65
  virtual void OnDone(int status) = 0;
66
67
  inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
68
69
 private:
70
  StreamBase* const stream_;
71
};
72
73
6492
class ShutdownWrap : public StreamReq {
74
 public:
75
  inline ShutdownWrap(
76
      StreamBase* stream,
77
      v8::Local<v8::Object> req_wrap_obj);
78
79
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
80
  void OnDone(int status) override;
81
};
82
83
7344
class WriteWrap : public StreamReq {
84
 public:
85
  inline void SetAllocatedStorage(AllocatedBuffer&& storage);
86
87
  inline WriteWrap(
88
      StreamBase* stream,
89
      v8::Local<v8::Object> req_wrap_obj);
90
91
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
92
  void OnDone(int status) override;
93
94
 private:
95
  AllocatedBuffer storage_;
96
};
97
98
99
// This is the generic interface for objects that control Node.js' C++ streams.
100
// For example, the default `EmitToJSStreamListener` emits a stream's data
101
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
102
102779
class StreamListener {
103
 public:
104
  virtual ~StreamListener();
105
106
  // This is called when a stream wants to allocate memory before
107
  // reading data into the freshly allocated buffer (i.e. it is always followed
108
  // by an `OnStreamRead()` call).
109
  // This memory may be statically or dynamically allocated; for example,
110
  // a protocol parser may want to read data into a static buffer if it knows
111
  // that all data is going to be fully handled during the next
112
  // `OnStreamRead()` call.
113
  // The returned buffer does not need to contain `suggested_size` bytes.
114
  // The default implementation of this method returns a buffer that has exactly
115
  // the suggested size and is allocated using malloc().
116
  // It is not valid to return a zero-length buffer from this method.
117
  // It is not guaranteed that the corresponding `OnStreamRead()` call
118
  // happens in the same event loop turn as this call.
119
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
120
121
  // `OnStreamRead()` is called when data is available on the socket and has
122
  // been read into the buffer provided by `OnStreamAlloc()`.
123
  // The `buf` argument is the buffer returned by `OnStreamAlloc()`, or a buffer
124
  // with base nullptr in case of an error.
125
  // `nread` is the number of read bytes (which is at most the buffer length),
126
  // or, if negative, a libuv error code.
127
  virtual void OnStreamRead(ssize_t nread,
128
                            const uv_buf_t& buf) = 0;
129
130
  // This is called once a write has finished. `status` may be 0 or,
131
  // if negative, a libuv error code.
132
  // By default, this is simply passed on to the previous listener
133
  // (and raises an assertion if there is none).
134
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
135
136
  // This is called once a shutdown has finished. `status` may be 0 or,
137
  // if negative, a libuv error code.
138
  // By default, this is simply passed on to the previous listener
139
  // (and raises an assertion if there is none).
140
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
141
142
  // This is called by the stream if it determines that it wants more data
143
  // to be written to it. Not all streams support this.
144
  // This callback will not be called as long as there are active writes.
145
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
146
  // true if it is supported by a stream.
147
3422
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
148
149
  // This is called immediately before the stream is destroyed.
150
553
  virtual void OnStreamDestroy() {}
151
152
  // The stream this is currently associated with, or nullptr if there is none.
153
884
  StreamResource* stream() const { return stream_; }
154
155
 protected:
156
  // Pass along a read error to the `StreamListener` instance that was active
157
  // before this one. For example, a protocol parser does not care about read
158
  // errors and may instead want to let the original handler
159
  // (e.g. the JS handler) take care of the situation.
160
  inline void PassReadErrorToPreviousListener(ssize_t nread);
161
162
  StreamResource* stream_ = nullptr;
163
  StreamListener* previous_listener_ = nullptr;
164
165
  friend class StreamResource;
166
};
167
168
169
// An (incomplete) stream listener class that calls the `.oncomplete()`
170
// method of the JS objects associated with the wrap objects.
171
126016
class ReportWritesToJSStreamListener : public StreamListener {
172
 public:
173
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
174
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
175
176
 private:
177
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
178
};
179
180
181
// A default emitter that just pushes data chunks as Buffer instances to
182
// JS land via the handle’s .ondata method.
183
126004
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
184
 public:
185
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
186
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
187
};
188
189
190
// An alternative listener that uses a custom, user-provided buffer
191
// for reading data.
192
12
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
193
 public:
194
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
195
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
196
6
  void OnStreamDestroy() override { delete this; }
197
198
6
  explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
199
200
 private:
201
  uv_buf_t buffer_;
202
};
203
204
205
// A generic stream, comparable to JS land’s `Duplex` streams.
206
// A stream is always controlled through one `StreamListener` instance.
207
63167
class StreamResource {
208
 public:
209
  virtual ~StreamResource();
210
211
  // These need to be implemented on the readable side of this stream:
212
213
  // Start reading from the underlying resource. This is called by the consumer
214
  // when more data is desired. Use `EmitAlloc()` and `EmitRead()` to
215
  // pass data along to the consumer.
216
  virtual int ReadStart() = 0;
217
  // Stop reading from the underlying resource. This is called by the
218
  // consumer when its buffers are full and no more data can be handled.
219
  virtual int ReadStop() = 0;
220
221
  // These need to be implemented on the writable side of this stream:
222
  // All of these methods may return an error code synchronously.
223
  // In that case, the finish callback should *not* be called.
224
225
  // Perform a shutdown operation, and either call req_wrap->Done() when
226
  // finished and return 0, return 1 for synchronous success, or
227
  // return a libuv error code for synchronous failures.
228
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
229
  // Try to write as much data as possible synchronously, and modify
230
  // `*bufs` and `*count` accordingly. This is a no-op by default.
231
  // Return 0 for success and a libuv error code for failures.
232
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
233
  // Initiate a write of data. If the write completes synchronously, return 0 on
234
  // success (with bufs modified to indicate how much data was consumed) or a
235
  // libuv error code on failure. If the write will complete asynchronously,
236
  // return 0. When the write completes asynchronously, call req_wrap->Done()
237
  // with 0 on success (with bufs modified to indicate how much data was
238
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
239
  // the write completes synchronously, that is, it should never be called
240
  // before DoWrite() has returned.
241
  virtual int DoWrite(WriteWrap* w,
242
                      uv_buf_t* bufs,
243
                      size_t count,
244
                      uv_stream_t* send_handle) = 0;
245
246
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
247
  virtual bool HasWantsWrite() const { return false; }
248
249
  // Optionally, this may provide an error message to be used for
250
  // failing writes.
251
  virtual const char* Error() const;
252
  // Clear the current error (i.e. that would be returned by Error()).
253
  virtual void ClearError();
254
255
  // Transfer ownership of this stream to `listener`. The previous listener
256
  // will not receive any more callbacks while the new listener is active.
257
  inline void PushStreamListener(StreamListener* listener);
258
  // Remove a listener, and, if this was the currently active one,
259
  // transfer ownership back to the previous listener.
260
  inline void RemoveStreamListener(StreamListener* listener);
261
262
 protected:
263
  // Call the current listener's OnStreamAlloc() method.
264
  inline uv_buf_t EmitAlloc(size_t suggested_size);
265
  // Call the current listener's OnStreamRead() method and update the
266
  // stream's read byte counter.
267
  inline void EmitRead(
268
      ssize_t nread,
269
27794
      const uv_buf_t& buf = uv_buf_init(nullptr, 0));
270
  // Call the current listener's OnStreamAfterWrite() method.
271
  inline void EmitAfterWrite(WriteWrap* w, int status);
272
  // Call the current listener's OnStreamAfterShutdown() method.
273
  inline void EmitAfterShutdown(ShutdownWrap* w, int status);
274
  // Call the current listener's OnStreamWantsWrite() method.
275
  inline void EmitWantsWrite(size_t suggested_size);
276
277
  StreamListener* listener_ = nullptr;
278
  uint64_t bytes_read_ = 0;
279
  uint64_t bytes_written_ = 0;
280
281
  friend class StreamListener;
282
};
283
284
285
62837
class StreamBase : public StreamResource {
286
 public:
287
  // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
288
  // because instances deriving from StreamBase generally also derive from
289
  // BaseObject (it's possible for it not to, however).
290
  enum InternalFields {
291
    kSlot = BaseObject::kSlot,
292
    kStreamBaseField = BaseObject::kInternalFieldCount,
293
    kOnReadFunctionField,
294
    kInternalFieldCount
295
  };
296
297
  static void AddMethods(Environment* env,
298
                         v8::Local<v8::FunctionTemplate> target);
299
300
  virtual bool IsAlive() = 0;
301
  virtual bool IsClosing() = 0;
302
  virtual bool IsIPCPipe();
303
  virtual int GetFD();
304
305
  enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
306
307
  v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
308
      ssize_t nread,
309
      v8::Local<v8::ArrayBuffer> ab,
310
      size_t offset = 0,
311
      StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
312
313
  // This is named `stream_env` to avoid name clashes, because a lot of
314
  // subclasses are also `BaseObject`s.
315
748480
  Environment* stream_env() const { return env_; }
316
317
  // Shut down the current stream. This request can use an existing
318
  // ShutdownWrap object (that was created in JS), or a new one will be created.
319
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
320
  // completion, and a libuv error code in case of synchronous failure.
321
  inline int Shutdown(
322
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
323
324
  // Write data to the current stream. This request can use an existing
325
  // WriteWrap object (that was created in JS), or a new one will be created.
326
  // This will first try to write synchronously using `DoTryWrite()`, then
327
  // asynchronously using `DoWrite()`.
328
  // If the return value indicates a synchronous completion, no callback will
329
  // be invoked.
330
  inline StreamWriteResult Write(
331
      uv_buf_t* bufs,
332
      size_t count,
333
      uv_stream_t* send_handle = nullptr,
334
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
335
336
  // These can be overridden by subclasses to get more specific wrap instances.
337
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
338
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
339
  // an associated libuv request.
340
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
341
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
342
343
  // One of these must be implemented
344
  virtual AsyncWrap* GetAsyncWrap() = 0;
345
  virtual v8::Local<v8::Object> GetObject();
346
347
  static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
348
349
 protected:
350
  inline explicit StreamBase(Environment* env);
351
352
  // JS Methods
353
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
354
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
355
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
356
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
357
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
358
  template <enum encoding enc>
359
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
360
  int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
361
362
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
363
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
364
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
365
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
366
  inline void AttachToObject(v8::Local<v8::Object> obj);
367
368
  template <int (StreamBase::*Method)(
369
      const v8::FunctionCallbackInfo<v8::Value>& args)>
370
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
371
372
  // Internal, used only in StreamBase methods + env.cc.
373
  enum StreamBaseStateFields {
374
    kReadBytesOrError,
375
    kArrayBufferOffset,
376
    kBytesWritten,
377
    kLastWriteWasAsync,
378
    kNumStreamBaseStateFields
379
  };
380
381
 private:
382
  Environment* env_;
383
  EmitToJSStreamListener default_listener_;
384
385
  void SetWriteResult(const StreamWriteResult& res);
386
  static void AddMethod(Environment* env,
387
                        v8::Local<v8::Signature> sig,
388
                        enum v8::PropertyAttribute attributes,
389
                        v8::Local<v8::FunctionTemplate> t,
390
                        JSMethodFunction* stream_method,
391
                        v8::Local<v8::String> str);
392
393
  friend class WriteWrap;
394
  friend class ShutdownWrap;
395
  friend class Environment;  // For kNumStreamBaseStateFields.
396
};
397
398
399
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
400
// `OtherBase` must have a constructor that matches the `AsyncWrap`
401
  // constructor’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
402
// and be a subclass of `AsyncWrap`.
403
template <typename OtherBase>
404
12984
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
405
 public:
406
  SimpleShutdownWrap(StreamBase* stream,
407
                     v8::Local<v8::Object> req_wrap_obj);
408
409
25968
  AsyncWrap* GetAsyncWrap() override { return this; }
410
411
3
  SET_NO_MEMORY_INFO()
412
3
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
413
3
  SET_SELF_SIZE(SimpleShutdownWrap)
414
};
415
416
template <typename OtherBase>
417
14688
class SimpleWriteWrap : public WriteWrap, public OtherBase {
418
 public:
419
  SimpleWriteWrap(StreamBase* stream,
420
                  v8::Local<v8::Object> req_wrap_obj);
421
422
27870
  AsyncWrap* GetAsyncWrap() override { return this; }
423
424
  SET_NO_MEMORY_INFO()
425
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
426
  SET_SELF_SIZE(SimpleWriteWrap)
427
};
428
429
}  // namespace node
430
431
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
432
433
#endif  // SRC_STREAM_BASE_H_