日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > c/c++ >内容正文

c/c++

mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结

發(fā)布時(shí)間:2025/3/8 c/c++ 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、前言

前段時(shí)間公司預(yù)研了設(shè)備app端與服務(wù)端的交互方案,出于多方面考量最終選用了阿里云的微服務(wù)隊(duì)列MQTT方案,基于此方案,本人主要實(shí)踐有:

1. 封裝了RocketMQ實(shí)現(xiàn)MQTT訂閱與發(fā)布的實(shí)現(xiàn)細(xì)節(jié);

2. 實(shí)現(xiàn)了注解式分發(fā)處理,可利用如MqttController, MqttTopicMapping等相關(guān)自定義注解的方式來(lái)統(tǒng)一訂閱MQTT的Topic以及消息處理的分發(fā);

3. 使用了一套請(qǐng)求和響應(yīng)的同步機(jī)制來(lái)達(dá)到PUB/SUB異步通信的偽同步調(diào)用。

Github 地址點(diǎn)此鏈接

二、RocketMQ的接入細(xì)節(jié)

1. 為什么服務(wù)端要使用RocketMQ接入

阿里云微消息隊(duì)列MQTT是在以消息隊(duì)列 RocketMQ 為核心存儲(chǔ)的基礎(chǔ)上,實(shí)現(xiàn)更適合移動(dòng)互聯(lián)網(wǎng)和IoT領(lǐng)域的無(wú)狀態(tài)網(wǎng)關(guān),兩者之間具備天然的數(shù)據(jù)互通性。MQTT實(shí)例本身并不提供消息數(shù)據(jù)持久化功能,消息數(shù)據(jù)持久化需要搭配后端的消息存儲(chǔ)實(shí)例來(lái)使用。因此現(xiàn)階段每一個(gè)阿里云MQTT實(shí)例都必須配套一個(gè)消息存儲(chǔ)實(shí)例,即RocketMQ實(shí)例來(lái)提供消息數(shù)據(jù)持久化功能,因此他們之間可以說(shuō)是消息互通的,即可用RocketMQ訂閱的方式來(lái)消費(fèi)用MQTT協(xié)議發(fā)布的消息,同理也可用 MQTT協(xié)議訂閱的方式來(lái)消費(fèi)RocketMQ發(fā)布的消息。

幫助文檔也給出了以下兩種產(chǎn)品的區(qū)別說(shuō)明:

微消息隊(duì)列MQTT基于MQTT協(xié)議實(shí)現(xiàn),單個(gè)客戶(hù)端的處理能力較弱。因此,微消息隊(duì)列MQTT適用于擁有大量在線客戶(hù)端(很多企業(yè)設(shè)備端過(guò)萬(wàn),甚至上百萬(wàn)),但每個(gè)客戶(hù)端消息較少的場(chǎng)景。
相比之下,消息隊(duì)列RocketMQ是面向服務(wù)端的消息引擎,主要用于服務(wù)組件之間的解耦、異步通知、削峰填谷等,服務(wù)器規(guī)模較小(極少企業(yè)服務(wù)器規(guī)模過(guò)萬(wàn)),但需要大量的消息處理,吞吐量要求高。因此,消息隊(duì)列RocketMQ適用于服務(wù)端進(jìn)行大批量的數(shù)據(jù)處理和分析的場(chǎng)景。

基于以上區(qū)別,官方也推薦在移動(dòng)端設(shè)備上使用微消息隊(duì)列MQTT,而在服務(wù)端應(yīng)用中則使用消息隊(duì)列RocketMQ,具體則可以通過(guò) MQTT SDK 以公網(wǎng)訪問(wèn)方式來(lái)實(shí)現(xiàn)設(shè)備間的通信,通過(guò)MQ SDK以?xún)?nèi)網(wǎng)方式來(lái)實(shí)現(xiàn)服務(wù)端通信。

2. RocketMQ如何對(duì)接

RocketMQ與MQTT在消息結(jié)構(gòu)和一些屬性字段上都有一定的映射關(guān)系,具體內(nèi)容(摘自幫助文檔)如下。

微消息隊(duì)列MQTT使用MQTT協(xié)議接入,而消息隊(duì)列RocketMQ使用的是私有協(xié)議,因此,兩者的關(guān)鍵概念存在如下映射關(guān)系。

MQ與MQTT消息結(jié)構(gòu)映射關(guān)系

如上圖所示,MQTT協(xié)議中Topic是多級(jí)結(jié)構(gòu),而消息隊(duì)列RocketMQ的Topic 僅有一級(jí),因此,MQTT中的一級(jí)Topic映射到消息隊(duì)列RocketMQ的Topic,而二級(jí)和三級(jí)Topic則映射到消息隊(duì)列RocketMQ的消息屬性(Properties)中。

消息隊(duì)列 RocketMQ 協(xié)議中的消息(Message)可以擁有自定義屬性(Properties),而MQTT協(xié)議目前的版本不支持屬性,但為了方便對(duì)MQTT協(xié)議中的Header信息和設(shè)備信息進(jìn)行溯源,MQTT的部分信息將被映射到 RocketMQ的消息屬性中,方便使用消息隊(duì)列RocketMQ的SDK接入的用戶(hù)獲取。

目前,微消息隊(duì)列MQTT和消息隊(duì)列RocketMQ支持的屬性字段映射表如下圖所示。使用消息隊(duì)列RocketMQ的SDK的應(yīng)用和使用消息隊(duì)列MQTT的SDK的應(yīng)用進(jìn)行交互時(shí),可以通過(guò)讀寫(xiě)這些屬性字段來(lái)達(dá)到信息獲取或者設(shè)置的目的。

屬性字段映射關(guān)系

3. RocketMQ對(duì)MQTT消息訂閱的實(shí)現(xiàn)

Properties properties = new Properties(); // 在控制臺(tái)創(chuàng)建的Group ID properties.put(PropertyKeyConst.GROUP_ID, "xxx"); // 阿里云AccessKey properties.put(PropertyKeyConst.AccessKey, "xxx"); // 阿里云SecretKey properties.put(PropertyKeyConst.SecretKey, "xxx"); // 在RocketMQ控制臺(tái)的實(shí)例基本信息中可查看到的TCP協(xié)議接入點(diǎn) properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("topic", "*", new MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) {//獲得mqtt消息中的第一級(jí)topicString mqttFirstTopic = message.getTopic();//獲得mqtt消息中除去1級(jí)后的所有topicString mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//獲得mqtt消息中的messageIdString messageId = message.getUserProperties("UNIQ_KEY");//獲得mqtt消息中的消息體String messageBody = new String(message.getBody());//...return Action.CommitMessage;}}); consumer.start();

實(shí)現(xiàn)主要注意2點(diǎn):

  • 這邊的 MQ 只需要訂閱 MQTT 的一級(jí) Topic 。如果 MQTT 會(huì)發(fā)布2個(gè) Topic 的消息 robot/alarm 和 robot/task/test ,則在此處只需要訂閱 robot 這個(gè)第一級(jí)Topic即可。
  • MQTT 的一些屬性字段可以從 RocketMQ 消息 Message 的 userProperties 字段中獲得,比如上面代碼中通過(guò) message.getUserProperties(PropertyKeyConst.MqttSecondTopic); 可以獲得 MQTT 中的 除去1級(jí)后的所有 Topic 字符串,如上述舉例的2個(gè) Topic 可分別獲得 /alarm 和 /task/test。 具體能夠獲得哪些字段可以參考上一節(jié)的屬性字段映射表,也可自行查看 PropertyKeyConst 類(lèi)中定義的一些字符串常量來(lái)大概知曉。

使用阿里云MQTT控制臺(tái)發(fā)送一個(gè)MQTT消息,如圖所示:

MQTT控制臺(tái)發(fā)送消

在程序中加一個(gè)斷點(diǎn)獲得當(dāng)前Message對(duì)象的字段如下:

Message消息體

上圖可看到userProperties中的一些值,比如qoslevel,mqttSecondTopic等,這些字段都可以在PropertyKeyConst 類(lèi)中找到對(duì)應(yīng)的字符串常量,但是UNIQ_KEY,cleansessionflag等PropertyKeyConst 類(lèi)中并沒(méi)有對(duì)應(yīng)的字符串常量,這邊暫時(shí)就message.getUserProperties("UNIQ_KEY")這樣使用自定義字符量來(lái)獲得。

4. RocketMQ對(duì)MQTT消息發(fā)布的實(shí)現(xiàn)

Properties properties = new Properties(); // 在控制臺(tái)創(chuàng)建的Group ID properties.put(PropertyKeyConst.GROUP_ID, "xxx"); // 阿里云AccessKey properties.put(PropertyKeyConst.AccessKey, "xxx"); // 阿里云SecretKey properties.put(PropertyKeyConst.SecretKey, "xxx"); // 在RocketMQ控制臺(tái)的實(shí)例基本信息中可查看到的TCP協(xié)議接入點(diǎn) properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx"); //設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); Producer producer = ONSFactory.createProducer(properties); // 在發(fā)送消息前,必須調(diào)用 start 方法來(lái)啟動(dòng) Producer,只需調(diào)用一次即可 producer.start();//發(fā)送一個(gè)mqtt消息 String parentTopic = topic.substring(0, topic.indexOf("/")); String subTopic = topic.substring(topic.indexOf("/")); Message msg = new Message(parentTopic, "", message.getBytes()); msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic); msg.putUserProperties(PropertyKeyConst.MqttQOS, qos); msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag); SendResult result = producer.send(msg);
  • 該代碼僅實(shí)現(xiàn)了普通消息的同步發(fā)送,若需發(fā)送順序消息、延時(shí)消息等,可參考SDK幫助文檔創(chuàng)建不同的Producer實(shí)現(xiàn)即可。
  • 上述代碼將需要發(fā)送的MQTT全量Topic拆分成1級(jí)與2級(jí),1級(jí)Topic設(shè)置為MQ中的Topic參數(shù),2級(jí)Topic字符串則設(shè)為userProperties中PropertyKeyConst.MqttSecondTopic的,其他屬相如qoslevel和cleansessionflag等也是通過(guò)userProperties的相關(guān)字段來(lái)設(shè)置。

三、注解式分發(fā)處理的實(shí)現(xiàn)

1. 前置知識(shí)點(diǎn)

1.1 BeanPostProcessor

BeanPostProcessor是Spring IOC容器給我們提供的一個(gè)擴(kuò)展接口。BeanPostProcessor接口定義了兩個(gè)方法:

public interface BeanPostProcessor {// 前置處理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置處理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException; }

Spring中Bean的整個(gè)生命周期如圖所示:

Bean生命周期

postProcessBeforeInitialization()方法與postProcessAfterInitialization()分別對(duì)應(yīng)圖中前置處理和后置處理兩個(gè)步驟將執(zhí)行的方法。這兩個(gè)方法中都傳入了bean對(duì)象實(shí)例的引用,為擴(kuò)展容器的對(duì)象實(shí)例化過(guò)程提供了很大便利,在這兒幾乎可以對(duì)傳入的實(shí)例執(zhí)行任何操作。

可以看到,Spring容器通過(guò)BeanPostProcessor給了我們一個(gè)機(jī)會(huì)對(duì)Spring管理的bean進(jìn)行再加工,注解、AOP等功能的實(shí)現(xiàn)均大量使用了BeanPostProcessor。通過(guò)實(shí)現(xiàn)BeanPostProcessor的接口,在其中處理方法中判斷bean對(duì)象上是否有自定義的一些注解,如果有,則可以對(duì)這個(gè)bean實(shí)例繼續(xù)進(jìn)行其他操作,這也是本例中使用該接口要實(shí)現(xiàn)的主要目的。

1.2 ApplicationListener

在IOC的容器的啟動(dòng)過(guò)程,當(dāng)所有的bean都已經(jīng)處理完成之后,spring ioc容器會(huì)有一個(gè)發(fā)布事件的動(dòng)作。從 AbstractApplicationContext 的源碼中就可以看出:

protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this); }

因此當(dāng)所有的bean都初始化完成并被成功裝載后會(huì)觸發(fā)ContextRefreshedEvent事件。

ApplicationListener是spring中用來(lái)監(jiān)聽(tīng)事件(ApplicationEvent)的傳遞,每個(gè)實(shí)現(xiàn)了ApplicationListener接口的bean都會(huì)收到ApplicationEvent對(duì)象的通知,每個(gè)ApplicationListener可根據(jù)事件類(lèi)型只接收處理自己感興趣的事件,因此利用實(shí)現(xiàn)ApplicationListener的接口可以收到監(jiān)聽(tīng)ContextRefreshedEvent動(dòng)作,然后可以寫(xiě)自己的一些處理邏輯,比如初始化環(huán)境,準(zhǔn)備測(cè)試數(shù)據(jù)、加載一些數(shù)據(jù)到內(nèi)存等等。用法如下:

@Component public class TestApplicationListener implements ApplicationListener<ContextRefreshedEvent>{@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些處理邏輯}}

1.3 反射

Java反射機(jī)制是在運(yùn)行狀態(tài)中,對(duì)于任意一個(gè)類(lèi),都能夠知道這個(gè)類(lèi)的所有屬性和方法;對(duì)于任意一個(gè)對(duì)象,都能夠調(diào)用它的任意一個(gè)方法和屬性;這種動(dòng)態(tài)獲取的信息以及動(dòng)態(tài)調(diào)用對(duì)象的方法的功能稱(chēng)為java語(yǔ)言的反射機(jī)制。

在Java中,Class類(lèi)與java.lang.reflect類(lèi)庫(kù)一起對(duì)反射技術(shù)進(jìn)行了全力的支持。獲取Class對(duì)象有三種方式:

  • 通過(guò)實(shí)例對(duì)象獲得:Class<?> class = object.getClass();
  • 通過(guò)類(lèi)名獲得:Class<?> class = ClassName.class;
  • 通過(guò)類(lèi)名全路徑獲得:Class<?> class = Class.forName("類(lèi)名全路徑");

反射包中常用的類(lèi)主要有

  • Constructor,表示的類(lèi)的構(gòu)造方法信息,利用它可以在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建對(duì)象
  • Field,表示類(lèi)的成員變量信息,通過(guò)它可以在運(yùn)行時(shí)動(dòng)態(tài)修改成員變量的屬性值(包含private)
  • Method,表示類(lèi)的成員方法信息,通過(guò)它可以動(dòng)態(tài)調(diào)用對(duì)象的方法(包含private)

下面說(shuō)明一下本例中用到的一些反射api:

//獲得Class對(duì)象 Class clazz= obj.getClass();//判斷注解B是否在此A上 boolean isAnnotation= A.isAnnotationPresent(B.class);//獲得該clazz上的注解對(duì)象 B b=clazz.getAnnotation(B.class));//獲得本類(lèi)以及父類(lèi)或者父接口中所有的公共方法 Method[] methods=clazz.getMethods();//獲取方法上的所有參數(shù) Parameter[] parameters = method.getParameters();//執(zhí)行某對(duì)象的方法,owner為該對(duì)象,paramValues為入?yún)?shù)組,method為Method對(duì)象 method.invoke(owner, paramValues);

2. 整體實(shí)現(xiàn)思路

  • 自定義注解MqttController,MqttTopicMapping,MqttMessageId,MqttMessageBody;
  • 利用BeanPostProcessor,獲得所有注解了MqttController的bean及其注解值,獲得其所有注解了MqttTopicMapping的Method方法及其注解值,利用兩者的注解值作為其key,分別將bean,Method為value放入不同的map中,記錄所有注解了MqttController的注解值作為下一步需要訂閱的Topic;
  • 利用ApplicationListener在所有bean加載完成后使用實(shí)例化的mqConsumer來(lái)訂閱所有需要訂閱的Topic;
  • 在mq訂閱的處理方法中,根據(jù)消息的全Topic在上述步驟的map中獲得其對(duì)應(yīng)的bean和Method,同時(shí)根據(jù)MqttMessageId,MqttMessageBody來(lái)設(shè)置相關(guān)參數(shù),使用method.invoke(owner, paramValues);實(shí)現(xiàn)方法的調(diào)用,來(lái)達(dá)到消息的處理分發(fā)。
  • 3. 實(shí)現(xiàn)細(xì)節(jié)

    3.1 自定義注解

    @MqttController在類(lèi)上使用,其中parentTopic值為需要監(jiān)聽(tīng)的1級(jí)Topic,其中使用@Component可以使其注解的類(lèi)實(shí)例化為為Bean對(duì)象放入到Spring容器中,基于此才能在利用BeanPostProcessor中獲得其對(duì)象。

    @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface MqttController {/*** 監(jiān)聽(tīng)的父topic** @return 監(jiān)聽(tīng)的父topic*/String parentTopic(); }

    @MqttTopicMapping在方法上使用,其中subTopic的值為需要訂閱的子Topic,與1級(jí)Topic共同組成MQTT的Topic

    @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttTopicMapping {/*** 訂閱的子topic,默認(rèn)可以只訂閱1級(jí)topic** @return 訂閱的子topic*/String subTopic() default ""; }

    MqttMessageBody在方法參數(shù)上使用,使得參數(shù)自動(dòng)獲得messageBody對(duì)象

    @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttMessageBody {}

    MqttMessageId在方法參數(shù)上使用,使得參數(shù)自動(dòng)獲得messageId的值

    @Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MqttMessageId {}

    自定義注解的使用示例如下:

    @Slf4j @MqttController(parentTopic = "robot1") public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級(jí)topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");} }

    3.2 提取Method和Bean對(duì)象

    在MqttHandlerFactory類(lèi)中定義以下幾個(gè)容器,分別存儲(chǔ)mqtt處理類(lèi)的bean,mqtt處理方法以及parentTopic列表

    /*** 用于存儲(chǔ)mqtt處理類(lèi)的bean,key為parentTopic/subTopic*/ private static Map<String, Object> mqttControllers = new HashMap<>();/*** 用于存儲(chǔ)mqtt處理方法,key為parentTopic/subTopic*/ private static Map<String, Method> mqttHandlers = new HashMap<>();/*** 存儲(chǔ)parentTopic列表*/ private static Set<String> parentTopicSet = new HashSet<>();

    利用BeanPostProcessor接口來(lái)處理實(shí)現(xiàn)了自定義注解的Bean對(duì)象。

    具體代碼及注釋如下:

    @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz = bean.getClass();//使用的MqttController注解的bean對(duì)象if (beanClazz.isAnnotationPresent(MqttController.class)) {//獲得MqttController的注解值//存儲(chǔ)parentTopic列表String parentTopic = ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//獲得MqttTopicMapping的Method對(duì)象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//獲得MqttTopicMapping的注解值String subTopic = method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if ("".equals(subTopic)) {realTopic = parentTopic + "/";} else {realTopic = (parentTopic + "/" + subTopic + "/").replaceAll("/+", "/");}if (null != MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() + " topic 重復(fù)定義,值為" + realTopic);}//存儲(chǔ)mqtt處理類(lèi)的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存儲(chǔ)mqtt處理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info("MqttHandler Mapped "{}" onto {}", realTopic, method.toString());}}}return bean; }

    3.3 mq的消息訂閱與處理

    實(shí)現(xiàn)ApplicationListener接口在所有Bean對(duì)象加載完之后根據(jù)前面記錄的parentTopicSet作為所有需要訂閱的1級(jí)Topic開(kāi)始訂閱。

    在訂閱消息處理中從message信息可以獲得其對(duì)應(yīng)的1級(jí)Topic與2級(jí)Topic,將其處理成MQTT的全Toic并從前面記錄的mqttHandlers,mqttControllers中獲得對(duì)應(yīng)的Method對(duì)象及Bean對(duì)象,從message信息中提取對(duì)應(yīng)的messageId級(jí)messageBody并設(shè)置為使用了@MqttMessageBody和@MqttMessageId的注解的參數(shù)中,利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);來(lái)實(shí)現(xiàn)方法的調(diào)用。

    具體代碼及注釋如下:

    @Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//設(shè)置mqConsumer實(shí)例化所需propertiesConsumer mqConsumer = ONSFactory.createConsumer(properties);Set<String> parentTopicSet = MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() == 0) {log.warn("當(dāng)前應(yīng)用并未有任何topic訂閱");}//根據(jù)parentTopic和subTopic訂閱parentTopicSet.forEach(parentTopic -> {log.info("Add a new rocketMq subscription,topic:{}", parentTopic);mqConsumer.subscribe(parentTopic, "*", (message, context) -> {log.debug("MqReceive Message: " + message);//獲得topicString mqttFirstTopic = message.getTopic();String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {//只有1級(jí)topic的情況mqttSecondTopic = "/";}if (!"/".equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic += "/";}String mqttTopic = mqttFirstTopic + mqttSecondTopic;Method method = MqttHandlerFactory.getMqttHandler(mqttTopic);if (null == method) {log.warn("當(dāng)前沒(méi)有處理該topic的handler,topic:{}", mqttTopic);return Action.CommitMessage;} else {//獲得mqtt的一些數(shù)據(jù)String messageId = message.getUserProperties("UNIQ_KEY");String messageBody = new String(message.getBody());//處理入?yún)arameter[] parameters = method.getParameters();Object[] paramValues = new Object[parameters.length];for (int i = 0; i < parameters.length; i++) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//@MqttMessageId注解的參數(shù)paramValues[i] = messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//@MqttMessageBody注解的參數(shù)Class parameterClazz = parameters[i].getType();try {paramValues[i] = JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error("mqttMessageBody 格式錯(cuò)誤,messageId:{},messageBody:{}", messageId, messageBody);// return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定義的一些參數(shù)就給null把paramValues[i] = null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error("處理失敗啦");}}return Action.CommitMessage;});});mqConsumer.start();log.info("MqConsumer Started");}

    四、MQTT同步調(diào)用的實(shí)現(xiàn)

    MQTT協(xié)議是基于PUB/SUB的異步通信模式,不適用于服務(wù)端同步控制設(shè)備端返回結(jié)果的場(chǎng)景。通過(guò)制定一套請(qǐng)求和響應(yīng)的同步機(jī)制,可以無(wú)需改動(dòng)MQTT協(xié)議來(lái)達(dá)到同步調(diào)用的目的。

    1. 整體實(shí)現(xiàn)思路

    MQTT的同步調(diào)用實(shí)際上是使用了兩個(gè)異步調(diào)用完成的,即生產(chǎn)者調(diào)用消費(fèi)者的同時(shí),自己也作為消費(fèi)者等待某一隊(duì)列的返回消息,消費(fèi)者接受到生產(chǎn)者的消息同時(shí),也作為消息發(fā)送者發(fā)送一消息給生產(chǎn)者。

    具體同步調(diào)用機(jī)制示意如下:

    同步調(diào)用示意圖

    首先服務(wù)端和設(shè)備端服務(wù)端都訂閱了相關(guān)的Topic,服務(wù)端發(fā)起同步調(diào)用即發(fā)布一個(gè)示意需同步返回的message到指定request Topic,設(shè)備端接收到該message后處理完業(yè)務(wù)邏輯則會(huì)將調(diào)用結(jié)果發(fā)布一個(gè)返回message到request消息體中攜帶的response Topic中,最后服務(wù)端接收到設(shè)備端返回的message可以從消息體中獲得其調(diào)用結(jié)果。

    整個(gè)調(diào)用過(guò)程中客戶(hù)端需要做的工作有:

  • 訂閱request Topic
  • 收到消息,判斷消息是否是同步消息,是的話則處理完業(yè)務(wù)邏輯后異步發(fā)送特別的response Topic
  • 服務(wù)端需要做的工作有:

  • 統(tǒng)一訂閱特別的設(shè)備的response Topic
  • 發(fā)送消息到特別的request Topic
  • Future.get(timeout) 模式處理request和response的關(guān)系
  • 異步超時(shí)請(qǐng)求線程處理(處理超時(shí)請(qǐng)求,30ms運(yùn)行一次)
  • 消息監(jiān)控和異常補(bǔ)救
  • 服務(wù)端處理異步為同步調(diào)用的邏輯借鑒了Dubbo底層將Netty的異步調(diào)用轉(zhuǎn)化成同步的方式,下面在實(shí)現(xiàn)細(xì)節(jié)中會(huì)具體闡述。

    2. 實(shí)現(xiàn)細(xì)節(jié)

    MQTT同步調(diào)用的代碼如下:

    public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic = topic.substring(0, topic.indexOf("/"));String subTopic = topic.substring(topic.indexOf("/"));String mId = UUID.randomUUID().toString().replaceAll("-", "");MqttMessage mqttMessage = new MqttMessage(mId, replyParentTopic + "/" + mId, data);Message msg = new Message(parentTopic, "", JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);MqttFuture mqttFuture = new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get(); }

    前部分就是正常RocketMQ發(fā)布MQTT消息的代碼,但是發(fā)送的消息是自定義的MqttMessage,同時(shí)這邊在調(diào)用producer.send(msg);前先構(gòu)建了一個(gè)MqttFuture對(duì)象然后發(fā)送完消息后使用mqttFuture.get();來(lái)獲得同步調(diào)用的結(jié)果。

    MqttMessage定義如下,其中syncFlag表示該消息是否需要同步返回,mId表示該消息的唯一id,用于本地判斷消息返回具體映射哪個(gè)同步調(diào)用的key,replyTopic表示該消息需要返回消息的Topic,data則是具體業(yè)務(wù)數(shù)據(jù)。

    @Data public class MqttMessage implements Serializable {private static final long serialVersionUID = 6648680154051903549L;/*** 是否需要同步返回,默認(rèn)為true*/private boolean syncFlag = true;/*** 生成的id,用uuid生成把*/private final String mId;/*** 客戶(hù)端返回消息的Topic*/private final String replyTopic;/*** 發(fā)送數(shù)據(jù)*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId = mId;this.replyTopic = replyTopic;this.data = data;} }

    MqttFuture對(duì)象則是用來(lái)處理同步調(diào)用的邏輯,每一個(gè)MqttFuture對(duì)象都有有一個(gè)mId作為唯一標(biāo)識(shí),發(fā)送的message消息體,調(diào)用返回結(jié)果MqttResponse(包括正常mqtt返回或者超時(shí)等異常返回結(jié)果)和同步調(diào)用超時(shí)時(shí)間,還有一個(gè)鎖及其創(chuàng)建的Condition,用來(lái)處理線程的等待與通知,后面會(huì)對(duì)其具體邏輯進(jìn)行分析。每創(chuàng)建一個(gè)新的MqttFuture對(duì)象都會(huì)將其放入到存儲(chǔ)MqttFuture的Map中,key即為該消息的mId。

    @Slf4j @Data public class MqttFuture {public static final int DEFAULT_TIMEOUT = 1000;public static final Map<String, MqttFuture> FUTURES = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();/*** 唯一id*/private final String mId;/*** 發(fā)送的message消息體*/private final MqttMessage message;/*** 設(shè)置的同步調(diào)用超時(shí)時(shí)間*/private final int timeout;/*** 等待開(kāi)始時(shí)間*/private final long start = System.currentTimeMillis();/*** 返回結(jié)果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message = message;this.timeout = timeout;this.mId = message.getMId();FUTURES.put(mId, this);}//...

    調(diào)用返回結(jié)果MqttResponse定義如下,如果是正常成功返回則返回狀態(tài)是OK且會(huì)有對(duì)應(yīng)的消息體messageResult,如果超時(shí)或者其他異常情況則會(huì)返回對(duì)應(yīng)的錯(cuò)誤消息errorMessage。

    @Data public class MqttResponse {/*** ok狀態(tài),正常返回result,否則返回errorMessage*/public static final Integer OK = 20000;/*** 客戶(hù)端超時(shí)未處理*/public static final Integer TIMEOUT = 40001;/*** 服務(wù)端主動(dòng)取消*/public static final Integer CANCEL = 40002;private Integer mStatus = OK;/*** request生成的messageId*/private String mId;/*** 收到的消息體*/private String messageResult;/*** 狀態(tài)不是成功返回的錯(cuò)誤信息*/private String errorMessage;public MqttResponse(String mId) {this.mId = mId;} }

    在上述MQ同步調(diào)用代碼中,若調(diào)用producer.send(msg);同步發(fā)送mqtt消息失敗的話,則會(huì)調(diào)用mqttFuture.cancel();來(lái)取消該MqttFture對(duì)象,代碼如下,這邊主要是設(shè)置了異常的相應(yīng)response避過(guò)將該MqttFuture對(duì)象從存儲(chǔ)MqttFuture的Map中移除。

    @Slf4j @Data public class MqttFuture {//...public void cancel() {MqttResponse errorResult = new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage("主動(dòng)請(qǐng)求取消");response = errorResult;FUTURES.remove(mId);}//...

    若調(diào)用producer.send(msg);同步發(fā)送mqtt消息成功的話則會(huì)調(diào)用mqttFuture.get();來(lái)獲得其同步調(diào)用的結(jié)果,具體代碼如下,首先判斷是否調(diào)用已完成(有響應(yīng)結(jié)果mqttResponse,包括正常獲得返回結(jié)果或者超時(shí)等其他異常的情況),若完成的話則直接返回調(diào)用結(jié)果或者拋出相應(yīng)的異常,若沒(méi)有調(diào)用完成則獲得鎖并循環(huán)判斷是否調(diào)用完成,沒(méi)有的話則調(diào)用done.await(timeout, TimeUnit.MILLISECONDS);來(lái)實(shí)現(xiàn)該線程的超時(shí)等待直至其他線程調(diào)用該Condition的signal方法將其喚醒。

    @Slf4j @Data public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout <= 0) {timeout = DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException("Waiting client-side response timeout");}}if (response == null) {throw new IllegalStateException("response cannot be null");}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException("Waiting client-side response timeout");}throw new MqttRemoteException(response.getErrorMessage());}/*** 判斷是否有response結(jié)果** @return 是否返回結(jié)果*/public boolean isDone() {return response != null;}//...

    將上述等待喚醒的代碼如下,只需調(diào)用下面的received方法并傳入相應(yīng)的MqttResponse結(jié)果即可,該方法會(huì)在后面講的判斷調(diào)用超時(shí)和正常mqtt消息結(jié)果返回的情況中調(diào)用。

    @Slf4j @Data public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future = FUTURES.remove(response.getMId());if (future != null) {future.doReceived(response);} else {log.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response);}}//...

    后臺(tái)會(huì)開(kāi)啟一個(gè)線程來(lái)掃描超時(shí)任務(wù),主要是遍歷上述存儲(chǔ)MqttFuture的Map,如果對(duì)應(yīng)的MqttFuture并沒(méi)有完成(獲得對(duì)應(yīng)的response)且調(diào)用時(shí)間已超過(guò)設(shè)置的超時(shí)時(shí)間,則設(shè)置一個(gè)超時(shí)異常的MqttResponse并調(diào)用MqttFuture.received(timeoutResponse);來(lái)喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。該線程每30s會(huì)遍歷一個(gè)存儲(chǔ)MqttFuture的Map。

    @Slf4j @Data public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {//當(dāng)前mqtt請(qǐng)求已超時(shí)MqttResponse timeoutResponse = new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms掃一次Thread.sleep(30);} catch (Throwable e) {log.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "MqttResponseTimeoutScanTimer");th.setDaemon(true);th.start();}//...

    為了處理正常的mqtt的消息返回,除了使用上一節(jié)中講到的自定義注解MqttController來(lái)訂閱相關(guān)的Topic,還需要訂閱特殊的一個(gè)Topic來(lái)處理mqtt同步調(diào)用的返回消息,訂閱代碼如下,根據(jù)收到的mId和messageBody設(shè)置對(duì)應(yīng)的MqttResponse并調(diào)用MqttFuture.received(response);來(lái)喚醒上述代碼中的done.await(timeout, TimeUnit.MILLISECONDS);的線程等待。

    //訂閱偽同步請(qǐng)求回復(fù) log.info("Add a new rocketMq subscription,topic:{}", replyParentTopic); mqConsumer.subscribe(replyParentTopic, "*", (message, context) -> {String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {mqttSecondTopic = "";}String mId = mqttSecondTopic.replaceAll("/", "");String messageBody = new String(message.getBody());MqttResponse response = new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage; });

    五、demo使用

    1. 項(xiàng)目結(jié)構(gòu)

    由于使用了springboot框架來(lái)實(shí)現(xiàn)該demo,所以項(xiàng)目結(jié)構(gòu)如下:

    demo項(xiàng)目結(jié)

    其中mqtt工具包目錄如下:

    mqtt工具包結(jié)構(gòu)

    2. mqtt工具包的使用

    2.1 在yml配置中添加相關(guān)配置

    配置示例如下,其中xxx改為自己使用的即可

    ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消費(fèi)者線程固定位50個(gè)consumeThreadNums: 50 # 用于同步調(diào)用返回發(fā)送的topicreplyParentTopic: xxx

    2.2 添加工具包中的MqttConfig

    @Import({ MqttConfig.class}) @Configuration public class MqttConfigure {}

    2.3 自定義注解的使用

    @Slf4j @MqttController(parentTopic = "robot1") public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只處理了一級(jí)topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");} }

    2.4 測(cè)試同步調(diào)用,模擬MQTT客戶(hù)端消息返回代碼

    mqtt客戶(hù)端實(shí)現(xiàn)代碼示例參考阿里云官方demo https://github.com/AliwareMQ/lmq-demo

    其中xxx的地方都改成自己的即可,下面代碼中mqttClient2.publish(replyTopic, message);即將結(jié)果發(fā)送到replyTopic中。

    public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId = "xxx";String endPoint = "xxx";String accessKey = "xxx";String secretKey = "xxx";String clientId = "xxx";final String parentTopic = "xxx";//這邊需自定義mqtt客戶(hù)端topic,final String mq4IotTopic = parentTopic + "/" + "xxx" + "/xxx";final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);final MqttClient mqttClient2 = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客戶(hù)端設(shè)置好發(fā)送超時(shí)時(shí)間,防止無(wú)限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客戶(hù)端連接成功后就需要盡快訂閱需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = { mq4IotTopic };final int[] qos = { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));JSONObject jsonObject = JSON.parseObject(new String(mqttMessage.getPayload()));String mId = jsonObject.getString("mId");String replyTopic = jsonObject.getString("replyTopic");String result = mId + "回復(fù)啦";MqttMessage message = new MqttMessage(result.getBytes());message.setQos(qosLevel);//這邊會(huì)將結(jié)果發(fā)送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println("發(fā)送啦");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);} }

    3. demo體驗(yàn)

    啟動(dòng)項(xiàng)目可以在控制臺(tái)看到有如下日志:

    MQTT處理Topic映射日志

    MQTT訂閱Topic日志

    從日志中可以看出程序自動(dòng)處理了自定義注解的Mqtt消息處理的映射,并根據(jù)mqtt的1級(jí)Topic進(jìn)行了RocketMQ的相關(guān)訂閱。

    提供了一個(gè)測(cè)試MQTT消息簡(jiǎn)單發(fā)送接口如下:

    @GetMapping("/publish") public String doPublish(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();} }

    使用 http://localhost:8080/mqtt/publish?topic=robot1/alarm&message={id:"1",code:"heheh"}來(lái)調(diào)用該接口。

    可以在控制臺(tái)看到如下日志打印:

    MQTT訂閱的消息日志

    由于本demo中使用自定義注解訂閱了該Topic,所以調(diào)用該接口發(fā)送消息之后也會(huì)被本demo成功接收,并分發(fā)到對(duì)應(yīng)的處理函數(shù)中,因此調(diào)用了該接口后可以在控制臺(tái)看到如上日志打印,可以看到MQTT消息成功發(fā)布,訂閱到該消息并實(shí)現(xiàn)了MQTT消息的處理分發(fā)。

    提供了一個(gè)測(cè)試MQTT同步調(diào)用的接口如下:

    @GetMapping("/publish-sync")public String publishSync(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();} }

    使用http://localhost:8080/mqtt/publish-sync?topic=robot1/alarm/GID_ROBOT@@@DEVICEID_001&message=hehehe來(lái)調(diào)用接口。

    可以看到下圖的結(jié)果:

    MQTT同步調(diào)用超時(shí)

    這是MQTT同步調(diào)用超時(shí)的情況,因?yàn)榇藭r(shí)還沒(méi)有開(kāi)啟對(duì)應(yīng)的MQTT客戶(hù)端,因此發(fā)送的MQTT消息并沒(méi)有客戶(hù)端進(jìn)行回應(yīng),所以出現(xiàn)了調(diào)用超時(shí)的情況,如果運(yùn)行上述模擬MQTT客戶(hù)端消息返回的代碼后再次調(diào)用該接口,可以看到同步調(diào)用成功返回了對(duì)應(yīng)的結(jié)果,如圖所示。

    MQTT同步調(diào)用成功返回

    源碼地址如下,僅供學(xué)習(xí)參考

    DavidDingXu/panda-mqtt?github.com

    六、參考

    • 阿里云微消息隊(duì)列 MQTT幫助文檔
    • 阿里云消息隊(duì)列 RocketMQ幫助文檔

    本文原創(chuàng),歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明出處,如有不正確的地方懇請(qǐng)各位看官指正。

    總結(jié)

    以上是生活随笔為你收集整理的mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    日韩精品一区二区在线观看视频 | 国产中文字幕在线看 | 免费看黄色毛片 | 九九精品视频在线观看 | 午夜影院在线观看18 | 日韩免费av在线 | 国产精品嫩草55av | 亚洲成人av在线播放 | 久久国产精品99精国产 | 日韩精品中文字幕有码 | 久久香蕉电影网 | 久久久久久高潮国产精品视 | 97精品国产97久久久久久久久久久久 | 天天射天天爽 | 亚洲理论片在线观看 | 在线高清 | 91视频专区| 国产精品婷婷 | 久久久久二区 | 欧美日韩中文国产 | 国产91精品久久久久久 | 国产一区视频免费在线观看 | 青青河边草免费直播 | 成人午夜精品久久久久久久3d | 国产精品一区二区三区视频免费 | 婷婷中文字幕综合 | 成人国产精品入口 | 日日夜夜狠狠干 | 精品a视频 | 国产精品乱看 | 9在线观看免费高清完整版在线观看明 | 成人久久18免费网站麻豆 | 国产一级做a | 国产精品久久久久久吹潮天美传媒 | 中文字幕电影高清在线观看 | av激情五月 | 日韩国产在线观看 | 在线观看深夜视频 | 精品一区 在线 | 久久成人免费电影 | 久久久久久高潮国产精品视 | 日韩大片在线观看 | 99精品在线 | 亚洲精品在线二区 | 日韩在线观 | 黄色av在 | 欧美激情精品久久久久久变态 | 婷婷www | 日韩激情精品 | 午夜.dj高清免费观看视频 | 少妇高潮冒白浆 | 国产精品久久久区三区天天噜 | 国产伦理久久精品久久久久_ | 欧美日韩午夜爽爽 | 午夜视频免费 | 婷婷av色综合 | 久久久久久福利 | 成人cosplay福利网站 | 黄色美女免费网站 | 国产高清精 | 亚洲精品视频久久 | 久久久久久久免费 | 99操视频| 超碰在线最新地址 | 国产精品亚洲视频 | 欧美成人精品欧美一级乱黄 | 日韩精品一区二区三区中文字幕 | 四虎成人免费影院 | 国产精品久久久久久久妇 | 探花视频免费观看 | 国产专区欧美专区 | 色播五月婷婷 | 亚洲精品在线免费看 | 在线一区二区三区 | 免费一级片久久 | av短片在线观看 | 午夜国产福利在线 | 日韩视频一区二区三区在线播放免费观看 | 波多野结衣电影久久 | 成人高清在线 | 日韩免 | 亚洲视频综合在线 | 五月开心婷婷 | 日本精品xxxx| av最新资源 | 国产精品无av码在线观看 | 精品国产一区二区在线 | 久久久久综合网 | 丁香资源影视免费观看 | 久久久福利 | 97视频免费 | 久久国产精品偷 | 婷婷综合 | 国产精品99久久久久的智能播放 | 亚洲精品视频中文字幕 | 久久免费在线 | 免费观看91视频大全 | 97超碰影视| 日韩成人在线免费观看 | 色资源二区在线视频 | 久久免费视频在线观看6 | 国产精品久久三 | 一级国产视频 | 欧美a级片免费看 | 视频91| 青青河边草免费直播 | 在线观看亚洲专区 | 天天操天天干天天插 | 久久国产精品99久久久久 | 国产99久久九九精品免费 | 一区二区欧美在线观看 | 亚洲女欲精品久久久久久久18 | 免费日韩一级片 | 狠狠做六月爱婷婷综合aⅴ 日本高清免费中文字幕 | 欧美激情亚洲综合 | 99精品免费视频 | 91精品久久久久久综合乱菊 | 国产视频18 | 五月天婷婷丁香花 | 国产99久久久精品 | 综合网久久 | 国产一级在线观看 | 久久99久久99精品免费看小说 | 成人av在线影视 | 亚洲三级视频 | 精品在线观看一区二区 | 久久精品超碰 | av在线免费观看网站 | 四虎成人精品在永久免费 | 国产亚洲精品久久久久久久久久久久 | 日韩高清在线观看 | 看片网站黄色 | 国产精品日韩久久久久 | 91av短视频 | 亚州中文av| 片黄色毛片黄色毛片 | 中文字幕av有码 | 91在线视频导航 | 波多野结衣电影一区二区三区 | 黄网站色欧美视频 | 国产精品福利视频 | 国产精品视频资源 | 亚洲一区久久 | 视频一区视频二区在线观看 | 成 人 免费 黄 色 视频 | www.com久久 | 久久久久久综合 | 麻豆系列在线观看 | 国产精品免费观看久久 | 91麻豆看国产在线紧急地址 | 欧美性生爱 | 92国产精品久久久久首页 | 免费国产在线观看 | 91色蜜桃 | 免费午夜在线视频 | 在线观看91久久久久久 | 深夜精品福利 | 黄色a级片在线观看 | 97在线观 | 日韩av看片| 亚洲作爱视频 | 波多野结衣一区三区 | 看片一区二区三区 | 欧美久久久影院 | 欧美日比视频 | 国产精品视频内 | 国产精品一区二 | 天天操天天射天天插 | 97日日碰人人模人人澡分享吧 | 久久色亚洲 | 国产手机在线视频 | 天天激情综合 | 日精品 | 成人小视频在线观看免费 | 国产自在线观看 | 免费特级黄色片 | 久久免费看视频 | 在线 国产 日韩 | 久久不卡av | 嫩草av影院 | 国产免费一区二区三区网站免费 | 爱情影院aqdy鲁丝片二区 | 一本色道久久精品 | 亚洲国产精品va在线 | 国产麻豆精品久久 | 精品国产电影一区二区 | 91亚洲夫妻 | 三级动态视频在线观看 | 亚洲视频aaa | 波多野结衣最新 | 欧美久久久久久久 | 国语精品视频 | 欧美黑人xxxx猛性大交 | 69亚洲视频 | 日本巨乳在线 | 国产精品一区电影 | 99热超碰 | 日韩精品欧美专区 | 伊人电影天堂 | 欧美一区在线观看视频 | 久久综合影音 | www.日韩免费 | 超碰日韩 | 日韩精品在线一区 | 亚洲春色综合另类校园电影 | 国产999视频| 亚洲免费精彩视频 | a黄色片在线观看 | 国产成人在线免费观看 | 黄色精品国产 | 久久免费在线观看视频 | 日韩av不卡在线 | 国产麻豆果冻传媒在线观看 | 国产成人精品久久久 | 黄色成人小视频 | 国产资源免费 | 99久久精品一区二区成人 | 一区二区三区四区五区六区 | 黄色在线免费观看网址 | 婷婷综合亚洲 | 91av中文字幕 | 久久99热精品| 欧美a级在线 | 99re在线视频观看 | 天天天天色综合 | 超碰在线人人草 | 久久免费美女视频 | 日韩av在线网站 | 亚洲精品午夜国产va久久成人 | 国产黄在线免费观看 | 狠狠操影视| 亚洲涩涩网| 欧美激情片在线观看 | 免费日韩一区二区三区 | 国产中文字幕久久 | 久久久久久久久毛片 | 综合色亚洲 | 成人a级大片 | 97理论电影 | 在线探花| 日本婷婷色 | 欧美日韩一区二区免费在线观看 | 久久久久国产精品午夜一区 | 国产精品一区一区三区 | 国产亚洲在线观看 | avove黑丝| 69精品视频 | 国产成人精品免高潮在线观看 | 久久久久久久免费 | 日韩在线观看三区 | 狠狠操狠狠操 | 国产又粗又硬又爽的视频 | 国产永久网站 | 久久激情视频免费观看 | 成人免费中文字幕 | 成人毛片一区 | 成年人看片网站 | 色婷婷狠狠五月综合天色拍 | 99re8这里有精品热视频免费 | 干干夜夜 | 国产精品久久三 | 视频一区亚洲 | 成人久久18免费网站麻豆 | 亚洲成av人片在线观看香蕉 | 青青草视频精品 | 99免费精品 | 国产人成一区二区三区影院 | 久草av在线播放 | 日韩欧美在线影院 | 在线有码中文 | 91麻豆免费版| 五月婷婷视频在线观看 | 69国产精品视频 | 久久黄色免费视频 | 黄色大片网 | 国产精品18久久久久久不卡孕妇 | 国产精品免费小视频 | 91视频在线 | 欧美精品在线观看免费 | 久久国产成人午夜av影院宅 | 91av视频在线观看免费 | 国产人成在线视频 | 九草视频在线 | 日韩黄色影院 | 日韩美女av在线 | 天天操天天操天天干 | 久久成人免费视频 | 亚洲精品久久久久999中文字幕 | av午夜电影 | 日韩精品一区二区在线 | 69人人| 人人擦| 69视频在线 | 日韩精品一区二区三区电影 | 国产一区在线观看免费 | 99久久久久久久久久 | 亚洲欧美乱综合图片区小说区 | 国产另类xxxxhd高清 | 国产精品乱码久久久久久1区2区 | 日韩综合第一页 | 97国产情侣爱久久免费观看 | 五月天天av | 人人爽人人爽人人片av免 | 亚洲午夜久久久久久久久电影网 | 国产香蕉97碰碰碰视频在线观看 | 不卡国产视频 | 精品国产一区二区三区久久久蜜臀 | 久久婷婷精品 | 精品国偷自产在线 | av免费观看在线 | 69国产盗摄一区二区三区五区 | 日韩特级片 | 亚洲成人av电影在线 | 999久久久国产精品 高清av免费观看 | 黄色在线观看免费网站 | 久久成视频 | 国产一区在线免费 | 久草网在线观看 | 亚洲午夜剧场 | 久久久免费在线观看 | 免费看国产a | 欧美做受高潮 | av片中文| 五月花激情 | 久久字幕精品一区 | 久久精品3| 日韩av影视在线观看 | 国产午夜精品一区二区三区嫩草 | 天天色棕合合合合合合 | 午夜 在线 | 久久中文字幕在线视频 | 国产精品日韩欧美一区二区 | 中文字幕亚洲字幕 | 久久国语 | 日本高清dvd | 日韩视频在线不卡 | 日韩欧美69 | 日本在线精品视频 | 免费看网站在线 | 91九色porny蝌蚪视频 | 国产一级精品绿帽视频 | 99久久久久久久久 | 国产色视频网站 | 狠狠狠色丁香综合久久天下网 | 免费h漫在线观看 | 日韩性片 | 天天操夜夜操夜夜操 | www亚洲精品 | 人人搞人人干 | 观看免费av | 色多多视频在线观看 | 亚洲在线精品视频 | 午夜性盈盈 | 最新av网址在线 | 性色视频在线 | 国产伦理久久精品久久久久_ | 天天射综合网视频 | 99r在线观看 | 99热日本| 中文字幕在线视频一区二区 | 欧美一级视频免费 | 日韩av不卡在线观看 | 久久精品91久久久久久再现 | 99在线高清视频在线播放 | 久久男女视频 | 免费观看的黄色片 | 亚洲精品日韩在线观看 | www.五月激情.com | av黄色大片 | 国产偷国产偷亚洲清高 | 免费视频资源 | 国产高清日韩 | 日韩高清二区 | 日韩一区二区三区免费电影 | 麻豆视频国产在线观看 | 韩国在线一区 | 精品一区二区在线观看 | 国产精品嫩草69影院 | 美女久久 | avove黑丝| 国产精品免费观看视频 | 欧美日韩国产精品一区二区 | 国产在线毛片 | 日韩av在线免费播放 | 久久久久久免费网 | 欧美一区二区三区在线播放 | 国产一级在线视频 | 婷婷视频在线播放 | 婷婷丁香av | 亚洲成人第一区 | 亚洲精品中文字幕在线观看 | 天天干天天做 | 天堂视频中文在线 | 一区二区三区影院 | 欧美日韩在线免费视频 | 亚洲天堂精品视频 | 久久99网站 | 992tv人人网tv亚洲精品 | 婷婷在线网站 | 午夜91视频 | 亚洲欧美日韩国产精品一区午夜 | 性色av免费看 | 欧洲成人av | 91成人免费观看视频 | 国产成人av电影在线观看 | 97人人爽 | 一区二区三区动漫 | 精品国产精品久久 | 九九热视频在线 | 国产录像在线观看 | 亚洲高清视频一区二区三区 | 激情在线网站 | 日韩精品 在线视频 | 久久久久久久久国产 | 国产剧在线观看片 | 免费观看av网站 | 日韩欧美在线综合网 | 久久久精品国产一区二区电影四季 | 欧美精品久久人人躁人人爽 | 在线观看韩国av | 国产精品久久久久久高潮 | a天堂中文在线 | 久久成人高清视频 | 国产黄色a | 欧美日韩不卡一区 | 久久91久久久久麻豆精品 | 日韩理论在线 | 久久精品国产一区二区电影 | 国产又粗又猛又爽 | 色在线高清 | av在线不卡观看 | 国产精品日韩在线观看 | 丁香九月激情 | 四虎永久精品在线 | 天堂av在线7| 毛片的网址 | 国内久久久久久 | 丁香婷婷色综合亚洲电影 | 在线观看国产麻豆 | 99re8这里有精品热视频免费 | 久久久在线观看 | 国产视频日韩视频欧美视频 | 国产精品永久久久久久久久久 | 一区二区三区 亚洲 | 国产精品一区在线观看你懂的 | 国产精品久久久久久影院 | 欧美91精品久久久久国产性生爱 | 欧美日韩另类在线 | 国产精品久久 | 亚洲天天草 | av免费在线观看网站 | 国产精品久久久久永久免费看 | 啪啪资源 | 欧美午夜一区二区福利视频 | 久久久精品 | 日韩欧美电影 | 国产剧情在线一区 | 一区二区三区电影在线播 | 色综合久久久久综合体 | 人人插人人舔 | 在线观看亚洲精品 | 在线看一级片 | 99热这里只有精品8 久久综合毛片 | 五月黄色| 欧美成人亚洲成人 | 丁香六月婷婷激情 | 日日干激情五月 | 久久草网站 | 国产欧美精品一区二区三区 | 免费看的黄色小视频 | 久久爽久久爽久久av东京爽 | 亚洲欧美视频一区二区三区 | 亚洲一级片在线看 | 亚洲综合在线发布 | 在线观看中文字幕视频 | 欧美一区在线看 | 麻豆影视在线观看 | 免费三级在线 | 亚洲有 在线 | 久插视频 | 精品欧美小视频在线观看 | 亚洲视频456 | 播五月综合 | 国产精品久久久免费看 | 久久久91精品国产一区二区三区 | 天天操天天摸天天射 | 国产精品久久久久久久久久 | 色婷婷丁香 | 97国产超碰在线 | 视频二区在线视频 | 亚洲欧美观看 | 成年人在线免费看片 | 天天射天天爱天天干 | 日韩网站视频 | 久久久在线免费观看 | 99久久精| 日韩三级中文字幕 | 成人h视频在线 | 天天操综| 久久精品亚洲综合专区 | 国产精品永久久久久久久www | 免费高清在线视频一区· | 91丨精品丨蝌蚪丨白丝jk | 99久久久成人国产精品 | 999精品在线| 在线观看激情av | 国产精品久久久久一区二区三区共 | 中文字幕永久免费 | 日韩三级视频在线观看 | 久草在线费播放视频 | 日韩欧美一区二区三区视频 | 久久电影网站中文字幕 | 亚洲免费永久精品国产 | 久久久三级视频 | 成人羞羞视频在线观看免费 | 欧美激情视频三区 | www.夜夜夜 | 激情文学综合丁香 | 国产色拍拍拍拍在线精品 | 久久人人插 | 九九视频一区 | 免费在线一区二区三区 | 日韩激情视频在线观看 | 黄色av成人在线观看 | 国产美女在线精品免费观看 | 黄在线免费看 | 久久激情五月丁香伊人 | 日韩中文免费视频 | 日本护士三级少妇三级999 | 国产精品 欧美 日韩 | 91av超碰| 婷婷色影院| 色婷婷综合在线 | 免费在线观看a v | 国产成免费视频 | 欧美成人91 | 色吊丝在线永久观看最新版本 | 免费电影一区二区三区 | 亚洲精品国产第一综合99久久 | 欧美少妇18p| 在线免费视频你懂的 | 久久精品国产免费看久久精品 | 黄色毛片一级片 | 在线黄色毛片 | 在线黄色av电影 | www久久九 | 中文免费在线观看 | 就操操久久 | 黄在线| 国产精品久久久久久久久久久久午夜 | 亚洲精品大全 | 天天天干天天射天天天操 | 国产手机在线精品 | 免费中午字幕无吗 | 成人av免费播放 | 超碰在线免费福利 | 高清精品久久 | 99久久99久国产黄毛片 | 91精彩在线视频 | 伊人天天狠天天添日日拍 | 免费观看一级一片 | 国产精品免费久久久久影院仙踪林 | 手机av永久免费 | 丁香花在线视频观看免费 | 欧美精品久久久久性色 | 97国产人人 | 天天操操 | 免费日韩电影 | 日韩在线视频播放 | www.com久久久| 免费看片网页 | 最近免费在线观看 | 天天操婷婷 | 国产精品亚洲综合久久 | 亚洲永久av | 伊人久在线 | 久久久久久久久久久网 | 黄色99视频| 91麻豆精品国产午夜天堂 | 伊人伊成久久人综合网小说 | 欧美精品乱码久久久久久 | 欧美另类高清 | 国产一区二区三区午夜 | 91视频高清 | 久久9精品| 狠狠干狠狠久久 | www日日夜夜 | 天天射天天干天天爽 | 最近中文字幕高清字幕在线视频 | 国内久久精品 | 69夜色精品国产69乱 | 天天射天天爱天天干 | 欧美特一级片 | 久久久久一区二区三区 | 国产成人一区二区三区影院在线 | 色搞搞| 青草草在线 | 成人久久国产 | 天天操夜夜干 | 中文字幕在线不卡国产视频 | av观看网站| 玖操| 国产精品久久久久久久电影 | 91.麻豆视频| 日韩欧美在线免费观看 | 久久久国产99久久国产一 | 99超碰在线观看 | 亚洲色五月| 日日摸日日添夜夜爽97 | 99爱精品视频 | 成人黄色电影在线播放 | 亚洲视频h| 69国产盗摄一区二区三区五区 | 国产久草在线 | 天天爽人人爽 | 99精品国产高清在线观看 | 中文字幕一区二区三区四区视频 | 国产精品高潮久久av | 又黄又爽又色无遮挡免费 | 日本xxxx.com | 最近最新mv字幕免费观看 | 激情综合久久 | 在线观看mv的中文字幕网站 | 国产精品高清在线观看 | 91av在线视频免费观看 | 精品久久福利 | 欧美一级性 | 亚洲天堂网视频 | 久久无码精品一区二区三区 | 日韩在线视 | 伊人色综合久久天天 | 国产女人40精品一区毛片视频 | 狠狠躁日日躁狂躁夜夜躁 | 国产淫片 | 激情综合五月天 | 久久久在线视频 | 免费高清在线观看电视网站 | 国产精品美女久久久久久久久 | 国产破处视频在线播放 | 日日干网址 | 中文字幕在线观看完整版电影 | 国产91勾搭技师精品 | 日韩精品高清视频 | 日韩高清一区在线 | 九九欧美| 91在线播放视频 | 成人免费在线观看入口 | 天天操天天色综合 | 中文字幕精品一区二区三区电影 | 久久av伊人 | 亚洲欧洲美洲av | 国产精品久久久久久吹潮天美传媒 | 色综合久久中文综合久久牛 | 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 在线观看视频日韩 | 国产精品21区 | 99精品久久久 | 免费看黄的 | www.国产在线观看 | 国产第一福利 | 亚洲综合激情小说 | 久久999精品 | 精品国产一区二区三区久久久蜜臀 | 99久久精品免费看国产一区二区三区 | 午夜国产一区二区三区四区 | 欧美日本不卡高清 | 久久中文字幕视频 | 毛片3| 欧美成人一区二区 | 免费视频网 | 公开超碰在线 | 日韩久久久久久久 | 精品久久久久一区二区国产 | 欧美日韩性生活 | 日韩高清在线不卡 | 欧美一级日韩免费不卡 | 国产精品99久久久久 | 免费在线成人av电影 | 天天射天天爽 | 欧美尹人 | 亚洲精品永久免费视频 | 午夜美女网站 | 国产在线观看,日本 | 久久综合9988久久爱 | 五月天久久狠狠 | 久久看免费视频 | 国产精品一区二区久久精品爱微奶 | 婷婷激情在线观看 | 久久亚洲私人国产精品 | 国产精品区二区三区日本 | 亚洲乱码久久久 | 亚州国产视频 | 91av大全 | 国产精品久久久久久久免费 | 天天干天天草 | 欧美日韩国产一二 | 国产精品一区二区三区免费视频 | 天天天干天天射天天天操 | 麻花天美星空视频 | 久久在线精品视频 | www免费视频com━ | 国产成人av网站 | 欧美日韩在线观看一区二区三区 | 日本aaa在线观看 | 精品国产91亚洲一区二区三区www | 黄色小说视频网站 | 婷婷看片| 久久成人精品视频 | 亚洲精品一区二区三区新线路 | a视频在线观看免费 | 精品国产一区二区三区久久久蜜月 | 四虎免费在线观看视频 | 在线观看亚洲精品视频 | 91桃色在线观看视频 | 中文字幕第一页在线播放 | 久久伊99综合婷婷久久伊 | 99热超碰 | 久久久久国产成人精品亚洲午夜 | 黄色在线看网站 | 亚洲精品一区中文字幕乱码 | 91免费网址 | 免费成人av在线看 | 黄污污网站 | 国产综合片 | 99久久日韩精品免费热麻豆美女 | 二区三区精品 | 成人黄色免费观看 | 久久久久麻豆v国产 | 黄色一级大片免费看 | 亚洲欧美偷拍另类 | 日韩精品视频免费看 | 欧美一二三区在线观看 | 在线观看久| 成人国产精品 | 日韩在线激情 | 黄色三级在线看 | 色综合天天干 | 色www免费视频 | 99久久99| 久久这里有精品 | 亚洲九九 | 精品久久精品 | а天堂中文最新一区二区三区 | 韩日视频在线 | av动图| 国产在线观看a | 精精国产xxxx视频在线播放 | 综合五月婷婷 | 欧美日韩不卡一区二区 | 天天插日日操 | 黄色软件视频网站 | 国产区精品在线观看 | 久久久福利影院 | 亚洲成人精品 | 二区三区视频 | 久草在线资源观看 | 精品国产免费看 | 精品国模一区二区三区 | 精品国产人成亚洲区 | 国产做a爱一级久久 | 日本在线中文在线 | 亚洲三级国产 | 美女视频黄是免费的 | 一区二区欧美在线观看 | 婷婷av在线 | 婷婷国产视频 | 黄色一区二区在线观看 | 91精品国产高清自在线观看 | 在线观看亚洲国产 | 91在线视频一区 | 中文字幕在线观看网站 | 久久国产欧美日韩 | 色婷婷在线观看视频 | 精品久久久久久久久久久久久 | 天天插夜夜操 | 久久成人麻豆午夜电影 | 欧美一级淫片videoshd | 热久久影视| 三上悠亚一区二区在线观看 | 999视频在线观看 | 欧美久久久久久 | av夜夜操| 国产精品久久久久aaaa | 色a在线观看| 99久久精品国产系列 | 狠狠久久婷婷 | 国产小视频免费在线观看 | 97在线观看免费视频 | 国产精品日韩高清 | 97国产在线 | 黄色免费网 | 欧美三级高清 | 91成人免费在线 | 黄色软件视频网站 | 99电影| 久久国产视频网站 | 在线a人v观看视频 | 国产无套精品久久久久久 | 国产精品免费看久久久8精臀av | 99久久精品午夜一区二区小说 | 黄a网 | 国产精品久久久久久久久久免费 | 欧洲精品久久久久毛片完整版 | 少妇精品久久久一区二区免费 | 国产一级久久久 | 美女久久久 | 欧美男同网站 | 成人网在线免费视频 | 九九精品毛片 | 久久免费视频播放 | 色网站免费在线看 | 亚洲精品福利在线观看 | 97在线观 | 99热精品免费观看 | 久久精品视频免费 | 91成年人网站 | 久久伊人91 | 天天se天天cao天天干 | 99在线观看免费视频精品观看 | 在线国产精品视频 | 久久婷婷一区二区三区 | 国产夫妻自拍av | 亚洲精品一区二区三区四区高清 | 日韩一级黄色片 | 美女黄久久 | 在线精品国产 | 日本精品中文字幕 | 久久不卡国产精品一区二区 | 丁香色天天 | 欧美一级特黄aaaaaa大片在线观看 | 人人干狠狠操 | 98久久 | 久久免费视频2 | 91传媒免费在线观看 | 免费观看特级毛片 | 久久久av电影 | 91精品在线麻豆 | 天天爽天天爽天天爽 | 久久精品亚洲国产 | 天天操夜夜爱 | 激情伊人| 天天综合久久 | 色视频在线观看 | 97成人在线免费视频 | 亚洲最大免费成人网 | 狠狠色丁香婷婷综合久小说久 | 欧美男男激情videos | 粉嫩高清一区二区三区 | 亚洲三级在线免费观看 | 国产美女在线精品免费观看 | 久久久不卡影院 | 欧美久久精品 | 成人久久免费 | av在线专区 | bbw av| www.com.黄 | 五月激情视频 | 在线观看小视频 | 国产精品96久久久久久吹潮 | 久久国产精品第一页 | 一本一道久久a久久精品蜜桃 | 欧美性直播| 免费视频97 | 在线观看岛国av | 伊人网av| 91视频在线免费看 | 操操操操网 | 免费麻豆 | 日韩xxx视频 | 西西www4444大胆视频 | 亚洲国产免费看 | 国产淫片免费看 | 欧美日韩啪啪 | 国产精品视频在线观看 | 天天插视频 | 成人免费在线网 | 久草在线免费在线观看 | 久久人人爽爽人人爽人人片av | 97视频在线观看网址 | 一级免费观看 | 99久久er热在这里只有精品66 | 亚洲精品视频播放 | www蜜桃视频 | 久久久久国产成人免费精品免费 | 五月的婷婷 | 国产资源在线观看 | 欧美日韩xxxxx | 麻豆你懂的 | 国产亚洲精品久久久久动 | 亚洲乱码一区 | 一区二区三区中文字幕在线观看 | 有码中文在线 | 91丨九色丨蝌蚪丨对白 | 免费看黄在线 | 日本特黄一级片 | 日韩美在线 | 日韩精品国产一区 | 欧美在线视频a | 在线观看mv的中文字幕网站 | 成 人 a v天堂 | 久久久麻豆视频 | 正在播放国产一区二区 | 超碰在线公开免费 | 欧美日韩三级 | 999精品| 天天色天天操综合网 | 欧美-第1页-屁屁影院 | 亚洲伊人第一页 | 蜜臀av性久久久久蜜臀aⅴ流畅 | 亚洲 欧美 变态 国产 另类 | 蜜臀av夜夜澡人人爽人人桃色 | 亚洲影视资源 | 在线成人短视频 | 2023亚洲精品国偷拍自产在线 | 日韩有色| 国产精品视频不卡 | 久久久精品国产免费观看一区二区 | 在线看国产 | 日日干av| 91麻豆精品国产自产 | 精品久久久久久久久亚洲 | 色婷婷激情五月 | 日韩动漫免费观看高清完整版在线观看 | 正在播放国产精品 | 日韩夜夜爽 | 日本公乱妇视频 | 亚洲欧美日韩精品久久久 | 日韩最新av在线 | 日本一区二区三区免费看 | 狠狠干狠狠色 | 亚洲精品免费在线视频 | 人人爽人人 | 午夜精品一区二区三区在线 | 亚洲精品免费看 | 综合久久久 | 亚洲免费成人 | 欧美成人黄 | 狠狠操狠狠操 | 国产日本在线 | 国产精品精品国产色婷婷 | 91刺激视频 | 成人av电影在线 | av在线一级 | 国产1区2| www成人av | 国产正在播放 | 亚洲一区二区精品3399 | 国产精品21区| 激情丁香5月 | 久久精品福利视频 | 国产精品a久久久久 | 国产区第一页 | 中文字幕中文字幕在线中文字幕三区 | 午夜男人影院 | 久久涩涩网站 | 日本高清免费中文字幕 | 中文字幕视频免费观看 | 日韩高清一区在线 | 久久综合狠狠 | 亚洲欧美综合精品久久成人 | 亚洲播放一区 | 成年人黄色免费视频 | 国产视频在线观看免费 | 午夜av网站 | 日韩国产欧美在线视频 | 六月丁香久久 | 久久免费高清视频 | 日韩av专区 | 中文字幕.av.在线 | 国内久久精品 | 亚洲精品国偷拍自产在线观看 | 午夜精品一二区 | 国产精品一区二区免费在线观看 | 久久久蜜桃 | 激情久久综合网 | 国产精品视频最多的网站 | 9在线观看免费高清完整版在线观看明 | 国产综合精品久久 | 日韩动漫免费观看高清完整版在线观看 | 久久毛片视频 | 久久欧美综合 | av在线精品 | 99久久精品无免国产免费 | 婷婷草 | 日韩色区| 91福利视频久久久久 | 免费看片成年人 | 国产资源精品 | 亚洲专区一二三 | 国产精品免费在线播放 | 成人av电影网址 | 天堂资源在线观看视频 | 激情综合五月 | 丰满少妇在线观看网站 | 免费av影视| 国产不卡一 | 综合色播 | 国产视频精品久久 |