日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

java与python之间的混合开发

發布時間:2024/3/24 python 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java与python之间的混合开发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

java與python之間的混合開發

1. 項目需求

使用java進行進行數據庫的訪問,并對查詢出的數據進行的數據清理,使用python進行無監督分類

應用場景:

. 整個項目是BS架構,基于SpringBoot2.0

. 業務端主要是通過Java來實現后臺與數據庫的連接與交互

. 算法端主要通過python實現

實現的目標:

  • Java通過多線程任務調度定時完成任務調度,完成對數據庫中的某一數據進行聚類并生成對應的聚類模型,聚類和模型生成由Python實現;
  • 通過前端調用Resful接口來調取后端的接口,再通過java來與python交互,實現模型的調用來完成新數據的推理;
  • 整個過程python不涉及查數據庫,Java查數據庫之后實現數據的清洗和預處理,按照約定的格式傳遞給python來進行數據的算法實現
  • 2.方案

  • 可以通過udp或者tcp協議來傳遞數據

    缺點:a. 不能異步通信;b.安全性不高;c.調試麻煩;d.數據容易丟失

  • 通過網絡接口(Restful)的方式,調用python服務(通過http的POST請求中的Body將數據傳入,沒有測試過數據量)

  • 通過消息中間鍵,來作為數據傳遞和消息傳遞的媒介,通過隊列的方式來傳遞數據,實現異步通信,并且通過中間鍵作為中介有管理系統,安全性高,有對應的管理界面調試比較簡單,能夠查看推送的數據是什么;使用隊列來作為日志信息的輸入和輸出;

    • 最終確定的方案:選用方案二
      使用消息中間鍵的好處:

    • 數據不容易丟

    • 能夠實現異步通信

      異步方式指消息發送方在發送消息時不必知道接收方的狀態,更無需等待接收方的回復,而接收方在收到消息時也不必知道發送方的目前狀態,更無需進行同步的消息處理,它們之間的連接完全是松耦合的,通信是非阻塞的,這種異步通信方式是由消息中間件中的消息隊列及其服務機制保障的

    • 能夠消峰(通過隊列的形式,防止一直訪問服務)

      用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去

    3.技術選型

    Java端使用springboot2.0框架來搭建服務,使用MyBattisPlus操作數據庫

    消息中間鍵:本文針對的數據量不算特別大,也不需要很高的吞吐量,所以選用穩定且功能完備,spring集成的ActiveMQ

    python端使用stomp消息傳輸框架來對ActiveMq的消息隊列進行監督

    4. 代碼實現

    SpringBoot2.0集成ActiveMq

    添加pom文件

    <!--ActiveMq--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>1.5.0.RELEASE</version> </dependency> <!--消息隊列連接池--> <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.0</version> </dependency>

    構建一個生產者服務

    java和ActiveMq相關的配置文件

    • 通過yml文件配置
    server:port: 777 #生產者,微服務端口號 #ActiveMq spring:activemq:broker-url: tcp://192.168.1.151:61616 #mq的位置(61616是后臺的位置)user: adminpassword: adminjms:pub-sub-domain: false #false為Queue true為topic,默認為false,目前正在探索怎么隊列和topic一起使用 processQueueName: activemq-queue-data-process reciveQueueName: activemq-queue-result trainQueueName: activemq-queue-data-train reciveFigQueueName: activemq-queue-result_fig reciveLogTopic: /topic/Log
    • config配置,注意**@Bean**注解的是對象的初始化函數,后面使用Autoware注入的時候,會自動的調用這個初始化函數,生成對應的對象
    package com.atguigu.activemq.bootMqProduce.bootMqProduce.config;import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.stereotype.Component;import javax.jms.Queue;//只要是boot,就把Component標注在上面 //類似于Spring的applicationContext.xml @Component //讓SpringBoot管理起來 @EnableJms //開啟Jms配置 public class ConfigBean {//從yml中讀取對應鍵值的配置注入@Value("${myqueue}")private String myQueue;//配置了一個ActiveMQ的隊列@Bean //配置Bean 有點兒類似于之前Spring的配置<bean id=" " class=" ">public Queue queue(){return new ActiveMQQueue(myQueue);}}
    • 生產消息
    package com.atguigu.activemq.bootMqProduce.bootMqProduce.produce;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;import javax.jms.Queue; import java.util.UUID;//這個就相當于impl@Component //只要是boot都要標注 public class QueueProduce {@Autowired //這是一個通用的jms(java的消息標準)的類,里面定義了java消息的生產消費流程標準private JmsMessagingTemplate jmsMessagingTemplate;@Autowired //之前在配置里面定義了一個Bean,那么就可以通過@Autowired自動注入進來,是需要配置的private Queue queue;public void produceMsg(){jmsMessagingTemplate.convertAndSend(queue, "*****"+ UUID.randomUUID().toString().substring(0,6));//自動轉換后寫入}@Scheduled(fixedDelay = 3000) //定時發送,每隔3000ms發送一次public void produceMsgScheduled(){jmsMessagingTemplate.convertAndSend(queue, "*****"+ UUID.randomUUID().toString().substring(0,6));//自動轉換后寫入System.out.println("***Msg發送成功***");} }
    • 主函數,啟動服務
    package com.atguigu.activemq.bootMqProduce.bootMqProduce;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication @EnableScheduling //定時發送需要打開這個開關 public class MainApp_produce {public static void main(String[] args) {SpringApplication.run(MainApp_produce.class,args);} }

    構建一個消費者服務

    • yml配置和生產者一樣

    • config配置(無,只需要指定需要監聽得隊列即可)

    • 注冊消費者監聽消息隊列

      package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監聽注冊,指定需要監聽得隊列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費者收到消息***"+textMessage);} }

      注意在這個過程中,python通過stomp協議在進行隊列消息得傳輸過程中使用的二進制流數進行傳輸的,所以,在java端接收python返回的數據的時候,需要將二進制流先轉換為字符串,所以此處不能使用textmessage來定義接收的消息,而是使用BytesMessage來定義接收到的消息

      package com.atguigu.activemq.bootMqConsumer.consumer;import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;import javax.jms.JMSException; import javax.jms.TextMessage;@Component //只要是boot,要么是service,要么是Component public class Queue_Consumer {//監聽注冊,指定需要監聽得隊列@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws JMSException{System.out.println("***消費者收到消息***"+textMessage);} }

    python端往隊列中發送消息和監聽隊列中的消息

    使用stomp消息傳輸協議

    在進行消息消費的時候,采用兩種模式

    • 訓練模式,只消費對應的隊列中最新的消息

    • 推斷模式,將隊列中的數據來一條處理一條

    # -*- coding: utf-8 -*- """ Created on Thu Jul 19 09:54:08 2018@author: lihc """# -*-coding:utf-8-*- import stomp import datetime import json from FINCH import * import joblib import os import tracebackqueue_receive_train = 'activemq-queue-data-train' queue_receive_process = 'activemq-queue-data-process' topic_log = '/topic/Log' listener_name = 'SampleListener' queue_send_data = 'activemq-queue-result' queue_process_result = 'activemq-process-result' queue_send_fig = 'activemq-queue-result_fig_bak' post = 61613 url = '192.168.1.151'#消息回調函數,有消息傳入的時候調用on_message函數 class SampleListener(object):def on_message(self, headers, message):data_new = json.loads(message)# # print('headers: %s' % headers)print(">>>>>>>>>>>>>>>>>>>>>>>>>>>listener1")print('message: %s' % data_new)class HaddleListener(object):def __init__(self):self.flag = Falseself.data = {}self.header = ''#將不同signal_type的最新數據存入data字典中def on_message(self, headers, message):# return messageself.flag = Truedata = json.loads(message)signal_type = ''.join(list(data.keys()))self.data[signal_type] = dataself.header = headersprint(data)class ActiveMqManager(object):def __init__(self, url, post):self.url = urlself.port = post#建立連接self.con = stomp.Connection10([(self.url, self.port)])self.con.connect(username='admin', passcode='admin', wait=True)# 推送到隊列queuedef send_to_queue(self, msg, queue_name):con = stomp.Connection10([(self.url, self.port)])if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二進制流直接傳輸?else:msg_json = msgcon.connect(username='admin', passcode='admin', wait=True)con.send(destination=queue_name, body=msg_json, content_type='text/plain')con.disconnect()# 推送到主題def send_to_topic(self, msg, topic_name):con = stomp.Connection10([(self.url, self.port)])con.connect()con.send(topic_name, msg)con.disconnect()##從隊列接收消息def receive_from_queue(self, queue_receive, listener_name, listener):con = stomp.Connection10([(self.url, self.port)])con.set_listener(listener_name, listener)con.connect(username='admin', passcode='admin', wait=True)con.subscribe(queue_receive, headers={"listener_name": listener_name})# con.subscribe(queue_receive, headers={"listener_name": listener_name}, ack='client')##從主題接收消息def receive_from_topic(self, topic_name):con = stomp.Connection10([(self.url, self.port)])con.set_listener(listener_name, SampleListener())con.connect()con.subscribe(topic_name)while 1:self.send_to_topic('topic')time.sleep(3) # secscon.disconnect()if __name__ == '__main__':activeMqManager = ActiveMqManager(url, post)train_data_listener = HaddleListener()process_data_listener = HaddleListener()# 只要主線程不結束,這個監聽就一直都在,也不能 disconnecion# 注冊兩個監聽器,分別監聽queue_receive_train和queue_receive_process隊列activeMqManager.receive_from_queue(queue_receive_train, listener_name="TrainListener", listener=train_data_listener)activeMqManager.receive_from_queue(queue_receive_process, listener_name="ProcessListener", listener=process_data_listener)print("主線程阻塞")while True:##對于訓練數據的隊列,我們需要使用的是隊列中最新的一條數據,所以通過這種機制訓練最新的一條數據""">>>>>>>>>>>>>>>>>>通過這種控制機制,讓每次程序啟動的時候只訓練最新的數據<<<<<<<<<<<<<<<<<<<<<<"""try:if train_data_listener.flag == True:time.sleep(8) # secs# 訓練 (只處理最新的數據)if(train_data_listener.flag == True and train_data_listener.data != {}):train_data_listener.flag = False# data = train_data_listener.data# print('message: %s' % train_data_listener.data)print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ">>>>>>>>>>>>>>>>>>>>>>處理")for key, value in train_data_listener.data.items():# 將不同的signal_type的最新的數據保存下來signal_type = ''.join(list(value.keys()))# 處理消息dataProcessing = DataProcessing(type=signal_type, train_data_dic=value)# dataProcessing = DataProcessing(type=signal_type, train_data_dic=data, model="FINCH")result_map = dataProcessing.structSignalData()#python數據處理的結果回傳給java的業務端activeMqManager.send_to_queue(result_map, queue_send_data)print("發送成功")# 通過話題來傳輸日志activeMqManager.send_to_topic("發送成功", topic_log) # 消費完了之后,將data的數據給清空train_data_listener.data = {}# 推斷:來一條處理一條# if ( process_data_listener.data is not None):if (process_data_listener.flag == True and process_data_listener.data != {}):process_data_listener.flag = False# data = process_data_listener.dataprint(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ">>>>>>>>>>>>>>>>>>>>>>測試")for key, value in process_data_listener.data.items():# 通過訂閱話題來實時傳輸日志activeMqManager.send_to_topic(">>>>>>>>>>>>>>>>>>>>>>測試", topic_log)signal_type = ''.join(list(value.keys()))if signal_type:result_map = dataProcessing.process(value)activeMqManager.send_to_topic("識別完成", topic_log)activeMqManager.send_to_queue(result_map, queue_process_result)print("識別結果發送成功")activeMqManager.send_to_topic("識別結果發送成功", topic_log)# 消費完了之后清空process_data_listener.data = {} except (Exception) as error:print(traceback.print_exc())print("失敗代碼:"+ str(error))

    4.傳輸的消息的格式

    • java生產者生產消息的格式如下:
    {"LD": {"LD":["PL":300,"JD":30,"WD":30]},"SX": {"SX":["PL":345,"JD":80,"WD":65]} }

    數據以JSON字符串的形式進行傳輸

    • Java端接收數據并解析數據的方式

      消費者的代碼

      接收的數據格式為BytesMessage

      @JmsListener(destination = "activemq-queue-result")@Transactionalpublic void recieve(BytesMessage bytesMessage) throws JMSException {byte[] bys = null;//解析二進制數據為字符串數據bys = new byte[(int) bytesMessage.getBodyLength()];bytesMessage.readBytes(bys);Gson gson = new Gson();//將數據轉換為指定格式的類,Map.class或者某個結構體Map map = gson.fromJson(new String(bys),Map.class);System.out.println("***消費者收到消息***"+map.toString());System.out.println("***消費者處理消息結果***------>>" + result);}

      生產者業務代碼

      業務實現,通過yaml的配置,來找到需要加載的表名和字段,根據約定的數據格式,轉換為json發送給MQ消息隊列

      工具:Yaml2Gson,用來將yaml文件轉換為gson格式()、Gson

      public void send2Mq(String queryTable) {Map featureMap = new HashMap();Map configMap = (Map) Yaml2Gson.yamlToMap(zcYml).get(queryTable); // System.out.println(configMap.keySet().toString());featureMap.put("config",configMap );StringBuilder conditionBuilder = new StringBuilder();String features = configMap.keySet().toString();StringBuilder columnsBuilder = new StringBuilder();//拼湊出各個列的查詢//查詢的特征項為空的,依據其是否為連續值為其設定初始值None or 0for(String feature: features.substring(1,features.length()-1).split(", ")){Map featsMap = (Map)configMap.get(feature);if((Integer)featsMap.get("isContinuous") == 0) columnsBuilder.append("nvl(to_char("+feature+"),'None') "+feature);else columnsBuilder.append("nvl("+feature+",0) "+feature);conditionBuilder.append(feature+" is not null or ");columnsBuilder.append(", ");}String columns = columnsBuilder.substring(0,columnsBuilder.length()-2);String condition = conditionBuilder.substring(0,conditionBuilder.length()-3);List mess = combineQueryMapper.getFeatures(columns,queryTable,condition);//featureMap.put("data",mess);Map resMap = new HashMap();resMap.put(queryTable,featureMap);//數據格式如下produceQueue.produceTrainMsg(new Gson().toJson(resMap));}

    5.python解析數據的方式

    如果是二進制的數據流就直接傳輸,如果是其他形式的數據,通過json.dumps將數據轉為json字符串進行傳輸,并在java中進行解析

    if not isinstance(msg, bytes):msg_json = json.dumps(msg)# 二進制流直接傳輸? else:msg_json = msg con.connect(username='admin', passcode='admin', wait=True) ##通過這個函數發送數據的時候,是自動會轉換為二進制數據流的(轉換為二進制數據流的json格式的字符串) con.send(destination=queue_name, body=msg_json, content_type='text/plain')

    6.總結

    在不同架構的代碼之間傳輸傳遞不同結構的結構體的數據,使用JSON字符串能夠最大程度的保留數據的原本形式。配合使用消息中間鍵通過數據流就能夠很好的將兩種不同的語言進行組合使用,發揮他們各自的優勢,并且規定好特定的數據格式之后,還能夠很容易的將兩塊的功能進行松耦合并且達到不同模塊復用的目的。同時在使用的過程中一定要注意處理好數據的類型和數據為空的情況(根據實際的數據處理業務來將為空的數據填寫入合適的值),無論是python還是java,因為null類型一般都是使用object這種大的類來向下兼容的,傳遞過來的數字都被轉換為object類型的數據。最后第一次完全原創的寫一篇自己的開發總結,希望大家能夠留言指正以及私信討論!

    總結

    以上是生活随笔為你收集整理的java与python之间的混合开发的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。