Hoje, os dados estão crescendo e se acumulando mais rápido do que antes. Atualmente, cerca de 90% dos dados gerados em nosso mundo foram gerados nos últimos dois anos. Devido a este crescimento na taxa, as plataformas big data eles tiveram que adotar soluções radicais para poder manter volumes tão grandes de dados.
como obter o pivô de energia
Uma das fontes de dados mais importantes hoje são as mídias sociais. Deixe-me demonstrar um exemplo da vida real: gerenciamento, análise e extração de informações de dados de mídia social em tempo real, usando uma das soluções ecológicas em big data os mais importantes por aí - Apache Spark e Python.
Neste artigo, mostrarei como construir um aplicativo simples que lê feeds online do Twitter usando Python e, em seguida, processa tweets usando Apache Spark Streaming para identificar as hashtags e, por fim, retornar as hashtags de tendência mais importantes e renderizar esses dados no painel em tempo real.
Para obter tweets do Twitter, você deve se registrar em TwitterApps Ao clicar em 'Crie um novo aplicativo' e após preencher o formulário abaixo, clique em 'Crie seu aplicativo Twitter'.
Em segundo lugar, vá para o aplicativo recém-criado e abra a janela 'Identificadores e chaves de acesso'. Em seguida, clique em 'Gerar meu identificador de acesso'.
Seus novos IDs de login serão exibidos conforme mostrado abaixo.
E agora você está pronto para a próxima etapa.
Nesta etapa, vou mostrar como construir um cliente simples que irá buscar tweets da API do Twitter usando Python e, em seguida, passá-los para a instância Spark Streaming . Deve ser fácil de seguir para qualquer desenvolvedor python profissional.
Primeiro, vamos criar um arquivo chamado twitter_app.py
e então adicionaremos o código conforme visto abaixo.
técnicas de ajuste de desempenho no servidor sql
Importe as bibliotecas que vamos usar conforme mostrado abaixo:
import socket import sys import requests import requests_oauthlib import json
E adicione as variáveis que serão usadas no OAuth para se conectar ao Twitter, conforme mostrado abaixo:
# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Agora, vamos criar uma nova função chamada get_tweets
que chamará a URL da API do Twitter e retornará a resposta para uma sequência de tweets.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Em seguida, você cria uma função que obtém a resposta da visualização acima e extrai o texto dos tweets do objeto JSON dos tweets completos. Depois disso, envie cada tweet para a instância Spark Streaming (será discutido mais tarde) em uma conexão TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Agora faremos a parte principal. Isso fará com que o aplicativo hospede as conexões tomada , com o qual mais tarde se conectará Faísca . Vamos definir o IP aqui como localhost
uma vez que tudo será executado na mesma máquina e na porta 9009
. Então, vamos chamar o método get_tweets
, que fizemos acima, para obter os tweets do Twitter e passar sua resposta com a conexão tomada a send_tweets_to_spark
para enviar os tweets para o Spark.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Vamos construir nosso aplicativo Spark Streaming , que irá processar os tweets recebidos em tempo real, extrair hashtags deles e calcular quantas hashtags foram mencionadas.
Primeiro, temos que criar uma instância Contexto Spark sc
, então criamos Contexto de streaming ssc
de sc
com um intervalo de dois segundos que fará a transformação em todas as transmissões recebidas a cada dois segundos. Observe que definimos o nível de log para ERROR
para ser capaz de desativar a maioria dos logs que você escreve Faísca .
Definimos um ponto de verificação aqui para permitir uma verificação RDD periódica; isso é obrigatório para ser usado em nosso aplicativo, visto que usaremos transformações de combate a incêndios com monitoração de estado (será discutido posteriormente na mesma seção).
Em seguida, definimos nosso DStream dataStream principal, que conectará o servidor tomada que criamos anteriormente na porta 9009
e ele vai ler os tweets dessa porta. Cada registro no DStream será um tweet.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
Agora, vamos definir nossa lógica de transformação. Primeiro, vamos quebrar todos os tweets em palavras e colocá-los em palavras RDD. Em seguida, filtramos apenas hashtags de todas as palavras e as plotamos ao lado de (hashtag, 1)
e os colocamos em hashtags RDD.
Em seguida, devemos calcular quantas vezes a hashtag foi mencionada. Podemos fazer isso usando a função reduceByKey
. Esta função calculará quantas vezes a hashtag foi mencionada por cada grupo, ou seja, zera a conta em cada grupo.
No nosso caso, precisamos calcular as contagens em todos os grupos, então usaremos outra função chamada updateStateByKey
uma vez que esta função permite manter o status RDD enquanto o atualiza com novos dados. Este formulário é denominado Stateful Transformation
.
Observe que para usar updateStateByKey
, você deve configurar um ponto de verificação e o que foi feito na etapa anterior.
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
updateStateByKey
recebe uma função como parâmetro denominado função update
. Isso é executado em cada item no RDD e executa a lógica desejada.
qual é uma boa prática de design de aplicativo?
Em nosso caso, criamos uma função de atualização chamada aggregate_tags_count
que irá somar todos os new_values
(novos valores) para cada hashtag e adicioná-los ao total_sum
(soma total), que é a soma de todos os grupos e salva os dados em RDD tags_totals
.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Em seguida, fazemos o processamento RDD tags_totals
em cada grupo para ser capaz de convertê-lo em uma mesa temporária usando Contexto do Spark SQL e depois disso, faça uma declaração para ser capaz de pegar as dez principais hashtags com suas contas e colocá-las no data frame hashtag_counts_df
.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
A última etapa em nosso aplicativo Spark é enviar o quadro de dados hashtag_counts_df
para o aplicativo de painel. Assim, converteremos o quadro de dados em duas matrizes, uma para as hashtags e outra para suas contas. Em seguida, enviaremos para o aplicativo de painel por meio da API REST.
def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Finalmente, aqui está uma amostra da saída de Spark Streaming durante a execução e impressão de hashtag_counts_df
. Você notará que a saída é impressa exatamente a cada dois segundos para cada intervalo de grupo.
Agora, vamos criar um aplicativo de painel simples que será atualizado em tempo real pelo Spark. Vamos construí-lo usando Python, Flask e Charts.js .
Primeiro, vamos criar um projeto Python com a estrutura vista abaixo, baixar e adicionar o arquivo Chart.js no diretório estático.
download gratuito de software de hacker de cartão de crédito
Então, no arquivo app.py
, vamos criar uma função chamada update_data
, que será chamada pelo Spark através da URL http://localhost:5001/updateData
para poder atualizar rótulos globais e matrizes de valor.
Da mesma forma, a função refresh_graph_data
ele é criado para ser chamado pela solicitação AJAX para retornar os novos rótulos atualizados e matrizes de valor como JSON. A função get_chart_page
sairá da página chart.html
quando chamado.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
Agora vamos criar um gráfico simples no arquivo chart.html
ser capaz de mostrar os dados da hashtag e atualizá-los em tempo real. Conforme definido abaixo, precisamos importar as bibliotecas JavaScript, Chart.js
e jquery.min.js
.
No corpo da tag, precisamos criar uma tela e dar a ela um id para poder fazer referência a ela enquanto exibimos o gráfico ao usar o JavaScript na próxima etapa.
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
Agora vamos criar o gráfico usando o código JavaScript abaixo. Primeiro, pegamos o elemento canvas e, em seguida, criamos um novo objeto gráfico e passamos para ele o elemento canvas e definimos o objeto de dados como visto abaixo.
Observe que os rótulos de dados são unidos a rótulos e variáveis de valor que são retornados ao sair da página, ao chamar o get_chart_page
no arquivo app.py
.
A última parte é a função configurada para fazer uma solicitação Ajax a cada segundo e chamar a URL /refreshData
, que executará refresh_graph_data
em app.py
e ele retornará os novos dados atualizados e, em seguida, atualizará o gráfico que os novos dados deixam.
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
Vamos executar os três aplicativos na ordem abaixo: 1. Twitter App Client. 2. Spark App. 3. Dashboard Web App.
Então você pode acessar o painel de controle em tempo real, procurando pelo URL
Agora você pode ver seu gráfico sendo atualizado abaixo:
Aprendemos a fazer análises de dados simples em dados em tempo real usando Spark Streaming e integrando-os diretamente com um painel de controle simples, usando um serviço da web RESTful. A partir deste exemplo, podemos ver o quão poderoso é o Spark, pois captura um fluxo de dados massivo, o transforma e extrai informações valiosas que podem ser facilmente usadas para tomar decisões em um curto espaço de tempo. Existem muitos casos de uso úteis, que podem ser implementados e podem servir a diferentes setores, como notícias ou marketing.
Exemplo da indústria de notícias
Podemos rastrear as hashtags mencionadas com mais frequência para descobrir sobre quais tópicos as pessoas estão falando nas redes sociais. Também podemos rastrear hashtags específicas e seus tweets para descobrir o que as pessoas estão dizendo sobre tópicos ou eventos específicos no mundo.
Exemplo de Marketing
Podemos coletar a transmissão de tweets e, fazendo uma análise de opinião, categorizá-los e determinar os interesses das pessoas a fim de trazer ofertas relacionadas aos seus interesses.
Além disso, existem muitos casos de uso que podem ser aplicados especificamente para análises. big data e podem servir a muitos setores. Para mais casos de uso do Apache Spark em geral, sugiro que você dê uma olhada em um de nossos posts anteriores .
princípios do design nas artes
Eu recomendo que você leia mais sobre Spark Streaming Aqui para aprender mais sobre seus recursos e fazer uma transformação de dados mais avançada para obter mais informações em tempo real ao usá-lo.