日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

elasticSearch6源码分析(6)http和transport模块

發(fā)布時間:2025/6/15 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 elasticSearch6源码分析(6)http和transport模块 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.http模塊概述

The http module allows to expose Elasticsearch APIs over HTTP.The http mechanism is completely asynchronous in nature, meaning that there is no blocking thread waiting for a response. The benefit of using asynchronous communication for HTTP is solving the C10k problem.When possible, consider using HTTP keep alive when connecting for better performance and try to get your favorite client not to do HTTP chunking.

2.http配置類HttpTransportSettings

public final class HttpTransportSettings {public static final Setting<Boolean> SETTING_CORS_ENABLED =Setting.boolSetting("http.cors.enabled", false, Property.NodeScope);public static final Setting<String> SETTING_CORS_ALLOW_ORIGIN =new Setting<>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope);public static final Setting<Integer> SETTING_CORS_MAX_AGE =Setting.intSetting("http.cors.max-age", 1728000, Property.NodeScope);public static final Setting<String> SETTING_CORS_ALLOW_METHODS =new Setting<>("http.cors.allow-methods", "OPTIONS,HEAD,GET,POST,PUT,DELETE", (value) -> value, Property.NodeScope);public static final Setting<String> SETTING_CORS_ALLOW_HEADERS =new Setting<>("http.cors.allow-headers", "X-Requested-With,Content-Type,Content-Length", (value) -> value, Property.NodeScope);public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS =Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope);public static final Setting<Integer> SETTING_PIPELINING_MAX_EVENTS =Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_COMPRESSION =Setting.boolSetting("http.compression", true, Property.NodeScope);// we intentionally use a different compression level as Netty here as our benchmarks have shown that a compression level of 3 is the// best compromise between reduction in network traffic and added latency. For more details please check #7309.public static final Setting<Integer> SETTING_HTTP_COMPRESSION_LEVEL =Setting.intSetting("http.compression_level", 3, Property.NodeScope);public static final Setting<List<String>> SETTING_HTTP_HOST =listSetting("http.host", emptyList(), Function.identity(), Property.NodeScope);public static final Setting<List<String>> SETTING_HTTP_PUBLISH_HOST =listSetting("http.publish_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope);public static final Setting<List<String>> SETTING_HTTP_BIND_HOST =listSetting("http.bind_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope);public static final Setting<PortsRange> SETTING_HTTP_PORT =new Setting<>("http.port", "9200-9300", PortsRange::new, Property.NodeScope);public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT =Setting.intSetting("http.publish_port", -1, -1, Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED =Setting.boolSetting("http.detailed_errors.enabled", true, Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_CONTENT_TYPE_REQUIRED =new Setting<>("http.content_type.required", (s) -> Boolean.toString(true), (s) -> {final boolean value = Booleans.parseBoolean(s);if (value == false) {throw new IllegalArgumentException("http.content_type.required cannot be set to false. It exists only to make a rolling" +" upgrade easier");}return true;}, Property.NodeScope, Property.Deprecated);public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_CONTENT_LENGTH =Setting.byteSizeSetting("http.max_content_length",new ByteSizeValue(100, ByteSizeUnit.MB),new ByteSizeValue(0, ByteSizeUnit.BYTES),new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES),Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_CHUNK_SIZE =Setting.byteSizeSetting("http.max_chunk_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_HEADER_SIZE =Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);public static final Setting<Integer> SETTING_HTTP_MAX_WARNING_HEADER_COUNT =Setting.intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_WARNING_HEADER_SIZE =Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_INITIAL_LINE_LENGTH =Setting.byteSizeSetting("http.max_initial_line_length", new ByteSizeValue(4, ByteSizeUnit.KB), Property.NodeScope);// don't reset cookies by default, since I don't think we really need to// note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookiespublic static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);// A default of 0 means that by default there is no read timeoutpublic static final Setting<TimeValue> SETTING_HTTP_READ_TIMEOUT =Setting.timeSetting("http.read_timeout", new TimeValue(0), new TimeValue(0), Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =boolSetting("http.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =boolSetting("http.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =Setting.byteSizeSetting("http.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE =Setting.byteSizeSetting("http.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);private HttpTransportSettings() {} }

3.使用Netty4HttpServerTransport

protected void bindServer() {// Bind and start to accept incoming connections. InetAddress hostAddresses[];try {hostAddresses = networkService.resolveBindHostAddresses(bindHosts);} catch (IOException e) {throw new BindHttpException("Failed to resolve host [" + Arrays.toString(bindHosts) + "]", e);}List<TransportAddress> boundAddresses = new ArrayList<>(hostAddresses.length);for (InetAddress address : hostAddresses) {boundAddresses.add(bindAddress(address));}final InetAddress publishInetAddress;try {publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);} catch (Exception e) {throw new BindTransportException("Failed to resolve publish address", e);}final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress);TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort));this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress);logger.info("{}", boundAddress);}

4.http消息處理

4.1?RestController 請求分發(fā)器

@Overridepublic void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {if (request.rawPath().equals("/favicon.ico")) {handleFavicon(request, channel);return;}try {tryAllHandlers(request, channel, threadContext);} catch (Exception e) {try {channel.sendResponse(new BytesRestResponse(channel, e));} catch (Exception inner) {inner.addSuppressed(e);logger.error(() ->new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);}}}

4.2 處理request的類RestHandler

以search為例

其具體實現(xiàn)為:RestSearchAction

?

@Overridepublic final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {// prepare the request for execution; has the side effect of touching the request parametersfinal RestChannelConsumer action = prepareRequest(request, client);// validate unconsumed params, but we must exclude params used to format the response// use a sorted set so the unconsumed parameters appear in a reliable sorted orderfinal SortedSet<String> unconsumedParams =request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));// validate the non-response paramsif (!unconsumedParams.isEmpty()) {final Set<String> candidateParams = new HashSet<>();candidateParams.addAll(request.consumedParams());candidateParams.addAll(responseParams());throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));}usageCount.increment();// execute the action action.accept(channel);}

5.transport概述

The transport module is used for internal communication between nodes within the cluster. Each call that goes from one node to the other uses the transport module (for example, when an HTTP GET request is processed by one node, and should actually be processed by another node that holds the data).The transport mechanism is completely asynchronous in nature, meaning that there is no blocking thread waiting for a response. The benefit of using asynchronous communication is first solving the C10k problem, as well as being the ideal solution for scatter (broadcast) / gather operations such as search in Elasticsearch.

6.tansport配置類TcpTransport

public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";public static final Setting<List<String>> HOST =listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope);public static final Setting<List<String>> BIND_HOST =listSetting("transport.bind_host", HOST, Function.identity(), Setting.Property.NodeScope);public static final Setting<List<String>> PUBLISH_HOST =listSetting("transport.publish_host", HOST, Function.identity(), Setting.Property.NodeScope);public static final Setting<String> PORT =new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Setting.Property.NodeScope);public static final Setting<Integer> PUBLISH_PORT =intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope);public static final String DEFAULT_PROFILE = "default";// the scheduled internal ping interval setting, defaults to disabled (-1)public static final Setting<TimeValue> PING_SCHEDULE =timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);public static final Setting<Boolean> TCP_NO_DELAY =boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);public static final Setting<Boolean> TCP_KEEP_ALIVE =boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);public static final Setting<Boolean> TCP_REUSE_ADDRESS =boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",key -> boolSetting(key, TcpTransport.TCP_NO_DELAY, Setting.Property.NodeScope));public static final Setting.AffixSetting<Boolean> TCP_KEEP_ALIVE_PROFILE = affixKeySetting("transport.profiles.", "tcp_keep_alive",key -> boolSetting(key, TcpTransport.TCP_KEEP_ALIVE, Setting.Property.NodeScope));public static final Setting.AffixSetting<Boolean> TCP_REUSE_ADDRESS_PROFILE = affixKeySetting("transport.profiles.", "reuse_address",key -> boolSetting(key, TcpTransport.TCP_REUSE_ADDRESS, Setting.Property.NodeScope));public static final Setting.AffixSetting<ByteSizeValue> TCP_SEND_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.","send_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope));public static final Setting.AffixSetting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE_PROFILE = affixKeySetting("transport.profiles.","receive_buffer_size", key -> Setting.byteSizeSetting(key, TcpTransport.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope));public static final Setting.AffixSetting<List<String>> BIND_HOST_PROFILE = affixKeySetting("transport.profiles.", "bind_host",key -> listSetting(key, BIND_HOST, Function.identity(), Setting.Property.NodeScope));public static final Setting.AffixSetting<List<String>> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host",key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope));public static final Setting.AffixSetting<String> PORT_PROFILE = affixKeySetting("transport.profiles.", "port",key -> new Setting<>(key, PORT, Function.identity(), Setting.Property.NodeScope));public static final Setting.AffixSetting<Integer> PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port",key -> intSetting(key, -1, -1, Setting.Property.NodeScope));

7.配置使用Netty4Transport

@Overrideprotected void doStart() {boolean success = false;try {clientBootstrap = createClientBootstrap();if (NetworkService.NETWORK_SERVER.get(settings)) {for (ProfileSettings profileSettings : profileSettings) {createServerBootstrap(profileSettings);bindServer(profileSettings);}}super.doStart();success = true;} finally {if (success == false) {doStop();}}}

8.服務(wù)端啟動TransportService

@Overrideprotected void doStart() {transport.addMessageListener(this);connectionManager.addListener(this);transport.start();if (transport.boundAddress() != null && logger.isInfoEnabled()) {logger.info("{}", transport.boundAddress());for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());}}localNode = localNodeFactory.apply(transport.boundAddress());if (connectToRemoteCluster) {// here we start to connect to the remote clusters remoteClusterService.initializeRemoteClusters();}}

7.客戶端啟動TransportClient

/*** Creates a new TransportClient with the given settings and plugins*/public TransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {this(buildTemplate(settings, Settings.EMPTY, plugins, null));}private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();}final PluginsService pluginsService = newPluginService(providedSettings, plugins);final Settings settings =Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true).build();final List<Closeable> resourcesToClose = new ArrayList<>();final ThreadPool threadPool = new ThreadPool(settings);resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));final NetworkService networkService = new NetworkService(Collections.emptyList());try {final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());for (final ExecutorBuilder<?> builder : threadPool.builders()) {additionalSettings.addAll(builder.getRegisteredSettings());}SettingsModule settingsModule =new SettingsModule(settings, additionalSettings, additionalSettingsFilter, Collections.emptySet());SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));IndicesModule indicesModule = new IndicesModule(Collections.emptyList());List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();entries.addAll(NetworkModule.getNamedWriteables());entries.addAll(searchModule.getNamedWriteables());entries.addAll(indicesModule.getNamedWriteables());entries.addAll(ClusterModule.getNamedWriteables());entries.addAll(pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedWriteables().stream()).collect(Collectors.toList()));NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(searchModule.getNamedXContents().stream(),pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedXContent().stream())).flatMap(Function.identity()).collect(toList()));ModulesBuilder modules = new ModulesBuilder();// plugin modules must be added here, before others or we can get crazy injection errors...for (Module pluginModule : pluginsService.createGuiceModules()) {modules.add(pluginModule);}modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,pluginsService.filterPlugins(ActionPlugin.class), null, null, null);modules.add(actionModule);CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),settingsModule.getClusterSettings());resourcesToClose.add(circuitBreakerService);PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);resourcesToClose.add(bigArrays);modules.add(settingsModule);NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);final Transport transport = networkModule.getTransportSupplier().get();final TransportService transportService = new TransportService(settings, transport, threadPool,networkModule.getTransportInterceptor(),boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),UUIDs.randomBase64UUID()), null, Collections.emptySet());modules.add((b -> {b.bind(BigArrays.class).toInstance(bigArrays);b.bind(PluginsService.class).toInstance(pluginsService);b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);b.bind(Transport.class).toInstance(transport);b.bind(TransportService.class).toInstance(transportService);b.bind(NetworkService.class).toInstance(networkService);}));Injector injector = modules.createInjector();final TransportClientNodesService nodesService =new TransportClientNodesService(settings, transportService, threadPool, failureListner == null? (t, e) -> {} : failureListner);// construct the list of client actionsfinal List<ActionPlugin> actionPlugins = pluginsService.filterPlugins(ActionPlugin.class);final List<Action> clientActions =actionPlugins.stream().flatMap(p -> p.getClientActions().stream()).collect(Collectors.toList());// add all the base actionsfinal List<? extends Action<?>> baseActions =actionModule.getActions().values().stream().map(ActionPlugin.ActionHandler::getAction).collect(Collectors.toList());clientActions.addAll(baseActions);final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService, clientActions);List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream().map(injector::getInstance).collect(Collectors.toList()));resourcesToClose.addAll(pluginLifecycleComponents);transportService.start();transportService.acceptIncomingRequests();ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy,namedWriteableRegistry);resourcesToClose.clear();return transportClient;} finally {IOUtils.closeWhileHandlingException(resourcesToClose);}}

?

總結(jié)

以上是生活随笔為你收集整理的elasticSearch6源码分析(6)http和transport模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亲女禁h啪啪宫交 | 国产噜噜噜噜噜久久久久久久久 | 在线免费av播放 | 亚洲综合区 | 日韩区一区二 | 日日摸夜夜添狠狠添欧美 | 成人hd| 亚洲色图13p | 激情宗合 | 亚洲午夜精品久久久久久app | 好吊日在线观看 | 日韩一区二区在线观看视频 | 免费爱爱网址 | 激情偷乱人成视频在线观看 | 日韩专区在线 | 爱乃なみ加勒比在线播放 | 黑人巨大精品欧美黑白配亚洲 | 午夜视频黄| 蜜色av| 97在线免费视频观看 | 麻豆影视在线免费观看 | 涩色网 | 色综合久久久久久久 | 久久视频在线免费观看 | 亚洲久久久久 | 国产av一区二区三区最新精品 | 亚洲少妇在线 | 亚洲久草| 欧美成人性生活视频 | 欧美高清性xxxxhdvideosex | 三日本三级少妇三级99 | 国产成人在线视频免费观看 | 中文字幕在线字幕中文 | 性久久久久久 | 欧美日韩国产中文字幕 | 99re这里只有精品首页 | 国产综合久久 | 老妇女玩小男生毛片 | 日本免费一区二区三区视频 | 黄色视屏在线免费观看 | 国产午夜成人久久无码一区二区 | 亚洲综合第一页 | 91蝌蚪视频在线观看 | 日本午夜一级 | 欧洲一级黄 | aaa大片十八岁禁止 中文字幕亚洲在线观看 | 国产亚洲精品成人 | 999www | 亚洲成人一区二区在线观看 | aⅴ天堂网| 中文字幕在线第一页 | 日本免费不卡一区二区 | 国产又粗又黄的视频 | 激情欧美一区二区 | 久久精品国产99国产精品 | 韩国一级一片高清免费观看 | 综合伊人 | 影音先锋人妻啪啪av资源网站 | 茄子av | 国内偷拍第一页 | 乱色熟女综合一区二区三区 | 国产日批视频 | 欧洲精品一区二区 | 福利社91 | 亚洲色成人一区二区三区小说 | 免播放器在线视频 | 一区二区三区四区影院 | 国产精品天堂 | 一区二区国产在线观看 | 中文字幕第31页 | 96视频在线 | 四虎网站最新网址 | 女人叉开腿让男人桶 | 操女人的逼逼 | 亚洲欧美日韩第一页 | 国产精品aⅴ | 国产破处视频 | 国产一精品一aⅴ一免费 | 伊人网国产 | 日韩 欧美 精品 | 亚洲av无码一区二区乱孑伦as | 日本三级黄色录像 | 国产亚洲成av人片在线观看桃 | 在线观看免费小视频 | 国产精品免费av一区二区三区 | 久久超碰精品 | 91黄色在线视频 | 青草国产 | 在线观看不卡的av | 久久久久久久久久一区二区三区 | 国产欧美日韩综合精品 | 91久久综合亚洲鲁鲁五月天 | 国产日本精品 | 国产熟妇一区二区三区四区 | 国产一区二区视频播放 | 夜夜骚av一区二区三区 | 草草影院ccyycom | 另类男人与善交video | 波多野结衣视频免费观看 |