Preface
Picking up from the previous article, How to Implement a Simple RunLoop (1), let's continue the analysis.
LWURLConnection
LWURLConnection is one of the more interesting parts of this framework, and also one of the more complex. Working through it should make AFN much easier to understand. Straight to the code:
- (void)performURLConnectionOnRunLoopThread
{
    NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:[NSURL URLWithString:@"http://192.168.1.11:8080/v1/list/post"]];
    request.HTTPMethod = @"POST";
    NSString *content = @"name=john&address=beijing&mobile=140005";
    request.HTTPBody = [content dataUsingEncoding:NSUTF8StringEncoding];
    LWURLConnection *conn = [[LWURLConnection alloc] initWithRequest:request delegate:self startImmediately:NO];
    [conn scheduleInRunLoop:_lwRunLoopThread.looper];
    [conn start];
}
LWURLConnection initialization
- (instancetype _Nonnull)initWithRequest:(NSMutableURLRequest * _Nullable)request
As you can see, LWURLConnection holds an LWConnectionInternal object internally and sets itself as that object's delegate.
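LWConnectionInternal initialization
- (instancetype)init
- (void)start
As you can see, the start method spins up a new child thread to run startInternal. That method establishes the connection, prepares the request parameters, and creates the HTTP request. We break these steps down one by one below.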
Establishing the socket connection
- (BOOL)establishConnection
bool LWConnHelper::establishSocket(const char *ip, const int port)
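Only the signature of establishSocket survives above, so here is a minimal sketch of what a function with that shape typically does: parse the IP, create a TCP socket, and connect. The name establish_socket and its return convention are assumptions made for illustration, not the author's implementation.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not the author's establishSocket): open a blocking
 * TCP connection to ip:port. Returns the connected fd, or -1 on failure. */
int establish_socket(const char *ip, int port)
{
    assert(ip != NULL);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((unsigned short)port);
    /* inet_pton returns 1 only for a valid dotted-quad IPv4 address. */
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
        return -1;
    }

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        close(fd);   /* do not leak the descriptor on failure */
        return -1;
    }
    return fd;
}
```

A caller would use it as `int fd = establish_socket("192.168.1.11", 8080);` and treat -1 as a failed connection.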
Preparing the request headers and body
This step sets up the request parameters:
- (void)prepareHttpRequest
{
    NSURL *targetURL = request.URL;
    NSString *httpMethod = request.HTTPMethod;
    NSString *path = targetURL.path;
    NSString *host = targetURL.host;
    NSMutableString *httpRequestLineAndHeader = [[NSMutableString alloc] init];
    // The request line must end with CRLF right after the HTTP version (no trailing space).
    NSString *requestLine = [NSString stringWithFormat:@"%@ %@ HTTP/1.1\r\n", httpMethod, path];
    NSString *hostHeader = [NSString stringWithFormat:@"HOST: %@ \r\n", host];
    [httpRequestLineAndHeader appendString:requestLine];
    [httpRequestLineAndHeader appendString:hostHeader];
    // Default headers; anything set on the request overrides them below.
    NSMutableDictionary *allHTTPHeaderFields = [[NSMutableDictionary alloc] init];
    [allHTTPHeaderFields setValue:@"application/x-www-form-urlencoded" forKey:@"Content-Type"];
    [allHTTPHeaderFields setValue:@(request.HTTPBody.length) forKey:@"Content-Length"];
    [allHTTPHeaderFields setValue:@"wuyunfeng@LWURLConnection" forKey:@"Accept"];
    [allHTTPHeaderFields setValue:@"gzip, deflate" forKey:@"Accept-Encoding"];
    [allHTTPHeaderFields setValue:@"utf-8" forKey:@"Accept-Charset"];
    [allHTTPHeaderFields setValue:@"LWRunLoopAgent" forKey:@"User-Agent"];
    [allHTTPHeaderFields setValue:@"no-cache" forKey:@"Cache-Control"];
    [allHTTPHeaderFields setValue:@"close" forKey:@"Connection"];
    [allHTTPHeaderFields addEntriesFromDictionary:request.allHTTPHeaderFields];
    NSString *httpHeaderAndValues = LWHeaderStringFromHTTPHeaderFieldsDictironary(allHTTPHeaderFields);
    [httpRequestLineAndHeader appendString:httpHeaderAndValues];
    // Send the request line and headers. Use the UTF-8 byte count rather than
    // the NSString length, which counts UTF-16 code units.
    const char *bytes = [httpRequestLineAndHeader UTF8String];
    _helper->sendMsg(bytes, (int)strlen(bytes));
}
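To make the wire format concrete, here is a small sketch of the text that ends up on the socket: a request line, a few headers, and a blank line that terminates the header section. The function build_request_head and its fixed header set are assumptions for illustration; the real code assembles the headers from a dictionary.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: write the request line and headers for a
 * form-encoded request into buf. Returns the number of bytes written
 * (excluding the terminating NUL), or -1 if buf is too small. */
int build_request_head(char *buf, size_t cap, const char *method,
                       const char *path, const char *host, const char *body)
{
    assert(buf != NULL && cap > 0);
    int n = snprintf(buf, cap,
                     "%s %s HTTP/1.1\r\n"
                     "Host: %s\r\n"
                     "Content-Type: application/x-www-form-urlencoded\r\n"
                     "Content-Length: %zu\r\n"
                     "Connection: close\r\n"
                     "\r\n",   /* blank line: end of the header section */
                     method, path, host, strlen(body));
    if (n < 0 || (size_t)n >= cap) {
        return -1;   /* output error or truncation */
    }
    return n;
}
```

The Content-Length value is the byte length of the body, which is why the body has to be known before the headers are sent.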
- (void)prepareHTTPBody
void LWConnHelper::sendMsg(const char *content, int length)
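The body of sendMsg is not shown, so the following is only a sketch of what a robust send routine with that signature usually looks like: keep writing until every byte is out, retry when a signal interrupts the call, and carry on after a short write. The name send_all is an assumption.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: write all `length` bytes of `content` to fd.
 * Retries on EINTR and continues after short writes.
 * Returns 0 on success, -1 on any other error. */
int send_all(int fd, const char *content, size_t length)
{
    assert(content != NULL || length == 0);
    size_t sent = 0;
    while (sent < length) {
        ssize_t n = write(fd, content + sent, length - sent);
        if (n == -1) {
            if (errno == EINTR) {
                continue;   /* interrupted before writing anything: retry */
            }
            return -1;
        }
        sent += (size_t)n;  /* may be less than requested */
    }
    return 0;
}
```

A single write call is allowed to accept fewer bytes than requested, which is why the loop tracks how much has been sent so far.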
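Creating the HTTP request
void LWConnHelper::createHttpRequest(int timeoutMills)
Strictly speaking, the name createHttpRequest is a bit of a misnomer: inside this method the code is already waiting for network data to come back, using select-based I/O multiplexing. The request itself was actually sent earlier, in prepareHttpRequest.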
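The waiting step described above can be sketched as follows. This is an assumed, simplified version (the name wait_readable is hypothetical) that blocks in select until the socket becomes readable or the timeout expires.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/* Hypothetical helper: wait until fd is readable.
 * Returns 1 if readable, 0 on timeout, -1 on error. */
int wait_readable(int fd, int timeout_ms)
{
    assert(fd >= 0 && fd < FD_SETSIZE && timeout_ms >= 0);
    for (;;) {
        /* select may modify both the set and the timeout, so rebuild them. */
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(fd, &readfds);
        struct timeval tv;
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;

        int ready = select(fd + 1, &readfds, NULL, NULL, &tv);
        if (ready == -1 && errno == EINTR) {
            continue;   /* interrupted by a signal: wait again (full timeout) */
        }
        if (ready == -1) {
            return -1;
        }
        return ready > 0 ? 1 : 0;
    }
}
```

Once it returns 1, the caller can read the response from the socket without blocking on the first read.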
Network callback flow
@class LWConnectionInternal;
LWConnectionInternal has an LWConnectionInternalDelegate, which forwards the network results to LWURLConnection.
- (void)internal_connection:(LWConnectionInternal * _Nonnull)connection didReceiveData:(NSData * _Nullable)data
{
    //The new object must have its selector set with setSelector: and its arguments set with setArgument:atIndex:
    //before it can be invoked. Do not use the alloc/init approach to create NSInvocation objects.
    if ([self.delegate respondsToSelector:@selector(lw_connection:didReceiveData:)]) {
        id target = self.delegate;
        NSMethodSignature *sig = [target methodSignatureForSelector:@selector(lw_connection:didReceiveData:)];
        NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:sig];
        invocation.target = self.delegate;
        invocation.selector = @selector(lw_connection:didReceiveData:);
        id argument = self;
        // Indices 0 and 1 are self and _cmd, so real arguments start at 2.
        [invocation setArgument:&argument atIndex:2];
        [invocation setArgument:&data atIndex:3];
        [invocation retainArguments];
        LWMessage *msg = [[LWMessage alloc] initWithTarget:invocation aSel:@selector(invoke) withArgument:nil at:0];
        [_runloop postMessage:msg];
    }
}

- (void)internal_connection:(LWConnectionInternal * _Nonnull)connection didFailWithError:(NSError * _Nullable)error
{
    if ([self.delegate respondsToSelector:@selector(lw_connection:didFailWithError:)]) {
        id target = self.delegate;
        NSMethodSignature *sig = [target methodSignatureForSelector:@selector(lw_connection:didFailWithError:)];
        NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:sig];
        invocation.target = self.delegate;
        invocation.selector = @selector(lw_connection:didFailWithError:);
        id argument = self;
        [invocation setArgument:&argument atIndex:2];
        [invocation setArgument:&error atIndex:3];
        [invocation retainArguments];
        LWMessage *msg = [[LWMessage alloc] initWithTarget:invocation aSel:@selector(invoke) withArgument:nil at:0];
        [_runloop postMessage:msg];
    }
}

- (void)internal_connectionDidFinishLoading:(LWConnectionInternal * _Nonnull)connection
{
    if ([self.delegate respondsToSelector:@selector(lw_connectionDidFinishLoading:)]) {
        id target = self.delegate;
        NSMethodSignature *sig = [target methodSignatureForSelector:@selector(lw_connectionDidFinishLoading:)];
        NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:sig];
        invocation.target = self.delegate;
        invocation.selector = @selector(lw_connectionDidFinishLoading:);
        id argument = self;
        [invocation setArgument:&argument atIndex:2];
        [invocation retainArguments];
        LWMessage *msg = [[LWMessage alloc] initWithTarget:invocation aSel:@selector(invoke) withArgument:nil at:0];
        [_runloop postMessage:msg];
    }
}
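Taking internal_connectionDidFinishLoading as an example, you can see that it wraps the LWURLConnection delegate method (that is, the outermost business layer) into an LWMessage and calls the familiar [_runloop postMessage:msg], which puts it on the RunLoop's message queue.
So LWURLConnection's network request itself is built on sockets and select, and once the network result comes back it meets the RunLoop again: the delegate call is wrapped into a message and enqueued on the RunLoop's message queue. What happens after that is the RunLoop's own business.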
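The "wrap a call into a message and post it to the loop's queue" idea can be sketched in plain C. Everything here (msg_queue, queue_post, queue_drain, bump_counter) is a hypothetical stand-in for LWMessage and postMessage:, using a function pointer plus argument where the original uses an NSInvocation.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef void (*msg_fn)(void *arg);

typedef struct {
    msg_fn fn;   /* what to call on the loop thread */
    void *arg;   /* argument captured at post time */
} message;

#define QUEUE_CAP 16

typedef struct {
    message items[QUEUE_CAP];
    size_t head;   /* index of the oldest message */
    size_t count;  /* number of queued messages */
    pthread_mutex_t lock;
} msg_queue;

void queue_init(msg_queue *q)
{
    assert(q != NULL);
    q->head = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
}

/* Callable from any thread: enqueue a call. Returns 0, or -1 if full. */
int queue_post(msg_queue *q, msg_fn fn, void *arg)
{
    int rc = -1;
    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_CAP) {
        size_t tail = (q->head + q->count) % QUEUE_CAP;
        q->items[tail].fn = fn;
        q->items[tail].arg = arg;
        q->count++;
        rc = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return rc;
}

/* Called on the loop thread: run queued messages in FIFO order until the
 * queue is empty. Returns the number of messages handled. */
size_t queue_drain(msg_queue *q)
{
    size_t handled = 0;
    for (;;) {
        pthread_mutex_lock(&q->lock);
        if (q->count == 0) {
            pthread_mutex_unlock(&q->lock);
            return handled;
        }
        message m = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAP;
        q->count--;
        pthread_mutex_unlock(&q->lock);
        m.fn(m.arg);   /* invoke outside the lock so callbacks may post */
        handled++;
    }
}

/* Demo callback: counts how many times it was invoked. */
void bump_counter(void *arg)
{
    ++*(int *)arg;
}
```

The point of the design is that the network thread only enqueues; the delegate call itself runs later, on whichever thread drains the queue.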
LWPort
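The most basic concept in Mach is the message. Messages are passed between two ports, and by design any two ports can exchange messages, whether they are on the same machine or on a remote host (the source code discussed here deals with ports on the same machine). Sending a message to a port really means placing it on a queue, where it stays until the receiver can handle it.
For example, CFRunLoop has two kinds of CFRunLoopSource:
source0: handles events such as UIEvent and CFSocket
source1: driven by Mach ports, e.g. CFMachPort and CFMessagePort
Judging by the description alone, source1 is clearly tied to ports.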
Leader
First, a thread is created and its runloop obtained. An LWSocketPort object, _leaderPort, is defined with its port set to 8082, and _leaderPort is added to the runloop.
@autoreleasepool
{
    LWRunLoop *looper = [LWRunLoop currentLWRunLoop];
    _leaderPort = [[LWSocketPort alloc] initWithTCPPort:8082];
    _leaderPort.delegate = self;
    [looper addPort:_leaderPort forMode:LWDefaultRunLoop];
    [looper runMode:LWDefaultRunLoop];
}
- (void)addPort:(LWPort *)aPort forMode:(NSString *)mode
Follower
Meanwhile, detachNewThreadSelector is used to create another thread, and a WorkerClass object is defined.
_worker = [[WorkerClass alloc] init];
[NSThread detachNewThreadSelector:@selector(launchThreadWithPort:) toTarget:_worker withObject:_leaderPort];

- (void)launchThreadWithPort:(LWPort *)port
{
    @autoreleasepool {
        [self prepare:port];
    }
}

- (void)prepare:(LWPort *)port
{
    _workPortRunLoopThread = [NSThread currentThread];
    [NSThread currentThread].name = @"workerPortLoopThread";
    _distantPort = (LWSocketPort *)port;
    // Same port as the leader: bind will fail, so this port becomes a follower.
    _localPort = [[LWSocketPort alloc] initWithTCPPort:8082];
    _localPort.delegate = self;
    [_localPort setType:LWSocketPortRoleTypeFollower];
    LWRunLoop *_currentRunLoop = [LWRunLoop currentLWRunLoop];
    [_currentRunLoop addPort:_localPort forMode:LWDefaultRunLoop];
    [_currentRunLoop runMode:LWDefaultRunLoop];
}

- (void)addPort:(LWPort *)aPort forMode:(NSString *)mode
{
    // Create the array lazily on first use (the check was inverted in the listing).
    if (!_allPorts) {
        _allPorts = [[NSMutableArray alloc] init];
    }
    [_allPorts addObject:aPort];
    if ([aPort isKindOfClass:[LWSocketPort class]]) {
        LWSocketPort *socketTypePort = (LWSocketPort *)aPort;
        int fd = socketTypePort.socket;
        LWSocketPortRoleType roleType = LWSocketPortRoleTypeFollower;
        LWPortContext context = socketTypePort.context;
        // Register the socket fd for read events, with the port's callback and context.
        [_queue.nativeRunLoop addFd:fd type:LWNativeRunLoopFdSocketClientType filter:LWNativeRunLoopEventFilterRead callback:context.LWPortReceiveDataCallBack data:context.info];
    }
}
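What addPort:forMode: ultimately does is register a file descriptor together with a callback and a context pointer, so the loop can call back when the fd becomes readable. The sketch below shows that pattern with poll(2) swapped in for kevent, purely so that it builds on non-BSD systems; fd_loop, loop_add_fd, loop_poll_once, and drain_one_byte are hypothetical names, not part of the framework.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

typedef void (*fd_callback)(int fd, void *info);

#define MAX_FDS 8

typedef struct {
    struct pollfd fds[MAX_FDS];
    fd_callback callbacks[MAX_FDS];   /* callbacks[i] belongs to fds[i] */
    void *infos[MAX_FDS];             /* opaque context passed back as-is */
    int count;
} fd_loop;

void loop_init(fd_loop *loop)
{
    assert(loop != NULL);
    loop->count = 0;
}

/* Register fd for read events. Returns 0, or -1 if the table is full. */
int loop_add_fd(fd_loop *loop, int fd, fd_callback callback, void *info)
{
    if (loop->count == MAX_FDS) {
        return -1;
    }
    loop->fds[loop->count].fd = fd;
    loop->fds[loop->count].events = POLLIN;
    loop->fds[loop->count].revents = 0;
    loop->callbacks[loop->count] = callback;
    loop->infos[loop->count] = info;
    loop->count++;
    return 0;
}

/* Wait up to timeout_ms, then call the callback of every readable fd.
 * Returns the number of callbacks invoked, or -1 on error. */
int loop_poll_once(fd_loop *loop, int timeout_ms)
{
    int ready;
    do {
        ready = poll(loop->fds, (nfds_t)loop->count, timeout_ms);
    } while (ready == -1 && errno == EINTR);
    if (ready == -1) {
        return -1;
    }
    int dispatched = 0;
    for (int i = 0; i < loop->count; i++) {
        if (loop->fds[i].revents & POLLIN) {
            loop->callbacks[i](loop->fds[i].fd, loop->infos[i]);
            dispatched++;
        }
    }
    return dispatched;
}

/* Demo callback: consume one byte and count the wake-up in *info. */
void drain_one_byte(int fd, void *info)
{
    char byte;
    if (read(fd, &byte, 1) == 1) {
        ++*(int *)info;
    }
}
```

The context pointer plays the role of context.info in the listing above: the loop never interprets it, it only hands it back to the callback.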
LWSocket
Let's look at how LWSocket is initialized:
- (BOOL)initInternalWithTCPPort:(unsigned short)port
{
    if ((_sockFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        return NO;
    }
    _context.info = (__bridge void *)(self);
    _context.LWPortReceiveDataCallBack = PortBasedReceiveDataRoutine;
    struct sockaddr_in sockAddr;
    memset(&sockAddr, 0, sizeof(sockAddr));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    sockAddr.sin_port = htons(port);
    _roleType = LWSocketPortRoleTypeLeader;//leader
    int option = 1;
    setsockopt(_sockFd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option));
    //if bind fails, the _sockFd becomes a follower, otherwise a leader
    if (-1 == bind(_sockFd, (struct sockaddr *)&sockAddr, sizeof(sockAddr))) {
        _roleType = LWSocketPortRoleTypeFollower;//follower
    }
    if (_roleType == LWSocketPortRoleTypeLeader) {
        if (listen(_sockFd, 5) == -1) {
            return NO;
        }
        [self setValid:YES];
        _port = port;
    } else {
        //we can ignore the `connect` delay for the local TCP connect
        int flag = connect(_sockFd, (struct sockaddr *)&sockAddr, sizeof(sockAddr));
        if (-1 == flag) {
            return NO;
        }
        struct sockaddr_in name;
        socklen_t namelen = sizeof(name);
        getsockname(_sockFd, (struct sockaddr *)&name, &namelen);
        // Note: sin_port is in network byte order, exactly as getsockname returns it.
        _port = name.sin_port;
        [self setValid:YES];
    }
    return YES;
}
- For the Leader, the main job is to create a socket, open a network port, bind it to a fixed address and port number (8082), and then call listen to wait for requests. The code binds INADDR_ANY, so the port is reachable locally at 127.0.0.1.
- For the Follower, the main job is to create a socket (its local port is assigned by the system) and call connect to reach the server. connect and bind take arguments of the same form; the difference is that bind's argument is your own address, whereas connect's argument is the peer's address.
- All in all, very typical socket programming.
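The "bind decides the role" trick from the bullets above can be sketched in C as follows. This is an assumed, simplified variant: it binds and connects on the loopback address rather than INADDR_ANY, does not set SO_REUSEADDR, and the names open_port, local_port, and port_role are hypothetical.

```c
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

typedef enum { ROLE_LEADER, ROLE_FOLLOWER } port_role;

/* Try to become the leader by binding `port`; if the bind fails (typically
 * because a leader already listens there), connect to it as a follower.
 * Returns the socket fd and stores the role, or returns -1 on failure. */
int open_port(unsigned short port, port_role *role)
{
    assert(role != NULL);
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
        if (listen(fd, 5) == -1) {
            close(fd);
            return -1;
        }
        *role = ROLE_LEADER;
        return fd;
    }
    /* Same sockaddr, opposite meaning: for connect it is the peer's address. */
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        close(fd);
        return -1;
    }
    *role = ROLE_FOLLOWER;
    return fd;
}

/* Local port of a socket in host byte order, or 0 on failure. */
unsigned short local_port(int fd)
{
    struct sockaddr_in name;
    socklen_t namelen = sizeof(name);   /* must be initialized before the call */
    if (getsockname(fd, (struct sockaddr *)&name, &namelen) == -1) {
        return 0;
    }
    return ntohs(name.sin_port);
}
```

Calling open_port twice with the same port in one process yields a leader first and a follower second, which mirrors how the two LWSocketPort objects above are both created with port 8082.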
Sending messages
At this point, the Follower port and the Leader port each have their own thread and a corresponding runloop.
Taking a message sent from the Follower to the Leader as an example: the message is first packed into an LWPortMessage, then sendBeforeDate is called, which in turn enters internalSendBeforDate.
- (void)actualSendContent:(id)content
{
    // Frame layout: [int byte count][UTF-8 payload]. Take the count from the
    // encoded data so it matches the bytes actually sent.
    NSData *payload = [content dataUsingEncoding:NSUTF8StringEncoding];
    int length = (int)payload.length;
    NSMutableData *data = [[NSMutableData alloc] init];
    [data appendBytes:&length length:sizeof(int)];
    [data appendData:payload];
    LWPortMessage *message = [[LWPortMessage alloc] initWithSendPort:_localPort receivePort:_distantPort components:data];
    [message sendBeforeDate:0];
}

- (BOOL)sendBeforeDate:(NSInteger)delay
{
    [self internalSendBeforDate:delay];
    return YES;
}

- (void)internalSendBeforDate:(NSInteger)delay
{
    LWSocketPort *_sendSocketPort = (LWSocketPort *)_sendPort;
    LWSocketPort *_receiveSocketPort = (LWSocketPort *)_receivePort;
    LWRunLoop *runloop = [LWRunLoop currentLWRunLoop];
    //send `data` from `leader` to `follower`
    if (_sendSocketPort.roleType == LWSocketPortRoleTypeLeader) {
        short port = _receiveSocketPort.port;
        [runloop send:_components toPort:port];
    } else {//send `data` from `follower` to `leader`
        int fd = _sendSocketPort.socket;
        [runloop send:_components toFd:fd];
    }
}
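As you can see, internalSendBeforDate first works out the destination (the receiving port's number when the sender is the leader, the sending socket's fd when the sender is a follower) and then calls the runloop's send method:
// The runloop forwards to its native run loop.
- (void)send:(NSData *)data toFd:(int)fd
{
    [_queue.nativeRunLoop send:data toFd:fd];
}

// The native run loop writes the bytes, retrying if a signal interrupts the call.
- (void)send:(NSData *)data toFd:(int)fd
{
    ssize_t nWrite;
    do {
        nWrite = write(fd, [data bytes], [data length]);
    } while (nWrite == -1 && errno == EINTR);
}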
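The framing that actualSendContent builds, a length header followed by the payload bytes, can be sketched as a standalone function. pack_frame is a hypothetical name; like the original, it writes the length as a native int in host byte order, which is only safe because both ends run on the same machine.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: build a frame [int length][payload bytes] from a
 * NUL-terminated string. Returns a malloc'd buffer the caller must free,
 * and stores the total frame size in *frame_len. Returns NULL on failure. */
unsigned char *pack_frame(const char *text, size_t *frame_len)
{
    assert(text != NULL && frame_len != NULL);
    int length = (int)strlen(text);   /* payload size in bytes */
    unsigned char *frame = malloc(sizeof(int) + (size_t)length);
    if (frame == NULL) {
        return NULL;
    }
    memcpy(frame, &length, sizeof(int));                /* header */
    memcpy(frame + sizeof(int), text, (size_t)length);  /* payload */
    *frame_len = sizeof(int) + (size_t)length;
    return frame;
}
```

The header lets the receiver know how many bytes to read for one message, since a TCP stream has no message boundaries of its own.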
Receiving messages
The kevent call in the Leader's runloop detects activity on the socket file descriptor and wakes the thread, which then calls handleFollowerToLeader to process the event and read out the message sent earlier. That completes one send/receive round trip.
- (BOOL)handleFollowerToLeader:(int)event fd:(int)fd
{
    // kqueue filters are distinct negative constants, not bit flags, so compare
    // with == (assuming `event` carries the kevent filter).
    if (event == EVFILT_READ) {
        int length = 0;
        ssize_t nRead;
        do {
            nRead = read(fd, &length, sizeof(int));
        } while (nRead == -1 && EINTR == errno);
        if (nRead == -1 && EAGAIN == errno) {
            //The file was marked for non-blocking I/O, and no data were ready to be read.
            return false;
        }
        // Bail out on EOF, a short header, or a nonsensical length.
        if (nRead != (ssize_t)sizeof(int) || length <= 0) {
            return false;
        }
        //buffer `follower` LWPort send `buffer` to `leader` LWPort
        char *buffer = malloc(length);
        if (buffer == NULL) {
            return false;
        }
        do {
            nRead = read(fd, buffer, length);
        } while (nRead == -1 && EINTR == errno);
        NSValue *data = [_requests objectForKey:@(_leader)];
        PortWrapper request;
        [data getValue:&request];
        //notify leader
        if (request.callback) {
            request.callback(fd, request.info, buffer, length);
        }
        //remember release malloc memory
        free(buffer);
        buffer = NULL;
        struct sockaddr_in sockaddr;
        socklen_t len = sizeof(sockaddr); // must hold the buffer size on input
        int ret = getpeername(fd, (struct sockaddr *)&sockaddr, &len);
        if (ret < 0) {
            return false;
        }
        LWPortClientInfo *info = [_portClients valueForKey:[NSString stringWithFormat:@"%d", sockaddr.sin_port]];
        if (info.cacheSend && info.cacheSend.length > 0) {
            //write cached on next event
            [self kevent:fd filter:EVFILT_WRITE action:EV_ADD];
        }
    } else if (event == EVFILT_WRITE) {
        struct sockaddr_in sockaddr;
        socklen_t len = sizeof(sockaddr); // must hold the buffer size on input
        int ret = getpeername(fd, (struct sockaddr *)&sockaddr, &len);
        if (ret < 0) {
            return false;
        }
        LWPortClientInfo *info = [_portClients valueForKey:[NSString stringWithFormat:@"%d", sockaddr.sin_port]];
        if (info.cacheSend && info.cacheSend.length > 0) {
            ssize_t nWrite;
            do {
                nWrite = write(fd, [info.cacheSend bytes], info.cacheSend.length);
            } while (nWrite == -1 && errno == EINTR);
            // Only a real write error (other than "try again") is a failure.
            if (nWrite == -1 && errno != EAGAIN) {
                return false;
            }
            //clean the sending cache
            info.cacheSend = nil;
        } else {
            return false;
        }
    }
    return true;
}
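The read side of the framing (read the int header, then exactly that many payload bytes) can be isolated into a small sketch. It assumes a blocking descriptor and adds an upper bound on the length; read_exact, read_frame, and MAX_FRAME are hypothetical names and limits, not part of the framework.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_FRAME (1 << 20)   /* arbitrary sanity limit for this sketch */

/* Read exactly n bytes from a blocking fd, retrying on EINTR and
 * continuing after short reads. Returns 0 on success, -1 on error or EOF. */
int read_exact(int fd, void *buf, size_t n)
{
    size_t got = 0;
    while (got < n) {
        ssize_t r = read(fd, (char *)buf + got, n - got);
        if (r == -1 && errno == EINTR) {
            continue;
        }
        if (r <= 0) {
            return -1;   /* error, or the peer closed the connection */
        }
        got += (size_t)r;
    }
    return 0;
}

/* Read one [int length][payload] frame. Returns a malloc'd, NUL-terminated
 * payload the caller must free and stores its size in *length.
 * Returns NULL on failure. */
char *read_frame(int fd, int *length)
{
    assert(length != NULL);
    int len = 0;
    if (read_exact(fd, &len, sizeof(int)) == -1 || len < 0 || len > MAX_FRAME) {
        return NULL;
    }
    char *payload = malloc((size_t)len + 1);
    if (payload == NULL) {
        return NULL;
    }
    if (read_exact(fd, payload, (size_t)len) == -1) {
        free(payload);
        return NULL;
    }
    payload[len] = '\0';
    *length = len;
    return payload;
}
```

Looping until the full payload has arrived matters on a stream socket, because a single read may return only part of a message.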
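Summary
Having worked through this source code, go back and think carefully about RunLoop again, and dig deeper if you can. If your understanding has improved even a little compared with before, this article has served its purpose.
Finally, I'll borrow one more diagram from the author's GitHub, and thank the author, wuyunfeng, once again!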