GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 24 31 77.4 %
Date: 2019-08-17 22:35:23 Branches: 10 20 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap-inl.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class ShutdownWrap;
17
class WriteWrap;
18
class StreamBase;
19
class StreamResource;
20
21
// Plain aggregate describing the outcome of a write; this is the return type
// of `StreamBase::Write()`.
// NOTE(review): the field meanings below are inferred from their names and
// from the `Write()` documentation -- confirm against the implementation.
struct StreamWriteResult {
22
  bool async;  // presumably: whether the write completes asynchronously
23
  int err;  // presumably: 0 or a libuv error code
24
  WriteWrap* wrap;  // presumably: the request wrap used for the write, if any
25
  size_t bytes;  // presumably: a byte count associated with the write
26
};
27
28
// Function type (not a pointer type) with the signature of a V8 function
// callback. `StreamBase::AddMethod()` takes a `JSMethodFunction*`.
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
29
30
// Abstract base class for requests issued on a `StreamBase`. `ShutdownWrap`
// and `WriteWrap` derive from it. A request remembers the stream it was
// created for and is attached to a JS object on construction.
class StreamReq {
31
 public:
32
  // NOTE(review): presumably the index of the internal field of the wrap
  // object that refers back to this `StreamReq` -- confirm against
  // `AttachToObject()`.
  static constexpr int kStreamReqField = 1;
33
34
63644
  // Stores `stream` and attaches this request to `req_wrap_obj`.
  explicit StreamReq(StreamBase* stream,
35
63644
                     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
36
63644
    AttachToObject(req_wrap_obj);
37
63644
  }
38
39
63630
  virtual ~StreamReq() = default;
40
  // Must be provided by subclasses.
  virtual AsyncWrap* GetAsyncWrap() = 0;
41
  v8::Local<v8::Object> object();
42
43
  // NOTE(review): presumably reports completion of the request with `status`
  // and an optional error string, ending up in `OnDone()` -- confirm.
  void Done(int status, const char* error_str = nullptr);
44
  void Dispose();
45
46
39617
  // The stream that was passed to the constructor.
  inline StreamBase* stream() const { return stream_; }
47
48
  static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
49
50
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
51
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
52
  // and what we use in C++ after creating these objects from their
53
  // v8::ObjectTemplates, to avoid the overhead of calling the
54
  // constructor explicitly.
55
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
56
57
 protected:
58
  // Completion hook implemented by `ShutdownWrap` and `WriteWrap`.
  virtual void OnDone(int status) = 0;
59
60
  // Called from the constructor with the JS object for this request.
  void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
61
62
 private:
63
  // Set once in the constructor; the pointer itself is never reassigned.
  StreamBase* const stream_;
64
};
65
66
57276
// Request type for a stream shutdown. Adds no state of its own; it only
// supplies the `OnDone()` behavior for shutdown requests.
class ShutdownWrap : public StreamReq {
67
 public:
68
57277
  // Forwards both arguments unchanged to the `StreamReq` constructor.
  ShutdownWrap(StreamBase* stream,
69
               v8::Local<v8::Object> req_wrap_obj)
70
57277
    : StreamReq(stream, req_wrap_obj) { }
71
72
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
73
  void OnDone(int status) override;
74
};
75
76
6354
// Request type for a stream write. In addition to the `StreamReq` state it
// holds an `AllocatedBuffer` (`storage_`).
class WriteWrap : public StreamReq {
77
 public:
78
  // NOTE(review): presumably moves `storage` into `storage_` so that the
  // buffer stays alive for the duration of the write -- confirm in the
  // implementation.
  void SetAllocatedStorage(AllocatedBuffer&& storage);
79
80
6367
  // Forwards both arguments unchanged to the `StreamReq` constructor.
  WriteWrap(StreamBase* stream,
81
            v8::Local<v8::Object> req_wrap_obj)
82
6367
    : StreamReq(stream, req_wrap_obj) { }
83
84
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
85
  void OnDone(int status) override;
86
87
 private:
88
  AllocatedBuffer storage_;
89
};
90
91
92
// This is the generic interface for objects that control Node.js' C++ streams.
93
// For example, the default `EmitToJSStreamListener` emits a stream's data
94
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
95
134848
class StreamListener {
96
 public:
97
  virtual ~StreamListener();
98
99
  // This is called when a stream wants to allocate memory before
100
  // reading data into the freshly allocated buffer (i.e. it is always followed
101
  // by a `OnStreamRead()` call).
102
  // This memory may be statically or dynamically allocated; for example,
103
  // a protocol parser may want to read data into a static buffer if it knows
104
  // that all data is going to be fully handled during the next
105
  // `OnStreamRead()` call.
106
  // The returned buffer does not need to contain `suggested_size` bytes.
107
  // The default implementation of this method returns a buffer that has exactly
108
  // the suggested size and is allocated using malloc().
109
  // It is not valid to return a zero-length buffer from this method.
110
  // It is not guaranteed that the corresponding `OnStreamRead()` call
111
  // happens in the same event loop turn as this call.
112
  // NOTE(review): this method is declared pure virtual here, so the "default
  // implementation" mentioned above is not visible in this class -- confirm
  // where it lives.
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
113
114
  // `OnStreamRead()` is called when data is available on the socket and has
115
  // been read into the buffer provided by `OnStreamAlloc()`.
116
  // `buf` is the buffer returned by `OnStreamAlloc()`, or may be a buffer
117
  // with base nullptr in case of an error.
118
  // `nread` is the number of read bytes (which is at most the buffer length),
119
  // or, if negative, a libuv error code.
120
  virtual void OnStreamRead(ssize_t nread,
121
                            const uv_buf_t& buf) = 0;
122
123
  // This is called once a write has finished. `status` may be 0 or,
124
  // if negative, a libuv error code.
125
  // By default, this is simply passed on to the previous listener
126
  // (and raises an assertion if there is none).
127
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
128
129
  // This is called once a shutdown has finished. `status` may be 0 or,
130
  // if negative, a libuv error code.
131
  // By default, this is simply passed on to the previous listener
132
  // (and raises an assertion if there is none).
133
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
134
135
  // This is called by the stream if it determines that it wants more data
136
  // to be written to it. Not all streams support this.
137
  // This callback will not be called as long as there are active writes.
138
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
139
  // true if it is supported by a stream.
140
3414
  // The default implementation does nothing.
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
141
142
  // This is called immediately before the stream is destroyed.
143
548
  // The default implementation does nothing.
  virtual void OnStreamDestroy() {}
144
145
  // The stream this is currently associated with, or nullptr if there is none.
146
940
  inline StreamResource* stream() { return stream_; }
147
148
 protected:
149
  // Pass along a read error to the `StreamListener` instance that was active
150
  // before this one. For example, a protocol parser does not care about read
151
  // errors and may instead want to let the original handler
152
  // (e.g. the JS handler) take care of the situation.
153
  void PassReadErrorToPreviousListener(ssize_t nread);
154
155
  // Both pointers start out as nullptr. `StreamResource` is a friend of this
  // class and so has direct access to them.
  StreamResource* stream_ = nullptr;
156
  StreamListener* previous_listener_ = nullptr;
157
158
  friend class StreamResource;
159
};
160
161
162
// An (incomplete) stream listener class that calls the `.oncomplete()`
163
// method of the JS objects associated with the wrap objects.
164
188284
class ReportWritesToJSStreamListener : public StreamListener {
165
 public:
166
  // Overrides of the two `StreamListener` completion callbacks. The read-side
  // callbacks (`OnStreamAlloc()`/`OnStreamRead()`) are not overridden here,
  // so this class remains abstract.
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
167
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
168
169
 private:
170
  // NOTE(review): presumably the shared implementation behind both overrides
  // above, operating on the common `StreamReq` base -- confirm in the
  // implementation file.
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
171
};
172
173
174
// A default emitter that just pushes data chunks as Buffer instances to
175
// JS land via the handle’s .ondata method.
176
188284
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
177
 public:
178
  // Overrides of the two read-side callbacks that `StreamListener` declares
  // as pure virtual.
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
179
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
180
};
181
182
183
// A generic stream, comparable to JS land’s `Duplex` streams.
184
// A stream is always controlled through one `StreamListener` instance.
185
94410
class StreamResource {
186
 public:
187
  virtual ~StreamResource();
188
189
  // These need to be implemented on the readable side of this stream:
190
191
  // Start reading from the underlying resource. This is called by the consumer
192
  // when more data is desired. Use `EmitAlloc()` and `EmitRead()` to
193
  // pass data along to the consumer.
194
  virtual int ReadStart() = 0;
195
  // Stop reading from the underlying resource. This is called by the
196
  // consumer when its buffers are full and no more data can be handled.
197
  virtual int ReadStop() = 0;
198
199
  // These need to be implemented on the writable side of this stream:
200
  // All of these methods may return an error code synchronously.
201
  // In that case, the finish callback should *not* be called.
202
203
  // Perform a shutdown operation, and either call req_wrap->Done() when
204
  // finished and return 0, return 1 for synchronous success, or
205
  // a libuv error code for synchronous failures.
206
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
207
  // Try to write as much data as possible synchronously, and modify
208
  // `*bufs` and `*count` accordingly. This is a no-op by default.
209
  // Return 0 for success and a libuv error code for failures.
210
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
211
  // Initiate a write of data. If the write completes synchronously, return 0 on
212
  // success (with bufs modified to indicate how much data was consumed) or a
213
  // libuv error code on failure. If the write will complete asynchronously,
214
  // return 0. When the write completes asynchronously, call req_wrap->Done()
215
  // with 0 on success (with bufs modified to indicate how much data was
216
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
217
  // the write completes synchronously, that is, it should never be called
218
  // before DoWrite() has returned.
219
  virtual int DoWrite(WriteWrap* w,
220
                      uv_buf_t* bufs,
221
                      size_t count,
222
                      uv_stream_t* send_handle) = 0;
223
224
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
225
  // The default implementation returns false.
  virtual bool HasWantsWrite() const { return false; }
226
227
  // Optionally, this may provide an error message to be used for
228
  // failing writes.
229
  virtual const char* Error() const;
230
  // Clear the current error (i.e. that would be returned by Error()).
231
  virtual void ClearError();
232
233
  // Transfer ownership of this stream to `listener`. The previous listener
234
  // will not receive any more callbacks while the new listener is active.
235
  void PushStreamListener(StreamListener* listener);
236
  // Remove a listener, and, if this was the currently active one,
237
  // transfer ownership back to the previous listener.
238
  void RemoveStreamListener(StreamListener* listener);
239
240
 protected:
241
  // Call the current listener's OnStreamAlloc() method.
242
  uv_buf_t EmitAlloc(size_t suggested_size);
243
  // Call the current listener's OnStreamRead() method and update the
244
  // stream's read byte counter.
245
  // `buf` defaults to an empty buffer (`uv_buf_init(nullptr, 0)`).
  void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
246
  // Call the current listener's OnStreamAfterWrite() method.
247
  void EmitAfterWrite(WriteWrap* w, int status);
248
  // Call the current listener's OnStreamAfterShutdown() method.
249
  void EmitAfterShutdown(ShutdownWrap* w, int status);
250
  // Call the current listener's OnStreamWantsWrite() method.
251
  void EmitWantsWrite(size_t suggested_size);
252
253
  // The currently active listener; nullptr until one is set.
  StreamListener* listener_ = nullptr;
254
  // Byte counters, both starting at zero.
  uint64_t bytes_read_ = 0;
255
  uint64_t bytes_written_ = 0;
256
257
  friend class StreamListener;
258
};
259
260
261
93874
// A `StreamResource` that is additionally tied to an `Environment` and that
// contains a built-in `EmitToJSStreamListener` (`default_listener_`). It
// declares the JS-facing methods (read start/stop, shutdown, the write
// variants) and the public `Shutdown()`/`Write()` entry points.
class StreamBase : public StreamResource {
262
 public:
263
  // 0 is reserved for the BaseObject pointer.
264
  static constexpr int kStreamBaseField = 1;
265
  static constexpr int kOnReadFunctionField = 2;
266
  static constexpr int kStreamBaseFieldCount = 3;
267
268
  static void AddMethods(Environment* env,
269
                         v8::Local<v8::FunctionTemplate> target);
270
271
  virtual bool IsAlive() = 0;
272
  virtual bool IsClosing() = 0;
273
  virtual bool IsIPCPipe();
274
  virtual int GetFD();
275
276
  void CallJSOnreadMethod(ssize_t nread,
277
                          v8::Local<v8::ArrayBuffer> ab,
278
                          size_t offset = 0);
279
280
  // This is named `stream_env` to avoid name clashes, because a lot of
281
  // subclasses are also `BaseObject`s.
282
  Environment* stream_env() const;
283
284
  // Shut down the current stream. This request can use an existing
285
  // ShutdownWrap object (that was created in JS), or a new one will be created.
286
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
287
  // completion, and a libuv error code in case of synchronous failure.
288
  int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
289
290
  // Write data to the current stream. This request can use an existing
291
  // WriteWrap object (that was created in JS), or a new one will be created.
292
  // This will first try to write synchronously using `DoTryWrite()`, then
293
  // asynchronously using `DoWrite()`.
294
  // If the return value indicates a synchronous completion, no callback will
295
  // be invoked.
296
  StreamWriteResult Write(
297
      uv_buf_t* bufs,
298
      size_t count,
299
      uv_stream_t* send_handle = nullptr,
300
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
301
302
  // These can be overridden by subclasses to get more specific wrap instances.
303
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
304
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
305
  // an associated libuv request.
306
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
307
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
308
309
  // One of these must be implemented
310
  // NOTE(review): `GetAsyncWrap()` is declared pure virtual, so subclasses
  // have to define it in any case -- confirm what "one of these" is meant to
  // allow.
  virtual AsyncWrap* GetAsyncWrap() = 0;
311
  virtual v8::Local<v8::Object> GetObject();
312
313
  static StreamBase* FromObject(v8::Local<v8::Object> obj);
314
315
 protected:
316
  explicit StreamBase(Environment* env);
317
318
  // JS Methods
319
  // These all share one signature (callback info in, `int` out), which is the
  // member-function-pointer type accepted by the `JSMethod` template below.
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
320
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
321
  // Overload of the public `Shutdown()` that takes JS callback info.
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
322
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
323
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
324
  template <enum encoding enc>
325
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
326
327
  // Static overload of `GetFD()` with a V8 callback signature.
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
328
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
329
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
330
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
331
  void AttachToObject(v8::Local<v8::Object> obj);
332
333
  // Static function with a V8 callback signature, parameterized at compile
  // time on one of the `int`-returning member functions declared above.
  template <int (StreamBase::*Method)(
334
      const v8::FunctionCallbackInfo<v8::Value>& args)>
335
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
336
337
  // Internal, used only in StreamBase methods + env.cc.
338
  enum StreamBaseStateFields {
339
    kReadBytesOrError,
340
    kArrayBufferOffset,
341
    kBytesWritten,
342
    kLastWriteWasAsync,
343
    // Not a field itself: being last, its value equals the number of fields.
    kNumStreamBaseStateFields
344
  };
345
346
 private:
347
  Environment* env_;
348
  EmitToJSStreamListener default_listener_;
349
350
  void SetWriteResult(const StreamWriteResult& res);
351
  static void AddMethod(Environment* env,
352
                        v8::Local<v8::Signature> sig,
353
                        enum v8::PropertyAttribute attributes,
354
                        v8::Local<v8::FunctionTemplate> t,
355
                        JSMethodFunction* stream_method,
356
                        v8::Local<v8::String> str);
357
358
  friend class WriteWrap;
359
  friend class ShutdownWrap;
360
  friend class Environment;  // For kNumStreamBaseStateFields.
361
};
362
363
364
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
365
// `OtherBase` must have a constructor that matches the `AsyncWrap`
366
// constructors' (Environment*, Local<Object>, AsyncWrap::Provider) signature
367
// and be a subclass of `AsyncWrap`.
368
template <typename OtherBase>
369

114552
// Combines `ShutdownWrap` with a caller-chosen `OtherBase` through multiple
// inheritance.
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
370
 public:
371
  SimpleShutdownWrap(StreamBase* stream,
372
                     v8::Local<v8::Object> req_wrap_obj);
373
374
123828
  // Returns this object itself, converted to `AsyncWrap*`; this compiles only
  // if `OtherBase` provides an `AsyncWrap` base.
  AsyncWrap* GetAsyncWrap() override { return this; }
375
376
  // NOTE(review): macros defined outside this file; presumably they supply
  // the memory-tracking boilerplate for this class -- confirm.
  SET_NO_MEMORY_INFO()
377
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
378
  SET_SELF_SIZE(SimpleShutdownWrap)
379
};
380
381
template <typename OtherBase>
382

12708
// Combines `WriteWrap` with a caller-chosen `OtherBase` through multiple
// inheritance.
class SimpleWriteWrap : public WriteWrap, public OtherBase {
383
 public:
384
  SimpleWriteWrap(StreamBase* stream,
385
                  v8::Local<v8::Object> req_wrap_obj);
386
387
18254
  // Returns this object itself, converted to `AsyncWrap*`; this compiles only
  // if `OtherBase` provides an `AsyncWrap` base.
  AsyncWrap* GetAsyncWrap() override { return this; }
388
389
  // NOTE(review): macros defined outside this file; presumably they supply
  // the memory-tracking boilerplate for this class -- confirm.
  SET_NO_MEMORY_INFO()
390
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
391
  SET_SELF_SIZE(SimpleWriteWrap)
392
};
393
394
}  // namespace node
395
396
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
397
398
#endif  // SRC_STREAM_BASE_H_