1 |
|
|
#ifndef SRC_STREAM_BASE_INL_H_ |
2 |
|
|
#define SRC_STREAM_BASE_INL_H_ |
3 |
|
|
|
4 |
|
|
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
5 |
|
|
|
6 |
|
|
#include "stream_base.h" |
7 |
|
|
|
8 |
|
|
#include "node.h" |
9 |
|
|
#include "env-inl.h" |
10 |
|
|
#include "v8.h" |
11 |
|
|
|
12 |
|
|
namespace node { |
13 |
|
|
|
14 |
|
|
using v8::Signature; |
15 |
|
|
using v8::FunctionCallbackInfo; |
16 |
|
|
using v8::FunctionTemplate; |
17 |
|
|
using v8::HandleScope; |
18 |
|
|
using v8::Local; |
19 |
|
|
using v8::Object; |
20 |
|
|
using v8::PropertyAttribute; |
21 |
|
|
using v8::PropertyCallbackInfo; |
22 |
|
|
using v8::String; |
23 |
|
|
using v8::Value; |
24 |
|
|
|
25 |
|
62890 |
inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) { |
26 |
✗✓ |
125780 |
CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField), |
27 |
|
|
nullptr); |
28 |
|
62890 |
req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this); |
29 |
|
62890 |
} |
30 |
|
|
|
31 |
|
601 |
inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) { |
32 |
|
|
return static_cast<StreamReq*>( |
33 |
|
1202 |
req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField)); |
34 |
|
|
} |
35 |
|
|
|
36 |
|
62866 |
inline void StreamReq::Dispose() { |
37 |
|
125732 |
object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr); |
38 |
✓✗ |
62866 |
delete this; |
39 |
|
62866 |
} |
40 |
|
|
|
41 |
|
63477 |
inline v8::Local<v8::Object> StreamReq::object() { |
42 |
|
63477 |
return GetAsyncWrap()->object(); |
43 |
|
|
} |
44 |
|
|
|
45 |
|
133269 |
inline StreamListener::~StreamListener() { |
46 |
✓✓ |
133269 |
if (stream_ != nullptr) |
47 |
|
117630 |
stream_->RemoveStreamListener(this); |
48 |
✗✓ |
133269 |
} |
49 |
|
|
|
50 |
|
41019 |
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { |
51 |
✗✓ |
41019 |
CHECK_NOT_NULL(previous_listener_); |
52 |
|
41019 |
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); |
53 |
|
41019 |
} |
54 |
|
|
|
55 |
|
30450 |
inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { |
56 |
✗✓ |
30450 |
CHECK_NOT_NULL(previous_listener_); |
57 |
|
30450 |
previous_listener_->OnStreamAfterShutdown(w, status); |
58 |
|
30450 |
} |
59 |
|
|
|
60 |
|
3687 |
inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { |
61 |
✗✓ |
3687 |
CHECK_NOT_NULL(previous_listener_); |
62 |
|
3687 |
previous_listener_->OnStreamAfterWrite(w, status); |
63 |
|
3687 |
} |
64 |
|
|
|
65 |
|
93161 |
inline StreamResource::~StreamResource() { |
66 |
✓✓ |
186881 |
while (listener_ != nullptr) { |
67 |
|
559 |
StreamListener* listener = listener_; |
68 |
|
559 |
listener->OnStreamDestroy(); |
69 |
|
|
// Remove the listener if it didn’t remove itself. This makes the logic |
70 |
|
|
// in `OnStreamDestroy()` implementations easier, because they |
71 |
|
|
// may call generic cleanup functions which can just remove the |
72 |
|
|
// listener unconditionally. |
73 |
✓✓ |
559 |
if (listener == listener_) |
74 |
|
553 |
RemoveStreamListener(listener_); |
75 |
|
|
} |
76 |
✗✓ |
93161 |
} |
77 |
|
|
|
78 |
|
162335 |
inline void StreamResource::PushStreamListener(StreamListener* listener) { |
79 |
✗✓ |
162335 |
CHECK_NOT_NULL(listener); |
80 |
✗✓ |
162335 |
CHECK_NULL(listener->stream_); |
81 |
|
|
|
82 |
|
162335 |
listener->previous_listener_ = listener_; |
83 |
|
162335 |
listener->stream_ = this; |
84 |
|
|
|
85 |
|
162335 |
listener_ = listener; |
86 |
|
162335 |
} |
87 |
|
|
|
88 |
|
161667 |
inline void StreamResource::RemoveStreamListener(StreamListener* listener) { |
89 |
✗✓ |
161667 |
CHECK_NOT_NULL(listener); |
90 |
|
|
|
91 |
|
|
StreamListener* previous; |
92 |
|
|
StreamListener* current; |
93 |
|
|
|
94 |
|
|
// Remove from the linked list. |
95 |
|
162226 |
for (current = listener_, previous = nullptr; |
96 |
|
|
/* No loop condition because we want a crash if listener is not found */ |
97 |
|
|
; previous = current, current = current->previous_listener_) { |
98 |
✗✓ |
162226 |
CHECK_NOT_NULL(current); |
99 |
✓✓ |
162226 |
if (current == listener) { |
100 |
✓✓ |
161667 |
if (previous != nullptr) |
101 |
|
559 |
previous->previous_listener_ = current->previous_listener_; |
102 |
|
|
else |
103 |
|
161108 |
listener_ = listener->previous_listener_; |
104 |
|
161667 |
break; |
105 |
|
|
} |
106 |
|
559 |
} |
107 |
|
|
|
108 |
|
161667 |
listener->stream_ = nullptr; |
109 |
|
161667 |
listener->previous_listener_ = nullptr; |
110 |
|
161667 |
} |
111 |
|
|
|
112 |
|
505329 |
inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { |
113 |
|
505329 |
DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); |
114 |
|
505329 |
return listener_->OnStreamAlloc(suggested_size); |
115 |
|
|
} |
116 |
|
|
|
117 |
|
519477 |
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { |
118 |
|
519477 |
DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); |
119 |
✓✓ |
519477 |
if (nread > 0) |
120 |
|
470839 |
bytes_read_ += static_cast<uint64_t>(nread); |
121 |
|
519477 |
listener_->OnStreamRead(nread, buf); |
122 |
|
519389 |
} |
123 |
|
|
|
124 |
|
6163 |
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { |
125 |
|
6163 |
DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); |
126 |
|
6163 |
listener_->OnStreamAfterWrite(w, status); |
127 |
|
6161 |
} |
128 |
|
|
|
129 |
|
32874 |
inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { |
130 |
|
32874 |
DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); |
131 |
|
32874 |
listener_->OnStreamAfterShutdown(w, status); |
132 |
|
32874 |
} |
133 |
|
|
|
134 |
|
3590 |
inline void StreamResource::EmitWantsWrite(size_t suggested_size) { |
135 |
|
3590 |
DebugSealHandleScope handle_scope(v8::Isolate::GetCurrent()); |
136 |
|
3590 |
listener_->OnStreamWantsWrite(suggested_size); |
137 |
|
3590 |
} |
138 |
|
|
|
139 |
|
93798 |
inline StreamBase::StreamBase(Environment* env) : env_(env) { |
140 |
|
93798 |
PushStreamListener(&default_listener_); |
141 |
|
93798 |
} |
142 |
|
|
|
143 |
|
1041829 |
inline Environment* StreamBase::stream_env() const { |
144 |
|
1041829 |
return env_; |
145 |
|
|
} |
146 |
|
|
|
147 |
|
56700 |
inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) { |
148 |
|
56700 |
Environment* env = stream_env(); |
149 |
|
|
|
150 |
|
56700 |
HandleScope handle_scope(env->isolate()); |
151 |
|
|
|
152 |
✓✓ |
56700 |
if (req_wrap_obj.IsEmpty()) { |
153 |
✗✓ |
1072 |
if (!env->shutdown_wrap_template() |
154 |
|
1608 |
->NewInstance(env->context()) |
155 |
|
1608 |
.ToLocal(&req_wrap_obj)) { |
156 |
|
|
return UV_EBUSY; |
157 |
|
|
} |
158 |
|
536 |
StreamReq::ResetObject(req_wrap_obj); |
159 |
|
|
} |
160 |
|
|
|
161 |
|
113400 |
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
162 |
|
56700 |
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); |
163 |
|
56700 |
int err = DoShutdown(req_wrap); |
164 |
|
|
|
165 |
✓✓ |
56700 |
if (err != 0) { |
166 |
|
23825 |
req_wrap->Dispose(); |
167 |
|
|
} |
168 |
|
|
|
169 |
|
56700 |
const char* msg = Error(); |
170 |
✗✓ |
56700 |
if (msg != nullptr) { |
171 |
|
|
req_wrap_obj->Set( |
172 |
|
|
env->context(), |
173 |
|
|
env->error_string(), OneByteString(env->isolate(), msg)).Check(); |
174 |
|
|
ClearError(); |
175 |
|
|
} |
176 |
|
|
|
177 |
|
113400 |
return err; |
178 |
|
|
} |
179 |
|
|
|
180 |
|
205524 |
inline StreamWriteResult StreamBase::Write( |
181 |
|
|
uv_buf_t* bufs, |
182 |
|
|
size_t count, |
183 |
|
|
uv_stream_t* send_handle, |
184 |
|
|
v8::Local<v8::Object> req_wrap_obj) { |
185 |
|
205524 |
Environment* env = stream_env(); |
186 |
|
|
int err; |
187 |
|
|
|
188 |
|
205524 |
size_t total_bytes = 0; |
189 |
✓✓ |
1049206 |
for (size_t i = 0; i < count; ++i) |
190 |
|
843682 |
total_bytes += bufs[i].len; |
191 |
|
205524 |
bytes_written_ += total_bytes; |
192 |
|
|
|
193 |
✓✓ |
205524 |
if (send_handle == nullptr) { |
194 |
|
205422 |
err = DoTryWrite(&bufs, &count); |
195 |
✓✓✓✓
|
205422 |
if (err != 0 || count == 0) { |
196 |
|
199334 |
return StreamWriteResult { false, err, nullptr, total_bytes }; |
197 |
|
|
} |
198 |
|
|
} |
199 |
|
|
|
200 |
|
6190 |
HandleScope handle_scope(env->isolate()); |
201 |
|
|
|
202 |
✓✓ |
6190 |
if (req_wrap_obj.IsEmpty()) { |
203 |
✗✓ |
2812 |
if (!env->write_wrap_template() |
204 |
|
4218 |
->NewInstance(env->context()) |
205 |
|
4218 |
.ToLocal(&req_wrap_obj)) { |
206 |
|
|
return StreamWriteResult { false, UV_EBUSY, nullptr, 0 }; |
207 |
|
|
} |
208 |
|
1406 |
StreamReq::ResetObject(req_wrap_obj); |
209 |
|
|
} |
210 |
|
|
|
211 |
|
12379 |
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); |
212 |
|
6190 |
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); |
213 |
|
|
|
214 |
|
6190 |
err = DoWrite(req_wrap, bufs, count, send_handle); |
215 |
|
6189 |
bool async = err == 0; |
216 |
|
|
|
217 |
✓✓ |
6189 |
if (!async) { |
218 |
|
6 |
req_wrap->Dispose(); |
219 |
|
6 |
req_wrap = nullptr; |
220 |
|
|
} |
221 |
|
|
|
222 |
|
6189 |
const char* msg = Error(); |
223 |
✓✓ |
6189 |
if (msg != nullptr) { |
224 |
|
|
req_wrap_obj->Set(env->context(), |
225 |
|
|
env->error_string(), |
226 |
|
15 |
OneByteString(env->isolate(), msg)).Check(); |
227 |
|
3 |
ClearError(); |
228 |
|
|
} |
229 |
|
|
|
230 |
|
12378 |
return StreamWriteResult { async, err, req_wrap, total_bytes }; |
231 |
|
|
} |
232 |
|
|
|
233 |
|
|
template <typename OtherBase> |
234 |
|
56700 |
SimpleShutdownWrap<OtherBase>::SimpleShutdownWrap( |
235 |
|
|
StreamBase* stream, |
236 |
|
|
v8::Local<v8::Object> req_wrap_obj) |
237 |
|
|
: ShutdownWrap(stream, req_wrap_obj), |
238 |
|
|
OtherBase(stream->stream_env(), |
239 |
|
|
req_wrap_obj, |
240 |
|
56700 |
AsyncWrap::PROVIDER_SHUTDOWNWRAP) { |
241 |
|
56700 |
} |
242 |
|
|
|
243 |
|
23833 |
inline ShutdownWrap* StreamBase::CreateShutdownWrap( |
244 |
|
|
v8::Local<v8::Object> object) { |
245 |
|
23833 |
return new SimpleShutdownWrap<AsyncWrap>(this, object); |
246 |
|
|
} |
247 |
|
|
|
248 |
|
|
template <typename OtherBase> |
249 |
|
6190 |
SimpleWriteWrap<OtherBase>::SimpleWriteWrap( |
250 |
|
|
StreamBase* stream, |
251 |
|
|
v8::Local<v8::Object> req_wrap_obj) |
252 |
|
|
: WriteWrap(stream, req_wrap_obj), |
253 |
|
|
OtherBase(stream->stream_env(), |
254 |
|
|
req_wrap_obj, |
255 |
|
6190 |
AsyncWrap::PROVIDER_WRITEWRAP) { |
256 |
|
6190 |
} |
257 |
|
|
|
258 |
|
5887 |
inline WriteWrap* StreamBase::CreateWriteWrap( |
259 |
|
|
v8::Local<v8::Object> object) { |
260 |
|
5887 |
return new SimpleWriteWrap<AsyncWrap>(this, object); |
261 |
|
|
} |
262 |
|
|
|
263 |
|
93798 |
inline void StreamBase::AttachToObject(v8::Local<v8::Object> obj) { |
264 |
|
93798 |
obj->SetAlignedPointerInInternalField(kStreamBaseField, this); |
265 |
|
93798 |
} |
266 |
|
|
|
267 |
|
1173826 |
inline StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) { |
268 |
✓✓ |
2347652 |
if (obj->GetAlignedPointerFromInternalField(0) == nullptr) |
269 |
|
1 |
return nullptr; |
270 |
|
|
|
271 |
|
|
return static_cast<StreamBase*>( |
272 |
|
2347650 |
obj->GetAlignedPointerFromInternalField(kStreamBaseField)); |
273 |
|
|
} |
274 |
|
|
|
275 |
|
|
|
276 |
|
32874 |
inline void ShutdownWrap::OnDone(int status) { |
277 |
|
32874 |
stream()->EmitAfterShutdown(this, status); |
278 |
|
32874 |
Dispose(); |
279 |
|
32874 |
} |
280 |
|
|
|
281 |
|
3486 |
inline void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) { |
282 |
✗✓ |
3486 |
CHECK_NULL(storage_.data()); |
283 |
|
3486 |
storage_ = std::move(storage); |
284 |
|
3486 |
} |
285 |
|
|
|
286 |
|
6163 |
inline void WriteWrap::OnDone(int status) { |
287 |
|
6163 |
stream()->EmitAfterWrite(this, status); |
288 |
|
6161 |
Dispose(); |
289 |
|
6161 |
} |
290 |
|
|
|
291 |
|
39037 |
inline void StreamReq::Done(int status, const char* error_str) { |
292 |
|
39037 |
AsyncWrap* async_wrap = GetAsyncWrap(); |
293 |
|
39037 |
Environment* env = async_wrap->env(); |
294 |
✓✓ |
39037 |
if (error_str != nullptr) { |
295 |
|
6 |
async_wrap->object()->Set(env->context(), |
296 |
|
|
env->error_string(), |
297 |
|
36 |
OneByteString(env->isolate(), error_str)) |
298 |
|
12 |
.Check(); |
299 |
|
|
} |
300 |
|
|
|
301 |
|
39037 |
OnDone(status); |
302 |
|
39035 |
} |
303 |
|
|
|
304 |
|
928318 |
inline void StreamReq::ResetObject(v8::Local<v8::Object> obj) { |
305 |
|
|
DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField); |
306 |
|
|
|
307 |
|
928318 |
obj->SetAlignedPointerInInternalField(0, nullptr); // BaseObject field. |
308 |
|
928318 |
obj->SetAlignedPointerInInternalField(StreamReq::kStreamReqField, nullptr); |
309 |
|
928318 |
} |
310 |
|
|
|
311 |
|
|
|
312 |
|
|
} // namespace node |
313 |
|
|
|
314 |
|
|
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
315 |
|
|
|
316 |
|
|
#endif // SRC_STREAM_BASE_INL_H_ |