java与python之间的混合开发
java與python之間的混合開發
1. 項目需求
使用java進行進行數據庫的訪問,并對查詢出的數據進行的數據清理,使用python進行無監督分類
應用場景:
. 整個項目是BS架構,基于SpringBoot2.0
. 業務端主要是通過Java來實現后臺與數據庫的連接與交互
. 算法端主要通過python實現
實現的目標:
2.方案
可以通過udp或者tcp協議來傳遞數據
缺點:a. 不能異步通信;b.安全性不高;c.調試麻煩;d.數據容易丟失
通過網絡接口(Restful)的方式,調用python服務(通過http的POST請求中的Body將數據傳入,沒有測試過數據量)
通過消息中間鍵,來作為數據傳遞和消息傳遞的媒介,通過隊列的方式來傳遞數據,實現異步通信,并且通過中間鍵作為中介有管理系統,安全性高,有對應的管理界面調試比較簡單,能夠查看推送的數據是什么;使用隊列來作為日志信息的輸入和輸出;
-
最終確定的方案:選用方案二
使用消息中間鍵的好處: -
數據不容易丟
-
能夠實現異步通信
異步方式指消息發送方在發送消息時不必知道接收方的狀態,更無需等待接收方的回復,而接收方在收到消息時也不必知道發送方的目前狀態,更無需進行同步的消息處理,它們之間的連接完全是松耦合的,通信是非阻塞的,這種異步通信方式是由消息中間件中的消息隊列及其服務機制保障的
-
能夠消峰(通過隊列的形式,防止一直訪問服務)
用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去。
3.技術選型
Java端使用springboot2.0框架來搭建服務,使用MyBattisPlus操作數據庫
消息中間鍵:本文針對的數據量不算特別大,也不需要很高的吞吐量,所以選用穩定且功能完備,spring集成的ActiveMQ
python端使用stomp消息傳輸框架來對ActiveMq的消息隊列進行監督
4. 代碼實現
SpringBoot2.0集成ActiveMq
添加pom文件
<!--ActiveMq--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>1.5.0.RELEASE</version> </dependency> <!--消息隊列連接池--> <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.0</version> </dependency>構建一個生產者服務
java和ActiveMq相關的配置文件
- 通過yml文件配置
- config配置,注意**@Bean**注解的是對象的初始化函數,后面使用Autoware注入的時候,會自動的調用這個初始化函數,生成對應的對象
- 生產消息
- 主函數,啟動服務
構建一個消費者服務
-
yml配置和生產者一樣
-
config配置(無,只需要指定需要監聽得隊列即可)
-
注冊消費者監聽消息隊列
package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監聽注冊,指定需要監聽得隊列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費者收到消息***"+textMessage);} }注意在這個過程中,python通過stomp協議在進行隊列消息得傳輸過程中使用的二進制流數進行傳輸的,所以,在java端接收python返回的數據的時候,需要將二進制流先轉換為字符串,所以此處不能使用textmessage來定義接收的消息,而是使用BytesMessage來定義接收到的消息
package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監聽注冊,指定需要監聽得隊列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費者收到消息***"+textMessage);} }
python端往隊列中發送消息和監聽隊列中的消息
使用stomp消息傳輸協議
在進行消息消費的時候,采用兩種模式
-
訓練模式,只消費對應的隊列中最新的消息
-
推斷模式,將隊列中的數據來一條處理一條
4.傳輸的消息的格式
- java生產者生產消息的格式如下:
數據以JSON字符串的形式進行傳輸
-
Java端接收數據并解析數據的方式
消費者的代碼
接收的數據格式為BytesMessage
@JmsListener(destination = "activemq-queue-result")@Transactionalpublic void recieve(BytesMessage bytesMessage) throws JMSException {byte[] bys = null;//解析二進制數據為字符串數據bys = new byte[(int) bytesMessage.getBodyLength()];bytesMessage.readBytes(bys);Gson gson = new Gson();//將數據轉換為指定格式的類,Map.class或者某個結構體Map map = gson.fromJson(new String(bys),Map.class);System.out.println("***消費者收到消息***"+map.toString());System.out.println("***消費者處理消息結果***------>>" + result);}生產者業務代碼
業務實現,通過yaml的配置,來找到需要加載的表名和字段,根據約定的數據格式,轉換為json發送給MQ消息隊列
工具:Yaml2Gson,用來將yaml文件轉換為gson格式()、Gson
public void send2Mq(String queryTable) {Map featureMap = new HashMap();Map configMap = (Map) Yaml2Gson.yamlToMap(zcYml).get(queryTable); // System.out.println(configMap.keySet().toString());featureMap.put("config",configMap );StringBuilder conditionBuilder = new StringBuilder();String features = configMap.keySet().toString();StringBuilder columnsBuilder = new StringBuilder();//拼湊出各個列的查詢//查詢的特征項為空的,依據其是否為連續值為其設定初始值None or 0for(String feature: features.substring(1,features.length()-1).split(", ")){Map featsMap = (Map)configMap.get(feature);if((Integer)featsMap.get("isContinuous") == 0) columnsBuilder.append("nvl(to_char("+feature+"),'None') "+feature);else columnsBuilder.append("nvl("+feature+",0) "+feature);conditionBuilder.append(feature+" is not null or ");columnsBuilder.append(", ");}String columns = columnsBuilder.substring(0,columnsBuilder.length()-2);String condition = conditionBuilder.substring(0,conditionBuilder.length()-3);List mess = combineQueryMapper.getFeatures(columns,queryTable,condition);//featureMap.put("data",mess);Map resMap = new HashMap();resMap.put(queryTable,featureMap);//數據格式如下produceQueue.produceTrainMsg(new Gson().toJson(resMap));}
5.python解析數據的方式
如果是二進制的數據流就直接傳輸,如果是其他形式的數據,通過json.dumps將數據轉為json字符串進行傳輸,并在java中進行解析
if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二進制流直接傳輸? else:msg_json = msg con.connect(username='admin', passcode='admin', wait=True) ##通過這個函數發送數據的時候,是自動會轉換為二進制數據流的(轉換為二進制數據流的json格式的字符串) con.send(destination=queue_name, body=msg_json, content_type='text/plain')6.總結
在不同架構的代碼之間傳輸傳遞不同結構的結構體的數據,使用JSON字符串能夠最大程度的保留數據的原本形式。配合使用消息中間鍵通過數據流就能夠很好的將兩種不同的語言進行組合使用,發揮他們各自的優勢,并且規定好特定的數據格式之后,還能夠很容易的將兩塊的功能進行松耦合并且達到不同模塊復用的目的。同時在使用的過程中一定要注意處理好數據的類型和數據為空的情況(根據實際的數據處理業務來將為空的數據填寫入合適的值),無論是python還是java,因為null類型一般都是使用object這種大的類來向下兼容的,傳遞過來的數字都被轉換為object類型的數據。最后第一次完全原創的寫一篇自己的開發總結,希望大家能夠留言指正以及私信討論!
總結
以上是生活随笔為你收集整理的java与python之间的混合开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: git 报错
- 下一篇: websocket python爬虫_p